Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions packages/agent/src/adapters/claude/claude-agent.refresh.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import type { AgentSideConnection } from "@agentclientprotocol/sdk";
import {
type AgentSideConnection,
RequestError,
} from "@agentclientprotocol/sdk";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { POSTHOG_METHODS } from "../../acp-extensions";
import { Pushable } from "../../utils/streams";
Expand All @@ -15,6 +18,7 @@ type SdkQueryHandle = {
setMcpServers: ReturnType<typeof vi.fn>;
supportedCommands: ReturnType<typeof vi.fn>;
initializationResult: ReturnType<typeof vi.fn>;
close: ReturnType<typeof vi.fn>;
[Symbol.asyncIterator]: () => AsyncIterator<never>;
};

Expand All @@ -31,6 +35,7 @@ function makeQueryHandle(): SdkQueryHandle {
setMcpServers: vi.fn().mockResolvedValue(undefined),
supportedCommands: vi.fn().mockResolvedValue([]),
initializationResult: vi.fn().mockImplementation(() => nextInitPromise),
close: vi.fn(),
[Symbol.asyncIterator]: async function* () {
/* never yields */
} as never,
Expand Down Expand Up @@ -192,7 +197,7 @@ describe("ClaudeAcpAgent.extMethod refresh_session", () => {
).rejects.toThrow(/does not support MCP injection/);
});

it("throws when initialization of the new query times out", async () => {
it("throws a RequestError and closes the timed-out query so it cannot leak", async () => {
vi.useFakeTimers();
try {
const agent = makeAgent();
Expand All @@ -209,9 +214,12 @@ describe("ClaudeAcpAgent.extMethod refresh_session", () => {

await vi.advanceTimersByTimeAsync(30_001);

await expect(promise).rejects.toThrow(
/Session refresh timed out for s-timeout/,
);
// A RequestError (not a plain Error) is what survives the ACP layer
// instead of being collapsed into a generic "Internal error".
await expect(promise).rejects.toBeInstanceOf(RequestError);
await expect(promise).rejects.toThrow(/Session refresh timed out after/);
// The new query is closed so its CLI subprocess does not leak.
expect(createdQueries[0]?.close).toHaveBeenCalledTimes(1);
} finally {
vi.useRealTimers();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,24 @@ type SdkQueryHandle = {
setMcpServers: ReturnType<typeof vi.fn>;
supportedCommands: ReturnType<typeof vi.fn>;
initializationResult: ReturnType<typeof vi.fn>;
close: ReturnType<typeof vi.fn>;
[Symbol.asyncIterator]: () => AsyncIterator<never>;
};

// Promise returned by the mocked `initializationResult()` of the query handles
// built in this test file. It defaults to a successful init payload; individual
// tests reassign it (e.g. to a rejected promise) to simulate an init failure.
let nextInitPromise: Promise<unknown> = Promise.resolve({
result: "success",
commands: [],
models: [],
});

function makeQueryHandle(): SdkQueryHandle {
return {
interrupt: vi.fn().mockResolvedValue(undefined),
setModel: vi.fn().mockResolvedValue(undefined),
setMcpServers: vi.fn().mockResolvedValue(undefined),
supportedCommands: vi.fn().mockResolvedValue([]),
initializationResult: vi.fn().mockResolvedValue({
result: "success",
commands: [],
models: [],
}),
initializationResult: vi.fn().mockImplementation(() => nextInitPromise),
close: vi.fn(),
[Symbol.asyncIterator]: async function* () {
/* never yields */
} as never,
Expand Down Expand Up @@ -101,6 +105,11 @@ afterAll(() => {
describe("ClaudeAcpAgent session model on resume", () => {
beforeEach(() => {
createdQueries.length = 0;
nextInitPromise = Promise.resolve({
result: "success",
commands: [],
models: [],
});
// No gateway: fetchGatewayModels returns [] and the requested model is
// kept as a custom option — mirrors the gateway-outage failure mode.
delete process.env.ANTHROPIC_BASE_URL;
Expand Down Expand Up @@ -157,4 +166,43 @@ describe("ClaudeAcpAgent session model on resume", () => {
expect(createdQueries).toHaveLength(1);
expect(createdQueries[0].setModel).not.toHaveBeenCalled();
});

// The timeout *message* (RequestError "... timed out after ...") is covered
// by claude-agent.refresh.test.ts. Here we cover the leak fix on the
// new-session and resume paths: any init failure must close the query so the
// CLI subprocess can't leak and be multiplied by the retry loop.
it("closes the query and rethrows when new-session init fails", async () => {
// Build an already-rejected init promise. The no-op catch is presumably there
// so the rejection is not reported as unhandled before the code under test
// observes it.
const failedInit = Promise.reject(new Error("init boom"));
failedInit.catch(() => {});
nextInitPromise = failedInit;
const agent = makeAgent();

// The original init error must propagate to the caller of newSession().
await expect(
agent.newSession({
cwd,
mcpServers: [],
_meta: { taskRunId: "run-init-fail-new" },
}),
).rejects.toThrow(/init boom/);

// The first query created for this attempt must have been closed exactly once.
expect(createdQueries[0]?.close).toHaveBeenCalledTimes(1);
});

it("closes the query and rethrows when resume init fails", async () => {
// Build an already-rejected init promise. The no-op catch is presumably there
// so the rejection is not reported as unhandled before the code under test
// observes it.
const failedInit = Promise.reject(new Error("resume boom"));
failedInit.catch(() => {});
nextInitPromise = failedInit;
const agent = makeAgent();

// The original init error must propagate to the caller of resumeSession().
await expect(
agent.resumeSession({
sessionId: "0197a000-0000-7000-8000-0000000000ff",
cwd,
mcpServers: [],
_meta: { taskRunId: "run-init-fail-resume" },
}),
).rejects.toThrow(/resume boom/);

// The first query created for this attempt must have been closed exactly once.
expect(createdQueries[0]?.close).toHaveBeenCalledTimes(1);
});
});
36 changes: 30 additions & 6 deletions packages/agent/src/adapters/claude/claude-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1286,7 +1286,12 @@ export class ClaudeAcpAgent extends BaseAcpAgent {
SESSION_VALIDATION_TIMEOUT_MS,
);
if (result.result === "timeout") {
throw new Error(`Session refresh timed out for ${this.sessionId}`);
this.terminateQuery(newQuery, newAbortController);
throw new RequestError(
-32603,
`Session refresh timed out after ${SESSION_VALIDATION_TIMEOUT_MS}ms`,
{ sessionId: this.sessionId },
);
}

// Re-fetch MCP tool metadata + slash commands — the server list changed.
Expand Down Expand Up @@ -1467,6 +1472,20 @@ export class ClaudeAcpAgent extends BaseAcpAgent {
}
}

/**
 * Tears down an SDK query whose initialization failed or timed out.
 *
 * Without this, a timed-out session leaks an orphaned `claude` process that
 * the retry loop then multiplies. Aborting the controller kills the
 * subprocess via the spawn signal; closing the query stops further reads.
 *
 * @param sdkQuery - The query to close. An error thrown by `close()` is
 *   deliberately ignored, since the query may already be closed.
 * @param controller - The abort controller paired with the query; it is
 *   aborted before the query is closed.
 */
private terminateQuery(sdkQuery: Query, controller: AbortController): void {
controller.abort();
try {
sdkQuery.close();
} catch {
// Query may already be closed.
}
}

private async createSession(
params: {
cwd: string;
Expand Down Expand Up @@ -1655,15 +1674,18 @@ export class ClaudeAcpAgent extends BaseAcpAgent {
SESSION_VALIDATION_TIMEOUT_MS,
);
if (result.result === "timeout") {
throw new Error(
`Session ${forkSession ? "fork" : "resumption"} timed out for sessionId=${sessionId}`,
throw new RequestError(
-32603,
`Session ${forkSession ? "fork" : "resumption"} timed out after ${SESSION_VALIDATION_TIMEOUT_MS}ms`,
{ sessionId, taskId, taskRunId: meta?.taskRunId },
);
}
session.knownSlashCommands = collectKnownSlashCommands(
result.value.commands,
);
} catch (err) {
settingsManager.dispose();
this.terminateQuery(q, abortController);
if (
err instanceof Error &&
err.message === "Query closed before response received"
Expand Down Expand Up @@ -1718,16 +1740,18 @@ export class ClaudeAcpAgent extends BaseAcpAgent {
try {
const initResult = await initPromise;
if (initResult.result === "timeout") {
settingsManager.dispose();
throw new Error(
`Session initialization timed out for sessionId=${sessionId}`,
throw new RequestError(
-32603,
`Session initialization timed out after ${SESSION_VALIDATION_TIMEOUT_MS}ms`,
{ sessionId, taskId, taskRunId: meta?.taskRunId },
);
}
session.knownSlashCommands = collectKnownSlashCommands(
initResult.value.commands,
);
} catch (err) {
settingsManager.dispose();
this.terminateQuery(q, abortController);
this.logger.error("Session initialization failed", {
sessionId,
taskId,
Expand Down
35 changes: 34 additions & 1 deletion packages/agent/src/gateway-models.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { describe, expect, it } from "vitest";
import { afterEach, describe, expect, it, vi } from "vitest";
import {
fetchGatewayModels,
fetchModelsList,
formatGatewayModelName,
getClaudeModelRecency,
isBlockedModelId,
Expand Down Expand Up @@ -109,3 +111,34 @@ describe("getClaudeModelRecency", () => {
]);
});
});

describe("gateway model fetch timeout", () => {
// Undo the global `fetch` spy after each case so it does not leak into other
// tests.
afterEach(() => {
vi.restoreAllMocks();
});

// Both fetches run inside the Promise.all that gates session-init, so a
// stalled gateway must degrade to "no models" rather than hang.
it.each([
{ name: "fetchGatewayModels", fn: fetchGatewayModels },
{ name: "fetchModelsList", fn: fetchModelsList },
])(
"$name bounds the request and returns [] when it times out",
async ({ fn }) => {
// Reject the way AbortSignal.timeout would once the deadline passes.
const fetchMock = vi
.spyOn(globalThis, "fetch")
.mockRejectedValue(
new DOMException("The operation was aborted.", "TimeoutError"),
);

// A timed-out request must resolve to an empty list instead of rejecting.
await expect(
fn({ gatewayUrl: "https://gateway.timeout-test" }),
).resolves.toEqual([]);

// Exactly one request was made, and it carried an AbortSignal in its init
// options, i.e. the call was given a way to be cancelled.
expect(fetchMock).toHaveBeenCalledTimes(1);
const init = fetchMock.mock.calls[0]?.[1] as RequestInit | undefined;
expect(init?.signal).toBeInstanceOf(AbortSignal);
},
);
});
14 changes: 12 additions & 2 deletions packages/agent/src/gateway-models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ type ModelsListResponse =

const CACHE_TTL = 10 * 60 * 1000; // 10 minutes

// Upper bound, in milliseconds, on the gateway /v1/models request. It keeps a
// stalled connection from holding up session init: this fetch runs inside the
// Promise.all that gates the 30s SDK initialization timeout, so it must resolve
// well within that window. On abort the callers fall through to `return []`.
const GATEWAY_FETCH_TIMEOUT_MS = 10_000;

let gatewayModelsCache: {
models: GatewayModel[];
expiry: number;
Expand All @@ -76,7 +82,9 @@ export async function fetchGatewayModels(
const modelsUrl = `${gatewayUrl}/v1/models`;

try {
const response = await fetch(modelsUrl);
const response = await fetch(modelsUrl, {
signal: AbortSignal.timeout(GATEWAY_FETCH_TIMEOUT_MS),
});

if (!response.ok) {
return [];
Expand Down Expand Up @@ -138,7 +146,9 @@ export async function fetchModelsList(

try {
const modelsUrl = `${gatewayUrl}/v1/models`;
const response = await fetch(modelsUrl);
const response = await fetch(modelsUrl, {
signal: AbortSignal.timeout(GATEWAY_FETCH_TIMEOUT_MS),
});
if (!response.ok) {
return [];
}
Expand Down
Loading
Loading