From d4ff0e50a4ffbccfa465d45ec29108ad49cff69b Mon Sep 17 00:00:00 2001 From: Ekaterina Bulatova Date: Mon, 1 Jun 2026 10:17:31 +0200 Subject: [PATCH] feat(sdk,core): offload large trigger payloads via object storage Upload oversized trigger payloads before the API request and send an application/store pointer instead of embedding large JSON in the trigger body. Validate pointer payloads in TriggerTaskRequestBody. --- .changeset/large-trigger-payload-offload.md | 8 + packages/core/src/v3/schemas/api-type.test.ts | 40 ++++- packages/core/src/v3/schemas/api.ts | 30 +++- packages/core/src/v3/utils/ioSerialization.ts | 27 +++- packages/core/test/ioSerialization.test.ts | 148 +++++++++++++++++- packages/trigger-sdk/src/v3/shared.ts | 42 +++-- 6 files changed, 268 insertions(+), 27 deletions(-) create mode 100644 .changeset/large-trigger-payload-offload.md diff --git a/.changeset/large-trigger-payload-offload.md b/.changeset/large-trigger-payload-offload.md new file mode 100644 index 00000000000..e8d87947166 --- /dev/null +++ b/.changeset/large-trigger-payload-offload.md @@ -0,0 +1,8 @@ +--- +"@trigger.dev/core": patch +"@trigger.dev/sdk": patch +--- + +Offload large trigger payloads to object storage before sending the trigger API request. The SDK uploads packets at or above the existing 128KB limit and sends an `application/store` pointer instead of embedding large JSON in the request body. `TriggerTaskRequestBody` now validates that `application/store` payloads are non-empty storage paths. + +Payload uploads use the same resolved `ApiClient` as the trigger call (including `requestOptions.clientConfig`), not only the global `apiClientManager.client` — so custom `baseURL`, access token, and preview branch apply to both presign and trigger. diff --git a/packages/core/src/v3/schemas/api-type.test.ts b/packages/core/src/v3/schemas/api-type.test.ts index c936b3c769d..87f755206da 100644 --- a/packages/core/src/v3/schemas/api-type.test.ts +++ b/packages/core/src/v3/schemas/api-type.test.ts @@ -1,5 +1,5 @@ import { describe, it, expect } from "vitest"; -import { InitializeDeploymentRequestBody } from "./api.js"; +import { InitializeDeploymentRequestBody, TriggerTaskRequestBody } from "./api.js"; import type { InitializeDeploymentRequestBody as InitializeDeploymentRequestBodyType } from "./api.js"; describe("InitializeDeploymentRequestBody", () => { @@ -139,3 +139,41 @@ describe("InitializeDeploymentRequestBody", () => { }); }); }); + +describe("TriggerTaskRequestBody", () => { + it("accepts application/store payload as a non-empty string", () => { + const result = TriggerTaskRequestBody.safeParse({ + payload: "packets/payloads/file.json", + context: {}, + options: { + payloadType: "application/store", + }, + }); + + expect(result.success).toBe(true); + }); + + it("rejects application/store payload when payload is not a string", () => { + const result = TriggerTaskRequestBody.safeParse({ + payload: { foo: "bar" }, + context: {}, + options: { + payloadType: "application/store", + }, + }); + + expect(result.success).toBe(false); + }); + + it("rejects application/store payload when payload is an empty string", () => { + const result = TriggerTaskRequestBody.safeParse({ + payload: "", + context: {}, + options: { + payloadType: "application/store", + }, + }); + + expect(result.success).toBe(false); + }); +}); diff --git a/packages/core/src/v3/schemas/api.ts b/packages/core/src/v3/schemas/api.ts index e86e503de47..aba687d867a 100644 --- a/packages/core/src/v3/schemas/api.ts +++ b/packages/core/src/v3/schemas/api.ts @@ -157,11 +157,12 @@ export const IdempotencyKeyOptionsSchema = z.object({ export type IdempotencyKeyOptionsSchema = z.infer; -export const TriggerTaskRequestBody = z.object({ - payload: z.any(), - context: z.any(), - options: z - .object({ +export const TriggerTaskRequestBody = z + .object({ + payload: z.any(), + context: z.any(), + options: z + .object({ /** @deprecated engine v1 only */ dependentAttempt: z.string().optional(), /** @deprecated engine v1 only */ @@ -227,9 +228,22 @@ export const TriggerTaskRequestBody = z.object({ maxDelay: z.string().optional(), }) .optional(), - }) - .optional(), -}); + }) + .optional(), + }) + .superRefine((value, ctx) => { + if (value.options?.payloadType !== "application/store") { + return; + } + + if (typeof value.payload !== "string" || value.payload.length === 0) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: "payload must be a non-empty string when options.payloadType is application/store", + path: ["payload"], + }); + } + }); export type TriggerTaskRequestBody = z.infer; diff --git a/packages/core/src/v3/utils/ioSerialization.ts b/packages/core/src/v3/utils/ioSerialization.ts index 35e3d41061a..1386ed04b43 100644 --- a/packages/core/src/v3/utils/ioSerialization.ts +++ b/packages/core/src/v3/utils/ioSerialization.ts @@ -100,22 +100,33 @@ export async function stringifyIO(value: any): Promise { } } +/** + * Offloads a packet to object storage when it exceeds the size limit. + * + * @param client - Optional API client to use for the upload presign request. When + * omitted, falls back to the global `apiClientManager.client`. Pass an explicit client + * (e.g. one built from a custom `clientConfig`) to ensure the payload is uploaded using + * the same configuration as the accompanying trigger API call. + */ export async function conditionallyExportPacket( packet: IOPacket, pathPrefix: string, - tracer?: TriggerTracer + tracer?: TriggerTracer, + client?: ApiClient ): Promise { - if (apiClientManager.client) { + const $client = client ?? apiClientManager.client; + + if ($client) { const { needsOffloading, size } = packetRequiresOffloading(packet); if (needsOffloading) { if (!tracer) { - return await exportPacket(packet, pathPrefix); + return await exportPacket(packet, pathPrefix, $client); } else { const result = await tracer.startActiveSpan( "store.uploadOutput", async (span) => { - return await exportPacket(packet, pathPrefix); + return await exportPacket(packet, pathPrefix, $client); }, { attributes: { @@ -163,11 +174,15 @@ const ioRetryOptions = { randomize: true, } satisfies RetryOptions; -async function exportPacket(packet: IOPacket, pathPrefix: string): Promise { +async function exportPacket( + packet: IOPacket, + pathPrefix: string, + client: ApiClient +): Promise { // Offload the output const filename = `${pathPrefix}.${getPacketExtension(packet.dataType)}`; - const presignedResponse = await apiClientManager.client!.createUploadPayloadUrl(filename); + const presignedResponse = await client.createUploadPayloadUrl(filename); if (!presignedResponse.storagePath) { throw new Error( diff --git a/packages/core/test/ioSerialization.test.ts b/packages/core/test/ioSerialization.test.ts index d7bd90add83..9a2d7accfe1 100644 --- a/packages/core/test/ioSerialization.test.ts +++ b/packages/core/test/ioSerialization.test.ts @@ -1,4 +1,12 @@ -import { replaceSuperJsonPayload, prettyPrintPacket } from "../src/v3/utils/ioSerialization.js"; +import { createTestHttpServer } from "@epic-web/test-server/http"; +import { + replaceSuperJsonPayload, + prettyPrintPacket, + conditionallyExportPacket, + type IOPacket, +} from "../src/v3/utils/ioSerialization.js"; +import { ApiClient } from "../src/v3/apiClient/index.js"; +import { apiClientManager } from "../src/v3/apiClientManager-api.js"; describe("ioSerialization", () => { describe("replaceSuperJsonPayload", () => { @@ -344,4 +352,142 @@ describe("ioSerialization", () => { expect(result).toBe(JSON.stringify(data, null, 2)); }); }); + + describe("conditionallyExportPacket", () => { + // A payload large enough to exceed OFFLOAD_IO_PACKET_LENGTH_LIMIT (128KB) so it offloads. + const largePayload = "x".repeat(200_000); + const largePacket: IOPacket = { data: largePayload, dataType: "text/plain" }; + + afterEach(() => { + // Clear any global client config set during a test. + apiClientManager.disable(); + }); + + it("uses the provided client for the upload presign instead of the global client", async () => { + const globalPresignRequests: string[] = []; + const passedPresignRequests: string[] = []; + let uploadedBytes = 0; + + // The global client points here — it must NOT be hit when a client is passed. + const globalServer = await createTestHttpServer({ + defineRoutes(router) { + router.put("/api/v2/packets/:filename", async ({ req }) => { + globalPresignRequests.push(req.url); + return Response.json({ presignedUrl: "http://unused.local/upload" }); + }); + }, + }); + + // The explicitly passed client points here — it MUST receive the presign + upload. + const passedServer = await createTestHttpServer({ + defineRoutes(router) { + router.put("/api/v2/packets/:filename", async ({ req }) => { + passedPresignRequests.push(req.url); + return Response.json({ + presignedUrl: `${passedServer.http.url().origin}/upload/payload`, + storagePath: "trigger/task/payload.txt", + }); + }); + router.put("/upload/payload", async ({ req }) => { + uploadedBytes = (await req.text()).length; + return new Response(null, { status: 200 }); + }); + }, + }); + + try { + // Configure the global client to point at the global server. + apiClientManager.setGlobalAPIClientConfiguration({ + baseURL: globalServer.http.url().origin, + accessToken: "tr-global", + }); + + const passedClient = new ApiClient(passedServer.http.url().origin, "tr-passed"); + + const result = await conditionallyExportPacket( + largePacket, + "trigger/task/payload", + undefined, + passedClient + ); + + expect(result.dataType).toBe("application/store"); + expect(result.data).toBe("trigger/task/payload.txt"); + + // Upload went through the passed client only. + expect(passedPresignRequests).toHaveLength(1); + expect(globalPresignRequests).toHaveLength(0); + expect(uploadedBytes).toBe(largePayload.length); + } finally { + await globalServer.close(); + await passedServer.close(); + } + }); + + it("falls back to the global client when no client is provided", async () => { + const presignRequests: string[] = []; + + const globalServer = await createTestHttpServer({ + defineRoutes(router) { + router.put("/api/v2/packets/:filename", async ({ req }) => { + presignRequests.push(req.url); + return Response.json({ + presignedUrl: `${globalServer.http.url().origin}/upload/payload`, + storagePath: "trigger/task/payload.txt", + }); + }); + router.put("/upload/payload", async () => new Response(null, { status: 200 })); + }, + }); + + try { + apiClientManager.setGlobalAPIClientConfiguration({ + baseURL: globalServer.http.url().origin, + accessToken: "tr-global", + }); + + const result = await conditionallyExportPacket(largePacket, "trigger/task/payload"); + + expect(result.dataType).toBe("application/store"); + expect(result.data).toBe("trigger/task/payload.txt"); + expect(presignRequests).toHaveLength(1); + } finally { + await globalServer.close(); + } + }); + + it("returns the packet unchanged when no client is available", async () => { + apiClientManager.disable(); + + const result = await conditionallyExportPacket(largePacket, "trigger/task/payload"); + + expect(result).toEqual(largePacket); + }); + + it("does not offload small payloads even with a client", async () => { + const passedServer = await createTestHttpServer({ + defineRoutes(router) { + router.put("/api/v2/packets/:filename", async () => { + throw new Error("presign should not be called for small payloads"); + }); + }, + }); + + try { + const smallPacket: IOPacket = { data: "hello", dataType: "text/plain" }; + const passedClient = new ApiClient(passedServer.http.url().origin, "tr-passed"); + + const result = await conditionallyExportPacket( + smallPacket, + "trigger/task/payload", + undefined, + passedClient + ); + + expect(result).toEqual(smallPacket); + } finally { + await passedServer.close(); + } + }); + }); }); diff --git a/packages/trigger-sdk/src/v3/shared.ts b/packages/trigger-sdk/src/v3/shared.ts index 545594f4826..fba990949b3 100644 --- a/packages/trigger-sdk/src/v3/shared.ts +++ b/packages/trigger-sdk/src/v3/shared.ts @@ -2,9 +2,11 @@ import { SpanKind } from "@opentelemetry/api"; import { SerializableJson } from "@trigger.dev/core"; import { accessoryAttributes, + ApiClient, ApiError, apiClientManager, ApiRequestOptions, + conditionallyExportPacket, conditionallyImportPacket, convertToolParametersToSchema, createErrorTaskError, @@ -25,6 +27,7 @@ import { sdkScope, SemanticInternalAttributes, stringifyIO, + type IOPacket, SubtaskUnwrapError, taskContext, TaskFromIdentifier, @@ -2212,8 +2215,7 @@ async function trigger_internal( const apiClient = apiClientManager.clientOrThrow(requestOptions?.clientConfig); const parsedPayload = parsePayload ? await parsePayload(payload) : payload; - - const payloadPacket = await stringifyIO(parsedPayload); + const triggerPayloadPacket = await prepareTriggerPayload(parsedPayload, apiClient, id); // Process idempotency key and extract options for storage const processedIdempotencyKey = await makeIdempotencyKey(options?.idempotencyKey); @@ -2224,12 +2226,12 @@ async function trigger_internal( const handle = await apiClient.triggerTask( id, { - payload: payloadPacket.data, + payload: triggerPayloadPacket.data, options: { queue: options?.queue ? { name: options.queue } : undefined, concurrencyKey: options?.concurrencyKey, test: taskContext.ctx?.run.isTest, - payloadType: payloadPacket.dataType, + payloadType: triggerPayloadPacket.dataType, idempotencyKey: processedIdempotencyKey?.toString(), idempotencyKeyTTL: options?.idempotencyKeyTTL, idempotencyKeyOptions, @@ -2468,8 +2470,7 @@ async function triggerAndWait_internal { + const payloadPacket = await stringifyIO(payload); + return await conditionallyExportPacket( + payloadPacket, + createTriggerPayloadPathPrefix(taskId), + undefined, + apiClient + ); +} + +function createTriggerPayloadPathPrefix(taskId: string): string { + const safeTaskId = encodeURIComponent(taskId); + return `trigger/${safeTaskId}/${Date.now()}-${Math.random().toString(36).slice(2)}/payload`; +}