Merged
5 changes: 0 additions & 5 deletions .changeset/mollifier-buffer-extensions.md

This file was deleted.

5 changes: 5 additions & 0 deletions .changeset/mollifier-drain-batch-size.md
@@ -0,0 +1,5 @@
---
"@trigger.dev/redis-worker": patch
---

`MollifierDrainer` accepts a `drainBatchSize` option (default 1) that controls how many entries are popped per env per tick — in-flight handlers remain capped by the global `concurrency`. `MollifierBuffer` also gains `getDrainingCount()` / `listStaleDraining()`, backed by a new `mollifier:draining` ZSET maintained atomically with pop/ack/fail/requeue (observability-only).
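
To make the observability-only tracker concrete, here is a minimal in-memory sketch of the bookkeeping that a `mollifier:draining` ZSET would provide. It is not the `MollifierBuffer` implementation: the class name, the `markPopped` / `markSettled` methods, the parameters of `listStaleDraining`, and the choice of the pop timestamp as the score are all assumptions made for illustration. The real buffer keeps this state in Redis and updates it atomically with pop/ack/fail/requeue.

```typescript
// In-memory stand-in for the `mollifier:draining` ZSET. All names and
// signatures here are hypothetical; the real MollifierBuffer talks to Redis.
class DrainingTracker {
  // entryId -> score. Assumption: the score is the pop timestamp in ms.
  private readonly scores = new Map<string, number>();

  // Called together with a pop: the entry is now in DRAINING state.
  markPopped(entryId: string, nowMs: number): void {
    this.scores.set(entryId, nowMs);
  }

  // Called together with ack, fail, or requeue: the entry leaves DRAINING.
  markSettled(entryId: string): void {
    this.scores.delete(entryId);
  }

  // Analogue of ZCARD on the tracker set.
  getDrainingCount(): number {
    return this.scores.size;
  }

  // Analogue of a ZRANGEBYSCORE up to (now - threshold): entries popped at
  // least `thresholdMs` ago that have not settled, oldest first.
  listStaleDraining(nowMs: number, thresholdMs: number): string[] {
    const cutoff = nowMs - thresholdMs;
    return Array.from(this.scores.entries())
      .filter(([, poppedAt]) => poppedAt <= cutoff)
      .sort((a, b) => a[1] - b[1])
      .map(([entryId]) => entryId);
  }
}
```

Because the tracker only mirrors state transitions, dropping it would not change drain behaviour, which matches the "observability-only" framing above.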
5 changes: 0 additions & 5 deletions .changeset/mollifier-drainer-terminal-failure-callback.md

This file was deleted.

6 changes: 6 additions & 0 deletions .server-changes/mollifier-drain-batch-size.md
@@ -0,0 +1,6 @@
---
area: webapp
type: improvement
---

Wire `TRIGGER_MOLLIFIER_DRAIN_BATCH_SIZE` (default 50) so single-env bursts drain at the full `DRAIN_CONCURRENCY` budget per tick instead of one entry per tick. Also expose a `mollifier.draining.current` ObservableGauge (polled every 15s on drainer pods) that reports in-flight DRAINING entries.
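
The drain-time figures quoted in the `env.server.ts` comment in this PR (about 17 minutes versus about 20 seconds for 20k entries buffered on one env) follow from simple arithmetic. The sketch below reproduces them under one loud assumption taken from that comment: each tick costs roughly 50ms of wall time regardless of how many entries it pops. `estimateDrainSeconds` is a hypothetical helper, not part of the codebase.

```typescript
// Back-of-envelope model. Assumption: every tick costs about the same wall
// time no matter how many entries it pops (handlers run concurrently).
function estimateDrainSeconds(
  buffered: number,
  drainBatchSize: number,
  tickMs: number,
): number {
  const ticks = Math.ceil(buffered / drainBatchSize);
  return (ticks * tickMs) / 1000;
}

// One pop per tick: 20,000 ticks at 50ms each is 1000s, about 17 minutes.
const onePerTickSeconds = estimateDrainSeconds(20000, 1, 50);

// Batch of 50 per tick: 400 ticks at 50ms each is 20s.
const batchedSeconds = estimateDrainSeconds(20000, 50, 50);
```

In practice the per-tick cost grows with batch size once handlers saturate `DRAIN_CONCURRENCY`, so treat the batched figure as a lower bound rather than a measurement.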
10 changes: 10 additions & 0 deletions apps/webapp/app/env.server.ts
@@ -1101,6 +1101,16 @@ const EnvironmentSchema = z
TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3),
TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().positive().default(30_000),
TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK: z.coerce.number().int().positive().default(500),
// Per-env per-tick pop cap. The drainer rotates one env per org per
// tick; this bounds how many entries it pops from that env before
// dispatching them through the shared `DRAIN_CONCURRENCY`-bounded
// limiter. Default matches `DRAIN_CONCURRENCY` so a single-env burst
// uses the full handler-parallelism budget — for 20k buffered on one
// env this is the difference between ~17m (one-pop-per-tick × ~50ms)
// and ~20s (400 ticks × concurrent engine.trigger). Org/env fairness
// is preserved because the per-tick env selection is unchanged; only
// the in-env pop count grows.
TRIGGER_MOLLIFIER_DRAIN_BATCH_SIZE: z.coerce.number().int().positive().default(50),
// Periodic sweep that scans buffer queue LISTs for entries whose
// dwell exceeds the stale threshold. Independent of the drainer —
// its job is exactly to make a stuck/offline drainer visible to
2 changes: 2 additions & 0 deletions apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts
@@ -72,6 +72,7 @@ function initializeMollifierDrainer(): MollifierDrainer<MollifierSnapshot> {
logger.debug("Initializing mollifier drainer", {
concurrency: env.TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY,
maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS,
drainBatchSize: env.TRIGGER_MOLLIFIER_DRAIN_BATCH_SIZE,
});

const drainer = new MollifierDrainer<MollifierSnapshot>({
@@ -81,6 +82,7 @@ function initializeMollifierDrainer(): MollifierDrainer<MollifierSnapshot> {
concurrency: env.TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY,
maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS,
maxOrgsPerTick: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK,
drainBatchSize: env.TRIGGER_MOLLIFIER_DRAIN_BATCH_SIZE,
isRetryable: isRetryablePgError,
});

63 changes: 63 additions & 0 deletions apps/webapp/app/v3/mollifier/mollifierDrainingGauge.server.ts
@@ -0,0 +1,63 @@
import { logger } from "~/services/logger.server";
import { getMollifierBuffer } from "./mollifierBuffer.server";
import { reportDrainingCount } from "./mollifierTelemetry.server";

// How often we ZCARD the draining-tracker set. Each poll is a single
// O(1) Redis call, so cadence is bounded by "how fresh do we want the
// gauge?" rather than cost. 15s gives a tight-enough window to spot a
// brief OOM-induced spike without burning RTTs, and lines up well with
// typical Prometheus scrape intervals.
const POLL_INTERVAL_MS = 15_000;

let intervalHandle: ReturnType<typeof setInterval> | null = null;

// Polls `mollifier:draining` cardinality on an interval and feeds the
// gauge in `mollifierTelemetry.server.ts`. Started from the drainer
// worker bootstrap (alongside `drainer.start()`) so it runs on the same
// pods that actually pop/ack entries — observability is colocated with
// the lifecycle.
//
// Idempotent: a second call is a no-op (Remix dev hot-reload re-runs
// the bootstrap; the existing interval keeps ticking).
export function startMollifierDrainingGauge(opts: {
  intervalMs?: number;
  getBuffer?: typeof getMollifierBuffer;
} = {}): void {
  if (intervalHandle !== null) return;

  const intervalMs = opts.intervalMs ?? POLL_INTERVAL_MS;
  const getBuffer = opts.getBuffer ?? getMollifierBuffer;

  // One poll: read the draining count and publish it to the gauge.
  // Skipped when the buffer singleton is null (mollifier disabled).
  const tick = async () => {
    const buffer = getBuffer();
    if (!buffer) return;
    try {
      const count = await buffer.getDrainingCount();
      reportDrainingCount(count);
    } catch (err) {
      // Transient Redis blip: don't tank the loop, just leave the
      // gauge at its last-known value. A sustained Redis outage will
      // surface via the drainer's own alerts long before this gauge
      // staleness becomes a primary signal.
      logger.warn("Mollifier draining gauge poll failed; keeping previous value", { err });
    }
  };

  // Fire one poll immediately so the gauge populates before the first
  // scrape rather than reading 0 for a full interval after boot.
  void tick();
  // unref so the interval doesn't keep the process alive past
  // graceful shutdown; the gauge is best-effort, not a flush boundary.
  intervalHandle = setInterval(() => {
    void tick();
  }, intervalMs);
  intervalHandle.unref?.();
}

// Test seam. Production code never calls this; lifecycle is implicitly
// process-end.
export function stopMollifierDrainingGauge(): void {
  if (intervalHandle === null) return;
  clearInterval(intervalHandle);
  intervalHandle = null;
}
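
The lifecycle rules this file relies on (idempotent start, an eager first poll, failures that keep the last value, and an unref'd interval) can be exercised without Redis or OTel. The following is a simplified, synchronous sketch under stated assumptions: `createGaugePoller`, `readCount`, and `current` are hypothetical names, and the real poller is async and reports through `reportDrainingCount` rather than holding the value itself.

```typescript
// Hypothetical, synchronous stand-in for the async poller in
// mollifierDrainingGauge.server.ts. None of these names exist in the PR.
type IntervalHandle = ReturnType<typeof setInterval>;

function createGaugePoller(readCount: () => number, intervalMs: number) {
  let handle: IntervalHandle | null = null;
  let latest = 0; // last successfully observed value

  const tick = (): void => {
    try {
      latest = readCount();
    } catch {
      // Keep the previous value on failure rather than stopping the loop.
    }
  };

  return {
    start(): void {
      if (handle !== null) return; // idempotent: a second start is a no-op
      tick(); // eager first poll, so the value is fresh before the first scrape
      handle = setInterval(tick, intervalMs);
      // Node timers expose unref(); other runtimes may return a plain
      // number, so the call is guarded.
      (handle as unknown as { unref?: () => void }).unref?.();
    },
    stop(): void {
      if (handle === null) return;
      clearInterval(handle);
      handle = null;
    },
    current(): number {
      return latest;
    },
  };
}

// Usage: two starts produce exactly one eager read.
let exampleReads = 0;
const examplePoller = createGaugePoller(() => {
  exampleReads += 1;
  return 7;
}, 60000);
examplePoller.start();
examplePoller.start();
examplePoller.stop();
```

Keeping the guard on the handle rather than on a separate boolean means `stop()` followed by `start()` restarts cleanly, which is the property the test seam depends on.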
33 changes: 33 additions & 0 deletions apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts
@@ -90,6 +90,39 @@ meter.addBatchObservableCallback(
[staleEntriesGauge],
);

// Observability gauge for entries currently in DRAINING state — popped
// by the drainer but not yet acked/failed/requeued. Backed by the
// `mollifier:draining` ZSET (see `MollifierBuffer.getDrainingCount`)
// and polled by the loop in `mollifierDrainingGauge.server.ts`.
//
// Useful for:
// - "Is anything mid-drain right now?" panels
// - Post-crash forensics ("how many entries got stranded by that ECS OOM?")
// - Alerting: a sustained non-zero with no drainer progress is a stall
//
// No `envId` attribute — same high-cardinality constraint as the other
// mollifier gauges. The per-entry hash carries env/org for drill-down.
export const drainingCountGauge = meter.createObservableGauge(
  "mollifier.draining.current",
  {
    description:
      "Mollifier buffer entries currently in DRAINING state (popped but not yet acked/failed/requeued)",
  },
);

let latestDrainingCount = 0;

export function reportDrainingCount(count: number): void {
  latestDrainingCount = count;
}

meter.addBatchObservableCallback(
  (result) => {
    result.observe(drainingCountGauge, latestDrainingCount);
  },
  [drainingCountGauge],
);

// Electric SQL's shape-stream protocol adds a `handle=` query param on
// every reconnect after the initial GET. Gating the realtime-buffered
// log/counter on its absence keeps the signal at one tick per
7 changes: 7 additions & 0 deletions apps/webapp/app/v3/mollifierDrainerWorker.server.ts
@@ -5,6 +5,7 @@ import {
getMollifierDrainer,
MollifierConfigurationError,
} from "./mollifier/mollifierDrainer.server";
import { startMollifierDrainingGauge } from "./mollifier/mollifierDrainingGauge.server";

declare global {
// eslint-disable-next-line no-var
@@ -92,6 +93,12 @@ export function initMollifierDrainerWorker(
signalsEmitter.on("SIGINT", stopDrainer);
global.__mollifierShutdownRegistered__ = true;
drainer.start();
// Spin up the observability-only gauge poller for the
// `mollifier:draining` ZSET cardinality. Colocated with the
// drainer because that's the loop creating the DRAINING entries
// — same pod, same Redis client lifecycle. Idempotent + unref'd
// so it's safe under dev hot-reload and doesn't block shutdown.
startMollifierDrainingGauge();
}
} catch (error) {
// Deterministic misconfig (shutdown-timeout vs GRACEFUL_SHUTDOWN_TIMEOUT,
116 changes: 116 additions & 0 deletions apps/webapp/test/mollifierDrainingGauge.test.ts
@@ -0,0 +1,116 @@
import { describe, expect, it, vi, beforeEach, afterEach } from "vitest";

// Same defensive mocks as mollifierDrainerWorker.test.ts: importing
// the gauge module transitively loads telemetry → meter → OTel
// initialisation, plus the buffer singleton's runtime resolution.
vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} }));
vi.mock("~/services/logger.server", () => ({
  logger: { warn: vi.fn(), error: vi.fn(), info: vi.fn(), debug: vi.fn() },
}));

const reportDrainingCount = vi.fn();
vi.mock("~/v3/mollifier/mollifierTelemetry.server", () => ({
  reportDrainingCount: (count: number) => reportDrainingCount(count),
}));

import {
  startMollifierDrainingGauge,
  stopMollifierDrainingGauge,
} from "~/v3/mollifier/mollifierDrainingGauge.server";

// The gauge poller reads `mollifier:draining` cardinality on a cadence
// and forwards it to `reportDrainingCount`. These tests pin the
// observable contract: the gauge value is the buffer's count, transient
// errors keep the last value, start is idempotent, and stop halts the
// loop. The unref'd interval is not asserted directly: `afterEach`
// clears it, so a clean Vitest exit does not prove it.
describe("startMollifierDrainingGauge", () => {
  beforeEach(() => {
    reportDrainingCount.mockReset();
    stopMollifierDrainingGauge();
  });

  afterEach(() => {
    stopMollifierDrainingGauge();
  });

  it("fires an immediate poll on start so the gauge populates before the first scrape", async () => {
    const buffer = { getDrainingCount: vi.fn().mockResolvedValue(7) } as any;
    startMollifierDrainingGauge({
      intervalMs: 100_000, // long: we're checking the immediate fire, not the interval
      getBuffer: () => buffer,
    });

    // Yield one event-loop turn (setImmediate) so the eager poll's
    // promise chain settles before asserting.
    await new Promise((r) => setImmediate(r));
    expect(reportDrainingCount).toHaveBeenCalledWith(7);
    expect(buffer.getDrainingCount).toHaveBeenCalledTimes(1);
  });

  it("polls on the configured cadence", async () => {
    const buffer = { getDrainingCount: vi.fn().mockResolvedValue(3) } as any;
    startMollifierDrainingGauge({
      intervalMs: 20,
      getBuffer: () => buffer,
    });

    // Eager tick + at least one interval tick.
    await new Promise((r) => setTimeout(r, 80));
    expect(buffer.getDrainingCount.mock.calls.length).toBeGreaterThanOrEqual(2);
    expect(reportDrainingCount).toHaveBeenCalledWith(3);
  });

  it("no-ops when the buffer singleton returns null (mollifier disabled)", async () => {
    startMollifierDrainingGauge({
      intervalMs: 20,
      getBuffer: () => null,
    });
    await new Promise((r) => setTimeout(r, 60));
    expect(reportDrainingCount).not.toHaveBeenCalled();
  });

  it("swallows a transient ZCARD failure so the loop keeps running", async () => {
    let calls = 0;
    const buffer = {
      getDrainingCount: vi.fn(async () => {
        calls += 1;
        if (calls === 1) throw new Error("transient redis blip");
        return 4;
      }),
    } as any;
    startMollifierDrainingGauge({
      intervalMs: 20,
      getBuffer: () => buffer,
    });

    await new Promise((r) => setTimeout(r, 80));
    // First call threw → no report. Second call succeeded → reported.
    // In production the gauge keeps its last reported value between
    // the failed poll and the next successful one, which is better
    // than crashing the loop and going silent forever.
    expect(reportDrainingCount).toHaveBeenCalledWith(4);
    expect(buffer.getDrainingCount.mock.calls.length).toBeGreaterThanOrEqual(2);
  });

  it("is idempotent: a second start does not spawn a parallel loop", async () => {
    const buffer = { getDrainingCount: vi.fn().mockResolvedValue(1) } as any;
    startMollifierDrainingGauge({ intervalMs: 25, getBuffer: () => buffer });
    startMollifierDrainingGauge({ intervalMs: 25, getBuffer: () => buffer });

    await new Promise((r) => setTimeout(r, 90));
    // One eager + a small number of interval ticks. Doubled-loop would
    // produce ~2× the calls in the same window. Upper bound is generous
    // for CI jitter; the property is "single loop", not exact count.
    expect(buffer.getDrainingCount.mock.calls.length).toBeLessThan(8);
  });

  it("stop halts the polling loop", async () => {
    const buffer = { getDrainingCount: vi.fn().mockResolvedValue(2) } as any;
    startMollifierDrainingGauge({ intervalMs: 20, getBuffer: () => buffer });
    await new Promise((r) => setTimeout(r, 50));
    const callsAtStop = buffer.getDrainingCount.mock.calls.length;
    stopMollifierDrainingGauge();

    await new Promise((r) => setTimeout(r, 80));
    expect(buffer.getDrainingCount.mock.calls.length).toBe(callsAtStop);
  });
});