Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions apps/supervisor/src/services/computeSnapshotService.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import { describe, expect, it, vi } from "vitest";
import { setTimeout as sleep } from "node:timers/promises";
import { ComputeSnapshotService } from "./computeSnapshotService.js";
import type { ComputeWorkloadManager } from "../workloadManager/compute.js";
import type { SupervisorHttpClient } from "@trigger.dev/core/v3/workers";

// The TimerWheel ticks every 100ms, so a 200ms delay dispatches within ~300ms.
const DELAY_MS = 200;
// Long enough that a pending snapshot would certainly have dispatched.
const SETTLE_MS = 600;

function createService() {
const snapshot = vi.fn(async (_opts: { runnerId: string; metadata: Record<string, string> }) => true);

const computeManager = {
snapshotDelayMs: DELAY_MS,
snapshotDispatchLimit: 1,
snapshot,
} as unknown as ComputeWorkloadManager;

const service = new ComputeSnapshotService({
computeManager,
workerClient: {} as SupervisorHttpClient,
wideEventOpts: { service: "supervisor-test", env: {}, enabled: false },
});

return { service, snapshot };
}

function delayedSnapshot(runnerId = "runner-1") {
return {
runnerId,
runFriendlyId: "run_1",
snapshotFriendlyId: "snapshot_1",
};
}

describe("ComputeSnapshotService", () => {
it("dispatches a scheduled snapshot after the delay", async () => {
const { service, snapshot } = createService();
try {
service.schedule("run_1", delayedSnapshot());

await vi.waitFor(() => expect(snapshot).toHaveBeenCalledTimes(1), { timeout: 2_000 });
expect(snapshot).toHaveBeenCalledWith({
runnerId: "runner-1",
metadata: { runId: "run_1", snapshotFriendlyId: "snapshot_1" },
});
} finally {
service.stop();
}
});

it("cancel before the delay expires prevents the dispatch", async () => {
const { service, snapshot } = createService();
try {
service.schedule("run_1", delayedSnapshot());

expect(service.cancel("run_1")).toBe(true);

await sleep(SETTLE_MS);
expect(snapshot).not.toHaveBeenCalled();
} finally {
service.stop();
}
});

it("cancel returns false when nothing is pending", () => {
const { service } = createService();
try {
expect(service.cancel("run_1")).toBe(false);
} finally {
service.stop();
}
});

it("cancel with a matching runnerId cancels the pending snapshot", async () => {
const { service, snapshot } = createService();
try {
service.schedule("run_1", delayedSnapshot("runner-a"));

expect(service.cancel("run_1", "runner-a")).toBe(true);

await sleep(SETTLE_MS);
expect(snapshot).not.toHaveBeenCalled();
} finally {
service.stop();
}
});

it("cancel with a different runnerId leaves the pending snapshot alone", async () => {
const { service, snapshot } = createService();
try {
service.schedule("run_1", delayedSnapshot("runner-a"));

// A stale runner for a reassigned run must not cancel the new runner's snapshot.
expect(service.cancel("run_1", "runner-b")).toBe(false);

await vi.waitFor(() => expect(snapshot).toHaveBeenCalledTimes(1), { timeout: 2_000 });
expect(snapshot).toHaveBeenCalledWith(
expect.objectContaining({ runnerId: "runner-a" })
);
} finally {
service.stop();
}
});

it("re-scheduling the same run replaces the pending snapshot", async () => {
const { service, snapshot } = createService();
try {
service.schedule("run_1", delayedSnapshot());
service.schedule("run_1", {
runnerId: "runner-1",
runFriendlyId: "run_1",
snapshotFriendlyId: "snapshot_2",
});

await vi.waitFor(() => expect(snapshot).toHaveBeenCalledTimes(1), { timeout: 2_000 });
await sleep(SETTLE_MS);

expect(snapshot).toHaveBeenCalledTimes(1);
expect(snapshot).toHaveBeenCalledWith({
runnerId: "runner-1",
metadata: { runId: "run_1", snapshotFriendlyId: "snapshot_2" },
});
} finally {
service.stop();
}
});
});
15 changes: 13 additions & 2 deletions apps/supervisor/src/services/computeSnapshotService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,19 @@ export class ComputeSnapshotService {
});
}

/** Cancel a pending delayed snapshot. Returns true if one was cancelled. */
cancel(runFriendlyId: string): boolean {
/**
* Cancel a pending delayed snapshot. Returns true if one was cancelled.
* When `runnerId` is given, only a snapshot scheduled for that same runner
* is cancelled - a stale runner for a run that has since been reassigned
* must not cancel the new runner's pending snapshot.
*/
cancel(runFriendlyId: string, runnerId?: string): boolean {
if (runnerId) {
const pending = this.timerWheel.peek(runFriendlyId);
if (pending && pending.data.runnerId !== runnerId) {
return false;
}
}
const cancelled = this.timerWheel.cancel(runFriendlyId);
if (cancelled) {
emitOneShot({
Expand Down
6 changes: 6 additions & 0 deletions apps/supervisor/src/services/timerWheel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ export class TimerWheel<T> {
return true;
}

/** Look up a pending item without removing it. */
peek(key: string): TimerWheelItem<T> | undefined {
const entry = this.entries.get(key);
return entry ? { key, data: entry.data } : undefined;
}

/** Number of pending items in the wheel. */
get size(): number {
return this.entries.size;
Expand Down
56 changes: 56 additions & 0 deletions apps/supervisor/src/workloadManager/compute.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { describe, expect, it } from "vitest";
import { ComputeClientError } from "@internal/compute";
import { isRetryableCreateError, runnerNameForAttempt } from "./compute.js";

describe("runnerNameForAttempt", () => {
it("keeps the unsuffixed name for the first attempt", () => {
expect(runnerNameForAttempt("runner-abc123", 1)).toBe("runner-abc123");
});

it("suffixes retry attempts deterministically", () => {
expect(runnerNameForAttempt("runner-abc123", 2)).toBe("runner-abc123-r2");
expect(runnerNameForAttempt("runner-abc123", 3)).toBe("runner-abc123-r3");
});
});

describe("isRetryableCreateError", () => {
it("retries statuses where the create definitely did not commit", () => {
expect(isRetryableCreateError(new ComputeClientError(500, "tap busy", "http://gw"))).toBe(
true
);
expect(isRetryableCreateError(new ComputeClientError(503, "no placement", "http://gw"))).toBe(
true
);
});

it("does not retry lost-response statuses (create may have committed)", () => {
expect(isRetryableCreateError(new ComputeClientError(502, "bad gateway", "http://gw"))).toBe(
false
);
expect(
isRetryableCreateError(new ComputeClientError(504, "gateway timeout", "http://gw"))
).toBe(false);
});

it("does not retry 4xx responses", () => {
expect(isRetryableCreateError(new ComputeClientError(400, "bad request", "http://gw"))).toBe(
false
);
expect(isRetryableCreateError(new ComputeClientError(409, "conflict", "http://gw"))).toBe(
false
);
});

it("does not retry timeouts (instance may still be provisioning)", () => {
expect(isRetryableCreateError(new DOMException("timed out", "TimeoutError"))).toBe(false);
});

it("retries network-level fetch failures", () => {
expect(isRetryableCreateError(new TypeError("fetch failed"))).toBe(true);
});

it("does not retry unknown errors", () => {
expect(isRetryableCreateError(new Error("something else"))).toBe(false);
expect(isRetryableCreateError("string error")).toBe(false);
});
});
132 changes: 111 additions & 21 deletions apps/supervisor/src/workloadManager/compute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,58 @@ import {
type WorkloadManagerCreateOptions,
type WorkloadManagerOptions,
} from "./types.js";
import { ComputeClient, stripImageDigest } from "@internal/compute";
import { ComputeClient, ComputeClientError, stripImageDigest } from "@internal/compute";
import { setTimeout as sleep } from "node:timers/promises";
import { extractTraceparent, getRunnerId } from "../util.js";
import type { OtlpTraceService } from "../services/otlpTraceService.js";
import { tryCatch } from "@trigger.dev/core";
import { encodeBaggage, fromContext } from "../wideEvents/index.js";

const CREATE_MAX_ATTEMPTS = 3;
const CREATE_RETRY_BASE_DELAY_MS = 250;

/**
* TEMPORARY (TRI-10293): a failed create can leave its instance name
* registered gateway/fcrun-side until async cleanup runs, so a same-name
* retry can 409 against our own residue. Until the gateway cleans up
* failed-create registrations properly, retry attempts get a deterministic
* suffix. Attempt 1 keeps the unsuffixed name so the non-retry path is
* unchanged; the suffixed name flows into both the instance name and
* TRIGGER_RUNNER_ID, which downstream flows treat as one opaque
* self-reported token. Only attempts following a ComputeClientError are
* suffixed - network-failure retries keep the same name on purpose, because
* the gateway's name-collision 409 is their safety net against
* double-creating an instance whose create response was lost.
*/
export function runnerNameForAttempt(runnerId: string, attempt: number): string {
return attempt === 1 ? runnerId : `${runnerId}-r${attempt}`;
}

/**
* Whether a failed instance create is worth retrying. Only statuses where
* the create definitely did NOT commit are retried: 500 means the agent or
* fcrun returned a create error (e.g. a netns slot holding the tap busy, a
* full node disk - placement may differ on retry), 503 means the gateway
* had nowhere to place it. 502/504 are excluded: the gateway emits those
* when it fails to reach the node or read its response, which can happen
* AFTER the agent committed the create - and the gateway only records the
* instance name on a clean 201, so a same-name retry would miss the
* collision check and could double-create the VM on another node. 4xx won't
* heal on retry, and timeouts may still be provisioning. Network-level
* fetch failures are safe: if the gateway processed the create, its name
* index is populated and the retry 409s harmlessly.
*/
export function isRetryableCreateError(error: unknown): boolean {
if (error instanceof ComputeClientError) {
return error.status === 500 || error.status === 503;
}
if (error instanceof DOMException && error.name === "TimeoutError") {
return false;
}
// Network-level fetch failures (gateway briefly unreachable)
return error instanceof TypeError;
}

type ComputeWorkloadManagerOptions = WorkloadManagerOptions & {
gateway: {
url: string;
Expand Down Expand Up @@ -165,27 +211,71 @@ export class ComputeWorkloadManager implements WorkloadManager {
const startMs = performance.now();

try {
const [error, data] = await tryCatch(
this.compute.instances.create({
name: runnerId,
image: imageRef,
env: envVars,
cpu: opts.machine.cpu,
memory_gb: opts.machine.memory,
metadata: {
runId: opts.runFriendlyId,
envId: opts.envId,
envType: opts.envType,
orgId: opts.orgId,
projectId: opts.projectId,
deploymentVersion: opts.deploymentVersion,
machine: opts.machine.name,
},
...(Object.keys(labels).length > 0 ? { labels } : {}),
})
);
const createRequest = {
name: runnerId,
image: imageRef,
env: envVars,
cpu: opts.machine.cpu,
memory_gb: opts.machine.memory,
metadata: {
runId: opts.runFriendlyId,
envId: opts.envId,
envType: opts.envType,
orgId: opts.orgId,
projectId: opts.projectId,
deploymentVersion: opts.deploymentVersion,
machine: opts.machine.name,
},
...(Object.keys(labels).length > 0 ? { labels } : {}),
};

// Retry transient placement failures instead of abandoning the run: a
// swallowed create error leaves the run waiting for the run engine's
// PENDING_EXECUTING timeout (minutes) before it is redriven, while a
// retried create typically succeeds in under a second (TRI-10293).
let error: unknown;
let data: Awaited<ReturnType<typeof this.compute.instances.create>> | null | undefined;
let attempt = 1;
// Set after a ComputeClientError: the failed create may have left its
// name registered, so subsequent attempts use a suffixed name.
let suffixAttempts = false;
for (; attempt <= CREATE_MAX_ATTEMPTS; attempt++) {
const attemptRunnerId = suffixAttempts
? runnerNameForAttempt(runnerId, attempt)
: runnerId;
[error, data] = await tryCatch(
this.compute.instances.create(
attemptRunnerId === runnerId
? createRequest
: {
...createRequest,
name: attemptRunnerId,
env: { ...envVars, TRIGGER_RUNNER_ID: attemptRunnerId },
}
)
);

if (!error) {
event.runnerId = attemptRunnerId;
break;
}

if (error instanceof ComputeClientError) {
suffixAttempts = true;
}

this.logger.warn("create instance attempt failed", {
runnerId: attemptRunnerId,
attempt,
error: error instanceof Error ? error.message : String(error),
});

if (!isRetryableCreateError(error) || attempt === CREATE_MAX_ATTEMPTS) break;
await sleep(CREATE_RETRY_BASE_DELAY_MS * attempt);
}
event.createAttempts = attempt;

if (error) {
if (error || !data) {
event.error = error instanceof Error ? error.message : String(error);
event.errorType =
error instanceof DOMException && error.name === "TimeoutError" ? "timeout" : "fetch";
Expand Down
Loading