From b399ee0fe27b8f9cacdb9f9ad77d1ee2a3b73692 Mon Sep 17 00:00:00 2001 From: Waleed Date: Sun, 31 May 2026 11:22:02 -0700 Subject: [PATCH 01/13] improvement(copilot): make copilot_messages the sole transcript store, remove JSONB dual-write (#4826) Stop writing/reading the legacy copilot_chats.messages JSONB column now that reads are cut over to copilot_messages. Make appendCopilotChatMessages the primary write (throws on failure instead of swallowing), repoint peripheral readers (workspace VFS, chat cleanup, data drains, fork, superuser import) to copilot_messages, and persist the assistant turn inside finalizeAssistantTurn's transaction so it commits atomically with the stream-marker clear. The column itself is dropped in a follow-up migration after this bakes. --- .../app/api/copilot/chat/stop/route.test.ts | 196 +++++---------- .../chat/update-messages/route.test.ts | 49 ++-- .../api/copilot/chat/update-messages/route.ts | 28 ++- .../mothership/chats/[chatId]/fork/route.ts | 67 +++--- apps/sim/app/api/mothership/chats/route.ts | 1 - .../api/superuser/import-workflow/route.ts | 60 +++-- apps/sim/lib/cleanup/chat-cleanup.ts | 49 ++-- apps/sim/lib/copilot/chat/lifecycle.test.ts | 6 +- apps/sim/lib/copilot/chat/lifecycle.ts | 20 +- .../lib/copilot/chat/messages-dual-write.ts | 143 ----------- ...l-write.test.ts => messages-store.test.ts} | 14 +- apps/sim/lib/copilot/chat/messages-store.ts | 122 ++++++++++ apps/sim/lib/copilot/chat/post.test.ts | 40 ++-- apps/sim/lib/copilot/chat/post.ts | 66 +++--- .../lib/copilot/chat/terminal-state.test.ts | 223 ++++++------------ apps/sim/lib/copilot/chat/terminal-state.ts | 62 +++-- apps/sim/lib/copilot/vfs/workspace-vfs.ts | 21 +- .../lib/data-drains/sources/copilot-chats.ts | 67 +++++- apps/sim/lib/mothership/inbox/executor.ts | 36 +-- 19 files changed, 592 insertions(+), 678 deletions(-) delete mode 100644 apps/sim/lib/copilot/chat/messages-dual-write.ts rename apps/sim/lib/copilot/chat/{messages-dual-write.test.ts => messages-store.test.ts} (95%) create mode 100644 apps/sim/lib/copilot/chat/messages-store.ts diff --git a/apps/sim/app/api/copilot/chat/stop/route.test.ts b/apps/sim/app/api/copilot/chat/stop/route.test.ts index bab5465507d..452131f21e1 100644 --- a/apps/sim/app/api/copilot/chat/stop/route.test.ts +++ b/apps/sim/app/api/copilot/chat/stop/route.test.ts @@ -1,79 +1,19 @@ /** * @vitest-environment node */ -import { authMockFns } from '@sim/testing' +import { authMockFns, dbChainMock, dbChainMockFns, resetDbChainMock } from '@sim/testing' import { NextRequest } from 'next/server' import { beforeEach, describe, expect, it, vi } from 'vitest' -const { - mockSelect, - mockFrom, - mockWhereSelect, - mockLimit, - mockForUpdate, - mockUpdate, - mockSet, - mockWhereUpdate, - mockReturning, - mockPublishStatusChanged, - mockSql, - mockTransaction, -} = vi.hoisted(() => { - const mockSelect = vi.fn() - const mockFrom = vi.fn() - const mockWhereSelect = vi.fn() - const mockLimit = vi.fn() - const mockForUpdate = vi.fn() - const mockUpdate = vi.fn() - const mockSet = vi.fn() - const mockWhereUpdate = vi.fn() - const mockReturning = vi.fn() - const mockPublishStatusChanged = vi.fn() - const mockSql = vi.fn((strings: TemplateStringsArray, ...values: unknown[]) => ({ - strings, - values, - })) - const mockTransaction = vi.fn( - (callback: (tx: { select: typeof mockSelect; update: typeof mockUpdate }) => unknown) => - callback({ select: mockSelect, update: mockUpdate }) - ) - - return { - mockSelect, - mockFrom, - mockWhereSelect, - mockLimit, - mockForUpdate, - mockUpdate, - mockSet, - mockWhereUpdate, - mockReturning, - mockPublishStatusChanged, - mockSql, - mockTransaction, - } -}) +vi.mock('@sim/db', () => dbChainMock) -vi.mock('@sim/db/schema', () => ({ - copilotChats: { - id: 'copilotChats.id', - userId: 'copilotChats.userId', - workspaceId: 'copilotChats.workspaceId', - messages: 'copilotChats.messages', - conversationId: 'copilotChats.conversationId', - }, -})) - -vi.mock('@sim/db', () => ({ - db: { - transaction: mockTransaction, - }, +const { mockAppendCopilotChatMessages, mockPublishStatusChanged } = vi.hoisted(() => ({ + mockAppendCopilotChatMessages: vi.fn(), + mockPublishStatusChanged: vi.fn(), })) -vi.mock('drizzle-orm', () => ({ - and: vi.fn((...conditions: unknown[]) => ({ conditions, type: 'and' })), - eq: vi.fn((field: unknown, value: unknown) => ({ field, value, type: 'eq' })), - sql: mockSql, +vi.mock('@/lib/copilot/chat/messages-store', () => ({ + appendCopilotChatMessages: mockAppendCopilotChatMessages, })) vi.mock('@/lib/copilot/tasks', () => ({ @@ -92,39 +32,33 @@ function createRequest(body: Record) { }) } +/** + * Sequence the two in-tx reads `finalizeAssistantTurn` issues: the chat row + * (`FOR UPDATE ... LIMIT 1`) and the last-message lookup that drives dedup + * (both terminate on `.limit(1)`). + */ +function mockReads(opts: { + chat: Record | null + last?: { messageId: string; role: string } +}) { + dbChainMockFns.limit.mockResolvedValueOnce(opts.chat ? [opts.chat] : []) + dbChainMockFns.limit.mockResolvedValueOnce(opts.last ? [opts.last] : []) +} + describe('copilot chat stop route', () => { beforeEach(() => { vi.clearAllMocks() - + // Drain the once-queue (clearAllMocks/resetDbChainMock don't), then restore defaults. + dbChainMockFns.limit.mockReset() + resetDbChainMock() authMockFns.mockGetSession.mockResolvedValue({ user: { id: 'user-1' } }) - - mockLimit.mockResolvedValue([ - { - workspaceId: 'ws-1', - messages: [{ id: 'stream-1', role: 'user', content: 'hello' }], - conversationId: 'stream-1', - }, - ]) - mockForUpdate.mockReturnValue({ limit: mockLimit }) - mockWhereSelect.mockReturnValue({ for: mockForUpdate }) - mockFrom.mockReturnValue({ where: mockWhereSelect }) - mockSelect.mockReturnValue({ from: mockFrom }) - - mockReturning.mockResolvedValue([{ workspaceId: 'ws-1' }]) - mockWhereUpdate.mockReturnValue({ returning: mockReturning }) - mockSet.mockReturnValue({ where: mockWhereUpdate }) - mockUpdate.mockReturnValue({ set: mockSet }) }) it('returns 401 when unauthenticated', async () => { authMockFns.mockGetSession.mockResolvedValueOnce(null) const response = await POST( - createRequest({ - chatId: 'chat-1', - streamId: 'stream-1', - content: '', - }) + createRequest({ chatId: 'chat-1', streamId: 'stream-1', content: '' }) ) expect(response.status).toBe(401) @@ -132,41 +66,37 @@ describe('copilot chat stop route', () => { }) it('is a no-op when the chat is missing', async () => { - mockLimit.mockResolvedValueOnce([]) + mockReads({ chat: null }) const response = await POST( - createRequest({ - chatId: 'missing-chat', - streamId: 'stream-1', - content: '', - }) + createRequest({ chatId: 'missing-chat', streamId: 'stream-1', content: '' }) ) expect(response.status).toBe(200) expect(await response.json()).toEqual({ success: true }) - expect(mockUpdate).not.toHaveBeenCalled() + expect(mockAppendCopilotChatMessages).not.toHaveBeenCalled() }) it('appends a stopped assistant message even with no content', async () => { + mockReads({ + chat: { workspaceId: 'ws-1', conversationId: 'stream-1', model: null }, + last: { messageId: 'stream-1', role: 'user' }, + }) + const response = await POST( - createRequest({ - chatId: 'chat-1', - streamId: 'stream-1', - content: '', - }) + createRequest({ chatId: 'chat-1', streamId: 'stream-1', content: '' }) ) expect(response.status).toBe(200) expect(await response.json()).toEqual({ success: true }) - const setArg = mockSet.mock.calls[0]?.[0] - expect(setArg).toBeTruthy() + const setArg = dbChainMockFns.set.mock.calls[0]?.[0] as Record expect(setArg.conversationId).toBeNull() - expect(setArg.messages).toBeTruthy() + expect(Object.hasOwn(setArg, 'messages')).toBe(false) - const appendedPayload = JSON.parse(setArg.messages.values[1] as string) - expect(appendedPayload).toHaveLength(1) - expect(appendedPayload[0]).toMatchObject({ + expect(mockAppendCopilotChatMessages).toHaveBeenCalledTimes(1) + const [, appended] = mockAppendCopilotChatMessages.mock.calls[0] + expect(appended[0]).toMatchObject({ role: 'assistant', content: '', contentBlocks: [{ type: 'complete', status: 'cancelled' }], @@ -181,32 +111,21 @@ describe('copilot chat stop route', () => { }) it('appends a stopped assistant message if the stream marker was already cleared', async () => { - mockLimit.mockResolvedValueOnce([ - { - workspaceId: 'ws-1', - messages: [{ id: 'stream-1', role: 'user', content: 'hello' }], - conversationId: null, - }, - ]) + mockReads({ + chat: { workspaceId: 'ws-1', conversationId: null, model: null }, + last: { messageId: 'stream-1', role: 'user' }, + }) const response = await POST( - createRequest({ - chatId: 'chat-1', - streamId: 'stream-1', - content: 'partial', - }) + createRequest({ chatId: 'chat-1', streamId: 'stream-1', content: 'partial' }) ) expect(response.status).toBe(200) expect(await response.json()).toEqual({ success: true }) - const setArg = mockSet.mock.calls[0]?.[0] - expect(setArg.messages).toBeTruthy() - const appendedPayload = JSON.parse(setArg.messages.values[1] as string) - expect(appendedPayload[0]).toMatchObject({ - role: 'assistant', - content: 'partial', - }) + expect(mockAppendCopilotChatMessages).toHaveBeenCalledTimes(1) + const [, appended] = mockAppendCopilotChatMessages.mock.calls[0] + expect(appended[0]).toMatchObject({ role: 'assistant', content: 'partial' }) expect(mockPublishStatusChanged).toHaveBeenCalledWith({ workspaceId: 'ws-1', @@ -217,28 +136,19 @@ describe('copilot chat stop route', () => { }) it('republishes completed status when the assistant was already persisted', async () => { - mockLimit.mockResolvedValueOnce([ - { - workspaceId: 'ws-1', - messages: [ - { id: 'stream-1', role: 'user', content: 'hello' }, - { id: 'assistant-1', role: 'assistant', content: 'partial' }, - ], - conversationId: null, - }, - ]) + mockReads({ + chat: { workspaceId: 'ws-1', conversationId: null, model: null }, + last: { messageId: 'assistant-1', role: 'assistant' }, + }) const response = await POST( - createRequest({ - chatId: 'chat-1', - streamId: 'stream-1', - content: 'partial', - }) + createRequest({ chatId: 'chat-1', streamId: 'stream-1', content: 'partial' }) ) expect(response.status).toBe(200) expect(await response.json()).toEqual({ success: true }) - expect(mockUpdate).not.toHaveBeenCalled() + expect(mockAppendCopilotChatMessages).not.toHaveBeenCalled() + expect(dbChainMockFns.set).not.toHaveBeenCalled() expect(mockPublishStatusChanged).toHaveBeenCalledWith({ workspaceId: 'ws-1', chatId: 'chat-1', diff --git a/apps/sim/app/api/copilot/chat/update-messages/route.test.ts b/apps/sim/app/api/copilot/chat/update-messages/route.test.ts index c56415116ab..2e7dfa134c8 100644 --- a/apps/sim/app/api/copilot/chat/update-messages/route.test.ts +++ b/apps/sim/app/api/copilot/chat/update-messages/route.test.ts @@ -16,6 +16,7 @@ const { mockSet, mockUpdateWhere, mockReturning, + mockReplaceCopilotChatMessages, } = vi.hoisted(() => ({ mockSelect: vi.fn(), mockFrom: vi.fn(), @@ -25,15 +26,23 @@ const { mockSet: vi.fn(), mockUpdateWhere: vi.fn(), mockReturning: vi.fn(), + mockReplaceCopilotChatMessages: vi.fn(), })) vi.mock('@sim/db', () => ({ db: { select: mockSelect, update: mockUpdate, + transaction: async ( + cb: (tx: { update: typeof mockUpdate; select: typeof mockSelect }) => unknown + ) => cb({ update: mockUpdate, select: mockSelect }), }, })) +vi.mock('@/lib/copilot/chat/messages-store', () => ({ + replaceCopilotChatMessages: mockReplaceCopilotChatMessages, +})) + vi.mock('drizzle-orm', () => ({ and: vi.fn((...conditions: unknown[]) => ({ conditions, type: 'and' })), eq: vi.fn((field: unknown, value: unknown) => ({ field, value, type: 'eq' })), @@ -257,10 +266,13 @@ describe('Copilot Chat Update Messages API Route', () => { expect(mockSelect).toHaveBeenCalled() expect(mockUpdate).toHaveBeenCalled() - expect(mockSet).toHaveBeenCalledWith({ + expect(mockSet).toHaveBeenCalledWith({ updatedAt: expect.any(Date) }) + expect(mockReplaceCopilotChatMessages).toHaveBeenCalledWith( + 'chat-123', messages, - updatedAt: expect.any(Date), - }) + { chatModel: 'gpt-4' }, + expect.anything() + ) }) it('should successfully update chat messages with optional fields', async () => { @@ -315,8 +327,10 @@ describe('Copilot Chat Update Messages API Route', () => { messageCount: 2, }) - expect(mockSet).toHaveBeenCalledWith({ - messages: [ + expect(mockSet).toHaveBeenCalledWith({ updatedAt: expect.any(Date) }) + expect(mockReplaceCopilotChatMessages).toHaveBeenCalledWith( + 'chat-456', + [ { id: 'msg-1', role: 'user', @@ -345,8 +359,9 @@ describe('Copilot Chat Update Messages API Route', () => { ], }, ], - updatedAt: expect.any(Date), - }) + { chatModel: 'gpt-4' }, + expect.anything() + ) }) it('should handle empty messages array', async () => { @@ -373,10 +388,13 @@ describe('Copilot Chat Update Messages API Route', () => { messageCount: 0, }) - expect(mockSet).toHaveBeenCalledWith({ - messages: [], - updatedAt: expect.any(Date), - }) + expect(mockSet).toHaveBeenCalledWith({ updatedAt: expect.any(Date) }) + expect(mockReplaceCopilotChatMessages).toHaveBeenCalledWith( + 'chat-789', + [], + { chatModel: 'gpt-4' }, + expect.anything() + ) }) it('should handle database errors during chat lookup', async () => { @@ -485,10 +503,13 @@ describe('Copilot Chat Update Messages API Route', () => { messageCount: 100, }) - expect(mockSet).toHaveBeenCalledWith({ + expect(mockSet).toHaveBeenCalledWith({ updatedAt: expect.any(Date) }) + expect(mockReplaceCopilotChatMessages).toHaveBeenCalledWith( + 'chat-large', messages, - updatedAt: expect.any(Date), - }) + { chatModel: 'gpt-4' }, + expect.anything() + ) }) it('should handle messages with both user and assistant roles', async () => { diff --git a/apps/sim/app/api/copilot/chat/update-messages/route.ts b/apps/sim/app/api/copilot/chat/update-messages/route.ts index 1b654c4930a..7c7792e3f2c 100644 --- a/apps/sim/app/api/copilot/chat/update-messages/route.ts +++ b/apps/sim/app/api/copilot/chat/update-messages/route.ts @@ -6,7 +6,7 @@ import { type NextRequest, NextResponse } from 'next/server' import { updateCopilotMessagesContract } from '@/lib/api/contracts/copilot' import { parseRequest } from '@/lib/api/server' import { getAccessibleCopilotChatAuth } from '@/lib/copilot/chat/lifecycle' -import { replaceCopilotChatMessages } from '@/lib/copilot/chat/messages-dual-write' +import { replaceCopilotChatMessages } from '@/lib/copilot/chat/messages-store' import { normalizeMessage, type PersistedMessage } from '@/lib/copilot/chat/persisted-message' import { authenticateCopilotRequestSessionOnly, @@ -73,9 +73,7 @@ export const POST = withRouteHandler(async (req: NextRequest) => { return createNotFoundResponse('Chat not found or unauthorized') } - // Update chat with new messages, plan artifact, and config const updateData: Record = { - messages: normalizedMessages, updatedAt: new Date(), } @@ -87,16 +85,20 @@ export const POST = withRouteHandler(async (req: NextRequest) => { updateData.config = config } - const [updated] = await db - .update(copilotChats) - .set(updateData) - .where(eq(copilotChats.id, chatId)) - .returning({ model: copilotChats.model }) - if (updated) { - await replaceCopilotChatMessages(chatId, normalizedMessages, { - chatModel: updated.model ?? null, - }) - } + await db.transaction(async (tx) => { + const [updated] = await tx + .update(copilotChats) + .set(updateData) + .where(eq(copilotChats.id, chatId)) + .returning({ model: copilotChats.model }) + if (!updated) return + await replaceCopilotChatMessages( + chatId, + normalizedMessages, + { chatModel: updated.model ?? null }, + tx + ) + }) logger.info(`[${tracker.requestId}] Successfully updated chat`, { chatId, diff --git a/apps/sim/app/api/mothership/chats/[chatId]/fork/route.ts b/apps/sim/app/api/mothership/chats/[chatId]/fork/route.ts index 54f4cc588ee..fec6bc6c192 100644 --- a/apps/sim/app/api/mothership/chats/[chatId]/fork/route.ts +++ b/apps/sim/app/api/mothership/chats/[chatId]/fork/route.ts @@ -6,8 +6,8 @@ import { eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { forkMothershipChatContract } from '@/lib/api/contracts/mothership-tasks' import { parseRequest } from '@/lib/api/server' -import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-dual-write' -import type { PersistedMessage } from '@/lib/copilot/chat/persisted-message' +import { loadCopilotChatMessages } from '@/lib/copilot/chat/lifecycle' +import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-store' import { fetchGo } from '@/lib/copilot/request/go/fetch' import { authenticateCopilotRequestSessionOnly, @@ -50,9 +50,19 @@ export const POST = withRouteHandler( const { chatId } = parsed.data.params const { upToMessageId } = parsed.data.body - // Load parent chat and verify ownership. const [parent] = await db - .select() + .select({ + id: copilotChats.id, + userId: copilotChats.userId, + type: copilotChats.type, + workspaceId: copilotChats.workspaceId, + title: copilotChats.title, + model: copilotChats.model, + resources: copilotChats.resources, + previewYaml: copilotChats.previewYaml, + planArtifact: copilotChats.planArtifact, + config: copilotChats.config, + }) .from(copilotChats) .where(eq(copilotChats.id, chatId)) .limit(1) @@ -65,8 +75,7 @@ export const POST = withRouteHandler( await assertActiveWorkspaceAccess(parent.workspaceId, userId) } - // Find the fork point in the Sim-side messages array. - const messages = Array.isArray(parent.messages) ? (parent.messages as PersistedMessage[]) : [] + const messages = await loadCopilotChatMessages(chatId) const forkIdx = messages.findIndex((m) => m.id === upToMessageId) if (forkIdx < 0) { return createBadRequestResponse('Message not found in chat') @@ -83,32 +92,36 @@ export const POST = withRouteHandler( const title = `Fork | ${baseTitle}` const now = new Date() - const [newChat] = await db - .insert(copilotChats) - .values({ - id: newId, - userId, - workspaceId: parent.workspaceId, - type: parent.type, - title, - model: parent.model, - messages: forkedMessages, - resources: parentResources, - previewYaml: parent.previewYaml, - planArtifact: parent.planArtifact, - config: parent.config, - conversationId: null, - updatedAt: now, - lastSeenAt: now, - }) - .returning({ id: copilotChats.id, workspaceId: copilotChats.workspaceId }) + const newChat = await db.transaction(async (tx) => { + const [row] = await tx + .insert(copilotChats) + .values({ + id: newId, + userId, + workspaceId: parent.workspaceId, + type: parent.type, + title, + model: parent.model, + resources: parentResources, + previewYaml: parent.previewYaml, + planArtifact: parent.planArtifact, + config: parent.config, + conversationId: null, + updatedAt: now, + lastSeenAt: now, + }) + .returning({ id: copilotChats.id, workspaceId: copilotChats.workspaceId }) + + if (!row) return null + + await appendCopilotChatMessages(newId, forkedMessages, { chatModel: parent.model }, tx) + return row + }) if (!newChat) { return createInternalServerErrorResponse('Failed to create forked chat') } - await appendCopilotChatMessages(newId, forkedMessages, { chatModel: parent.model }) - // Clone copilot-service conversation state (messages, active_messages, memory files). // Best-effort: if the copilot service doesn't have a row for the source chat yet, skip. try { diff --git a/apps/sim/app/api/mothership/chats/route.ts b/apps/sim/app/api/mothership/chats/route.ts index 1b7157fdde5..c5610da215d 100644 --- a/apps/sim/app/api/mothership/chats/route.ts +++ b/apps/sim/app/api/mothership/chats/route.ts @@ -106,7 +106,6 @@ export const POST = withRouteHandler(async (request: NextRequest) => { type: 'mothership', title: null, model: 'claude-opus-4-6', - messages: [], updatedAt: now, lastSeenAt: now, }) diff --git a/apps/sim/app/api/superuser/import-workflow/route.ts b/apps/sim/app/api/superuser/import-workflow/route.ts index 912278040bb..07ab98932be 100644 --- a/apps/sim/app/api/superuser/import-workflow/route.ts +++ b/apps/sim/app/api/superuser/import-workflow/route.ts @@ -7,8 +7,8 @@ import { type NextRequest, NextResponse } from 'next/server' import { importWorkflowAsSuperuserContract } from '@/lib/api/contracts/workflows' import { parseRequest } from '@/lib/api/server' import { getSession } from '@/lib/auth' -import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-dual-write' -import type { PersistedMessage } from '@/lib/copilot/chat/persisted-message' +import { loadCopilotChatMessages } from '@/lib/copilot/chat/lifecycle' +import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-store' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' import { verifyEffectiveSuperUser } from '@/lib/templates/permissions' import { parseWorkflowJson } from '@/lib/workflows/operations/import-export' @@ -167,34 +167,46 @@ export const POST = withRouteHandler(async (request: NextRequest) => { // Copy copilot chats associated with the source workflow const sourceCopilotChats = await db - .select() + .select({ + id: copilotChats.id, + title: copilotChats.title, + model: copilotChats.model, + previewYaml: copilotChats.previewYaml, + planArtifact: copilotChats.planArtifact, + config: copilotChats.config, + }) .from(copilotChats) .where(eq(copilotChats.workflowId, workflowId)) let copilotChatsImported = 0 for (const chat of sourceCopilotChats) { - const [imported] = await db - .insert(copilotChats) - .values({ - userId: session.user.id, - workflowId: newWorkflowId, - title: chat.title ? `[Import] ${chat.title}` : null, - messages: chat.messages, - model: chat.model, - conversationId: null, // Don't copy conversation ID - previewYaml: chat.previewYaml, - planArtifact: chat.planArtifact, - config: chat.config, - createdAt: new Date(), - updatedAt: new Date(), - }) - .returning({ id: copilotChats.id }) - if (imported && Array.isArray(chat.messages) && chat.messages.length > 0) { - await appendCopilotChatMessages(imported.id, chat.messages as PersistedMessage[], { - chatModel: chat.model, - }) - } + const sourceMessages = await loadCopilotChatMessages(chat.id) + await db.transaction(async (tx) => { + const [imported] = await tx + .insert(copilotChats) + .values({ + userId: session.user.id, + workflowId: newWorkflowId, + title: chat.title ? `[Import] ${chat.title}` : null, + model: chat.model, + conversationId: null, // Don't copy conversation ID + previewYaml: chat.previewYaml, + planArtifact: chat.planArtifact, + config: chat.config, + createdAt: new Date(), + updatedAt: new Date(), + }) + .returning({ id: copilotChats.id }) + if (imported && sourceMessages.length > 0) { + await appendCopilotChatMessages( + imported.id, + sourceMessages, + { chatModel: chat.model }, + tx + ) + } + }) copilotChatsImported++ } diff --git a/apps/sim/lib/cleanup/chat-cleanup.ts b/apps/sim/lib/cleanup/chat-cleanup.ts index b154687bc37..c5dafdf9c27 100644 --- a/apps/sim/lib/cleanup/chat-cleanup.ts +++ b/apps/sim/lib/cleanup/chat-cleanup.ts @@ -1,5 +1,5 @@ import { db } from '@sim/db' -import { copilotChats, workspaceFiles } from '@sim/db/schema' +import { copilotMessages, workspaceFiles } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, inArray, isNull } from 'drizzle-orm' import { chunkArray } from '@/lib/cleanup/batch-delete' @@ -11,7 +11,7 @@ import { isUsingCloudStorage, StorageService } from '@/lib/uploads' const logger = createLogger('ChatCleanup') const COPILOT_CLEANUP_BATCH_SIZE = 1000 -/** Bounds JSONB detoast memory: `messages` can be MBs per row. */ +/** Bounds how many chats' `copilot_messages` rows are scanned per query. */ const CHAT_FILE_COLLECT_CHUNK_SIZE = 500 /** @@ -31,7 +31,7 @@ interface FileRef { /** * Collect all file storage keys for the given chat IDs from two sources: * 1. workspaceFiles rows with chatId FK (chat-scoped contexts only) - * 2. fileAttachments[].key inside copilotChats.messages JSONB + * 2. fileAttachments[].key inside each copilot_messages.content */ export async function collectChatFiles(chatIds: string[]): Promise { const files: FileRef[] = [] @@ -40,7 +40,7 @@ export async function collectChatFiles(chatIds: string[]): Promise { const seen = new Set() for (const chunk of chunkArray(chatIds, CHAT_FILE_COLLECT_CHUNK_SIZE)) { - const [linkedFiles, chatsWithMessages] = await Promise.all([ + const [linkedFiles, messageRows] = await Promise.all([ db .select({ key: workspaceFiles.key, context: workspaceFiles.context }) .from(workspaceFiles) @@ -51,10 +51,12 @@ export async function collectChatFiles(chatIds: string[]): Promise { inArray(workspaceFiles.context, [...CHAT_SCOPED_CONTEXTS]) ) ), + // Scan every message row for the chat (no deleted_at filter): this is a + // deletion path collecting blob keys, so attachments on any row count. db - .select({ messages: copilotChats.messages }) - .from(copilotChats) - .where(inArray(copilotChats.id, chunk)), + .select({ content: copilotMessages.content }) + .from(copilotMessages) + .where(inArray(copilotMessages.chatId, chunk)), ]) for (const f of linkedFiles) { @@ -64,24 +66,21 @@ export async function collectChatFiles(chatIds: string[]): Promise { } } - for (const chat of chatsWithMessages) { - const messages = chat.messages as unknown[] - if (!Array.isArray(messages)) continue - for (const msg of messages) { - if (!msg || typeof msg !== 'object') continue - const attachments = (msg as Record).fileAttachments - if (!Array.isArray(attachments)) continue - for (const attachment of attachments) { - if ( - attachment && - typeof attachment === 'object' && - (attachment as Record).key - ) { - const key = (attachment as Record).key as string - if (!seen.has(key)) { - seen.add(key) - files.push({ key, context: 'copilot' }) - } + for (const row of messageRows) { + const msg = row.content + if (!msg || typeof msg !== 'object') continue + const attachments = (msg as Record).fileAttachments + if (!Array.isArray(attachments)) continue + for (const attachment of attachments) { + if ( + attachment && + typeof attachment === 'object' && + (attachment as Record).key + ) { + const key = (attachment as Record).key as string + if (!seen.has(key)) { + seen.add(key) + files.push({ key, context: 'copilot' }) } } } diff --git a/apps/sim/lib/copilot/chat/lifecycle.test.ts b/apps/sim/lib/copilot/chat/lifecycle.test.ts index df1689cc689..3ced9edfaec 100644 --- a/apps/sim/lib/copilot/chat/lifecycle.test.ts +++ b/apps/sim/lib/copilot/chat/lifecycle.test.ts @@ -122,13 +122,15 @@ describe('lifecycle copilot chat reads (cutover to copilot_messages)', () => { }) it('resolveOrCreateChat creates a new chat with an empty transcript', async () => { - // insert().values().returning() -> fresh chat with empty messages - dbChainMockFns.returning.mockResolvedValueOnce([{ ...chatRow, messages: [] }]) + dbChainMockFns.returning.mockResolvedValueOnce([chatRow]) const result = await resolveOrCreateChat({ userId: USER_ID, model: 'm' }) expect(result.isNew).toBe(true) expect(result.conversationHistory).toEqual([]) + expect(result.chat?.messages).toEqual([]) + const insertValues = dbChainMockFns.values.mock.calls[0]?.[0] as Record + expect(Object.hasOwn(insertValues, 'messages')).toBe(false) // a brand-new chat must not trigger a messages read expect(dbChainMockFns.orderBy).not.toHaveBeenCalled() }) diff --git a/apps/sim/lib/copilot/chat/lifecycle.ts b/apps/sim/lib/copilot/chat/lifecycle.ts index e86c127c104..1fcb5e51af6 100644 --- a/apps/sim/lib/copilot/chat/lifecycle.ts +++ b/apps/sim/lib/copilot/chat/lifecycle.ts @@ -6,6 +6,7 @@ import { getActiveWorkflowRecord, } from '@sim/workflow-authz' import { and, asc, eq, isNull, sql } from 'drizzle-orm' +import type { PersistedMessage } from '@/lib/copilot/chat/persisted-message' import { assertActiveWorkspaceAccess, checkWorkspaceAccess, @@ -52,16 +53,6 @@ const copilotChatDetailColumns = { updatedAt: copilotChats.updatedAt, } as const -/** - * Returning column set for newly-inserted chats. A fresh chat has no - * `copilot_messages` rows yet, so the transcript is the just-inserted empty - * JSONB array — return it directly rather than issuing a second query. - */ -const copilotChatDetailReturningColumns = { - ...copilotChatDetailColumns, - messages: copilotChats.messages, -} as const - /** * Column set for the legacy copilot chat detail endpoint. Extends * `copilotChatDetailColumns` with `model`, `planArtifact`, and `config` — the @@ -83,7 +74,7 @@ const copilotChatLegacyDetailColumns = { * to a legacy JSONB array element — so the downstream normalize/transcript * pipeline is unchanged. */ -async function loadCopilotChatMessages(chatId: string): Promise[]> { +export async function loadCopilotChatMessages(chatId: string): Promise { const rows = await db .select({ content: copilotMessages.content }) .from(copilotMessages) @@ -93,7 +84,7 @@ async function loadCopilotChatMessages(chatId: string): Promise row.content as Record) + return rows.map((row) => row.content as PersistedMessage) } type CopilotChatAuthRow = Pick< @@ -298,10 +289,9 @@ export async function resolveOrCreateChat(params: { type: type ?? 'copilot', title: null, model, - messages: [], lastSeenAt: now, }) - .returning(copilotChatDetailReturningColumns) + .returning(copilotChatDetailColumns) if (!newChat) { logger.warn('Failed to create new copilot chat row', { userId, workflowId, workspaceId }) @@ -315,7 +305,7 @@ export async function resolveOrCreateChat(params: { return { chatId: newChat.id, - chat: newChat, + chat: { ...newChat, messages: [] }, conversationHistory: [], isNew: true, } diff --git a/apps/sim/lib/copilot/chat/messages-dual-write.ts b/apps/sim/lib/copilot/chat/messages-dual-write.ts deleted file mode 100644 index 54e98afe3a0..00000000000 --- a/apps/sim/lib/copilot/chat/messages-dual-write.ts +++ /dev/null @@ -1,143 +0,0 @@ -import { db } from '@sim/db' -import { copilotMessages } from '@sim/db/schema' -import { createLogger } from '@sim/logger' -import { getErrorMessage } from '@sim/utils/errors' -import { and, eq, notInArray, sql } from 'drizzle-orm' -import type { PersistedMessage } from '@/lib/copilot/chat/persisted-message' - -const logger = createLogger('CopilotMessagesDualWrite') - -/** - * Keep the first occurrence of each message id. A single `INSERT ... ON - * CONFLICT` cannot touch the same conflict target twice, so a repeated id - * would otherwise throw. - */ -function dedupeById(messages: PersistedMessage[]): PersistedMessage[] { - const seen = new Set() - const out: PersistedMessage[] = [] - for (const m of messages) { - if (seen.has(m.id)) continue - seen.add(m.id) - out.push(m) - } - return out -} - -function toRow( - chatId: string, - message: PersistedMessage, - seq: number, - options?: { chatModel?: string | null; streamId?: string | null } -): typeof copilotMessages.$inferInsert { - const ts = new Date(message.timestamp) - return { - chatId, - messageId: message.id, - role: message.role, - content: message, - seq, - model: options?.chatModel ?? null, - streamId: options?.streamId ?? null, - createdAt: ts, - updatedAt: ts, - } -} - -/** - * Append messages to the new `copilot_messages` table. Best-effort — errors - * are logged but never thrown; the legacy `copilot_chats.messages` JSONB - * column stays the source of truth during the dual-write rollout. - * - * `seq` is `MAX(seq) + index`, computed in JS (not in SQL, where every row of - * a multi-row INSERT would read the same pre-insert MAX and collide). The - * read-then-insert is non-atomic, so interleaved appends to one chat can tie - * `seq`; that window is bounded by the cutover read order (`seq, created_at, - * id`) and `replaceCopilotChatMessages`, which re-densifies `seq` from the - * authoritative JSONB order on the next snapshot save. - */ -export async function appendCopilotChatMessages( - chatId: string, - messages: PersistedMessage[], - options?: { chatModel?: string | null; streamId?: string | null } -): Promise { - if (messages.length === 0) return - try { - const deduped = dedupeById(messages) - const [maxRow] = await db - .select({ maxSeq: sql`max(${copilotMessages.seq})` }) - .from(copilotMessages) - .where(eq(copilotMessages.chatId, chatId)) - const base = (maxRow?.maxSeq ?? -1) + 1 - await db - .insert(copilotMessages) - .values(deduped.map((m, i) => toRow(chatId, m, base + i, options))) - .onConflictDoUpdate({ - target: [copilotMessages.chatId, copilotMessages.messageId], - set: { - content: sql`excluded.content`, - role: sql`excluded.role`, - model: sql`COALESCE(excluded.model, ${copilotMessages.model})`, - streamId: sql`COALESCE(excluded.stream_id, ${copilotMessages.streamId})`, - seq: sql`COALESCE(${copilotMessages.seq}, excluded.seq)`, - updatedAt: sql`now()`, - }, - }) - } catch (err) { - logger.warn('Failed to append copilot chat messages', { - chatId, - messageCount: messages.length, - error: getErrorMessage(err), - }) - } -} - -/** - * Replace all messages for a chat. Used by the update-messages endpoint that - * receives a full snapshot of the conversation state. Best-effort. - */ -export async function replaceCopilotChatMessages( - chatId: string, - messages: PersistedMessage[], - options?: { chatModel?: string | null } -): Promise { - try { - const deduped = dedupeById(messages) - const newMessageIds = deduped.map((m) => m.id) - await db.transaction(async (tx) => { - // Drop rows for messages not in the new snapshot. - await tx - .delete(copilotMessages) - .where( - newMessageIds.length > 0 - ? and( - eq(copilotMessages.chatId, chatId), - notInArray(copilotMessages.messageId, newMessageIds) - ) - : eq(copilotMessages.chatId, chatId) - ) - if (deduped.length === 0) return - // Snapshot is authoritative on order, so seq = array index is overwritten - // on conflict; stream_id / model are preserved via COALESCE. - await tx - .insert(copilotMessages) - .values(deduped.map((m, i) => toRow(chatId, m, i, options))) - .onConflictDoUpdate({ - target: [copilotMessages.chatId, copilotMessages.messageId], - set: { - content: sql`excluded.content`, - role: sql`excluded.role`, - model: sql`COALESCE(excluded.model, ${copilotMessages.model})`, - streamId: sql`COALESCE(excluded.stream_id, ${copilotMessages.streamId})`, - seq: sql`excluded.seq`, - updatedAt: sql`now()`, - }, - }) - }) - } catch (err) { - logger.warn('Failed to replace copilot chat messages', { - chatId, - messageCount: messages.length, - error: getErrorMessage(err), - }) - } -} diff --git a/apps/sim/lib/copilot/chat/messages-dual-write.test.ts b/apps/sim/lib/copilot/chat/messages-store.test.ts similarity index 95% rename from apps/sim/lib/copilot/chat/messages-dual-write.test.ts rename to apps/sim/lib/copilot/chat/messages-store.test.ts index 17d3e1666c5..a96cff12508 100644 --- a/apps/sim/lib/copilot/chat/messages-dual-write.test.ts +++ b/apps/sim/lib/copilot/chat/messages-store.test.ts @@ -9,7 +9,7 @@ vi.mock('@sim/db', () => dbChainMock) import { appendCopilotChatMessages, replaceCopilotChatMessages, -} from '@/lib/copilot/chat/messages-dual-write' +} from '@/lib/copilot/chat/messages-store' import type { PersistedMessage } from '@/lib/copilot/chat/persisted-message' const userMsg: PersistedMessage = { @@ -32,7 +32,7 @@ function lastValuesRows() { return calls[calls.length - 1][0] as Array> } -describe('messages-dual-write', () => { +describe('messages-store', () => { beforeEach(() => { vi.clearAllMocks() resetDbChainMock() @@ -124,10 +124,12 @@ describe('messages-dual-write', () => { expect(rows[0].messageId).toBe('msg-user-1') }) - it('swallows DB errors so the legacy JSONB write stays canonical', async () => { + it('propagates DB errors — copilot_messages is the sole store', async () => { dbChainMockFns.onConflictDoUpdate.mockRejectedValueOnce(new Error('connection lost')) - await expect(appendCopilotChatMessages('chat-1', [userMsg])).resolves.toBeUndefined() + await expect(appendCopilotChatMessages('chat-1', [userMsg])).rejects.toThrow( + 'connection lost' + ) }) }) @@ -185,10 +187,10 @@ describe('messages-dual-write', () => { expect(rows[0].model).toBe('gpt-4o-mini') }) - it('swallows DB errors so the legacy JSONB write stays canonical', async () => { + it('propagates DB errors — the snapshot is authoritative', async () => { dbChainMockFns.transaction.mockRejectedValueOnce(new Error('tx aborted')) - await expect(replaceCopilotChatMessages('chat-1', [userMsg])).resolves.toBeUndefined() + await expect(replaceCopilotChatMessages('chat-1', [userMsg])).rejects.toThrow('tx aborted') }) }) }) diff --git a/apps/sim/lib/copilot/chat/messages-store.ts b/apps/sim/lib/copilot/chat/messages-store.ts new file mode 100644 index 00000000000..485a76ead3f --- /dev/null +++ b/apps/sim/lib/copilot/chat/messages-store.ts @@ -0,0 +1,122 @@ +import { db } from '@sim/db' +import { copilotMessages } from '@sim/db/schema' +import { and, eq, notInArray, sql } from 'drizzle-orm' +import type { PersistedMessage } from '@/lib/copilot/chat/persisted-message' +import type { DbOrTx } from '@/lib/db/types' + +/** + * Keep the first occurrence of each message id. A single `INSERT ... ON + * CONFLICT` cannot touch the same conflict target twice, so a repeated id + * would otherwise throw. + */ +function dedupeById(messages: PersistedMessage[]): PersistedMessage[] { + const seen = new Set() + const out: PersistedMessage[] = [] + for (const m of messages) { + if (seen.has(m.id)) continue + seen.add(m.id) + out.push(m) + } + return out +} + +function toRow( + chatId: string, + message: PersistedMessage, + seq: number, + options?: { chatModel?: string | null; streamId?: string | null } +): typeof copilotMessages.$inferInsert { + const ts = new Date(message.timestamp) + return { + chatId, + messageId: message.id, + role: message.role, + content: message, + seq, + model: options?.chatModel ?? null, + streamId: options?.streamId ?? null, + createdAt: ts, + updatedAt: ts, + } +} + +/** + * Append messages to the `copilot_messages` table — the sole store for chat + * transcripts. Throws on failure (a swallowed write would lose messages). + * Pass `executor` to enlist the write in an existing transaction. + * + * `seq` is `MAX(seq) + index`, computed in JS. The read-then-insert is + * non-atomic, but per-chat appends are serialized by the pending-stream lock + * and the `seq, created_at, id` read order breaks any residual tie. + */ +export async function appendCopilotChatMessages( + chatId: string, + messages: PersistedMessage[], + options?: { chatModel?: string | null; streamId?: string | null }, + executor: DbOrTx = db +): Promise { + if (messages.length === 0) return + const deduped = dedupeById(messages) + const [maxRow] = await executor + .select({ maxSeq: sql`max(${copilotMessages.seq})` }) + .from(copilotMessages) + .where(eq(copilotMessages.chatId, chatId)) + const base = (maxRow?.maxSeq ?? -1) + 1 + await executor + .insert(copilotMessages) + .values(deduped.map((m, i) => toRow(chatId, m, base + i, options))) + .onConflictDoUpdate({ + target: [copilotMessages.chatId, copilotMessages.messageId], + set: { + content: sql`excluded.content`, + role: sql`excluded.role`, + model: sql`COALESCE(excluded.model, ${copilotMessages.model})`, + streamId: sql`COALESCE(excluded.stream_id, ${copilotMessages.streamId})`, + seq: sql`COALESCE(${copilotMessages.seq}, excluded.seq)`, + updatedAt: sql`now()`, + }, + }) +} + +/** + * Replace all messages for a chat from a full snapshot (used by update-messages). + * Throws on failure. Pass `executor` to enlist the delete+insert in an existing + * transaction; otherwise it runs in its own. + */ +export async function replaceCopilotChatMessages( + chatId: string, + messages: PersistedMessage[], + options?: { chatModel?: string | null }, + executor?: DbOrTx +): Promise { + const deduped = dedupeById(messages) + const newMessageIds = deduped.map((m) => m.id) + const run = async (tx: DbOrTx) => { + await tx + .delete(copilotMessages) + .where( + newMessageIds.length > 0 + ? and( + eq(copilotMessages.chatId, chatId), + notInArray(copilotMessages.messageId, newMessageIds) + ) + : eq(copilotMessages.chatId, chatId) + ) + if (deduped.length === 0) return + await tx + .insert(copilotMessages) + .values(deduped.map((m, i) => toRow(chatId, m, i, options))) + .onConflictDoUpdate({ + target: [copilotMessages.chatId, copilotMessages.messageId], + set: { + content: sql`excluded.content`, + role: sql`excluded.role`, + model: sql`COALESCE(excluded.model, ${copilotMessages.model})`, + streamId: sql`COALESCE(excluded.stream_id, ${copilotMessages.streamId})`, + seq: sql`excluded.seq`, + updatedAt: sql`now()`, + }, + }) + } + await (executor ? run(executor) : db.transaction(run)) +} diff --git a/apps/sim/lib/copilot/chat/post.test.ts b/apps/sim/lib/copilot/chat/post.test.ts index 8b937704ac6..c2271ece05b 100644 --- a/apps/sim/lib/copilot/chat/post.test.ts +++ b/apps/sim/lib/copilot/chat/post.test.ts @@ -28,6 +28,7 @@ const { releasePendingChatStream, resolveOrCreateChat, finalizeAssistantTurn, + appendCopilotChatMessages, mockPublishStatusChanged, } = vi.hoisted(() => ({ getEffectiveDecryptedEnv: vi.fn(), @@ -41,6 +42,7 @@ const { releasePendingChatStream: vi.fn(), resolveOrCreateChat: vi.fn(), finalizeAssistantTurn: vi.fn(), + appendCopilotChatMessages: vi.fn(), mockPublishStatusChanged: vi.fn(), })) @@ -86,30 +88,40 @@ vi.mock('@/lib/copilot/chat/terminal-state', () => ({ finalizeAssistantTurn, })) +vi.mock('@/lib/copilot/chat/messages-store', () => ({ + appendCopilotChatMessages, +})) + vi.mock('@/lib/copilot/tasks', () => ({ taskPubSub: { publishStatusChanged: mockPublishStatusChanged, }, })) -vi.mock('@sim/db', () => ({ - db: { - update: vi.fn(() => ({ - set: vi.fn(() => ({ - where: vi.fn(() => ({ - returning: vi.fn().mockResolvedValue([]), - })), +vi.mock('@sim/db', () => { + const update = vi.fn(() => ({ + set: vi.fn(() => ({ + where: vi.fn(() => ({ + returning: vi.fn().mockResolvedValue([]), })), })), - select: vi.fn(() => ({ - from: vi.fn(() => ({ - where: vi.fn(() => ({ - limit: vi.fn().mockResolvedValue([{ permissionType: 'write' }]), - })), + })) + const select = vi.fn(() => ({ + from: vi.fn(() => ({ + where: vi.fn(() => ({ + limit: vi.fn().mockResolvedValue([{ permissionType: 'write' }]), })), })), - }, -})) + })) + return { + db: { + update, + select, + transaction: async (cb: (tx: { update: typeof update; select: typeof select }) => unknown) => + cb({ update, select }), + }, + } +}) vi.mock('drizzle-orm', () => ({ and: vi.fn(() => ({})), diff --git a/apps/sim/lib/copilot/chat/post.ts b/apps/sim/lib/copilot/chat/post.ts index 9c1d5950d0b..31a0c5fce9d 100644 --- a/apps/sim/lib/copilot/chat/post.ts +++ b/apps/sim/lib/copilot/chat/post.ts @@ -4,13 +4,13 @@ import { copilotChats, permissions } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { getErrorMessage } from '@sim/utils/errors' import { generateId } from '@sim/utils/id' -import { and, eq, sql } from 'drizzle-orm' +import { and, eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' import { isZodError, validationErrorResponse } from '@/lib/api/server' import { getSession } from '@/lib/auth' import { type ChatLoadResult, resolveOrCreateChat } from '@/lib/copilot/chat/lifecycle' -import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-dual-write' +import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-store' import { buildCopilotRequestPayload } from '@/lib/copilot/chat/payload' import { buildPersistedAssistantMessage, @@ -293,7 +293,7 @@ async function persistUserMessage(params: { * span parented to the about-to-be-dropped Next.js HTTP span. */ parentOtelContext?: OtelContext -}): Promise { +}): Promise { const { chatId, userMessageId, @@ -304,7 +304,7 @@ async function persistUserMessage(params: { notifyWorkspaceStatus, parentOtelContext, } = params - if (!chatId) return undefined + if (!chatId) return return withCopilotSpan( TraceSpan.CopilotChatPersistUserMessage, @@ -326,31 +326,32 @@ async function persistUserMessage(params: { contexts, }) - const [updated] = await db - .update(copilotChats) - .set({ - messages: sql`${copilotChats.messages} || ${JSON.stringify([userMsg])}::jsonb`, - conversationId: userMessageId, - updatedAt: new Date(), - }) - .where(eq(copilotChats.id, chatId)) - .returning({ messages: copilotChats.messages, model: copilotChats.model }) + const updated = await db.transaction(async (tx) => { + const [row] = await tx + .update(copilotChats) + .set({ + conversationId: userMessageId, + updatedAt: new Date(), + }) + .where(eq(copilotChats.id, chatId)) + .returning({ model: copilotChats.model }) - if (updated) { - await appendCopilotChatMessages(chatId, [userMsg], { - streamId: userMessageId, - chatModel: updated.model ?? null, - }) - } + if (!row) return null - const messagesAfter = Array.isArray(updated?.messages) ? updated.messages : undefined - span.setAttributes({ - [TraceAttr.ChatPersistOutcome]: updated - ? CopilotChatPersistOutcome.Appended - : CopilotChatPersistOutcome.ChatNotFound, - [TraceAttr.ChatMessagesAfter]: messagesAfter?.length ?? 0, + await appendCopilotChatMessages( + chatId, + [userMsg], + { streamId: userMessageId, chatModel: row.model ?? null }, + tx + ) + return row }) + span.setAttribute( + TraceAttr.ChatPersistOutcome, + updated ? CopilotChatPersistOutcome.Appended : CopilotChatPersistOutcome.ChatNotFound + ) + if (notifyWorkspaceStatus && updated && workspaceId) { taskPubSub?.publishStatusChanged({ workspaceId, @@ -359,8 +360,6 @@ async function persistUserMessage(params: { streamId: userMessageId, }) } - - return messagesAfter }, parentOtelContext ) @@ -885,7 +884,7 @@ export async function handleUnifiedChatPost(req: NextRequest) { }), activeOtelRoot.context ) - const persistedMessagesPromise = persistUserMessage({ + const persistUserMessagePromise = persistUserMessage({ chatId: actualChatId, userMessageId, message: body.message, @@ -908,24 +907,17 @@ export async function handleUnifiedChatPost(req: NextRequest) { activeOtelRoot.context ) - const [agentContexts, userPermission, workspaceContext, persistedMessages, executionContext] = + const [agentContexts, userPermission, workspaceContext, , executionContext] = await Promise.all([ agentContextsPromise, userPermissionPromise, workspaceContextPromise, - persistedMessagesPromise, + persistUserMessagePromise, executionContextPromise, ]) executionContext.userPermission = userPermission ?? undefined - if (persistedMessages) { - conversationHistory = persistedMessages.filter((message) => { - const record = message as Record - return record.id !== userMessageId - }) - } - // buildPayload is the last synchronous step before the outbound // Sim → Go HTTP call. It runs per-tool schema generation (subscription // lookup + registry iteration, cached 30s) and file upload tracking diff --git a/apps/sim/lib/copilot/chat/terminal-state.test.ts b/apps/sim/lib/copilot/chat/terminal-state.test.ts index cf4a230bf31..7adb8e42ae1 100644 --- a/apps/sim/lib/copilot/chat/terminal-state.test.ts +++ b/apps/sim/lib/copilot/chat/terminal-state.test.ts @@ -3,212 +3,145 @@ */ import { copilotChats } from '@sim/db/schema' +import { dbChainMock, dbChainMockFns, resetDbChainMock } from '@sim/testing' import { eq } from 'drizzle-orm' import { beforeEach, describe, expect, it, vi } from 'vitest' -const { - selectForUpdate, - selectLimit, - selectWhere, - selectFrom, - select, - updateWhere, - updateSet, - update, - transaction, -} = vi.hoisted(() => { - const selectLimit = vi.fn() - const selectForUpdate = vi.fn(() => ({ limit: selectLimit })) - const selectWhere = vi.fn(() => ({ for: selectForUpdate })) - const selectFrom = vi.fn(() => ({ where: selectWhere })) - const select = vi.fn(() => ({ from: selectFrom })) - - const updateWhere = vi.fn() - const updateSet = vi.fn(() => ({ where: updateWhere })) - const update = vi.fn(() => ({ set: updateSet })) - - const transaction = vi.fn( - (callback: (tx: { select: typeof select; update: typeof update }) => unknown) => - callback({ select, update }) - ) - - return { - selectForUpdate, - selectLimit, - selectWhere, - selectFrom, - select, - updateWhere, - updateSet, - update, - transaction, - } -}) +vi.mock('@sim/db', () => dbChainMock) + +const { mockAppendCopilotChatMessages } = vi.hoisted(() => ({ + mockAppendCopilotChatMessages: vi.fn(), +})) -vi.mock('@sim/db', () => ({ - db: { - transaction, - }, +vi.mock('@/lib/copilot/chat/messages-store', () => ({ + appendCopilotChatMessages: mockAppendCopilotChatMessages, })) import { finalizeAssistantTurn } from './terminal-state' +const assistantMessage = { + id: 'assistant-1', + role: 'assistant' as const, + content: 'hi', + timestamp: '2024-01-01T00:00:00.000Z', +} + +/** + * Sequence the two in-tx reads: the chat row (`FOR UPDATE ... LIMIT 1`) and the + * last-message lookup that drives dedup — both terminate on `.limit(1)`. + */ +function mockReads(opts: { + chat: Record | null + last?: { messageId: string; role: string } +}) { + dbChainMockFns.limit.mockResolvedValueOnce(opts.chat ? [opts.chat] : []) + dbChainMockFns.limit.mockResolvedValueOnce(opts.last ? [opts.last] : []) +} + describe('finalizeAssistantTurn', () => { beforeEach(() => { vi.clearAllMocks() - updateWhere.mockResolvedValue(undefined) + // Drain the once-queue (clearAllMocks/resetDbChainMock don't), then restore defaults. + dbChainMockFns.limit.mockReset() + resetDbChainMock() }) - it('appends the assistant message when the user turn is still last', async () => { - selectLimit.mockResolvedValue([ - { - messages: [{ id: 'user-1', role: 'user', content: 'hello' }], - conversationId: 'user-1', - workspaceId: 'ws-1', - }, - ]) + it('appends the assistant message when the user turn has no reply yet', async () => { + mockReads({ + chat: { conversationId: 'user-1', workspaceId: 'ws-1', model: null }, + last: { messageId: 'user-1', role: 'user' }, + }) - await finalizeAssistantTurn({ + const result = await finalizeAssistantTurn({ chatId: 'chat-1', userMessageId: 'user-1', - assistantMessage: { - id: 'assistant-1', - role: 'assistant', - content: 'hi', - timestamp: '2024-01-01T00:00:00.000Z', - }, + assistantMessage, }) - expect(updateSet).toHaveBeenCalledWith( - expect.objectContaining({ - updatedAt: expect.any(Date), - conversationId: null, - messages: expect.anything(), - }) + expect(result.appendedAssistant).toBe(true) + const updateArg = dbChainMockFns.set.mock.calls[0]?.[0] as Record + expect(updateArg).toEqual( + expect.objectContaining({ updatedAt: expect.any(Date), conversationId: null }) + ) + expect(Object.hasOwn(updateArg, 'messages')).toBe(false) + expect(dbChainMockFns.where).toHaveBeenCalledWith(eq(copilotChats.id, 'chat-1')) + expect(mockAppendCopilotChatMessages).toHaveBeenCalledTimes(1) + expect(mockAppendCopilotChatMessages).toHaveBeenCalledWith( + 'chat-1', + [assistantMessage], + { streamId: 'user-1', chatModel: null }, + expect.anything() ) - expect(updateWhere).toHaveBeenCalledWith(eq(copilotChats.id, 'chat-1')) }) it('only clears the active stream marker when a response is already persisted', async () => { - selectLimit.mockResolvedValue([ - { - messages: [ - { id: 'user-1', role: 'user', content: 'hello' }, - { id: 'assistant-1', role: 'assistant', content: 'partial' }, - ], - conversationId: 'user-1', - workspaceId: 'ws-1', - }, - ]) - - await finalizeAssistantTurn({ + mockReads({ + chat: { conversationId: 'user-1', workspaceId: 'ws-1', model: null }, + last: { messageId: 'assistant-1', role: 'assistant' }, + }) + + const result = await finalizeAssistantTurn({ chatId: 'chat-1', userMessageId: 'user-1', - assistantMessage: { - id: 'assistant-2', - role: 'assistant', - content: 'final', - timestamp: '2024-01-01T00:00:00.000Z', - }, + assistantMessage: { ...assistantMessage, id: 'assistant-2' }, }) - const updateCalls = updateSet.mock.calls as unknown as Array<[Record]> - const updateArg = updateCalls[0]?.[0] - expect(updateArg).toBeDefined() - if (!updateArg) { - throw new Error('Expected updateSet to be called') - } + expect(result.outcome).toBe('assistant_already_persisted') + const updateArg = dbChainMockFns.set.mock.calls[0]?.[0] as Record expect(updateArg).toEqual( - expect.objectContaining({ - updatedAt: expect.any(Date), - conversationId: null, - }) + expect.objectContaining({ updatedAt: expect.any(Date), conversationId: null }) ) expect(Object.hasOwn(updateArg, 'messages')).toBe(false) - expect(updateWhere).toHaveBeenCalledWith(eq(copilotChats.id, 'chat-1')) + expect(mockAppendCopilotChatMessages).not.toHaveBeenCalled() }) it('appends a stopped assistant when the stream marker was already cleared', async () => { - selectLimit.mockResolvedValue([ - { - messages: [{ id: 'user-1', role: 'user', content: 'hello' }], - conversationId: null, - workspaceId: 'ws-1', - }, - ]) + mockReads({ + chat: { conversationId: null, workspaceId: 'ws-1', model: null }, + last: { messageId: 'user-1', role: 'user' }, + }) const result = await finalizeAssistantTurn({ chatId: 'chat-1', userMessageId: 'user-1', streamMarkerPolicy: 'active-or-cleared', - assistantMessage: { - id: 'assistant-1', - role: 'assistant', - content: 'partial', - timestamp: '2024-01-01T00:00:00.000Z', - }, + assistantMessage, }) expect(result.appendedAssistant).toBe(true) - expect(updateSet).toHaveBeenCalledWith( - expect.objectContaining({ - updatedAt: expect.any(Date), - conversationId: null, - messages: expect.anything(), - }) - ) + expect(mockAppendCopilotChatMessages).toHaveBeenCalledTimes(1) }) it('does not append on a cleared marker unless the policy allows it', async () => { - selectLimit.mockResolvedValue([ - { - messages: [{ id: 'user-1', role: 'user', content: 'hello' }], - conversationId: null, - workspaceId: 'ws-1', - }, - ]) + mockReads({ chat: { conversationId: null, workspaceId: 'ws-1', model: null } }) const result = await finalizeAssistantTurn({ chatId: 'chat-1', userMessageId: 'user-1', - assistantMessage: { - id: 'assistant-1', - role: 'assistant', - content: 'partial', - timestamp: '2024-01-01T00:00:00.000Z', - }, + assistantMessage, }) expect(result.updated).toBe(false) - expect(updateSet).not.toHaveBeenCalled() + expect(dbChainMockFns.set).not.toHaveBeenCalled() + expect(mockAppendCopilotChatMessages).not.toHaveBeenCalled() }) it('reports already persisted when a cleared marker races with a duplicate stop', async () => { - selectLimit.mockResolvedValue([ - { - messages: [ - { id: 'user-1', role: 'user', content: 'hello' }, - { id: 'assistant-1', role: 'assistant', content: 'partial' }, - ], - conversationId: null, - workspaceId: 'ws-1', - }, - ]) + mockReads({ + chat: { conversationId: null, workspaceId: 'ws-1', model: null }, + last: { messageId: 'assistant-1', role: 'assistant' }, + }) const result = await finalizeAssistantTurn({ chatId: 'chat-1', userMessageId: 'user-1', streamMarkerPolicy: 'active-or-cleared', - assistantMessage: { - id: 'assistant-2', - role: 'assistant', - content: 'partial', - timestamp: '2024-01-01T00:00:00.000Z', - }, + assistantMessage: { ...assistantMessage, id: 'assistant-2' }, }) expect(result.updated).toBe(false) expect(result.outcome).toBe('assistant_already_persisted') - expect(updateSet).not.toHaveBeenCalled() + expect(dbChainMockFns.set).not.toHaveBeenCalled() + expect(mockAppendCopilotChatMessages).not.toHaveBeenCalled() }) }) diff --git a/apps/sim/lib/copilot/chat/terminal-state.ts b/apps/sim/lib/copilot/chat/terminal-state.ts index 8b4fdf11fee..5ff7886b42b 100644 --- a/apps/sim/lib/copilot/chat/terminal-state.ts +++ b/apps/sim/lib/copilot/chat/terminal-state.ts @@ -1,7 +1,7 @@ import { db } from '@sim/db' -import { copilotChats } from '@sim/db/schema' -import { and, eq, sql } from 'drizzle-orm' -import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-dual-write' +import { copilotChats, copilotMessages } from '@sim/db/schema' +import { and, desc, eq, isNull, sql } from 'drizzle-orm' +import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-store' import type { PersistedMessage } from '@/lib/copilot/chat/persisted-message' import { CopilotChatFinalizeOutcome } from '@/lib/copilot/generated/trace-attribute-values-v1' import { TraceAttr } from '@/lib/copilot/generated/trace-attributes-v1' @@ -48,15 +48,12 @@ export async function finalizeAssistantTurn({ [TraceAttr.ChatHasAssistantMessage]: !!assistantMessage, }, async (span) => { - let appendedAssistantMessage: PersistedMessage | undefined - let chatModel: string | null = null const result = await db.transaction(async (tx) => { const where = userId ? and(eq(copilotChats.id, chatId), eq(copilotChats.userId, userId)) : eq(copilotChats.id, chatId) const [row] = await tx .select({ - messages: copilotChats.messages, conversationId: copilotChats.conversationId, workspaceId: copilotChats.workspaceId, model: copilotChats.model, @@ -65,10 +62,6 @@ export async function finalizeAssistantTurn({ .where(where) .for('update') .limit(1) - chatModel = row?.model ?? null - - const messages: Record[] = Array.isArray(row?.messages) ? row.messages : [] - span.setAttribute(TraceAttr.ChatExistingMessageCount, messages.length) if (!row) { return { @@ -80,6 +73,8 @@ export async function finalizeAssistantTurn({ } } + const chatModel = row.model ?? null + const markerMatches = row.conversationId === userMessageId const markerAlreadyCleared = row.conversationId === null const ownsTurn = @@ -94,13 +89,20 @@ export async function finalizeAssistantTurn({ } } - const userIdx = messages.findIndex((message) => message.id === userMessageId) - const alreadyHasResponse = - userIdx >= 0 && - userIdx + 1 < messages.length && - (messages[userIdx + 1] as Record)?.role === 'assistant' - const canAppendAssistant = - userIdx >= 0 && userIdx === messages.length - 1 && !alreadyHasResponse + // Append only when the user message is still the last row: anything + // after it means the turn already has a response (dedup under the lock). + const [lastMessage] = await tx + .select({ messageId: copilotMessages.messageId, role: copilotMessages.role }) + .from(copilotMessages) + .where(and(eq(copilotMessages.chatId, chatId), isNull(copilotMessages.deletedAt))) + .orderBy( + sql`${copilotMessages.seq} desc nulls last`, + desc(copilotMessages.createdAt), + desc(copilotMessages.id) + ) + .limit(1) + const canAppendAssistant = lastMessage?.messageId === userMessageId + const alreadyHasResponse = lastMessage?.role === 'assistant' const updateWhere = userId ? and(eq(copilotChats.id, chatId), eq(copilotChats.userId, userId)) @@ -111,14 +113,13 @@ export async function finalizeAssistantTurn({ } if (assistantMessage && canAppendAssistant) { - await tx - .update(copilotChats) - .set({ - ...baseUpdate, - messages: sql`${copilotChats.messages} || ${JSON.stringify([assistantMessage])}::jsonb`, - }) - .where(updateWhere) - appendedAssistantMessage = assistantMessage + await tx.update(copilotChats).set(baseUpdate).where(updateWhere) + await appendCopilotChatMessages( + chatId, + [assistantMessage], + { streamId: userMessageId, chatModel }, + tx + ) return { found: true, updated: true, @@ -136,9 +137,7 @@ export async function finalizeAssistantTurn({ appendedAssistant: false, workspaceId: row.workspaceId, outcome: assistantMessage - ? alreadyHasResponse - ? CopilotChatFinalizeOutcome.AssistantAlreadyPersisted - : CopilotChatFinalizeOutcome.StaleUserMessage + ? CopilotChatFinalizeOutcome.AssistantAlreadyPersisted : CopilotChatFinalizeOutcome.ClearedStreamMarkerOnly, } } @@ -154,13 +153,6 @@ export async function finalizeAssistantTurn({ } }) - if (appendedAssistantMessage) { - await appendCopilotChatMessages(chatId, [appendedAssistantMessage], { - streamId: userMessageId, - chatModel, - }) - } - span.setAttribute(TraceAttr.ChatFinalizeOutcome, result.outcome) return result } diff --git a/apps/sim/lib/copilot/vfs/workspace-vfs.ts b/apps/sim/lib/copilot/vfs/workspace-vfs.ts index 74fb0735a59..5823e1b6444 100644 --- a/apps/sim/lib/copilot/vfs/workspace-vfs.ts +++ b/apps/sim/lib/copilot/vfs/workspace-vfs.ts @@ -1175,27 +1175,32 @@ export class WorkspaceVFS { .select({ id: copilotChats.id, title: copilotChats.title, - messageCount: sql`COALESCE(jsonb_array_length(${copilotChats.messages}), 0)`, + messageCount: sql`COALESCE(( + SELECT COUNT(*) FROM copilot_messages cm + WHERE cm.chat_id = ${copilotChats.id} AND cm.deleted_at IS NULL + ), 0)`, messages: sql`COALESCE(( SELECT jsonb_agg( jsonb_build_object( - 'role', m.value->>'role', - 'content', m.value->'content', + 'role', cm.content->>'role', + 'content', cm.content->'content', 'contentBlocks', COALESCE(( SELECT jsonb_agg(jsonb_build_object('type', 'text', 'content', b.value->'content') ORDER BY b.ord) FROM jsonb_array_elements( - CASE WHEN jsonb_typeof(m.value->'contentBlocks') = 'array' - THEN m.value->'contentBlocks' + CASE WHEN jsonb_typeof(cm.content->'contentBlocks') = 'array' + THEN cm.content->'contentBlocks' ELSE '[]'::jsonb END ) WITH ORDINALITY AS b(value, ord) WHERE b.value->>'type' = 'text' ), '[]'::jsonb) ) - ORDER BY m.ord + ORDER BY cm.seq ASC NULLS LAST, cm.created_at ASC, cm.id ASC ) - FROM jsonb_array_elements(${copilotChats.messages}) WITH ORDINALITY AS m(value, ord) - WHERE m.value->>'role' IN ('user', 'assistant') + FROM copilot_messages cm + WHERE cm.chat_id = ${copilotChats.id} + AND cm.deleted_at IS NULL + AND cm.content->>'role' IN ('user', 'assistant') ), '[]'::jsonb)`, createdAt: copilotChats.createdAt, updatedAt: copilotChats.updatedAt, diff --git a/apps/sim/lib/data-drains/sources/copilot-chats.ts b/apps/sim/lib/data-drains/sources/copilot-chats.ts index d1d25c2aaf2..6bde4632dca 100644 --- a/apps/sim/lib/data-drains/sources/copilot-chats.ts +++ b/apps/sim/lib/data-drains/sources/copilot-chats.ts @@ -1,6 +1,6 @@ import { db } from '@sim/db' -import { copilotChats } from '@sim/db/schema' -import { and, inArray } from 'drizzle-orm' +import { copilotChats, copilotMessages } from '@sim/db/schema' +import { and, asc, inArray, isNull, sql } from 'drizzle-orm' import { decodeTimeCursor, encodeTimeCursor, @@ -10,7 +10,34 @@ import { import { getOrganizationWorkspaceIds } from '@/lib/data-drains/sources/helpers' import type { Cursor, DrainSource, SourcePageInput } from '@/lib/data-drains/types' -type CopilotChatRow = typeof copilotChats.$inferSelect +/** + * The transcript no longer lives on `copilot_chats.messages` — it is assembled + * per page from the normalized `copilot_messages` table, so `messages` is the + * ordered list of message `content` objects rather than the DB column. + */ +type CopilotChatRow = Omit & { + messages: unknown[] +} + +/** Chat metadata columns, excluding the legacy `messages` JSONB. */ +const chatColumns = { + id: copilotChats.id, + userId: copilotChats.userId, + workflowId: copilotChats.workflowId, + workspaceId: copilotChats.workspaceId, + type: copilotChats.type, + title: copilotChats.title, + model: copilotChats.model, + conversationId: copilotChats.conversationId, + previewYaml: copilotChats.previewYaml, + planArtifact: copilotChats.planArtifact, + config: copilotChats.config, + resources: copilotChats.resources, + lastSeenAt: copilotChats.lastSeenAt, + pinned: copilotChats.pinned, + createdAt: copilotChats.createdAt, + updatedAt: copilotChats.updatedAt, +} as const /** * Cursor is `createdAt` (immutable) but rows themselves are mutable — @@ -28,18 +55,42 @@ async function* pages(input: SourcePageInput): AsyncIterable { while (!input.signal.aborted) { const cursorClause = timeCursorPredicate(copilotChats.createdAt, copilotChats.id, cursor) - const rows = await db - .select() + const metaRows = await db + .select(chatColumns) .from(copilotChats) .where(and(inArray(copilotChats.workspaceId, workspaceIds), cursorClause)) .orderBy(...timeCursorOrderBy(copilotChats.createdAt, copilotChats.id)) .limit(input.chunkSize) - if (rows.length === 0) return + if (metaRows.length === 0) return + + const chatIds = metaRows.map((r) => r.id) + const messageRows = await db + .select({ chatId: copilotMessages.chatId, content: copilotMessages.content }) + .from(copilotMessages) + .where(and(inArray(copilotMessages.chatId, chatIds), isNull(copilotMessages.deletedAt))) + .orderBy( + asc(copilotMessages.chatId), + sql`${copilotMessages.seq} asc nulls last`, + asc(copilotMessages.createdAt), + asc(copilotMessages.id) + ) + const messagesByChat = new Map() + for (const m of messageRows) { + const existing = messagesByChat.get(m.chatId) + if (existing) existing.push(m.content) + else messagesByChat.set(m.chatId, [m.content]) + } + + const rows: CopilotChatRow[] = metaRows.map((r) => ({ + ...r, + messages: messagesByChat.get(r.id) ?? [], + })) + yield rows - const last = rows[rows.length - 1] + const last = metaRows[metaRows.length - 1] cursor = { ts: last.createdAt.toISOString(), id: last.id } - if (rows.length < input.chunkSize) return + if (metaRows.length < input.chunkSize) return } } diff --git a/apps/sim/lib/mothership/inbox/executor.ts b/apps/sim/lib/mothership/inbox/executor.ts index d677e131243..67ac5296632 100644 --- a/apps/sim/lib/mothership/inbox/executor.ts +++ b/apps/sim/lib/mothership/inbox/executor.ts @@ -4,7 +4,7 @@ import { getErrorMessage } from '@sim/utils/errors' import { generateId } from '@sim/utils/id' import { and, eq, sql } from 'drizzle-orm' import { resolveOrCreateChat } from '@/lib/copilot/chat/lifecycle' -import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-dual-write' +import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-store' import { buildIntegrationToolSchemas } from '@/lib/copilot/chat/payload' import { buildPersistedAssistantMessage, @@ -227,7 +227,6 @@ export async function executeInboxTask(taskId: string): Promise { if (chatId) { await persistChatMessages( chatId, - userId, userMessageId, messageContent, { @@ -328,7 +327,6 @@ async function resolveUserId( */ async function persistChatMessages( chatId: string, - userId: string, userMessageId: string, userContent: string, result: OrchestratorResult, @@ -343,22 +341,24 @@ async function persistChatMessages( const assistantMessage = buildPersistedAssistantMessage(result) - const newMessages = JSON.stringify([userMessage, assistantMessage]) - const [updated] = await db - .update(copilotChats) - .set({ - messages: sql`COALESCE(${copilotChats.messages}, '[]'::jsonb) || ${newMessages}::jsonb`, - updatedAt: new Date(), - }) - .where(eq(copilotChats.id, chatId)) - .returning({ model: copilotChats.model }) - if (updated) { - await appendCopilotChatMessages(chatId, [userMessage, assistantMessage], { - chatModel: updated.model ?? null, - }) - } + // Best-effort: the email response is the primary deliverable, so a failure + // here is logged (in the catch below) rather than failing the task. + await db.transaction(async (tx) => { + const [updated] = await tx + .update(copilotChats) + .set({ updatedAt: new Date() }) + .where(eq(copilotChats.id, chatId)) + .returning({ model: copilotChats.model }) + if (!updated) return + await appendCopilotChatMessages( + chatId, + [userMessage, assistantMessage], + { chatModel: updated.model ?? null }, + tx + ) + }) } catch (err) { - logger.warn('Failed to persist chat messages', { + logger.error('Failed to persist chat messages', { chatId, error: getErrorMessage(err, 'Unknown error'), }) From 919fa5274bfebfbe5916ac0caa7311b7cbdbd3e1 Mon Sep 17 00:00:00 2001 From: Waleed Date: Sun, 31 May 2026 22:49:15 -0700 Subject: [PATCH 02/13] feat(tables): expand filter operators (not-contains, starts/ends-with, not-in, empty) (#4827) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add does-not-contain ($ncontains), starts-with ($startsWith), ends-with ($endsWith), not-in-array ($nin, previously executed server-side but unexposed in the UI), and is-empty/is-not-empty ($empty) filter operators end-to-end — SQL builder, condition types, query-builder converters/constants, the filter UI, the Table tools/block descriptions, and docs. Also fix correctness bugs in the filter builder surfaced by the wider operator set: - Same-column AND rules (e.g. age > 18 AND age < 65, or name startsWith 'A' AND name endsWith 'Z') silently overwrote each other because the AND group was keyed by column name. They now merge into one operator object, which also makes Filter -> rules -> Filter round-trip losslessly for multi-operator columns. - $nin values were not split into an array like $in, and textual-match values like "123" were numeric-coerced (breaking the ILIKE path). - A non-boolean $empty operand from the raw API silently inverted the check; it now coerces 'true'/'false' strings and otherwise returns a 400. --- apps/docs/content/docs/en/tools/table.mdx | 6 +- .../components/table-filter/table-filter.tsx | 30 +++-- apps/sim/blocks/blocks/table.ts | 8 ++ apps/sim/lib/table/__tests__/sql.test.ts | 66 ++++++++++ .../__tests__/converters.test.ts | 119 ++++++++++++++++++ apps/sim/lib/table/query-builder/constants.ts | 15 ++- .../sim/lib/table/query-builder/converters.ts | 58 ++++++++- apps/sim/lib/table/sql.ts | 98 ++++++++++++++- apps/sim/lib/table/types.ts | 8 ++ apps/sim/tools/table/delete_rows_by_filter.ts | 3 +- apps/sim/tools/table/query_rows.ts | 2 +- apps/sim/tools/table/update_rows_by_filter.ts | 3 +- 12 files changed, 388 insertions(+), 28 deletions(-) create mode 100644 apps/sim/lib/table/query-builder/__tests__/converters.test.ts diff --git a/apps/docs/content/docs/en/tools/table.mdx b/apps/docs/content/docs/en/tools/table.mdx index 388005b772c..e7cf5993704 100644 --- a/apps/docs/content/docs/en/tools/table.mdx +++ b/apps/docs/content/docs/en/tools/table.mdx @@ -275,7 +275,11 @@ Filters use MongoDB-style operators for flexible querying: | `$lte` | Less than or equal | `{"quantity": {"$lte": 10}}` | | `$in` | In array | `{"status": {"$in": ["active", "pending"]}}` | | `$nin` | Not in array | `{"type": {"$nin": ["spam", "blocked"]}}` | -| `$contains` | String contains | `{"email": {"$contains": "@gmail.com"}}` | +| `$contains` | String contains (case-insensitive) | `{"email": {"$contains": "@gmail.com"}}` | +| `$ncontains` | Does not contain (case-insensitive; matches empty cells) | `{"email": {"$ncontains": "@spam.com"}}` | +| `$startsWith` | Starts with (case-insensitive) | `{"name": {"$startsWith": "Dr."}}` | +| `$endsWith` | Ends with (case-insensitive) | `{"file": {"$endsWith": ".pdf"}}` | +| `$empty` | Cell is empty (`true`) or non-empty (`false`) | `{"phone": {"$empty": true}}` | ### Combining Filters diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-filter/table-filter.tsx b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-filter/table-filter.tsx index bd683c62515..12f5bb10fb8 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-filter/table-filter.tsx +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-filter/table-filter.tsx @@ -12,7 +12,7 @@ import { } from '@/components/emcn' import { ChevronDown, Plus } from '@/components/emcn/icons' import type { Filter, FilterRule } from '@/lib/table' -import { COMPARISON_OPERATORS } from '@/lib/table/query-builder/constants' +import { COMPARISON_OPERATORS, VALUELESS_OPERATORS } from '@/lib/table/query-builder/constants' import { filterRulesToFilter, filterToRules } from '@/lib/table/query-builder/converters' const OPERATOR_LABELS = Object.fromEntries( @@ -71,7 +71,9 @@ export function TableFilter({ columns, filter, onApply, onClose }: TableFilterPr }, []) const handleApply = useCallback(() => { - const validRules = rulesRef.current.filter((r) => r.column && r.value) + const validRules = rulesRef.current.filter( + (r) => r.column && (r.value || VALUELESS_OPERATORS.has(r.operator)) + ) onApply(filterRulesToFilter(validRules)) }, [onApply]) @@ -197,16 +199,20 @@ const FilterRuleRow = memo(function FilterRuleRow({ - onUpdate(rule.id, 'value', e.target.value)} - onKeyDown={(e) => { - if (e.key === 'Enter') onApply() - }} - placeholder='Enter a value' - className='h-[28px] flex-1 rounded-[5px] border border-[var(--border)] bg-transparent px-2 text-[var(--text-secondary)] text-xs outline-none placeholder:text-[var(--text-subtle)] hover-hover:border-[var(--border-1)] focus:border-[var(--border-1)]' - /> + {VALUELESS_OPERATORS.has(rule.operator) ? ( +
+ ) : ( + onUpdate(rule.id, 'value', e.target.value)} + onKeyDown={(e) => { + if (e.key === 'Enter') onApply() + }} + placeholder='Enter a value' + className='h-[28px] flex-1 rounded-[5px] border border-[var(--border)] bg-transparent px-2 text-[var(--text-secondary)] text-xs outline-none placeholder:text-[var(--text-subtle)] hover-hover:border-[var(--border-1)] focus:border-[var(--border-1)]' + /> + )}