From 8c3dff6bb8bf13a5134cc16ed9f7887336e39c66 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 10 Jun 2026 14:18:19 +0100 Subject: [PATCH 1/3] fix(sdk,core): harden chat.agent message delivery and idempotency Stop delivering a user message twice when it arrives mid-stream: the session stream manager now lets a handler consume a record so it is not also buffered for the next turn, which previously re-ran the message as a duplicate turn. Input appends carry an X-Part-Id idempotency key so a retried send cannot duplicate a message. Stopping a generation clears the streaming state and persists it, so a page reload no longer replays the stopped turn. Promoting a queued message to steering no longer sends inside a React state updater. Runs keep up to the full tag limit instead of being silently truncated. The in-memory test stream manager now mirrors the production consume semantics so this class of bug is covered. --- .changeset/chat-agent-hardening.md | 6 + packages/core/src/v3/apiClient/index.ts | 7 +- packages/core/src/v3/sessionStreams/index.ts | 2 +- .../core/src/v3/sessionStreams/manager.ts | 93 ++++++++---- .../core/src/v3/sessionStreams/noopManager.ts | 2 +- packages/core/src/v3/sessionStreams/types.ts | 13 +- .../v3/test/test-session-stream-manager.ts | 137 ++++++++++++------ packages/trigger-sdk/src/v3/ai.ts | 19 ++- packages/trigger-sdk/src/v3/chat-client.ts | 3 + packages/trigger-sdk/src/v3/chat-react.ts | 18 ++- packages/trigger-sdk/src/v3/chat.ts | 38 ++++- packages/trigger-sdk/src/v3/sessions.ts | 14 +- 12 files changed, 252 insertions(+), 100 deletions(-) create mode 100644 .changeset/chat-agent-hardening.md diff --git a/.changeset/chat-agent-hardening.md b/.changeset/chat-agent-hardening.md new file mode 100644 index 00000000000..fd03aa796cc --- /dev/null +++ b/.changeset/chat-agent-hardening.md @@ -0,0 +1,6 @@ +--- +"@trigger.dev/sdk": patch +"@trigger.dev/core": patch +--- + +Reliability fixes for `chat.agent`. A user message sent while the agent is streaming is no longer delivered twice (which could run a duplicate turn), input appends now carry an idempotency key so a retried send can't duplicate a message, stopping a generation clears the streaming state so a page reload doesn't replay the stopped turn, and runs can now carry the full set of dashboard tags instead of being silently truncated. diff --git a/packages/core/src/v3/apiClient/index.ts b/packages/core/src/v3/apiClient/index.ts index 4bbeca8bd31..c96abf6a435 100644 --- a/packages/core/src/v3/apiClient/index.ts +++ b/packages/core/src/v3/apiClient/index.ts @@ -1,3 +1,4 @@ +import { nanoid } from "nanoid"; import { z } from "zod"; import { VERSION } from "../../version.js"; import { generateJWT } from "../jwt.js"; @@ -1276,12 +1277,16 @@ export class ApiClient { part: TBody, requestOptions?: ZodFetchOptions ) { + // Generated once per logical append, outside zodfetch, so its internal + // retries reuse the same part id and the server-side dedupe collapses a + // retried POST whose first attempt actually committed. + const partId = nanoid(7); return zodfetch( AppendToStreamResponseBody, `${this.baseUrl}/realtime/v1/sessions/${encodeURIComponent(sessionIdOrExternalId)}/${io}/append`, { method: "POST", - headers: this.#getHeaders(false), + headers: { ...this.#getHeaders(false), "X-Part-Id": partId }, body: part, }, mergeRequestOptions(this.defaultRequestOptions, requestOptions) diff --git a/packages/core/src/v3/sessionStreams/index.ts b/packages/core/src/v3/sessionStreams/index.ts index 7ea17261632..c150f0fe9df 100644 --- a/packages/core/src/v3/sessionStreams/index.ts +++ b/packages/core/src/v3/sessionStreams/index.ts @@ -34,7 +34,7 @@ export class SessionStreamsAPI implements SessionStreamManager { public on( sessionId: string, io: SessionChannelIO, - handler: (data: unknown) => void | Promise + handler: (data: unknown) => void | boolean | Promise ): { off: () => void } { return this.#getManager().on(sessionId, io, handler); } diff --git a/packages/core/src/v3/sessionStreams/manager.ts b/packages/core/src/v3/sessionStreams/manager.ts index 5d07d841ba4..65eaf40f9cc 100644 --- a/packages/core/src/v3/sessionStreams/manager.ts +++ b/packages/core/src/v3/sessionStreams/manager.ts @@ -9,7 +9,11 @@ import { computeReconnectDelayMs } from "../utils/reconnectBackoff.js"; import { SessionChannelIO, SessionStreamManager } from "./types.js"; import { controlSubtype } from "./wireProtocol.js"; -type SessionStreamHandler = (data: unknown) => void | Promise; +// A handler that synchronously returns `true` CONSUMES the record: it is +// not buffered for a later `once()` and the committed-consume cursor +// advances past it. Anything else (void, a Promise) leaves the record +// available to other consumers. See `SessionStreamManager.on` in types.ts. +type SessionStreamHandler = (data: unknown) => void | boolean | Promise; type OnceWaiter = { resolve: (result: InputStreamOnceResult) => void; @@ -113,30 +117,41 @@ export class StandardSessionStreamManager implements SessionStreamManager { this.explicitlyDisconnected.delete(key); this.#ensureTailConnected(sessionId, io); + // Selective drain: offer each buffered record to the new handler and + // remove ONLY the ones it consumed (returned `true` — e.g. the + // messages facade for message-kind records). Consumed records advance + // the committed-consume cursor, so a worker using `messagesInput.on()` + // for user-message delivery persists a `.in` cursor that matches what + // the handler processed. Records the handler did not consume (other + // kinds) STAY buffered for a future `once()` or a different handler — + // a blind drain here either swallowed them (delivered to a handler + // that filtered them out, then deleted) or re-delivered already- + // processed messages into every newly attached per-turn handler, + // duplicating turns. const buffered = this.buffer.get(key); if (buffered && buffered.length > 0) { - for (const data of buffered) { - this.#invokeHandler(handler, data); - } - // Advance the committed-consume cursor to the highest seq drained - // into the new handler. `on()`-drain removes the records from the - // buffer, so they're no longer available to a future `once()` — - // from the manager's perspective they've been consumed. Without - // this, a worker that uses `messagesInput.on()` for user-message - // delivery (pendingMessages mode) would persist a `.in` cursor - // that lags behind the records the handler already processed, and - // the next boot would re-deliver them. - const seqList = this.bufferSeqNums.get(key); - if (seqList) { - for (const s of seqList) { + const seqList = this.bufferSeqNums.get(key) ?? []; + const keptRecords: unknown[] = []; + // Kept in lock-step with `keptRecords` — drifting lengths would map + // seq_nums to the wrong records on subsequent shifts. + const keptSeqNums: Array = []; + for (let i = 0; i < buffered.length; i++) { + const consumed = this.#invokeHandler(handler, buffered[i]); + if (consumed) { + const s = seqList[i]; if (s !== undefined) this.#advanceLastDispatched(key, s); + } else { + keptRecords.push(buffered[i]); + keptSeqNums.push(seqList[i]); } } - this.buffer.delete(key); - // Keep `bufferSeqNums` in lock-step with `buffer` — without this, - // the parallel array desyncs and the next `#dispatch` that buffers - // a record would shift a stale seqNum into `lastDispatchedSeqNum`. - this.bufferSeqNums.delete(key); + if (keptRecords.length > 0) { + this.buffer.set(key, keptRecords); + this.bufferSeqNums.set(key, keptSeqNums); + } else { + this.buffer.delete(key); + this.bufferSeqNums.delete(key); + } } return { @@ -509,13 +524,21 @@ export class StandardSessionStreamManager implements SessionStreamManager { return; } - // Persistent handlers (e.g. `stopInput.on(...)`) get a copy of the chunk, - // but they don't "consume" it — handlers usually filter by `kind` and - // ignore chunks they don't care about. Buffer the chunk regardless so a - // subsequent `once()` (e.g. `messagesInput.waitWithIdleTimeout` in - // chat.agent's preload) can still pick up the same chunk that arrived - // before its waiter was registered. - this.#invokeHandlers(key, data); + // Persistent handlers get a copy of the chunk. A handler that + // synchronously returns `true` CONSUMES it (e.g. the messages facade + // for message-kind records): the record must not also be buffered, or + // the next `on()` attach / `once()` would deliver it a second time — + // in chat.agent's turn loop that duplicated user messages into a + // second turn. Records no handler consumed (e.g. a message arriving + // while only the stop facade is attached during preload) are buffered + // so a subsequent `once()` can still pick them up. + const consumed = this.#invokeHandlers(key, data); + if (consumed) { + if (seqNum !== undefined) { + this.#advanceLastDispatched(key, seqNum); + } + return; + } let buffered = this.buffer.get(key); if (!buffered) { @@ -535,17 +558,24 @@ export class StandardSessionStreamManager implements SessionStreamManager { bufferedSeqs.push(seqNum); } - #invokeHandlers(key: string, data: unknown): void { + /** Returns true when any handler consumed the record. All handlers are invoked regardless. */ + #invokeHandlers(key: string, data: unknown): boolean { const handlers = this.handlers.get(key); - if (!handlers) return; + if (!handlers) return false; + let consumed = false; for (const handler of handlers) { - this.#invokeHandler(handler, data); + if (this.#invokeHandler(handler, data)) { + consumed = true; + } } + return consumed; } - #invokeHandler(handler: SessionStreamHandler, data: unknown): void { + /** Returns true when the handler synchronously consumed the record (returned `true`). */ + #invokeHandler(handler: SessionStreamHandler, data: unknown): boolean { try { const result = handler(data); + if (result === true) return true; if (result && typeof result === "object" && "catch" in result) { (result as Promise).catch((error) => { if (this.debug) { @@ -558,6 +588,7 @@ export class StandardSessionStreamManager implements SessionStreamManager { console.error("[SessionStreamManager] Handler error:", error); } } + return false; } #removeOnceWaiter(key: string, waiter: OnceWaiter): void { diff --git a/packages/core/src/v3/sessionStreams/noopManager.ts b/packages/core/src/v3/sessionStreams/noopManager.ts index 42d97c9d4ea..77d1f40ac9f 100644 --- a/packages/core/src/v3/sessionStreams/noopManager.ts +++ b/packages/core/src/v3/sessionStreams/noopManager.ts @@ -6,7 +6,7 @@ export class NoopSessionStreamManager implements SessionStreamManager { on( _sessionId: string, _io: SessionChannelIO, - _handler: (data: unknown) => void | Promise + _handler: (data: unknown) => void | boolean | Promise ): { off: () => void } { return { off: () => {} }; } diff --git a/packages/core/src/v3/sessionStreams/types.ts b/packages/core/src/v3/sessionStreams/types.ts index f59d3ee9c7b..45be5149e2e 100644 --- a/packages/core/src/v3/sessionStreams/types.ts +++ b/packages/core/src/v3/sessionStreams/types.ts @@ -22,11 +22,20 @@ export type SessionChannelIO = "out" | "in"; * `.on` / `.once` / `.peek` / `.wait` / `.waitWithIdleTimeout`. */ export interface SessionStreamManager { - /** Register a handler that fires every time data arrives on the given channel. */ + /** + * Register a handler that fires every time data arrives on the given channel. + * + * A handler that synchronously returns `true` CONSUMES the record: it is + * not buffered for a later `once()` and the committed-consume cursor + * advances past it. Any other return value (including a Promise) leaves + * the record available to other consumers. Kind-filtering facades return + * `true` for the kinds they own so the same record is never delivered + * twice — once to the handler and again via a buffer drain. + */ on( sessionId: string, io: SessionChannelIO, - handler: (data: unknown) => void | Promise + handler: (data: unknown) => void | boolean | Promise ): { off: () => void }; /** Wait for the next record on the given channel (buffered or live). */ diff --git a/packages/core/src/v3/test/test-session-stream-manager.ts b/packages/core/src/v3/test/test-session-stream-manager.ts index a688790d616..f19e01dc33c 100644 --- a/packages/core/src/v3/test/test-session-stream-manager.ts +++ b/packages/core/src/v3/test/test-session-stream-manager.ts @@ -16,7 +16,10 @@ type OnceWaiter = { abortHandler?: () => void; }; -type Handler = (data: unknown) => void | Promise; +// Same contract as the production manager: a handler that synchronously +// returns `true` CONSUMES the record (not buffered, not re-delivered on a +// future `on()` attach). See `SessionStreamManager.on` in types.ts. +type Handler = (data: unknown) => void | boolean | Promise; function keyFor(sessionId: string, io: SessionChannelIO): string { return `${sessionId}:${io}`; @@ -51,20 +54,32 @@ export class TestSessionStreamManager implements SessionStreamManager { } set.add(handler); - // Note: we intentionally do NOT replay buffered records into the - // newly-registered handler, and we do NOT drain the buffer. The - // buffer is owned by `once()` — registering a passive observer - // (`on`) must not consume records destined for a future `once` - // waiter. This matches production SSE semantics where handlers - // observe records as they arrive, not retroactively. - // - // Earlier versions drained the buffer here, which caused user - // messages buffered during the runtime's `runFn` boot phase to be - // silently swallowed by the `stopInput.on()` handler registered at - // ai.ts:4806 (the stop handler ignores `kind: "message"` chunks). - // The next `messagesInput.waitWithIdleTimeout` then waited 30s for - // a record that had already been "delivered" to a handler that - // didn't want it. + // Selective drain, matching the production manager: offer each + // buffered record to the new handler and remove ONLY the ones it + // consumed (returned `true`). Records the handler filtered out (other + // kinds) stay buffered for a future `once()`. This is the corrected + // form of two historical bugs: a blind drain swallowed boot-phase user + // messages into the stop facade (which ignores `kind: "message"`), + // and no-drain-at-all let production re-deliver already-processed + // messages into every newly attached per-turn handler. + const buffered = this.buffer.get(key); + if (buffered && buffered.length > 0) { + const kept: unknown[] = []; + for (const data of buffered) { + let consumed = false; + try { + consumed = handler(data) === true; + } catch { + // Never let a handler error break test state + } + if (!consumed) kept.push(data); + } + if (kept.length > 0) { + this.buffer.set(key, kept); + } else { + this.buffer.delete(key); + } + } return { off: () => { @@ -212,20 +227,20 @@ export class TestSessionStreamManager implements SessionStreamManager { /** * Push a record onto the given channel. * - * Dispatch rules — similar to the production manager, but with a tweak - * that makes unit tests deterministic: + * Dispatch rules — same as the production manager: + * + * 1. **A pending `.once` waiter consumes first.** Handlers still observe + * a copy. + * 2. **Otherwise handlers observe.** A handler that synchronously + * returns `true` consumes the record (kind-filtering facades do this + * for the kinds they own) — it is NOT buffered. + * 3. **Records no one consumed are buffered** for the next `.once` call + * or the next consuming `on()` attach. * - * 1. **Handlers always observe** (like production). A session-level `.on` - * is a filter-observer — it fires every time a record arrives, - * regardless of whether a `.once` waiter is also active. - * 2. **First waiter consumes** the record if present (like production). - * 3. **If no waiter, the record is buffered for the next `.once` call.** - * Production discards records that only match handlers — but in - * production the SSE tail introduces enough latency that the next - * `.once` is usually registered before the next record arrives. Tests - * send synchronously right after `turn-complete`, so without this - * buffer the next `waitWithIdleTimeout` would race and lose the - * message. The buffer is the only deviation from production semantics. + * Handler promises are awaited before resolving so test code can rely + * on async handler work having settled by the time `__sendFromTest` + * resolves. Consumption is decided on the synchronous return value, + * exactly like production. */ async __sendFromTest( sessionId: string, @@ -234,23 +249,6 @@ export class TestSessionStreamManager implements SessionStreamManager { ): Promise { const key = keyFor(sessionId, io); - const handlers = this.handlers.get(key); - if (handlers && handlers.size > 0) { - // Awaited so test code can rely on handlers having completed by the - // time `__sendFromTest` resolves. Wrapped per-handler so a - // throwing/rejecting handler doesn't poison Promise.all and break - // unrelated test state. - await Promise.all( - Array.from(handlers).map(async (h) => { - try { - await h(data); - } catch { - // Never let a handler error break test state - } - }) - ); - } - const waiters = this.onceWaiters.get(key); if (waiters && waiters.length > 0) { const w = waiters.shift()!; @@ -260,6 +258,27 @@ export class TestSessionStreamManager implements SessionStreamManager { w.signal.removeEventListener("abort", w.abortHandler); } w.resolve({ ok: true, output: data }); + await this.#invokeHandlers(key, data); + return; + } + + const consumed = await this.#invokeHandlers(key, data); + if (consumed) return; + + // Re-check waiters: handler invocation above is awaited (unlike the + // synchronous production dispatch), and the runtime commonly registers + // its next `once()` during that window — e.g. the turn loop reaching + // `waitWithIdleTimeout` while a handler settles. Without this second + // look the record would be buffered while the fresh waiter hangs. + const lateWaiters = this.onceWaiters.get(key); + if (lateWaiters && lateWaiters.length > 0) { + const w = lateWaiters.shift()!; + if (lateWaiters.length === 0) this.onceWaiters.delete(key); + if (w.timer) clearTimeout(w.timer); + if (w.signal && w.abortHandler) { + w.signal.removeEventListener("abort", w.abortHandler); + } + w.resolve({ ok: true, output: data }); return; } @@ -271,6 +290,34 @@ export class TestSessionStreamManager implements SessionStreamManager { buffered.push(data); } + /** + * Invoke all handlers; resolves once any returned promises settle. + * Returns true when any handler synchronously consumed the record. + * Wrapped per-handler so a throwing/rejecting handler doesn't poison + * Promise.all and break unrelated test state. + */ + async #invokeHandlers(key: string, data: unknown): Promise { + const handlers = this.handlers.get(key); + if (!handlers || handlers.size === 0) return false; + + let consumed = false; + await Promise.all( + Array.from(handlers).map(async (h) => { + try { + const result = h(data); + if (result === true) { + consumed = true; + return; + } + await result; + } catch { + // Never let a handler error break test state + } + }) + ); + return consumed; + } + /** * Immediately resolve every pending `once()` waiter for the given channel * with a timeout error. Simulates a closed stream (e.g. session closed). diff --git a/packages/trigger-sdk/src/v3/ai.ts b/packages/trigger-sdk/src/v3/ai.ts index 853cf6e69e4..2d2cebfee14 100644 --- a/packages/trigger-sdk/src/v3/ai.ts +++ b/packages/trigger-sdk/src/v3/ai.ts @@ -1500,8 +1500,15 @@ const messagesInput: RealtimeDefinedInputStream = { on(handler) { return getChatSession().in.on((chunk) => { if (chunk.kind === "message") { - return handler(chunk.payload); + // Returning `true` marks the record CONSUMED at the manager level: + // it is neither buffered for a later `once()` nor re-delivered by + // the buffer drain when the next turn re-attaches its handler. + // Without this, a message arriving mid-stream was delivered twice + // and ran a duplicate turn. + void Promise.resolve(handler(chunk.payload)).catch(() => {}); + return true; } + return undefined; }); }, once(options) { @@ -1601,8 +1608,13 @@ const stopInput: RealtimeDefinedInputStream<{ stop: true; message?: string }> = on(handler) { return getChatSession().in.on((chunk) => { if (chunk.kind === "stop") { - return handler({ stop: true, message: chunk.message }); + // Consume stop records (see the messages facade above). A stop is + // only meaningful to the turn it interrupts — buffering it would + // let a stale stop abort a future turn. + void Promise.resolve(handler({ stop: true, message: chunk.message })).catch(() => {}); + return true; } + return undefined; }); }, once(options) { @@ -9518,7 +9530,8 @@ function createChatStartSessionAction( // run-list filter by chat works without the customer having to wire it // up. Mirrors the browser-mediated `TriggerChatTransport.doStart` path. const userTags = params.triggerConfig?.tags ?? options?.triggerConfig?.tags ?? []; - const tags = [`chat:${params.chatId}`, ...userTags].slice(0, 5); + // Platform cap is 10 tags per run; the auto chat tag takes one slot. + const tags = [`chat:${params.chatId}`, ...userTags].slice(0, 10); const clientDataMetadata = params.clientData !== undefined ? { metadata: params.clientData } : {}; diff --git a/packages/trigger-sdk/src/v3/chat-client.ts b/packages/trigger-sdk/src/v3/chat-client.ts index 0bbdddcfbea..92402948340 100644 --- a/packages/trigger-sdk/src/v3/chat-client.ts +++ b/packages/trigger-sdk/src/v3/chat-client.ts @@ -616,6 +616,9 @@ export class AgentChat { "Content-Type": "application/json", Authorization: `Bearer ${accessToken}`, "x-trigger-source": "sdk", + // Idempotency key: the server skips appends whose part id it has + // already committed, so a retried POST can't duplicate the record. + "X-Part-Id": crypto.randomUUID(), }; const response = await this.doFetch(ctx, url, { method: "POST", headers, body }); if (!response.ok) { diff --git a/packages/trigger-sdk/src/v3/chat-react.ts b/packages/trigger-sdk/src/v3/chat-react.ts index b14ac825bcb..495e235083d 100644 --- a/packages/trigger-sdk/src/v3/chat-react.ts +++ b/packages/trigger-sdk/src/v3/chat-react.ts @@ -299,6 +299,8 @@ export function usePendingMessages( // Internal state: track messages with their mode type InternalMessage = TUIMessage & { _mode: "steering" | "queued" }; const [pendingMsgs, setPendingMsgs] = useState([]); + const pendingMsgsRef = useRef(pendingMsgs); + pendingMsgsRef.current = pendingMsgs; const injectedIdsRef = useRef>(new Set()); const prevStatusRef = useRef(status); @@ -400,14 +402,18 @@ export function usePendingMessages( if (promotedIdsRef.current.has(id)) { return; } + + // Read from the ref, send OUTSIDE the state updater. React may invoke + // updaters more than once (StrictMode, update rebasing), so a network + // call inside one can double-send. + const msg = pendingMsgsRef.current.find((m) => m.id === id); + if (!msg || msg._mode !== "queued") return; promotedIdsRef.current.add(id); + transport.sendPendingMessage(chatId, msg, metadata); - setPendingMsgs((prev) => { - const msg = prev.find((m) => m.id === id); - if (!msg || msg._mode !== "queued") return prev; - transport.sendPendingMessage(chatId, msg, metadata); - return prev.map((m) => (m.id === id ? { ...m, _mode: "steering" as const } : m)); - }); + setPendingMsgs((prev) => + prev.map((m) => (m.id === id ? { ...m, _mode: "steering" as const } : m)) + ); }, [transport, chatId, metadata] ); diff --git a/packages/trigger-sdk/src/v3/chat.ts b/packages/trigger-sdk/src/v3/chat.ts index 74746744c7e..cdf9aae5435 100644 --- a/packages/trigger-sdk/src/v3/chat.ts +++ b/packages/trigger-sdk/src/v3/chat.ts @@ -615,11 +615,15 @@ export class TriggerChatTransport implements ChatTransport { const state = await this.ensureSessionState(chatId); + // Generated outside the closure so auth-retries reuse the same part id + // and the server-side dedupe sees one logical append. + const partId = crypto.randomUUID(); const sendChatMessage = async (token: string) => { await this.appendInputChunk( chatId, token, - this.serializeInputChunk({ kind: "message", payload: wirePayload }) + this.serializeInputChunk({ kind: "message", payload: wirePayload }), + partId ); }; @@ -800,11 +804,13 @@ export class TriggerChatTransport implements ChatTransport { metadata: mergedMetadata, }; + const partId = crypto.randomUUID(); const send = async (token: string) => { await this.appendInputChunk( chatId, token, - this.serializeInputChunk({ kind: "message", payload: wirePayload }) + this.serializeInputChunk({ kind: "message", payload: wirePayload }), + partId ); }; @@ -859,8 +865,14 @@ export class TriggerChatTransport implements ChatTransport { const state = this.sessions.get(chatId); if (!state) return false; + const partId = crypto.randomUUID(); const send = async (token: string) => { - await this.appendInputChunk(chatId, token, this.serializeInputChunk({ kind: "stop" })); + await this.appendInputChunk( + chatId, + token, + this.serializeInputChunk({ kind: "stop" }), + partId + ); }; try { @@ -876,6 +888,13 @@ export class TriggerChatTransport implements ChatTransport { activeStream.abort(); this.activeStreams.delete(chatId); } + + // The turn won't reach its turn-complete on this client (we just + // aborted the reader), so clear the streaming flag here and persist — + // otherwise a reload resumes mid-turn and replays the chunks the user + // explicitly stopped. + state.isStreaming = false; + this.notifySessionChange(chatId, state); return true; }; @@ -908,8 +927,9 @@ export class TriggerChatTransport implements ChatTransport { }; const body = this.serializeInputChunk({ kind: "message", payload: wirePayload }); + const partId = crypto.randomUUID(); const send = async (token: string) => { - await this.appendInputChunk(chatId, token, body); + await this.appendInputChunk(chatId, token, body, partId); }; await this.callWithAuthRetry(chatId, state, send); @@ -1085,13 +1105,21 @@ export class TriggerChatTransport implements ChatTransport { return this.fetchOverride ? this.fetchOverride(url, init, ctx) : fetch(url, init); } - private async appendInputChunk(chatId: string, token: string, body: string): Promise { + private async appendInputChunk( + chatId: string, + token: string, + body: string, + partId?: string + ): Promise { const ctx: ChatTransportEndpointContext = { endpoint: "in", chatId }; const url = `${this.resolveBaseURL(ctx)}/realtime/v1/sessions/${encodeURIComponent(chatId)}/in/append`; const headers: Record = { "Content-Type": "application/json", Authorization: `Bearer ${token}`, "x-trigger-source": "sdk", + // Idempotency key: the server skips appends whose part id it has + // already committed, so a retried POST can't duplicate the record. + "X-Part-Id": partId ?? crypto.randomUUID(), ...this.extraHeaders, }; const response = await this.doFetch(ctx, url, { method: "POST", headers, body }); diff --git a/packages/trigger-sdk/src/v3/sessions.ts b/packages/trigger-sdk/src/v3/sessions.ts index ea3ebd8d937..fc47a037914 100644 --- a/packages/trigger-sdk/src/v3/sessions.ts +++ b/packages/trigger-sdk/src/v3/sessions.ts @@ -626,15 +626,19 @@ export class SessionInputChannel { /** * Register a handler that fires for every record landing on `.in`. - * Handlers are flushed with any buffered records on attach and cleaned - * up automatically when the task run completes. Returns `{ off }` to - * unsubscribe early. + * Buffered records are offered to the handler on attach, and handlers + * are cleaned up automatically when the task run completes. Returns + * `{ off }` to unsubscribe early. + * + * A handler that synchronously returns `true` CONSUMES the record: it + * won't be buffered for a later `once()` and won't be re-delivered on a + * future `on()` attach. Plain observers should return nothing. */ - on(handler: (data: T) => void | Promise): { off: () => void } { + on(handler: (data: T) => void | boolean | Promise): { off: () => void } { return sessionStreams.on( this.sessionId, "in", - handler as (data: unknown) => void | Promise + handler as (data: unknown) => void | boolean | Promise ); } From 2905009769545fba6876744a77319834322a875e Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 10 Jun 2026 14:23:44 +0100 Subject: [PATCH 2/3] fix(sdk,core): keep errored chat turns recoverable and bound stream growth Fire onTurnComplete on errored turns (with the thrown error attached) and persist a snapshot of the failed turn so its user message is not stranded past the resume cursor on the next run. Custom agents and manual chat.writeTurnComplete callers now trim the output stream the same way the built-in agent does, so it no longer grows without bound. Sending a custom action supersedes any in-flight reader instead of leaving two readers racing the resume cursor, and a long-lived watch subscription caps its dedupe set. --- .changeset/chat-agent-hardening.md | 2 +- packages/core/src/v3/apiClient/runStream.ts | 15 ++- packages/trigger-sdk/src/v3/ai.ts | 102 +++++++++++++++++++- packages/trigger-sdk/src/v3/chat.ts | 10 ++ 4 files changed, 124 insertions(+), 5 deletions(-) diff --git a/.changeset/chat-agent-hardening.md b/.changeset/chat-agent-hardening.md index fd03aa796cc..0ea82c0617e 100644 --- a/.changeset/chat-agent-hardening.md +++ b/.changeset/chat-agent-hardening.md @@ -3,4 +3,4 @@ "@trigger.dev/core": patch --- -Reliability fixes for `chat.agent`. A user message sent while the agent is streaming is no longer delivered twice (which could run a duplicate turn), input appends now carry an idempotency key so a retried send can't duplicate a message, stopping a generation clears the streaming state so a page reload doesn't replay the stopped turn, and runs can now carry the full set of dashboard tags instead of being silently truncated. +Reliability fixes for `chat.agent`. A user message sent while the agent is streaming is no longer delivered twice (which could run a duplicate turn), input appends now carry an idempotency key so a retried send can't duplicate a message, stopping a generation clears the streaming state so a page reload doesn't replay the stopped turn, and runs can now carry the full set of dashboard tags instead of being silently truncated. `onTurnComplete` now fires on errored turns (with the thrown error attached) and the failed turn's user message is persisted so it isn't lost on the next run. Custom agents and manual `chat.writeTurnComplete` callers now trim the output stream, sending a custom action no longer leaves a second stream reader running, and a long-lived `watch` subscription no longer grows its dedupe set without bound. diff --git a/packages/core/src/v3/apiClient/runStream.ts b/packages/core/src/v3/apiClient/runStream.ts index 4b60bb410fa..6d58dff1a70 100644 --- a/packages/core/src/v3/apiClient/runStream.ts +++ b/packages/core/src/v3/apiClient/runStream.ts @@ -371,7 +371,20 @@ export class SSEStreamSubscription implements StreamSubscription { this.retryCount = 0; // reset on success armStall(); + // Dedup window for record ids. Bounded with FIFO eviction so a + // long-lived `watch: true` subscription (one connection across many + // turns) doesn't grow this set without bound. The window only needs + // to cover the overlap a reconnect/replay can re-deliver, so a few + // thousand ids is ample. + const SEEN_IDS_CAP = 5000; const seenIds = new Set(); + const rememberSeen = (id: string) => { + seenIds.add(id); + if (seenIds.size > SEEN_IDS_CAP) { + const oldest = seenIds.values().next().value; + if (oldest !== undefined) seenIds.delete(oldest); + } + }; const stream = response.body .pipeThrough(new TextDecoderStream()) @@ -426,7 +439,7 @@ export class SSEStreamSubscription implements StreamSubscription { | undefined; if (parsedBody?.id) { if (seenIds.has(parsedBody.id)) continue; - seenIds.add(parsedBody.id); + rememberSeen(parsedBody.id); } chunkController.enqueue({ id: record.seq_num.toString(), diff --git a/packages/trigger-sdk/src/v3/ai.ts b/packages/trigger-sdk/src/v3/ai.ts index 2d2cebfee14..67027859062 100644 --- a/packages/trigger-sdk/src/v3/ai.ts +++ b/packages/trigger-sdk/src/v3/ai.ts @@ -4252,6 +4252,13 @@ export type TurnCompleteEvent + > | undefined; try { await withChatWriter(async (writer) => { const errorText = @@ -7563,11 +7578,81 @@ function chatAgent< writer.write({ type: "error", errorText } as any); }); // Signal turn complete so the client knows this turn is done - await writeTurnCompleteChunk(currentWirePayload.chatId); + errorTurnCompleteResult = await writeTurnCompleteChunk(currentWirePayload.chatId); } catch { // Best-effort — if stream write fails, let the run continue anyway } + // Fire onTurnComplete on the error path too — the docs promise it + // runs "after every turn, successful or errored" so customers can + // mark the turn failed. `responseMessage` is undefined/partial and + // `error` carries the thrown value. + if (onTurnComplete) { + try { + await tracer.startActiveSpan( + "onTurnComplete()", + async () => { + await onTurnComplete({ + ctx, + chatId: currentWirePayload.chatId, + messages: accumulatedMessages, + uiMessages: accumulatedUIMessages, + newMessages: [], + newUIMessages: [], + responseMessage: undefined, + rawResponseMessage: undefined, + turn, + runId: ctx.run.id, + chatAccessToken: "", + clientData: currentWirePayload.metadata as inferSchemaIn, + stopped: false, + continuation, + previousRunId, + preloaded, + totalUsage: cumulativeUsage, + finishReason: "error", + error: turnError, + lastEventId: errorTurnCompleteResult?.lastEventId, + }); + }, + { + attributes: { + [SemanticInternalAttributes.STYLE_ICON]: "task-hook-onComplete", + [SemanticInternalAttributes.COLLAPSED]: true, + "chat.id": currentWirePayload.chatId, + "chat.turn": turn + 1, + "chat.errored": true, + }, + } + ); + } catch { + // A throwing onTurnComplete on the error path must not crash + // the run — keep the conversation alive for the next message. + } + } + + // Persist a snapshot so the failed turn's user message isn't + // stranded. `writeTurnCompleteChunk` already advanced the `.in` + // cursor past it (via the session-in-event-id header), and the + // success-path snapshot write is skipped on error — without this + // the next boot would resume past a message that exists in + // neither the snapshot nor the replayable `.in` tail. + if (!hydrateMessages) { + try { + await writeChatSnapshot(sessionIdForSnapshot, { + version: 1, + savedAt: Date.now(), + messages: accumulatedUIMessages, + lastOutEventId: errorTurnCompleteResult?.lastEventId, + }); + } catch (error) { + logger.warn("chat.agent: error-path snapshot write failed", { + error: error instanceof Error ? error.message : String(error), + sessionId: sessionIdForSnapshot, + }); + } + } + // chat.requestUpgrade() / chat.endRun() — exit after error turn too if ( locals.get(chatUpgradeRequestedKey) || @@ -9873,8 +9958,19 @@ async function writeTurnCompleteChunk( // 2. Trim back to the previous turn-complete, if we have one. Skipping on // first-turn-ever (or first turn post-OOM without a snapshot seed) is // fine — the chain catches up next turn. - const slot = locals.get(lastTurnCompleteSeqNumKey); - const prev = slot?.value; + // + // Lazily create the slot if a caller reached here without one (a plain + // `task()` driving `chat.createSession` / `chat.writeTurnComplete`, vs. + // chatAgent/chatCustomAgent which seed it at boot). The first call then + // does no trim (nothing before it) and records its seq; later calls trim + // — so `.out` is bounded for every writeTurnComplete caller, not just the + // built-in agents. + let slot = locals.get(lastTurnCompleteSeqNumKey); + if (!slot) { + slot = { value: undefined }; + locals.set(lastTurnCompleteSeqNumKey, slot); + } + const prev = slot.value; if (slot && prev !== undefined) { try { await session.out.trimTo(prev); diff --git a/packages/trigger-sdk/src/v3/chat.ts b/packages/trigger-sdk/src/v3/chat.ts index cdf9aae5435..224ffe0e41c 100644 --- a/packages/trigger-sdk/src/v3/chat.ts +++ b/packages/trigger-sdk/src/v3/chat.ts @@ -934,6 +934,16 @@ export class TriggerChatTransport implements ChatTransport { await this.callWithAuthRetry(chatId, state, send); + // Supersede any in-flight reader before subscribing — same as + // `sendMessages`. Two concurrent readers both write `state.lastEventId` + // and the slower one can regress the cursor, replaying records on the + // next reconnect. + const activeStream = this.activeStreams.get(chatId); + if (activeStream) { + activeStream.abort(); + this.activeStreams.delete(chatId); + } + return this.subscribeToSessionStream(state, undefined, chatId); }; From 5122b0695432285261c6531fdac78c0eec3f32c4 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 10 Jun 2026 17:20:58 +0100 Subject: [PATCH 3/3] fix(sdk,core): address review feedback on chat.agent hardening Fold the failed turn's wire message into the error-path snapshot and onTurnComplete event so an early pre-run throw cannot strand it, and stop passing raw metadata as the parsed clientData on errored turns. Mark the session streaming before subscribing in sendAction so a reload mid-action resumes. Keep the per-append X-Part-Id from being overridden by a transport-wide header, and align the server-side append part id entropy with the browser transport. Adds tests for the error-turn snapshot, sendAction streaming state, and the X-Part-Id header precedence. --- packages/core/src/v3/apiClient/index.ts | 5 +- packages/trigger-sdk/src/v3/ai.ts | 22 +++++-- packages/trigger-sdk/src/v3/chat.test.ts | 59 +++++++++++++++++++ packages/trigger-sdk/src/v3/chat.ts | 11 +++- .../trigger-sdk/test/mockChatAgent.test.ts | 57 ++++++++++++++++++ 5 files changed, 145 insertions(+), 9 deletions(-) diff --git a/packages/core/src/v3/apiClient/index.ts b/packages/core/src/v3/apiClient/index.ts index c96abf6a435..652b016de92 100644 --- a/packages/core/src/v3/apiClient/index.ts +++ b/packages/core/src/v3/apiClient/index.ts @@ -1279,8 +1279,9 @@ export class ApiClient { ) { // Generated once per logical append, outside zodfetch, so its internal // retries reuse the same part id and the server-side dedupe collapses a - // retried POST whose first attempt actually committed. - const partId = nanoid(7); + // retried POST whose first attempt actually committed. Full-length nanoid + // (~126 bits) to match the browser transport's randomUUID entropy. + const partId = nanoid(); return zodfetch( AppendToStreamResponseBody, `${this.baseUrl}/realtime/v1/sessions/${encodeURIComponent(sessionIdOrExternalId)}/${io}/append`, diff --git a/packages/trigger-sdk/src/v3/ai.ts b/packages/trigger-sdk/src/v3/ai.ts index 67027859062..5b448ed3336 100644 --- a/packages/trigger-sdk/src/v3/ai.ts +++ b/packages/trigger-sdk/src/v3/ai.ts @@ -7583,6 +7583,17 @@ function chatAgent< // Best-effort — if stream write fails, let the run continue anyway } + // The submit-message merge into the accumulator may not have run + // yet (a pre-run hook threw), so fold the wire message in for the + // error event + snapshot — the cursor has already advanced past it, + // so otherwise it survives in neither the snapshot nor the `.in` tail. + const erroredWireMessage = (currentWirePayload as { message?: TUIMessage }).message; + const erroredUIMessages = + erroredWireMessage && + !accumulatedUIMessages.some((m) => m.id === erroredWireMessage.id) + ? [...accumulatedUIMessages, erroredWireMessage] + : accumulatedUIMessages; + // Fire onTurnComplete on the error path too — the docs promise it // runs "after every turn, successful or errored" so customers can // mark the turn failed. `responseMessage` is undefined/partial and @@ -7596,15 +7607,18 @@ function chatAgent< ctx, chatId: currentWirePayload.chatId, messages: accumulatedMessages, - uiMessages: accumulatedUIMessages, + uiMessages: erroredUIMessages, newMessages: [], - newUIMessages: [], + newUIMessages: erroredWireMessage ? [erroredWireMessage] : [], responseMessage: undefined, rawResponseMessage: undefined, turn, runId: ctx.run.id, chatAccessToken: "", - clientData: currentWirePayload.metadata as inferSchemaIn, + // Parsed `clientData` isn't reliably in scope here (parsing + // may itself be the failure), and the raw metadata is the + // wrong shape — leave it undefined on the error path. + clientData: undefined, stopped: false, continuation, previousRunId, @@ -7642,7 +7656,7 @@ function chatAgent< await writeChatSnapshot(sessionIdForSnapshot, { version: 1, savedAt: Date.now(), - messages: accumulatedUIMessages, + messages: erroredUIMessages, lastOutEventId: errorTurnCompleteResult?.lastEventId, }); } catch (error) { diff --git a/packages/trigger-sdk/src/v3/chat.test.ts b/packages/trigger-sdk/src/v3/chat.test.ts index 6469f1ac86c..e4fc45eae4b 100644 --- a/packages/trigger-sdk/src/v3/chat.test.ts +++ b/packages/trigger-sdk/src/v3/chat.test.ts @@ -1005,6 +1005,65 @@ describe("TriggerChatTransport", () => { expect(actionBody.payload.trigger).toBe("action"); expect(actionBody.payload.action).toEqual({ type: "undo" }); }); + + it("marks the session streaming and notifies before subscribing", async () => { + global.fetch = vi.fn().mockImplementation(async (url: string | URL) => { + const urlStr = typeof url === "string" ? url : url.toString(); + if (isSessionStreamAppendUrl(urlStr)) return defaultAppendResponse(); + if (isSessionOutSubscribeUrl(urlStr)) return defaultSseResponse(); + throw new Error(`Unexpected URL: ${urlStr}`); + }); + + const onSessionChange = vi.fn(); + const transport = new TriggerChatTransport({ + task: "my-chat-task", + accessToken: () => "pat", + onSessionChange, + sessions: { "chat-act-stream": { publicAccessToken: "p" } }, + }); + + const stream = await transport.sendAction("chat-act-stream", { type: "undo" }); + // isStreaming:true must be observed during the action — otherwise a reload + // mid-action sees a persisted isStreaming:false and never resumes. + expect( + onSessionChange.mock.calls.some(([, session]) => session && session.isStreaming === true) + ).toBe(true); + await drainChunks(stream); + }); + }); + + describe("append idempotency header", () => { + it("the per-append X-Part-Id wins over a transport-wide headers override", async () => { + let appendPartId: string | undefined; + global.fetch = vi.fn().mockImplementation(async (url: string | URL, init?: RequestInit) => { + const urlStr = typeof url === "string" ? url : url.toString(); + if (isSessionStreamAppendUrl(urlStr)) { + appendPartId = (init!.headers as Record)["X-Part-Id"]; + return defaultAppendResponse(); + } + if (isSessionOutSubscribeUrl(urlStr)) return defaultSseResponse(); + throw new Error(`Unexpected URL: ${urlStr}`); + }); + + const transport = new TriggerChatTransport({ + task: "my-chat-task", + accessToken: () => "pat", + headers: { "X-Part-Id": "STATIC-OVERRIDE" }, + sessions: { "chat-hdr": { publicAccessToken: "p" } }, + }); + + const stream = await transport.sendMessages({ + trigger: "submit-message", + chatId: "chat-hdr", + messageId: undefined, + messages: [createUserMessage("hi")], + abortSignal: undefined, + }); + await drainChunks(stream); + + expect(appendPartId).toBeDefined(); + expect(appendPartId).not.toBe("STATIC-OVERRIDE"); + }); }); describe("reconnectToStream", () => { diff --git a/packages/trigger-sdk/src/v3/chat.ts b/packages/trigger-sdk/src/v3/chat.ts index 224ffe0e41c..e0d37ad7d47 100644 --- a/packages/trigger-sdk/src/v3/chat.ts +++ b/packages/trigger-sdk/src/v3/chat.ts @@ -944,6 +944,11 @@ export class TriggerChatTransport implements ChatTransport { this.activeStreams.delete(chatId); } + // Mark streaming + persist so a reload mid-action resumes (reconnectToStream + // no-ops when the persisted session says isStreaming: false). + state.isStreaming = true; + this.notifySessionChange(chatId, state); + return this.subscribeToSessionStream(state, undefined, chatId); }; @@ -1123,14 +1128,14 @@ export class TriggerChatTransport implements ChatTransport { ): Promise { const ctx: ChatTransportEndpointContext = { endpoint: "in", chatId }; const url = `${this.resolveBaseURL(ctx)}/realtime/v1/sessions/${encodeURIComponent(chatId)}/in/append`; + // extraHeaders first so the fixed headers below win — a transport-wide + // X-Part-Id must not override the per-append idempotency key. const headers: Record = { + ...this.extraHeaders, "Content-Type": "application/json", Authorization: `Bearer ${token}`, "x-trigger-source": "sdk", - // Idempotency key: the server skips appends whose part id it has - // already committed, so a retried POST can't duplicate the record. "X-Part-Id": partId ?? crypto.randomUUID(), - ...this.extraHeaders, }; const response = await this.doFetch(ctx, url, { method: "POST", headers, body }); if (!response.ok) { diff --git a/packages/trigger-sdk/test/mockChatAgent.test.ts b/packages/trigger-sdk/test/mockChatAgent.test.ts index 7245622b6e4..5038f21ca24 100644 --- a/packages/trigger-sdk/test/mockChatAgent.test.ts +++ b/packages/trigger-sdk/test/mockChatAgent.test.ts @@ -1140,6 +1140,63 @@ describe("mockChatAgent", () => { } }); + it("error turn: onTurnComplete fires with the error and the failed message is snapshotted", async () => { + const onTurnComplete = vi.fn(); + const agent = chat.agent({ + id: "mockChatAgent.error-turn-run", + onTurnComplete, + run: async () => { + throw new Error("boom in run"); + }, + }); + const harness = mockChatAgent(agent, { chatId: "err-run" }); + try { + await harness.sendMessage(userMessage("hello", "u-err-run")); + await new Promise((r) => setTimeout(r, 50)); + expect(onTurnComplete).toHaveBeenCalledTimes(1); + const evt = onTurnComplete.mock.calls[0]![0]; + expect(evt.error).toBeInstanceOf(Error); + expect(evt.finishReason).toBe("error"); + expect(evt.responseMessage).toBeUndefined(); + expect(evt.uiMessages.some((m: any) => m.id === "u-err-run")).toBe(true); + const snap = harness.getSnapshot(); + expect(snap?.messages.some((m) => m.id === "u-err-run")).toBe(true); + } finally { + await harness.close(); + } + }); + + it("error turn: a pre-merge hook throw still snapshots the failed user message", async () => { + const model = new MockLanguageModelV3({ + doStream: async () => ({ stream: textStream("never reached") }), + }); + const onTurnComplete = vi.fn(); + const agent = chat.agent({ + id: "mockChatAgent.error-turn-prehook", + // onValidateMessages fires BEFORE the wire message is merged into the + // accumulator, so the message lands in the snapshot only because the + // error path folds the wire message back in. + onValidateMessages: async () => { + throw new Error("boom in validate"); + }, + onTurnComplete, + run: async ({ messages, signal }) => streamText({ model, messages, abortSignal: signal }), + }); + const harness = mockChatAgent(agent, { chatId: "err-prehook" }); + try { + await harness.sendMessage(userMessage("validate me", "u-err-prehook")); + await new Promise((r) => setTimeout(r, 50)); + expect(onTurnComplete).toHaveBeenCalledTimes(1); + const evt = onTurnComplete.mock.calls[0]![0]; + expect(evt.error).toBeInstanceOf(Error); + expect(evt.uiMessages.some((m: any) => m.id === "u-err-prehook")).toBe(true); + const snap = harness.getSnapshot(); + expect(snap?.messages.some((m) => m.id === "u-err-prehook")).toBe(true); + } finally { + await harness.close(); + } + }); + it("seeds locals before run() via setupLocals (DI pattern)", async () => { type FakeDb = { findUser(id: string): Promise<{ id: string; name: string }> }; const dbKey = locals.create("test-db");