From 829fec64f80da5b687dba7743f6dff303a10b8cd Mon Sep 17 00:00:00 2001 From: Saadi Myftija Date: Wed, 10 Jun 2026 18:01:42 +0200 Subject: [PATCH 1/3] fix: cancel pending delayed snapshots when the run completes or disconnects The compute suspend flow delays snapshots by snapshotDelayMs to avoid wasted work on short-lived waitpoints, with the intent that a run which continues before the delay expires cancels the pending snapshot. But the only cancel() call site is the /continue workload action, which runners only invoke when restoring from an already-taken snapshot - so a pending snapshot is never actually cancelled (zero snapshot.canceled events in prod). When a run resumes and completes within the delay window, the stale snapshot fires anyway and fcrun pauses the VM for ~6-13s while its controller is mid warm-start long-poll. The frozen guest can't fire its abort timer or send a FIN, so firestarter keeps the connection claimable past the client deadline and dispatches runs into it - each one a ~300s stall (TRI-10293). Cancel the pending snapshot when the attempt completes and when the run socket disconnects. Genuine waitpoint suspensions keep the runner socket connected and the attempt incomplete, so neither hook cancels a snapshot that is still wanted. Cancellation is guarded by runnerId so a stale duplicate runner for a reassigned run can't cancel the new runner's pending snapshot. 
--- .../services/computeSnapshotService.test.ts | 130 ++++++++++++++++++ .../src/services/computeSnapshotService.ts | 15 +- apps/supervisor/src/services/timerWheel.ts | 6 + apps/supervisor/src/workloadServer/index.ts | 18 +++ 4 files changed, 167 insertions(+), 2 deletions(-) create mode 100644 apps/supervisor/src/services/computeSnapshotService.test.ts diff --git a/apps/supervisor/src/services/computeSnapshotService.test.ts b/apps/supervisor/src/services/computeSnapshotService.test.ts new file mode 100644 index 00000000000..b039b63bd4d --- /dev/null +++ b/apps/supervisor/src/services/computeSnapshotService.test.ts @@ -0,0 +1,130 @@ +import { describe, expect, it, vi } from "vitest"; +import { setTimeout as sleep } from "node:timers/promises"; +import { ComputeSnapshotService } from "./computeSnapshotService.js"; +import type { ComputeWorkloadManager } from "../workloadManager/compute.js"; +import type { SupervisorHttpClient } from "@trigger.dev/core/v3/workers"; + +// The TimerWheel ticks every 100ms, so a 200ms delay dispatches within ~300ms. +const DELAY_MS = 200; +// Long enough that a pending snapshot would certainly have dispatched. 
+const SETTLE_MS = 600; + +function createService() { + const snapshot = vi.fn(async (_opts: { runnerId: string; metadata: Record }) => true); + + const computeManager = { + snapshotDelayMs: DELAY_MS, + snapshotDispatchLimit: 1, + snapshot, + } as unknown as ComputeWorkloadManager; + + const service = new ComputeSnapshotService({ + computeManager, + workerClient: {} as SupervisorHttpClient, + wideEventOpts: { service: "supervisor-test", env: {}, enabled: false }, + }); + + return { service, snapshot }; +} + +function delayedSnapshot(runnerId = "runner-1") { + return { + runnerId, + runFriendlyId: "run_1", + snapshotFriendlyId: "snapshot_1", + }; +} + +describe("ComputeSnapshotService", () => { + it("dispatches a scheduled snapshot after the delay", async () => { + const { service, snapshot } = createService(); + try { + service.schedule("run_1", delayedSnapshot()); + + await vi.waitFor(() => expect(snapshot).toHaveBeenCalledTimes(1), { timeout: 2_000 }); + expect(snapshot).toHaveBeenCalledWith({ + runnerId: "runner-1", + metadata: { runId: "run_1", snapshotFriendlyId: "snapshot_1" }, + }); + } finally { + service.stop(); + } + }); + + it("cancel before the delay expires prevents the dispatch", async () => { + const { service, snapshot } = createService(); + try { + service.schedule("run_1", delayedSnapshot()); + + expect(service.cancel("run_1")).toBe(true); + + await sleep(SETTLE_MS); + expect(snapshot).not.toHaveBeenCalled(); + } finally { + service.stop(); + } + }); + + it("cancel returns false when nothing is pending", () => { + const { service } = createService(); + try { + expect(service.cancel("run_1")).toBe(false); + } finally { + service.stop(); + } + }); + + it("cancel with a matching runnerId cancels the pending snapshot", async () => { + const { service, snapshot } = createService(); + try { + service.schedule("run_1", delayedSnapshot("runner-a")); + + expect(service.cancel("run_1", "runner-a")).toBe(true); + + await sleep(SETTLE_MS); + 
expect(snapshot).not.toHaveBeenCalled(); + } finally { + service.stop(); + } + }); + + it("cancel with a different runnerId leaves the pending snapshot alone", async () => { + const { service, snapshot } = createService(); + try { + service.schedule("run_1", delayedSnapshot("runner-a")); + + // A stale runner for a reassigned run must not cancel the new runner's snapshot. + expect(service.cancel("run_1", "runner-b")).toBe(false); + + await vi.waitFor(() => expect(snapshot).toHaveBeenCalledTimes(1), { timeout: 2_000 }); + expect(snapshot).toHaveBeenCalledWith( + expect.objectContaining({ runnerId: "runner-a" }) + ); + } finally { + service.stop(); + } + }); + + it("re-scheduling the same run replaces the pending snapshot", async () => { + const { service, snapshot } = createService(); + try { + service.schedule("run_1", delayedSnapshot()); + service.schedule("run_1", { + runnerId: "runner-1", + runFriendlyId: "run_1", + snapshotFriendlyId: "snapshot_2", + }); + + await vi.waitFor(() => expect(snapshot).toHaveBeenCalledTimes(1), { timeout: 2_000 }); + await sleep(SETTLE_MS); + + expect(snapshot).toHaveBeenCalledTimes(1); + expect(snapshot).toHaveBeenCalledWith({ + runnerId: "runner-1", + metadata: { runId: "run_1", snapshotFriendlyId: "snapshot_2" }, + }); + } finally { + service.stop(); + } + }); +}); diff --git a/apps/supervisor/src/services/computeSnapshotService.ts b/apps/supervisor/src/services/computeSnapshotService.ts index 35ac6acecab..216753fc12d 100644 --- a/apps/supervisor/src/services/computeSnapshotService.ts +++ b/apps/supervisor/src/services/computeSnapshotService.ts @@ -92,8 +92,19 @@ export class ComputeSnapshotService { }); } - /** Cancel a pending delayed snapshot. Returns true if one was cancelled. */ - cancel(runFriendlyId: string): boolean { + /** + * Cancel a pending delayed snapshot. Returns true if one was cancelled. 
+ * When `runnerId` is given, only a snapshot scheduled for that same runner + * is cancelled - a stale runner for a run that has since been reassigned + * must not cancel the new runner's pending snapshot. + */ + cancel(runFriendlyId: string, runnerId?: string): boolean { + if (runnerId) { + const pending = this.timerWheel.peek(runFriendlyId); + if (pending && pending.data.runnerId !== runnerId) { + return false; + } + } const cancelled = this.timerWheel.cancel(runFriendlyId); if (cancelled) { emitOneShot({ diff --git a/apps/supervisor/src/services/timerWheel.ts b/apps/supervisor/src/services/timerWheel.ts index 9584423824d..cab5a5d7a25 100644 --- a/apps/supervisor/src/services/timerWheel.ts +++ b/apps/supervisor/src/services/timerWheel.ts @@ -121,6 +121,12 @@ export class TimerWheel { return true; } + /** Look up a pending item without removing it. */ + peek(key: string): TimerWheelItem | undefined { + const entry = this.entries.get(key); + return entry ? { key, data: entry.data } : undefined; + } + /** Number of pending items in the wheel. */ get size(): number { return this.entries.size; diff --git a/apps/supervisor/src/workloadServer/index.ts b/apps/supervisor/src/workloadServer/index.ts index ba933477976..43bbe4fa887 100644 --- a/apps/supervisor/src/workloadServer/index.ts +++ b/apps/supervisor/src/workloadServer/index.ts @@ -303,6 +303,16 @@ export class WorkloadServer extends EventEmitter { return; } + // A completed attempt invalidates any pending delayed snapshot: the + // suspended execution state it was scheduled to capture no longer + // exists. Without this, the snapshot fires up to snapshotDelayMs + // later and pauses a VM that has long moved on, e.g. mid warm-start + // long-poll or already executing the next run. 
+ this.snapshotService?.cancel( + params.runFriendlyId, + this.runnerIdFromRequest(req) + ); + reply.json( completeResponse.data satisfies WorkloadRunAttemptCompleteResponseBody ); @@ -728,6 +738,14 @@ export class WorkloadServer extends EventEmitter { const runDisconnected = (friendlyId: string, reason: string) => { socketLogger.debug("runDisconnected", { ...getSocketMetadata() }); + // The run is gone from this runner (crash, exit, or replaced by a new + // run), so a pending delayed snapshot for it is stale. Genuine + // waitpoint suspensions keep the socket connected, so this doesn't + // cancel a snapshot that's still wanted; the runnerId match guards + // against a stale duplicate runner cancelling a fresh runner's + // snapshot after the run was reassigned. + this.snapshotService?.cancel(friendlyId, socket.data.runnerId); + this.runSockets.delete(friendlyId); this.emit("runDisconnected", { run: { friendlyId } }); socket.data.runFriendlyId = undefined; From dbbe9b33c771b04e3bda7865257a766512374861 Mon Sep 17 00:00:00 2001 From: Saadi Myftija Date: Wed, 10 Jun 2026 19:53:57 +0200 Subject: [PATCH 2/3] fix: retry transient instance create failures instead of abandoning the run ComputeWorkloadManager.create swallows gateway errors by design, so a cold start that fails placement (e.g. a netns slot with a busy tap, a full node disk) silently abandons the dequeued run until the run engine's PENDING_EXECUTING timeout redrives it minutes later. These failures are transient per placement - redriven runs virtually always succeed - so attempt the create up to 3 times in total, with short backoff between attempts, before giving up. Only gateway 500/503 responses (the create definitely did not commit) and network-level fetch failures are retried; 502/504 responses (the create may have committed before the response was lost), 4xx responses (won't heal) and timeouts (the instance may still be provisioning) are not. 
--- .../src/workloadManager/compute.test.ts | 45 +++++++++ .../supervisor/src/workloadManager/compute.ts | 92 ++++++++++++++----- 2 files changed, 116 insertions(+), 21 deletions(-) create mode 100644 apps/supervisor/src/workloadManager/compute.test.ts diff --git a/apps/supervisor/src/workloadManager/compute.test.ts b/apps/supervisor/src/workloadManager/compute.test.ts new file mode 100644 index 00000000000..7598caf04f3 --- /dev/null +++ b/apps/supervisor/src/workloadManager/compute.test.ts @@ -0,0 +1,45 @@ +import { describe, expect, it } from "vitest"; +import { ComputeClientError } from "@internal/compute"; +import { isRetryableCreateError } from "./compute.js"; + +describe("isRetryableCreateError", () => { + it("retries statuses where the create definitely did not commit", () => { + expect(isRetryableCreateError(new ComputeClientError(500, "tap busy", "http://gw"))).toBe( + true + ); + expect(isRetryableCreateError(new ComputeClientError(503, "no placement", "http://gw"))).toBe( + true + ); + }); + + it("does not retry lost-response statuses (create may have committed)", () => { + expect(isRetryableCreateError(new ComputeClientError(502, "bad gateway", "http://gw"))).toBe( + false + ); + expect( + isRetryableCreateError(new ComputeClientError(504, "gateway timeout", "http://gw")) + ).toBe(false); + }); + + it("does not retry 4xx responses", () => { + expect(isRetryableCreateError(new ComputeClientError(400, "bad request", "http://gw"))).toBe( + false + ); + expect(isRetryableCreateError(new ComputeClientError(409, "conflict", "http://gw"))).toBe( + false + ); + }); + + it("does not retry timeouts (instance may still be provisioning)", () => { + expect(isRetryableCreateError(new DOMException("timed out", "TimeoutError"))).toBe(false); + }); + + it("retries network-level fetch failures", () => { + expect(isRetryableCreateError(new TypeError("fetch failed"))).toBe(true); + }); + + it("does not retry unknown errors", () => { + expect(isRetryableCreateError(new 
Error("something else"))).toBe(false); + expect(isRetryableCreateError("string error")).toBe(false); + }); +}); diff --git a/apps/supervisor/src/workloadManager/compute.ts b/apps/supervisor/src/workloadManager/compute.ts index 3efad7d4077..2719712d916 100644 --- a/apps/supervisor/src/workloadManager/compute.ts +++ b/apps/supervisor/src/workloadManager/compute.ts @@ -6,12 +6,41 @@ import { type WorkloadManagerCreateOptions, type WorkloadManagerOptions, } from "./types.js"; -import { ComputeClient, stripImageDigest } from "@internal/compute"; +import { ComputeClient, ComputeClientError, stripImageDigest } from "@internal/compute"; +import { setTimeout as sleep } from "node:timers/promises"; import { extractTraceparent, getRunnerId } from "../util.js"; import type { OtlpTraceService } from "../services/otlpTraceService.js"; import { tryCatch } from "@trigger.dev/core"; import { encodeBaggage, fromContext } from "../wideEvents/index.js"; +const CREATE_MAX_ATTEMPTS = 3; +const CREATE_RETRY_BASE_DELAY_MS = 250; + +/** + * Whether a failed instance create is worth retrying. Only statuses where + * the create definitely did NOT commit are retried: 500 means the agent or + * fcrun returned a create error (e.g. a netns slot holding the tap busy, a + * full node disk - placement may differ on retry), 503 means the gateway + * had nowhere to place it. 502/504 are excluded: the gateway emits those + * when it fails to reach the node or read its response, which can happen + * AFTER the agent committed the create - and the gateway only records the + * instance name on a clean 201, so a same-name retry would miss the + * collision check and could double-create the VM on another node. 4xx won't + * heal on retry, and timeouts may still be provisioning. Network-level + * fetch failures are safe: if the gateway processed the create, its name + * index is populated and the retry 409s harmlessly. 
+ */ +export function isRetryableCreateError(error: unknown): boolean { + if (error instanceof ComputeClientError) { + return error.status === 500 || error.status === 503; + } + if (error instanceof DOMException && error.name === "TimeoutError") { + return false; + } + // Network-level fetch failures (gateway briefly unreachable) + return error instanceof TypeError; +} + type ComputeWorkloadManagerOptions = WorkloadManagerOptions & { gateway: { url: string; @@ -165,27 +194,48 @@ export class ComputeWorkloadManager implements WorkloadManager { const startMs = performance.now(); try { - const [error, data] = await tryCatch( - this.compute.instances.create({ - name: runnerId, - image: imageRef, - env: envVars, - cpu: opts.machine.cpu, - memory_gb: opts.machine.memory, - metadata: { - runId: opts.runFriendlyId, - envId: opts.envId, - envType: opts.envType, - orgId: opts.orgId, - projectId: opts.projectId, - deploymentVersion: opts.deploymentVersion, - machine: opts.machine.name, - }, - ...(Object.keys(labels).length > 0 ? { labels } : {}), - }) - ); + const createRequest = { + name: runnerId, + image: imageRef, + env: envVars, + cpu: opts.machine.cpu, + memory_gb: opts.machine.memory, + metadata: { + runId: opts.runFriendlyId, + envId: opts.envId, + envType: opts.envType, + orgId: opts.orgId, + projectId: opts.projectId, + deploymentVersion: opts.deploymentVersion, + machine: opts.machine.name, + }, + ...(Object.keys(labels).length > 0 ? { labels } : {}), + }; + + // Retry transient placement failures instead of abandoning the run: a + // swallowed create error leaves the run waiting for the run engine's + // PENDING_EXECUTING timeout (minutes) before it is redriven, while a + // retried create typically succeeds in under a second (TRI-10293). 
+ let error: unknown; + let data: Awaited> | null | undefined; + let attempt = 1; + for (; attempt <= CREATE_MAX_ATTEMPTS; attempt++) { + [error, data] = await tryCatch(this.compute.instances.create(createRequest)); + + if (!error) break; + + this.logger.warn("create instance attempt failed", { + runnerId, + attempt, + error: error instanceof Error ? error.message : String(error), + }); + + if (!isRetryableCreateError(error) || attempt === CREATE_MAX_ATTEMPTS) break; + await sleep(CREATE_RETRY_BASE_DELAY_MS * attempt); + } + event.createAttempts = attempt; - if (error) { + if (error || !data) { event.error = error instanceof Error ? error.message : String(error); event.errorType = error instanceof DOMException && error.name === "TimeoutError" ? "timeout" : "fetch"; From a76626cd3d7200674c70e6e0ea1d87b3bc1d5a0c Mon Sep 17 00:00:00 2001 From: Saadi Myftija Date: Wed, 10 Jun 2026 21:13:19 +0200 Subject: [PATCH 3/3] fix: suffix retried instance create names to dodge stale registrations A failed create can leave its instance name registered gateway/fcrun-side until async cleanup runs, so a same-name retry can 409 against our own residue (observed: tap-EBUSY 500 at 18:29Z followed by 409 name_conflict on the retry 2.7s later, costing the full redrive anyway). Give retry attempts that follow a gateway error response (ComputeClientError) a deterministic -rN suffix; attempt 1 keeps the unsuffixed name so the non-retry path is unchanged, and retries after a network-level failure keep the same name on purpose, so the gateway's name-collision 409 still guards against double-creating an instance whose create response was lost. The suffixed name flows into both the instance name and TRIGGER_RUNNER_ID from the same variable - every downstream flow (suspend scheduling, snapshot dispatch, cancel guards, run-engine fields) treats it as one opaque self-reported token, and restored VMs already carry deterministic name suffixes. Temporary measure (TRI-10293): the proper fix is gateway-side cleanup of failed-create registrations. 
--- .../src/workloadManager/compute.test.ts | 13 +++++- .../supervisor/src/workloadManager/compute.ts | 46 +++++++++++++++++-- 2 files changed, 55 insertions(+), 4 deletions(-) diff --git a/apps/supervisor/src/workloadManager/compute.test.ts b/apps/supervisor/src/workloadManager/compute.test.ts index 7598caf04f3..ea5ddabf285 100644 --- a/apps/supervisor/src/workloadManager/compute.test.ts +++ b/apps/supervisor/src/workloadManager/compute.test.ts @@ -1,6 +1,17 @@ import { describe, expect, it } from "vitest"; import { ComputeClientError } from "@internal/compute"; -import { isRetryableCreateError } from "./compute.js"; +import { isRetryableCreateError, runnerNameForAttempt } from "./compute.js"; + +describe("runnerNameForAttempt", () => { + it("keeps the unsuffixed name for the first attempt", () => { + expect(runnerNameForAttempt("runner-abc123", 1)).toBe("runner-abc123"); + }); + + it("suffixes retry attempts deterministically", () => { + expect(runnerNameForAttempt("runner-abc123", 2)).toBe("runner-abc123-r2"); + expect(runnerNameForAttempt("runner-abc123", 3)).toBe("runner-abc123-r3"); + }); +}); describe("isRetryableCreateError", () => { it("retries statuses where the create definitely did not commit", () => { diff --git a/apps/supervisor/src/workloadManager/compute.ts b/apps/supervisor/src/workloadManager/compute.ts index 2719712d916..307dc3615a2 100644 --- a/apps/supervisor/src/workloadManager/compute.ts +++ b/apps/supervisor/src/workloadManager/compute.ts @@ -16,6 +16,23 @@ import { encodeBaggage, fromContext } from "../wideEvents/index.js"; const CREATE_MAX_ATTEMPTS = 3; const CREATE_RETRY_BASE_DELAY_MS = 250; +/** + * TEMPORARY (TRI-10293): a failed create can leave its instance name + * registered gateway/fcrun-side until async cleanup runs, so a same-name + * retry can 409 against our own residue. Until the gateway cleans up + * failed-create registrations properly, retry attempts get a deterministic + * suffix. 
Attempt 1 keeps the unsuffixed name so the non-retry path is + * unchanged; the suffixed name flows into both the instance name and + * TRIGGER_RUNNER_ID, which downstream flows treat as one opaque + * self-reported token. Only attempts following a ComputeClientError are + * suffixed - network-failure retries keep the same name on purpose, because + * the gateway's name-collision 409 is their safety net against + * double-creating an instance whose create response was lost. + */ +export function runnerNameForAttempt(runnerId: string, attempt: number): string { + return attempt === 1 ? runnerId : `${runnerId}-r${attempt}`; +} + /** * Whether a failed instance create is worth retrying. Only statuses where * the create definitely did NOT commit are retried: 500 means the agent or @@ -219,13 +236,36 @@ export class ComputeWorkloadManager implements WorkloadManager { let error: unknown; let data: Awaited> | null | undefined; let attempt = 1; + // Set after a ComputeClientError: the failed create may have left its + // name registered, so subsequent attempts use a suffixed name. + let suffixAttempts = false; for (; attempt <= CREATE_MAX_ATTEMPTS; attempt++) { - [error, data] = await tryCatch(this.compute.instances.create(createRequest)); + const attemptRunnerId = suffixAttempts + ? runnerNameForAttempt(runnerId, attempt) + : runnerId; + [error, data] = await tryCatch( + this.compute.instances.create( + attemptRunnerId === runnerId + ? createRequest + : { + ...createRequest, + name: attemptRunnerId, + env: { ...envVars, TRIGGER_RUNNER_ID: attemptRunnerId }, + } + ) + ); + + if (!error) { + event.runnerId = attemptRunnerId; + break; + } - if (!error) break; + if (error instanceof ComputeClientError) { + suffixAttempts = true; + } this.logger.warn("create instance attempt failed", { - runnerId, + runnerId: attemptRunnerId, attempt, error: error instanceof Error ? error.message : String(error), });