Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ export interface SelectionSnapshot {
/** Total running/queued workflow runs across ALL rows. Drives the page-header
* RunStatusControl ("N running, Stop all"). */
totalRunning: number
/** Whether any dispatch is active (pending/dispatching). Keeps the RunStatusControl
* + Stop-all visible during a run even when the per-row count momentarily reads 0
* (e.g. the first window of an auto-fired/capped dispatch before cells stamp). */
hasActiveDispatch: boolean
/** Whether the table has any workflow-output columns (drives the Run/Stop visibility). */
hasWorkflowColumns: boolean
/** Cells the Play / Refresh / Stop buttons act on. Null when the selection
Expand Down Expand Up @@ -333,6 +337,7 @@ export function TableGrid({
// rows still inside a dispatch's scope — e.g. a cascade where 3 of 4 columns
// finished would read "4 running" instead of "1".
const totalRunning = Object.values(runningByRowId).reduce((sum, n) => sum + n, 0)
const hasActiveDispatch = (activeDispatches?.length ?? 0) > 0

const tableRowCountRef = useRef(tableData?.rowCount ?? 0)
tableRowCountRef.current = tableData?.rowCount ?? 0
Expand Down Expand Up @@ -3194,6 +3199,7 @@ export function TableGrid({
sameStats &&
prev.runningInActionBarSelection === runningInActionBarSelection &&
prev.totalRunning === totalRunning &&
prev.hasActiveDispatch === hasActiveDispatch &&
prev.hasWorkflowColumns === hasWorkflowColumns &&
prev.actionBarRowIds.length === actionBarRowIds.length &&
prev.actionBarRowIds.every((id, i) => id === actionBarRowIds[i])
Expand All @@ -3204,6 +3210,7 @@ export function TableGrid({
actionBarRowIds,
runningInActionBarSelection,
totalRunning,
hasActiveDispatch,
hasWorkflowColumns,
selectedRunScope,
selectionStats,
Expand All @@ -3215,6 +3222,7 @@ export function TableGrid({
actionBarRowIds,
runningInActionBarSelection,
totalRunning,
hasActiveDispatch,
hasWorkflowColumns,
selectedRunScope,
selectionStats,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use client'

import { useEffect } from 'react'
import { useEffect, useRef } from 'react'
import { createLogger } from '@sim/logger'
import { useQueryClient } from '@tanstack/react-query'
import type { ActiveDispatch } from '@/lib/api/contracts/tables'
Expand Down Expand Up @@ -44,6 +44,9 @@ interface UseTableEventStreamArgs {
tableId: string | undefined
workspaceId: string | undefined
enabled?: boolean
/** Fired when the server halts a dispatch because the billed account is over
* its usage limit. The page surfaces an upgrade prompt + redirect. */
onUsageLimitReached?: (event: { dispatchId?: string; message: string }) => void
}

/**
Expand All @@ -59,9 +62,14 @@ export function useTableEventStream({
tableId,
workspaceId,
enabled = true,
onUsageLimitReached,
}: UseTableEventStreamArgs): void {
const queryClient = useQueryClient()

// Ref so a changing callback identity doesn't tear down + reconnect the SSE.
const onUsageLimitReachedRef = useRef(onUsageLimitReached)
onUsageLimitReachedRef.current = onUsageLimitReached

useEffect(() => {
if (!enabled || !tableId || !workspaceId) return

Expand Down Expand Up @@ -205,6 +213,28 @@ export function useTableEventStream({
scheduleDispatchInvalidate()
}

const applyUsageLimit = (event: Extract<TableEvent, { kind: 'usageLimitReached' }>): void => {
// Drop the halted dispatch from the overlay so the "running" UI clears
// immediately (the dispatcher was marked complete server-side). Cascade /
// auto-fire events carry no dispatchId — nothing to remove.
if (event.dispatchId) {
queryClient.setQueryData<TableRunState>(tableKeys.activeDispatches(tableId), (prev) => {
if (!prev) return prev
const filtered = prev.dispatches.filter((d) => d.id !== event.dispatchId)
return filtered.length === prev.dispatches.length
? prev
: { ...prev, dispatches: filtered }
})
}
// Blocked cells are left `queued` in the DB with no terminal cell event,
// so `runningByRowId` would otherwise stay non-zero (stale "X running").
// Re-sync the server counts, and refetch rows so cells whose pre-stamps
// the server cleared drop their "Queued" state.
scheduleDispatchInvalidate()
void queryClient.invalidateQueries({ queryKey: tableKeys.rowsRoot(tableId) })
onUsageLimitReachedRef.current?.({ dispatchId: event.dispatchId, message: event.message })
}
Comment thread
greptile-apps[bot] marked this conversation as resolved.
Comment thread
greptile-apps[bot] marked this conversation as resolved.

const handlePrune = (payload: PrunedEvent): void => {
logger.info('Table event buffer pruned — full refetch', { tableId, ...payload })
void queryClient.invalidateQueries({ queryKey: tableKeys.rowsRoot(tableId) })
Expand Down Expand Up @@ -253,6 +283,7 @@ export function useTableEventStream({
savePointer(tableId, lastEventId)
if (entry.event?.kind === 'cell') applyCell(entry.event)
else if (entry.event?.kind === 'dispatch') applyDispatch(entry.event)
else if (entry.event?.kind === 'usageLimitReached') applyUsageLimit(entry.event)
} catch (err) {
logger.warn('Failed to parse table event', { tableId, err })
}
Expand Down
16 changes: 13 additions & 3 deletions apps/sim/app/workspace/[workspaceId]/tables/[tableId]/table.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
useRunColumn,
} from '@/hooks/queries/tables'
import { useInlineRename } from '@/hooks/use-inline-rename'
import { useSettingsNavigation } from '@/hooks/use-settings-navigation'
import { useLogDetailsUIStore } from '@/stores/logs/store'
import type { DeletedRowSnapshot } from '@/stores/table/types'
import {
Expand Down Expand Up @@ -129,7 +130,15 @@ export function Table({
const posthogRef = useRef(posthog)
posthogRef.current = posthog

useTableEventStream({ tableId, workspaceId })
const { navigateToSettings } = useSettingsNavigation()
// Plain function: `useTableEventStream` keeps it in a ref (its effect doesn't
// depend on the identity), so a stable reference buys nothing here.
const onUsageLimitReached = ({ message }: { dispatchId?: string; message: string }) => {
toast.error(message, {
action: { label: 'Upgrade', onClick: () => navigateToSettings({ section: 'subscription' }) },
})
}
useTableEventStream({ tableId, workspaceId, onUsageLimitReached })

const [slideout, dispatch] = useReducer(slideoutReducer, { kind: 'none' })
const [showDeleteTableConfirm, setShowDeleteTableConfirm] = useState(false)
Expand All @@ -141,6 +150,7 @@ export function Table({
actionBarRowIds: [],
runningInActionBarSelection: 0,
totalRunning: 0,
hasActiveDispatch: false,
hasWorkflowColumns: false,
selectedRunScope: null,
selectionStats: { hasIncompleteOrFailed: false, hasCompleted: false, hasInFlight: false },
Expand Down Expand Up @@ -509,7 +519,7 @@ export function Table({
createTrigger={createTrigger}
actions={headerActions}
leadingActions={
selection.totalRunning > 0 ? (
selection.totalRunning > 0 || selection.hasActiveDispatch ? (
<RunStatusControl
running={selection.totalRunning}
onStopAll={onStopAll}
Expand All @@ -527,7 +537,7 @@ export function Table({
onFilterToggle={() => setFilterOpen((prev) => !prev)}
filterActive={filterOpen || !!queryOptions.filter}
trailing={
embedded && selection.totalRunning > 0 ? (
embedded && (selection.totalRunning > 0 || selection.hasActiveDispatch) ? (
<RunStatusControl
running={selection.totalRunning}
onStopAll={onStopAll}
Expand Down
31 changes: 31 additions & 0 deletions apps/sim/background/resume-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { toError } from '@sim/utils/errors'
import { generateId } from '@sim/utils/id'
import { task } from '@trigger.dev/sdk'
import { withCascadeLock } from '@/lib/table/cascade-lock'
import { isExecCancelled } from '@/lib/table/deps'
import type { RowData, RowExecutionMetadata } from '@/lib/table/types'
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'

Expand Down Expand Up @@ -44,6 +45,36 @@ export async function executeResumeJob(payload: ResumeExecutionPayload) {
const { findCellContextByExecutionId } = await import('@/lib/table/workflow-columns')
const cellContext = await findCellContextByExecutionId(parentExecutionId)

// A paused/awaiting table cell that was cancelled by "Stop all" must not
// resume — the cancel write is authoritative (matches the cell-write guard
// philosophy). Aborting here also stops the wasted compute the guard alone
// can't prevent. Read the cell's current exec and bail if cancelled.
if (cellContext) {
const { getRowById } = await import('@/lib/table/service')
const cellRow = await getRowById(
cellContext.tableId,
cellContext.rowId,
cellContext.workspaceId
)
if (isExecCancelled(cellRow?.executions?.[cellContext.groupId])) {
logger.info('Skipping resume — table cell cancelled', {
tableId: cellContext.tableId,
rowId: cellContext.rowId,
groupId: cellContext.groupId,
parentExecutionId,
})
return {
success: false,
workflowId,
executionId: resumeExecutionId,
parentExecutionId,
status: 'cancelled' as const,
output: undefined,
executedAt: new Date().toISOString(),
}
}
}

const writers = cellContext
? await buildResumeCellWriters(cellContext, parentExecutionId)
: null
Expand Down
Loading
Loading