diff --git a/.changeset/mollifier-buffer-extensions.md b/.changeset/mollifier-buffer-extensions.md deleted file mode 100644 index c2a3b1a0e8..0000000000 --- a/.changeset/mollifier-buffer-extensions.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -"@trigger.dev/redis-worker": minor ---- - -Mollifier buffer extensions: idempotency dedup, an atomic `mutateSnapshot` API, metadata CAS, claim primitives, and a `MollifierSnapshot` type. The buffer's Redis client now reconnects with jittered backoff so a fleet of clients doesn't stampede Redis in lockstep after a blip. diff --git a/.changeset/mollifier-drain-batch-size.md b/.changeset/mollifier-drain-batch-size.md new file mode 100644 index 0000000000..9e848b5011 --- /dev/null +++ b/.changeset/mollifier-drain-batch-size.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/redis-worker": patch +--- + +`MollifierDrainer` accepts a `drainBatchSize` option (default 1) that controls how many entries are popped per env per tick — in-flight handlers remain capped by the global `concurrency`. `MollifierBuffer` also gains `getDrainingCount()` / `listStaleDraining()`, backed by a new `mollifier:draining` ZSET maintained atomically with pop/ack/fail/requeue (observability-only). diff --git a/.changeset/mollifier-drainer-terminal-failure-callback.md b/.changeset/mollifier-drainer-terminal-failure-callback.md deleted file mode 100644 index e0ac3400ff..0000000000 --- a/.changeset/mollifier-drainer-terminal-failure-callback.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -"@trigger.dev/redis-worker": minor ---- - -Add `onTerminalFailure` callback to `MollifierDrainerOptions` so the customer's run lands a SYSTEM_FAILURE PG row even when the drainer exhausts `maxAttempts` on a retryable PG error. Previously, retryable-error exhaustion called `buffer.fail()` directly, which atomically marks FAILED + DELs the entry hash with no PG write — silent data loss when PG was unreachable across the full retry budget. 
The callback fires before `buffer.fail()` on any terminal path (`cause: "non-retryable"` or `"max-attempts-exhausted"`); throwing a retryable error from the callback causes the drainer to requeue rather than fail. diff --git a/.server-changes/mollifier-drain-batch-size.md b/.server-changes/mollifier-drain-batch-size.md new file mode 100644 index 0000000000..ddb6845f63 --- /dev/null +++ b/.server-changes/mollifier-drain-batch-size.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: improvement +--- + +Wire `TRIGGER_MOLLIFIER_DRAIN_BATCH_SIZE` (default 50) so single-env bursts drain at the full `DRAIN_CONCURRENCY` budget per tick instead of one entry per tick. Also expose `mollifier.draining.current` ObservableGauge (polled every 15s on drainer pods) for in-flight DRAINING entries. diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index dafd67124b..a3041f4c7e 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -1101,6 +1101,16 @@ const EnvironmentSchema = z TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3), TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().positive().default(30_000), TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK: z.coerce.number().int().positive().default(500), + // Per-env per-tick pop cap. The drainer rotates one env per org per + // tick; this bounds how many entries it pops from that env before + // dispatching them through the shared `DRAIN_CONCURRENCY`-bounded + // limiter. Default matches `DRAIN_CONCURRENCY` so a single-env burst + // uses the full handler-parallelism budget — for 20k buffered on one + // env this is the difference between ~17m (one-pop-per-tick × ~50ms) + // and ~20s (400 ticks × concurrent engine.trigger). Org/env fairness + // is preserved because the per-tick env selection is unchanged; only + // the in-env pop count grows. 
+ TRIGGER_MOLLIFIER_DRAIN_BATCH_SIZE: z.coerce.number().int().positive().default(50), // Periodic sweep that scans buffer queue LISTs for entries whose // dwell exceeds the stale threshold. Independent of the drainer — // its job is exactly to make a stuck/offline drainer visible to diff --git a/apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts b/apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts index 26ac60f180..1b64da3345 100644 --- a/apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts +++ b/apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts @@ -72,6 +72,7 @@ function initializeMollifierDrainer(): MollifierDrainer { logger.debug("Initializing mollifier drainer", { concurrency: env.TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY, maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS, + drainBatchSize: env.TRIGGER_MOLLIFIER_DRAIN_BATCH_SIZE, }); const drainer = new MollifierDrainer({ @@ -81,6 +82,7 @@ function initializeMollifierDrainer(): MollifierDrainer { concurrency: env.TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY, maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS, maxOrgsPerTick: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK, + drainBatchSize: env.TRIGGER_MOLLIFIER_DRAIN_BATCH_SIZE, isRetryable: isRetryablePgError, }); diff --git a/apps/webapp/app/v3/mollifier/mollifierDrainingGauge.server.ts b/apps/webapp/app/v3/mollifier/mollifierDrainingGauge.server.ts new file mode 100644 index 0000000000..eda8f45ebf --- /dev/null +++ b/apps/webapp/app/v3/mollifier/mollifierDrainingGauge.server.ts @@ -0,0 +1,63 @@ +import { logger } from "~/services/logger.server"; +import { getMollifierBuffer } from "./mollifierBuffer.server"; +import { reportDrainingCount } from "./mollifierTelemetry.server"; + +// How often we ZCARD the draining-tracker set. Each poll is a single +// O(1) Redis call, so cadence is bounded by "how fresh do we want the +// gauge?" rather than cost. 
15s gives a tight-enough window to spot a +// brief OOM-induced spike without burning RTTs, and lines up well with +// typical Prometheus scrape intervals. +const POLL_INTERVAL_MS = 15_000; + +let intervalHandle: ReturnType | null = null; + +// Polls `mollifier:draining` cardinality on an interval and feeds the +// gauge in `mollifierTelemetry.server.ts`. Started from the drainer +// worker bootstrap (alongside `drainer.start()`) so it runs on the same +// pods that actually pop/ack entries — observability is colocated with +// the lifecycle. +// +// Idempotent: a second call is a no-op (Remix dev hot-reload re-runs +// the bootstrap; the existing interval keeps ticking). +export function startMollifierDrainingGauge(opts: { + intervalMs?: number; + getBuffer?: typeof getMollifierBuffer; +} = {}): void { + if (intervalHandle !== null) return; + + const intervalMs = opts.intervalMs ?? POLL_INTERVAL_MS; + const getBuffer = opts.getBuffer ?? getMollifierBuffer; + + // Fire one poll immediately so the gauge populates before the first + // scrape rather than reading 0 for a full interval after boot. + const tick = async () => { + const buffer = getBuffer(); + if (!buffer) return; + try { + const count = await buffer.getDrainingCount(); + reportDrainingCount(count); + } catch (err) { + // Transient Redis blip — don't tank the loop, just leave the + // gauge at its last-known value. A sustained Redis outage will + // surface via the drainer's own alerts long before this gauge + // staleness becomes a primary signal. + logger.warn("Mollifier draining gauge poll failed; keeping previous value", { err }); + } + }; + + void tick(); + // unref so the interval doesn't keep the process alive past + // graceful shutdown — the gauge is best-effort, not a flush boundary. + intervalHandle = setInterval(() => { + void tick(); + }, intervalMs); + intervalHandle.unref?.(); +} + +// Test seam. Production code never calls this; lifecycle is implicitly +// process-end. 
+export function stopMollifierDrainingGauge(): void { + if (intervalHandle === null) return; + clearInterval(intervalHandle); + intervalHandle = null; +} diff --git a/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts b/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts index f9c7ca72f1..deaa32bb74 100644 --- a/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts +++ b/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts @@ -90,6 +90,39 @@ meter.addBatchObservableCallback( [staleEntriesGauge], ); +// Observability gauge for entries currently in DRAINING state — popped +// by the drainer but not yet acked/failed/requeued. Backed by the +// `mollifier:draining` ZSET (see `MollifierBuffer.getDrainingCount`) +// and polled by the loop in `mollifierDrainingGauge.server.ts`. +// +// Useful for: +// - "Is anything mid-drain right now?" panels +// - Post-crash forensics ("how many entries got stranded by that ECS OOM?") +// - Alerting: a sustained non-zero with no drainer progress is a stall +// +// No `envId` attribute — same high-cardinality constraint as the other +// mollifier gauges. The per-entry hash carries env/org for drill-down. +export const drainingCountGauge = meter.createObservableGauge( + "mollifier.draining.current", + { + description: + "Mollifier buffer entries currently in DRAINING state (popped but not yet acked/failed/requeued)", + }, +); + +let latestDrainingCount = 0; + +export function reportDrainingCount(count: number): void { + latestDrainingCount = count; +} + +meter.addBatchObservableCallback( + (result) => { + result.observe(drainingCountGauge, latestDrainingCount); + }, + [drainingCountGauge], +); + // Electric SQL's shape-stream protocol adds a `handle=` query param on // every reconnect after the initial GET. 
Gating the realtime-buffered // log/counter on its absence keeps the signal at one tick per diff --git a/apps/webapp/app/v3/mollifierDrainerWorker.server.ts b/apps/webapp/app/v3/mollifierDrainerWorker.server.ts index e571344141..bd348f8112 100644 --- a/apps/webapp/app/v3/mollifierDrainerWorker.server.ts +++ b/apps/webapp/app/v3/mollifierDrainerWorker.server.ts @@ -5,6 +5,7 @@ import { getMollifierDrainer, MollifierConfigurationError, } from "./mollifier/mollifierDrainer.server"; +import { startMollifierDrainingGauge } from "./mollifier/mollifierDrainingGauge.server"; declare global { // eslint-disable-next-line no-var @@ -92,6 +93,12 @@ export function initMollifierDrainerWorker( signalsEmitter.on("SIGINT", stopDrainer); global.__mollifierShutdownRegistered__ = true; drainer.start(); + // Spin up the observability-only gauge poller for the + // `mollifier:draining` ZSET cardinality. Colocated with the + // drainer because that's the loop creating the DRAINING entries + // — same pod, same Redis client lifecycle. Idempotent + unref'd + // so it's safe under dev hot-reload and doesn't block shutdown. + startMollifierDrainingGauge(); } } catch (error) { // Deterministic misconfig (shutdown-timeout vs GRACEFUL_SHUTDOWN_TIMEOUT, diff --git a/apps/webapp/test/mollifierDrainingGauge.test.ts b/apps/webapp/test/mollifierDrainingGauge.test.ts new file mode 100644 index 0000000000..18251e310b --- /dev/null +++ b/apps/webapp/test/mollifierDrainingGauge.test.ts @@ -0,0 +1,116 @@ +import { describe, expect, it, vi, beforeEach, afterEach } from "vitest"; + +// Same defensive mocks as mollifierDrainerWorker.test.ts: importing +// the gauge module transitively loads telemetry → meter → OTel +// initialisation, plus the buffer singleton's runtime resolution. 
+vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); +vi.mock("~/services/logger.server", () => ({ + logger: { warn: vi.fn(), error: vi.fn(), info: vi.fn(), debug: vi.fn() }, +})); + +const reportDrainingCount = vi.fn(); +vi.mock("~/v3/mollifier/mollifierTelemetry.server", () => ({ + reportDrainingCount: (count: number) => reportDrainingCount(count), +})); + +import { + startMollifierDrainingGauge, + stopMollifierDrainingGauge, +} from "~/v3/mollifier/mollifierDrainingGauge.server"; + +// The gauge poller reads `mollifier:draining` cardinality on a cadence +// and forwards it to `reportDrainingCount`. These tests pin the +// observable contract: the gauge value is the buffer's count, transient +// errors keep the last value, and the loop never blocks the main thread +// (unref'd interval — verified implicitly because Vitest exits cleanly). +describe("startMollifierDrainingGauge", () => { + beforeEach(() => { + reportDrainingCount.mockReset(); + stopMollifierDrainingGauge(); + }); + + afterEach(() => { + stopMollifierDrainingGauge(); + }); + + it("fires an immediate poll on start so the gauge populates before the first scrape", async () => { + const buffer = { getDrainingCount: vi.fn().mockResolvedValue(7) } as any; + startMollifierDrainingGauge({ + intervalMs: 100_000, // long — we're checking the immediate fire, not the interval + getBuffer: () => buffer, + }); + + // Yield one macrotask (setImmediate) so the eager poll settles. + await new Promise((r) => setImmediate(r)); + expect(reportDrainingCount).toHaveBeenCalledWith(7); + expect(buffer.getDrainingCount).toHaveBeenCalledTimes(1); + }); + + it("polls on the configured cadence", async () => { + const buffer = { getDrainingCount: vi.fn().mockResolvedValue(3) } as any; + startMollifierDrainingGauge({ + intervalMs: 20, + getBuffer: () => buffer, + }); + + // Eager tick + at least one interval tick. 
+ await new Promise((r) => setTimeout(r, 80)); + expect(buffer.getDrainingCount.mock.calls.length).toBeGreaterThanOrEqual(2); + expect(reportDrainingCount).toHaveBeenCalledWith(3); + }); + + it("no-ops when the buffer singleton returns null (mollifier disabled)", async () => { + startMollifierDrainingGauge({ + intervalMs: 20, + getBuffer: () => null, + }); + await new Promise((r) => setTimeout(r, 60)); + expect(reportDrainingCount).not.toHaveBeenCalled(); + }); + + it("swallows a transient ZCARD failure so the loop keeps running", async () => { + let calls = 0; + const buffer = { + getDrainingCount: vi.fn(async () => { + calls += 1; + if (calls === 1) throw new Error("transient redis blip"); + return 4; + }), + } as any; + startMollifierDrainingGauge({ + intervalMs: 20, + getBuffer: () => buffer, + }); + + await new Promise((r) => setTimeout(r, 80)); + // First call threw → no report. Second call succeeded → reported. + // The gauge keeps its previous value (stale-but-non-zero) between + // the failed poll and the next successful one — better than + // crashing the loop and going silent forever. + expect(reportDrainingCount).toHaveBeenCalledWith(4); + expect(buffer.getDrainingCount.mock.calls.length).toBeGreaterThanOrEqual(2); + }); + + it("is idempotent: a second start does not spawn a parallel loop", async () => { + const buffer = { getDrainingCount: vi.fn().mockResolvedValue(1) } as any; + startMollifierDrainingGauge({ intervalMs: 25, getBuffer: () => buffer }); + startMollifierDrainingGauge({ intervalMs: 25, getBuffer: () => buffer }); + + await new Promise((r) => setTimeout(r, 90)); + // One eager + a small number of interval ticks. Doubled-loop would + // produce ~2× the calls in the same window. Upper bound is generous + // for CI jitter; the property is "single loop", not exact count. 
+ expect(buffer.getDrainingCount.mock.calls.length).toBeLessThan(8); + }); + + it("stop halts the polling loop", async () => { + const buffer = { getDrainingCount: vi.fn().mockResolvedValue(2) } as any; + startMollifierDrainingGauge({ intervalMs: 20, getBuffer: () => buffer }); + await new Promise((r) => setTimeout(r, 50)); + const callsAtStop = buffer.getDrainingCount.mock.calls.length; + stopMollifierDrainingGauge(); + + await new Promise((r) => setTimeout(r, 80)); + expect(buffer.getDrainingCount.mock.calls.length).toBe(callsAtStop); + }); +}); diff --git a/packages/redis-worker/src/mollifier/buffer.test.ts b/packages/redis-worker/src/mollifier/buffer.test.ts index b47e41589e..3a775bbb8f 100644 --- a/packages/redis-worker/src/mollifier/buffer.test.ts +++ b/packages/redis-worker/src/mollifier/buffer.test.ts @@ -3,6 +3,7 @@ import { BufferEntrySchema, serialiseSnapshot, deserialiseSnapshot } from "./sch import { redisTest } from "@internal/testcontainers"; import { Logger } from "@trigger.dev/core/logger"; import { + DRAINING_SET_KEY, MollifierBuffer, idempotencyLookupKeyFor, makeIdempotencyClaimKey, @@ -2724,3 +2725,250 @@ describe("MollifierBuffer pre-gate claim — ownership token safety", () => { }, ); }); + +// The DRAINING set is observability-only: a sorted set scored by the +// pop wall-clock millis whose membership mirrors entries currently in +// DRAINING state (popped, not yet acked/failed/requeued). The poller in +// `mollifierDrainingGauge.server.ts` polls `getDrainingCount` and feeds +// `mollifier.draining.current` for ops dashboards / post-crash +// forensics. Tests pin the lifecycle transitions on every Lua boundary +// so a regression that breaks the gauge surfaces here, not at 03:00. 
+describe("MollifierBuffer.draining tracker (observability)", () => { + redisTest( + "pop ZADDs to the draining set with a positive recent score", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + + try { + const before = Date.now(); + await buffer.accept({ runId: "drn_1", envId: "env_a", orgId: "org_1", payload: "{}" }); + await buffer.pop("env_a"); + + const count = await buffer.getDrainingCount(); + expect(count).toBe(1); + + // Score is the pop wall-clock in millis (Redis TIME, computed + // inside the Lua). Sanity-check it's within a tight window of + // the test's wall-clock so a future bug substituting createdAt + // or zero would surface. + const score = await buffer["redis"].zscore(DRAINING_SET_KEY, "drn_1"); + expect(score).not.toBeNull(); + const scoreMs = Number(score); + expect(scoreMs).toBeGreaterThanOrEqual(before - 1_000); + expect(scoreMs).toBeLessThanOrEqual(Date.now() + 1_000); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "ack ZREMs from the draining set", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + + try { + await buffer.accept({ runId: "drn_ack", envId: "env_a", orgId: "org_1", payload: "{}" }); + await buffer.pop("env_a"); + expect(await buffer.getDrainingCount()).toBe(1); + + await buffer.ack("drn_ack"); + expect(await buffer.getDrainingCount()).toBe(0); + expect(await buffer["redis"].zscore(DRAINING_SET_KEY, "drn_ack")).toBeNull(); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "fail ZREMs from the draining set even though the entry hash is torn 
down", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + + try { + await buffer.accept({ runId: "drn_fail", envId: "env_a", orgId: "org_1", payload: "{}" }); + await buffer.pop("env_a"); + expect(await buffer.getDrainingCount()).toBe(1); + + await buffer.fail("drn_fail", { code: "X", message: "y" }); + expect(await buffer.getDrainingCount()).toBe(0); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "requeue ZREMs from the draining set so the entry is no longer counted as in-flight", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + + try { + await buffer.accept({ runId: "drn_rq", envId: "env_a", orgId: "org_1", payload: "{}" }); + await buffer.pop("env_a"); + expect(await buffer.getDrainingCount()).toBe(1); + + await buffer.requeue("drn_rq"); + // Back in QUEUED — not currently "draining", so the tracker is + // empty even though the entry hash still exists. + expect(await buffer.getDrainingCount()).toBe(0); + const entry = await buffer.getEntry("drn_rq"); + expect(entry!.status).toBe("QUEUED"); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "the same entry going through pop → requeue → pop is tracked at the latest pop's score", + { timeout: 20_000 }, + async ({ redisContainer }) => { + // The pop Lua does ZADD (no NX/XX/GT flags) so the second pop + // overwrites the score with the new wall-clock. 
listStaleDraining + // therefore measures "time since the most recent pop", which is + // what an operator wants — a requeued entry that just got picked + // up again isn't stale, only one that was popped and stayed there. + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + + try { + await buffer.accept({ runId: "drn_re", envId: "env_a", orgId: "org_1", payload: "{}" }); + await buffer.pop("env_a"); + const firstScore = Number(await buffer["redis"].zscore(DRAINING_SET_KEY, "drn_re")); + + await buffer.requeue("drn_re"); + // ZREMed; not counted as draining between pops. + expect(await buffer.getDrainingCount()).toBe(0); + + // Tiny sleep so the second pop's TIME is observably later. + await new Promise((r) => setTimeout(r, 25)); + await buffer.pop("env_a"); + const secondScore = Number(await buffer["redis"].zscore(DRAINING_SET_KEY, "drn_re")); + expect(secondScore).toBeGreaterThanOrEqual(firstScore); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "listStaleDraining returns runIds popped before the cutoff and respects the limit", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + + try { + // Pop two entries, wait, then pop a third. With the cutoff set + // to the gap, only the first two should come back. 
+ await buffer.accept({ runId: "old_1", envId: "env_a", orgId: "org_1", payload: "{}" }); + await buffer.accept({ runId: "old_2", envId: "env_a", orgId: "org_1", payload: "{}" }); + await buffer.pop("env_a"); + await buffer.pop("env_a"); + + await new Promise((r) => setTimeout(r, 75)); + + await buffer.accept({ runId: "new_1", envId: "env_b", orgId: "org_1", payload: "{}" }); + await buffer.pop("env_b"); + + // 50ms cutoff: old_1 and old_2 qualify, new_1 is too fresh. + const stale = await buffer.listStaleDraining(50, 10); + expect(stale.sort()).toEqual(["old_1", "old_2"]); + + // Limit caps the result set. + const capped = await buffer.listStaleDraining(50, 1); + expect(capped.length).toBe(1); + expect(["old_1", "old_2"]).toContain(capped[0]); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "draining set is not load-bearing for correctness: stale-sweep & entry hash still drive recovery", + { timeout: 20_000 }, + async ({ redisContainer }) => { + // Documents the invariant we rely on for graceful degradation: if + // a deploy somehow regresses the ZREM-on-ack (or the set is + // manually wiped), correctness still holds because the per-entry + // hash carries `status` and the stale-sweep scans the queue LISTs. + // The gauge would just over-report — an ops issue, not a data-loss + // bug. Pinning this with a direct DEL keeps the principle visible + // in test output. + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + + try { + await buffer.accept({ runId: "deg_1", envId: "env_a", orgId: "org_1", payload: "{}" }); + const popped = await buffer.pop("env_a"); + expect(popped!.status).toBe("DRAINING"); + + // Wipe the tracker out-of-band. Correctness must not regress. 
+ await buffer["redis"].del(DRAINING_SET_KEY); + + // ack still succeeds, entry hash still flips to materialised, + // grace TTL still applied. The next gauge poll just sees a + // count of 0 instead of 1 — an observability blip, not a bug. + await buffer.ack("deg_1"); + const after = await buffer.getEntry("deg_1"); + expect(after).not.toBeNull(); + expect(after!.materialised).toBe(true); + } finally { + await buffer.close(); + } + }, + ); +}); diff --git a/packages/redis-worker/src/mollifier/buffer.ts b/packages/redis-worker/src/mollifier/buffer.ts index 71920bb4ff..d2fff14dfc 100644 --- a/packages/redis-worker/src/mollifier/buffer.ts +++ b/packages/redis-worker/src/mollifier/buffer.ts @@ -18,6 +18,17 @@ export type MollifierBufferOptions = { // have a safety net while PG replica lag settles. const ACK_GRACE_TTL_SECONDS = 30; +// Observability-only sorted set of entries currently in DRAINING state +// (popped by the drainer, not yet acked/failed/requeued). Score is the +// pop wall-clock in milliseconds — `ZRANGEBYSCORE` from 0 to `now - X` gives +// the entries stuck mid-drain for longer than X. NOT load-bearing for +// correctness: the per-entry hash already carries `status` and the +// stale-sweep would catch stranded entries via the queue LISTs. This +// set is a fast top-level index for ops (gauge cardinality, post-crash +// forensics after an ECS OOM) — see `mollifierDrainerWorker` for the +// gauge wiring. +export const DRAINING_SET_KEY = "mollifier:draining"; + // ioredis reconnect backoff for the mollifier buffer client. The base // grows linearly with the attempt count and is capped at 1s (the same // envelope as the previous fixed `Math.min(times * 50, 1000)` schedule). 
@@ -204,6 +215,7 @@ export class MollifierBuffer { const encoded = (await this.redis.popAndMarkDraining( queueKey, orgsKey, + DRAINING_SET_KEY, entryPrefix, envId, "mollifier:org-envs:", @@ -493,7 +505,9 @@ export class MollifierBuffer { async ack(runId: string): Promise { await this.redis.ackMollifierEntry( `mollifier:entries:${runId}`, + DRAINING_SET_KEY, String(ACK_GRACE_TTL_SECONDS), + runId, ); } @@ -501,6 +515,7 @@ export class MollifierBuffer { await this.redis.requeueMollifierEntry( `mollifier:entries:${runId}`, "mollifier:orgs", + DRAINING_SET_KEY, "mollifier:queue:", runId, "mollifier:org-envs:", @@ -516,11 +531,39 @@ export class MollifierBuffer { async fail(runId: string, error: { code: string; message: string }): Promise { const result = await this.redis.failMollifierEntry( `mollifier:entries:${runId}`, + DRAINING_SET_KEY, JSON.stringify(error), + runId, ); return result === 1; } + // Observability-only: number of entries currently in DRAINING state + // (popped, not yet acked/failed/requeued). The gauge in the webapp + // drainer worker polls this on a short interval and emits it as + // `mollifier.draining.current` for ops dashboards and post-crash + // forensics. Cheap (single ZCARD). + async getDrainingCount(): Promise { + return this.redis.zcard(DRAINING_SET_KEY); + } + + // Observability-only: list runIds that have been DRAINING longer than + // `olderThanMs` (i.e. popped before `now - olderThanMs`). Bounded by + // `limit` to keep the result set tractable when something has gone + // very wrong. ZRANGEBYSCORE is O(log N + K). Score is the pop wall-clock + // in milliseconds as written by the popAndMarkDraining Lua. + async listStaleDraining(olderThanMs: number, limit: number): Promise { + const maxScore = Date.now() - Math.max(0, olderThanMs); + return this.redis.zrangebyscore( + DRAINING_SET_KEY, + "-inf", + String(maxScore), + "LIMIT", + 0, + Math.max(0, limit), + ); + } + // Returns Redis-side TTL on the entry hash. 
Returns -1 for entries // with no TTL — the steady state under the current design, where // entries persist until drainer ack/fail. The ack grace TTL (30s @@ -630,10 +673,11 @@ export class MollifierBuffer { }); this.redis.defineCommand("requeueMollifierEntry", { - numberOfKeys: 2, + numberOfKeys: 3, lua: ` local entryKey = KEYS[1] local orgsKey = KEYS[2] + local drainingSetKey = KEYS[3] local queuePrefix = ARGV[1] local runId = ARGV[2] local orgEnvsPrefix = ARGV[3] @@ -661,19 +705,32 @@ export class MollifierBuffer { redis.call('SADD', orgsKey, orgId) redis.call('SADD', orgEnvsPrefix .. orgId, envId) end + -- Observability-only: leaving DRAINING state, so drop the + -- entry from the draining-tracker set. ZREM on absent member + -- is a no-op. + redis.call('ZREM', drainingSetKey, runId) return 1 `, }); this.redis.defineCommand("popAndMarkDraining", { - numberOfKeys: 2, + numberOfKeys: 3, lua: ` local queueKey = KEYS[1] local orgsKey = KEYS[2] + local drainingSetKey = KEYS[3] local entryPrefix = ARGV[1] local envId = ARGV[2] local orgEnvsPrefix = ARGV[3] + -- Wall-clock millis used as the ZADD score on the draining-tracker + -- set. Computed once per script invocation so all observers see + -- the same pop instant. TIME is non-deterministic, but that is + -- safe: Redis 5+ replicates a script's effects (the concrete + -- ZADD), not the script itself, to replicas and the AOF. + local timeArr = redis.call('TIME') + local nowMs = tonumber(timeArr[1]) * 1000 + math.floor(tonumber(timeArr[2]) / 1000) + -- Helper: prune org-level membership when an env's queue empties. -- Called only from the success branch where we know orgId from the -- popped entry. The no-runId branch below can't reach this because @@ -706,6 +763,14 @@ export class MollifierBuffer { local entryKey = entryPrefix .. 
runId if redis.call('EXISTS', entryKey) == 1 then redis.call('HSET', entryKey, 'status', 'DRAINING') + -- Observability-only: track the runId in the draining set + -- with the pop wall-clock as score. Acked/failed/requeued + -- in the corresponding Lua scripts. The set is NOT + -- load-bearing for correctness — the per-entry hash carries + -- status — so a missed ZREM on a partial Lua execution is + -- recoverable via the stale-sweep + entry hash, not a + -- correctness bug. + redis.call('ZADD', drainingSetKey, nowMs, runId) local raw = redis.call('HGETALL', entryKey) local result = {} for i = 1, #raw, 2 do @@ -957,10 +1022,18 @@ export class MollifierBuffer { }); this.redis.defineCommand("ackMollifierEntry", { - numberOfKeys: 1, + numberOfKeys: 2, lua: ` local entryKey = KEYS[1] + local drainingSetKey = KEYS[2] local graceTtlSeconds = tonumber(ARGV[1]) + local runId = ARGV[2] + + -- Always ZREM from the draining-tracker — even if the entry hash + -- has been concurrently torn down, the runId might still be in + -- the set (e.g. fail() ran first and cleared the hash but a + -- delayed ack races in). Idempotent: ZREM on absent is a no-op. + redis.call('ZREM', drainingSetKey, runId) -- Guard: never create a partial entry. If the hash is gone between -- pop and ack (concurrent fail or eviction — QUEUED entries carry @@ -984,10 +1057,17 @@ export class MollifierBuffer { }); this.redis.defineCommand("failMollifierEntry", { - numberOfKeys: 1, + numberOfKeys: 2, lua: ` local entryKey = KEYS[1] + local drainingSetKey = KEYS[2] local errorPayload = ARGV[1] + local runId = ARGV[2] + + -- Always ZREM from the draining-tracker (idempotent on absent). + -- Mirrors ack: the runId may be in the set even if the entry hash + -- has been raced away. + redis.call('ZREM', drainingSetKey, runId) -- Guard: nothing to mark FAILED if the hash is gone (concurrent -- ack/manual cleanup). 
Returning 0 lets the caller distinguish @@ -1077,6 +1157,7 @@ declare module "@internal/redis" { popAndMarkDraining( queueKey: string, orgsKey: string, + drainingSetKey: string, entryPrefix: string, envId: string, orgEnvsPrefix: string, @@ -1085,6 +1166,7 @@ declare module "@internal/redis" { requeueMollifierEntry( entryKey: string, orgsKey: string, + drainingSetKey: string, queuePrefix: string, runId: string, orgEnvsPrefix: string, @@ -1129,12 +1211,16 @@ declare module "@internal/redis" { ): Result; ackMollifierEntry( entryKey: string, + drainingSetKey: string, graceTtlSeconds: string, + runId: string, callback?: Callback, ): Result; failMollifierEntry( entryKey: string, + drainingSetKey: string, errorPayload: string, + runId: string, callback?: Callback, ): Result; delMollifierKeyIfEquals( diff --git a/packages/redis-worker/src/mollifier/drainer.test.ts b/packages/redis-worker/src/mollifier/drainer.test.ts index c6832e94c7..d4250641ee 100644 --- a/packages/redis-worker/src/mollifier/drainer.test.ts +++ b/packages/redis-worker/src/mollifier/drainer.test.ts @@ -130,6 +130,483 @@ describe("MollifierDrainer.runOnce", () => { }); }); +describe("MollifierDrainer.drainBatchSize", () => { + // Default behaviour (drainBatchSize=1) is exercised by every other + // test in this file — one pop per env per tick. These tests pin the + // single-env batched-pop fast path: with drainBatchSize=N, a single + // env with K buffered entries drains in ceil(K / N) ticks instead of + // K ticks, capped by the shared `concurrency` for in-flight handlers. 
+ + it("pops up to drainBatchSize entries from a single env in one tick", async () => { + const queue: string[] = Array.from({ length: 10 }, (_, i) => `run_${i}`); + const handled: string[] = []; + const buffer = makeStubBuffer({ + ...eachEnvAsOwnOrg(["env_a"]), + pop: async (envId: string) => { + if (envId !== "env_a") return null; + const runId = queue.shift(); + if (!runId) return null; + return { + runId, + envId: "env_a", + orgId: "org_1", + payload: "{}", + attempts: 0, + createdAt: new Date(), + } as any; + }, + }); + + const drainer = new MollifierDrainer({ + buffer, + handler: async (input) => { + handled.push(input.runId); + }, + concurrency: 5, + maxAttempts: 3, + isRetryable: () => false, + drainBatchSize: 5, + logger: new Logger("test-drainer", "log"), + }); + + const r1 = await drainer.runOnce(); + expect(r1.drained).toBe(5); + expect(handled).toHaveLength(5); + + const r2 = await drainer.runOnce(); + expect(r2.drained).toBe(5); + expect(handled).toHaveLength(10); + + // Queue now empty — next tick is a no-op. + const r3 = await drainer.runOnce(); + expect(r3.drained).toBe(0); + expect(r3.failed).toBe(0); + }); + + it("respects global concurrency cap when batch dispatch exceeds it", async () => { + // drainBatchSize=10 with concurrency=3 means each tick pops 10 + // entries but only 3 handlers run in parallel; the other 7 sit in + // pLimit's queue. The cap is on in-flight handlers, not on per-tick + // pop count. 
+ const queue: string[] = Array.from({ length: 10 }, (_, i) => `run_${i}`); + let inflight = 0; + let peak = 0; + const buffer = makeStubBuffer({ + ...eachEnvAsOwnOrg(["env_a"]), + pop: async (envId: string) => { + if (envId !== "env_a") return null; + const runId = queue.shift(); + if (!runId) return null; + return { + runId, + envId: "env_a", + orgId: "org_1", + payload: "{}", + attempts: 0, + createdAt: new Date(), + } as any; + }, + }); + + const drainer = new MollifierDrainer({ + buffer, + handler: async () => { + inflight += 1; + if (inflight > peak) peak = inflight; + await new Promise((r) => setTimeout(r, 25)); + inflight -= 1; + }, + concurrency: 3, + maxAttempts: 3, + isRetryable: () => false, + drainBatchSize: 10, + logger: new Logger("test-drainer", "log"), + }); + + const result = await drainer.runOnce(); + expect(result.drained).toBe(10); + expect(peak).toBeGreaterThan(1); // genuinely parallel + expect(peak).toBeLessThanOrEqual(3); // capped + }); + + it("a mid-batch pop failure aborts that env's batch and counts as one failure", async () => { + // Pin: when the third pop on env_bad throws, the drainer stops + // popping from that env for this tick (no infinite retry inside one + // tick), the two entries already popped still get processed, and + // the env contributes exactly one to the failed count. + let envBadPops = 0; + let envGoodPops = 0; + const handled: string[] = []; + const buffer = makeStubBuffer({ + ...eachEnvAsOwnOrg(["env_bad", "env_good"]), + pop: async (envId: string) => { + if (envId === "env_bad") { + envBadPops += 1; + if (envBadPops > 2) { + throw new Error("simulated pop failure mid-batch"); + } + return { + runId: `bad_${envBadPops}`, + envId: "env_bad", + orgId: "org_bad", + payload: "{}", + attempts: 0, + createdAt: new Date(), + } as any; + } + // env_good — one entry then empty. Track via pop-count rather + // than handler-side state so the pop loop's synchronous "is the + // queue empty?" 
check doesn't race against the parallel handler + // dispatch that runs after the whole batch is collected. + envGoodPops += 1; + if (envGoodPops > 1) return null; + return { + runId: "good_1", + envId: "env_good", + orgId: "org_good", + payload: "{}", + attempts: 0, + createdAt: new Date(), + } as any; + }, + }); + + const concurrency = 5; + const drainBatchSize = 5; + const drainer = new MollifierDrainer({ + buffer, + handler: async (input) => { + handled.push(input.runId); + }, + concurrency, + maxAttempts: 3, + isRetryable: () => false, + drainBatchSize, + logger: new Logger("test-drainer", "log"), + }); + + const result = await drainer.runOnce(); + // The actual ENTRIES drained are deterministic regardless of races: + // env_bad's pop returns bad_1 then bad_2 (the only two snapshots it + // ever produces) and env_good's pop returns good_1 (its only entry). + expect(result.drained).toBe(3); + expect(new Set(handled)).toEqual(new Set(["bad_1", "bad_2", "good_1"])); + // Exactly one failure recorded for env_bad, even though multiple + // workers can race into a broken env before skip propagates — the + // catch guards the increment on `!skip.has(envId)`, so the documented + // "one failure per env batch" contract holds. + expect(result.failed).toBe(1); + // env_bad's pop call count is bounded too — at most concurrency + // retries after the first throw — definitely never reaches the + // drainBatchSize ceiling. + expect(envBadPops).toBeGreaterThanOrEqual(3); + expect(envBadPops).toBeLessThan(drainBatchSize + concurrency); + }); + + it("fans batched pops out across multiple envs in a single tick", async () => { + // Pin: with N envs each holding K entries and drainBatchSize=K, one + // tick pops N×K entries and dispatches them all through the shared + // pLimit. Closes the gap that all the other batch tests cover a + // single env in isolation. 
+ const envCount = 10; + const perEnv = 10; + const queues = new Map(); + for (let i = 0; i < envCount; i++) { + queues.set( + `env_${i}`, + Array.from({ length: perEnv }, (_, j) => `env_${i}_run_${j}`), + ); + } + const handled: string[] = []; + const buffer = makeStubBuffer({ + ...eachEnvAsOwnOrg([...queues.keys()]), + pop: async (envId: string) => { + const q = queues.get(envId); + if (!q || q.length === 0) return null; + const runId = q.shift()!; + return { + runId, + envId, + orgId: envId, + payload: "{}", + attempts: 0, + createdAt: new Date(), + } as any; + }, + }); + + const drainer = new MollifierDrainer({ + buffer, + handler: async (input) => { + handled.push(input.runId); + }, + concurrency: 20, + maxAttempts: 3, + isRetryable: () => false, + drainBatchSize: perEnv, + logger: new Logger("test-drainer", "log"), + }); + + const r = await drainer.runOnce(); + expect(r.drained).toBe(envCount * perEnv); + expect(handled).toHaveLength(envCount * perEnv); + // Every env contributed exactly perEnv entries. + const perEnvCounts = handled.reduce>((acc, runId) => { + const env = runId.replace(/_run_\d+$/, ""); + acc[env] = (acc[env] ?? 0) + 1; + return acc; + }, {}); + for (let i = 0; i < envCount; i++) { + expect(perEnvCounts[`env_${i}`]).toBe(perEnv); + } + }); + + it("preserves org-level fairness with drainBatchSize > 1", async () => { + // Regression guard for the hierarchical rotation property at batch + // > 1: a heavy org with many envs still gets ~1 org-slot per tick, + // not N. The original test at line ~1066 only exercises batchSize=1; + // this re-runs the same shape with batchSize=5 to ensure batching + // doesn't somehow give the noisy tenant more slots. 
+ const orgAEnvs = Array.from({ length: 6 }, (_, i) => `env_orgA_${i}`); + const orgBEnv = "env_orgB_only"; + const envOrg = new Map(); + for (const e of orgAEnvs) envOrg.set(e, "org_A"); + envOrg.set(orgBEnv, "org_B"); + const queues = new Map>(); + for (const e of orgAEnvs) { + queues.set( + e, + Array.from({ length: 100 }, (_, i) => ({ + runId: `${e}_run_${i}`, + orgId: "org_A", + })), + ); + } + queues.set( + orgBEnv, + Array.from({ length: 100 }, (_, i) => ({ + runId: `${orgBEnv}_run_${i}`, + orgId: "org_B", + })), + ); + + const drainedByOrg: Record = { org_A: 0, org_B: 0 }; + const buffer = makeStubBuffer({ + listOrgs: async () => { + const orgs = new Set(); + for (const [envId, items] of queues.entries()) { + if (items.length > 0) orgs.add(envOrg.get(envId)!); + } + return [...orgs]; + }, + listEnvsForOrg: async (orgId: string) => { + const envs: string[] = []; + for (const [envId, items] of queues.entries()) { + if (items.length > 0 && envOrg.get(envId) === orgId) envs.push(envId); + } + return envs; + }, + pop: async (envId: string) => { + const q = queues.get(envId); + if (!q || q.length === 0) return null; + const entry = q.shift()!; + return { + runId: entry.runId, + envId, + orgId: entry.orgId, + payload: "{}", + attempts: 0, + createdAt: new Date(), + } as any; + }, + }); + + const drainer = new MollifierDrainer({ + buffer, + handler: async (input) => { + drainedByOrg[input.orgId] = (drainedByOrg[input.orgId] ?? 0) + 1; + }, + concurrency: 10, + maxAttempts: 3, + isRetryable: () => false, + maxOrgsPerTick: 100, + drainBatchSize: 5, + logger: new Logger("test-drainer", "log"), + }); + + for (let i = 0; i < 20; i++) { + await drainer.runOnce(); + } + + expect(drainedByOrg["org_A"]).toBeGreaterThan(0); + expect(drainedByOrg["org_B"]).toBeGreaterThan(0); + const ratio = drainedByOrg["org_A"]! / drainedByOrg["org_B"]!; + // Same fairness window as the batchSize=1 sibling test — batching + // multiplies per-tick throughput uniformly, not asymmetrically. 
+ expect(ratio).toBeGreaterThan(0.7); + expect(ratio).toBeLessThan(1.5); + }); + + it("counts mixed handler success and failure within a batched tick correctly", async () => { + // 5 envs, one entry each, drainBatchSize=5. Three handlers succeed, + // two throw non-retryable → drained=3, failed=2. Pins that the batched + // dispatch's drained/failed accounting per entry is preserved when + // multiple outcomes interleave in one tick. + const envs = ["env_ok_1", "env_ok_2", "env_ok_3", "env_fail_1", "env_fail_2"]; + const popsByEnv = new Map(); + const buffer = makeStubBuffer({ + ...eachEnvAsOwnOrg(envs), + pop: async (envId: string) => { + if (!envs.includes(envId)) return null; + // One entry per env then empty. Track via a per-env pop counter + // so the batch loop terminates after the first hit even though + // drainBatchSize=5. + const popped = (popsByEnv.get(envId) ?? 0) + 1; + popsByEnv.set(envId, popped); + if (popped > 1) return null; + return { + runId: `run_${envId}`, + envId, + orgId: envId, + payload: "{}", + attempts: 0, + createdAt: new Date(), + } as any; + }, + }); + + const drainer = new MollifierDrainer({ + buffer, + handler: async (input) => { + if (input.envId.startsWith("env_fail")) { + throw new Error("simulated handler failure"); + } + }, + concurrency: 10, + maxAttempts: 3, + isRetryable: () => false, // non-retryable → terminal on first attempt + drainBatchSize: 5, + logger: new Logger("test-drainer", "log"), + }); + + const r = await drainer.runOnce(); + expect(r.drained).toBe(3); + expect(r.failed).toBe(2); + }); + + it("never has more than `concurrency` entries popped-but-not-acked at any moment", async () => { + // Regression guard for the DRAINING blast radius. Each pop+process + // happens inside a single pLimit slot, so at any instant the number + // of entries that have been popped (and therefore marked DRAINING in + // a real buffer) but not yet acked is bounded by `concurrency`. 
This + // matters because the stale sweep only catches DRAINING entries + // visibly after a threshold — a process crash with thousands of + // mid-flight entries would mean a long detection/recovery window. + const envCount = 10; + const perEnv = 20; + const queues = new Map(); + for (let i = 0; i < envCount; i++) { + queues.set( + `env_${i}`, + Array.from({ length: perEnv }, (_, j) => `env_${i}_run_${j}`), + ); + } + let inflightPoppedNotAcked = 0; + let peak = 0; + const concurrency = 4; + const buffer = makeStubBuffer({ + ...eachEnvAsOwnOrg([...queues.keys()]), + pop: async (envId: string) => { + const q = queues.get(envId); + if (!q || q.length === 0) return null; + const runId = q.shift()!; + inflightPoppedNotAcked += 1; + if (inflightPoppedNotAcked > peak) peak = inflightPoppedNotAcked; + return { + runId, + envId, + orgId: envId, + payload: "{}", + attempts: 0, + createdAt: new Date(), + } as any; + }, + ack: async () => { + inflightPoppedNotAcked -= 1; + }, + }); + + const drainer = new MollifierDrainer({ + buffer, + handler: async () => { + // Force handler overlap if scheduling allowed it — without a + // tight per-slot bound the peak would visibly exceed `concurrency`. 
+ await new Promise((r) => setTimeout(r, 15)); + }, + concurrency, + maxAttempts: 3, + isRetryable: () => false, + drainBatchSize: perEnv, + logger: new Logger("test-drainer", "log"), + }); + + const r = await drainer.runOnce(); + expect(r.drained).toBe(envCount * perEnv); + expect(peak).toBeGreaterThan(1); // concurrency is real, not serialised + expect(peak).toBeLessThanOrEqual(concurrency); // and bounded by it + expect(inflightPoppedNotAcked).toBe(0); // everything settled + }); + + it("stops popping early when the env's queue empties before reaching drainBatchSize", async () => { + const queue = ["only_1", "only_2"]; + const handled: string[] = []; + let popCalls = 0; + const buffer = makeStubBuffer({ + ...eachEnvAsOwnOrg(["env_a"]), + pop: async (envId: string) => { + if (envId !== "env_a") return null; + popCalls += 1; + const runId = queue.shift(); + if (!runId) return null; + return { + runId, + envId: "env_a", + orgId: "org_1", + payload: "{}", + attempts: 0, + createdAt: new Date(), + } as any; + }, + }); + + const concurrency = 5; + const drainBatchSize = 10; + const drainer = new MollifierDrainer({ + buffer, + handler: async (input) => { + handled.push(input.runId); + }, + concurrency, + maxAttempts: 3, + isRetryable: () => false, + drainBatchSize, + logger: new Logger("test-drainer", "log"), + }); + + const r = await drainer.runOnce(); + expect(r.drained).toBe(2); + expect(new Set(handled)).toEqual(new Set(["only_1", "only_2"])); + // The property we're pinning: pop calls are bounded by concurrency + // (plus the original two successes) once the queue empties — they + // never run all the way up to drainBatchSize. With concurrency > 1 + // multiple workers can race to pop env_a before `skip.add` lands, + // so the upper bound is the worker count, not a tight "3". 
+ expect(popCalls).toBeGreaterThanOrEqual(3); // 2 success + ≥1 sentinel null + expect(popCalls).toBeLessThanOrEqual(concurrency + 2); + expect(popCalls).toBeLessThan(drainBatchSize); // bounded — the actual safety property + }); +}); + describe("MollifierDrainer error handling", () => { redisTest("retryable error requeues and increments attempts", { timeout: 20_000 }, async ({ redisContainer }) => { const buffer = new MollifierBuffer({ diff --git a/packages/redis-worker/src/mollifier/drainer.ts b/packages/redis-worker/src/mollifier/drainer.ts index 20b5ee3ae1..b5b90cdb2a 100644 --- a/packages/redis-worker/src/mollifier/drainer.ts +++ b/packages/redis-worker/src/mollifier/drainer.ts @@ -1,5 +1,4 @@ import { Logger } from "@trigger.dev/core/logger"; -import pLimit from "p-limit"; import { MollifierBuffer } from "./buffer.js"; import { BufferEntry, deserialiseSnapshot } from "./schemas.js"; @@ -52,6 +51,21 @@ export type MollifierDrainerOptions = { // as an org with 1 env — tenant-level drainage throughput is determined // by org count, not env count. maxOrgsPerTick?: number; + // Per-env per-tick pop cap. Default 1 preserves the original + // one-pop-per-env-per-tick behaviour. Setting it higher lets a single + // env drain at handler-parallelism speed: each tick the drainer pops + // up to `drainBatchSize` entries from the env's queue, then dispatches + // them all through the shared `concurrency`-bounded pLimit. For a + // single-env burst this turns N sequential ticks into one tick of N + // parallel handler calls, capped by `concurrency`. Org/env fairness + // still holds — each org still contributes exactly one env per tick. + // + // Memory: per-tick in-flight entries ≤ `maxOrgsPerTick × drainBatchSize`. + // Operators sizing this should ensure their PG pool / engine handler + // can sustain `concurrency` parallel writes; popping more than the + // handler can process per tick just queues entries in JS waiting on + // pLimit. 
+ drainBatchSize?: number; logger?: Logger; }; @@ -68,8 +82,9 @@ export class MollifierDrainer { private readonly isRetryable: (err: unknown) => boolean; private readonly pollIntervalMs: number; private readonly maxOrgsPerTick: number; + private readonly drainBatchSize: number; + private readonly concurrency: number; private readonly logger: Logger; - private readonly limit: ReturnType; // Rotation state. `orgCursor` advances through the active-orgs list. // Each org has its own internal cursor in `perOrgEnvCursors` for // cycling through that org's envs. Both reset on `start()`. @@ -87,8 +102,9 @@ export class MollifierDrainer { this.isRetryable = options.isRetryable; this.pollIntervalMs = options.pollIntervalMs ?? 100; this.maxOrgsPerTick = options.maxOrgsPerTick ?? 500; + this.drainBatchSize = Math.max(1, options.drainBatchSize ?? 1); + this.concurrency = Math.max(1, options.concurrency); this.logger = options.logger ?? new Logger("MollifierDrainer", "debug"); - this.limit = pLimit(options.concurrency); } async runOnce(): Promise { @@ -116,16 +132,107 @@ export class MollifierDrainer { targets.push(envId); } - const inflight: Promise<"drained" | "failed" | "empty">[] = []; - for (const envId of targets) { - inflight.push(this.limit(() => this.processOneFromEnv(envId))); - } + if (targets.length === 0) return { drained: 0, failed: 0 }; + + // Worker-pool draining. We spawn up to `concurrency` workers; each + // worker repeatedly: + // 1. Picks the next env with budget remaining (round-robin), + // atomically claiming one slot of that env's per-tick budget. + // 2. Pops one entry and processes it. + // 3. Repeats until pickNextEnv returns null. + // + // This pattern gives us both invariants the prior two designs traded + // off: + // - Single-env bursts use the full `concurrency` budget. All + // workers can pull from one env, processing `concurrency` entries + // in parallel. 
+ // - The number of entries in "popped-but-not-acked" (DRAINING) + // state at any moment is bounded by the worker count, i.e. + // `concurrency` — same blast radius as the pre-batch + // one-pop-per-env model. A process crash mid-tick strands at + // most `concurrency` entries for stale-sweep to recover, not + // `maxOrgsPerTick × drainBatchSize`. + // + // Fairness: pickNextEnv advances a cursor by 1 each successful pick, + // so workers round-robin across envs at the entry level. Combined + // with the per-env budget cap, an env contributes at most + // `drainBatchSize` entries per tick regardless of how many workers + // are free — a heavy env can't starve siblings within a tick. + const remaining = new Map(); + const skip = new Set(); // envs with empty queue or pop failure this tick + for (const envId of targets) remaining.set(envId, this.drainBatchSize); - const results = await Promise.all(inflight); - return { - drained: results.filter((r) => r === "drained").length, - failed: results.filter((r) => r === "failed").length, + let cursor = 0; + const pickNextEnv = (): string | null => { + for (let i = 0; i < targets.length; i++) { + const idx = (cursor + i) % targets.length; + const envId = targets[idx]!; + if (skip.has(envId)) continue; + const r = remaining.get(envId) ?? 0; + if (r > 0) { + remaining.set(envId, r - 1); + cursor = (idx + 1) % targets.length; + return envId; + } + } + return null; + }; + + let drained = 0; + let failed = 0; + + const worker = async (): Promise => { + while (true) { + const envId = pickNextEnv(); + if (envId === null) return; + let entry: BufferEntry | null; + try { + entry = await this.buffer.pop(envId); + } catch (err) { + // A pop failure on one env aborts that env's batch for this + // tick (don't keep hammering a broken Redis) and counts as + // exactly one failure — same as the pre-batch path on a pop + // blowup. Other envs continue. 
+ // + // `pickNextEnv` decrements `remaining` before the pop settles, + // so multiple workers can race into the same env and all hit + // a throwing pop before the first catch lands. Guarding the + // failure increment on `!skip.has(envId)` keeps the per-env + // failure count at exactly one even under that race — + // matching the documented contract. + this.logger.error("MollifierDrainer.pop failed", { envId, err }); + if (!skip.has(envId)) { + skip.add(envId); + failed += 1; + } + continue; + } + if (!entry) { + // Queue exhausted between scheduling and this pop. Mark the + // env skipped so siblings aren't held up by repeated empty pops. + skip.add(envId); + continue; + } + try { + const outcome = await this.processEntry(entry); + if (outcome === "drained") drained += 1; + else failed += 1; + } catch (err) { + this.logger.error("MollifierDrainer.processEntry failed", { + envId, + runId: entry.runId, + err, + }); + failed += 1; + } + } }; + + const totalBudget = targets.length * this.drainBatchSize; + const workerCount = Math.min(this.concurrency, totalBudget); + await Promise.all(Array.from({ length: workerCount }, () => worker())); + + return { drained, failed }; } start(): void { @@ -249,32 +356,6 @@ export class MollifierDrainer { return sorted[idx]!; } - // A failure for one env (e.g. a Redis hiccup mid-batch in `pop`, or in - // `requeue`/`fail` during error recovery inside `processEntry`) must not - // poison the rest of the batch — `Promise.all` would otherwise reject and - // bubble all the way to `loop()`. Catch both stages here so the failed env - // is just counted as "failed" for this tick and we move on. 
- private async processOneFromEnv(envId: string): Promise<"drained" | "failed" | "empty"> { - let entry: BufferEntry | null; - try { - entry = await this.buffer.pop(envId); - } catch (err) { - this.logger.error("MollifierDrainer.pop failed", { envId, err }); - return "failed"; - } - if (!entry) return "empty"; - try { - return await this.processEntry(entry); - } catch (err) { - this.logger.error("MollifierDrainer.processEntry failed", { - envId, - runId: entry.runId, - err, - }); - return "failed"; - } - } - private async processEntry(entry: BufferEntry): Promise<"drained" | "failed"> { try { const payload = deserialiseSnapshot(entry.payload);