From d5b8cb8ddf9234ccf1e5c26923c610e96e349ef5 Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Wed, 10 Jun 2026 12:26:15 +0100 Subject: [PATCH 01/16] refactor(run-engine): split completeWaitpoint into tx-able mutation and post-commit side effects Co-Authored-By: Claude Fable 5 --- .../src/engine/systems/waitpointSystem.ts | 61 +++++++++++++++++-- 1 file changed, 55 insertions(+), 6 deletions(-) diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index 8b8d4f82fcf..7b6f91649db 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -39,6 +39,15 @@ export type WaitpointContinuationResult = waitpoints: Array; }; +export type CompletedWaitpointMutationResult = { + waitpoint: Waitpoint; + affectedTaskRuns: { + taskRunId: string; + spanIdToComplete: string | null; + createdAt: Date; + }[]; +}; + export class WaitpointSystem { private readonly $: SystemResources; private readonly executionSnapshotSystem: ExecutionSnapshotSystem; @@ -80,9 +89,34 @@ export class WaitpointSystem { isError: boolean; }; }): Promise { + const result = await this.completeWaitpointMutation({ id, output }); + await this.scheduleWaitpointContinuations({ ...result, output }); + return result.waitpoint; + } + + /** Marks the waitpoint COMPLETED and returns the runs it was blocking. + * Pure Postgres mutation — safe to run inside a transaction via `tx`. + * Callers passing `tx` MUST call scheduleWaitpointContinuations() with the + * result AFTER the surrounding transaction commits, otherwise blocked runs + * are never continued. */ + async completeWaitpointMutation({ + id, + output, + tx, + }: { + id: string; + output?: { + value: string; + type?: string; + isError: boolean; + }; + tx?: PrismaClientOrTransaction; + }): Promise { + const prisma = tx ?? this.$.prisma; + // 1. Complete the Waitpoint (if not completed) const [updateError, updateResult] = await tryCatch( - this.$.prisma.waitpoint.updateMany({ + prisma.waitpoint.updateMany({ where: { id, status: "PENDING" }, data: { status: "COMPLETED", @@ -106,7 +140,7 @@ export class WaitpointSystem { ); } - const waitpoint = await this.$.prisma.waitpoint.findFirst({ + const waitpoint = await prisma.waitpoint.findFirst({ where: { id }, }); @@ -123,7 +157,7 @@ export class WaitpointSystem { } // 2. Find the TaskRuns blocked by this waitpoint - const affectedTaskRuns = await this.$.prisma.taskRunWaitpoint.findMany({ + const affectedTaskRuns = await prisma.taskRunWaitpoint.findMany({ where: { waitpointId: id }, select: { taskRunId: true, spanIdToComplete: true, createdAt: true }, }); @@ -134,6 +168,23 @@ export class WaitpointSystem { }); } + return { waitpoint, affectedTaskRuns }; + } + + /** Post-commit side effects of completing a waitpoint: schedules continuation of + * the blocked runs and emits events. Must be called AFTER the mutation committed — + * never from inside a transaction. */ + async scheduleWaitpointContinuations({ + waitpoint, + affectedTaskRuns, + output, + }: CompletedWaitpointMutationResult & { + output?: { + value: string; + type?: string; + isError: boolean; + }; + }): Promise { // 3. Schedule trying to continue the runs for (const run of affectedTaskRuns) { const jobId = `continueRunIfUnblocked:${run.taskRunId}`; @@ -141,7 +192,7 @@ export class WaitpointSystem { const availableAt = new Date(Date.now() + 50); this.$.logger.debug(`completeWaitpoint: enqueueing continueRunIfUnblocked`, { - waitpointId: id, + waitpointId: waitpoint.id, runId: run.taskRunId, jobId, availableAt, @@ -169,8 +220,6 @@ export class WaitpointSystem { }); } } - - return waitpoint; } /** From 67f70bc804470f3d1efee7ad8871af160d1657a0 Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Wed, 10 Jun 2026 12:33:15 +0100 Subject: [PATCH 02/16] refactor(run-engine): carry output through CompletedWaitpointMutationResult Co-Authored-By: Claude Fable 5 --- .../src/engine/systems/waitpointSystem.ts | 35 ++++++++----------- 1 file changed, 14 insertions(+), 21 deletions(-) diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index 7b6f91649db..174d86d9704 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -39,6 +39,12 @@ export type WaitpointContinuationResult = waitpoints: Array; }; +export type WaitpointOutput = { + value: string; + type?: string; + isError: boolean; +}; + export type CompletedWaitpointMutationResult = { waitpoint: Waitpoint; affectedTaskRuns: { @@ -46,6 +52,7 @@ export type CompletedWaitpointMutationResult = { spanIdToComplete: string | null; createdAt: Date; }[]; + output?: WaitpointOutput; }; export class WaitpointSystem { @@ -83,20 +90,16 @@ export class WaitpointSystem { output, }: { id: string; - output?: { - value: string; - type?: string; - isError: boolean; - }; + output?: WaitpointOutput; }): Promise { const result = await this.completeWaitpointMutation({ id, output }); - await this.scheduleWaitpointContinuations({ ...result, output }); + await this.scheduleWaitpointContinuations(result); return result.waitpoint; } /** Marks the waitpoint COMPLETED and returns the runs it was blocking. * Pure Postgres mutation — safe to run inside a transaction via `tx`. - * Callers passing `tx` MUST call scheduleWaitpointContinuations() with the + * Direct callers MUST call scheduleWaitpointContinuations() with the * result AFTER the surrounding transaction commits, otherwise blocked runs * are never continued. */ async completeWaitpointMutation({ @@ -105,11 +108,7 @@ export class WaitpointSystem { tx, }: { id: string; - output?: { - value: string; - type?: string; - isError: boolean; - }; + output?: WaitpointOutput; tx?: PrismaClientOrTransaction; }): Promise { const prisma = tx ?? this.$.prisma; @@ -168,7 +167,7 @@ export class WaitpointSystem { }); } - return { waitpoint, affectedTaskRuns }; + return { waitpoint, affectedTaskRuns, output }; } /** Post-commit side effects of completing a waitpoint: schedules continuation of @@ -178,14 +177,8 @@ export class WaitpointSystem { waitpoint, affectedTaskRuns, output, - }: CompletedWaitpointMutationResult & { - output?: { - value: string; - type?: string; - isError: boolean; - }; - }): Promise { - // 3. Schedule trying to continue the runs + }: CompletedWaitpointMutationResult): Promise { + // Schedule trying to continue the runs for (const run of affectedTaskRuns) { const jobId = `continueRunIfUnblocked:${run.taskRunId}`; //50ms in the future From d3bfa0ce85e4facf806622426f83c52491f99d5d Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Wed, 10 Jun 2026 12:37:29 +0100 Subject: [PATCH 03/16] test(run-engine): add failing tests for atomic single-commit run completion Co-Authored-By: Claude Fable 5 --- .../src/engine/tests/atomicCompletion.test.ts | 308 ++++++++++++++++++ 1 file changed, 308 insertions(+) create mode 100644 internal-packages/run-engine/src/engine/tests/atomicCompletion.test.ts diff --git a/internal-packages/run-engine/src/engine/tests/atomicCompletion.test.ts b/internal-packages/run-engine/src/engine/tests/atomicCompletion.test.ts new file mode 100644 index 00000000000..ca61e1519d0 --- /dev/null +++ b/internal-packages/run-engine/src/engine/tests/atomicCompletion.test.ts @@ -0,0 +1,308 @@ +import { assertNonNullable, containerTest } from "@internal/testcontainers"; +import { trace } from "@internal/tracing"; +import { describe, expect, vi } from "vitest"; +import { setTimeout } from "node:timers/promises"; +import { PrismaClient } from "@trigger.dev/database"; +import { RunEngine } from "../index.js"; +import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "./setup.js"; + +vi.setConfig({ testTimeout: 60_000 }); + +function createEngine(prisma: PrismaClient, redisOptions: any) { + return new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + masterQueueConsumersDisabled: true, + processWorkerQueueDebounceMs: 50, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); +} + +/** Number of xid-consuming (i.e. write) transactions is the delta between two + * txid_current() calls, minus 1 for the second call itself. Read-only + * transactions never allocate an xid, so they don't count. */ +async function currentTxid(prisma: PrismaClient): Promise { + const rows = await prisma.$queryRaw<{ txid: bigint }[]>`SELECT txid_current() AS txid`; + return BigInt(rows[0].txid); +} + +/** Drives a parent run + triggerAndWait child up to the point where the child + * is EXECUTING and the parent is blocked on the child's associated waitpoint. */ +async function setupBlockedParentWithExecutingChild( + engine: RunEngine, + prisma: PrismaClient, + authenticatedEnvironment: Awaited> +) { + const parentTask = "parent-task"; + const childTask = "child-task"; + + await setupBackgroundWorker(engine, authenticatedEnvironment, [parentTask, childTask]); + + const parentRun = await engine.trigger( + { + number: 1, + friendlyId: "run_p1234", + environment: authenticatedEnvironment, + taskIdentifier: parentTask, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + queue: `task/${parentTask}`, + isTest: false, + tags: [], + workerQueue: "main", + }, + prisma + ); + + await setTimeout(500); + const dequeuedParent = await engine.dequeueFromWorkerQueue({ + consumerId: "test_12345", + workerQueue: "main", + }); + expect(dequeuedParent.length).toBe(1); + + await engine.startRunAttempt({ + runId: parentRun.id, + snapshotId: dequeuedParent[0].snapshot.id, + }); + + const childRun = await engine.trigger( + { + number: 1, + friendlyId: "run_c1234", + environment: authenticatedEnvironment, + taskIdentifier: childTask, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + queue: `task/${childTask}`, + isTest: false, + tags: [], + resumeParentOnCompletion: true, + parentTaskRunId: parentRun.id, + workerQueue: "main", + }, + prisma + ); + + await setTimeout(500); + const dequeuedChild = await engine.dequeueFromWorkerQueue({ + consumerId: "test_12345", + workerQueue: "main", + }); + expect(dequeuedChild.length).toBe(1); + + const childAttempt = await engine.startRunAttempt({ + runId: childRun.id, + snapshotId: dequeuedChild[0].snapshot.id, + }); + + const childWithWaitpoint = await prisma.taskRun.findFirstOrThrow({ + where: { id: childRun.id }, + include: { associatedWaitpoint: true }, + }); + assertNonNullable(childWithWaitpoint.associatedWaitpoint); + + return { + parentRun, + childRun, + childAttempt, + waitpointId: childWithWaitpoint.associatedWaitpoint.id, + }; +} + +describe("RunEngine atomic completion", () => { + containerTest( + "attemptSucceeded with an associated waitpoint is a single write transaction", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = createEngine(prisma, redisOptions); + + try { + const { childRun, childAttempt } = await setupBlockedParentWithExecutingChild( + engine, + prisma, + authenticatedEnvironment + ); + + const before = await currentTxid(prisma); + + const result = await engine.completeRunAttempt({ + runId: childRun.id, + snapshotId: childAttempt.snapshot.id, + completion: { + ok: true, + id: childRun.id, + output: `{"foo":"bar"}`, + outputType: "application/json", + }, + }); + + const after = await currentTxid(prisma); + + expect(result.attemptStatus).toBe("RUN_FINISHED"); + + // TaskRun update (+ nested snapshot) and waitpoint completion + // must share one commit. + const writeTransactions = Number(after - before) - 1; + expect(writeTransactions).toBe(1); + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "continueRunIfUnblocked is a single write transaction", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = createEngine(prisma, redisOptions); + + try { + const { parentRun, childRun, childAttempt } = await setupBlockedParentWithExecutingChild( + engine, + prisma, + authenticatedEnvironment + ); + + await engine.completeRunAttempt({ + runId: childRun.id, + snapshotId: childAttempt.snapshot.id, + completion: { + ok: true, + id: childRun.id, + output: `{"foo":"bar"}`, + outputType: "application/json", + }, + }); + + const before = await currentTxid(prisma); + + // The continueRunIfUnblocked job is debounced by 50ms; poll until the + // parent has been continued (EXECUTING_WITH_WAITPOINTS -> EXECUTING). + let continued = false; + for (let i = 0; i < 50; i++) { + await setTimeout(100); + const parentData = await engine.getRunExecutionData({ runId: parentRun.id }); + if (parentData?.snapshot.executionStatus === "EXECUTING") { + continued = true; + break; + } + } + expect(continued).toBe(true); + + const after = await currentTxid(prisma); + + // Snapshot insert and TaskRunWaitpoint delete must share one commit. + const writeTransactions = Number(after - before) - 1; + expect(writeTransactions).toBe(1); + + const remainingBlockers = await prisma.taskRunWaitpoint.findMany({ + where: { taskRunId: parentRun.id }, + }); + expect(remainingBlockers.length).toBe(0); + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "a failed completion rolls back the whole transition", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = createEngine(prisma, redisOptions); + + try { + const { childRun, childAttempt, waitpointId } = await setupBlockedParentWithExecutingChild( + engine, + prisma, + authenticatedEnvironment + ); + + // Real fault injection, no mocks: hold a row lock on the child's + // associated waitpoint so the waitpoint completion inside + // attemptSucceeded blocks past the transaction timeout (5s default) + // and the whole transaction aborts. + const lockHolder = prisma.$transaction( + async (tx) => { + await tx.$queryRaw`SELECT "id" FROM "Waitpoint" WHERE "id" = ${waitpointId} FOR UPDATE`; + await setTimeout(8_000); + }, + { timeout: 20_000, maxWait: 5_000 } + ); + await setTimeout(300); // let the lock land + + let completionError: unknown; + try { + await engine.completeRunAttempt({ + runId: childRun.id, + snapshotId: childAttempt.snapshot.id, + completion: { + ok: true, + id: childRun.id, + output: `{"foo":"bar"}`, + outputType: "application/json", + }, + }); + } catch (error) { + completionError = error; + } + + await lockHolder; + + // The completion must have failed while the lock was held... + expect(completionError).toBeDefined(); + + // ...and NOTHING from the transition may have been committed: + // run still EXECUTING, waitpoint still PENDING, no FINISHED snapshot. + const childAfter = await prisma.taskRun.findFirstOrThrow({ + where: { id: childRun.id }, + }); + expect(childAfter.status).toBe("EXECUTING"); + + const waitpointAfter = await prisma.waitpoint.findFirstOrThrow({ + where: { id: waitpointId }, + }); + expect(waitpointAfter.status).toBe("PENDING"); + + const finishedSnapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId: childRun.id, executionStatus: "FINISHED" }, + }); + expect(finishedSnapshots.length).toBe(0); + } finally { + await engine.quit(); + } + } + ); +}); From a8bd676995abb93d79bfe533f40a2852ddf39ab7 Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Wed, 10 Jun 2026 12:48:07 +0100 Subject: [PATCH 04/16] test(run-engine): make atomic-completion assertions race-free (xmin equality + pre-completion bracketing) Co-Authored-By: Claude Fable 5 --- .../src/engine/tests/atomicCompletion.test.ts | 54 +++++++++++++------ 1 file changed, 37 insertions(+), 17 deletions(-) diff --git a/internal-packages/run-engine/src/engine/tests/atomicCompletion.test.ts b/internal-packages/run-engine/src/engine/tests/atomicCompletion.test.ts index ca61e1519d0..4fd416b1455 100644 --- a/internal-packages/run-engine/src/engine/tests/atomicCompletion.test.ts +++ b/internal-packages/run-engine/src/engine/tests/atomicCompletion.test.ts @@ -3,12 +3,13 @@ import { trace } from "@internal/tracing"; import { describe, expect, vi } from "vitest"; import { setTimeout } from "node:timers/promises"; import { PrismaClient } from "@trigger.dev/database"; +import { RedisOptions } from "@internal/redis"; import { RunEngine } from "../index.js"; import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "./setup.js"; vi.setConfig({ testTimeout: 60_000 }); -function createEngine(prisma: PrismaClient, redisOptions: any) { +function createEngine(prisma: PrismaClient, redisOptions: RedisOptions) { return new RunEngine({ prisma, worker: { @@ -149,14 +150,12 @@ describe("RunEngine atomic completion", () => { const engine = createEngine(prisma, redisOptions); try { - const { childRun, childAttempt } = await setupBlockedParentWithExecutingChild( + const { childRun, childAttempt, waitpointId } = await setupBlockedParentWithExecutingChild( engine, prisma, authenticatedEnvironment ); - const before = await currentTxid(prisma); - const result = await engine.completeRunAttempt({ runId: childRun.id, snapshotId: childAttempt.snapshot.id, @@ -168,14 +167,22 @@ describe("RunEngine atomic completion", () => { }, }); - const after = await currentTxid(prisma); - expect(result.attemptStatus).toBe("RUN_FINISHED"); - // TaskRun update (+ nested snapshot) and waitpoint completion - // must share one commit. - const writeTransactions = Number(after - before) - 1; - expect(writeTransactions).toBe(1); + // Equal xmin on all three rows proves they were written (last-updated) + // by the same transaction — a timing-window-free proof that the + // TaskRun update, the FINISHED snapshot insert, and the waitpoint + // completion all share one commit. + const rows = await prisma.$queryRaw<{ source: string; xmin: string }[]>` + SELECT 'run' AS source, xmin::text FROM "TaskRun" WHERE id = ${childRun.id} + UNION ALL + SELECT 'snapshot' AS source, xmin::text FROM "TaskRunExecutionSnapshot" + WHERE "runId" = ${childRun.id} AND "executionStatus"::text = 'FINISHED' + UNION ALL + SELECT 'waitpoint' AS source, xmin::text FROM "Waitpoint" WHERE id = ${waitpointId} + `; + expect(rows.length, JSON.stringify(rows)).toBe(3); + expect(new Set(rows.map((r) => r.xmin)).size, JSON.stringify(rows)).toBe(1); } finally { await engine.quit(); } @@ -195,6 +202,12 @@ describe("RunEngine atomic completion", () => { authenticatedEnvironment ); + // Take `before` BEFORE completeRunAttempt so the bracket covers both + // the completion commit and the subsequent continueRunIfUnblocked commit, + // eliminating the race where the continuation job could commit inside + // a tighter window taken after completeRunAttempt returns. + const before = await currentTxid(prisma); + await engine.completeRunAttempt({ runId: childRun.id, snapshotId: childAttempt.snapshot.id, @@ -206,26 +219,29 @@ describe("RunEngine atomic completion", () => { }, }); - const before = await currentTxid(prisma); - // The continueRunIfUnblocked job is debounced by 50ms; poll until the // parent has been continued (EXECUTING_WITH_WAITPOINTS -> EXECUTING). let continued = false; + let lastStatus: string | undefined; for (let i = 0; i < 50; i++) { await setTimeout(100); const parentData = await engine.getRunExecutionData({ runId: parentRun.id }); - if (parentData?.snapshot.executionStatus === "EXECUTING") { + lastStatus = parentData?.snapshot.executionStatus; + if (lastStatus === "EXECUTING") { continued = true; break; } } - expect(continued).toBe(true); + expect(continued, `parent never continued, last status: ${lastStatus}`).toBe(true); const after = await currentTxid(prisma); - // Snapshot insert and TaskRunWaitpoint delete must share one commit. + // completion (1 commit) + continuation (1 commit); today this is 4 (2 + 2). + // Note: txid_current() is cluster-global — autovacuum could in principle + // add an xid between the two reads, accepted residual risk; xmin equality + // cannot witness the TaskRunWaitpoint DELETE so we keep the delta check here. const writeTransactions = Number(after - before) - 1; - expect(writeTransactions).toBe(1); + expect(writeTransactions).toBe(2); const remainingBlockers = await prisma.taskRunWaitpoint.findMany({ where: { taskRunId: parentRun.id }, @@ -254,6 +270,10 @@ describe("RunEngine atomic completion", () => { // associated waitpoint so the waitpoint completion inside // attemptSucceeded blocks past the transaction timeout (5s default) // and the whole transaction aborts. + // NOTE: this test relies on the implementation's transaction timeout + // (Prisma default 5s) being well below the 8s lock hold time. If the + // implementation ever raises its tx timeout to ≥8s, raise the hold + // time here accordingly. const lockHolder = prisma.$transaction( async (tx) => { await tx.$queryRaw`SELECT "id" FROM "Waitpoint" WHERE "id" = ${waitpointId} FOR UPDATE`; @@ -282,7 +302,7 @@ describe("RunEngine atomic completion", () => { await lockHolder; // The completion must have failed while the lock was held... - expect(completionError).toBeDefined(); + expect(completionError, "completion must fail while the waitpoint row is locked").toBeDefined(); // ...and NOTHING from the transition may have been committed: // run still EXECUTING, waitpoint still PENDING, no FINISHED snapshot. From 9372ab69057db953d657b7e6b62d3a74005e1283 Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Wed, 10 Jun 2026 12:54:34 +0100 Subject: [PATCH 05/16] feat(run-engine): complete run and waitpoint in a single transaction Co-Authored-By: Claude Fable 5 --- .../src/engine/systems/runAttemptSystem.ts | 147 +++++++++++------- 1 file changed, 92 insertions(+), 55 deletions(-) diff --git a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts index 06c80f67f2c..904153408de 100644 --- a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts @@ -738,58 +738,100 @@ export class RunAttemptSystem { environmentType: latestSnapshot.environmentType, }); - const run = await prisma.taskRun.update({ - where: { id: runId }, - data: { - status: "COMPLETED_SUCCESSFULLY", - completedAt, - output: completion.output, - outputType: completion.outputType, - usageDurationMs: updatedUsage.usageDurationMs, - costInCents: updatedUsage.costInCents, - executionSnapshots: { - create: { - executionStatus: "FINISHED", - description: "Task completed successfully", - runStatus: "COMPLETED_SUCCESSFULLY", - attemptNumber: latestSnapshot.attemptNumber, - environmentId: latestSnapshot.environmentId, - environmentType: latestSnapshot.environmentType, - projectId: latestSnapshot.projectId, - organizationId: latestSnapshot.organizationId, - workerId, - runnerId, + const completedOutput = completion.output + ? { value: completion.output, type: completion.outputType, isError: false as const } + : undefined; + + const txResult = await $transaction( + prisma, + async (tx) => { + const run = await tx.taskRun.update({ + where: { id: runId }, + data: { + status: "COMPLETED_SUCCESSFULLY", + completedAt, + output: completion.output, + outputType: completion.outputType, + usageDurationMs: updatedUsage.usageDurationMs, + costInCents: updatedUsage.costInCents, + executionSnapshots: { + create: { + executionStatus: "FINISHED", + description: "Task completed successfully", + runStatus: "COMPLETED_SUCCESSFULLY", + attemptNumber: latestSnapshot.attemptNumber, + environmentId: latestSnapshot.environmentId, + environmentType: latestSnapshot.environmentType, + projectId: latestSnapshot.projectId, + organizationId: latestSnapshot.organizationId, + workerId, + runnerId, + }, + }, }, - }, - }, - select: { - id: true, - friendlyId: true, - status: true, - attemptNumber: true, - spanId: true, - updatedAt: true, - associatedWaitpoint: { select: { id: true, + friendlyId: true, + status: true, + attemptNumber: true, + spanId: true, + updatedAt: true, + associatedWaitpoint: { + select: { + id: true, + }, + }, + project: { + select: { + organizationId: true, + }, + }, + batchId: true, + createdAt: true, + completedAt: true, + taskEventStore: true, + parentTaskRunId: true, + usageDurationMs: true, + costInCents: true, + runtimeEnvironmentId: true, + projectId: true, }, - }, - project: { - select: { - organizationId: true, - }, - }, - batchId: true, - createdAt: true, - completedAt: true, - taskEventStore: true, - parentTaskRunId: true, - usageDurationMs: true, - costInCents: true, - runtimeEnvironmentId: true, - projectId: true, + }); + + // Complete the waitpoint if it exists (runs without waiting parents + // have no waitpoint). Side effects (continuation jobs, events) are + // scheduled after this transaction commits. + const completedWaitpoint = run.associatedWaitpoint + ? await this.waitpointSystem.completeWaitpointMutation({ + id: run.associatedWaitpoint.id, + output: completedOutput, + tx, + }) + : undefined; + + return { run, completedWaitpoint }; }, - }); + (error) => { + this.$.logger.error("RunEngine.attemptSucceeded(): prisma.$transaction error", { + code: error.code, + meta: error.meta, + stack: error.stack, + message: error.message, + name: error.name, + }); + throw new ServiceValidationError( + "Failed to complete task run and associated waitpoint", + 500 + ); + } + ); + + if (!txResult) { + throw new ServiceValidationError("Failed to complete task run attempt", 500); + } + + const { run, completedWaitpoint } = txResult; + const newSnapshot = await getLatestExecutionSnapshot(prisma, runId); await this.$.runQueue.acknowledgeMessage(run.project.organizationId, runId); @@ -806,14 +848,9 @@ export class RunAttemptSystem { }, }); - // Complete the waitpoint if it exists (runs without waiting parents have no waitpoint) - if (run.associatedWaitpoint) { - await this.waitpointSystem.completeWaitpoint({ - id: run.associatedWaitpoint.id, - output: completion.output - ? { value: completion.output, type: completion.outputType, isError: false } - : undefined, - }); + // Post-commit side effects of the waitpoint completion + if (completedWaitpoint) { + await this.waitpointSystem.scheduleWaitpointContinuations(completedWaitpoint); } this.$.eventBus.emit("runSucceeded", { From 56943972fa0db9b0b0537778d53e30c92145423c Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Wed, 10 Jun 2026 13:10:23 +0100 Subject: [PATCH 06/16] feat(run-engine): unblock continued runs in a single transaction Wrap the EXECUTING_WITH_WAITPOINTS case of continueRunIfUnblocked in a $transaction so the new EXECUTING snapshot and the TaskRunWaitpoint deletion share one atomic commit. sendNotificationToWorker fires only after the transaction commits. The SUSPENDED branch and post-switch deletion block are left untouched. Co-Authored-By: Claude Fable 5 --- .../src/engine/systems/waitpointSystem.ts | 83 ++++++++++++++----- 1 file changed, 61 insertions(+), 22 deletions(-) diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index 174d86d9704..d5d1e137df1 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -1,6 +1,7 @@ import { timeoutError, tryCatch } from "@trigger.dev/core/v3"; import { WaitpointId } from "@trigger.dev/core/v3/isomorphic"; import { + $transaction, Prisma, PrismaClientOrTransaction, TaskQueue, @@ -835,31 +836,61 @@ export class WaitpointSystem { }; } case "EXECUTING_WITH_WAITPOINTS": { - const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot( + const newSnapshot = await $transaction( this.$.prisma, - { - run: { - id: runId, - status: snapshot.runStatus, - attemptNumber: snapshot.attemptNumber, - }, - snapshot: { - executionStatus: "EXECUTING", - description: "Run was continued, whilst still executing.", - }, - previousSnapshotId: snapshot.id, - environmentId: snapshot.environmentId, - environmentType: snapshot.environmentType, - projectId: snapshot.projectId, - organizationId: snapshot.organizationId, - batchId: snapshot.batchId ?? undefined, - completedWaitpoints: blockingWaitpoints.map((b) => ({ - id: b.waitpoint.id, - index: b.batchIndex ?? undefined, - })), + async (tx) => { + const createdSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot( + tx, + { + run: { + id: runId, + status: snapshot.runStatus, + attemptNumber: snapshot.attemptNumber, + }, + snapshot: { + executionStatus: "EXECUTING", + description: "Run was continued, whilst still executing.", + }, + previousSnapshotId: snapshot.id, + environmentId: snapshot.environmentId, + environmentType: snapshot.environmentType, + projectId: snapshot.projectId, + organizationId: snapshot.organizationId, + batchId: snapshot.batchId ?? undefined, + completedWaitpoints: blockingWaitpoints.map((b) => ({ + id: b.waitpoint.id, + index: b.batchIndex ?? undefined, + })), + } + ); + + // Remove the blocking waitpoints in the same transaction, so the + // new snapshot and the unblock are atomic. + if (blockingWaitpoints.length > 0) { + await tx.taskRunWaitpoint.deleteMany({ + where: { + taskRunId: runId, + id: { in: blockingWaitpoints.map((b) => b.id) }, + }, + }); + } + + return createdSnapshot; + }, + (error) => { + this.$.logger.error("continueRunIfUnblocked: prisma.$transaction error", { + code: error.code, + meta: error.meta, + message: error.message, + runId, + }); } ); + if (!newSnapshot) { + throw new Error(`continueRunIfUnblocked: failed to unblock run: ${runId}`); + } + this.$.logger.debug( `continueRunIfUnblocked: run was still executing, sending notification`, { @@ -875,7 +906,15 @@ export class WaitpointSystem { eventBus: this.$.eventBus, }); - break; + this.$.logger.debug(`continueRunIfUnblocked: removed blocking waitpoints`, { + runId, + blockingWaitpoints, + }); + + return { + status: "unblocked", + waitpoints: blockingWaitpoints.map((w) => w.waitpoint), + }; } case "SUSPENDED": { if (!snapshot.checkpointId) { From b4ce360eb291007f4bd392dd08303456eb76fb04 Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Wed, 10 Jun 2026 13:15:55 +0100 Subject: [PATCH 07/16] fix(run-engine): keep snapshot side effects out of the unblock transaction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Split createExecutionSnapshot into createExecutionSnapshotMutation (pure Postgres, safe inside a tx) and scheduleSnapshotSideEffects (Redis heartbeat enqueue + eventBus.emit, must run after commit). In continueRunIfUnblocked EXECUTING_WITH_WAITPOINTS, the $transaction now calls only createExecutionSnapshotMutation; scheduleSnapshotSideEffects is called after the tx returns so heartbeat/event always reference a durable row. createExecutionSnapshot is reimplemented as mutation → side-effects, keeping all other callers byte-identical. Co-Authored-By: Claude Fable 5 --- .../engine/systems/executionSnapshotSystem.ts | 83 +++++++++++++++++-- .../src/engine/systems/waitpointSystem.ts | 26 ++++-- 2 files changed, 91 insertions(+), 18 deletions(-) diff --git a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts index d615c066b85..c6dd39b7fef 100644 --- a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts @@ -326,7 +326,12 @@ export class ExecutionSnapshotSystem { this.heartbeatTimeouts = options.heartbeatTimeouts; } - public async createExecutionSnapshot( + /** + * Pure Postgres mutation — safe inside a transaction. Inserts the snapshot row and + * returns the enhanced result. Direct callers MUST call `scheduleSnapshotSideEffects()` + * with the result after the surrounding transaction commits. + */ + public async createExecutionSnapshotMutation( prisma: PrismaClientOrTransaction, { run, @@ -399,34 +404,94 @@ export class ExecutionSnapshotSystem { }, }); + return { + ...newSnapshot, + friendlyId: SnapshotId.toFriendlyId(newSnapshot.id), + runFriendlyId: RunId.toFriendlyId(newSnapshot.runId), + completedWaitpoints, + }; + } + + /** + * Post-commit side effects for a newly created snapshot: arms the heartbeat job (if + * the execution status requires one) and emits the `executionSnapshotCreated` event. + * Never call this inside a transaction — these side effects must only run against a + * durable (committed) snapshot row. + */ + public async scheduleSnapshotSideEffects({ + snapshot, + runId, + error, + completedWaitpoints, + }: { + snapshot: Awaited>; + runId: string; + error?: string; + completedWaitpoints?: { id: string; index?: number }[]; + }) { if (!error) { //set heartbeat (if relevant) - const intervalMs = this.#getHeartbeatIntervalMs(newSnapshot.executionStatus); + const intervalMs = this.#getHeartbeatIntervalMs(snapshot.executionStatus); if (intervalMs !== null) { await this.$.worker.enqueue({ - id: `heartbeatSnapshot.${run.id}`, + id: `heartbeatSnapshot.${runId}`, job: "heartbeatSnapshot", - payload: { snapshotId: newSnapshot.id, runId: run.id }, + payload: { snapshotId: snapshot.id, runId }, availableAt: new Date(Date.now() + intervalMs), }); } } this.$.eventBus.emit("executionSnapshotCreated", { - time: newSnapshot.createdAt, + time: snapshot.createdAt, run: { - id: newSnapshot.runId, + id: snapshot.runId, }, snapshot: { - ...newSnapshot, + ...snapshot, completedWaitpointIds: completedWaitpoints?.map((w) => w.id) ?? [], }, }); + } + + public async createExecutionSnapshot( + prisma: PrismaClientOrTransaction, + args: { + run: { id: string; status: TaskRunStatus; attemptNumber?: number | null }; + snapshot: { + executionStatus: TaskRunExecutionStatus; + description: string; + metadata?: Prisma.JsonValue; + }; + previousSnapshotId?: string; + batchId?: string; + environmentId: string; + environmentType: RuntimeEnvironmentType; + projectId: string; + organizationId: string; + checkpointId?: string; + workerId?: string; + runnerId?: string; + completedWaitpoints?: { + id: string; + index?: number; + }[]; + error?: string; + } + ) { + const newSnapshot = await this.createExecutionSnapshotMutation(prisma, args); + + await this.scheduleSnapshotSideEffects({ + snapshot: newSnapshot, + runId: args.run.id, + error: args.error, + completedWaitpoints: args.completedWaitpoints, + }); return { ...newSnapshot, - friendlyId: SnapshotId.toFriendlyId(newSnapshot.id), - runFriendlyId: RunId.toFriendlyId(newSnapshot.runId), + friendlyId: newSnapshot.friendlyId, + runFriendlyId: newSnapshot.runFriendlyId, }; } diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index d5d1e137df1..c8320c887b9 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -836,12 +836,16 @@ export class WaitpointSystem { }; } case "EXECUTING_WITH_WAITPOINTS": { + const completedWaitpointArgs = blockingWaitpoints.map((b) => ({ + id: b.waitpoint.id, + index: b.batchIndex ?? undefined, + })); + const newSnapshot = await $transaction( this.$.prisma, async (tx) => { - const createdSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot( - tx, - { + const createdSnapshot = + await this.executionSnapshotSystem.createExecutionSnapshotMutation(tx, { run: { id: runId, status: snapshot.runStatus, @@ -857,12 +861,8 @@ export class WaitpointSystem { projectId: snapshot.projectId, organizationId: snapshot.organizationId, batchId: snapshot.batchId ?? undefined, - completedWaitpoints: blockingWaitpoints.map((b) => ({ - id: b.waitpoint.id, - index: b.batchIndex ?? undefined, - })), - } - ); + completedWaitpoints: completedWaitpointArgs, + }); // Remove the blocking waitpoints in the same transaction, so the // new snapshot and the unblock are atomic. @@ -891,6 +891,14 @@ export class WaitpointSystem { throw new Error(`continueRunIfUnblocked: failed to unblock run: ${runId}`); } + // Schedule side effects (heartbeat + eventBus) AFTER the transaction has + // committed, so they always reference a durable snapshot row. + await this.executionSnapshotSystem.scheduleSnapshotSideEffects({ + snapshot: newSnapshot, + runId, + completedWaitpoints: completedWaitpointArgs, + }); + this.$.logger.debug( `continueRunIfUnblocked: run was still executing, sending notification`, { From eee5d58bf1614830db7aeac83e99db1cfbdffeec Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Wed, 10 Jun 2026 13:24:59 +0100 Subject: [PATCH 08/16] fix(run-engine): restore pre-split snapshot return and event payload shapes Remove completedWaitpoints from createExecutionSnapshotMutation return (it was leaking the args array into callers and the eventBus emit). In scheduleSnapshotSideEffects, destructure off the wrapper-only friendlyId/runFriendlyId before spreading into the emitted payload so the event contains only raw snapshot-row fields + completedWaitpointIds, matching the pre-split shape. Co-Authored-By: Claude Fable 5 --- .../src/engine/systems/executionSnapshotSystem.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts index c6dd39b7fef..5da2fef1898 100644 --- a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts @@ -408,7 +408,6 @@ export class ExecutionSnapshotSystem { ...newSnapshot, friendlyId: SnapshotId.toFriendlyId(newSnapshot.id), runFriendlyId: RunId.toFriendlyId(newSnapshot.runId), - completedWaitpoints, }; } @@ -442,13 +441,17 @@ export class ExecutionSnapshotSystem { } } + // Destructure off the wrapper-only fields so the emitted payload matches the + // pre-split shape (raw snapshot row fields + completedWaitpointIds only). + const { friendlyId: _fid, runFriendlyId: _rfid, ...snapshotRow } = snapshot; + this.$.eventBus.emit("executionSnapshotCreated", { time: snapshot.createdAt, run: { id: snapshot.runId, }, snapshot: { - ...snapshot, + ...snapshotRow, completedWaitpointIds: completedWaitpoints?.map((w) => w.id) ?? [], }, }); From 9e1d3a63747e44197b9a533c7b94c7e615a2c42b Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Wed, 10 Jun 2026 13:32:31 +0100 Subject: [PATCH 09/16] fix(run-engine): widen unblock transaction limits for batch-scale waitpoints Pass explicit timeout/maxRetries to $transaction in continueRunIfUnblocked so that large batchTriggerAndWait parents (hundreds of blocking waitpoint rows) don't hit the 5s Prisma default and land in DLQ. Also removes a duplicate post-send debug log, collapses a no-op re-spread in createExecutionSnapshot, and documents that attemptSucceeded callers must pass a plain client. Co-Authored-By: Claude Fable 5 --- .../src/engine/systems/executionSnapshotSystem.ts | 6 +----- .../run-engine/src/engine/systems/runAttemptSystem.ts | 3 +++ .../run-engine/src/engine/systems/waitpointSystem.ts | 11 ++++++----- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts index 5da2fef1898..4f4622bcac7 100644 --- a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts @@ -491,11 +491,7 @@ export class ExecutionSnapshotSystem { completedWaitpoints: args.completedWaitpoints, }); - return { - ...newSnapshot, - friendlyId: newSnapshot.friendlyId, - runFriendlyId: newSnapshot.runFriendlyId, - }; + return newSnapshot; } public async heartbeatRun({ diff --git a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts index 904153408de..eee1da3fee8 100644 --- a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts @@ -688,6 +688,9 @@ export class RunAttemptSystem { runId: string; snapshotId: string; completion: TaskRunSuccessfulExecutionResult; + // Note: passing an open transaction as `tx` makes $transaction run inline in it, + // which would move this method's post-commit side effects inside the caller's + // transaction. Callers must pass a plain client. tx: PrismaClientOrTransaction; workerId?: string; runnerId?: string; diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index c8320c887b9..f65f2a4c117 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -884,6 +884,12 @@ export class WaitpointSystem { message: error.message, runId, }); + }, + { + // the m2m connect inserts one row per blocking waitpoint, so large + // batchTriggerAndWait parents need more than the 5s default + timeout: 30_000, + maxRetries: 2, } ); @@ -914,11 +920,6 @@ export class WaitpointSystem { eventBus: this.$.eventBus, }); - this.$.logger.debug(`continueRunIfUnblocked: removed blocking waitpoints`, { - runId, - blockingWaitpoints, - }); - return { status: "unblocked", waitpoints: blockingWaitpoints.map((w) => w.waitpoint), From 9cffdce8a647aab52e63d275a292b6a09fb81a8f Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Wed, 10 Jun 2026 13:33:43 +0100 Subject: [PATCH 10/16] chore: add server-changes entry for transactional run completion --- .../transactional-run-completion.md | 6 + ...6-06-10-phase1-transactional-completion.md | 980 ++++++++++++++++++ 2 files changed, 986 insertions(+) create mode 100644 .server-changes/transactional-run-completion.md create mode 100644 docs/superpowers/plans/2026-06-10-phase1-transactional-completion.md diff --git a/.server-changes/transactional-run-completion.md b/.server-changes/transactional-run-completion.md new file mode 100644 index 00000000000..38f333d6652 --- /dev/null +++ b/.server-changes/transactional-run-completion.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: improvement +--- + +Reduce database load during traffic spikes by completing runs and resuming waiting runs in single atomic transactions diff --git a/docs/superpowers/plans/2026-06-10-phase1-transactional-completion.md b/docs/superpowers/plans/2026-06-10-phase1-transactional-completion.md new file mode 100644 index 00000000000..d2c4fab7a8f --- /dev/null +++ b/docs/superpowers/plans/2026-06-10-phase1-transactional-completion.md @@ -0,0 +1,980 @@ +# Phase 1: Transactional Run Completion Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Halve the Run Engine's Postgres commit count on the run-completion and run-continuation paths by wrapping their currently-separate single-row writes into single transactions, making each state transition atomic. + +**Architecture:** Production Performance Insights shows ~60% of DB load during AAS storms is `IO:XactSync` (Aurora commit sync) caused by the engine committing a separate tiny transaction per write. Two hot paths each issue 2 write-commits that can be 1: `attemptSucceeded` (TaskRun update+snapshot, then waitpoint completion) and `continueRunIfUnblocked` (snapshot insert, then TaskRunWaitpoint delete). We wrap each pair in one `$transaction` using the exact idiom already used by `startRunAttempt` (`runAttemptSystem.ts:397`). Redis operations and eventBus emissions MUST stay outside/after the transaction — a side effect firing before COMMIT would act on state that can roll back. `completeWaitpoint` is therefore split into a tx-able mutation phase and a post-commit side-effect phase. + +**Tech Stack:** TypeScript, Prisma 6 interactive transactions (`$transaction` helper from `@trigger.dev/database`), vitest + `@internal/testcontainers` (real Postgres + Redis — never mock, per repo CLAUDE.md). + +**Out of scope (follow-up PRs):** the failure path (`attemptFailed` / `#permanentlyFailRun`), the cancellation path (`cancelRun`), and cross-run write coalescing (Phase 2). One change at a time. + +**Verification baseline:** the run-engine test suite must be green before starting. All commands below run from `internal-packages/run-engine/` unless stated otherwise. + +--- + +## File Structure + +- Modify: `internal-packages/run-engine/src/engine/systems/waitpointSystem.ts` + - Split `completeWaitpoint` (lines 70–174) into `completeWaitpointMutation` (PG-only, tx-able) + `scheduleWaitpointContinuations` (post-commit Redis/event side effects), with `completeWaitpoint` preserved as the composition of both (7 existing callers unchanged). + - Wrap the `EXECUTING_WITH_WAITPOINTS` case of `continueRunIfUnblocked` (lines ~795–912) in one transaction. +- Modify: `internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts` + - Wrap `attemptSucceeded`'s TaskRun update + waitpoint completion (lines ~741–817) in one transaction. +- Create: `internal-packages/run-engine/src/engine/tests/atomicCompletion.test.ts` + - Commit-count regression tests (via `txid_current()` deltas) + rollback-atomicity test (via a held row lock — real DB fault injection, no mocks). +- Create: `.server-changes/transactional-run-completion.md` + +--- + +### Task 1: Confirm baseline is green + +**Files:** none modified. + +- [ ] **Step 1: Build dependencies and run the affected test files** + +```bash +cd /path/to/repo +pnpm run build --filter @internal/run-engine +cd internal-packages/run-engine +pnpm run test ./src/engine/tests/triggerAndWait.test.ts --run +pnpm run test ./src/engine/tests/waitpoints.test.ts --run +pnpm run test ./src/engine/tests/trigger.test.ts --run +``` + +Expected: PASS. If anything fails here, STOP — the baseline is broken and must be reported before any changes. + +--- + +### Task 2: Split `completeWaitpoint` into mutation + side-effect phases + +**Files:** +- Modify: `internal-packages/run-engine/src/engine/systems/waitpointSystem.ts:70-174` + +This is a behavior-preserving refactor. Existing tests are the safety net; no new test is written first. + +- [ ] **Step 1: Add the result type and split the method** + +In `waitpointSystem.ts`, immediately above `export class WaitpointSystem` (line 42), add: + +```ts +export type CompletedWaitpointMutationResult = { + waitpoint: Waitpoint; + affectedTaskRuns: { + taskRunId: string; + spanIdToComplete: string | null; + createdAt: Date; + }[]; +}; +``` + +(`Waitpoint` is already imported in this file — it's the current return type of `completeWaitpoint`.) + +Replace the entire `completeWaitpoint` method (lines 70–174, from the `/** This completes a waitpoint...` comment through the closing `}` after `return waitpoint;`) with these three methods: + +```ts + /** This completes a waitpoint and updates all entries so the run isn't blocked, + * if they're no longer blocked. This doesn't suffer from race conditions. */ + async completeWaitpoint({ + id, + output, + }: { + id: string; + output?: { + value: string; + type?: string; + isError: boolean; + }; + }): Promise { + const result = await this.completeWaitpointMutation({ id, output }); + await this.scheduleWaitpointContinuations({ ...result, output }); + return result.waitpoint; + } + + /** Marks the waitpoint COMPLETED and returns the runs it was blocking. + * Pure Postgres mutation — safe to run inside a transaction via `tx`. + * Callers passing `tx` MUST call scheduleWaitpointContinuations() with the + * result AFTER the surrounding transaction commits, otherwise blocked runs + * are never continued. */ + async completeWaitpointMutation({ + id, + output, + tx, + }: { + id: string; + output?: { + value: string; + type?: string; + isError: boolean; + }; + tx?: PrismaClientOrTransaction; + }): Promise { + const prisma = tx ?? this.$.prisma; + + // 1. Complete the Waitpoint (if not completed) + const [updateError, updateResult] = await tryCatch( + prisma.waitpoint.updateMany({ + where: { id, status: "PENDING" }, + data: { + status: "COMPLETED", + completedAt: new Date(), + output: output?.value, + outputType: output?.type, + outputIsError: output?.isError, + }, + }) + ); + + if (updateError) { + this.$.logger.error("completeWaitpoint: error updating waitpoint:", { updateError }); + throw updateError; + } + + if (updateResult.count === 0) { + this.$.logger.info( + "completeWaitpoint: attempted to complete a waitpoint that is not PENDING", + { waitpointId: id } + ); + } + + const waitpoint = await prisma.waitpoint.findFirst({ + where: { id }, + }); + + if (!waitpoint) { + this.$.logger.error("completeWaitpoint: waitpoint not found", { waitpointId: id }); + throw new Error("Waitpoint not found"); + } + + if (waitpoint.status !== "COMPLETED") { + this.$.logger.error(`completeWaitpoint: waitpoint is not completed`, { + waitpointId: id, + }); + throw new Error("Waitpoint not completed"); + } + + // 2. Find the TaskRuns blocked by this waitpoint + const affectedTaskRuns = await prisma.taskRunWaitpoint.findMany({ + where: { waitpointId: id }, + select: { taskRunId: true, spanIdToComplete: true, createdAt: true }, + }); + + if (affectedTaskRuns.length === 0) { + this.$.logger.debug(`completeWaitpoint: no TaskRunWaitpoints found for waitpoint`, { + waitpointId: id, + }); + } + + return { waitpoint, affectedTaskRuns }; + } + + /** Post-commit side effects of completing a waitpoint: schedules continuation of + * the blocked runs and emits events. Must be called AFTER the mutation committed — + * never from inside a transaction. */ + async scheduleWaitpointContinuations({ + waitpoint, + affectedTaskRuns, + output, + }: CompletedWaitpointMutationResult & { + output?: { + value: string; + type?: string; + isError: boolean; + }; + }): Promise { + // 3. Schedule trying to continue the runs + for (const run of affectedTaskRuns) { + const jobId = `continueRunIfUnblocked:${run.taskRunId}`; + //50ms in the future + const availableAt = new Date(Date.now() + 50); + + this.$.logger.debug(`completeWaitpoint: enqueueing continueRunIfUnblocked`, { + waitpointId: waitpoint.id, + runId: run.taskRunId, + jobId, + availableAt, + }); + + await this.$.worker.enqueue({ + //this will debounce the call + id: jobId, + job: "continueRunIfUnblocked", + payload: { runId: run.taskRunId }, + availableAt, + }); + + // emit an event to complete associated cached runs + if (run.spanIdToComplete) { + this.$.eventBus.emit("cachedRunCompleted", { + time: new Date(), + span: { + id: run.spanIdToComplete, + createdAt: run.createdAt, + }, + blockedRunId: run.taskRunId, + hasError: output?.isError ?? false, + cachedRunId: waitpoint.completedByTaskRunId ?? undefined, + }); + } + } + } +``` + +Note the one intentional difference from the original: the debug log inside the loop uses `waitpointId: waitpoint.id` instead of the old closure variable `id`. Everything else is a verbatim move. + +- [ ] **Step 2: Typecheck** + +```bash +cd internal-packages/run-engine && pnpm run typecheck +``` + +Expected: PASS. (`PrismaClientOrTransaction` and `tryCatch` are already imported in this file.) + +- [ ] **Step 3: Run the waitpoint-related tests to confirm no behavior change** + +```bash +pnpm run test ./src/engine/tests/waitpoints.test.ts --run +pnpm run test ./src/engine/tests/triggerAndWait.test.ts --run +``` + +Expected: PASS. + +- [ ] **Step 4: Commit** + +```bash +git add internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +git commit -m "refactor(run-engine): split completeWaitpoint into tx-able mutation and post-commit side effects" +``` + +--- + +### Task 3: Write the failing tests for transactional completion + +**Files:** +- Create: `internal-packages/run-engine/src/engine/tests/atomicCompletion.test.ts` + +Both tests use the parent + `triggerAndWait` child setup (only child runs created with `resumeParentOnCompletion: true` get an `associatedWaitpoint`, so only they exercise the two-commit completion path). + +- [ ] **Step 1: Write the test file** + +```ts +import { containerTest, assertNonNullable } from "@internal/testcontainers"; +import { trace } from "@internal/tracing"; +import { expect } from "vitest"; +import { setTimeout } from "timers/promises"; +import { PrismaClient } from "@trigger.dev/database"; +import { RunEngine } from "../index.js"; +import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "./setup.js"; + +vi.setConfig({ testTimeout: 60_000 }); + +function createEngine(prisma: PrismaClient, redisOptions: any) { + return new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + masterQueueConsumersDisabled: true, + processWorkerQueueDebounceMs: 50, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); +} + +/** Number of xid-consuming (i.e. write) transactions is the delta between two + * txid_current() calls, minus 1 for the second call itself. Read-only + * transactions never allocate an xid, so they don't count. */ +async function currentTxid(prisma: PrismaClient): Promise { + const rows = await prisma.$queryRaw<{ txid: bigint }[]>`SELECT txid_current() AS txid`; + return BigInt(rows[0].txid); +} + +/** Drives a parent run + triggerAndWait child up to the point where the child + * is EXECUTING and the parent is blocked on the child's associated waitpoint. */ +async function setupBlockedParentWithExecutingChild( + engine: RunEngine, + prisma: PrismaClient, + authenticatedEnvironment: Awaited> +) { + const parentTask = "parent-task"; + const childTask = "child-task"; + + await setupBackgroundWorker(engine, authenticatedEnvironment, [parentTask, childTask]); + + const parentRun = await engine.trigger( + { + number: 1, + friendlyId: "run_p1234", + environment: authenticatedEnvironment, + taskIdentifier: parentTask, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + queue: `task/${parentTask}`, + isTest: false, + tags: [], + workerQueue: "main", + }, + prisma + ); + + await setTimeout(500); + const dequeuedParent = await engine.dequeueFromWorkerQueue({ + consumerId: "test_12345", + workerQueue: "main", + }); + expect(dequeuedParent.length).toBe(1); + + await engine.startRunAttempt({ + runId: parentRun.id, + snapshotId: dequeuedParent[0].snapshot.id, + }); + + const childRun = await engine.trigger( + { + number: 1, + friendlyId: "run_c1234", + environment: authenticatedEnvironment, + taskIdentifier: childTask, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + queue: `task/${childTask}`, + isTest: false, + tags: [], + resumeParentOnCompletion: true, + parentTaskRunId: parentRun.id, + workerQueue: "main", + }, + prisma + ); + + await setTimeout(500); + const dequeuedChild = await engine.dequeueFromWorkerQueue({ + consumerId: "test_12345", + workerQueue: "main", + }); + expect(dequeuedChild.length).toBe(1); + + const childAttempt = await engine.startRunAttempt({ + runId: childRun.id, + snapshotId: dequeuedChild[0].snapshot.id, + }); + + const childWithWaitpoint = await prisma.taskRun.findFirstOrThrow({ + where: { id: childRun.id }, + include: { associatedWaitpoint: true }, + }); + assertNonNullable(childWithWaitpoint.associatedWaitpoint); + + return { + parentRun, + childRun, + childAttempt, + waitpointId: childWithWaitpoint.associatedWaitpoint.id, + }; +} + +describe("RunEngine atomic completion", () => { + containerTest( + "attemptSucceeded with an associated waitpoint is a single write transaction", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = createEngine(prisma, redisOptions); + + try { + const { childRun, childAttempt } = await setupBlockedParentWithExecutingChild( + engine, + prisma, + authenticatedEnvironment + ); + + const before = await currentTxid(prisma); + + const result = await engine.completeRunAttempt({ + runId: childRun.id, + snapshotId: childAttempt.snapshot.id, + completion: { + ok: true, + id: childRun.id, + output: `{"foo":"bar"}`, + outputType: "application/json", + }, + }); + + const after = await currentTxid(prisma); + + expect(result.attemptStatus).toBe("RUN_FINISHED"); + + // TaskRun update (+ nested snapshot) and waitpoint completion + // must share one commit. + const writeTransactions = Number(after - before) - 1; + expect(writeTransactions).toBe(1); + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "continueRunIfUnblocked is a single write transaction", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = createEngine(prisma, redisOptions); + + try { + const { parentRun, childRun, childAttempt } = await setupBlockedParentWithExecutingChild( + engine, + prisma, + authenticatedEnvironment + ); + + await engine.completeRunAttempt({ + runId: childRun.id, + snapshotId: childAttempt.snapshot.id, + completion: { + ok: true, + id: childRun.id, + output: `{"foo":"bar"}`, + outputType: "application/json", + }, + }); + + const before = await currentTxid(prisma); + + // The continueRunIfUnblocked job is debounced by 50ms; poll until the + // parent has been continued (EXECUTING_WITH_WAITPOINTS -> EXECUTING). + let continued = false; + for (let i = 0; i < 50; i++) { + await setTimeout(100); + const parentData = await engine.getRunExecutionData({ runId: parentRun.id }); + if (parentData?.snapshot.executionStatus === "EXECUTING") { + continued = true; + break; + } + } + expect(continued).toBe(true); + + const after = await currentTxid(prisma); + + // Snapshot insert and TaskRunWaitpoint delete must share one commit. + const writeTransactions = Number(after - before) - 1; + expect(writeTransactions).toBe(1); + + const remainingBlockers = await prisma.taskRunWaitpoint.findMany({ + where: { taskRunId: parentRun.id }, + }); + expect(remainingBlockers.length).toBe(0); + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "a failed completion rolls back the whole transition", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = createEngine(prisma, redisOptions); + + try { + const { childRun, childAttempt, waitpointId } = await setupBlockedParentWithExecutingChild( + engine, + prisma, + authenticatedEnvironment + ); + + // Real fault injection, no mocks: hold a row lock on the child's + // associated waitpoint so the waitpoint completion inside + // attemptSucceeded blocks past the transaction timeout (5s default) + // and the whole transaction aborts. + const lockHolder = prisma.$transaction( + async (tx) => { + await tx.$queryRaw`SELECT "id" FROM "Waitpoint" WHERE "id" = ${waitpointId} FOR UPDATE`; + await setTimeout(8_000); + }, + { timeout: 20_000, maxWait: 5_000 } + ); + await setTimeout(300); // let the lock land + + let completionError: unknown; + try { + await engine.completeRunAttempt({ + runId: childRun.id, + snapshotId: childAttempt.snapshot.id, + completion: { + ok: true, + id: childRun.id, + output: `{"foo":"bar"}`, + outputType: "application/json", + }, + }); + } catch (error) { + completionError = error; + } + + await lockHolder; + + // The completion must have failed while the lock was held... + expect(completionError).toBeDefined(); + + // ...and NOTHING from the transition may have been committed: + // run still EXECUTING, waitpoint still PENDING, no FINISHED snapshot. + const childAfter = await prisma.taskRun.findFirstOrThrow({ + where: { id: childRun.id }, + }); + expect(childAfter.status).toBe("EXECUTING"); + + const waitpointAfter = await prisma.waitpoint.findFirstOrThrow({ + where: { id: waitpointId }, + }); + expect(waitpointAfter.status).toBe("PENDING"); + + const finishedSnapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId: childRun.id, executionStatus: "FINISHED" }, + }); + expect(finishedSnapshots.length).toBe(0); + } finally { + await engine.quit(); + } + } + ); +}); +``` + +Note: `setupBackgroundWorker` accepts `string | string[]` for the task identifier (`setup.ts:87`), so the array call above is correct. + +- [ ] **Step 2: Run the new tests to verify they FAIL for the right reasons** + +```bash +pnpm run test ./src/engine/tests/atomicCompletion.test.ts --run +``` + +Expected: all three FAIL: +- Test 1: `writeTransactions` is 2 (TaskRun update commit + waitpoint updateMany commit), expected 1. +- Test 2: `writeTransactions` is 2 (snapshot insert commit + TaskRunWaitpoint delete commit), expected 1. +- Test 3: `completionError` is undefined (today the standalone `updateMany` has no transaction timeout — it just waits 8s for the lock and then succeeds) AND `childAfter.status` is `COMPLETED_SUCCESSFULLY` (the TaskRun update committed before the waitpoint write — the torn state this change eliminates). + +If a test fails for a different reason (setup error, wrong call shape), fix the test until the failures are exactly these. + +- [ ] **Step 3: Commit the failing tests** + +```bash +git add internal-packages/run-engine/src/engine/tests/atomicCompletion.test.ts +git commit -m "test(run-engine): add failing tests for atomic single-commit run completion" +``` + +--- + +### Task 4: Wrap `attemptSucceeded` in a single transaction + +**Files:** +- Modify: `internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts:741-817` + +- [ ] **Step 1: Replace the separate update + completeWaitpoint with one $transaction** + +In `attemptSucceeded`, replace the block from `const run = await prisma.taskRun.update({` (line 741) through the end of the `if (run.associatedWaitpoint) { ... }` block (line 817) with: + +```ts + const completedOutput = completion.output + ? { value: completion.output, type: completion.outputType, isError: false } + : undefined; + + const txResult = await $transaction( + prisma, + async (tx) => { + const run = await tx.taskRun.update({ + where: { id: runId }, + data: { + status: "COMPLETED_SUCCESSFULLY", + completedAt, + output: completion.output, + outputType: completion.outputType, + usageDurationMs: updatedUsage.usageDurationMs, + costInCents: updatedUsage.costInCents, + executionSnapshots: { + create: { + executionStatus: "FINISHED", + description: "Task completed successfully", + runStatus: "COMPLETED_SUCCESSFULLY", + attemptNumber: latestSnapshot.attemptNumber, + environmentId: latestSnapshot.environmentId, + environmentType: latestSnapshot.environmentType, + projectId: latestSnapshot.projectId, + organizationId: latestSnapshot.organizationId, + workerId, + runnerId, + }, + }, + }, + select: { + id: true, + friendlyId: true, + status: true, + attemptNumber: true, + spanId: true, + updatedAt: true, + associatedWaitpoint: { + select: { + id: true, + }, + }, + project: { + select: { + organizationId: true, + }, + }, + batchId: true, + createdAt: true, + completedAt: true, + taskEventStore: true, + parentTaskRunId: true, + usageDurationMs: true, + costInCents: true, + runtimeEnvironmentId: true, + projectId: true, + }, + }); + + // Complete the waitpoint if it exists (runs without waiting parents + // have no waitpoint). Side effects (continuation jobs, events) are + // scheduled after this transaction commits. + const completedWaitpoint = run.associatedWaitpoint + ? await this.waitpointSystem.completeWaitpointMutation({ + id: run.associatedWaitpoint.id, + output: completedOutput, + tx, + }) + : undefined; + + return { run, completedWaitpoint }; + }, + (error) => { + this.$.logger.error("RunEngine.attemptSucceeded(): prisma.$transaction error", { + code: error.code, + meta: error.meta, + stack: error.stack, + message: error.message, + name: error.name, + }); + throw new ServiceValidationError( + "Failed to complete task run and associated waitpoint", + 500 + ); + } + ); + + if (!txResult) { + throw new ServiceValidationError("Failed to complete task run attempt", 500); + } + + const { run, completedWaitpoint } = txResult; + + const newSnapshot = await getLatestExecutionSnapshot(prisma, runId); + + await this.$.runQueue.acknowledgeMessage(run.project.organizationId, runId); + + // We need to manually emit this as we created the final snapshot as part of the task run update + this.$.eventBus.emit("executionSnapshotCreated", { + time: newSnapshot.createdAt, + run: { + id: newSnapshot.runId, + }, + snapshot: { + ...newSnapshot, + completedWaitpointIds: newSnapshot.completedWaitpoints.map((wp) => wp.id), + }, + }); + + // Post-commit side effects of the waitpoint completion + if (completedWaitpoint) { + await this.waitpointSystem.scheduleWaitpointContinuations({ + ...completedWaitpoint, + output: completedOutput, + }); + } +``` + +The subsequent `this.$.eventBus.emit("runSucceeded", ...)`, `await this.#finalizeRun(run);`, and the `return { attemptStatus: "RUN_FINISHED", snapshot: newSnapshot, run };` (lines 819–852) stay exactly as they are. + +Two things removed relative to the old code, deliberately: +- The old `await this.$.runQueue.acknowledgeMessage(...)` / `executionSnapshotCreated` emit that sat *between* the update and the waitpoint completion now run after the transaction (same relative order to each other). +- The old direct `completeWaitpoint` call is replaced by mutation-inside-tx + side-effects-after-tx. + +`$transaction` is already imported in this file (line 38). `ServiceValidationError` is already imported (line 44). + +- [ ] **Step 2: Typecheck** + +```bash +cd internal-packages/run-engine && pnpm run typecheck +``` + +Expected: PASS. + +- [ ] **Step 3: Run the new tests — completion tests must now pass** + +```bash +pnpm run test ./src/engine/tests/atomicCompletion.test.ts --run +``` + +Expected: Test 1 ("single write transaction") PASS. Test 3 ("rolls back the whole transition") PASS. Test 2 ("continueRunIfUnblocked") still FAILS (that's Task 5). + +- [ ] **Step 4: Run the existing lifecycle tests** + +```bash +pnpm run test ./src/engine/tests/trigger.test.ts --run +pnpm run test ./src/engine/tests/triggerAndWait.test.ts --run +pnpm run test ./src/engine/tests/waitpoints.test.ts --run +pnpm run test ./src/engine/tests/batchTriggerAndWait.test.ts --run +``` + +Expected: PASS. + +- [ ] **Step 5: Commit** + +```bash +git add internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts +git commit -m "feat(run-engine): complete run and waitpoint in a single transaction" +``` + +--- + +### Task 5: Wrap `continueRunIfUnblocked`'s unblock in a single transaction + +**Files:** +- Modify: `internal-packages/run-engine/src/engine/systems/waitpointSystem.ts` (the `EXECUTING_WITH_WAITPOINTS` case, lines ~795–837, and imports) + +- [ ] **Step 1: Add `$transaction` to the imports** + +In the import block from `"@trigger.dev/database"` at the top of `waitpointSystem.ts`, add `$transaction`: + +```ts +import { + $transaction, + Prisma, + PrismaClientOrTransaction, + TaskRun, + Waitpoint, +} from "@trigger.dev/database"; +``` + +(Keep whatever names are already in that import — just add `$transaction` to them.) + +- [ ] **Step 2: Replace the `EXECUTING_WITH_WAITPOINTS` case** + +Replace the entire `case "EXECUTING_WITH_WAITPOINTS": { ... break; }` block with: + +```ts + case "EXECUTING_WITH_WAITPOINTS": { + const newSnapshot = await $transaction( + this.$.prisma, + async (tx) => { + const createdSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot( + tx, + { + run: { + id: runId, + status: snapshot.runStatus, + attemptNumber: snapshot.attemptNumber, + }, + snapshot: { + executionStatus: "EXECUTING", + description: "Run was continued, whilst still executing.", + }, + previousSnapshotId: snapshot.id, + environmentId: snapshot.environmentId, + environmentType: snapshot.environmentType, + projectId: snapshot.projectId, + organizationId: snapshot.organizationId, + batchId: snapshot.batchId ?? undefined, + completedWaitpoints: blockingWaitpoints.map((b) => ({ + id: b.waitpoint.id, + index: b.batchIndex ?? undefined, + })), + } + ); + + // Remove the blocking waitpoints in the same transaction, so the + // new snapshot and the unblock are atomic. + if (blockingWaitpoints.length > 0) { + await tx.taskRunWaitpoint.deleteMany({ + where: { + taskRunId: runId, + id: { in: blockingWaitpoints.map((b) => b.id) }, + }, + }); + } + + return createdSnapshot; + }, + (error) => { + this.$.logger.error("continueRunIfUnblocked: prisma.$transaction error", { + code: error.code, + meta: error.meta, + message: error.message, + runId, + }); + } + ); + + if (!newSnapshot) { + throw new Error(`continueRunIfUnblocked: failed to unblock run: ${runId}`); + } + + this.$.logger.debug( + `continueRunIfUnblocked: run was still executing, sending notification`, + { + runId, + snapshot, + newSnapshot, + } + ); + + await sendNotificationToWorker({ + runId, + snapshot: newSnapshot, + eventBus: this.$.eventBus, + }); + + this.$.logger.debug(`continueRunIfUnblocked: removed blocking waitpoints`, { + runId, + blockingWaitpoints, + }); + + return { + status: "unblocked", + waitpoints: blockingWaitpoints.map((w) => w.waitpoint), + }; + } +``` + +This case now returns directly instead of `break`ing, so the post-switch `if (blockingWaitpoints.length > 0) { deleteMany ... }` block (lines ~893–906) no longer runs for it — that block now only serves the `SUSPENDED` case, which is deliberately left unchanged in this PR (its `enqueueRun` performs Redis work internally and must not be pulled into a Postgres transaction). + +Note the ordering improvement: the worker notification now fires after the unblock is durable, instead of before the waitpoint rows were deleted. + +- [ ] **Step 3: Typecheck** + +```bash +cd internal-packages/run-engine && pnpm run typecheck +``` + +Expected: PASS. + +- [ ] **Step 4: Run the new tests — all three must now pass** + +```bash +pnpm run test ./src/engine/tests/atomicCompletion.test.ts --run +``` + +Expected: PASS (all 3). + +- [ ] **Step 5: Run the waitpoint/continuation-heavy existing tests** + +```bash +pnpm run test ./src/engine/tests/triggerAndWait.test.ts --run +pnpm run test ./src/engine/tests/waitpoints.test.ts --run +pnpm run test ./src/engine/tests/waitpointRace.test.ts --run +pnpm run test ./src/engine/tests/batchTriggerAndWait.test.ts --run +pnpm run test ./src/engine/tests/lazyWaitpoint.test.ts --run +pnpm run test ./src/engine/tests/checkpoints.test.ts --run +``` + +Expected: PASS. (`checkpoints.test.ts` exercises the SUSPENDED path — it must be unaffected.) + +- [ ] **Step 6: Commit** + +```bash +git add internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +git commit -m "feat(run-engine): unblock continued runs in a single transaction" +``` + +--- + +### Task 6: Full suite, server-changes file, final verification + +**Files:** +- Create: `.server-changes/transactional-run-completion.md` + +- [ ] **Step 1: Run the full run-engine test suite** + +```bash +cd internal-packages/run-engine && pnpm run test --run +``` + +Expected: PASS. This takes a while (testcontainers). If any test fails, STOP and investigate using the systematic-debugging skill — do not paper over it. + +- [ ] **Step 2: Typecheck the webapp (consumes the run-engine package)** + +```bash +cd /path/to/repo && pnpm run typecheck --filter webapp +``` + +Expected: PASS (the public method `completeWaitpoint` kept its exact signature, so no webapp changes should be needed). + +- [ ] **Step 3: Add the server-changes file** + +This PR touches only an internal package consumed by the server — no public package changes, so a `.server-changes/` file (not a changeset) is required. Create `.server-changes/transactional-run-completion.md`: + +```markdown +--- +area: webapp +type: improvement +--- + +Reduce database load during traffic spikes by completing runs and resuming waiting runs in single atomic transactions +``` + +- [ ] **Step 4: Commit** + +```bash +git add .server-changes/transactional-run-completion.md +git commit -m "chore: add server-changes entry for transactional run completion" +``` + +- [ ] **Step 5: Verify the diff is clean and report** + +```bash +git log --oneline main..HEAD +git diff main --stat +``` + +Expected: 4 commits, changes confined to `waitpointSystem.ts`, `runAttemptSystem.ts`, `atomicCompletion.test.ts`, the `.server-changes/` file, and this plan document. + +--- + +## Risks the implementer must watch for + +1. **Never move a Redis call or eventBus emit inside a `$transaction` callback.** That is the single forbidden move in this entire plan — it would publish state that can still roll back, recreating (in miniature) the async-commit hazard this approach exists to avoid. +2. **Transaction timeout:** the new transactions contain 2–4 statements and use Prisma's 5s default. If `atomicCompletion` tests show P2028 timeouts under normal (unlocked) conditions, something is holding the tx open — find it, don't raise the timeout. +3. **Flaky txid counts:** if the commit-count tests are intermittently off-by-one, another engine subsystem is writing concurrently. Identify it via `pg_stat_activity` in the test rather than loosening the assertion to `<=`. +4. **Prisma `$queryRaw` bigint mapping:** `txid_current()` comes back as `bigint` — if the helper's `BigInt(rows[0].txid)` throws on the returned type, log the raw row and adjust the cast; do not switch to a different counting mechanism. + +## Expected production impact (for the PR description) + +Engine-side write commits drop ~2× on the completion path and ~2× on the continuation path (the two hottest sequences after trigger/dequeue). Based on production Performance Insights (2026-06-10): cluster commit rate ~21–31k/sec with `IO:XactSync` at ~59% of spike DB load; this change should cut total commit rate by roughly a quarter at baseline and compress top-of-hour AAS spikes. Bonus: completion and continuation become atomic — a crash can no longer leave a finished run with a still-blocked parent. From 36c6a3f127bd7d00702cce2fb96f2c61ddde3fb7 Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Wed, 10 Jun 2026 14:32:56 +0100 Subject: [PATCH 11/16] fix(run-engine): log snapshot ids instead of full objects on the unblock path --- .../run-engine/src/engine/systems/waitpointSystem.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index f65f2a4c117..4dd1ffa4205 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -905,12 +905,15 @@ export class WaitpointSystem { completedWaitpoints: completedWaitpointArgs, }); + // Log IDs only — full snapshots include completedWaitpoint outputs, and this + // runs at debug level in prod where cloning them would burn event-loop time. this.$.logger.debug( `continueRunIfUnblocked: run was still executing, sending notification`, { runId, - snapshot, - newSnapshot, + previousSnapshotId: snapshot.id, + newSnapshotId: newSnapshot.id, + completedWaitpointCount: blockingWaitpoints.length, } ); From f9974dd6e20fa7fcd096181159fb802236606cec Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Wed, 10 Jun 2026 14:35:38 +0100 Subject: [PATCH 12/16] Revert "fix(run-engine): log snapshot ids instead of full objects on the unblock path" This reverts commit 36c6a3f127bd7d00702cce2fb96f2c61ddde3fb7. --- .../run-engine/src/engine/systems/waitpointSystem.ts | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index 4dd1ffa4205..f65f2a4c117 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -905,15 +905,12 @@ export class WaitpointSystem { completedWaitpoints: completedWaitpointArgs, }); - // Log IDs only — full snapshots include completedWaitpoint outputs, and this - // runs at debug level in prod where cloning them would burn event-loop time. this.$.logger.debug( `continueRunIfUnblocked: run was still executing, sending notification`, { runId, - previousSnapshotId: snapshot.id, - newSnapshotId: newSnapshot.id, - completedWaitpointCount: blockingWaitpoints.length, + snapshot, + newSnapshot, } ); From 3a89556940e5d65f0da4787d87a28086e666e76c Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Wed, 10 Jun 2026 14:55:03 +0100 Subject: [PATCH 13/16] docs(run-engine): explain why the SUSPENDED unblock path stays non-transactional --- .../run-engine/src/engine/systems/waitpointSystem.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index f65f2a4c117..2b02173fcee 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -925,6 +925,11 @@ export class WaitpointSystem { waitpoints: blockingWaitpoints.map((w) => w.waitpoint), }; } + // Unlike EXECUTING_WITH_WAITPOINTS above, this case is deliberately NOT + // wrapped in a single transaction: enqueueRun performs Redis queue work + // internally, which must never run inside an open Postgres transaction. + // Its snapshot insert and the post-switch waitpoint deletion below stay + // separate statements, as they were before the transactional refactor. case "SUSPENDED": { if (!snapshot.checkpointId) { // A run canceled mid-suspend has its checkpoint cleared by the @@ -980,6 +985,8 @@ export class WaitpointSystem { } } + // Only the SUSPENDED case reaches here — EXECUTING_WITH_WAITPOINTS returns + // above after deleting its waitpoints inside its transaction. if (blockingWaitpoints.length > 0) { //5. Remove the blocking waitpoints await this.$.prisma.taskRunWaitpoint.deleteMany({ From 485147d601cff1b36b9a8b8a67245b76ab237ae3 Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Wed, 10 Jun 2026 14:57:47 +0100 Subject: [PATCH 14/16] chore: drop implementation plan doc from the branch --- ...6-06-10-phase1-transactional-completion.md | 980 ------------------ 1 file changed, 980 deletions(-) delete mode 100644 docs/superpowers/plans/2026-06-10-phase1-transactional-completion.md diff --git a/docs/superpowers/plans/2026-06-10-phase1-transactional-completion.md b/docs/superpowers/plans/2026-06-10-phase1-transactional-completion.md deleted file mode 100644 index d2c4fab7a8f..00000000000 --- a/docs/superpowers/plans/2026-06-10-phase1-transactional-completion.md +++ /dev/null @@ -1,980 +0,0 @@ -# Phase 1: Transactional Run Completion Implementation Plan - -> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. - -**Goal:** Halve the Run Engine's Postgres commit count on the run-completion and run-continuation paths by wrapping their currently-separate single-row writes into single transactions, making each state transition atomic. - -**Architecture:** Production Performance Insights shows ~60% of DB load during AAS storms is `IO:XactSync` (Aurora commit sync) caused by the engine committing a separate tiny transaction per write. Two hot paths each issue 2 write-commits that can be 1: `attemptSucceeded` (TaskRun update+snapshot, then waitpoint completion) and `continueRunIfUnblocked` (snapshot insert, then TaskRunWaitpoint delete). We wrap each pair in one `$transaction` using the exact idiom already used by `startRunAttempt` (`runAttemptSystem.ts:397`). Redis operations and eventBus emissions MUST stay outside/after the transaction — a side effect firing before COMMIT would act on state that can roll back. `completeWaitpoint` is therefore split into a tx-able mutation phase and a post-commit side-effect phase. - -**Tech Stack:** TypeScript, Prisma 6 interactive transactions (`$transaction` helper from `@trigger.dev/database`), vitest + `@internal/testcontainers` (real Postgres + Redis — never mock, per repo CLAUDE.md). - -**Out of scope (follow-up PRs):** the failure path (`attemptFailed` / `#permanentlyFailRun`), the cancellation path (`cancelRun`), and cross-run write coalescing (Phase 2). One change at a time. - -**Verification baseline:** the run-engine test suite must be green before starting. All commands below run from `internal-packages/run-engine/` unless stated otherwise. - ---- - -## File Structure - -- Modify: `internal-packages/run-engine/src/engine/systems/waitpointSystem.ts` - - Split `completeWaitpoint` (lines 70–174) into `completeWaitpointMutation` (PG-only, tx-able) + `scheduleWaitpointContinuations` (post-commit Redis/event side effects), with `completeWaitpoint` preserved as the composition of both (7 existing callers unchanged). - - Wrap the `EXECUTING_WITH_WAITPOINTS` case of `continueRunIfUnblocked` (lines ~795–912) in one transaction. -- Modify: `internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts` - - Wrap `attemptSucceeded`'s TaskRun update + waitpoint completion (lines ~741–817) in one transaction. -- Create: `internal-packages/run-engine/src/engine/tests/atomicCompletion.test.ts` - - Commit-count regression tests (via `txid_current()` deltas) + rollback-atomicity test (via a held row lock — real DB fault injection, no mocks). -- Create: `.server-changes/transactional-run-completion.md` - ---- - -### Task 1: Confirm baseline is green - -**Files:** none modified. - -- [ ] **Step 1: Build dependencies and run the affected test files** - -```bash -cd /path/to/repo -pnpm run build --filter @internal/run-engine -cd internal-packages/run-engine -pnpm run test ./src/engine/tests/triggerAndWait.test.ts --run -pnpm run test ./src/engine/tests/waitpoints.test.ts --run -pnpm run test ./src/engine/tests/trigger.test.ts --run -``` - -Expected: PASS. If anything fails here, STOP — the baseline is broken and must be reported before any changes. - ---- - -### Task 2: Split `completeWaitpoint` into mutation + side-effect phases - -**Files:** -- Modify: `internal-packages/run-engine/src/engine/systems/waitpointSystem.ts:70-174` - -This is a behavior-preserving refactor. Existing tests are the safety net; no new test is written first. - -- [ ] **Step 1: Add the result type and split the method** - -In `waitpointSystem.ts`, immediately above `export class WaitpointSystem` (line 42), add: - -```ts -export type CompletedWaitpointMutationResult = { - waitpoint: Waitpoint; - affectedTaskRuns: { - taskRunId: string; - spanIdToComplete: string | null; - createdAt: Date; - }[]; -}; -``` - -(`Waitpoint` is already imported in this file — it's the current return type of `completeWaitpoint`.) - -Replace the entire `completeWaitpoint` method (lines 70–174, from the `/** This completes a waitpoint...` comment through the closing `}` after `return waitpoint;`) with these three methods: - -```ts - /** This completes a waitpoint and updates all entries so the run isn't blocked, - * if they're no longer blocked. This doesn't suffer from race conditions. */ - async completeWaitpoint({ - id, - output, - }: { - id: string; - output?: { - value: string; - type?: string; - isError: boolean; - }; - }): Promise { - const result = await this.completeWaitpointMutation({ id, output }); - await this.scheduleWaitpointContinuations({ ...result, output }); - return result.waitpoint; - } - - /** Marks the waitpoint COMPLETED and returns the runs it was blocking. - * Pure Postgres mutation — safe to run inside a transaction via `tx`. - * Callers passing `tx` MUST call scheduleWaitpointContinuations() with the - * result AFTER the surrounding transaction commits, otherwise blocked runs - * are never continued. */ - async completeWaitpointMutation({ - id, - output, - tx, - }: { - id: string; - output?: { - value: string; - type?: string; - isError: boolean; - }; - tx?: PrismaClientOrTransaction; - }): Promise { - const prisma = tx ?? this.$.prisma; - - // 1. Complete the Waitpoint (if not completed) - const [updateError, updateResult] = await tryCatch( - prisma.waitpoint.updateMany({ - where: { id, status: "PENDING" }, - data: { - status: "COMPLETED", - completedAt: new Date(), - output: output?.value, - outputType: output?.type, - outputIsError: output?.isError, - }, - }) - ); - - if (updateError) { - this.$.logger.error("completeWaitpoint: error updating waitpoint:", { updateError }); - throw updateError; - } - - if (updateResult.count === 0) { - this.$.logger.info( - "completeWaitpoint: attempted to complete a waitpoint that is not PENDING", - { waitpointId: id } - ); - } - - const waitpoint = await prisma.waitpoint.findFirst({ - where: { id }, - }); - - if (!waitpoint) { - this.$.logger.error("completeWaitpoint: waitpoint not found", { waitpointId: id }); - throw new Error("Waitpoint not found"); - } - - if (waitpoint.status !== "COMPLETED") { - this.$.logger.error(`completeWaitpoint: waitpoint is not completed`, { - waitpointId: id, - }); - throw new Error("Waitpoint not completed"); - } - - // 2. Find the TaskRuns blocked by this waitpoint - const affectedTaskRuns = await prisma.taskRunWaitpoint.findMany({ - where: { waitpointId: id }, - select: { taskRunId: true, spanIdToComplete: true, createdAt: true }, - }); - - if (affectedTaskRuns.length === 0) { - this.$.logger.debug(`completeWaitpoint: no TaskRunWaitpoints found for waitpoint`, { - waitpointId: id, - }); - } - - return { waitpoint, affectedTaskRuns }; - } - - /** Post-commit side effects of completing a waitpoint: schedules continuation of - * the blocked runs and emits events. Must be called AFTER the mutation committed — - * never from inside a transaction. */ - async scheduleWaitpointContinuations({ - waitpoint, - affectedTaskRuns, - output, - }: CompletedWaitpointMutationResult & { - output?: { - value: string; - type?: string; - isError: boolean; - }; - }): Promise { - // 3. Schedule trying to continue the runs - for (const run of affectedTaskRuns) { - const jobId = `continueRunIfUnblocked:${run.taskRunId}`; - //50ms in the future - const availableAt = new Date(Date.now() + 50); - - this.$.logger.debug(`completeWaitpoint: enqueueing continueRunIfUnblocked`, { - waitpointId: waitpoint.id, - runId: run.taskRunId, - jobId, - availableAt, - }); - - await this.$.worker.enqueue({ - //this will debounce the call - id: jobId, - job: "continueRunIfUnblocked", - payload: { runId: run.taskRunId }, - availableAt, - }); - - // emit an event to complete associated cached runs - if (run.spanIdToComplete) { - this.$.eventBus.emit("cachedRunCompleted", { - time: new Date(), - span: { - id: run.spanIdToComplete, - createdAt: run.createdAt, - }, - blockedRunId: run.taskRunId, - hasError: output?.isError ?? false, - cachedRunId: waitpoint.completedByTaskRunId ?? undefined, - }); - } - } - } -``` - -Note the one intentional difference from the original: the debug log inside the loop uses `waitpointId: waitpoint.id` instead of the old closure variable `id`. Everything else is a verbatim move. - -- [ ] **Step 2: Typecheck** - -```bash -cd internal-packages/run-engine && pnpm run typecheck -``` - -Expected: PASS. (`PrismaClientOrTransaction` and `tryCatch` are already imported in this file.) - -- [ ] **Step 3: Run the waitpoint-related tests to confirm no behavior change** - -```bash -pnpm run test ./src/engine/tests/waitpoints.test.ts --run -pnpm run test ./src/engine/tests/triggerAndWait.test.ts --run -``` - -Expected: PASS. - -- [ ] **Step 4: Commit** - -```bash -git add internal-packages/run-engine/src/engine/systems/waitpointSystem.ts -git commit -m "refactor(run-engine): split completeWaitpoint into tx-able mutation and post-commit side effects" -``` - ---- - -### Task 3: Write the failing tests for transactional completion - -**Files:** -- Create: `internal-packages/run-engine/src/engine/tests/atomicCompletion.test.ts` - -Both tests use the parent + `triggerAndWait` child setup (only child runs created with `resumeParentOnCompletion: true` get an `associatedWaitpoint`, so only they exercise the two-commit completion path). - -- [ ] **Step 1: Write the test file** - -```ts -import { containerTest, assertNonNullable } from "@internal/testcontainers"; -import { trace } from "@internal/tracing"; -import { expect } from "vitest"; -import { setTimeout } from "timers/promises"; -import { PrismaClient } from "@trigger.dev/database"; -import { RunEngine } from "../index.js"; -import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "./setup.js"; - -vi.setConfig({ testTimeout: 60_000 }); - -function createEngine(prisma: PrismaClient, redisOptions: any) { - return new RunEngine({ - prisma, - worker: { - redis: redisOptions, - workers: 1, - tasksPerWorker: 10, - pollIntervalMs: 100, - }, - queue: { - redis: redisOptions, - masterQueueConsumersDisabled: true, - processWorkerQueueDebounceMs: 50, - }, - runLock: { - redis: redisOptions, - }, - machines: { - defaultMachine: "small-1x", - machines: { - "small-1x": { - name: "small-1x" as const, - cpu: 0.5, - memory: 0.5, - centsPerMs: 0.0001, - }, - }, - baseCostInCents: 0.0001, - }, - tracer: trace.getTracer("test", "0.0.0"), - }); -} - -/** Number of xid-consuming (i.e. write) transactions is the delta between two - * txid_current() calls, minus 1 for the second call itself. Read-only - * transactions never allocate an xid, so they don't count. */ -async function currentTxid(prisma: PrismaClient): Promise { - const rows = await prisma.$queryRaw<{ txid: bigint }[]>`SELECT txid_current() AS txid`; - return BigInt(rows[0].txid); -} - -/** Drives a parent run + triggerAndWait child up to the point where the child - * is EXECUTING and the parent is blocked on the child's associated waitpoint. */ -async function setupBlockedParentWithExecutingChild( - engine: RunEngine, - prisma: PrismaClient, - authenticatedEnvironment: Awaited> -) { - const parentTask = "parent-task"; - const childTask = "child-task"; - - await setupBackgroundWorker(engine, authenticatedEnvironment, [parentTask, childTask]); - - const parentRun = await engine.trigger( - { - number: 1, - friendlyId: "run_p1234", - environment: authenticatedEnvironment, - taskIdentifier: parentTask, - payload: "{}", - payloadType: "application/json", - context: {}, - traceContext: {}, - traceId: "t12345", - spanId: "s12345", - queue: `task/${parentTask}`, - isTest: false, - tags: [], - workerQueue: "main", - }, - prisma - ); - - await setTimeout(500); - const dequeuedParent = await engine.dequeueFromWorkerQueue({ - consumerId: "test_12345", - workerQueue: "main", - }); - expect(dequeuedParent.length).toBe(1); - - await engine.startRunAttempt({ - runId: parentRun.id, - snapshotId: dequeuedParent[0].snapshot.id, - }); - - const childRun = await engine.trigger( - { - number: 1, - friendlyId: "run_c1234", - environment: authenticatedEnvironment, - taskIdentifier: childTask, - payload: "{}", - payloadType: "application/json", - context: {}, - traceContext: {}, - traceId: "t12345", - spanId: "s12345", - queue: `task/${childTask}`, - isTest: false, - tags: [], - resumeParentOnCompletion: true, - parentTaskRunId: parentRun.id, - workerQueue: "main", - }, - prisma - ); - - await setTimeout(500); - const dequeuedChild = await engine.dequeueFromWorkerQueue({ - consumerId: "test_12345", - workerQueue: "main", - }); - expect(dequeuedChild.length).toBe(1); - - const childAttempt = await engine.startRunAttempt({ - runId: childRun.id, - snapshotId: dequeuedChild[0].snapshot.id, - }); - - const childWithWaitpoint = await prisma.taskRun.findFirstOrThrow({ - where: { id: childRun.id }, - include: { associatedWaitpoint: true }, - }); - assertNonNullable(childWithWaitpoint.associatedWaitpoint); - - return { - parentRun, - childRun, - childAttempt, - waitpointId: childWithWaitpoint.associatedWaitpoint.id, - }; -} - -describe("RunEngine atomic completion", () => { - containerTest( - "attemptSucceeded with an associated waitpoint is a single write transaction", - async ({ prisma, redisOptions }) => { - const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); - const engine = createEngine(prisma, redisOptions); - - try { - const { childRun, childAttempt } = await setupBlockedParentWithExecutingChild( - engine, - prisma, - authenticatedEnvironment - ); - - const before = await currentTxid(prisma); - - const result = await engine.completeRunAttempt({ - runId: childRun.id, - snapshotId: childAttempt.snapshot.id, - completion: { - ok: true, - id: childRun.id, - output: `{"foo":"bar"}`, - outputType: "application/json", - }, - }); - - const after = await currentTxid(prisma); - - expect(result.attemptStatus).toBe("RUN_FINISHED"); - - // TaskRun update (+ nested snapshot) and waitpoint completion - // must share one commit. - const writeTransactions = Number(after - before) - 1; - expect(writeTransactions).toBe(1); - } finally { - await engine.quit(); - } - } - ); - - containerTest( - "continueRunIfUnblocked is a single write transaction", - async ({ prisma, redisOptions }) => { - const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); - const engine = createEngine(prisma, redisOptions); - - try { - const { parentRun, childRun, childAttempt } = await setupBlockedParentWithExecutingChild( - engine, - prisma, - authenticatedEnvironment - ); - - await engine.completeRunAttempt({ - runId: childRun.id, - snapshotId: childAttempt.snapshot.id, - completion: { - ok: true, - id: childRun.id, - output: `{"foo":"bar"}`, - outputType: "application/json", - }, - }); - - const before = await currentTxid(prisma); - - // The continueRunIfUnblocked job is debounced by 50ms; poll until the - // parent has been continued (EXECUTING_WITH_WAITPOINTS -> EXECUTING). - let continued = false; - for (let i = 0; i < 50; i++) { - await setTimeout(100); - const parentData = await engine.getRunExecutionData({ runId: parentRun.id }); - if (parentData?.snapshot.executionStatus === "EXECUTING") { - continued = true; - break; - } - } - expect(continued).toBe(true); - - const after = await currentTxid(prisma); - - // Snapshot insert and TaskRunWaitpoint delete must share one commit. - const writeTransactions = Number(after - before) - 1; - expect(writeTransactions).toBe(1); - - const remainingBlockers = await prisma.taskRunWaitpoint.findMany({ - where: { taskRunId: parentRun.id }, - }); - expect(remainingBlockers.length).toBe(0); - } finally { - await engine.quit(); - } - } - ); - - containerTest( - "a failed completion rolls back the whole transition", - async ({ prisma, redisOptions }) => { - const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); - const engine = createEngine(prisma, redisOptions); - - try { - const { childRun, childAttempt, waitpointId } = await setupBlockedParentWithExecutingChild( - engine, - prisma, - authenticatedEnvironment - ); - - // Real fault injection, no mocks: hold a row lock on the child's - // associated waitpoint so the waitpoint completion inside - // attemptSucceeded blocks past the transaction timeout (5s default) - // and the whole transaction aborts. - const lockHolder = prisma.$transaction( - async (tx) => { - await tx.$queryRaw`SELECT "id" FROM "Waitpoint" WHERE "id" = ${waitpointId} FOR UPDATE`; - await setTimeout(8_000); - }, - { timeout: 20_000, maxWait: 5_000 } - ); - await setTimeout(300); // let the lock land - - let completionError: unknown; - try { - await engine.completeRunAttempt({ - runId: childRun.id, - snapshotId: childAttempt.snapshot.id, - completion: { - ok: true, - id: childRun.id, - output: `{"foo":"bar"}`, - outputType: "application/json", - }, - }); - } catch (error) { - completionError = error; - } - - await lockHolder; - - // The completion must have failed while the lock was held... - expect(completionError).toBeDefined(); - - // ...and NOTHING from the transition may have been committed: - // run still EXECUTING, waitpoint still PENDING, no FINISHED snapshot. - const childAfter = await prisma.taskRun.findFirstOrThrow({ - where: { id: childRun.id }, - }); - expect(childAfter.status).toBe("EXECUTING"); - - const waitpointAfter = await prisma.waitpoint.findFirstOrThrow({ - where: { id: waitpointId }, - }); - expect(waitpointAfter.status).toBe("PENDING"); - - const finishedSnapshots = await prisma.taskRunExecutionSnapshot.findMany({ - where: { runId: childRun.id, executionStatus: "FINISHED" }, - }); - expect(finishedSnapshots.length).toBe(0); - } finally { - await engine.quit(); - } - } - ); -}); -``` - -Note: `setupBackgroundWorker` accepts `string | string[]` for the task identifier (`setup.ts:87`), so the array call above is correct. - -- [ ] **Step 2: Run the new tests to verify they FAIL for the right reasons** - -```bash -pnpm run test ./src/engine/tests/atomicCompletion.test.ts --run -``` - -Expected: all three FAIL: -- Test 1: `writeTransactions` is 2 (TaskRun update commit + waitpoint updateMany commit), expected 1. -- Test 2: `writeTransactions` is 2 (snapshot insert commit + TaskRunWaitpoint delete commit), expected 1. -- Test 3: `completionError` is undefined (today the standalone `updateMany` has no transaction timeout — it just waits 8s for the lock and then succeeds) AND `childAfter.status` is `COMPLETED_SUCCESSFULLY` (the TaskRun update committed before the waitpoint write — the torn state this change eliminates). - -If a test fails for a different reason (setup error, wrong call shape), fix the test until the failures are exactly these. - -- [ ] **Step 3: Commit the failing tests** - -```bash -git add internal-packages/run-engine/src/engine/tests/atomicCompletion.test.ts -git commit -m "test(run-engine): add failing tests for atomic single-commit run completion" -``` - ---- - -### Task 4: Wrap `attemptSucceeded` in a single transaction - -**Files:** -- Modify: `internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts:741-817` - -- [ ] **Step 1: Replace the separate update + completeWaitpoint with one $transaction** - -In `attemptSucceeded`, replace the block from `const run = await prisma.taskRun.update({` (line 741) through the end of the `if (run.associatedWaitpoint) { ... }` block (line 817) with: - -```ts - const completedOutput = completion.output - ? { value: completion.output, type: completion.outputType, isError: false } - : undefined; - - const txResult = await $transaction( - prisma, - async (tx) => { - const run = await tx.taskRun.update({ - where: { id: runId }, - data: { - status: "COMPLETED_SUCCESSFULLY", - completedAt, - output: completion.output, - outputType: completion.outputType, - usageDurationMs: updatedUsage.usageDurationMs, - costInCents: updatedUsage.costInCents, - executionSnapshots: { - create: { - executionStatus: "FINISHED", - description: "Task completed successfully", - runStatus: "COMPLETED_SUCCESSFULLY", - attemptNumber: latestSnapshot.attemptNumber, - environmentId: latestSnapshot.environmentId, - environmentType: latestSnapshot.environmentType, - projectId: latestSnapshot.projectId, - organizationId: latestSnapshot.organizationId, - workerId, - runnerId, - }, - }, - }, - select: { - id: true, - friendlyId: true, - status: true, - attemptNumber: true, - spanId: true, - updatedAt: true, - associatedWaitpoint: { - select: { - id: true, - }, - }, - project: { - select: { - organizationId: true, - }, - }, - batchId: true, - createdAt: true, - completedAt: true, - taskEventStore: true, - parentTaskRunId: true, - usageDurationMs: true, - costInCents: true, - runtimeEnvironmentId: true, - projectId: true, - }, - }); - - // Complete the waitpoint if it exists (runs without waiting parents - // have no waitpoint). Side effects (continuation jobs, events) are - // scheduled after this transaction commits. - const completedWaitpoint = run.associatedWaitpoint - ? await this.waitpointSystem.completeWaitpointMutation({ - id: run.associatedWaitpoint.id, - output: completedOutput, - tx, - }) - : undefined; - - return { run, completedWaitpoint }; - }, - (error) => { - this.$.logger.error("RunEngine.attemptSucceeded(): prisma.$transaction error", { - code: error.code, - meta: error.meta, - stack: error.stack, - message: error.message, - name: error.name, - }); - throw new ServiceValidationError( - "Failed to complete task run and associated waitpoint", - 500 - ); - } - ); - - if (!txResult) { - throw new ServiceValidationError("Failed to complete task run attempt", 500); - } - - const { run, completedWaitpoint } = txResult; - - const newSnapshot = await getLatestExecutionSnapshot(prisma, runId); - - await this.$.runQueue.acknowledgeMessage(run.project.organizationId, runId); - - // We need to manually emit this as we created the final snapshot as part of the task run update - this.$.eventBus.emit("executionSnapshotCreated", { - time: newSnapshot.createdAt, - run: { - id: newSnapshot.runId, - }, - snapshot: { - ...newSnapshot, - completedWaitpointIds: newSnapshot.completedWaitpoints.map((wp) => wp.id), - }, - }); - - // Post-commit side effects of the waitpoint completion - if (completedWaitpoint) { - await this.waitpointSystem.scheduleWaitpointContinuations({ - ...completedWaitpoint, - output: completedOutput, - }); - } -``` - -The subsequent `this.$.eventBus.emit("runSucceeded", ...)`, `await this.#finalizeRun(run);`, and the `return { attemptStatus: "RUN_FINISHED", snapshot: newSnapshot, run };` (lines 819–852) stay exactly as they are. - -Two things removed relative to the old code, deliberately: -- The old `await this.$.runQueue.acknowledgeMessage(...)` / `executionSnapshotCreated` emit that sat *between* the update and the waitpoint completion now run after the transaction (same relative order to each other). -- The old direct `completeWaitpoint` call is replaced by mutation-inside-tx + side-effects-after-tx. - -`$transaction` is already imported in this file (line 38). `ServiceValidationError` is already imported (line 44). - -- [ ] **Step 2: Typecheck** - -```bash -cd internal-packages/run-engine && pnpm run typecheck -``` - -Expected: PASS. - -- [ ] **Step 3: Run the new tests — completion tests must now pass** - -```bash -pnpm run test ./src/engine/tests/atomicCompletion.test.ts --run -``` - -Expected: Test 1 ("single write transaction") PASS. Test 3 ("rolls back the whole transition") PASS. Test 2 ("continueRunIfUnblocked") still FAILS (that's Task 5). - -- [ ] **Step 4: Run the existing lifecycle tests** - -```bash -pnpm run test ./src/engine/tests/trigger.test.ts --run -pnpm run test ./src/engine/tests/triggerAndWait.test.ts --run -pnpm run test ./src/engine/tests/waitpoints.test.ts --run -pnpm run test ./src/engine/tests/batchTriggerAndWait.test.ts --run -``` - -Expected: PASS. - -- [ ] **Step 5: Commit** - -```bash -git add internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts -git commit -m "feat(run-engine): complete run and waitpoint in a single transaction" -``` - ---- - -### Task 5: Wrap `continueRunIfUnblocked`'s unblock in a single transaction - -**Files:** -- Modify: `internal-packages/run-engine/src/engine/systems/waitpointSystem.ts` (the `EXECUTING_WITH_WAITPOINTS` case, lines ~795–837, and imports) - -- [ ] **Step 1: Add `$transaction` to the imports** - -In the import block from `"@trigger.dev/database"` at the top of `waitpointSystem.ts`, add `$transaction`: - -```ts -import { - $transaction, - Prisma, - PrismaClientOrTransaction, - TaskRun, - Waitpoint, -} from "@trigger.dev/database"; -``` - -(Keep whatever names are already in that import — just add `$transaction` to them.) - -- [ ] **Step 2: Replace the `EXECUTING_WITH_WAITPOINTS` case** - -Replace the entire `case "EXECUTING_WITH_WAITPOINTS": { ... break; }` block with: - -```ts - case "EXECUTING_WITH_WAITPOINTS": { - const newSnapshot = await $transaction( - this.$.prisma, - async (tx) => { - const createdSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot( - tx, - { - run: { - id: runId, - status: snapshot.runStatus, - attemptNumber: snapshot.attemptNumber, - }, - snapshot: { - executionStatus: "EXECUTING", - description: "Run was continued, whilst still executing.", - }, - previousSnapshotId: snapshot.id, - environmentId: snapshot.environmentId, - environmentType: snapshot.environmentType, - projectId: snapshot.projectId, - organizationId: snapshot.organizationId, - batchId: snapshot.batchId ?? undefined, - completedWaitpoints: blockingWaitpoints.map((b) => ({ - id: b.waitpoint.id, - index: b.batchIndex ?? undefined, - })), - } - ); - - // Remove the blocking waitpoints in the same transaction, so the - // new snapshot and the unblock are atomic. - if (blockingWaitpoints.length > 0) { - await tx.taskRunWaitpoint.deleteMany({ - where: { - taskRunId: runId, - id: { in: blockingWaitpoints.map((b) => b.id) }, - }, - }); - } - - return createdSnapshot; - }, - (error) => { - this.$.logger.error("continueRunIfUnblocked: prisma.$transaction error", { - code: error.code, - meta: error.meta, - message: error.message, - runId, - }); - } - ); - - if (!newSnapshot) { - throw new Error(`continueRunIfUnblocked: failed to unblock run: ${runId}`); - } - - this.$.logger.debug( - `continueRunIfUnblocked: run was still executing, sending notification`, - { - runId, - snapshot, - newSnapshot, - } - ); - - await sendNotificationToWorker({ - runId, - snapshot: newSnapshot, - eventBus: this.$.eventBus, - }); - - this.$.logger.debug(`continueRunIfUnblocked: removed blocking waitpoints`, { - runId, - blockingWaitpoints, - }); - - return { - status: "unblocked", - waitpoints: blockingWaitpoints.map((w) => w.waitpoint), - }; - } -``` - -This case now returns directly instead of `break`ing, so the post-switch `if (blockingWaitpoints.length > 0) { deleteMany ... }` block (lines ~893–906) no longer runs for it — that block now only serves the `SUSPENDED` case, which is deliberately left unchanged in this PR (its `enqueueRun` performs Redis work internally and must not be pulled into a Postgres transaction). - -Note the ordering improvement: the worker notification now fires after the unblock is durable, instead of before the waitpoint rows were deleted. - -- [ ] **Step 3: Typecheck** - -```bash -cd internal-packages/run-engine && pnpm run typecheck -``` - -Expected: PASS. - -- [ ] **Step 4: Run the new tests — all three must now pass** - -```bash -pnpm run test ./src/engine/tests/atomicCompletion.test.ts --run -``` - -Expected: PASS (all 3). - -- [ ] **Step 5: Run the waitpoint/continuation-heavy existing tests** - -```bash -pnpm run test ./src/engine/tests/triggerAndWait.test.ts --run -pnpm run test ./src/engine/tests/waitpoints.test.ts --run -pnpm run test ./src/engine/tests/waitpointRace.test.ts --run -pnpm run test ./src/engine/tests/batchTriggerAndWait.test.ts --run -pnpm run test ./src/engine/tests/lazyWaitpoint.test.ts --run -pnpm run test ./src/engine/tests/checkpoints.test.ts --run -``` - -Expected: PASS. (`checkpoints.test.ts` exercises the SUSPENDED path — it must be unaffected.) - -- [ ] **Step 6: Commit** - -```bash -git add internal-packages/run-engine/src/engine/systems/waitpointSystem.ts -git commit -m "feat(run-engine): unblock continued runs in a single transaction" -``` - ---- - -### Task 6: Full suite, server-changes file, final verification - -**Files:** -- Create: `.server-changes/transactional-run-completion.md` - -- [ ] **Step 1: Run the full run-engine test suite** - -```bash -cd internal-packages/run-engine && pnpm run test --run -``` - -Expected: PASS. This takes a while (testcontainers). If any test fails, STOP and investigate using the systematic-debugging skill — do not paper over it. - -- [ ] **Step 2: Typecheck the webapp (consumes the run-engine package)** - -```bash -cd /path/to/repo && pnpm run typecheck --filter webapp -``` - -Expected: PASS (the public method `completeWaitpoint` kept its exact signature, so no webapp changes should be needed). - -- [ ] **Step 3: Add the server-changes file** - -This PR touches only an internal package consumed by the server — no public package changes, so a `.server-changes/` file (not a changeset) is required. Create `.server-changes/transactional-run-completion.md`: - -```markdown ---- -area: webapp -type: improvement ---- - -Reduce database load during traffic spikes by completing runs and resuming waiting runs in single atomic transactions -``` - -- [ ] **Step 4: Commit** - -```bash -git add .server-changes/transactional-run-completion.md -git commit -m "chore: add server-changes entry for transactional run completion" -``` - -- [ ] **Step 5: Verify the diff is clean and report** - -```bash -git log --oneline main..HEAD -git diff main --stat -``` - -Expected: 4 commits, changes confined to `waitpointSystem.ts`, `runAttemptSystem.ts`, `atomicCompletion.test.ts`, the `.server-changes/` file, and this plan document. - ---- - -## Risks the implementer must watch for - -1. **Never move a Redis call or eventBus emit inside a `$transaction` callback.** That is the single forbidden move in this entire plan — it would publish state that can still roll back, recreating (in miniature) the async-commit hazard this approach exists to avoid. -2. **Transaction timeout:** the new transactions contain 2–4 statements and use Prisma's 5s default. If `atomicCompletion` tests show P2028 timeouts under normal (unlocked) conditions, something is holding the tx open — find it, don't raise the timeout. -3. **Flaky txid counts:** if the commit-count tests are intermittently off-by-one, another engine subsystem is writing concurrently. Identify it via `pg_stat_activity` in the test rather than loosening the assertion to `<=`. -4. **Prisma `$queryRaw` bigint mapping:** `txid_current()` comes back as `bigint` — if the helper's `BigInt(rows[0].txid)` throws on the returned type, log the raw row and adjust the cast; do not switch to a different counting mechanism. - -## Expected production impact (for the PR description) - -Engine-side write commits drop ~2× on the completion path and ~2× on the continuation path (the two hottest sequences after trigger/dequeue). Based on production Performance Insights (2026-06-10): cluster commit rate ~21–31k/sec with `IO:XactSync` at ~59% of spike DB load; this change should cut total commit rate by roughly a quarter at baseline and compress top-of-hour AAS spikes. Bonus: completion and continuation become atomic — a crash can no longer leave a finished run with a still-blocked parent. From 92277a58f746ae3c177ebf2c098b1d6a9679b3cb Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Wed, 10 Jun 2026 15:05:29 +0100 Subject: [PATCH 15/16] docs(run-engine): trim narration and history references from comments --- .../src/engine/systems/executionSnapshotSystem.ts | 8 +++----- .../run-engine/src/engine/systems/runAttemptSystem.ts | 1 - .../run-engine/src/engine/systems/waitpointSystem.ts | 8 ++------ 3 files changed, 5 insertions(+), 12 deletions(-) diff --git a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts index 4f4622bcac7..8498056575c 100644 --- a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts @@ -327,9 +327,8 @@ export class ExecutionSnapshotSystem { } /** - * Pure Postgres mutation — safe inside a transaction. Inserts the snapshot row and - * returns the enhanced result. Direct callers MUST call `scheduleSnapshotSideEffects()` - * with the result after the surrounding transaction commits. + * Pure Postgres mutation — safe inside a transaction. Direct callers MUST call + * `scheduleSnapshotSideEffects()` with the result after the surrounding transaction commits. */ public async createExecutionSnapshotMutation( prisma: PrismaClientOrTransaction, @@ -441,8 +440,7 @@ export class ExecutionSnapshotSystem { } } - // Destructure off the wrapper-only fields so the emitted payload matches the - // pre-split shape (raw snapshot row fields + completedWaitpointIds only). + // The emitted payload must contain only raw snapshot row fields (+ completedWaitpointIds). const { friendlyId: _fid, runFriendlyId: _rfid, ...snapshotRow } = snapshot; this.$.eventBus.emit("executionSnapshotCreated", { diff --git a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts index eee1da3fee8..bae2dff4275 100644 --- a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts @@ -851,7 +851,6 @@ export class RunAttemptSystem { }, }); - // Post-commit side effects of the waitpoint completion if (completedWaitpoint) { await this.waitpointSystem.scheduleWaitpointContinuations(completedWaitpoint); } diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index 2b02173fcee..3f8b1d53b57 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -864,8 +864,7 @@ export class WaitpointSystem { completedWaitpoints: completedWaitpointArgs, }); - // Remove the blocking waitpoints in the same transaction, so the - // new snapshot and the unblock are atomic. + // Deleted in the same transaction so the snapshot and unblock are atomic. if (blockingWaitpoints.length > 0) { await tx.taskRunWaitpoint.deleteMany({ where: { @@ -897,8 +896,7 @@ export class WaitpointSystem { throw new Error(`continueRunIfUnblocked: failed to unblock run: ${runId}`); } - // Schedule side effects (heartbeat + eventBus) AFTER the transaction has - // committed, so they always reference a durable snapshot row. + // Side effects must only run against a durably committed snapshot row. await this.executionSnapshotSystem.scheduleSnapshotSideEffects({ snapshot: newSnapshot, runId, @@ -928,8 +926,6 @@ export class WaitpointSystem { // Unlike EXECUTING_WITH_WAITPOINTS above, this case is deliberately NOT // wrapped in a single transaction: enqueueRun performs Redis queue work // internally, which must never run inside an open Postgres transaction. - // Its snapshot insert and the post-switch waitpoint deletion below stay - // separate statements, as they were before the transactional refactor. case "SUSPENDED": { if (!snapshot.checkpointId) { // A run canceled mid-suspend has its checkpoint cleared by the From fc20dc250e6106c3e89d883fdc982e5ab041ce5c Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Wed, 10 Jun 2026 17:03:19 +0100 Subject: [PATCH 16/16] fix(run-engine): move snapshot side effects and ttl ack out of the start-attempt transaction --- .../src/engine/systems/runAttemptSystem.ts | 56 +++++++++++-------- 1 file changed, 33 insertions(+), 23 deletions(-) diff --git a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts index bae2dff4275..dd69abd88b5 100644 --- a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts @@ -459,29 +459,27 @@ export class RunAttemptSystem { }, }); - const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(tx, { - run, - snapshot: { - executionStatus: "EXECUTING", - description: `Attempt created, starting execution${ - isWarmStart ? " (warm start)" : "" - }`, - }, - previousSnapshotId: latestSnapshot.id, - environmentId: latestSnapshot.environmentId, - environmentType: latestSnapshot.environmentType, - projectId: latestSnapshot.projectId, - organizationId: latestSnapshot.organizationId, - batchId: latestSnapshot.batchId ?? undefined, - completedWaitpoints: latestSnapshot.completedWaitpoints, - workerId, - runnerId, - }); - - if (taskRun.ttl) { - //don't expire the run, it's going to execute - await this.$.worker.ack(`expireRun:${taskRun.id}`); - } + const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshotMutation( + tx, + { + run, + snapshot: { + executionStatus: "EXECUTING", + description: `Attempt created, starting execution${ + isWarmStart ? " (warm start)" : "" + }`, + }, + previousSnapshotId: latestSnapshot.id, + environmentId: latestSnapshot.environmentId, + environmentType: latestSnapshot.environmentType, + projectId: latestSnapshot.projectId, + organizationId: latestSnapshot.organizationId, + batchId: latestSnapshot.batchId ?? undefined, + completedWaitpoints: latestSnapshot.completedWaitpoints, + workerId, + runnerId, + } + ); return { updatedRun: run, snapshot: newSnapshot }; }, @@ -510,6 +508,18 @@ export class RunAttemptSystem { const { updatedRun, snapshot } = result; + if (taskRun.ttl) { + //don't expire the run, it's going to execute + await this.$.worker.ack(`expireRun:${taskRun.id}`); + } + + // Side effects must only run against a durably committed snapshot row. + await this.executionSnapshotSystem.scheduleSnapshotSideEffects({ + snapshot, + runId: taskRun.id, + completedWaitpoints: latestSnapshot.completedWaitpoints, + }); + this.$.eventBus.emit("runAttemptStarted", { time: new Date(), run: {