diff --git a/apps/supervisor/src/services/computeSnapshotService.test.ts b/apps/supervisor/src/services/computeSnapshotService.test.ts new file mode 100644 index 00000000000..b039b63bd4d --- /dev/null +++ b/apps/supervisor/src/services/computeSnapshotService.test.ts @@ -0,0 +1,130 @@ +import { describe, expect, it, vi } from "vitest"; +import { setTimeout as sleep } from "node:timers/promises"; +import { ComputeSnapshotService } from "./computeSnapshotService.js"; +import type { ComputeWorkloadManager } from "../workloadManager/compute.js"; +import type { SupervisorHttpClient } from "@trigger.dev/core/v3/workers"; + +// The TimerWheel ticks every 100ms, so a 200ms delay dispatches within ~300ms. +const DELAY_MS = 200; +// Long enough that a pending snapshot would certainly have dispatched. +const SETTLE_MS = 600; + +function createService() { + const snapshot = vi.fn(async (_opts: { runnerId: string; metadata: Record }) => true); + + const computeManager = { + snapshotDelayMs: DELAY_MS, + snapshotDispatchLimit: 1, + snapshot, + } as unknown as ComputeWorkloadManager; + + const service = new ComputeSnapshotService({ + computeManager, + workerClient: {} as SupervisorHttpClient, + wideEventOpts: { service: "supervisor-test", env: {}, enabled: false }, + }); + + return { service, snapshot }; +} + +function delayedSnapshot(runnerId = "runner-1") { + return { + runnerId, + runFriendlyId: "run_1", + snapshotFriendlyId: "snapshot_1", + }; +} + +describe("ComputeSnapshotService", () => { + it("dispatches a scheduled snapshot after the delay", async () => { + const { service, snapshot } = createService(); + try { + service.schedule("run_1", delayedSnapshot()); + + await vi.waitFor(() => expect(snapshot).toHaveBeenCalledTimes(1), { timeout: 2_000 }); + expect(snapshot).toHaveBeenCalledWith({ + runnerId: "runner-1", + metadata: { runId: "run_1", snapshotFriendlyId: "snapshot_1" }, + }); + } finally { + service.stop(); + } + }); + + it("cancel before the delay expires prevents the dispatch", async () => { + const { service, snapshot } = createService(); + try { + service.schedule("run_1", delayedSnapshot()); + + expect(service.cancel("run_1")).toBe(true); + + await sleep(SETTLE_MS); + expect(snapshot).not.toHaveBeenCalled(); + } finally { + service.stop(); + } + }); + + it("cancel returns false when nothing is pending", () => { + const { service } = createService(); + try { + expect(service.cancel("run_1")).toBe(false); + } finally { + service.stop(); + } + }); + + it("cancel with a matching runnerId cancels the pending snapshot", async () => { + const { service, snapshot } = createService(); + try { + service.schedule("run_1", delayedSnapshot("runner-a")); + + expect(service.cancel("run_1", "runner-a")).toBe(true); + + await sleep(SETTLE_MS); + expect(snapshot).not.toHaveBeenCalled(); + } finally { + service.stop(); + } + }); + + it("cancel with a different runnerId leaves the pending snapshot alone", async () => { + const { service, snapshot } = createService(); + try { + service.schedule("run_1", delayedSnapshot("runner-a")); + + // A stale runner for a reassigned run must not cancel the new runner's snapshot. + expect(service.cancel("run_1", "runner-b")).toBe(false); + + await vi.waitFor(() => expect(snapshot).toHaveBeenCalledTimes(1), { timeout: 2_000 }); + expect(snapshot).toHaveBeenCalledWith( + expect.objectContaining({ runnerId: "runner-a" }) + ); + } finally { + service.stop(); + } + }); + + it("re-scheduling the same run replaces the pending snapshot", async () => { + const { service, snapshot } = createService(); + try { + service.schedule("run_1", delayedSnapshot()); + service.schedule("run_1", { + runnerId: "runner-1", + runFriendlyId: "run_1", + snapshotFriendlyId: "snapshot_2", + }); + + await vi.waitFor(() => expect(snapshot).toHaveBeenCalledTimes(1), { timeout: 2_000 }); + await sleep(SETTLE_MS); + + expect(snapshot).toHaveBeenCalledTimes(1); + expect(snapshot).toHaveBeenCalledWith({ + runnerId: "runner-1", + metadata: { runId: "run_1", snapshotFriendlyId: "snapshot_2" }, + }); + } finally { + service.stop(); + } + }); +}); diff --git a/apps/supervisor/src/services/computeSnapshotService.ts b/apps/supervisor/src/services/computeSnapshotService.ts index 35ac6acecab..216753fc12d 100644 --- a/apps/supervisor/src/services/computeSnapshotService.ts +++ b/apps/supervisor/src/services/computeSnapshotService.ts @@ -92,8 +92,19 @@ export class ComputeSnapshotService { }); } - /** Cancel a pending delayed snapshot. Returns true if one was cancelled. */ - cancel(runFriendlyId: string): boolean { + /** + * Cancel a pending delayed snapshot. Returns true if one was cancelled. + * When `runnerId` is given, only a snapshot scheduled for that same runner + * is cancelled - a stale runner for a run that has since been reassigned + * must not cancel the new runner's pending snapshot. + */ + cancel(runFriendlyId: string, runnerId?: string): boolean { + if (runnerId) { + const pending = this.timerWheel.peek(runFriendlyId); + if (pending && pending.data.runnerId !== runnerId) { + return false; + } + } const cancelled = this.timerWheel.cancel(runFriendlyId); if (cancelled) { emitOneShot({ diff --git a/apps/supervisor/src/services/timerWheel.ts b/apps/supervisor/src/services/timerWheel.ts index 9584423824d..cab5a5d7a25 100644 --- a/apps/supervisor/src/services/timerWheel.ts +++ b/apps/supervisor/src/services/timerWheel.ts @@ -121,6 +121,12 @@ export class TimerWheel { return true; } + /** Look up a pending item without removing it. */ + peek(key: string): TimerWheelItem | undefined { + const entry = this.entries.get(key); + return entry ? { key, data: entry.data } : undefined; + } + /** Number of pending items in the wheel. */ get size(): number { return this.entries.size; diff --git a/apps/supervisor/src/workloadManager/compute.test.ts b/apps/supervisor/src/workloadManager/compute.test.ts new file mode 100644 index 00000000000..ea5ddabf285 --- /dev/null +++ b/apps/supervisor/src/workloadManager/compute.test.ts @@ -0,0 +1,56 @@ +import { describe, expect, it } from "vitest"; +import { ComputeClientError } from "@internal/compute"; +import { isRetryableCreateError, runnerNameForAttempt } from "./compute.js"; + +describe("runnerNameForAttempt", () => { + it("keeps the unsuffixed name for the first attempt", () => { + expect(runnerNameForAttempt("runner-abc123", 1)).toBe("runner-abc123"); + }); + + it("suffixes retry attempts deterministically", () => { + expect(runnerNameForAttempt("runner-abc123", 2)).toBe("runner-abc123-r2"); + expect(runnerNameForAttempt("runner-abc123", 3)).toBe("runner-abc123-r3"); + }); +}); + +describe("isRetryableCreateError", () => { + it("retries statuses where the create definitely did not commit", () => { + expect(isRetryableCreateError(new ComputeClientError(500, "tap busy", "http://gw"))).toBe( + true + ); + expect(isRetryableCreateError(new ComputeClientError(503, "no placement", "http://gw"))).toBe( + true + ); + }); + + it("does not retry lost-response statuses (create may have committed)", () => { + expect(isRetryableCreateError(new ComputeClientError(502, "bad gateway", "http://gw"))).toBe( + false + ); + expect( + isRetryableCreateError(new ComputeClientError(504, "gateway timeout", "http://gw")) + ).toBe(false); + }); + + it("does not retry 4xx responses", () => { + expect(isRetryableCreateError(new ComputeClientError(400, "bad request", "http://gw"))).toBe( + false + ); + expect(isRetryableCreateError(new ComputeClientError(409, "conflict", "http://gw"))).toBe( + false + ); + }); + + it("does not retry timeouts (instance may still be provisioning)", () => { + expect(isRetryableCreateError(new DOMException("timed out", "TimeoutError"))).toBe(false); + }); + + it("retries network-level fetch failures", () => { + expect(isRetryableCreateError(new TypeError("fetch failed"))).toBe(true); + }); + + it("does not retry unknown errors", () => { + expect(isRetryableCreateError(new Error("something else"))).toBe(false); + expect(isRetryableCreateError("string error")).toBe(false); + }); +}); diff --git a/apps/supervisor/src/workloadManager/compute.ts b/apps/supervisor/src/workloadManager/compute.ts index 3efad7d4077..307dc3615a2 100644 --- a/apps/supervisor/src/workloadManager/compute.ts +++ b/apps/supervisor/src/workloadManager/compute.ts @@ -6,12 +6,58 @@ import { type WorkloadManagerCreateOptions, type WorkloadManagerOptions, } from "./types.js"; -import { ComputeClient, stripImageDigest } from "@internal/compute"; +import { ComputeClient, ComputeClientError, stripImageDigest } from "@internal/compute"; +import { setTimeout as sleep } from "node:timers/promises"; import { extractTraceparent, getRunnerId } from "../util.js"; import type { OtlpTraceService } from "../services/otlpTraceService.js"; import { tryCatch } from "@trigger.dev/core"; import { encodeBaggage, fromContext } from "../wideEvents/index.js"; +const CREATE_MAX_ATTEMPTS = 3; +const CREATE_RETRY_BASE_DELAY_MS = 250; + +/** + * TEMPORARY (TRI-10293): a failed create can leave its instance name + * registered gateway/fcrun-side until async cleanup runs, so a same-name + * retry can 409 against our own residue. Until the gateway cleans up + * failed-create registrations properly, retry attempts get a deterministic + * suffix. Attempt 1 keeps the unsuffixed name so the non-retry path is + * unchanged; the suffixed name flows into both the instance name and + * TRIGGER_RUNNER_ID, which downstream flows treat as one opaque + * self-reported token. Only attempts following a ComputeClientError are + * suffixed - network-failure retries keep the same name on purpose, because + * the gateway's name-collision 409 is their safety net against + * double-creating an instance whose create response was lost. + */ +export function runnerNameForAttempt(runnerId: string, attempt: number): string { + return attempt === 1 ? runnerId : `${runnerId}-r${attempt}`; +} + +/** + * Whether a failed instance create is worth retrying. Only statuses where + * the create definitely did NOT commit are retried: 500 means the agent or + * fcrun returned a create error (e.g. a netns slot holding the tap busy, a + * full node disk - placement may differ on retry), 503 means the gateway + * had nowhere to place it. 502/504 are excluded: the gateway emits those + * when it fails to reach the node or read its response, which can happen + * AFTER the agent committed the create - and the gateway only records the + * instance name on a clean 201, so a same-name retry would miss the + * collision check and could double-create the VM on another node. 4xx won't + * heal on retry, and timeouts may still be provisioning. Network-level + * fetch failures are safe: if the gateway processed the create, its name + * index is populated and the retry 409s harmlessly. + */ +export function isRetryableCreateError(error: unknown): boolean { + if (error instanceof ComputeClientError) { + return error.status === 500 || error.status === 503; + } + if (error instanceof DOMException && error.name === "TimeoutError") { + return false; + } + // Network-level fetch failures (gateway briefly unreachable) + return error instanceof TypeError; +} + type ComputeWorkloadManagerOptions = WorkloadManagerOptions & { gateway: { url: string; @@ -165,27 +211,71 @@ export class ComputeWorkloadManager implements WorkloadManager { const startMs = performance.now(); try { - const [error, data] = await tryCatch( - this.compute.instances.create({ - name: runnerId, - image: imageRef, - env: envVars, - cpu: opts.machine.cpu, - memory_gb: opts.machine.memory, - metadata: { - runId: opts.runFriendlyId, - envId: opts.envId, - envType: opts.envType, - orgId: opts.orgId, - projectId: opts.projectId, - deploymentVersion: opts.deploymentVersion, - machine: opts.machine.name, - }, - ...(Object.keys(labels).length > 0 ? { labels } : {}), - }) - ); + const createRequest = { + name: runnerId, + image: imageRef, + env: envVars, + cpu: opts.machine.cpu, + memory_gb: opts.machine.memory, + metadata: { + runId: opts.runFriendlyId, + envId: opts.envId, + envType: opts.envType, + orgId: opts.orgId, + projectId: opts.projectId, + deploymentVersion: opts.deploymentVersion, + machine: opts.machine.name, + }, + ...(Object.keys(labels).length > 0 ? { labels } : {}), + }; + + // Retry transient placement failures instead of abandoning the run: a + // swallowed create error leaves the run waiting for the run engine's + // PENDING_EXECUTING timeout (minutes) before it is redriven, while a + // retried create typically succeeds in under a second (TRI-10293). + let error: unknown; + let data: Awaited> | null | undefined; + let attempt = 1; + // Set after a ComputeClientError: the failed create may have left its + // name registered, so subsequent attempts use a suffixed name. + let suffixAttempts = false; + for (; attempt <= CREATE_MAX_ATTEMPTS; attempt++) { + const attemptRunnerId = suffixAttempts + ? runnerNameForAttempt(runnerId, attempt) + : runnerId; + [error, data] = await tryCatch( + this.compute.instances.create( + attemptRunnerId === runnerId + ? createRequest + : { + ...createRequest, + name: attemptRunnerId, + env: { ...envVars, TRIGGER_RUNNER_ID: attemptRunnerId }, + } + ) + ); + + if (!error) { + event.runnerId = attemptRunnerId; + break; + } + + if (error instanceof ComputeClientError) { + suffixAttempts = true; + } + + this.logger.warn("create instance attempt failed", { + runnerId: attemptRunnerId, + attempt, + error: error instanceof Error ? error.message : String(error), + }); + + if (!isRetryableCreateError(error) || attempt === CREATE_MAX_ATTEMPTS) break; + await sleep(CREATE_RETRY_BASE_DELAY_MS * attempt); + } + event.createAttempts = attempt; - if (error) { + if (error || !data) { event.error = error instanceof Error ? error.message : String(error); event.errorType = error instanceof DOMException && error.name === "TimeoutError" ? "timeout" : "fetch"; diff --git a/apps/supervisor/src/workloadServer/index.ts b/apps/supervisor/src/workloadServer/index.ts index ba933477976..43bbe4fa887 100644 --- a/apps/supervisor/src/workloadServer/index.ts +++ b/apps/supervisor/src/workloadServer/index.ts @@ -303,6 +303,16 @@ export class WorkloadServer extends EventEmitter { return; } + // A completed attempt invalidates any pending delayed snapshot: the + // suspended execution state it was scheduled to capture no longer + // exists. Without this, the snapshot fires up to snapshotDelayMs + // later and pauses a VM that has long moved on, e.g. mid warm-start + // long-poll or already executing the next run. + this.snapshotService?.cancel( + params.runFriendlyId, + this.runnerIdFromRequest(req) + ); + reply.json( completeResponse.data satisfies WorkloadRunAttemptCompleteResponseBody ); @@ -728,6 +738,14 @@ export class WorkloadServer extends EventEmitter { const runDisconnected = (friendlyId: string, reason: string) => { socketLogger.debug("runDisconnected", { ...getSocketMetadata() }); + // The run is gone from this runner (crash, exit, or replaced by a new + // run), so a pending delayed snapshot for it is stale. Genuine + // waitpoint suspensions keep the socket connected, so this doesn't + // cancel a snapshot that's still wanted; the runnerId match guards + // against a stale duplicate runner cancelling a fresh runner's + // snapshot after the run was reassigned. + this.snapshotService?.cancel(friendlyId, socket.data.runnerId); + this.runSockets.delete(friendlyId); this.emit("runDisconnected", { run: { friendlyId } }); socket.data.runFriendlyId = undefined;