diff --git a/.server-changes/transactional-run-completion.md b/.server-changes/transactional-run-completion.md new file mode 100644 index 00000000000..38f333d6652 --- /dev/null +++ b/.server-changes/transactional-run-completion.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: improvement +--- + +Reduce database load during traffic spikes by completing runs and resuming waiting runs in single atomic transactions diff --git a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts index d615c066b85..8498056575c 100644 --- a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts @@ -326,7 +326,11 @@ export class ExecutionSnapshotSystem { this.heartbeatTimeouts = options.heartbeatTimeouts; } - public async createExecutionSnapshot( + /** + * Pure Postgres mutation — safe inside a transaction. Direct callers MUST call + * `scheduleSnapshotSideEffects()` with the result after the surrounding transaction commits. + */ + public async createExecutionSnapshotMutation( prisma: PrismaClientOrTransaction, { run, @@ -399,35 +403,93 @@ export class ExecutionSnapshotSystem { }, }); + return { + ...newSnapshot, + friendlyId: SnapshotId.toFriendlyId(newSnapshot.id), + runFriendlyId: RunId.toFriendlyId(newSnapshot.runId), + }; + } + + /** + * Post-commit side effects for a newly created snapshot: arms the heartbeat job (if + * the execution status requires one) and emits the `executionSnapshotCreated` event. + * Never call this inside a transaction — these side effects must only run against a + * durable (committed) snapshot row. 
+ */ + public async scheduleSnapshotSideEffects({ + snapshot, + runId, + error, + completedWaitpoints, + }: { + snapshot: Awaited<ReturnType<ExecutionSnapshotSystem["createExecutionSnapshotMutation"]>>; + runId: string; + error?: string; + completedWaitpoints?: { id: string; index?: number }[]; + }) { if (!error) { //set heartbeat (if relevant) - const intervalMs = this.#getHeartbeatIntervalMs(newSnapshot.executionStatus); + const intervalMs = this.#getHeartbeatIntervalMs(snapshot.executionStatus); if (intervalMs !== null) { await this.$.worker.enqueue({ - id: `heartbeatSnapshot.${run.id}`, + id: `heartbeatSnapshot.${runId}`, job: "heartbeatSnapshot", - payload: { snapshotId: newSnapshot.id, runId: run.id }, + payload: { snapshotId: snapshot.id, runId }, availableAt: new Date(Date.now() + intervalMs), }); } } + // The emitted payload must contain only raw snapshot row fields (+ completedWaitpointIds). + const { friendlyId: _fid, runFriendlyId: _rfid, ...snapshotRow } = snapshot; + this.$.eventBus.emit("executionSnapshotCreated", { - time: newSnapshot.createdAt, + time: snapshot.createdAt, run: { - id: newSnapshot.runId, + id: snapshot.runId, }, snapshot: { - ...newSnapshot, + ...snapshotRow, completedWaitpointIds: completedWaitpoints?.map((w) => w.id) ?? 
[], }, }); + } - return { - ...newSnapshot, - friendlyId: SnapshotId.toFriendlyId(newSnapshot.id), - runFriendlyId: RunId.toFriendlyId(newSnapshot.runId), - }; + public async createExecutionSnapshot( + prisma: PrismaClientOrTransaction, + args: { + run: { id: string; status: TaskRunStatus; attemptNumber?: number | null }; + snapshot: { + executionStatus: TaskRunExecutionStatus; + description: string; + metadata?: Prisma.JsonValue; + }; + previousSnapshotId?: string; + batchId?: string; + environmentId: string; + environmentType: RuntimeEnvironmentType; + projectId: string; + organizationId: string; + checkpointId?: string; + workerId?: string; + runnerId?: string; + completedWaitpoints?: { + id: string; + index?: number; + }[]; + error?: string; + } + ) { + const newSnapshot = await this.createExecutionSnapshotMutation(prisma, args); + + await this.scheduleSnapshotSideEffects({ + snapshot: newSnapshot, + runId: args.run.id, + error: args.error, + completedWaitpoints: args.completedWaitpoints, + }); + + return newSnapshot; } public async heartbeatRun({ diff --git a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts index 06c80f67f2c..dd69abd88b5 100644 --- a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts @@ -459,29 +459,27 @@ export class RunAttemptSystem { }, }); - const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(tx, { - run, - snapshot: { - executionStatus: "EXECUTING", - description: `Attempt created, starting execution${ - isWarmStart ? " (warm start)" : "" - }`, - }, - previousSnapshotId: latestSnapshot.id, - environmentId: latestSnapshot.environmentId, - environmentType: latestSnapshot.environmentType, - projectId: latestSnapshot.projectId, - organizationId: latestSnapshot.organizationId, - batchId: latestSnapshot.batchId ?? 
undefined, - completedWaitpoints: latestSnapshot.completedWaitpoints, - workerId, - runnerId, - }); - - if (taskRun.ttl) { - //don't expire the run, it's going to execute - await this.$.worker.ack(`expireRun:${taskRun.id}`); - } + const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshotMutation( + tx, + { + run, + snapshot: { + executionStatus: "EXECUTING", + description: `Attempt created, starting execution${ + isWarmStart ? " (warm start)" : "" + }`, + }, + previousSnapshotId: latestSnapshot.id, + environmentId: latestSnapshot.environmentId, + environmentType: latestSnapshot.environmentType, + projectId: latestSnapshot.projectId, + organizationId: latestSnapshot.organizationId, + batchId: latestSnapshot.batchId ?? undefined, + completedWaitpoints: latestSnapshot.completedWaitpoints, + workerId, + runnerId, + } + ); return { updatedRun: run, snapshot: newSnapshot }; }, @@ -510,6 +508,18 @@ export class RunAttemptSystem { const { updatedRun, snapshot } = result; + if (taskRun.ttl) { + //don't expire the run, it's going to execute + await this.$.worker.ack(`expireRun:${taskRun.id}`); + } + + // Side effects must only run against a durably committed snapshot row. + await this.executionSnapshotSystem.scheduleSnapshotSideEffects({ + snapshot, + runId: taskRun.id, + completedWaitpoints: latestSnapshot.completedWaitpoints, + }); + this.$.eventBus.emit("runAttemptStarted", { time: new Date(), run: { @@ -688,6 +698,9 @@ export class RunAttemptSystem { runId: string; snapshotId: string; completion: TaskRunSuccessfulExecutionResult; + // Note: passing an open transaction as `tx` makes $transaction run inline in it, + // which would move this method's post-commit side effects inside the caller's + // transaction. Callers must pass a plain client. 
tx: PrismaClientOrTransaction; workerId?: string; runnerId?: string; @@ -738,58 +751,100 @@ export class RunAttemptSystem { environmentType: latestSnapshot.environmentType, }); - const run = await prisma.taskRun.update({ - where: { id: runId }, - data: { - status: "COMPLETED_SUCCESSFULLY", - completedAt, - output: completion.output, - outputType: completion.outputType, - usageDurationMs: updatedUsage.usageDurationMs, - costInCents: updatedUsage.costInCents, - executionSnapshots: { - create: { - executionStatus: "FINISHED", - description: "Task completed successfully", - runStatus: "COMPLETED_SUCCESSFULLY", - attemptNumber: latestSnapshot.attemptNumber, - environmentId: latestSnapshot.environmentId, - environmentType: latestSnapshot.environmentType, - projectId: latestSnapshot.projectId, - organizationId: latestSnapshot.organizationId, - workerId, - runnerId, + const completedOutput = completion.output + ? { value: completion.output, type: completion.outputType, isError: false as const } + : undefined; + + const txResult = await $transaction( + prisma, + async (tx) => { + const run = await tx.taskRun.update({ + where: { id: runId }, + data: { + status: "COMPLETED_SUCCESSFULLY", + completedAt, + output: completion.output, + outputType: completion.outputType, + usageDurationMs: updatedUsage.usageDurationMs, + costInCents: updatedUsage.costInCents, + executionSnapshots: { + create: { + executionStatus: "FINISHED", + description: "Task completed successfully", + runStatus: "COMPLETED_SUCCESSFULLY", + attemptNumber: latestSnapshot.attemptNumber, + environmentId: latestSnapshot.environmentId, + environmentType: latestSnapshot.environmentType, + projectId: latestSnapshot.projectId, + organizationId: latestSnapshot.organizationId, + workerId, + runnerId, + }, + }, }, - }, - }, - select: { - id: true, - friendlyId: true, - status: true, - attemptNumber: true, - spanId: true, - updatedAt: true, - associatedWaitpoint: { select: { id: true, + friendlyId: true, + status: true, + 
attemptNumber: true, + spanId: true, + updatedAt: true, + associatedWaitpoint: { + select: { + id: true, + }, + }, + project: { + select: { + organizationId: true, + }, + }, + batchId: true, + createdAt: true, + completedAt: true, + taskEventStore: true, + parentTaskRunId: true, + usageDurationMs: true, + costInCents: true, + runtimeEnvironmentId: true, + projectId: true, }, - }, - project: { - select: { - organizationId: true, - }, - }, - batchId: true, - createdAt: true, - completedAt: true, - taskEventStore: true, - parentTaskRunId: true, - usageDurationMs: true, - costInCents: true, - runtimeEnvironmentId: true, - projectId: true, + }); + + // Complete the waitpoint if it exists (runs without waiting parents + // have no waitpoint). Side effects (continuation jobs, events) are + // scheduled after this transaction commits. + const completedWaitpoint = run.associatedWaitpoint + ? await this.waitpointSystem.completeWaitpointMutation({ + id: run.associatedWaitpoint.id, + output: completedOutput, + tx, + }) + : undefined; + + return { run, completedWaitpoint }; }, - }); + (error) => { + this.$.logger.error("RunEngine.attemptSucceeded(): prisma.$transaction error", { + code: error.code, + meta: error.meta, + stack: error.stack, + message: error.message, + name: error.name, + }); + throw new ServiceValidationError( + "Failed to complete task run and associated waitpoint", + 500 + ); + } + ); + + if (!txResult) { + throw new ServiceValidationError("Failed to complete task run attempt", 500); + } + + const { run, completedWaitpoint } = txResult; + const newSnapshot = await getLatestExecutionSnapshot(prisma, runId); await this.$.runQueue.acknowledgeMessage(run.project.organizationId, runId); @@ -806,14 +861,8 @@ export class RunAttemptSystem { }, }); - // Complete the waitpoint if it exists (runs without waiting parents have no waitpoint) - if (run.associatedWaitpoint) { - await this.waitpointSystem.completeWaitpoint({ - id: run.associatedWaitpoint.id, - output: 
completion.output - ? { value: completion.output, type: completion.outputType, isError: false } - : undefined, - }); + if (completedWaitpoint) { + await this.waitpointSystem.scheduleWaitpointContinuations(completedWaitpoint); } this.$.eventBus.emit("runSucceeded", { diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index 8b8d4f82fcf..3f8b1d53b57 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -1,6 +1,7 @@ import { timeoutError, tryCatch } from "@trigger.dev/core/v3"; import { WaitpointId } from "@trigger.dev/core/v3/isomorphic"; import { + $transaction, Prisma, PrismaClientOrTransaction, TaskQueue, @@ -39,6 +40,22 @@ export type WaitpointContinuationResult = waitpoints: Array<Waitpoint>; }; +export type WaitpointOutput = { + value: string; + type?: string; + isError: boolean; +}; + +export type CompletedWaitpointMutationResult = { + waitpoint: Waitpoint; + affectedTaskRuns: { + taskRunId: string; + spanIdToComplete: string | null; + createdAt: Date; + }[]; + output?: WaitpointOutput; +}; + export class WaitpointSystem { private readonly $: SystemResources; private readonly executionSnapshotSystem: ExecutionSnapshotSystem; @@ -74,15 +91,32 @@ export class WaitpointSystem { output, }: { id: string; - output?: { - value: string; - type?: string; - isError: boolean; - }; + output?: WaitpointOutput; }): Promise<Waitpoint> { + const result = await this.completeWaitpointMutation({ id, output }); + await this.scheduleWaitpointContinuations(result); + return result.waitpoint; + } + + /** Marks the waitpoint COMPLETED and returns the runs it was blocking. + * Pure Postgres mutation — safe to run inside a transaction via `tx`. 
+ * Direct callers MUST call scheduleWaitpointContinuations() with the + * result AFTER the surrounding transaction commits, otherwise blocked runs + * are never continued. */ + async completeWaitpointMutation({ + id, + output, + tx, + }: { + id: string; + output?: WaitpointOutput; + tx?: PrismaClientOrTransaction; + }): Promise<CompletedWaitpointMutationResult> { + const prisma = tx ?? this.$.prisma; + // 1. Complete the Waitpoint (if not completed) const [updateError, updateResult] = await tryCatch( - this.$.prisma.waitpoint.updateMany({ + prisma.waitpoint.updateMany({ where: { id, status: "PENDING" }, data: { status: "COMPLETED", @@ -106,7 +140,7 @@ export class WaitpointSystem { ); } - const waitpoint = await this.$.prisma.waitpoint.findFirst({ + const waitpoint = await prisma.waitpoint.findFirst({ where: { id }, }); @@ -123,7 +157,7 @@ export class WaitpointSystem { } // 2. Find the TaskRuns blocked by this waitpoint - const affectedTaskRuns = await this.$.prisma.taskRunWaitpoint.findMany({ + const affectedTaskRuns = await prisma.taskRunWaitpoint.findMany({ where: { waitpointId: id }, select: { taskRunId: true, spanIdToComplete: true, createdAt: true }, }); @@ -134,14 +168,25 @@ export class WaitpointSystem { }); } - // 3. Schedule trying to continue the runs + return { waitpoint, affectedTaskRuns, output }; + } + + /** Post-commit side effects of completing a waitpoint: schedules continuation of + * the blocked runs and emits events. Must be called AFTER the mutation committed — + * never from inside a transaction. 
*/ + async scheduleWaitpointContinuations({ + waitpoint, + affectedTaskRuns, + output, + }: CompletedWaitpointMutationResult): Promise<void> { + // Schedule trying to continue the runs for (const run of affectedTaskRuns) { const jobId = `continueRunIfUnblocked:${run.taskRunId}`; //50ms in the future const availableAt = new Date(Date.now() + 50); this.$.logger.debug(`completeWaitpoint: enqueueing continueRunIfUnblocked`, { - waitpointId: id, + waitpointId: waitpoint.id, runId: run.taskRunId, jobId, availableAt, @@ -169,8 +214,6 @@ export class WaitpointSystem { }); } } - - return waitpoint; } /** @@ -793,31 +836,73 @@ export class WaitpointSystem { }; } case "EXECUTING_WITH_WAITPOINTS": { - const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot( + const completedWaitpointArgs = blockingWaitpoints.map((b) => ({ + id: b.waitpoint.id, + index: b.batchIndex ?? undefined, + })); + + const newSnapshot = await $transaction( this.$.prisma, + async (tx) => { + const createdSnapshot = + await this.executionSnapshotSystem.createExecutionSnapshotMutation(tx, { + run: { + id: runId, + status: snapshot.runStatus, + attemptNumber: snapshot.attemptNumber, + }, + snapshot: { + executionStatus: "EXECUTING", + description: "Run was continued, whilst still executing.", + }, + previousSnapshotId: snapshot.id, + environmentId: snapshot.environmentId, + environmentType: snapshot.environmentType, + projectId: snapshot.projectId, + organizationId: snapshot.organizationId, + batchId: snapshot.batchId ?? undefined, + completedWaitpoints: completedWaitpointArgs, + }); + + // Deleted in the same transaction so the snapshot and unblock are atomic. 
+ if (blockingWaitpoints.length > 0) { + await tx.taskRunWaitpoint.deleteMany({ + where: { + taskRunId: runId, + id: { in: blockingWaitpoints.map((b) => b.id) }, + }, + }); + } + + return createdSnapshot; + }, + (error) => { + this.$.logger.error("continueRunIfUnblocked: prisma.$transaction error", { + code: error.code, + meta: error.meta, + message: error.message, + runId, + }); + }, { - run: { - id: runId, - status: snapshot.runStatus, - attemptNumber: snapshot.attemptNumber, - }, - snapshot: { - executionStatus: "EXECUTING", - description: "Run was continued, whilst still executing.", - }, - previousSnapshotId: snapshot.id, - environmentId: snapshot.environmentId, - environmentType: snapshot.environmentType, - projectId: snapshot.projectId, - organizationId: snapshot.organizationId, - batchId: snapshot.batchId ?? undefined, - completedWaitpoints: blockingWaitpoints.map((b) => ({ - id: b.waitpoint.id, - index: b.batchIndex ?? undefined, - })), + // the m2m connect inserts one row per blocking waitpoint, so large + // batchTriggerAndWait parents need more than the 5s default + timeout: 30_000, + maxRetries: 2, } ); + if (!newSnapshot) { + throw new Error(`continueRunIfUnblocked: failed to unblock run: ${runId}`); + } + + // Side effects must only run against a durably committed snapshot row. + await this.executionSnapshotSystem.scheduleSnapshotSideEffects({ + snapshot: newSnapshot, + runId, + completedWaitpoints: completedWaitpointArgs, + }); + this.$.logger.debug( `continueRunIfUnblocked: run was still executing, sending notification`, { @@ -833,8 +918,14 @@ export class WaitpointSystem { eventBus: this.$.eventBus, }); - break; + return { + status: "unblocked", + waitpoints: blockingWaitpoints.map((w) => w.waitpoint), + }; } + // Unlike EXECUTING_WITH_WAITPOINTS above, this case is deliberately NOT + // wrapped in a single transaction: enqueueRun performs Redis queue work + // internally, which must never run inside an open Postgres transaction. 
case "SUSPENDED": { if (!snapshot.checkpointId) { // A run canceled mid-suspend has its checkpoint cleared by the @@ -890,6 +981,8 @@ export class WaitpointSystem { } } + // Only the SUSPENDED case reaches here — EXECUTING_WITH_WAITPOINTS returns + // above after deleting its waitpoints inside its transaction. if (blockingWaitpoints.length > 0) { //5. Remove the blocking waitpoints await this.$.prisma.taskRunWaitpoint.deleteMany({ diff --git a/internal-packages/run-engine/src/engine/tests/atomicCompletion.test.ts b/internal-packages/run-engine/src/engine/tests/atomicCompletion.test.ts new file mode 100644 index 00000000000..4fd416b1455 --- /dev/null +++ b/internal-packages/run-engine/src/engine/tests/atomicCompletion.test.ts @@ -0,0 +1,328 @@ +import { assertNonNullable, containerTest } from "@internal/testcontainers"; +import { trace } from "@internal/tracing"; +import { describe, expect, vi } from "vitest"; +import { setTimeout } from "node:timers/promises"; +import { PrismaClient } from "@trigger.dev/database"; +import { RedisOptions } from "@internal/redis"; +import { RunEngine } from "../index.js"; +import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "./setup.js"; + +vi.setConfig({ testTimeout: 60_000 }); + +function createEngine(prisma: PrismaClient, redisOptions: RedisOptions) { + return new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + masterQueueConsumersDisabled: true, + processWorkerQueueDebounceMs: 50, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); +} + +/** Number of xid-consuming (i.e. 
write) transactions is the delta between two + * txid_current() calls, minus 1 for the second call itself. Read-only + * transactions never allocate an xid, so they don't count. */ +async function currentTxid(prisma: PrismaClient): Promise<bigint> { + const rows = await prisma.$queryRaw<{ txid: bigint }[]>`SELECT txid_current() AS txid`; + return BigInt(rows[0].txid); +} + +/** Drives a parent run + triggerAndWait child up to the point where the child + * is EXECUTING and the parent is blocked on the child's associated waitpoint. */ +async function setupBlockedParentWithExecutingChild( + engine: RunEngine, + prisma: PrismaClient, + authenticatedEnvironment: Awaited<ReturnType<typeof setupAuthenticatedEnvironment>> +) { + const parentTask = "parent-task"; + const childTask = "child-task"; + + await setupBackgroundWorker(engine, authenticatedEnvironment, [parentTask, childTask]); + + const parentRun = await engine.trigger( + { + number: 1, + friendlyId: "run_p1234", + environment: authenticatedEnvironment, + taskIdentifier: parentTask, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + queue: `task/${parentTask}`, + isTest: false, + tags: [], + workerQueue: "main", + }, + prisma + ); + + await setTimeout(500); + const dequeuedParent = await engine.dequeueFromWorkerQueue({ + consumerId: "test_12345", + workerQueue: "main", + }); + expect(dequeuedParent.length).toBe(1); + + await engine.startRunAttempt({ + runId: parentRun.id, + snapshotId: dequeuedParent[0].snapshot.id, + }); + + const childRun = await engine.trigger( + { + number: 1, + friendlyId: "run_c1234", + environment: authenticatedEnvironment, + taskIdentifier: childTask, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + queue: `task/${childTask}`, + isTest: false, + tags: [], + resumeParentOnCompletion: true, + parentTaskRunId: parentRun.id, + workerQueue: "main", + }, + prisma + ); + + await setTimeout(500); 
+ const dequeuedChild = await engine.dequeueFromWorkerQueue({ + consumerId: "test_12345", + workerQueue: "main", + }); + expect(dequeuedChild.length).toBe(1); + + const childAttempt = await engine.startRunAttempt({ + runId: childRun.id, + snapshotId: dequeuedChild[0].snapshot.id, + }); + + const childWithWaitpoint = await prisma.taskRun.findFirstOrThrow({ + where: { id: childRun.id }, + include: { associatedWaitpoint: true }, + }); + assertNonNullable(childWithWaitpoint.associatedWaitpoint); + + return { + parentRun, + childRun, + childAttempt, + waitpointId: childWithWaitpoint.associatedWaitpoint.id, + }; +} + +describe("RunEngine atomic completion", () => { + containerTest( + "attemptSucceeded with an associated waitpoint is a single write transaction", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = createEngine(prisma, redisOptions); + + try { + const { childRun, childAttempt, waitpointId } = await setupBlockedParentWithExecutingChild( + engine, + prisma, + authenticatedEnvironment + ); + + const result = await engine.completeRunAttempt({ + runId: childRun.id, + snapshotId: childAttempt.snapshot.id, + completion: { + ok: true, + id: childRun.id, + output: `{"foo":"bar"}`, + outputType: "application/json", + }, + }); + + expect(result.attemptStatus).toBe("RUN_FINISHED"); + + // Equal xmin on all three rows proves they were written (last-updated) + // by the same transaction — a timing-window-free proof that the + // TaskRun update, the FINISHED snapshot insert, and the waitpoint + // completion all share one commit. 
+ const rows = await prisma.$queryRaw<{ source: string; xmin: string }[]>` + SELECT 'run' AS source, xmin::text FROM "TaskRun" WHERE id = ${childRun.id} + UNION ALL + SELECT 'snapshot' AS source, xmin::text FROM "TaskRunExecutionSnapshot" + WHERE "runId" = ${childRun.id} AND "executionStatus"::text = 'FINISHED' + UNION ALL + SELECT 'waitpoint' AS source, xmin::text FROM "Waitpoint" WHERE id = ${waitpointId} + `; + expect(rows.length, JSON.stringify(rows)).toBe(3); + expect(new Set(rows.map((r) => r.xmin)).size, JSON.stringify(rows)).toBe(1); + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "continueRunIfUnblocked is a single write transaction", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = createEngine(prisma, redisOptions); + + try { + const { parentRun, childRun, childAttempt } = await setupBlockedParentWithExecutingChild( + engine, + prisma, + authenticatedEnvironment + ); + + // Take `before` BEFORE completeRunAttempt so the bracket covers both + // the completion commit and the subsequent continueRunIfUnblocked commit, + // eliminating the race where the continuation job could commit inside + // a tighter window taken after completeRunAttempt returns. + const before = await currentTxid(prisma); + + await engine.completeRunAttempt({ + runId: childRun.id, + snapshotId: childAttempt.snapshot.id, + completion: { + ok: true, + id: childRun.id, + output: `{"foo":"bar"}`, + outputType: "application/json", + }, + }); + + // The continueRunIfUnblocked job is debounced by 50ms; poll until the + // parent has been continued (EXECUTING_WITH_WAITPOINTS -> EXECUTING). 
+ let continued = false; + let lastStatus: string | undefined; + for (let i = 0; i < 50; i++) { + await setTimeout(100); + const parentData = await engine.getRunExecutionData({ runId: parentRun.id }); + lastStatus = parentData?.snapshot.executionStatus; + if (lastStatus === "EXECUTING") { + continued = true; + break; + } + } + expect(continued, `parent never continued, last status: ${lastStatus}`).toBe(true); + + const after = await currentTxid(prisma); + + // completion (1 commit) + continuation (1 commit); today this is 4 (2 + 2). + // Note: txid_current() is cluster-global — autovacuum could in principle + // add an xid between the two reads, accepted residual risk; xmin equality + // cannot witness the TaskRunWaitpoint DELETE so we keep the delta check here. + const writeTransactions = Number(after - before) - 1; + expect(writeTransactions).toBe(2); + + const remainingBlockers = await prisma.taskRunWaitpoint.findMany({ + where: { taskRunId: parentRun.id }, + }); + expect(remainingBlockers.length).toBe(0); + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "a failed completion rolls back the whole transition", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = createEngine(prisma, redisOptions); + + try { + const { childRun, childAttempt, waitpointId } = await setupBlockedParentWithExecutingChild( + engine, + prisma, + authenticatedEnvironment + ); + + // Real fault injection, no mocks: hold a row lock on the child's + // associated waitpoint so the waitpoint completion inside + // attemptSucceeded blocks past the transaction timeout (5s default) + // and the whole transaction aborts. + // NOTE: this test relies on the implementation's transaction timeout + // (Prisma default 5s) being well below the 8s lock hold time. If the + // implementation ever raises its tx timeout to ≥8s, raise the hold + // time here accordingly. 
+ const lockHolder = prisma.$transaction( + async (tx) => { + await tx.$queryRaw`SELECT "id" FROM "Waitpoint" WHERE "id" = ${waitpointId} FOR UPDATE`; + await setTimeout(8_000); + }, + { timeout: 20_000, maxWait: 5_000 } + ); + await setTimeout(300); // let the lock land + + let completionError: unknown; + try { + await engine.completeRunAttempt({ + runId: childRun.id, + snapshotId: childAttempt.snapshot.id, + completion: { + ok: true, + id: childRun.id, + output: `{"foo":"bar"}`, + outputType: "application/json", + }, + }); + } catch (error) { + completionError = error; + } + + await lockHolder; + + // The completion must have failed while the lock was held... + expect(completionError, "completion must fail while the waitpoint row is locked").toBeDefined(); + + // ...and NOTHING from the transition may have been committed: + // run still EXECUTING, waitpoint still PENDING, no FINISHED snapshot. + const childAfter = await prisma.taskRun.findFirstOrThrow({ + where: { id: childRun.id }, + }); + expect(childAfter.status).toBe("EXECUTING"); + + const waitpointAfter = await prisma.waitpoint.findFirstOrThrow({ + where: { id: waitpointId }, + }); + expect(waitpointAfter.status).toBe("PENDING"); + + const finishedSnapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId: childRun.id, executionStatus: "FINISHED" }, + }); + expect(finishedSnapshots.length).toBe(0); + } finally { + await engine.quit(); + } + } + ); +});