diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/table-grid.tsx b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/table-grid.tsx index e38c2fbdcbe..de9888b539c 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/table-grid.tsx +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/table-grid.tsx @@ -95,6 +95,10 @@ export interface SelectionSnapshot { /** Total running/queued workflow runs across ALL rows. Drives the page-header * RunStatusControl ("N running, Stop all"). */ totalRunning: number + /** Whether any dispatch is active (pending/dispatching). Keeps the RunStatusControl + * + Stop-all visible during a run even when the per-row count momentarily reads 0 + * (e.g. the first window of an auto-fired/capped dispatch before cells stamp). */ + hasActiveDispatch: boolean /** Whether the table has any workflow-output columns (drives the Run/Stop visibility). */ hasWorkflowColumns: boolean /** Cells the Play / Refresh / Stop buttons act on. Null when the selection @@ -333,6 +337,7 @@ export function TableGrid({ // rows still inside a dispatch's scope — e.g. a cascade where 3 of 4 columns // finished would read "4 running" instead of "1". const totalRunning = Object.values(runningByRowId).reduce((sum, n) => sum + n, 0) + const hasActiveDispatch = (activeDispatches?.length ?? 0) > 0 const tableRowCountRef = useRef(tableData?.rowCount ?? 0) tableRowCountRef.current = tableData?.rowCount ?? 0 @@ -3194,6 +3199,7 @@ export function TableGrid({ sameStats && prev.runningInActionBarSelection === runningInActionBarSelection && prev.totalRunning === totalRunning && + prev.hasActiveDispatch === hasActiveDispatch && prev.hasWorkflowColumns === hasWorkflowColumns && prev.actionBarRowIds.length === actionBarRowIds.length && prev.actionBarRowIds.every((id, i) => id === actionBarRowIds[i]) @@ -3204,6 +3210,7 @@ export function TableGrid({ actionBarRowIds, runningInActionBarSelection, totalRunning, + hasActiveDispatch, hasWorkflowColumns, selectedRunScope, selectionStats, @@ -3215,6 +3222,7 @@ export function TableGrid({ actionBarRowIds, runningInActionBarSelection, totalRunning, + hasActiveDispatch, hasWorkflowColumns, selectedRunScope, selectionStats, diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts index b82eec533c1..78eb7e96103 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts @@ -1,6 +1,6 @@ 'use client' -import { useEffect } from 'react' +import { useEffect, useRef } from 'react' import { createLogger } from '@sim/logger' import { useQueryClient } from '@tanstack/react-query' import type { ActiveDispatch } from '@/lib/api/contracts/tables' @@ -44,6 +44,9 @@ interface UseTableEventStreamArgs { tableId: string | undefined workspaceId: string | undefined enabled?: boolean + /** Fired when the server halts a dispatch because the billed account is over + * its usage limit. The page surfaces an upgrade prompt + redirect. */ + onUsageLimitReached?: (event: { dispatchId?: string; message: string }) => void } /** @@ -59,9 +62,14 @@ export function useTableEventStream({ tableId, workspaceId, enabled = true, + onUsageLimitReached, }: UseTableEventStreamArgs): void { const queryClient = useQueryClient() + // Ref so a changing callback identity doesn't tear down + reconnect the SSE. + const onUsageLimitReachedRef = useRef(onUsageLimitReached) + onUsageLimitReachedRef.current = onUsageLimitReached + useEffect(() => { if (!enabled || !tableId || !workspaceId) return @@ -205,6 +213,28 @@ export function useTableEventStream({ scheduleDispatchInvalidate() } + const applyUsageLimit = (event: Extract): void => { + // Drop the halted dispatch from the overlay so the "running" UI clears + // immediately (the dispatcher was marked complete server-side). Cascade / + // auto-fire events carry no dispatchId — nothing to remove. + if (event.dispatchId) { + queryClient.setQueryData(tableKeys.activeDispatches(tableId), (prev) => { + if (!prev) return prev + const filtered = prev.dispatches.filter((d) => d.id !== event.dispatchId) + return filtered.length === prev.dispatches.length + ? prev + : { ...prev, dispatches: filtered } + }) + } + // Blocked cells are left `queued` in the DB with no terminal cell event, + // so `runningByRowId` would otherwise stay non-zero (stale "X running"). + // Re-sync the server counts, and refetch rows so cells whose pre-stamps + // the server cleared drop their "Queued" state. + scheduleDispatchInvalidate() + void queryClient.invalidateQueries({ queryKey: tableKeys.rowsRoot(tableId) }) + onUsageLimitReachedRef.current?.({ dispatchId: event.dispatchId, message: event.message }) + } + const handlePrune = (payload: PrunedEvent): void => { logger.info('Table event buffer pruned — full refetch', { tableId, ...payload }) void queryClient.invalidateQueries({ queryKey: tableKeys.rowsRoot(tableId) }) @@ -253,6 +283,7 @@ export function useTableEventStream({ savePointer(tableId, lastEventId) if (entry.event?.kind === 'cell') applyCell(entry.event) else if (entry.event?.kind === 'dispatch') applyDispatch(entry.event) + else if (entry.event?.kind === 'usageLimitReached') applyUsageLimit(entry.event) } catch (err) { logger.warn('Failed to parse table event', { tableId, err }) } diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/table.tsx b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/table.tsx index 14fa7c6408c..5ba7f380d01 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/table.tsx +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/table.tsx @@ -36,6 +36,7 @@ import { useRunColumn, } from '@/hooks/queries/tables' import { useInlineRename } from '@/hooks/use-inline-rename' +import { useSettingsNavigation } from '@/hooks/use-settings-navigation' import { useLogDetailsUIStore } from '@/stores/logs/store' import type { DeletedRowSnapshot } from '@/stores/table/types' import { @@ -129,7 +130,15 @@ export function Table({ const posthogRef = useRef(posthog) posthogRef.current = posthog - useTableEventStream({ tableId, workspaceId }) + const { navigateToSettings } = useSettingsNavigation() + // Plain function: `useTableEventStream` keeps it in a ref (its effect doesn't + // depend on the identity), so a stable reference buys nothing here. + const onUsageLimitReached = ({ message }: { dispatchId?: string; message: string }) => { + toast.error(message, { + action: { label: 'Upgrade', onClick: () => navigateToSettings({ section: 'subscription' }) }, + }) + } + useTableEventStream({ tableId, workspaceId, onUsageLimitReached }) const [slideout, dispatch] = useReducer(slideoutReducer, { kind: 'none' }) const [showDeleteTableConfirm, setShowDeleteTableConfirm] = useState(false) @@ -141,6 +150,7 @@ export function Table({ actionBarRowIds: [], runningInActionBarSelection: 0, totalRunning: 0, + hasActiveDispatch: false, hasWorkflowColumns: false, selectedRunScope: null, selectionStats: { hasIncompleteOrFailed: false, hasCompleted: false, hasInFlight: false }, @@ -509,7 +519,7 @@ export function Table({ createTrigger={createTrigger} actions={headerActions} leadingActions={ - selection.totalRunning > 0 ? ( + selection.totalRunning > 0 || selection.hasActiveDispatch ? ( setFilterOpen((prev) => !prev)} filterActive={filterOpen || !!queryOptions.filter} trailing={ - embedded && selection.totalRunning > 0 ? ( + embedded && (selection.totalRunning > 0 || selection.hasActiveDispatch) ? ( { +): Promise<'blocked' | undefined> { const { tableId, rowId, workspaceId } = payload const { getTableById, getRowById } = await import('@/lib/table/service') const { pickNextEligibleGroupForRow } = await import('@/lib/table/workflow-columns') @@ -121,6 +137,10 @@ export async function runRowCascadeLoop( ) if (result === 'paused') break + // Hard stop (e.g. usage limit): the dispatch was halted and no cell was + // marked. Propagate so the outer re-drive loop stops too — otherwise it + // would re-pick the still-pending queued marker and spin. + if (result === 'blocked') return 'blocked' const freshRow = await getRowById(tableId, rowId, workspaceId) if (!freshRow) break @@ -130,17 +150,20 @@ export async function runRowCascadeLoop( currentWorkflowId = next.workflowId currentExecutionId = generateId() } + return undefined } /** Returns `'paused'` to signal the cascade loop must exit (resume worker - * takes over). `'completed' | 'error'` keep the loop running. */ + * takes over) and `'blocked'` for a hard stop (usage limit — dispatch halted, + * cell left unmarked). `'completed' | 'error'` keep the loop running. */ async function runWorkflowAndWriteTerminal( payload: WorkflowGroupCellPayload, signal: AbortSignal | undefined, table: TableDefinition, group: WorkflowGroup -): Promise<'completed' | 'error' | 'paused'> { - const { tableId, tableName, rowId, groupId, workflowId, workspaceId, executionId } = payload +): Promise<'completed' | 'error' | 'paused' | 'blocked'> { + const { tableId, tableName, rowId, groupId, workflowId, workspaceId, executionId, dispatchId } = + payload const requestId = `wfgrp-${executionId}` return runWithRequestContext({ requestId }, async () => { @@ -155,6 +178,17 @@ async function runWorkflowAndWriteTerminal( const writeState = (executionState: RowExecutionMetadata, dataPatch?: RowData) => writeWorkflowGroupState(cellCtx, { executionState, dataPatch }) + /** Pre-execution cancellation guard: a cell cancelled while it sat in the + * queue (e.g. trigger.dev concurrency backlog) must not run once it + * dequeues. Reads the already-loaded row's exec — no extra query. */ + const cancelledBeforeRun = (exec: RowExecutionMetadata | undefined): boolean => { + if (!isExecCancelled(exec)) return false + logger.info( + `Skipping cell — cancelled before execution (table=${tableId} row=${rowId} group=${groupId})` + ) + return true + } + // Enrichment groups call a registry function directly instead of running a // workflow, reusing the same pickup → run → terminal-write status flow. The // `enrichmentId` guard ensures only true registry enrichments take this path @@ -184,6 +218,8 @@ async function runWorkflowAndWriteTerminal( return 'error' } + if (cancelledBeforeRun(row.executions?.[groupId])) return 'error' + const pickedUp = await markWorkflowGroupPickedUp(cellCtx, { workflowId: statusId, jobId: null, @@ -366,8 +402,125 @@ async function runWorkflowAndWriteTerminal( return 'error' } - // SQL guard rejects if a stop click stamped `cancelled` between enqueue - // and pickup. + if (cancelledBeforeRun(row.executions?.[groupId])) return 'error' + + // Billing / usage / timeout gate — route table cells through the same + // preprocessing every other trigger uses. Keep running draft + // (checkDeployment: false). Rate limiting is paced separately below so a + // retry doesn't re-run the (stable) billing/usage/subscription lookups. + // Failures are surfaced via cell state / SSE / dispatch halt, so suppress + // preprocessing's own execution-log writes. + const preprocess = await preprocessExecution({ + workflowId, + executionId, + requestId, + workspaceId, + workflowRecord, + userId: workflowRecord.userId, + triggerType: 'workflow', + checkDeployment: false, + checkRateLimit: false, + logPreprocessingErrors: false, + }) + if (!preprocess.success) { + // Usage/quota exhausted: retrying won't help. Halt the dispatch without + // marking any cell, and signal the client to upgrade. + if (preprocess.error?.statusCode === 402) { + logger.warn( + `Usage limit reached — halting dispatch (table=${tableId} row=${rowId} group=${groupId})` + ) + // Don't leave the cell stuck on its `pending` pre-stamp. Clear this + // cell's exec so it reverts to un-run (no error/cancelled badge — + // matching "don't mark"; re-runnable after upgrade). Each blocked + // cell clears its own. + const { updateRow } = await import('@/lib/table/service') + await updateRow( + { tableId, rowId, data: {}, workspaceId, executionsPatch: { [groupId]: null } }, + table, + requestId + ).catch((err) => + logger.warn(`Failed to clear cell pre-stamp on usage limit`, { + error: toError(err).message, + }) + ) + // With up to 20 concurrent cells all hitting the limit at once, only + // the cell that transitions the dispatch active→complete emits the + // event — otherwise the user sees a toast per in-flight cell. Cells + // with no owning dispatch (auto-fire) always emit. + let shouldEmit = true + if (dispatchId) { + const { completeDispatchIfActive } = await import('@/lib/table/dispatcher') + shouldEmit = await completeDispatchIfActive(dispatchId) + } + if (shouldEmit) { + await appendTableEvent({ + kind: 'usageLimitReached', + tableId, + ...(dispatchId ? { dispatchId } : {}), + message: + preprocess.error?.message ?? + 'Usage limit exceeded. Please upgrade your plan to continue.', + }) + } + return 'blocked' + } + await writeState({ + status: 'error', + executionId, + jobId: null, + workflowId, + error: preprocess.error?.message ?? 'Workflow could not start', + }) + return 'error' + } + + const actorUserId = preprocess.actorUserId ?? workflowRecord.userId + const asyncTimeoutMs = preprocess.executionTimeout?.async + + // Rate-limit pacing: tables count against the async counter (background + // jobs). On a hit, wait & retry so the row still runs rather than being + // skipped — only this cheap check repeats. The waiting cell holds its + // concurrency slot, pacing the whole dispatch to the user's rate limit. + const rateLimiter = new RateLimiter() + for (let attempt = 1; ; attempt++) { + if (signal?.aborted) return 'error' + const rl = await rateLimiter.checkRateLimitWithSubscription( + actorUserId, + preprocess.userSubscription ?? null, + 'workflow', + true + ) + if (rl.allowed) break + if (attempt >= RATE_LIMIT_MAX_ATTEMPTS) { + await writeState({ + status: 'error', + executionId, + jobId: null, + workflowId, + error: 'Rate limit exceeded — please retry later', + }) + return 'error' + } + // Exponential backoff WITH jitter — pass null, not the bucket's + // resetAt. That reset time is shared across all waiters, and + // backoffWithJitter clamps a non-null hint to a fixed value with no + // jitter, so honoring it would wake all ~20 concurrent cells in + // lockstep and stampede the bucket. Jittered backoff spreads retries. + const waitMs = backoffWithJitter(attempt, null) + logger.info( + `Rate limited — waiting ${Math.round(waitMs)}ms before retry ${attempt + 1} (table=${tableId} row=${rowId} group=${groupId})` + ) + await sleep(waitMs) + // Stop All can land mid-wait. On the trigger.dev backend `signal` never + // fires (cancelByKey is a no-op there), so re-check the DB tombstone and + // release this concurrency slot promptly instead of sleeping out the + // full retry budget. + const refreshed = await getRowById(tableId, rowId, workspaceId) + if (!refreshed || cancelledBeforeRun(refreshed.executions?.[groupId])) return 'error' + } + + // SQL guard also rejects if a stop click stamped `cancelled` between this + // check and pickup. const pickedUp = await markWorkflowGroupPickedUp(cellCtx, { workflowId, jobId: null, @@ -485,28 +638,42 @@ async function runWorkflowAndWriteTerminal( schedulePartialWrite() } - const result = await executeWorkflow( - { - id: workflowRecord.id, - userId: workflowRecord.userId, - workspaceId: workflowRecord.workspaceId, - variables: (workflowRecord.variables as Record | null) ?? {}, - }, - requestId, - input, - workflowRecord.userId, - { - enabled: true, - executionMode: 'sync', - workflowTriggerType: 'table', - triggerBlockId: startBlock.id, - useDraftState: true, - abortSignal: signal, - onBlockStart, - onBlockComplete, - }, - executionId - ) + // Enforce the per-plan execution timeout (from preprocessing), combined + // with the existing cancel signal so either a timeout or a Stop aborts. + const timeoutController = createTimeoutAbortController(asyncTimeoutMs) + const abortSignal = signal + ? AbortSignal.any([signal, timeoutController.signal]) + : timeoutController.signal + + let result: Awaited> + try { + result = await executeWorkflow( + { + id: workflowRecord.id, + // Workflow owner — drives personal env-var resolution + ownership. + userId: workflowRecord.userId, + workspaceId: workflowRecord.workspaceId, + variables: (workflowRecord.variables as Record | null) ?? {}, + }, + requestId, + input, + // Billing/usage/rate actor — the workspace billed account. + actorUserId, + { + enabled: true, + executionMode: 'sync', + workflowTriggerType: 'table', + triggerBlockId: startBlock.id, + useDraftState: true, + abortSignal, + onBlockStart, + onBlockComplete, + }, + executionId + ) + } finally { + timeoutController.cleanup() + } terminalWritten = true await writeChain.catch(() => {}) diff --git a/apps/sim/lib/execution/preprocessing.test.ts b/apps/sim/lib/execution/preprocessing.test.ts index a74950ec982..326a5f9f07f 100644 --- a/apps/sim/lib/execution/preprocessing.test.ts +++ b/apps/sim/lib/execution/preprocessing.test.ts @@ -3,10 +3,11 @@ */ import { loggingSessionMock } from '@sim/testing' -import { describe, expect, it, vi } from 'vitest' +import { beforeEach, describe, expect, it, vi } from 'vitest' -const { mockGetWorkspaceBilledAccountUserId } = vi.hoisted(() => ({ +const { mockGetWorkspaceBilledAccountUserId, mockCheckRateLimit } = vi.hoisted(() => ({ mockGetWorkspaceBilledAccountUserId: vi.fn(), + mockCheckRateLimit: vi.fn(), })) vi.mock('@sim/db', () => ({ db: {} })) @@ -21,7 +22,7 @@ vi.mock('@/lib/core/execution-limits', () => ({ getExecutionTimeout: vi.fn(() => 0), })) vi.mock('@/lib/core/rate-limiter/rate-limiter', () => ({ - RateLimiter: vi.fn(), + RateLimiter: vi.fn(() => ({ checkRateLimitWithSubscription: mockCheckRateLimit })), })) vi.mock('@/lib/logs/execution/logging-session', () => loggingSessionMock) vi.mock('@/lib/workspaces/utils', () => ({ @@ -36,6 +37,8 @@ vi.mock('@sim/workflow-authz', () => ({ }), })) +import { checkServerSideUsageLimits } from '@/lib/billing/calculations/usage-monitor' +import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription' import { preprocessExecution } from './preprocessing' describe('preprocessExecution correlation logging', () => { @@ -88,3 +91,56 @@ describe('preprocessExecution correlation logging', () => { }) }) }) + +describe('preprocessExecution logPreprocessingErrors option', () => { + const baseOptions = { + workflowId: 'workflow-1', + userId: 'owner-1', + triggerType: 'workflow' as const, + executionId: 'execution-1', + requestId: 'request-1', + checkDeployment: false, + checkRateLimit: true, + workflowRecord: { id: 'workflow-1', workspaceId: 'workspace-1', isDeployed: false } as any, + } + + beforeEach(() => { + vi.clearAllMocks() + mockGetWorkspaceBilledAccountUserId.mockResolvedValue('billed-account-1') + vi.mocked(getHighestPrioritySubscription).mockResolvedValue({ plan: 'free' } as any) + vi.mocked(checkServerSideUsageLimits).mockResolvedValue({ + isExceeded: false, + currentUsage: 1, + limit: 10, + } as any) + mockCheckRateLimit.mockResolvedValue({ + allowed: true, + remaining: 100, + resetAt: new Date(), + }) + }) + + it('suppresses preprocessing-error logging when logPreprocessingErrors is false', async () => { + vi.mocked(checkServerSideUsageLimits).mockResolvedValueOnce({ + isExceeded: true, + currentUsage: 20, + limit: 10, + message: 'Usage limit exceeded. Please upgrade your plan to continue.', + } as any) + + const loggingSession = { + safeStart: vi.fn().mockResolvedValue(true), + safeCompleteWithError: vi.fn().mockResolvedValue(undefined), + } + + const result = await preprocessExecution({ + ...baseOptions, + logPreprocessingErrors: false, + loggingSession: loggingSession as any, + }) + + expect(result).toMatchObject({ success: false, error: { statusCode: 402 } }) + // No execution-log row written — the caller (table cell) surfaces it instead. + expect(loggingSession.safeStart).not.toHaveBeenCalled() + }) +}) diff --git a/apps/sim/lib/execution/preprocessing.ts b/apps/sim/lib/execution/preprocessing.ts index d65e0331c43..8b3ef86b418 100644 --- a/apps/sim/lib/execution/preprocessing.ts +++ b/apps/sim/lib/execution/preprocessing.ts @@ -35,6 +35,7 @@ export interface PreprocessExecutionOptions { checkRateLimit?: boolean // Default: false for manual/chat, true for others checkDeployment?: boolean // Default: true for non-manual triggers skipUsageLimits?: boolean // Default: false (only use for test mode) + logPreprocessingErrors?: boolean // Default: true. When false, skip writing workflow_execution_logs error rows (caller surfaces failures itself, e.g. table cells) // Context information workspaceId?: string // If known, used for billing resolution @@ -89,6 +90,7 @@ export async function preprocessExecution( checkRateLimit = triggerType !== 'manual' && triggerType !== 'chat', checkDeployment = triggerType !== 'manual', skipUsageLimits = false, + logPreprocessingErrors = true, workspaceId: providedWorkspaceId, loggingSession: providedLoggingSession, triggerData, @@ -97,6 +99,11 @@ export async function preprocessExecution( workflowRecord: prefetchedWorkflowRecord, } = options + // When `logPreprocessingErrors` is false the caller surfaces failures itself + // (e.g. table cells use cell state / SSE), so skip the execution-log writes. + const recordPreprocessingError: typeof logPreprocessingError = (args) => + logPreprocessingErrors ? logPreprocessingError(args) : Promise.resolve() + logger.info(`[${requestId}] Starting execution preprocessing`, { workflowId, userId, @@ -122,7 +129,7 @@ export async function preprocessExecution( if (!workflowRecord) { logger.warn(`[${requestId}] Workflow not found: ${workflowId}`) - await logPreprocessingError({ + await recordPreprocessingError({ workflowId, executionId, triggerType, @@ -147,7 +154,7 @@ export async function preprocessExecution( } catch (error) { logger.error(`[${requestId}] Error fetching workflow`, { error, workflowId }) - await logPreprocessingError({ + await recordPreprocessingError({ workflowId, executionId, triggerType, @@ -253,7 +260,7 @@ export async function preprocessExecution( workspaceId, }) - await logPreprocessingError({ + await recordPreprocessingError({ workflowId, executionId, triggerType, @@ -277,7 +284,7 @@ export async function preprocessExecution( } catch (error) { logger.error(`[${requestId}] Error resolving billing actor`, { error, workflowId }) const fallbackUserId = userId || 'unknown' - await logPreprocessingError({ + await recordPreprocessingError({ workflowId, executionId, triggerType, @@ -319,7 +326,7 @@ export async function preprocessExecution( } ) - await logPreprocessingError({ + await recordPreprocessingError({ workflowId, executionId, triggerType, @@ -349,7 +356,7 @@ export async function preprocessExecution( actorUserId, }) - await logPreprocessingError({ + await recordPreprocessingError({ workflowId, executionId, triggerType, @@ -395,7 +402,7 @@ export async function preprocessExecution( resetAt: rateLimitInfo.resetAt, }) - await logPreprocessingError({ + await recordPreprocessingError({ workflowId, executionId, triggerType, @@ -419,7 +426,7 @@ export async function preprocessExecution( } catch (error) { logger.error(`[${requestId}] Error checking rate limits`, { error, actorUserId }) - await logPreprocessingError({ + await recordPreprocessingError({ workflowId, executionId, triggerType, diff --git a/apps/sim/lib/table/cell-write.ts b/apps/sim/lib/table/cell-write.ts index c2655a1e745..c5caf6dc2dc 100644 --- a/apps/sim/lib/table/cell-write.ts +++ b/apps/sim/lib/table/cell-write.ts @@ -10,6 +10,7 @@ */ import { createLogger } from '@sim/logger' +import { isExecCancelled } from '@/lib/table/deps' import { appendTableEvent } from '@/lib/table/events' import type { RowData, RowExecutionMetadata, RowExecutions, WorkflowGroup } from '@/lib/table/types' @@ -80,11 +81,13 @@ export async function writeWorkflowGroupState( ) return 'skipped' } - if ( - current?.status === 'cancelled' && - current.executionId === executionId && - payload.executionState.status !== 'cancelled' - ) { + // A `cancelled` cell rejects any worker write regardless of executionId — a + // stop click can only stamp the dispatcher pre-stamp's executionId (often + // null), so an executionId-matched guard would let the worker that later + // claims the cell with its real id resurrect it. `bypassStaleWorker` (a fresh + // `queued` claim from a new dispatch, or the authoritative cancel write + // itself) still passes; manual re-runs clear the tombstone before stamping. + if (!bypassStaleWorker && isExecCancelled(current)) { logger.info( `Skipping group write — cancelled (table=${tableId} row=${rowId} group=${groupId} executionId=${executionId})` ) diff --git a/apps/sim/lib/table/deps.test.ts b/apps/sim/lib/table/deps.test.ts new file mode 100644 index 00000000000..b1ab3f8a314 --- /dev/null +++ b/apps/sim/lib/table/deps.test.ts @@ -0,0 +1,116 @@ +/** + * @vitest-environment node + */ +import { describe, expect, it } from 'vitest' +import { + areGroupDepsSatisfied, + getUnmetGroupDeps, + isExecCancelled, + isExecCancelledAfter, + optimisticallyScheduleNewlyEligibleGroups, +} from '@/lib/table/deps' +import type { RowExecutionMetadata, TableRow, WorkflowGroup } from '@/lib/table/types' + +function makeGroup(overrides: Partial & { id: string }): WorkflowGroup { + return { + workflowId: `wf-${overrides.id}`, + outputs: [{ blockId: 'b1', path: 'out', columnName: `${overrides.id}_out` }], + ...overrides, + } +} + +function makeRow( + data: Record = {}, + executions: Record = {} +): TableRow { + return { + id: 'row1', + data: data as TableRow['data'], + executions, + position: 0, + createdAt: new Date(), + updatedAt: new Date(), + } +} + +describe('areGroupDepsSatisfied — checkbox dependency', () => { + const group = makeGroup({ id: 'g1', dependencies: { columns: ['flag'] } }) + + it('treats a checked box (true) as satisfied', () => { + expect(areGroupDepsSatisfied(group, makeRow({ flag: true }))).toBe(true) + }) + + it('treats an unchecked box (false) as unmet', () => { + expect(areGroupDepsSatisfied(group, makeRow({ flag: false }))).toBe(false) + }) + + it('treats empty / null / undefined as unmet', () => { + expect(areGroupDepsSatisfied(group, makeRow({ flag: '' }))).toBe(false) + expect(areGroupDepsSatisfied(group, makeRow({ flag: null }))).toBe(false) + expect(areGroupDepsSatisfied(group, makeRow({}))).toBe(false) + }) + + it('reports an unchecked box in unmet deps', () => { + expect(getUnmetGroupDeps(group, makeRow({ flag: false })).columns).toEqual(['flag']) + expect(getUnmetGroupDeps(group, makeRow({ flag: true })).columns).toEqual([]) + }) +}) + +describe('optimisticallyScheduleNewlyEligibleGroups — checkbox toggle', () => { + const group = makeGroup({ id: 'g1', autoRun: true, dependencies: { columns: ['flag'] } }) + + it('flips the dependent to pending when checking (false → true)', () => { + const before = makeRow({ flag: false }) + const next = optimisticallyScheduleNewlyEligibleGroups([group], before, { flag: true }) + expect(next?.g1?.status).toBe('pending') + }) + + it('does NOT schedule anything when unchecking (true → false)', () => { + const before = makeRow({ flag: true }, { g1: completedExec('wf-g1') }) + const next = optimisticallyScheduleNewlyEligibleGroups([group], before, { flag: false }) + expect(next).toBeNull() + }) +}) + +function completedExec(workflowId: string): RowExecutionMetadata { + return { status: 'completed', executionId: 'e1', jobId: null, workflowId, error: null } +} + +describe('isExecCancelled', () => { + it('is true only for cancelled status', () => { + expect(isExecCancelled({ status: 'cancelled' } as RowExecutionMetadata)).toBe(true) + expect(isExecCancelled({ status: 'running' } as RowExecutionMetadata)).toBe(false) + expect(isExecCancelled(undefined)).toBe(false) + }) + + it('is true regardless of executionId — the resurrection-bug guard', () => { + // A stop click can only stamp the pre-stamp's (often null) executionId. + expect( + isExecCancelled({ status: 'cancelled', executionId: null } as RowExecutionMetadata) + ).toBe(true) + }) +}) + +describe('isExecCancelledAfter — dispatcher tombstone', () => { + const since = new Date('2026-01-01T00:00:00Z') + + it('is true when cancelled after the dispatch was requested', () => { + const exec = { + status: 'cancelled', + cancelledAt: '2026-01-01T00:00:05Z', + } as RowExecutionMetadata + expect(isExecCancelledAfter(exec, since)).toBe(true) + }) + + it('is false for a cancel that predates the dispatch (a prior, cleared run)', () => { + const exec = { + status: 'cancelled', + cancelledAt: '2025-12-31T23:59:59Z', + } as RowExecutionMetadata + expect(isExecCancelledAfter(exec, since)).toBe(false) + }) + + it('is false without a cancelledAt timestamp', () => { + expect(isExecCancelledAfter({ status: 'cancelled' } as RowExecutionMetadata, since)).toBe(false) + }) +}) diff --git a/apps/sim/lib/table/deps.ts b/apps/sim/lib/table/deps.ts index 9cc19293a3f..a8e2e246fd2 100644 --- a/apps/sim/lib/table/deps.ts +++ b/apps/sim/lib/table/deps.ts @@ -24,6 +24,37 @@ export function isExecInFlight(exec: RowExecutionMetadata | undefined): boolean return s === 'queued' || s === 'running' || s === 'pending' } +/** + * A cell run the user/stop killed. The single source of truth for "do not run / + * do not write this cell" — used by the in-memory write guard, the worker's + * pre-execution check, and the resume worker. The SQL guard in + * `writeExecutionsPatch` mirrors this status test in its `WHERE`. + */ +export function isExecCancelled(exec: RowExecutionMetadata | undefined): boolean { + return exec?.status === 'cancelled' +} + +/** + * Cancelled AND killed after `since`. The dispatcher's tombstone test: a cell + * cancelled after a dispatch was requested must be skipped by that dispatch's + * later windows, even though the dispatcher pre-stamped it before the stop. + */ +export function isExecCancelledAfter(exec: RowExecutionMetadata | undefined, since: Date): boolean { + if (!isExecCancelled(exec) || !exec?.cancelledAt) return false + const at = Date.parse(exec.cancelledAt) + return Number.isFinite(at) && at > since.getTime() +} + +/** + * A dependency column counts as unmet when its value is empty OR explicitly + * `false`. An unchecked checkbox is treated as "dependency not satisfied", so + * only checking a box (false→true) makes dependents eligible — unchecking + * (true→false) never triggers a rerun. + */ +function isDepValueUnmet(value: unknown): boolean { + return value === null || value === undefined || value === '' || value === false +} + /** * True when every output column the group writes still has a non-empty value * on this row. The "completed" exec status is metadata, but the cells are the @@ -47,8 +78,7 @@ export function areOutputsFilled(group: WorkflowGroup, row: TableRow): boolean { export function areGroupDepsSatisfied(group: WorkflowGroup, row: TableRow): boolean { const cols = group.dependencies?.columns ?? [] for (const colName of cols) { - const value = row.data[colName] - if (value === null || value === undefined || value === '') return false + if (isDepValueUnmet(row.data[colName])) return false } return true } @@ -66,8 +96,7 @@ export function getUnmetGroupDeps(group: WorkflowGroup, row: TableRow): UnmetDep const cols = group.dependencies?.columns ?? [] const columns: string[] = [] for (const colName of cols) { - const value = row.data[colName] - if (value === null || value === undefined || value === '') columns.push(colName) + if (isDepValueUnmet(row.data[colName])) columns.push(colName) } return { columns } } diff --git a/apps/sim/lib/table/dispatcher.ts b/apps/sim/lib/table/dispatcher.ts index 441abda9330..377758c9a57 100644 --- a/apps/sim/lib/table/dispatcher.ts +++ b/apps/sim/lib/table/dispatcher.ts @@ -6,6 +6,7 @@ import { generateId } from '@sim/utils/id' import { and, asc, eq, gt, inArray, isNotNull, ne, or, type SQL, sql } from 'drizzle-orm' import { getJobQueue } from '@/lib/core/async-jobs/config' import { writeWorkflowGroupState } from '@/lib/table/cell-write' +import { isExecCancelledAfter } from '@/lib/table/deps' import { appendTableEvent } from '@/lib/table/events' import type { RowExecutionMetadata, RowExecutions, TableRow } from '@/lib/table/types' import { @@ -359,6 +360,22 @@ export async function dispatcherStep(dispatchId: string): Promise