diff --git a/.changeset/chat-agent-hardening.md b/.changeset/chat-agent-hardening.md new file mode 100644 index 00000000000..0ea82c0617e --- /dev/null +++ b/.changeset/chat-agent-hardening.md @@ -0,0 +1,6 @@ +--- +"@trigger.dev/sdk": patch +"@trigger.dev/core": patch +--- + +Reliability fixes for `chat.agent`. A user message sent while the agent is streaming is no longer delivered twice (which could run a duplicate turn), input appends now carry an idempotency key so a retried send can't duplicate a message, stopping a generation clears the streaming state so a page reload doesn't replay the stopped turn, and runs can now carry the full set of dashboard tags instead of being silently truncated. `onTurnComplete` now fires on errored turns (with the thrown error attached) and the failed turn's user message is persisted so it isn't lost on the next run. Custom agents and manual `chat.writeTurnComplete` callers now trim the output stream, sending a custom action no longer leaves a second stream reader running, and a long-lived `watch` subscription no longer grows its dedupe set without bound. diff --git a/packages/core/src/v3/apiClient/index.ts b/packages/core/src/v3/apiClient/index.ts index 4bbeca8bd31..652b016de92 100644 --- a/packages/core/src/v3/apiClient/index.ts +++ b/packages/core/src/v3/apiClient/index.ts @@ -1,3 +1,4 @@ +import { nanoid } from "nanoid"; import { z } from "zod"; import { VERSION } from "../../version.js"; import { generateJWT } from "../jwt.js"; @@ -1276,12 +1277,17 @@ export class ApiClient { part: TBody, requestOptions?: ZodFetchOptions ) { + // Generated once per logical append, outside zodfetch, so its internal + // retries reuse the same part id and the server-side dedupe collapses a + // retried POST whose first attempt actually committed. Full-length nanoid + // (~126 bits) to match the browser transport's randomUUID entropy. + const partId = nanoid(); return zodfetch( AppendToStreamResponseBody, `${this.baseUrl}/realtime/v1/sessions/${encodeURIComponent(sessionIdOrExternalId)}/${io}/append`, { method: "POST", - headers: this.#getHeaders(false), + headers: { ...this.#getHeaders(false), "X-Part-Id": partId }, body: part, }, mergeRequestOptions(this.defaultRequestOptions, requestOptions) diff --git a/packages/core/src/v3/apiClient/runStream.ts b/packages/core/src/v3/apiClient/runStream.ts index 4b60bb410fa..6d58dff1a70 100644 --- a/packages/core/src/v3/apiClient/runStream.ts +++ b/packages/core/src/v3/apiClient/runStream.ts @@ -371,7 +371,20 @@ export class SSEStreamSubscription implements StreamSubscription { this.retryCount = 0; // reset on success armStall(); + // Dedup window for record ids. Bounded with FIFO eviction so a + // long-lived `watch: true` subscription (one connection across many + // turns) doesn't grow this set without bound. The window only needs + // to cover the overlap a reconnect/replay can re-deliver, so a few + // thousand ids is ample. + const SEEN_IDS_CAP = 5000; const seenIds = new Set(); + const rememberSeen = (id: string) => { + seenIds.add(id); + if (seenIds.size > SEEN_IDS_CAP) { + const oldest = seenIds.values().next().value; + if (oldest !== undefined) seenIds.delete(oldest); + } + }; const stream = response.body .pipeThrough(new TextDecoderStream()) @@ -426,7 +439,7 @@ export class SSEStreamSubscription implements StreamSubscription { | undefined; if (parsedBody?.id) { if (seenIds.has(parsedBody.id)) continue; - seenIds.add(parsedBody.id); + rememberSeen(parsedBody.id); } chunkController.enqueue({ id: record.seq_num.toString(), diff --git a/packages/core/src/v3/sessionStreams/index.ts b/packages/core/src/v3/sessionStreams/index.ts index 7ea17261632..c150f0fe9df 100644 --- a/packages/core/src/v3/sessionStreams/index.ts +++ b/packages/core/src/v3/sessionStreams/index.ts @@ -34,7 +34,7 @@ export class SessionStreamsAPI implements SessionStreamManager { public on( sessionId: string, io: SessionChannelIO, - handler: (data: unknown) => void | Promise + handler: (data: unknown) => void | boolean | Promise ): { off: () => void } { return this.#getManager().on(sessionId, io, handler); } diff --git a/packages/core/src/v3/sessionStreams/manager.ts b/packages/core/src/v3/sessionStreams/manager.ts index 5d07d841ba4..65eaf40f9cc 100644 --- a/packages/core/src/v3/sessionStreams/manager.ts +++ b/packages/core/src/v3/sessionStreams/manager.ts @@ -9,7 +9,11 @@ import { computeReconnectDelayMs } from "../utils/reconnectBackoff.js"; import { SessionChannelIO, SessionStreamManager } from "./types.js"; import { controlSubtype } from "./wireProtocol.js"; -type SessionStreamHandler = (data: unknown) => void | Promise; +// A handler that synchronously returns `true` CONSUMES the record: it is +// not buffered for a later `once()` and the committed-consume cursor +// advances past it. Anything else (void, a Promise) leaves the record +// available to other consumers. See `SessionStreamManager.on` in types.ts. +type SessionStreamHandler = (data: unknown) => void | boolean | Promise; type OnceWaiter = { resolve: (result: InputStreamOnceResult) => void; @@ -113,30 +117,41 @@ export class StandardSessionStreamManager implements SessionStreamManager { this.explicitlyDisconnected.delete(key); this.#ensureTailConnected(sessionId, io); + // Selective drain: offer each buffered record to the new handler and + // remove ONLY the ones it consumed (returned `true` — e.g. the + // messages facade for message-kind records). Consumed records advance + // the committed-consume cursor, so a worker using `messagesInput.on()` + // for user-message delivery persists a `.in` cursor that matches what + // the handler processed. Records the handler did not consume (other + // kinds) STAY buffered for a future `once()` or a different handler — + // a blind drain here either swallowed them (delivered to a handler + // that filtered them out, then deleted) or re-delivered already- + // processed messages into every newly attached per-turn handler, + // duplicating turns. const buffered = this.buffer.get(key); if (buffered && buffered.length > 0) { - for (const data of buffered) { - this.#invokeHandler(handler, data); - } - // Advance the committed-consume cursor to the highest seq drained - // into the new handler. `on()`-drain removes the records from the - // buffer, so they're no longer available to a future `once()` — - // from the manager's perspective they've been consumed. Without - // this, a worker that uses `messagesInput.on()` for user-message - // delivery (pendingMessages mode) would persist a `.in` cursor - // that lags behind the records the handler already processed, and - // the next boot would re-deliver them. - const seqList = this.bufferSeqNums.get(key); - if (seqList) { - for (const s of seqList) { + const seqList = this.bufferSeqNums.get(key) ?? []; + const keptRecords: unknown[] = []; + // Kept in lock-step with `keptRecords` — drifting lengths would map + // seq_nums to the wrong records on subsequent shifts. + const keptSeqNums: Array = []; + for (let i = 0; i < buffered.length; i++) { + const consumed = this.#invokeHandler(handler, buffered[i]); + if (consumed) { + const s = seqList[i]; if (s !== undefined) this.#advanceLastDispatched(key, s); + } else { + keptRecords.push(buffered[i]); + keptSeqNums.push(seqList[i]); } } - this.buffer.delete(key); - // Keep `bufferSeqNums` in lock-step with `buffer` — without this, - // the parallel array desyncs and the next `#dispatch` that buffers - // a record would shift a stale seqNum into `lastDispatchedSeqNum`. - this.bufferSeqNums.delete(key); + if (keptRecords.length > 0) { + this.buffer.set(key, keptRecords); + this.bufferSeqNums.set(key, keptSeqNums); + } else { + this.buffer.delete(key); + this.bufferSeqNums.delete(key); + } } return { @@ -509,13 +524,21 @@ export class StandardSessionStreamManager implements SessionStreamManager { return; } - // Persistent handlers (e.g. `stopInput.on(...)`) get a copy of the chunk, - // but they don't "consume" it — handlers usually filter by `kind` and - // ignore chunks they don't care about. Buffer the chunk regardless so a - // subsequent `once()` (e.g. `messagesInput.waitWithIdleTimeout` in - // chat.agent's preload) can still pick up the same chunk that arrived - // before its waiter was registered. - this.#invokeHandlers(key, data); + // Persistent handlers get a copy of the chunk. A handler that + // synchronously returns `true` CONSUMES it (e.g. the messages facade + // for message-kind records): the record must not also be buffered, or + // the next `on()` attach / `once()` would deliver it a second time — + // in chat.agent's turn loop that duplicated user messages into a + // second turn. Records no handler consumed (e.g. a message arriving + // while only the stop facade is attached during preload) are buffered + // so a subsequent `once()` can still pick them up. + const consumed = this.#invokeHandlers(key, data); + if (consumed) { + if (seqNum !== undefined) { + this.#advanceLastDispatched(key, seqNum); + } + return; + } let buffered = this.buffer.get(key); if (!buffered) { @@ -535,17 +558,24 @@ export class StandardSessionStreamManager implements SessionStreamManager { bufferedSeqs.push(seqNum); } - #invokeHandlers(key: string, data: unknown): void { + /** Returns true when any handler consumed the record. All handlers are invoked regardless. */ + #invokeHandlers(key: string, data: unknown): boolean { const handlers = this.handlers.get(key); - if (!handlers) return; + if (!handlers) return false; + let consumed = false; for (const handler of handlers) { - this.#invokeHandler(handler, data); + if (this.#invokeHandler(handler, data)) { + consumed = true; + } } + return consumed; } - #invokeHandler(handler: SessionStreamHandler, data: unknown): void { + /** Returns true when the handler synchronously consumed the record (returned `true`). */ + #invokeHandler(handler: SessionStreamHandler, data: unknown): boolean { try { const result = handler(data); + if (result === true) return true; if (result && typeof result === "object" && "catch" in result) { (result as Promise).catch((error) => { if (this.debug) { @@ -558,6 +588,7 @@ export class StandardSessionStreamManager implements SessionStreamManager { console.error("[SessionStreamManager] Handler error:", error); } } + return false; } #removeOnceWaiter(key: string, waiter: OnceWaiter): void { diff --git a/packages/core/src/v3/sessionStreams/noopManager.ts b/packages/core/src/v3/sessionStreams/noopManager.ts index 42d97c9d4ea..77d1f40ac9f 100644 --- a/packages/core/src/v3/sessionStreams/noopManager.ts +++ b/packages/core/src/v3/sessionStreams/noopManager.ts @@ -6,7 +6,7 @@ export class NoopSessionStreamManager implements SessionStreamManager { on( _sessionId: string, _io: SessionChannelIO, - _handler: (data: unknown) => void | Promise + _handler: (data: unknown) => void | boolean | Promise ): { off: () => void } { return { off: () => {} }; } diff --git a/packages/core/src/v3/sessionStreams/types.ts b/packages/core/src/v3/sessionStreams/types.ts index f59d3ee9c7b..45be5149e2e 100644 --- a/packages/core/src/v3/sessionStreams/types.ts +++ b/packages/core/src/v3/sessionStreams/types.ts @@ -22,11 +22,20 @@ export type SessionChannelIO = "out" | "in"; * `.on` / `.once` / `.peek` / `.wait` / `.waitWithIdleTimeout`. */ export interface SessionStreamManager { - /** Register a handler that fires every time data arrives on the given channel. */ + /** + * Register a handler that fires every time data arrives on the given channel. + * + * A handler that synchronously returns `true` CONSUMES the record: it is + * not buffered for a later `once()` and the committed-consume cursor + * advances past it. Any other return value (including a Promise) leaves + * the record available to other consumers. Kind-filtering facades return + * `true` for the kinds they own so the same record is never delivered + * twice — once to the handler and again via a buffer drain. + */ on( sessionId: string, io: SessionChannelIO, - handler: (data: unknown) => void | Promise + handler: (data: unknown) => void | boolean | Promise ): { off: () => void }; /** Wait for the next record on the given channel (buffered or live). */ diff --git a/packages/core/src/v3/test/test-session-stream-manager.ts b/packages/core/src/v3/test/test-session-stream-manager.ts index a688790d616..f19e01dc33c 100644 --- a/packages/core/src/v3/test/test-session-stream-manager.ts +++ b/packages/core/src/v3/test/test-session-stream-manager.ts @@ -16,7 +16,10 @@ type OnceWaiter = { abortHandler?: () => void; }; -type Handler = (data: unknown) => void | Promise; +// Same contract as the production manager: a handler that synchronously +// returns `true` CONSUMES the record (not buffered, not re-delivered on a +// future `on()` attach). See `SessionStreamManager.on` in types.ts. +type Handler = (data: unknown) => void | boolean | Promise; function keyFor(sessionId: string, io: SessionChannelIO): string { return `${sessionId}:${io}`; @@ -51,20 +54,32 @@ export class TestSessionStreamManager implements SessionStreamManager { } set.add(handler); - // Note: we intentionally do NOT replay buffered records into the - // newly-registered handler, and we do NOT drain the buffer. The - // buffer is owned by `once()` — registering a passive observer - // (`on`) must not consume records destined for a future `once` - // waiter. This matches production SSE semantics where handlers - // observe records as they arrive, not retroactively. - // - // Earlier versions drained the buffer here, which caused user - // messages buffered during the runtime's `runFn` boot phase to be - // silently swallowed by the `stopInput.on()` handler registered at - // ai.ts:4806 (the stop handler ignores `kind: "message"` chunks). - // The next `messagesInput.waitWithIdleTimeout` then waited 30s for - // a record that had already been "delivered" to a handler that - // didn't want it. + // Selective drain, matching the production manager: offer each + // buffered record to the new handler and remove ONLY the ones it + // consumed (returned `true`). Records the handler filtered out (other + // kinds) stay buffered for a future `once()`. This is the corrected + // form of two historical bugs: a blind drain swallowed boot-phase user + // messages into the stop facade (which ignores `kind: "message"`), + // and no-drain-at-all let production re-deliver already-processed + // messages into every newly attached per-turn handler. + const buffered = this.buffer.get(key); + if (buffered && buffered.length > 0) { + const kept: unknown[] = []; + for (const data of buffered) { + let consumed = false; + try { + consumed = handler(data) === true; + } catch { + // Never let a handler error break test state + } + if (!consumed) kept.push(data); + } + if (kept.length > 0) { + this.buffer.set(key, kept); + } else { + this.buffer.delete(key); + } + } return { off: () => { @@ -212,20 +227,20 @@ export class TestSessionStreamManager implements SessionStreamManager { /** * Push a record onto the given channel. * - * Dispatch rules — similar to the production manager, but with a tweak - * that makes unit tests deterministic: + * Dispatch rules — same as the production manager: + * + * 1. **A pending `.once` waiter consumes first.** Handlers still observe + * a copy. + * 2. **Otherwise handlers observe.** A handler that synchronously + * returns `true` consumes the record (kind-filtering facades do this + * for the kinds they own) — it is NOT buffered. + * 3. **Records no one consumed are buffered** for the next `.once` call + * or the next consuming `on()` attach. * - * 1. **Handlers always observe** (like production). A session-level `.on` - * is a filter-observer — it fires every time a record arrives, - * regardless of whether a `.once` waiter is also active. - * 2. **First waiter consumes** the record if present (like production). - * 3. **If no waiter, the record is buffered for the next `.once` call.** - * Production discards records that only match handlers — but in - * production the SSE tail introduces enough latency that the next - * `.once` is usually registered before the next record arrives. Tests - * send synchronously right after `turn-complete`, so without this - * buffer the next `waitWithIdleTimeout` would race and lose the - * message. The buffer is the only deviation from production semantics. + * Handler promises are awaited before resolving so test code can rely + * on async handler work having settled by the time `__sendFromTest` + * resolves. Consumption is decided on the synchronous return value, + * exactly like production. */ async __sendFromTest( sessionId: string, @@ -234,23 +249,6 @@ export class TestSessionStreamManager implements SessionStreamManager { ): Promise { const key = keyFor(sessionId, io); - const handlers = this.handlers.get(key); - if (handlers && handlers.size > 0) { - // Awaited so test code can rely on handlers having completed by the - // time `__sendFromTest` resolves. Wrapped per-handler so a - // throwing/rejecting handler doesn't poison Promise.all and break - // unrelated test state. - await Promise.all( - Array.from(handlers).map(async (h) => { - try { - await h(data); - } catch { - // Never let a handler error break test state - } - }) - ); - } - const waiters = this.onceWaiters.get(key); if (waiters && waiters.length > 0) { const w = waiters.shift()!; @@ -260,6 +258,27 @@ export class TestSessionStreamManager implements SessionStreamManager { w.signal.removeEventListener("abort", w.abortHandler); } w.resolve({ ok: true, output: data }); + await this.#invokeHandlers(key, data); + return; + } + + const consumed = await this.#invokeHandlers(key, data); + if (consumed) return; + + // Re-check waiters: handler invocation above is awaited (unlike the + // synchronous production dispatch), and the runtime commonly registers + // its next `once()` during that window — e.g. the turn loop reaching + // `waitWithIdleTimeout` while a handler settles. Without this second + // look the record would be buffered while the fresh waiter hangs. + const lateWaiters = this.onceWaiters.get(key); + if (lateWaiters && lateWaiters.length > 0) { + const w = lateWaiters.shift()!; + if (lateWaiters.length === 0) this.onceWaiters.delete(key); + if (w.timer) clearTimeout(w.timer); + if (w.signal && w.abortHandler) { + w.signal.removeEventListener("abort", w.abortHandler); + } + w.resolve({ ok: true, output: data }); return; } @@ -271,6 +290,34 @@ export class TestSessionStreamManager implements SessionStreamManager { buffered.push(data); } + /** + * Invoke all handlers; resolves once any returned promises settle. + * Returns true when any handler synchronously consumed the record. + * Wrapped per-handler so a throwing/rejecting handler doesn't poison + * Promise.all and break unrelated test state. + */ + async #invokeHandlers(key: string, data: unknown): Promise { + const handlers = this.handlers.get(key); + if (!handlers || handlers.size === 0) return false; + + let consumed = false; + await Promise.all( + Array.from(handlers).map(async (h) => { + try { + const result = h(data); + if (result === true) { + consumed = true; + return; + } + await result; + } catch { + // Never let a handler error break test state + } + }) + ); + return consumed; + } + /** * Immediately resolve every pending `once()` waiter for the given channel * with a timeout error. Simulates a closed stream (e.g. session closed). diff --git a/packages/trigger-sdk/src/v3/ai.ts b/packages/trigger-sdk/src/v3/ai.ts index 853cf6e69e4..5b448ed3336 100644 --- a/packages/trigger-sdk/src/v3/ai.ts +++ b/packages/trigger-sdk/src/v3/ai.ts @@ -1500,8 +1500,15 @@ const messagesInput: RealtimeDefinedInputStream = { on(handler) { return getChatSession().in.on((chunk) => { if (chunk.kind === "message") { - return handler(chunk.payload); + // Returning `true` marks the record CONSUMED at the manager level: + // it is neither buffered for a later `once()` nor re-delivered by + // the buffer drain when the next turn re-attaches its handler. + // Without this, a message arriving mid-stream was delivered twice + // and ran a duplicate turn. + void Promise.resolve(handler(chunk.payload)).catch(() => {}); + return true; } + return undefined; }); }, once(options) { @@ -1601,8 +1608,13 @@ const stopInput: RealtimeDefinedInputStream<{ stop: true; message?: string }> = on(handler) { return getChatSession().in.on((chunk) => { if (chunk.kind === "stop") { - return handler({ stop: true, message: chunk.message }); + // Consume stop records (see the messages facade above). A stop is + // only meaningful to the turn it interrupts — buffering it would + // let a stale stop abort a future turn. + void Promise.resolve(handler({ stop: true, message: chunk.message })).catch(() => {}); + return true; } + return undefined; }); }, once(options) { @@ -4240,6 +4252,13 @@ export type TurnCompleteEvent + > | undefined; try { await withChatWriter(async (writer) => { const errorText = @@ -7551,11 +7578,95 @@ function chatAgent< writer.write({ type: "error", errorText } as any); }); // Signal turn complete so the client knows this turn is done - await writeTurnCompleteChunk(currentWirePayload.chatId); + errorTurnCompleteResult = await writeTurnCompleteChunk(currentWirePayload.chatId); } catch { // Best-effort — if stream write fails, let the run continue anyway } + // The submit-message merge into the accumulator may not have run + // yet (a pre-run hook threw), so fold the wire message in for the + // error event + snapshot — the cursor has already advanced past it, + // so otherwise it survives in neither the snapshot nor the `.in` tail. + const erroredWireMessage = (currentWirePayload as { message?: TUIMessage }).message; + const erroredUIMessages = + erroredWireMessage && + !accumulatedUIMessages.some((m) => m.id === erroredWireMessage.id) + ? [...accumulatedUIMessages, erroredWireMessage] + : accumulatedUIMessages; + + // Fire onTurnComplete on the error path too — the docs promise it + // runs "after every turn, successful or errored" so customers can + // mark the turn failed. `responseMessage` is undefined/partial and + // `error` carries the thrown value. + if (onTurnComplete) { + try { + await tracer.startActiveSpan( + "onTurnComplete()", + async () => { + await onTurnComplete({ + ctx, + chatId: currentWirePayload.chatId, + messages: accumulatedMessages, + uiMessages: erroredUIMessages, + newMessages: [], + newUIMessages: erroredWireMessage ? [erroredWireMessage] : [], + responseMessage: undefined, + rawResponseMessage: undefined, + turn, + runId: ctx.run.id, + chatAccessToken: "", + // Parsed `clientData` isn't reliably in scope here (parsing + // may itself be the failure), and the raw metadata is the + // wrong shape — leave it undefined on the error path. + clientData: undefined, + stopped: false, + continuation, + previousRunId, + preloaded, + totalUsage: cumulativeUsage, + finishReason: "error", + error: turnError, + lastEventId: errorTurnCompleteResult?.lastEventId, + }); + }, + { + attributes: { + [SemanticInternalAttributes.STYLE_ICON]: "task-hook-onComplete", + [SemanticInternalAttributes.COLLAPSED]: true, + "chat.id": currentWirePayload.chatId, + "chat.turn": turn + 1, + "chat.errored": true, + }, + } + ); + } catch { + // A throwing onTurnComplete on the error path must not crash + // the run — keep the conversation alive for the next message. + } + } + + // Persist a snapshot so the failed turn's user message isn't + // stranded. `writeTurnCompleteChunk` already advanced the `.in` + // cursor past it (via the session-in-event-id header), and the + // success-path snapshot write is skipped on error — without this + // the next boot would resume past a message that exists in + // neither the snapshot nor the replayable `.in` tail. + if (!hydrateMessages) { + try { + await writeChatSnapshot(sessionIdForSnapshot, { + version: 1, + savedAt: Date.now(), + messages: erroredUIMessages, + lastOutEventId: errorTurnCompleteResult?.lastEventId, + }); + } catch (error) { + logger.warn("chat.agent: error-path snapshot write failed", { + error: error instanceof Error ? error.message : String(error), + sessionId: sessionIdForSnapshot, + }); + } + } + // chat.requestUpgrade() / chat.endRun() — exit after error turn too if ( locals.get(chatUpgradeRequestedKey) || @@ -9518,7 +9629,8 @@ function createChatStartSessionAction( // run-list filter by chat works without the customer having to wire it // up. Mirrors the browser-mediated `TriggerChatTransport.doStart` path. const userTags = params.triggerConfig?.tags ?? options?.triggerConfig?.tags ?? []; - const tags = [`chat:${params.chatId}`, ...userTags].slice(0, 5); + // Platform cap is 10 tags per run; the auto chat tag takes one slot. + const tags = [`chat:${params.chatId}`, ...userTags].slice(0, 10); const clientDataMetadata = params.clientData !== undefined ? { metadata: params.clientData } : {}; @@ -9860,8 +9972,19 @@ async function writeTurnCompleteChunk( // 2. Trim back to the previous turn-complete, if we have one. Skipping on // first-turn-ever (or first turn post-OOM without a snapshot seed) is // fine — the chain catches up next turn. - const slot = locals.get(lastTurnCompleteSeqNumKey); - const prev = slot?.value; + // + // Lazily create the slot if a caller reached here without one (a plain + // `task()` driving `chat.createSession` / `chat.writeTurnComplete`, vs. + // chatAgent/chatCustomAgent which seed it at boot). The first call then + // does no trim (nothing before it) and records its seq; later calls trim + // — so `.out` is bounded for every writeTurnComplete caller, not just the + // built-in agents. + let slot = locals.get(lastTurnCompleteSeqNumKey); + if (!slot) { + slot = { value: undefined }; + locals.set(lastTurnCompleteSeqNumKey, slot); + } + const prev = slot.value; if (slot && prev !== undefined) { try { await session.out.trimTo(prev); diff --git a/packages/trigger-sdk/src/v3/chat-client.ts b/packages/trigger-sdk/src/v3/chat-client.ts index 0bbdddcfbea..92402948340 100644 --- a/packages/trigger-sdk/src/v3/chat-client.ts +++ b/packages/trigger-sdk/src/v3/chat-client.ts @@ -616,6 +616,9 @@ export class AgentChat { "Content-Type": "application/json", Authorization: `Bearer ${accessToken}`, "x-trigger-source": "sdk", + // Idempotency key: the server skips appends whose part id it has + // already committed, so a retried POST can't duplicate the record. + "X-Part-Id": crypto.randomUUID(), }; const response = await this.doFetch(ctx, url, { method: "POST", headers, body }); if (!response.ok) { diff --git a/packages/trigger-sdk/src/v3/chat-react.ts b/packages/trigger-sdk/src/v3/chat-react.ts index b14ac825bcb..495e235083d 100644 --- a/packages/trigger-sdk/src/v3/chat-react.ts +++ b/packages/trigger-sdk/src/v3/chat-react.ts @@ -299,6 +299,8 @@ export function usePendingMessages( // Internal state: track messages with their mode type InternalMessage = TUIMessage & { _mode: "steering" | "queued" }; const [pendingMsgs, setPendingMsgs] = useState([]); + const pendingMsgsRef = useRef(pendingMsgs); + pendingMsgsRef.current = pendingMsgs; const injectedIdsRef = useRef>(new Set()); const prevStatusRef = useRef(status); @@ -400,14 +402,18 @@ export function usePendingMessages( if (promotedIdsRef.current.has(id)) { return; } + + // Read from the ref, send OUTSIDE the state updater. React may invoke + // updaters more than once (StrictMode, update rebasing), so a network + // call inside one can double-send. + const msg = pendingMsgsRef.current.find((m) => m.id === id); + if (!msg || msg._mode !== "queued") return; promotedIdsRef.current.add(id); + transport.sendPendingMessage(chatId, msg, metadata); - setPendingMsgs((prev) => { - const msg = prev.find((m) => m.id === id); - if (!msg || msg._mode !== "queued") return prev; - transport.sendPendingMessage(chatId, msg, metadata); - return prev.map((m) => (m.id === id ? { ...m, _mode: "steering" as const } : m)); - }); + setPendingMsgs((prev) => + prev.map((m) => (m.id === id ? { ...m, _mode: "steering" as const } : m)) + ); }, [transport, chatId, metadata] ); diff --git a/packages/trigger-sdk/src/v3/chat.test.ts b/packages/trigger-sdk/src/v3/chat.test.ts index 6469f1ac86c..e4fc45eae4b 100644 --- a/packages/trigger-sdk/src/v3/chat.test.ts +++ b/packages/trigger-sdk/src/v3/chat.test.ts @@ -1005,6 +1005,65 @@ describe("TriggerChatTransport", () => { expect(actionBody.payload.trigger).toBe("action"); expect(actionBody.payload.action).toEqual({ type: "undo" }); }); + + it("marks the session streaming and notifies before subscribing", async () => { + global.fetch = vi.fn().mockImplementation(async (url: string | URL) => { + const urlStr = typeof url === "string" ? url : url.toString(); + if (isSessionStreamAppendUrl(urlStr)) return defaultAppendResponse(); + if (isSessionOutSubscribeUrl(urlStr)) return defaultSseResponse(); + throw new Error(`Unexpected URL: ${urlStr}`); + }); + + const onSessionChange = vi.fn(); + const transport = new TriggerChatTransport({ + task: "my-chat-task", + accessToken: () => "pat", + onSessionChange, + sessions: { "chat-act-stream": { publicAccessToken: "p" } }, + }); + + const stream = await transport.sendAction("chat-act-stream", { type: "undo" }); + // isStreaming:true must be observed during the action — otherwise a reload + // mid-action sees a persisted isStreaming:false and never resumes. + expect( + onSessionChange.mock.calls.some(([, session]) => session && session.isStreaming === true) + ).toBe(true); + await drainChunks(stream); + }); + }); + + describe("append idempotency header", () => { + it("the per-append X-Part-Id wins over a transport-wide headers override", async () => { + let appendPartId: string | undefined; + global.fetch = vi.fn().mockImplementation(async (url: string | URL, init?: RequestInit) => { + const urlStr = typeof url === "string" ? url : url.toString(); + if (isSessionStreamAppendUrl(urlStr)) { + appendPartId = (init!.headers as Record)["X-Part-Id"]; + return defaultAppendResponse(); + } + if (isSessionOutSubscribeUrl(urlStr)) return defaultSseResponse(); + throw new Error(`Unexpected URL: ${urlStr}`); + }); + + const transport = new TriggerChatTransport({ + task: "my-chat-task", + accessToken: () => "pat", + headers: { "X-Part-Id": "STATIC-OVERRIDE" }, + sessions: { "chat-hdr": { publicAccessToken: "p" } }, + }); + + const stream = await transport.sendMessages({ + trigger: "submit-message", + chatId: "chat-hdr", + messageId: undefined, + messages: [createUserMessage("hi")], + abortSignal: undefined, + }); + await drainChunks(stream); + + expect(appendPartId).toBeDefined(); + expect(appendPartId).not.toBe("STATIC-OVERRIDE"); + }); }); describe("reconnectToStream", () => { diff --git a/packages/trigger-sdk/src/v3/chat.ts b/packages/trigger-sdk/src/v3/chat.ts index 74746744c7e..e0d37ad7d47 100644 --- a/packages/trigger-sdk/src/v3/chat.ts +++ b/packages/trigger-sdk/src/v3/chat.ts @@ -615,11 +615,15 @@ export class TriggerChatTransport implements ChatTransport { const state = await this.ensureSessionState(chatId); + // Generated outside the closure so auth-retries reuse the same part id + // and the server-side dedupe sees one logical append. + const partId = crypto.randomUUID(); const sendChatMessage = async (token: string) => { await this.appendInputChunk( chatId, token, - this.serializeInputChunk({ kind: "message", payload: wirePayload }) + this.serializeInputChunk({ kind: "message", payload: wirePayload }), + partId ); }; @@ -800,11 +804,13 @@ export class TriggerChatTransport implements ChatTransport { metadata: mergedMetadata, }; + const partId = crypto.randomUUID(); const send = async (token: string) => { await this.appendInputChunk( chatId, token, - this.serializeInputChunk({ kind: "message", payload: wirePayload }) + this.serializeInputChunk({ kind: "message", payload: wirePayload }), + partId ); }; @@ -859,8 +865,14 @@ export class TriggerChatTransport implements ChatTransport { const state = this.sessions.get(chatId); if (!state) return false; + const partId = crypto.randomUUID(); const send = async (token: string) => { - await this.appendInputChunk(chatId, token, this.serializeInputChunk({ kind: "stop" })); + await this.appendInputChunk( + chatId, + token, + this.serializeInputChunk({ kind: "stop" }), + partId + ); }; try { @@ -876,6 +888,13 @@ export class TriggerChatTransport implements ChatTransport { activeStream.abort(); this.activeStreams.delete(chatId); } + + // The turn won't reach its turn-complete on this client (we just + // aborted the reader), so clear the streaming flag here and persist — + // otherwise a reload resumes mid-turn and replays the chunks the user + // explicitly stopped. + state.isStreaming = false; + this.notifySessionChange(chatId, state); return true; }; @@ -908,12 +927,28 @@ export class TriggerChatTransport implements ChatTransport { }; const body = this.serializeInputChunk({ kind: "message", payload: wirePayload }); + const partId = crypto.randomUUID(); const send = async (token: string) => { - await this.appendInputChunk(chatId, token, body); + await this.appendInputChunk(chatId, token, body, partId); }; await this.callWithAuthRetry(chatId, state, send); + // Supersede any in-flight reader before subscribing — same as + // `sendMessages`. Two concurrent readers both write `state.lastEventId` + // and the slower one can regress the cursor, replaying records on the + // next reconnect. + const activeStream = this.activeStreams.get(chatId); + if (activeStream) { + activeStream.abort(); + this.activeStreams.delete(chatId); + } + + // Mark streaming + persist so a reload mid-action resumes (reconnectToStream + // no-ops when the persisted session says isStreaming: false). + state.isStreaming = true; + this.notifySessionChange(chatId, state); + return this.subscribeToSessionStream(state, undefined, chatId); }; @@ -1085,14 +1120,22 @@ export class TriggerChatTransport implements ChatTransport { return this.fetchOverride ? this.fetchOverride(url, init, ctx) : fetch(url, init); } - private async appendInputChunk(chatId: string, token: string, body: string): Promise { + private async appendInputChunk( + chatId: string, + token: string, + body: string, + partId?: string + ): Promise { const ctx: ChatTransportEndpointContext = { endpoint: "in", chatId }; const url = `${this.resolveBaseURL(ctx)}/realtime/v1/sessions/${encodeURIComponent(chatId)}/in/append`; + // extraHeaders first so the fixed headers below win — a transport-wide + // X-Part-Id must not override the per-append idempotency key. const headers: Record = { + ...this.extraHeaders, "Content-Type": "application/json", Authorization: `Bearer ${token}`, "x-trigger-source": "sdk", - ...this.extraHeaders, + "X-Part-Id": partId ?? crypto.randomUUID(), }; const response = await this.doFetch(ctx, url, { method: "POST", headers, body }); if (!response.ok) { diff --git a/packages/trigger-sdk/src/v3/sessions.ts b/packages/trigger-sdk/src/v3/sessions.ts index ea3ebd8d937..fc47a037914 100644 --- a/packages/trigger-sdk/src/v3/sessions.ts +++ b/packages/trigger-sdk/src/v3/sessions.ts @@ -626,15 +626,19 @@ export class SessionInputChannel { /** * Register a handler that fires for every record landing on `.in`. - * Handlers are flushed with any buffered records on attach and cleaned - * up automatically when the task run completes. Returns `{ off }` to - * unsubscribe early. + * Buffered records are offered to the handler on attach, and handlers + * are cleaned up automatically when the task run completes. Returns + * `{ off }` to unsubscribe early. + * + * A handler that synchronously returns `true` CONSUMES the record: it + * won't be buffered for a later `once()` and won't be re-delivered on a + * future `on()` attach. Plain observers should return nothing. */ - on(handler: (data: T) => void | Promise): { off: () => void } { + on(handler: (data: T) => void | boolean | Promise): { off: () => void } { return sessionStreams.on( this.sessionId, "in", - handler as (data: unknown) => void | Promise + handler as (data: unknown) => void | boolean | Promise ); } diff --git a/packages/trigger-sdk/test/mockChatAgent.test.ts b/packages/trigger-sdk/test/mockChatAgent.test.ts index 7245622b6e4..5038f21ca24 100644 --- a/packages/trigger-sdk/test/mockChatAgent.test.ts +++ b/packages/trigger-sdk/test/mockChatAgent.test.ts @@ -1140,6 +1140,63 @@ describe("mockChatAgent", () => { } }); + it("error turn: onTurnComplete fires with the error and the failed message is snapshotted", async () => { + const onTurnComplete = vi.fn(); + const agent = chat.agent({ + id: "mockChatAgent.error-turn-run", + onTurnComplete, + run: async () => { + throw new Error("boom in run"); + }, + }); + const harness = mockChatAgent(agent, { chatId: "err-run" }); + try { + await harness.sendMessage(userMessage("hello", "u-err-run")); + await new Promise((r) => setTimeout(r, 50)); + expect(onTurnComplete).toHaveBeenCalledTimes(1); + const evt = onTurnComplete.mock.calls[0]![0]; + expect(evt.error).toBeInstanceOf(Error); + expect(evt.finishReason).toBe("error"); + expect(evt.responseMessage).toBeUndefined(); + expect(evt.uiMessages.some((m: any) => m.id === "u-err-run")).toBe(true); + const snap = harness.getSnapshot(); + expect(snap?.messages.some((m) => m.id === "u-err-run")).toBe(true); + } finally { + await harness.close(); + } + }); + + it("error turn: a pre-merge hook throw still snapshots the failed user message", async () => { + const model = new MockLanguageModelV3({ + doStream: async () => ({ stream: textStream("never reached") }), + }); + const onTurnComplete = vi.fn(); + const agent = chat.agent({ + id: "mockChatAgent.error-turn-prehook", + // onValidateMessages fires BEFORE the wire message is merged into the + // accumulator, so the message lands in the snapshot only because the + // error path folds the wire message back in. + onValidateMessages: async () => { + throw new Error("boom in validate"); + }, + onTurnComplete, + run: async ({ messages, signal }) => streamText({ model, messages, abortSignal: signal }), + }); + const harness = mockChatAgent(agent, { chatId: "err-prehook" }); + try { + await harness.sendMessage(userMessage("validate me", "u-err-prehook")); + await new Promise((r) => setTimeout(r, 50)); + expect(onTurnComplete).toHaveBeenCalledTimes(1); + const evt = onTurnComplete.mock.calls[0]![0]; + expect(evt.error).toBeInstanceOf(Error); + expect(evt.uiMessages.some((m: any) => m.id === "u-err-prehook")).toBe(true); + const snap = harness.getSnapshot(); + expect(snap?.messages.some((m) => m.id === "u-err-prehook")).toBe(true); + } finally { + await harness.close(); + } + }); + it("seeds locals before run() via setupLocals (DI pattern)", async () => { type FakeDb = { findUser(id: string): Promise<{ id: string; name: string }> }; const dbKey = locals.create("test-db");