
feat(redis-worker): batched pop in MollifierDrainer for fast single-env drains#3797

Open
d-cs wants to merge 14 commits into main from feat/mollifier-drain-batch

Collaborator

@d-cs d-cs commented Jun 1, 2026

Summary

Adds drainBatchSize to MollifierDrainer (default 1 — preserves existing behaviour) and wires TRIGGER_MOLLIFIER_DRAIN_BATCH_SIZE through the webapp (default 50). Each tick the drainer now pops up to drainBatchSize from each chosen env, then dispatches every popped entry through the shared concurrency-bounded pLimit. Per-org/per-env fairness is unchanged — only the in-env pop count grows.

Previously the drainer popped one entry per env per tick. For a single-env burst, that serialised the drain at the per-tick floor of pop + engine.trigger ≈ 50–60 ms per entry. When buffer entries pile up under a real-world tenant burst, that adds up to tens of minutes of tail latency before the buffer fully materialises, even though PG itself could comfortably sustain the writes.

Why this matters — heavy-tail illustration

Scenario: 100 customers in one window — 94 fire 20 triggers each, 5 fire 100, 1 fires 1000. Gate at THRESHOLD=10/s, HOLD_MS=500. First 10 of each burst hit PG directly; the rest buffer.

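| Customers | Triggers each | PG direct | Buffered each | Total buffered |
|-----------|---------------|-----------|---------------|----------------|
| 94 small  | 20            | 10        | 10            | 940            |
| 5 medium  | 100           | 10        | 90            | 450            |
| 1 heavy   | 1000          | 10        | 990           | 990            |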

With DRAIN_BATCH_SIZE=50, DRAIN_CONCURRENCY=50, ~50 ms engine.trigger:

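| Tick | Pops                         | Dispatch waves | Wall-clock                  |
|------|------------------------------|----------------|-----------------------------|
| 1    | 94×10 + 5×50 + 1×50 = 1 240  | 25 × 50 ms     | ~1 300 ms (94 smalls done)  |
| 2    | 5×40 + 1×50 = 250            | 5 × 50 ms      | ~300 ms (5 mediums done)    |
| 3–20 | heavy alone, 50/tick         | 1 × 50 ms      | ~100 ms each                |

| Customer class | Buffered fully drained |
|----------------|------------------------|
| 94 small       | ~1.3 s                 |
| 5 medium       | ~1.6 s                 |
| 1 heavy        | ~3.4 s                 |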
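
The arithmetic behind the two tables above can be checked with a small model. This is only a sketch: the function and field names are made up for illustration, and the fixed 50 ms per-tick overhead is an assumption inferred from the wall-clock column (25 waves × 50 ms = 1 250 ms versus the quoted ~1 300 ms), not something the drainer code is known to define.

```typescript
// Hypothetical model of the heavy-tail illustration. Names and the
// per-tick overhead are assumptions, not the drainer's real code.
interface EnvClass {
  label: string;
  envs: number; // number of envs in this class
  buffered: number; // buffered entries per env
}

interface DrainResult {
  ticks: number;
  drainedAtMs: Record<string, number>;
}

function simulateBatchedDrain(
  classes: EnvClass[],
  drainBatchSize: number,
  concurrency: number,
  triggerMs: number,
  tickOverheadMs: number
): DrainResult {
  const state = classes.map((c) => ({ ...c, remaining: c.buffered }));
  const drainedAtMs: Record<string, number> = {};
  let clockMs = 0;
  let ticks = 0;

  while (state.some((s) => s.remaining > 0)) {
    // Each env gives up at most drainBatchSize entries per tick.
    let pops = 0;
    for (const s of state) {
      const take = Math.min(drainBatchSize, s.remaining);
      pops += take * s.envs;
      s.remaining -= take;
    }
    // Popped entries are dispatched in waves of `concurrency`.
    const waves = Math.ceil(pops / concurrency);
    clockMs += tickOverheadMs + waves * triggerMs;
    ticks += 1;
    for (const s of state) {
      if (s.remaining === 0 && !(s.label in drainedAtMs)) {
        drainedAtMs[s.label] = clockMs;
      }
    }
  }
  return { ticks, drainedAtMs };
}

const result = simulateBatchedDrain(
  [
    { label: "small", envs: 94, buffered: 10 },
    { label: "medium", envs: 5, buffered: 90 },
    { label: "heavy", envs: 1, buffered: 990 },
  ],
  50, // DRAIN_BATCH_SIZE
  50, // DRAIN_CONCURRENCY
  50, // ~engine.trigger latency in ms
  50 // assumed fixed per-tick overhead in ms
);
console.log(result);
```

Under those assumptions the model finishes in 20 ticks, with the three classes draining at 1 300 ms, 1 600 ms and 3 400 ms, which lines up with the ~1.3 s / ~1.6 s / ~3.4 s figures in the table.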

Without batching (one pop per env per tick — current behaviour):

| Customer class | Buffered fully drained |
|----------------|------------------------|
| 94 small       | ~500 ms                |
| 5 medium       | ~4.5 s                 |
| 1 heavy        | ~49 s                  |

So the heavy single-tenant tail drops from ~49 s to ~3.4 s (~14× faster) without changing PG load characteristics. Smalls go up slightly in this scenario (500 ms → 1.3 s) because all 100 envs share one tick's dispatch queue — that's the trade we accept for the heavy tail; the worst-case small wait is still inside one tick. PG load is identical either way (50 concurrent inserts at a time, capped by DRAIN_CONCURRENCY).

What changed

packages/redis-worker

  • New drainBatchSize option (default 1 — full backward compat).
  • runOnce() refactored to pop per-env batches in parallel, then dispatch all popped entries through the existing global pLimit. Mid-batch pop failure aborts only that env's batch and counts as one failure (same semantic as the old per-env path).
  • Removed the now-unused processOneFromEnv helper.

apps/webapp

  • TRIGGER_MOLLIFIER_DRAIN_BATCH_SIZE env var (default 50, matching DRAIN_CONCURRENCY).
  • Wired into mollifierDrainer.server.ts.
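
For readers who want a feel for the env-var wiring, here is a minimal sketch of parsing a batch-size setting with a default of 50. The helper name `parseDrainBatchSize` and its validation rules are hypothetical; the webapp's real env schema may validate differently.

```typescript
// Hypothetical helper: not the webapp's actual env parsing.
// Falls back to the documented webapp default (50) when the variable is unset.
function parseDrainBatchSize(raw: string | undefined, fallback = 50): number {
  if (raw === undefined || raw.trim() === "") return fallback;
  const parsed = Number(raw);
  if (!Number.isInteger(parsed) || parsed < 1) {
    throw new Error(
      `TRIGGER_MOLLIFIER_DRAIN_BATCH_SIZE must be a positive integer, got "${raw}"`
    );
  }
  return parsed;
}

console.log(parseDrainBatchSize(undefined)); // unset: uses the fallback
console.log(parseDrainBatchSize("1")); // 1 keeps the pre-batch behaviour
```

In a Node process this would typically be called with the value read from the environment; rejecting zero and non-integers up front avoids a drainer that silently never pops.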

Test cloud config (separate cloud PR): TRIGGER_MOLLIFIER_DRAIN_BATCH_SIZE="50" on the worker service. Production rollout deferred until we've watched it on test cloud.

Test plan

  • All 25 stub-based drainer tests pass (18 pre-existing + 7 new). 7 new tests under MollifierDrainer.drainBatchSize:
    • pops up to drainBatchSize across ticks
    • global concurrency cap still holds when batch > concurrency
    • mid-batch pop failure isolation
    • multi-env batch fan-out in one tick
    • hierarchical org fairness preserved at drainBatchSize > 1 (load-bearing — guards against future regressions to per-env-instead-of-per-org rotation)
    • mixed success/failure accounting in a batched tick
    • bounded pops on empty queue (no Lua spam past drainBatchSize)
  • All pre-existing tests still pass unchanged at default drainBatchSize=1 → backward-compat locked.
  • pnpm run build --filter @trigger.dev/redis-worker clean.
  • pnpm run typecheck --filter webapp clean.
  • redisTest block (real Redis via testcontainers) — couldn't run locally on this branch due to testcontainers runtime discovery; will validate in CI.
  • Test-cloud smoke after cloud PR lands: fire burst 50 against a flagged env and confirm the 50th entry's drain time drops from ~2.5 s to <200 ms.

Notes

  • Per-tick memory bound: up to maxOrgsPerTick × drainBatchSize entries can sit in the JS pLimit queue between pop and dispatch. At defaults that is 500 × 50 = 25 000 entries; at ~5 KB per snapshot, roughly 125 MB worst case per worker, which is well within headroom.
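
The memory bound in the note above is a straight product, which the following sketch spells out. The function name is hypothetical, and the 5 000-byte snapshot size is simply the note's ~5 KB estimate in decimal units.

```typescript
// Hypothetical helper reproducing the worst-case arithmetic from the note.
function worstCaseQueuedBytes(
  maxOrgsPerTick: number,
  drainBatchSize: number,
  snapshotBytes: number
): number {
  return maxOrgsPerTick * drainBatchSize * snapshotBytes;
}

const queuedEntries = 500 * 50; // entries that can be popped in one tick
const queuedBytes = worstCaseQueuedBytes(500, 50, 5_000); // ~5 KB per snapshot
console.log(queuedEntries, queuedBytes / 1_000_000); // entries, decimal MB
```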
  • The pre-batch model's strict per-env throughput cap of 1/tick is documented as the fairness baseline elsewhere. Org-level fairness is what callers actually rely on; this change does not weaken that.

🤖 Generated with Claude Code

d-cs and others added 3 commits June 1, 2026 16:52
Design for an autonomous, containerised pipeline that validates and
proposes production-grade fixes for ~250 deepsec security findings
tracked in Linear, with local-only review and no GitHub exposure
until disclosure timing is decided. Implementation lives in a
separate repository.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
End-to-end plumbing plan — new repo at ~/Development/sec-fix-pipeline/,
worker daemon, stub agent container, MinIO artifact storage, Linear
label state machine. Stub agent only; Phases 2-6 (real agent, full
stack, durability, resumable runs, dashboard) tracked separately.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds a drainBatchSize option (default 1, preserves existing behaviour)
that lets the drainer pop up to N entries from each chosen env per tick
and dispatch them all through the shared concurrency-bounded limiter.
Org/env fairness is preserved — the per-tick env selection is
unchanged, only the in-env pop count grows. Wires
TRIGGER_MOLLIFIER_DRAIN_BATCH_SIZE through the webapp (default 50).

For a single-env burst of K entries with K > 1, drain time drops from
K × tick_time to ceil(K / drainBatchSize) × tick_time with handler
parallelism capped at concurrency. Heavy single-tenant tails go from
minutes to tens of seconds without changing PG load characteristics.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
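
The tick-count formula in the commit message above can be sketched as follows. `ticksToDrain` is a hypothetical name, and tick_time is left out because it is not constant in practice: a batched tick does more handler work than a single-pop tick.

```typescript
// Hypothetical helper: number of ticks needed to drain a single-env burst
// of k entries, per the ceil(K / drainBatchSize) expression in the commit.
function ticksToDrain(k: number, drainBatchSize: number): number {
  if (k <= 0) return 0;
  return Math.ceil(k / drainBatchSize);
}

console.log(ticksToDrain(990, 1)); // one pop per tick
console.log(ticksToDrain(990, 50)); // batched
```

For the 990-entry heavy tenant from the PR description this gives 990 ticks at drainBatchSize=1 and 20 ticks at drainBatchSize=50.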

changeset-bot Bot commented Jun 1, 2026

🦋 Changeset detected

Latest commit: d38b449

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 32 packages
Name Type
@trigger.dev/redis-worker Patch
@internal/run-engine Patch
@internal/schedule-engine Patch
@trigger.dev/build Patch
@trigger.dev/core Patch
@trigger.dev/plugins Patch
@trigger.dev/python Patch
@trigger.dev/react-hooks Patch
@trigger.dev/rsc Patch
@trigger.dev/schema-to-json Patch
@trigger.dev/sdk Patch
@trigger.dev/database Patch
@trigger.dev/otlp-importer Patch
@trigger.dev/rbac Patch
trigger.dev Patch
references-ai-chat Patch
d3-chat Patch
references-d3-openai-agents Patch
@internal/cache Patch
@internal/clickhouse Patch
@internal/llm-model-catalog Patch
@internal/redis Patch
@internal/replication Patch
@internal/testcontainers Patch
@internal/tracing Patch
@internal/tsql Patch
@internal/zod-worker Patch
references-nextjs-realtime Patch
references-realtime-hooks-test Patch
references-realtime-streams Patch
@internal/sdk-compat-tests Patch
references-telemetry Patch


Contributor

coderabbitai Bot commented Jun 1, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

This PR adds a per-environment batching option to MollifierDrainer. A new option/env var (TRIGGER_MOLLIFIER_DRAIN_BATCH_SIZE, server default 50) is wired into initialization and logging. The drainer gains an optional drainBatchSize option and refactors runOnce to a worker-pool that enforces per-env pop budgets, catches buffer.pop failures per-env, processes each entry with inline try/catch, and returns aggregated drained/failed counts. The per-env helper and p-limit usage are removed. Tests and changeset/server-changes docs were added.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|------------|--------|-------------|------------|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
|------------|--------|-------------|
| Title check | ✅ Passed | The title clearly and specifically summarizes the main change: adding batched pop functionality to MollifierDrainer to improve single-environment drain performance. |
| Description check | ✅ Passed | The description provides comprehensive context including a summary, performance impact analysis, test plan coverage, and architectural notes; it aligns well with the template requirements despite some sections being custom content rather than template-prescribed fields. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |


Contributor

mintlify Bot commented Jun 1, 2026

Preview deployment for your docs. Learn more about Mintlify Previews.

| Project | Status   | Preview      | Updated (UTC)        |
|---------|----------|--------------|----------------------|
| trigger | 🟢 Ready | View Preview | Jun 1, 2026, 7:24 PM |


Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
coderabbitai[bot]

This comment was marked as resolved.

coderabbitai[bot]

This comment was marked as resolved.

Addresses CodeRabbit review: the prefetched-pop tick design moved every
popped entry into DRAINING before any of them got a pLimit slot, so a
process crash mid-tick stranded up to maxOrgsPerTick × drainBatchSize
entries for stale-sweep to recover (~25k at defaults).

Replaces it with a worker-pool: spawn min(concurrency, totalBudget)
workers; each worker round-robin-picks an env with budget remaining,
pops one entry, processes it, releases its slot. At any moment, the
count of popped-but-not-acked entries is bounded by `concurrency` —
identical safety to the pre-batch one-pop-per-env path — while a
single-env burst still uses the full concurrency budget (all workers
can pull from the same env).

Adds a regression test pinning the bound: never has more than
`concurrency` entries popped-but-not-acked at any moment.

Two existing batch tests now use concurrency=1 to isolate the
break-on-empty/error semantic from the worker-pool's parallel-pick
race (the race semantic itself is covered by the new safety test).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
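
The worker-pool shape described in the commit above can be sketched roughly as below. Everything here is illustrative: `runWorkerPoolTick`, the in-memory `queues` map (standing in for the Redis-backed buffer.pop) and the `maxInFlight` counter are assumptions made for the example, not the drainer's actual internals.

```typescript
// Illustrative sketch only: in-memory queues stand in for buffer.pop.
type Entry = { envId: string; id: number };

async function runWorkerPoolTick(
  queues: Map<string, Entry[]>,
  drainBatchSize: number,
  concurrency: number,
  handle: (entry: Entry) => Promise<void>
): Promise<{ drained: number; failed: number; maxInFlight: number }> {
  const envIds = Array.from(queues.keys());
  const budget = new Map<string, number>();
  for (const id of envIds) budget.set(id, drainBatchSize);
  const totalBudget = envIds.length * drainBatchSize;

  let cursor = 0;
  let inFlight = 0;
  let maxInFlight = 0;
  let drained = 0;
  let failed = 0;

  // Round-robin: return the next env that still has pop budget this tick.
  const pickNextEnv = (): string | undefined => {
    for (let i = 0; i < envIds.length; i++) {
      const idx = (cursor + i) % envIds.length;
      const envId = envIds[idx];
      if (envId === undefined) continue;
      const left = budget.get(envId) ?? 0;
      if (left > 0) {
        budget.set(envId, left - 1);
        cursor = (idx + 1) % envIds.length;
        return envId;
      }
    }
    return undefined;
  };

  const worker = async (): Promise<void> => {
    for (;;) {
      const envId = pickNextEnv();
      if (envId === undefined) return; // no budget left anywhere
      const entry = queues.get(envId)?.shift(); // pop exactly one entry
      if (entry === undefined) {
        budget.set(envId, 0); // queue empty: stop picking this env
        continue;
      }
      inFlight += 1;
      maxInFlight = Math.max(maxInFlight, inFlight);
      try {
        await handle(entry);
        drained += 1;
      } catch {
        failed += 1;
      } finally {
        inFlight -= 1; // slot released before the next pop
      }
    }
  };

  const workers = Math.min(concurrency, totalBudget);
  await Promise.all(Array.from({ length: workers }, () => worker()));
  return { drained, failed, maxInFlight };
}

// Single-env burst: all workers pull from the same env.
const burst = new Map<string, Entry[]>([
  ["env-heavy", Array.from({ length: 120 }, (_, id) => ({ envId: "env-heavy", id }))],
]);
runWorkerPoolTick(burst, 50, 5, async () => {
  await Promise.resolve();
}).then((r) => console.log(r));
```

Because each worker holds at most one popped entry at a time, the number of popped-but-unfinished entries in this sketch cannot exceed the worker count, which is the bound the regression test in the commit is described as pinning.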
devin-ai-integration[bot]

This comment was marked as resolved.

d-cs and others added 2 commits June 1, 2026 20:42
…-tolerant assertions

The two batch tests that exercise pop-failure / queue-empty behaviour
were temporarily set to concurrency=1 to dodge the worker pool's
parallel-pick race. That collapsed the worker pool and stopped the
tests from validating their semantics under genuine concurrency.

Restored concurrency=5 (matching the rest of the suite) and switched
the non-deterministic counts to bounded-range assertions:

- mid-batch pop failure: actual drained entries are deterministic (the
  two bad pops + one good pop); failure count is in [1, concurrency]
  because workers that loop after a sibling's empty/null pop can re-pop
  the broken env before skip.add propagates. envBadPops is bounded by
  drainBatchSize + concurrency — the property is "bounded retry", not
  "exactly one".

- stops popping early: popCalls in [3, concurrency + 2] and strictly
  less than drainBatchSize — the property is "we don't pop all the way
  to the batch ceiling once the queue empties".

These bounds are tight enough to catch a regression to unbounded pops
while tolerating the legitimate race between worker iterations.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
coderabbitai[bot]

This comment was marked as resolved.

d-cs and others added 2 commits June 1, 2026 20:46
Multiple workers can race past pickNextEnv into the same env before
skip propagates from the first failing pop. With the prior unguarded
`failed += 1` each racing worker bumped the count, so a single broken
env could contribute up to `concurrency` failures in one tick — drifting
from the documented "one failure per env batch" contract.

Guard the increment on `!skip.has(envId)` so the per-env failure count
is exactly one regardless of race. Tightens the test assertion from
"in [1, concurrency]" to "=== 1".

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
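
The guard described in the commit above amounts to a check-then-add on the skip set. A minimal sketch, assuming `skip` is a plain `Set<string>` of env ids and using a hypothetical helper name:

```typescript
// Hypothetical helper: returns how much to add to the tick's failure count
// when a pop for envId fails. Only the first worker to report an env counts.
function recordPopFailure(skip: Set<string>, envId: string): 0 | 1 {
  if (skip.has(envId)) return 0; // a sibling worker already counted this env
  skip.add(envId);
  return 1;
}

// Five workers racing into the same broken env within one tick.
const skip = new Set<string>();
let failed = 0;
for (let worker = 0; worker < 5; worker++) {
  failed += recordPopFailure(skip, "env-bad");
}
console.log(failed);
```

The check and the add run without an intervening await, so on a single-threaded event loop no second worker can slip between them; that is what keeps the per-env count at one in this sketch.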
@d-cs d-cs marked this pull request as ready for review June 1, 2026 19:47
d-cs and others added 3 commits June 2, 2026 09:22
… entries

Adds a Redis sorted set `mollifier:draining` mirroring entries currently
in DRAINING state (popped by the drainer, not yet acked/failed/requeued),
scored by pop wall-clock millis. Maintained atomically with the existing
per-entry status transitions:

  - popAndMarkDraining → ZADD score=now-ms
  - ackMollifierEntry → ZREM
  - failMollifierEntry → ZREM
  - requeueMollifierEntry → ZREM

Each pre-existing Lua picks up one extra Redis op; ack/fail also gain a
runId arg so they can ZREM without a hash read. Buffer exposes:

  - getDrainingCount(): ZCARD — gauge value
  - listStaleDraining(olderThanMs, limit): ZRANGEBYSCORE — forensics
    after an ECS OOM ("which entries were stranded?")

NOT load-bearing for correctness — per-entry hash still carries status,
stale-sweep still scans queue LISTs. The set is a fast top-level index
so a wiped/out-of-date set just over-reports the gauge; recovery paths
are untouched. A test pins this graceful-degradation invariant.
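
To make the index semantics concrete, here is an in-memory model of the sorted set, with each method annotated with the Redis command it stands in for. It is a sketch under assumptions: the class name is invented, `nowMs` is passed explicitly for determinism (the real `listStaleDraining(olderThanMs, limit)` takes no such argument), and treating `olderThanMs` as an age relative to now is a guess at its meaning.

```typescript
// In-memory stand-in for the `mollifier:draining` sorted set.
// Not the real buffer API; method shapes are assumptions for illustration.
class DrainingIndexModel {
  private readonly scores = new Map<string, number>();

  // popAndMarkDraining: ZADD with score = pop wall-clock millis.
  markDraining(runId: string, nowMs: number): void {
    this.scores.set(runId, nowMs); // re-adding replaces the score
  }

  // ack / fail / requeue: ZREM.
  remove(runId: string): void {
    this.scores.delete(runId);
  }

  // ZCARD: value reported by the gauge.
  getDrainingCount(): number {
    return this.scores.size;
  }

  // ZRANGEBYSCORE up to a cutoff, oldest first, capped at `limit`.
  listStaleDraining(nowMs: number, olderThanMs: number, limit: number): string[] {
    const cutoff = nowMs - olderThanMs;
    return Array.from(this.scores.entries())
      .filter(([, score]) => score <= cutoff)
      .sort((a, b) => a[1] - b[1])
      .slice(0, limit)
      .map(([runId]) => runId);
  }
}

const index = new DrainingIndexModel();
index.markDraining("run_a", 1_000);
index.markDraining("run_b", 5_000);
console.log(index.getDrainingCount());
console.log(index.listStaleDraining(10_000, 6_000, 10));
```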

Wires `mollifier.draining.current` ObservableGauge polled every 15s on
the drainer worker pods. unref'd setInterval so it can't block graceful
shutdown; idempotent under dev hot-reload. Test seam exported for unit
testing without spinning a real OTel meter.

Tests:
  - 7 redisTest cases in buffer.test.ts (lifecycle on every Lua boundary,
    requeue-and-repop score replacement, listStaleDraining cutoff/limit,
    graceful-degradation when set is wiped)
  - 6 unit tests in webapp for the gauge poller (eager fire, cadence,
    null buffer no-op, transient-error survives, idempotent start,
    stop halts loop)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
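
A poller with the properties listed in the commit above (eager first read, fixed cadence, survives transient errors, idempotent start, unref'd timer) might look like the sketch below. The function name and callback shapes are hypothetical, and the OTel ObservableGauge wiring is deliberately left out; `onValue` is just a stand-in for wherever the latest reading is stored.

```typescript
// Sketch of a gauge poller; names and shapes are assumptions.
type Stop = () => void;
let activeStop: Stop | undefined;

function startDrainingGaugePoller(
  readCount: () => Promise<number>,
  onValue: (value: number) => void,
  intervalMs = 15_000
): Stop {
  if (activeStop) return activeStop; // idempotent: e.g. under dev hot-reload

  const poll = async (): Promise<void> => {
    try {
      onValue(await readCount());
    } catch {
      // Transient error: keep the last value and retry on the next interval.
    }
  };

  void poll(); // eager first read, no wait for the first interval
  const timer = setInterval(() => void poll(), intervalMs);
  // In Node the timer handle exposes unref(), so the interval cannot keep
  // the process alive during graceful shutdown. Guarded for other runtimes.
  (timer as unknown as { unref?: () => void }).unref?.();

  const stop: Stop = () => {
    clearInterval(timer);
    activeStop = undefined;
  };
  activeStop = stop;
  return stop;
}

let latest = -1;
const stopPoller = startDrainingGaugePoller(
  async () => 3, // stands in for buffer.getDrainingCount()
  (value) => {
    latest = value;
  }
);
stopPoller();
```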
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
`mollifier-buffer-extensions.md` and
`mollifier-drainer-terminal-failure-callback.md` describe changes that
have already shipped on prior merged PRs; carrying them on this branch
would double-publish in the next release.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>