From 17f342c2868f6b760a9f64bdc5aa187393100706 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Wed, 17 Jun 2026 18:20:42 -0700 Subject: [PATCH 1/7] feat(providers): hosted-key support for LLM providers (flag-gated, no rate limiting) --- apps/sim/lib/api-key/byok.ts | 56 ++++++++++- apps/sim/lib/api-key/hosted-cost.test.ts | 95 ++++++++++++++++++ apps/sim/lib/api-key/hosted-cost.ts | 93 ++++++++++++++++++ apps/sim/lib/core/config/env.ts | 1 + apps/sim/lib/core/config/feature-flags.ts | 8 ++ .../hosted-key-rate-limiter.test.ts | 43 +++++++- .../hosted-key/hosted-key-rate-limiter.ts | 98 ++++++++++++------- .../lib/core/rate-limiter/hosted-key/types.ts | 16 ++- apps/sim/providers/anthropic/core.ts | 7 ++ apps/sim/providers/baseten/index.ts | 5 + apps/sim/providers/fireworks/index.ts | 5 + apps/sim/providers/gemini/core.ts | 46 ++++----- apps/sim/providers/index.ts | 53 +++++++++- apps/sim/providers/mistral/index.ts | 5 + apps/sim/providers/models.ts | 58 +++++++++++ apps/sim/providers/ollama/core.ts | 5 + apps/sim/providers/openai/core.ts | 5 + apps/sim/providers/streaming-execution.ts | 62 +++++++++++- apps/sim/providers/together/index.ts | 5 + apps/sim/providers/types.ts | 6 ++ apps/sim/providers/utils.ts | 5 + apps/sim/tools/index.ts | 45 ++------- 22 files changed, 615 insertions(+), 107 deletions(-) create mode 100644 apps/sim/lib/api-key/hosted-cost.test.ts create mode 100644 apps/sim/lib/api-key/hosted-cost.ts diff --git a/apps/sim/lib/api-key/byok.ts b/apps/sim/lib/api-key/byok.ts index b131d2f7425..d0ca54425ed 100644 --- a/apps/sim/lib/api-key/byok.ts +++ b/apps/sim/lib/api-key/byok.ts @@ -5,9 +5,11 @@ import { and, asc, eq } from 'drizzle-orm' import { getRotatingApiKey } from '@/lib/core/config/api-keys' import { env } from '@/lib/core/config/env' import { isHosted } from '@/lib/core/config/env-flags' +import { isFeatureEnabled } from '@/lib/core/config/feature-flags' +import { getHostedKeyRateLimiter } from '@/lib/core/rate-limiter' import { decryptSecret 
} from '@/lib/core/security/encryption' import { getWorkspaceById } from '@/lib/workspaces/permissions/utils' -import { getHostedModels } from '@/providers/models' +import { getHostedModels, getProviderHosting } from '@/providers/models' import { PROVIDER_PLACEHOLDER_KEY } from '@/providers/utils' import { useProvidersStore } from '@/stores/providers/store' import type { BYOKProviderId } from '@/tools/types' @@ -90,12 +92,60 @@ export async function getBYOKKey( } } +export interface ApiKeyResolution { + apiKey: string + isBYOK: boolean + /** Env var name of the platform key used (only when a hosted-key-pool key was acquired). */ + hostedKeyEnvVar?: string +} + export async function getApiKeyWithBYOK( provider: string, model: string, workspaceId: string | undefined | null, - userProvidedKey?: string -): Promise<{ apiKey: string; isBYOK: boolean }> { + userProvidedKey?: string, + userId?: string | null +): Promise { + // Unified hosted-key path (flag-gated). For any provider with a hosting config: + // BYOK workspace key wins; otherwise acquire a platform key through the shared + // hosted-key framework with no rate limiting. Falls through to the legacy + // per-provider logic when the flag is off or no platform keys are configured, + // keeping flag-off behavior identical. 
+ if (isHosted && workspaceId) { + const hosting = getProviderHosting(provider) + if (hosting && (await isFeatureEnabled('hosted-key-llm', { userId }))) { + const byokResult = await getBYOKKey(workspaceId, hosting.byokProviderId) + if (byokResult) { + logger.info('Using BYOK key (hosted-key-llm)', { provider, model, workspaceId }) + return byokResult + } + + const acquired = await getHostedKeyRateLimiter().acquireKey( + hosting.byokProviderId, + hosting.envKeyPrefix, + { mode: 'none' }, + workspaceId + ) + if (acquired.success && acquired.key) { + logger.info('Using hosted platform key (hosted-key-llm)', { + provider, + model, + workspaceId, + key: acquired.envVarName, + }) + return { + apiKey: acquired.key, + isBYOK: false, + hostedKeyEnvVar: acquired.envVarName, + } + } + logger.debug('No hosted platform keys configured, falling back to legacy path', { + provider, + model, + }) + } + } + const isOllamaModel = provider === 'ollama' || useProvidersStore.getState().providers.ollama.models.includes(model) if (isOllamaModel) { diff --git a/apps/sim/lib/api-key/hosted-cost.test.ts b/apps/sim/lib/api-key/hosted-cost.test.ts new file mode 100644 index 00000000000..92e7b15b7bb --- /dev/null +++ b/apps/sim/lib/api-key/hosted-cost.test.ts @@ -0,0 +1,95 @@ +/** + * @vitest-environment node + */ +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const { mockRecordUsed, mockRecordCostCharged } = vi.hoisted(() => ({ + mockRecordUsed: vi.fn(), + mockRecordCostCharged: vi.fn(), +})) + +vi.mock('@/lib/monitoring/metrics', () => ({ + hostedKeyMetrics: { + recordUsed: mockRecordUsed, + recordCostCharged: mockRecordCostCharged, + }, +})) + +import { + calculateHostedCost, + classifyHostedKeyFailure, + emitHostedKeyUsage, +} from '@/lib/api-key/hosted-cost' + +describe('calculateHostedCost (tool pricing)', () => { + it('per_request returns the flat fee', () => { + expect(calculateHostedCost({ type: 'per_request', cost: 0.005 }, {}, {})).toEqual({ + cost: 0.005, + }) + 
}) + + it('custom returns a numeric getCost result', () => { + const pricing = { type: 'custom' as const, getCost: () => 0.42 } + expect(calculateHostedCost(pricing, {}, {})).toEqual({ cost: 0.42 }) + }) + + it('custom passes through a structured getCost result with metadata', () => { + const pricing = { + type: 'custom' as const, + getCost: () => ({ cost: 1.5, metadata: { units: 3 } }), + } + expect(calculateHostedCost(pricing, {}, {})).toEqual({ cost: 1.5, metadata: { units: 3 } }) + }) + + it('forwards params and response to custom getCost', () => { + const getCost = vi.fn(() => 1) + const params = { a: 1 } + const response = { b: 2 } + calculateHostedCost({ type: 'custom', getCost }, params, response) + expect(getCost).toHaveBeenCalledWith(params, response) + }) +}) + +describe('classifyHostedKeyFailure', () => { + it('classifies structured SDK errors by status', () => { + expect(classifyHostedKeyFailure({ status: 429 })).toBe('rate_limited') + expect(classifyHostedKeyFailure({ status: 503 })).toBe('rate_limited') + expect(classifyHostedKeyFailure({ status: 401 })).toBe('auth') + expect(classifyHostedKeyFailure({ status: 403, message: 'quota exceeded' })).toBe( + 'rate_limited' + ) + expect(classifyHostedKeyFailure({ status: 500 })).toBe('other') + }) + + it('classifies message-embedded status (provider errors with no .status)', () => { + // Regression: the previous `\bunauthor\b` regex never matched "Unauthorized". 
+ expect(classifyHostedKeyFailure(new Error('Unauthorized'))).toBe('auth') + expect(classifyHostedKeyFailure(new Error('OpenAI API error (401): bad key'))).toBe('auth') + expect(classifyHostedKeyFailure(new Error('Forbidden'))).toBe('auth') + expect(classifyHostedKeyFailure(new Error('Invalid API key provided'))).toBe('auth') + expect(classifyHostedKeyFailure(new Error('API error (429): rate limit'))).toBe('rate_limited') + expect(classifyHostedKeyFailure(new Error('Internal Server Error (500)'))).toBe('other') + }) +}) + +describe('emitHostedKeyUsage', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + it('records both usage and cost with the provider/tool/key labels', () => { + emitHostedKeyUsage({ + provider: 'openai', + tool: 'gpt-4o', + key: 'OPENAI_API_KEY_2', + costTotal: 0.03, + }) + + expect(mockRecordUsed).toHaveBeenCalledWith({ + provider: 'openai', + tool: 'gpt-4o', + key: 'OPENAI_API_KEY_2', + }) + expect(mockRecordCostCharged).toHaveBeenCalledWith(0.03, { provider: 'openai', tool: 'gpt-4o' }) + }) +}) diff --git a/apps/sim/lib/api-key/hosted-cost.ts b/apps/sim/lib/api-key/hosted-cost.ts new file mode 100644 index 00000000000..ea23c603923 --- /dev/null +++ b/apps/sim/lib/api-key/hosted-cost.ts @@ -0,0 +1,93 @@ +import { hostedKeyMetrics } from '@/lib/monitoring/metrics' +import type { ToolHostingPricing } from '@/tools/types' + +export interface HostedCostResult { + /** Total billable cost in dollars. */ + cost: number + /** Optional metadata about the cost (e.g. provider breakdown from `custom` pricing). */ + metadata?: Record +} + +/** + * Cost for a hosted-key **tool** call. Tools declare config-driven pricing — + * a flat `per_request` fee or a response-derived `custom` fee. LLM providers do + * NOT use this: their cost is token-based and computed directly via + * {@link import('@/providers/utils').calculateCost}. 
+ */ +export function calculateHostedCost( + pricing: ToolHostingPricing, + params: Record, + response: Record +): HostedCostResult { + switch (pricing.type) { + case 'per_request': + return { cost: pricing.cost } + + case 'custom': { + const result = pricing.getCost(params, response) + return typeof result === 'number' ? { cost: result } : result + } + + default: { + const exhaustiveCheck: never = pricing + throw new Error(`Unknown pricing type: ${(exhaustiveCheck as ToolHostingPricing).type}`) + } + } +} + +/** + * Classify a thrown error into a hosted-key failure reason for metrics. Handles + * both structured SDK errors (numeric `.status`) and provider errors that embed + * the status in the message string (e.g. `API error (401): ...`). Some providers + * signal quota/rate-limit via 401/403 + a descriptive message, so those count as + * `rate_limited`, not `auth`. + */ +export function classifyHostedKeyFailure(error: unknown): 'rate_limited' | 'auth' | 'other' { + const status = (error as { status?: number } | null)?.status + const message = ((error as { message?: string } | null)?.message ?? '').toLowerCase() + + if (status === 429 || status === 503) return 'rate_limited' + if (status === 401 || status === 403) { + return message.includes('quota') || message.includes('rate limit') ? 'rate_limited' : 'auth' + } + + // No structured status (e.g. provider errors that embed it in the message). + if (status === undefined) { + if ( + message.includes('quota') || + message.includes('rate limit') || + /\b(429|503)\b/.test(message) + ) + return 'rate_limited' + if ( + /\b(401|403)\b/.test(message) || + message.includes('unauthor') || + message.includes('forbidden') || + message.includes('invalid api key') + ) + return 'auth' + } + return 'other' +} + +/** + * Emit hosted-key usage telemetry for a completed call. CloudWatch only — never + * a billing write. `recordCostCharged` self-guards on `costTotal > 0`. 
The + * `tool` label carries the tool id for tools, or the model id for LLM calls. + */ +export function emitHostedKeyUsage(labels: { + provider: string + tool: string + key: string + costTotal: number +}): void { + hostedKeyMetrics.recordUsed({ + provider: labels.provider, + tool: labels.tool, + key: labels.key, + }) + hostedKeyMetrics.recordCostCharged(labels.costTotal, { + provider: labels.provider, + tool: labels.tool, + }) +} diff --git a/apps/sim/lib/core/config/env.ts b/apps/sim/lib/core/config/env.ts index 293c7560e57..60c64acccb2 100644 --- a/apps/sim/lib/core/config/env.ts +++ b/apps/sim/lib/core/config/env.ts @@ -409,6 +409,7 @@ export const env = createEnv({ DISABLE_INVITATIONS: z.boolean().optional(), // Disable workspace invitations globally (for self-hosted deployments) DISABLE_PUBLIC_API: z.boolean().optional(), // Disable public API access globally (for self-hosted deployments) MOTHERSHIP_BETA_FEATURES: z.boolean().optional(), // Enable beta Mothership planning/changelog artifact surfaces + HOSTED_KEY_LLM: z.boolean().optional(), // Route hosted LLM calls through the hosted-key framework (acquire + centralized cost + metrics), no rate limiting // Development Tools REACT_GRAB_ENABLED: z.boolean().optional(), // Enable React Grab for UI element debugging in Cursor/AI agents (dev only) diff --git a/apps/sim/lib/core/config/feature-flags.ts b/apps/sim/lib/core/config/feature-flags.ts index d3bc89ad794..39c92582974 100644 --- a/apps/sim/lib/core/config/feature-flags.ts +++ b/apps/sim/lib/core/config/feature-flags.ts @@ -74,6 +74,14 @@ const FEATURE_FLAGS = { 'user context — use enabled:true for global rollout rather than per-user targeting.', fallback: 'MOTHERSHIP_BETA_FEATURES', }, + 'hosted-key-llm': { + description: + 'Route hosted LLM provider calls through the hosted-key framework (acquire + centralized ' + + 'cost + metrics), with no rate limiting. Off = legacy getRotatingApiKey path. 
Evaluated ' + + 'server-side with userId only (no orgId in the provider request), so roll out globally or ' + + 'per-userId.', + fallback: 'HOSTED_KEY_LLM', + }, } satisfies Record /** diff --git a/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.test.ts b/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.test.ts index b14a34c634d..348082b46b8 100644 --- a/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.test.ts +++ b/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.test.ts @@ -85,10 +85,13 @@ describe('HostedKeyRateLimiter', () => { } mockAdapter.consumeTokens.mockResolvedValue(allowedResult) - process.env.EXA_API_KEY_COUNT = undefined - process.env.EXA_API_KEY_1 = undefined - process.env.EXA_API_KEY_2 = undefined - process.env.EXA_API_KEY_3 = undefined + // Empty string is falsy, so no key resolves. (Assigning `undefined` would + // store the string "undefined" — Node coerces process.env values — which the + // `_1.._N` probe — used when `_COUNT` is absent — would treat as present.) 
+ process.env.EXA_API_KEY_COUNT = '' + process.env.EXA_API_KEY_1 = '' + process.env.EXA_API_KEY_2 = '' + process.env.EXA_API_KEY_3 = '' const result = await rateLimiter.acquireKey( testProvider, @@ -101,6 +104,38 @@ describe('HostedKeyRateLimiter', () => { expect(result.error).toContain('No hosted keys configured') }) + it('mode: none returns a key without touching the queue or token bucket', async () => { + const result = await rateLimiter.acquireKey( + testProvider, + envKeyPrefix, + { mode: 'none' }, + 'workspace-1' + ) + + expect(result.success).toBe(true) + expect(result.key).toBe('test-key-1') + expect(result.envVarName).toBe('EXA_API_KEY_1') + expect(mockQueue.enqueue).not.toHaveBeenCalled() + expect(mockAdapter.consumeTokens).not.toHaveBeenCalled() + }) + + it('mode: none still reports an error when no keys are configured', async () => { + process.env.EXA_API_KEY_COUNT = '' + process.env.EXA_API_KEY_1 = '' + process.env.EXA_API_KEY_2 = '' + process.env.EXA_API_KEY_3 = '' + + const result = await rateLimiter.acquireKey( + testProvider, + envKeyPrefix, + { mode: 'none' }, + 'workspace-1' + ) + + expect(result.success).toBe(false) + expect(mockQueue.enqueue).not.toHaveBeenCalled() + }) + it('should rate limit billing actor when wait exceeds the queue cap', async () => { // resetAt past the 5-minute cap forces the wait loop to bail immediately. 
const rateLimitedResult: ConsumeResult = { diff --git a/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts b/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts index d199ed9e2c2..9326a0705e9 100644 --- a/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts +++ b/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts @@ -14,6 +14,7 @@ import { type CustomRateLimit, DEFAULT_BURST_MULTIPLIER, DEFAULT_WINDOW_MS, + type EnforcedRateLimitConfig, type HostedKeyRateLimitConfig, type ReportUsageResult, toTokenBucketConfig, @@ -84,11 +85,25 @@ function interruptibleSleep(ms: number, signal?: AbortSignal): Promise { */ function resolveEnvKeys(prefix: string): string[] { const count = Number.parseInt(process.env[`${prefix}_COUNT`] || '0', 10) - const names: string[] = [] - for (let i = 1; i <= count; i++) { - names.push(`${prefix}_${i}`) + if (count > 0) { + const names: string[] = [] + for (let i = 1; i <= count; i++) { + names.push(`${prefix}_${i}`) + } + return names + } + + // No explicit _COUNT: probe numbered keys (_1.._N) until one is unset, so + // existing setups (e.g. OPENAI_API_KEY_1/2/3) work without new ops config. + // Stop only on an absent var (`undefined`), not an empty one — an empty middle + // slot (`_2=''`) must not truncate the pool; getAvailableKeys filters empties. + // Intentionally does NOT fall back to a bare `{prefix}` env var — that would + // accidentally pick up ambient keys (e.g. a developer's OPENAI_API_KEY). + const numbered: string[] = [] + for (let i = 1; process.env[`${prefix}_${i}`] !== undefined; i++) { + numbered.push(`${prefix}_${i}`) } - return names + return numbered } /** Dimension name for per-billing-actor request rate limiting */ @@ -163,7 +178,7 @@ export class HostedKeyRateLimiter { * Build a token bucket config for the per-billing-actor request rate limit. * Works for both `per_request` and `custom` modes since both define `requestsPerMinute`. 
*/ - private getActorRateLimitConfig(config: HostedKeyRateLimitConfig): TokenBucketConfig | null { + private getActorRateLimitConfig(config: EnforcedRateLimitConfig): TokenBucketConfig | null { if (!config.requestsPerMinute) return null return toTokenBucketConfig( config.requestsPerMinute, @@ -178,7 +193,7 @@ export class HostedKeyRateLimiter { private async checkActorRateLimit( provider: string, billingActorId: string, - config: HostedKeyRateLimitConfig + config: EnforcedRateLimitConfig ): Promise<{ rateLimited: true; retryAfterMs: number } | null> { const bucketConfig = this.getActorRateLimitConfig(config) if (!bucketConfig) return null @@ -283,6 +298,12 @@ export class HostedKeyRateLimiter { billingActorId: string, signal?: AbortSignal ): Promise { + // No rate limiting: skip the FIFO queue and per-actor token buckets entirely + // and select a key directly. Used by hosted LLM providers. + if (config.mode === 'none') { + return this.selectKey(provider, envKeyPrefix) + } + const ticketId = generateShortId() const startedAt = Date.now() const waitState: WaitState = { lastHeartbeatAt: startedAt } @@ -376,33 +397,7 @@ export class HostedKeyRateLimiter { }) } - const envKeys = resolveEnvKeys(envKeyPrefix) - const availableKeys = this.getAvailableKeys(envKeys) - - if (availableKeys.length === 0) { - logger.warn(`No hosted keys configured for provider ${provider}`) - return { - success: false, - error: `No hosted keys configured for ${provider}`, - } - } - - const counter = this.roundRobinCounters.get(provider) ?? 
0 - const selected = availableKeys[counter % availableKeys.length] - this.roundRobinCounters.set(provider, counter + 1) - - logger.debug(`Selected hosted key for ${provider}`, { - provider, - keyIndex: selected.keyIndex, - envVarName: selected.envVarName, - }) - - return { - success: true, - key: selected.key, - keyIndex: selected.keyIndex, - envVarName: selected.envVarName, - } + return this.selectKey(provider, envKeyPrefix) } finally { // Always remove our ticket so the next caller can advance, regardless of whether // we succeeded, hit the cap, or threw. Best-effort; safe to call multiple times. @@ -410,6 +405,41 @@ export class HostedKeyRateLimiter { } } + /** + * Resolve the configured env keys for a provider and pick the next one via + * round-robin. Pure key selection with no rate limiting — shared by the + * normal acquire path (after queue/bucket waits) and the `mode: 'none'` path. + */ + private selectKey(provider: string, envKeyPrefix: string): AcquireKeyResult { + const envKeys = resolveEnvKeys(envKeyPrefix) + const availableKeys = this.getAvailableKeys(envKeys) + + if (availableKeys.length === 0) { + logger.warn(`No hosted keys configured for provider ${provider}`) + return { + success: false, + error: `No hosted keys configured for ${provider}`, + } + } + + const counter = this.roundRobinCounters.get(provider) ?? 0 + const selected = availableKeys[counter % availableKeys.length] + this.roundRobinCounters.set(provider, counter + 1) + + logger.debug(`Selected hosted key for ${provider}`, { + provider, + keyIndex: selected.keyIndex, + envVarName: selected.envVarName, + }) + + return { + success: true, + key: selected.key, + keyIndex: selected.keyIndex, + envVarName: selected.envVarName, + } + } + /** * Remaining time budget for waiting, in milliseconds. 
When an execution `AbortSignal` is * present it governs the wait: the budget is exhausted the moment the signal aborts (the @@ -505,7 +535,7 @@ export class HostedKeyRateLimiter { provider: string, billingActorId: string, ticketId: string, - config: HostedKeyRateLimitConfig, + config: EnforcedRateLimitConfig, startedAt: number, waitState: WaitState, signal?: AbortSignal diff --git a/apps/sim/lib/core/rate-limiter/hosted-key/types.ts b/apps/sim/lib/core/rate-limiter/hosted-key/types.ts index 735fe5b86df..244acb13c18 100644 --- a/apps/sim/lib/core/rate-limiter/hosted-key/types.ts +++ b/apps/sim/lib/core/rate-limiter/hosted-key/types.ts @@ -1,6 +1,15 @@ import type { TokenBucketConfig } from '@/lib/core/rate-limiter/storage' -export type HostedKeyRateLimitMode = 'per_request' | 'custom' +export type HostedKeyRateLimitMode = 'per_request' | 'custom' | 'none' + +/** + * No rate limiting. Skips the FIFO queue and per-actor token buckets entirely; + * `acquireKey` only does round-robin key selection. Used by hosted LLM providers, + * which are not subject to shared per-workspace request limits. + */ +export interface NoRateLimit { + mode: 'none' +} /** * Simple per-request rate limit configuration. @@ -46,8 +55,11 @@ interface RateLimitDimension { extractUsage: (params: Record, response: Record) => number } +/** Rate-limited configs (those that enforce a per-actor request limit). Excludes `none`. 
*/ +export type EnforcedRateLimitConfig = PerRequestRateLimit | CustomRateLimit + /** Union of all hosted key rate limit configuration types */ -export type HostedKeyRateLimitConfig = PerRequestRateLimit | CustomRateLimit +export type HostedKeyRateLimitConfig = EnforcedRateLimitConfig | NoRateLimit /** * Result from acquiring a key from the hosted key rate limiter diff --git a/apps/sim/providers/anthropic/core.ts b/apps/sim/providers/anthropic/core.ts index 57056e6acca..f83fba07e4d 100644 --- a/apps/sim/providers/anthropic/core.ts +++ b/apps/sim/providers/anthropic/core.ts @@ -23,6 +23,7 @@ import type { ProviderRequest, ProviderResponse, TimeSegment } from '@/providers import { ProviderError } from '@/providers/types' import { calculateCost, + isCachedInput, prepareToolExecution, prepareToolsWithUsageControl, sumToolCosts, @@ -405,6 +406,8 @@ export async function executeAnthropicProviderRequest( initialTokens: { input: 0, output: 0, total: 0 }, initialCost: { total: 0.0, input: 0.0, output: 0.0 }, isStreaming: true, + hostedKey: request.hostedKey, + cached: isCachedInput(request.context), createStream: ({ output, finalizeTiming }) => createReadableStreamFromAnthropicStream( streamResponse as AsyncIterable, @@ -785,6 +788,8 @@ export async function executeAnthropicProviderRequest( }, toolCalls: toolCalls.length > 0 ? { list: toolCalls, count: toolCalls.length } : undefined, isStreaming: true, + hostedKey: request.hostedKey, + cached: isCachedInput(request.context), createStream: ({ output, finalizeTiming }) => createReadableStreamFromAnthropicStream( streamResponse as AsyncIterable, @@ -1208,6 +1213,8 @@ export async function executeAnthropicProviderRequest( }, toolCalls: toolCalls.length > 0 ? 
{ list: toolCalls, count: toolCalls.length } : undefined, isStreaming: true, + hostedKey: request.hostedKey, + cached: isCachedInput(request.context), createStream: ({ output, finalizeTiming }) => createReadableStreamFromAnthropicStream( streamResponse as AsyncIterable, diff --git a/apps/sim/providers/baseten/index.ts b/apps/sim/providers/baseten/index.ts index a25fb29cb9c..aebe2717cae 100644 --- a/apps/sim/providers/baseten/index.ts +++ b/apps/sim/providers/baseten/index.ts @@ -26,6 +26,7 @@ import { ProviderError } from '@/providers/types' import { calculateCost, generateSchemaInstructions, + isCachedInput, prepareToolExecution, prepareToolsWithUsageControl, sumToolCosts, @@ -167,6 +168,8 @@ export const basetenProvider: ProviderConfig = { timing: { kind: 'simple', segmentName: request.model }, initialTokens: { input: 0, output: 0, total: 0 }, initialCost: { input: 0, output: 0, total: 0 }, + hostedKey: request.hostedKey, + cached: isCachedInput(request.context), createStream: ({ output, finalizeTiming }) => createReadableStreamFromOpenAIStream(streamResponse, (content, usage) => { output.content = content @@ -469,6 +472,8 @@ export const basetenProvider: ProviderConfig = { }, toolCalls: toolCalls.length > 0 ? 
{ list: toolCalls, count: toolCalls.length } : undefined, + hostedKey: request.hostedKey, + cached: isCachedInput(request.context), createStream: ({ output }) => createReadableStreamFromOpenAIStream(streamResponse, (content, usage) => { output.content = content diff --git a/apps/sim/providers/fireworks/index.ts b/apps/sim/providers/fireworks/index.ts index c6981d4abf0..ce2c1c14bef 100644 --- a/apps/sim/providers/fireworks/index.ts +++ b/apps/sim/providers/fireworks/index.ts @@ -26,6 +26,7 @@ import { ProviderError } from '@/providers/types' import { calculateCost, generateSchemaInstructions, + isCachedInput, prepareToolExecution, prepareToolsWithUsageControl, sumToolCosts, @@ -167,6 +168,8 @@ export const fireworksProvider: ProviderConfig = { timing: { kind: 'simple', segmentName: request.model }, initialTokens: { input: 0, output: 0, total: 0 }, initialCost: { input: 0, output: 0, total: 0 }, + hostedKey: request.hostedKey, + cached: isCachedInput(request.context), createStream: ({ output, finalizeTiming }) => createReadableStreamFromOpenAIStream(streamResponse, (content, usage) => { output.content = content @@ -469,6 +472,8 @@ export const fireworksProvider: ProviderConfig = { }, toolCalls: toolCalls.length > 0 ? 
{ list: toolCalls, count: toolCalls.length } : undefined, + hostedKey: request.hostedKey, + cached: isCachedInput(request.context), createStream: ({ output }) => createReadableStreamFromOpenAIStream(streamResponse, (content, usage) => { output.content = content diff --git a/apps/sim/providers/gemini/core.ts b/apps/sim/providers/gemini/core.ts index be1161ef09f..dbdad653587 100644 --- a/apps/sim/providers/gemini/core.ts +++ b/apps/sim/providers/gemini/core.ts @@ -26,6 +26,7 @@ import { extractTextContent, mapToThinkingLevel, } from '@/providers/google/utils' +import { settleStreamingLlmCost } from '@/providers/streaming-execution' import { enrichLastModelSegment } from '@/providers/trace-enrichment' import type { FunctionCallResponse, @@ -35,6 +36,7 @@ import type { } from '@/providers/types' import { calculateCost, + isCachedInput, isDeepResearchModel, isGemini3Model, prepareToolExecution, @@ -772,8 +774,12 @@ export async function executeDeepResearchRequest( } streamingResult.execution.output.interactionId = streamInteractionId - const cost = calculateCost(model, usage.inputTokens, usage.outputTokens) - streamingResult.execution.output.cost = cost + settleStreamingLlmCost( + streamingResult.execution.output, + model, + request.hostedKey, + isCachedInput(request.context) + ) const streamEndTime = Date.now() if (streamingResult.execution.output.providerTiming) { @@ -1038,12 +1044,12 @@ export async function executeGeminiRequest( total: usage.totalTokenCount, } - const costResult = calculateCost( + settleStreamingLlmCost( + streamingResult.execution.output, model, - usage.promptTokenCount, - usage.candidatesTokenCount + request.hostedKey, + isCachedInput(request.context) ) - streamingResult.execution.output.cost = costResult const streamEndTime = Date.now() if (streamingResult.execution.output.providerTiming) { @@ -1156,12 +1162,8 @@ export async function executeGeminiRequest( } } - // Capture accumulated cost before streaming - const accumulatedCost = { - input: 
state.cost.input, - output: state.cost.output, - total: state.cost.total, - } + // Capture accumulated tokens before streaming; final cost is settled + // from the accumulated totals via settleStreamingLlmCost. const accumulatedTokens = { ...state.tokens } const streamGenerator = await ai.models.generateContentStream({ @@ -1189,19 +1191,17 @@ export async function executeGeminiRequest( total: accumulatedTokens.total + usage.totalTokenCount, } - const streamCost = calculateCost( + // Settle on the final accumulated tokens (cost is linear in tokens, + // so this equals the prior accumulatedCost + streamCost) so the + // hosted-key multiplier + metric apply uniformly. + const tc = sumToolCosts(state.toolResults) + settleStreamingLlmCost( + streamingResult.execution.output, model, - usage.promptTokenCount, - usage.candidatesTokenCount + request.hostedKey, + isCachedInput(request.context), + tc ) - const tc = sumToolCosts(state.toolResults) - streamingResult.execution.output.cost = { - input: accumulatedCost.input + streamCost.input, - output: accumulatedCost.output + streamCost.output, - toolCost: tc || undefined, - total: accumulatedCost.total + streamCost.total + tc, - pricing: streamCost.pricing, - } if (streamingResult.execution.output.providerTiming) { streamingResult.execution.output.providerTiming.endTime = new Date().toISOString() diff --git a/apps/sim/providers/index.ts b/apps/sim/providers/index.ts index b75860a1b11..7871de860ad 100644 --- a/apps/sim/providers/index.ts +++ b/apps/sim/providers/index.ts @@ -1,7 +1,9 @@ import { createLogger } from '@sim/logger' import { toError } from '@sim/utils/errors' import { getApiKeyWithBYOK } from '@/lib/api-key/byok' +import { classifyHostedKeyFailure, emitHostedKeyUsage } from '@/lib/api-key/hosted-cost' import { getCostMultiplier } from '@/lib/core/config/env-flags' +import { hostedKeyMetrics } from '@/lib/monitoring/metrics' import type { StreamingExecution } from '@/executor/types' import { 
attachLargeFileRemoteUrls, @@ -12,6 +14,7 @@ import type { ProviderId, ProviderRequest, ProviderResponse } from '@/providers/ import { calculateCost, generateStructuredOutputInstructions, + isCachedInput, shouldBillModelUsage, sumToolCosts, supportsReasoningEffort, @@ -142,6 +145,8 @@ export async function executeProviderRequest( let resolvedRequest = sanitizeRequest(request) let isBYOK = false + /** Env var of the platform hosted-key used, when one was acquired from our pool. */ + let hostedKeyEnvVar: string | undefined if (request.workspaceId) { try { @@ -149,15 +154,22 @@ export async function executeProviderRequest( providerId, request.model, request.workspaceId, - request.apiKey + request.apiKey, + request.userId ) resolvedRequest = { ...resolvedRequest, apiKey: result.apiKey } isBYOK = result.isBYOK + hostedKeyEnvVar = result.hostedKeyEnvVar + if (hostedKeyEnvVar) { + // Thread to the streaming seam so it can emit hosted-key cost on drain. + resolvedRequest.hostedKey = { provider: providerId, envVar: hostedKeyEnvVar } + } logger.info('API key resolved', { provider: providerId, model: request.model, workspaceId: request.workspaceId, isBYOK, + hostedKey: hostedKeyEnvVar, }) } catch (error) { logger.error('Failed to resolve API key:', { @@ -197,12 +209,33 @@ export async function executeProviderRequest( await attachLargeFileRemoteUrls(sanitizedRequest, providerId) await uploadLargeFilesToProvider(sanitizedRequest, providerId) - const response = await provider.executeRequest(sanitizedRequest) + let response: ProviderResponse | ReadableStream | StreamingExecution + try { + response = await provider.executeRequest(sanitizedRequest) + } catch (error) { + if (hostedKeyEnvVar) { + hostedKeyMetrics.recordFailed({ + provider: providerId, + tool: request.model, + key: hostedKeyEnvVar, + reason: classifyHostedKeyFailure(error), + }) + } + throw error + } if (isStreamingExecution(response)) { logger.info('Provider returned StreamingExecution', { isBYOK }) if (isBYOK) { 
zeroCostForBYOK(response) + } else if (hostedKeyEnvVar) { + // Hosted key used: record usage now; cost is settled on stream drain + // inside createStreamingExecution (the single streaming cost seam). + hostedKeyMetrics.recordUsed({ + provider: providerId, + tool: response.execution.output?.model ?? request.model, + key: hostedKeyEnvVar, + }) } return response } @@ -214,9 +247,13 @@ export async function executeProviderRequest( if (response.tokens) { const { input: promptTokens = 0, output: completionTokens = 0 } = response.tokens - const useCachedInput = !!request.context && request.context.length > 0 + const useCachedInput = isCachedInput(request.context) - const shouldBill = shouldBillModelUsage(response.model) && !isBYOK + // Bill when a platform key was used (hosted-key pool) or for a legacy hosted + // model, and never for BYOK. The explicit hosted-key signal lets us bill any + // hosting-configured provider without changing getHostedModels(). + const usedHostedKey = !!hostedKeyEnvVar + const shouldBill = !isBYOK && (usedHostedKey || shouldBillModelUsage(response.model)) if (shouldBill) { const costMultiplier = getCostMultiplier() response.cost = calculateCost( @@ -227,6 +264,14 @@ export async function executeProviderRequest( costMultiplier, costMultiplier ) + if (usedHostedKey) { + emitHostedKeyUsage({ + provider: providerId, + tool: response.model, + key: hostedKeyEnvVar as string, + costTotal: response.cost.total, + }) + } } else { response.cost = { input: 0, diff --git a/apps/sim/providers/mistral/index.ts b/apps/sim/providers/mistral/index.ts index 95c66422a72..d536d173bfc 100644 --- a/apps/sim/providers/mistral/index.ts +++ b/apps/sim/providers/mistral/index.ts @@ -19,6 +19,7 @@ import type { import { ProviderError } from '@/providers/types' import { calculateCost, + isCachedInput, prepareToolExecution, prepareToolsWithUsageControl, sumToolCosts, @@ -155,6 +156,8 @@ export const mistralProvider: ProviderConfig = { timing: { kind: 'simple', segmentName: 
request.model }, initialTokens: { input: 0, output: 0, total: 0 }, initialCost: { input: 0, output: 0, total: 0 }, + hostedKey: request.hostedKey, + cached: isCachedInput(request.context), createStream: ({ output, finalizeTiming }) => createReadableStreamFromMistralStream(streamResponse, (content, usage) => { output.content = content @@ -480,6 +483,8 @@ export const mistralProvider: ProviderConfig = { count: toolCalls.length, } : undefined, + hostedKey: request.hostedKey, + cached: isCachedInput(request.context), createStream: ({ output }) => createReadableStreamFromMistralStream(streamResponse, (content, usage) => { output.content = content diff --git a/apps/sim/providers/models.ts b/apps/sim/providers/models.ts index b8fa2a2bae8..472297aa970 100644 --- a/apps/sim/providers/models.ts +++ b/apps/sim/providers/models.ts @@ -29,6 +29,7 @@ import { xAIIcon, } from '@/components/icons' import type { ModelPricing, ProviderId } from '@/providers/types' +import type { BYOKProviderId } from '@/tools/types' export interface ModelCapabilities { temperature?: { @@ -83,6 +84,26 @@ export interface ProviderDefinition { contextInformationAvailable?: boolean /** Agent-block file attachment limit and large-file delivery for this provider. */ fileAttachment?: ProviderFileAttachment + /** + * Hosted-key configuration. When set (and the `hosted-key-llm` flag is on), the + * platform supplies rate-limit-free API keys for this provider's models, BYOK + * workspace keys take precedence, and usage is billed/metered through the shared + * hosted-key framework. Absence means the provider is never platform-hosted. + */ + hosting?: ProviderHostingConfig +} + +/** + * Hosted-key settings for an LLM provider. Unlike tools (which declare a + * `per_request`/`custom` pricing config), LLM cost is always token-based and + * computed directly via `calculateCost` — there is no pricing field. LLMs are + * never rate-limited (acquired with `mode: 'none'`), so there is no `rateLimit`. 
+ */ +export interface ProviderHostingConfig { + /** Env var prefix for platform keys, e.g. 'OPENAI_API_KEY' (resolves `_1.._N`). */ + envKeyPrefix: string + /** BYOK provider id — workspace-key precedence + hosted-key metric labels. */ + byokProviderId: BYOKProviderId } /** @@ -111,9 +132,18 @@ export function getProviderFileAttachment(providerId: string): ProviderFileAttac return PROVIDER_DEFINITIONS[providerId]?.fileAttachment ?? DEFAULT_FILE_ATTACHMENT } +/** Hosted-key config for a provider, or undefined when it is never platform-hosted. */ +export function getProviderHosting(providerId: string): ProviderHostingConfig | undefined { + return PROVIDER_DEFINITIONS[providerId]?.hosting +} + export const PROVIDER_DEFINITIONS: Record = { fireworks: { id: 'fireworks', + hosting: { + envKeyPrefix: 'FIREWORKS_API_KEY', + byokProviderId: 'fireworks', + }, name: 'Fireworks', description: 'Fast inference for open-source models via Fireworks AI', defaultModel: '', @@ -130,6 +160,10 @@ export const PROVIDER_DEFINITIONS: Record = { }, together: { id: 'together', + hosting: { + envKeyPrefix: 'TOGETHER_API_KEY', + byokProviderId: 'together', + }, fileAttachment: { maxBytes: 50 * 1024 * 1024, strategy: 'remote-url' }, name: 'Together AI', description: 'Fast inference for open-source models via Together AI', @@ -147,6 +181,10 @@ export const PROVIDER_DEFINITIONS: Record = { }, baseten: { id: 'baseten', + hosting: { + envKeyPrefix: 'BASETEN_API_KEY', + byokProviderId: 'baseten', + }, fileAttachment: { maxBytes: 25 * 1024 * 1024, strategy: 'remote-url' }, name: 'Baseten', description: 'Fast inference for open-source models via Baseten Model APIs', @@ -180,6 +218,10 @@ export const PROVIDER_DEFINITIONS: Record = { }, 'ollama-cloud': { id: 'ollama-cloud', + hosting: { + envKeyPrefix: 'OLLAMA_CLOUD_API_KEY', + byokProviderId: 'ollama-cloud', + }, name: 'Ollama Cloud', description: 'Hosted open-source models via Ollama Cloud (bring your own key)', defaultModel: '', @@ -223,6 +265,10 
@@ export const PROVIDER_DEFINITIONS: Record = { }, openai: { id: 'openai', + hosting: { + envKeyPrefix: 'OPENAI_API_KEY', + byokProviderId: 'openai', + }, fileAttachment: { maxBytes: 50 * 1024 * 1024, strategy: 'files-api' }, name: 'OpenAI', description: "OpenAI's models", @@ -657,6 +703,10 @@ export const PROVIDER_DEFINITIONS: Record = { }, anthropic: { id: 'anthropic', + hosting: { + envKeyPrefix: 'ANTHROPIC_API_KEY', + byokProviderId: 'anthropic', + }, fileAttachment: { maxBytes: 50 * 1024 * 1024, strategy: 'remote-url' }, name: 'Anthropic', description: "Anthropic's Claude models", @@ -1319,6 +1369,10 @@ export const PROVIDER_DEFINITIONS: Record = { }, google: { id: 'google', + hosting: { + envKeyPrefix: 'GEMINI_API_KEY', + byokProviderId: 'google', + }, fileAttachment: { maxBytes: 50 * 1024 * 1024, strategy: 'files-api' }, name: 'Google', description: "Google's Gemini models", @@ -2173,6 +2227,10 @@ export const PROVIDER_DEFINITIONS: Record = { }, mistral: { id: 'mistral', + hosting: { + envKeyPrefix: 'MISTRAL_API_KEY', + byokProviderId: 'mistral', + }, name: 'Mistral AI', description: "Mistral AI's language models", defaultModel: 'mistral-large-latest', diff --git a/apps/sim/providers/ollama/core.ts b/apps/sim/providers/ollama/core.ts index 9e10b7c436a..2d8b4487430 100644 --- a/apps/sim/providers/ollama/core.ts +++ b/apps/sim/providers/ollama/core.ts @@ -17,6 +17,7 @@ import { ProviderError } from '@/providers/types' import { calculateCost, generateSchemaInstructions, + isCachedInput, prepareToolExecution, sumToolCosts, } from '@/providers/utils' @@ -181,6 +182,8 @@ export async function executeOllamaProviderRequest( timing: { kind: 'simple', segmentName: request.model }, initialTokens: { input: 0, output: 0, total: 0 }, initialCost: { input: 0, output: 0, total: 0 }, + hostedKey: request.hostedKey, + cached: isCachedInput(request.context), createStream: ({ output, finalizeTiming }) => config.createStream(streamResponse, (content, usage) => { output.content 
= content @@ -489,6 +492,8 @@ export async function executeOllamaProviderRequest( count: toolCalls.length, } : undefined, + hostedKey: request.hostedKey, + cached: isCachedInput(request.context), createStream: ({ output }) => config.createStream(streamResponse, (content, usage) => { output.content = content diff --git a/apps/sim/providers/openai/core.ts b/apps/sim/providers/openai/core.ts index 913700ef5d7..1ad7755cc9a 100644 --- a/apps/sim/providers/openai/core.ts +++ b/apps/sim/providers/openai/core.ts @@ -11,6 +11,7 @@ import { ProviderError } from '@/providers/types' import { calculateCost, enforceStrictSchema, + isCachedInput, prepareToolExecution, prepareToolsWithUsageControl, sumToolCosts, @@ -255,6 +256,8 @@ export async function executeResponsesProviderRequest( timing: { kind: 'simple', segmentName: request.model }, initialTokens: { input: 0, output: 0, total: 0 }, initialCost: { input: 0, output: 0, total: 0 }, + hostedKey: request.hostedKey, + cached: isCachedInput(request.context), createStream: ({ output, finalizeTiming }) => createReadableStreamFromResponses(streamResponse, (content, usage) => { output.content = content @@ -681,6 +684,8 @@ export async function executeResponsesProviderRequest( total: accumulatedCost.total, }, toolCalls: toolCalls.length > 0 ? 
{ list: toolCalls, count: toolCalls.length } : undefined, + hostedKey: request.hostedKey, + cached: isCachedInput(request.context), createStream: ({ output }) => createReadableStreamFromResponses(streamResponse, (content, usage) => { output.content = content diff --git a/apps/sim/providers/streaming-execution.ts b/apps/sim/providers/streaming-execution.ts index 7a217af283d..53471c3556e 100644 --- a/apps/sim/providers/streaming-execution.ts +++ b/apps/sim/providers/streaming-execution.ts @@ -1,5 +1,45 @@ +import { getCostMultiplier } from '@/lib/core/config/env-flags' +import { hostedKeyMetrics } from '@/lib/monitoring/metrics' import type { NormalizedBlockOutput, StreamingExecution } from '@/executor/types' import type { TimeSegment } from '@/providers/types' +import { calculateCost } from '@/providers/utils' + +/** + * Settle the authoritative streaming LLM cost onto `output.cost` from its final + * tokens (the single cost seam shared with the non-streaming path), and — on the + * hosted-key path — emit the hosted-key cost metric. The cost multiplier is the + * platform markup on hosted usage, so it is applied only when `hostedKey` is set; + * off the hosted path this is behaviour-preserving (multiplier 1). Any `toolCost` + * already on `output.cost` is preserved. Used here and by providers that build + * streams bespoke (e.g. gemini). + */ +export function settleStreamingLlmCost( + output: NormalizedBlockOutput, + model: string, + hostedKey: { provider: string; envVar: string } | undefined, + cached: boolean, + toolCost?: number +): void { + // Multiplier (platform markup) and cached pricing apply only on the hosted-key + // path; off it this stays behaviour-preserving (multiplier 1, no cached). + const multiplier = hostedKey ? getCostMultiplier() : 1 + const breakdown = calculateCost( + model, + output.tokens?.input ?? 0, + output.tokens?.output ?? 0, + hostedKey ? cached : false, + multiplier, + multiplier + ) + const tc = toolCost ?? 
output.cost?.toolCost + output.cost = tc ? { ...breakdown, toolCost: tc, total: breakdown.total + tc } : breakdown + if (hostedKey) { + hostedKeyMetrics.recordCostCharged(breakdown.total, { + provider: hostedKey.provider, + tool: model, + }) + } +} /** * Provider-agnostic assembly of the {@link StreamingExecution} object that every @@ -78,6 +118,16 @@ interface CreateStreamingExecutionOptions { toolCalls?: ToolCallSlice /** Marks `execution.isStreaming = true` when set. */ isStreaming?: boolean + /** + * Hosted-key cost settlement. Set only when the call resolved to a platform + * hosted-key (flag-on, not BYOK, not user key). When present, the wrapper owns + * the authoritative `output.cost` on drain via {@link settleStreamingLlmCost} + * (recomputed with the cost multiplier) and emits the hosted-key cost metric. + * Absent ⇒ provider's cost is left as-is. + */ + hostedKey?: { provider: string; envVar: string } + /** Whether cached input pricing applies (mirrors the non-streaming `useCachedInput`). */ + cached?: boolean /** * Builds the provider stream. Receives the live `output` object and a * `finalizeTiming` hook. The provider wires its native stream factory and, in @@ -104,6 +154,8 @@ export function createStreamingExecution( initialCost, toolCalls, isStreaming, + hostedKey, + cached, createStream, } = options @@ -150,7 +202,15 @@ export function createStreamingExecution( const timingKind = timing.kind const stream = createStream({ output, - finalizeTiming: () => finalizeTiming(output, providerStartTime, timingKind), + finalizeTiming: () => { + // Hosted-key path: the wrapper owns the authoritative cost — recompute from + // the now-final tokens (with the cost multiplier the per-provider streaming + // paths omit) and emit the hosted-key cost metric exactly once on drain. + if (hostedKey) { + settleStreamingLlmCost(output, model, hostedKey, cached ?? 
false) + } + finalizeTiming(output, providerStartTime, timingKind) + }, }) return { diff --git a/apps/sim/providers/together/index.ts b/apps/sim/providers/together/index.ts index 34c22f5c18c..85b2261b5ec 100644 --- a/apps/sim/providers/together/index.ts +++ b/apps/sim/providers/together/index.ts @@ -26,6 +26,7 @@ import { ProviderError } from '@/providers/types' import { calculateCost, generateSchemaInstructions, + isCachedInput, prepareToolExecution, prepareToolsWithUsageControl, sumToolCosts, @@ -167,6 +168,8 @@ export const togetherProvider: ProviderConfig = { timing: { kind: 'simple', segmentName: request.model }, initialTokens: { input: 0, output: 0, total: 0 }, initialCost: { input: 0, output: 0, total: 0 }, + hostedKey: request.hostedKey, + cached: isCachedInput(request.context), createStream: ({ output, finalizeTiming }) => createReadableStreamFromOpenAIStream(streamResponse, (content, usage) => { output.content = content @@ -469,6 +472,8 @@ export const togetherProvider: ProviderConfig = { }, toolCalls: toolCalls.length > 0 ? { list: toolCalls, count: toolCalls.length } : undefined, + hostedKey: request.hostedKey, + cached: isCachedInput(request.context), createStream: ({ output }) => createReadableStreamFromOpenAIStream(streamResponse, (content, usage) => { output.content = content diff --git a/apps/sim/providers/types.ts b/apps/sim/providers/types.ts index f5ab7a812a7..9c512dfc913 100644 --- a/apps/sim/providers/types.ts +++ b/apps/sim/providers/types.ts @@ -169,6 +169,12 @@ export interface ProviderRequest { blockNameMapping?: Record isCopilotRequest?: boolean isBYOK?: boolean + /** + * Set when the request resolved to a platform hosted-key-pool key (not BYOK, + * not user-provided). Carries the provider id + env var name so the streaming + * cost seam can emit hosted-key usage/cost metrics on drain. Absent otherwise. 
+ */ + hostedKey?: { provider: string; envVar: string } azureEndpoint?: string azureApiVersion?: string vertexProject?: string diff --git a/apps/sim/providers/utils.ts b/apps/sim/providers/utils.ts index 2c22c865e4a..12539f47f2a 100644 --- a/apps/sim/providers/utils.ts +++ b/apps/sim/providers/utils.ts @@ -838,6 +838,11 @@ export function shouldBillModelUsage(model: string): boolean { return hostedModels.some((hostedModel) => model.toLowerCase() === hostedModel.toLowerCase()) } +/** Whether cached-input pricing applies — i.e. prior context/memory was supplied. */ +export function isCachedInput(context: string | null | undefined): boolean { + return !!context && context.length > 0 +} + /** * Placeholder returned for providers that use their own credential mechanism * rather than a user-supplied API key (e.g. AWS Bedrock via IAM/instance profiles). diff --git a/apps/sim/tools/index.ts b/apps/sim/tools/index.ts index 28eb17aaebe..a9e41f9ec3d 100644 --- a/apps/sim/tools/index.ts +++ b/apps/sim/tools/index.ts @@ -3,6 +3,11 @@ import { getErrorMessage, toError } from '@sim/utils/errors' import { sleep } from '@sim/utils/helpers' import { randomFloat } from '@sim/utils/random' import { getBYOKKey } from '@/lib/api-key/byok' +import { + calculateHostedCost, + classifyHostedKeyFailure, + emitHostedKeyUsage, +} from '@/lib/api-key/hosted-cost' import { generateInternalToken } from '@/lib/auth/internal' import { isHosted } from '@/lib/core/config/env-flags' import { DEFAULT_EXECUTION_TIMEOUT_MS, getMaxExecutionTimeout } from '@/lib/core/execution-limits' @@ -381,22 +386,6 @@ function isRateLimitError(error: unknown): boolean { return false } -/** - * Map a thrown tool error to a hosted-key failure reason for metrics. Mirrors - * `isRateLimitError`: some providers signal quota/rate-limit via 401/403 with a - * descriptive message, so those count as `rate_limited`, not `auth`. 
- */ -function classifyHostedKeyFailure(error: unknown): 'rate_limited' | 'auth' | 'other' { - const status = (error as { status?: number } | null)?.status - if (status === 429 || status === 503) return 'rate_limited' - if (status === 401 || status === 403) { - const message = ((error as { message?: string } | null)?.message ?? '').toLowerCase() - if (message.includes('quota') || message.includes('rate limit')) return 'rate_limited' - return 'auth' - } - return 'other' -} - /** Context for retry with rate limit tracking */ interface RetryContext { requestId: string @@ -506,30 +495,15 @@ interface ToolCostResult { } /** - * Calculate cost based on pricing model + * Calculate cost based on pricing model. Delegates to the shared + * {@link calculateHostedCost} so tools and LLM providers bill identically. */ function calculateToolCost( pricing: ToolHostingPricing, params: Record, response: Record ): ToolCostResult { - switch (pricing.type) { - case 'per_request': - return { cost: pricing.cost } - - case 'custom': { - const result = pricing.getCost(params, response) - if (typeof result === 'number') { - return { cost: result } - } - return result - } - - default: { - const exhaustiveCheck: never = pricing - throw new Error(`Unknown pricing type: ${(exhaustiveCheck as ToolHostingPricing).type}`) - } - } + return calculateHostedCost(pricing, params, response) } interface HostedKeyCostResult { @@ -656,8 +630,7 @@ async function applyHostedKeyCostToResult( const provider = tool.hosting?.byokProviderId || tool.id const key = envVarName ?? 
'unknown' - hostedKeyMetrics.recordUsed({ provider, tool: tool.id, key }) - hostedKeyMetrics.recordCostCharged(hostedKeyCost, { provider, tool: tool.id }) + emitHostedKeyUsage({ provider, tool: tool.id, key, costTotal: hostedKeyCost }) if (hostedKeyCost > 0) { finalResult.output = { From 03f9159a8b357e401c5e7fd2bf1868859e268f85 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Wed, 17 Jun 2026 19:19:05 -0700 Subject: [PATCH 2/7] test(providers): complete provider-utils/env mocks for hosted-key streaming wiring --- apps/sim/providers/azure-anthropic/index.test.ts | 7 ++++++- apps/sim/providers/azure-openai/index.test.ts | 7 ++++++- apps/sim/providers/baseten/index.test.ts | 1 + apps/sim/providers/fireworks/index.test.ts | 1 + apps/sim/providers/litellm/index.test.ts | 3 +++ apps/sim/providers/ollama-cloud/index.test.ts | 1 + apps/sim/providers/ollama/index.test.ts | 1 + apps/sim/providers/together/index.test.ts | 1 + apps/sim/providers/vllm/index.test.ts | 7 ++++++- 9 files changed, 26 insertions(+), 3 deletions(-) diff --git a/apps/sim/providers/azure-anthropic/index.test.ts b/apps/sim/providers/azure-anthropic/index.test.ts index d78c9bdac2b..7242911e2cd 100644 --- a/apps/sim/providers/azure-anthropic/index.test.ts +++ b/apps/sim/providers/azure-anthropic/index.test.ts @@ -35,7 +35,12 @@ const { }) vi.mock('@anthropic-ai/sdk', () => ({ default: mockAnthropic })) -vi.mock('@/lib/core/config/env', () => ({ env: envState })) +vi.mock('@/lib/core/config/env', () => ({ + env: envState, + getEnv: (key: string) => (envState as Record)[key], + isTruthy: (v: unknown) => v === true || v === 'true' || v === '1', + isFalsy: (v: unknown) => v === false || v === 'false' || v === '0', +})) vi.mock('@/lib/core/security/input-validation.server', () => ({ validateUrlWithDNS: mockValidate, createPinnedFetch: mockCreatePinnedFetch, diff --git a/apps/sim/providers/azure-openai/index.test.ts b/apps/sim/providers/azure-openai/index.test.ts index 15e4073e8b0..b7fb53660c6 100644 --- 
a/apps/sim/providers/azure-openai/index.test.ts +++ b/apps/sim/providers/azure-openai/index.test.ts @@ -43,7 +43,12 @@ const { }) vi.mock('openai', () => ({ AzureOpenAI: mockAzureOpenAI })) -vi.mock('@/lib/core/config/env', () => ({ env: envState })) +vi.mock('@/lib/core/config/env', () => ({ + env: envState, + getEnv: (key: string) => (envState as Record)[key], + isTruthy: (v: unknown) => v === true || v === 'true' || v === '1', + isFalsy: (v: unknown) => v === false || v === 'false' || v === '0', +})) vi.mock('@/providers', () => ({ MAX_TOOL_ITERATIONS: 20 })) vi.mock('@/lib/core/security/input-validation.server', () => ({ validateUrlWithDNS: mockValidate, diff --git a/apps/sim/providers/baseten/index.test.ts b/apps/sim/providers/baseten/index.test.ts index df296c6626e..6b85b8a56a5 100644 --- a/apps/sim/providers/baseten/index.test.ts +++ b/apps/sim/providers/baseten/index.test.ts @@ -49,6 +49,7 @@ vi.mock('@/providers/trace-enrichment', () => ({ })) vi.mock('@/providers/utils', () => ({ + isCachedInput: (context?: string | null) => !!context && context.length > 0, calculateCost: vi.fn().mockReturnValue({ input: 0, output: 0, total: 0 }), generateSchemaInstructions: vi.fn(() => 'SCHEMA_INSTRUCTIONS'), prepareToolExecution: vi.fn(() => ({ toolParams: { x: 1 }, executionParams: { x: 1 } })), diff --git a/apps/sim/providers/fireworks/index.test.ts b/apps/sim/providers/fireworks/index.test.ts index 8c38a5b7303..bb0fb84aad7 100644 --- a/apps/sim/providers/fireworks/index.test.ts +++ b/apps/sim/providers/fireworks/index.test.ts @@ -49,6 +49,7 @@ vi.mock('@/providers/trace-enrichment', () => ({ })) vi.mock('@/providers/utils', () => ({ + isCachedInput: (context?: string | null) => !!context && context.length > 0, calculateCost: vi.fn().mockReturnValue({ input: 0, output: 0, total: 0 }), generateSchemaInstructions: vi.fn(() => 'SCHEMA_INSTRUCTIONS'), prepareToolExecution: vi.fn(() => ({ toolParams: { x: 1 }, executionParams: { x: 1 } })), diff --git 
a/apps/sim/providers/litellm/index.test.ts b/apps/sim/providers/litellm/index.test.ts index 8a6a2fa011d..dce1c0452a5 100644 --- a/apps/sim/providers/litellm/index.test.ts +++ b/apps/sim/providers/litellm/index.test.ts @@ -22,6 +22,9 @@ vi.mock('@/providers', () => ({ MAX_TOOL_ITERATIONS: 20 })) vi.mock('@/lib/core/config/env', () => ({ env: { LITELLM_BASE_URL: 'http://litellm.test', LITELLM_API_KEY: '' }, + getEnv: (_key: string) => undefined, + isTruthy: (v: unknown) => v === true || v === 'true' || v === '1', + isFalsy: (v: unknown) => v === false || v === 'false' || v === '0', })) vi.mock('@/stores/providers', () => ({ diff --git a/apps/sim/providers/ollama-cloud/index.test.ts b/apps/sim/providers/ollama-cloud/index.test.ts index 1164e0be3e3..f3c2b98d088 100644 --- a/apps/sim/providers/ollama-cloud/index.test.ts +++ b/apps/sim/providers/ollama-cloud/index.test.ts @@ -68,6 +68,7 @@ vi.mock('@/providers/ollama-cloud/utils', () => ({ }, })) vi.mock('@/providers/utils', () => ({ + isCachedInput: (context?: string | null) => !!context && context.length > 0, calculateCost: () => ({ input: 0, output: 0, total: 0, pricing: null }), generateSchemaInstructions: () => 'SCHEMA_INSTRUCTIONS', prepareToolExecution: (_tool: unknown, args: Record) => ({ diff --git a/apps/sim/providers/ollama/index.test.ts b/apps/sim/providers/ollama/index.test.ts index 3c811906826..0b93911f0d6 100644 --- a/apps/sim/providers/ollama/index.test.ts +++ b/apps/sim/providers/ollama/index.test.ts @@ -56,6 +56,7 @@ vi.mock('@/providers/ollama/utils', () => ({ }, })) vi.mock('@/providers/utils', () => ({ + isCachedInput: (context?: string | null) => !!context && context.length > 0, calculateCost: () => ({ input: 0, output: 0, total: 0, pricing: null }), generateSchemaInstructions: () => 'SCHEMA_INSTRUCTIONS', prepareToolExecution: (_tool: unknown, args: Record) => ({ diff --git a/apps/sim/providers/together/index.test.ts b/apps/sim/providers/together/index.test.ts index 9d5386331cc..a16687e6781 100644 
--- a/apps/sim/providers/together/index.test.ts +++ b/apps/sim/providers/together/index.test.ts @@ -49,6 +49,7 @@ vi.mock('@/providers/trace-enrichment', () => ({ })) vi.mock('@/providers/utils', () => ({ + isCachedInput: (context?: string | null) => !!context && context.length > 0, calculateCost: vi.fn().mockReturnValue({ input: 0, output: 0, total: 0 }), generateSchemaInstructions: vi.fn(() => 'SCHEMA_INSTRUCTIONS'), prepareToolExecution: vi.fn(() => ({ toolParams: { x: 1 }, executionParams: { x: 1 } })), diff --git a/apps/sim/providers/vllm/index.test.ts b/apps/sim/providers/vllm/index.test.ts index 925c61ca2d2..4e3783c0512 100644 --- a/apps/sim/providers/vllm/index.test.ts +++ b/apps/sim/providers/vllm/index.test.ts @@ -44,7 +44,12 @@ const { }) vi.mock('openai', () => ({ default: mockOpenAI })) -vi.mock('@/lib/core/config/env', () => ({ env: envState })) +vi.mock('@/lib/core/config/env', () => ({ + env: envState, + getEnv: (key: string) => (envState as Record)[key], + isTruthy: (v: unknown) => v === true || v === 'true' || v === '1', + isFalsy: (v: unknown) => v === false || v === 'false' || v === '0', +})) vi.mock('@/lib/core/security/input-validation.server', () => ({ validateUrlWithDNS: mockValidateUrlWithDNS, createPinnedFetch: mockCreatePinnedFetch, From 5f9046e576f6ba1572a443869e319eeb82fc9bac Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Wed, 17 Jun 2026 19:29:12 -0700 Subject: [PATCH 3/7] fix(providers): respect user-provided key over hosted pool in hosted-key-llm path --- apps/sim/lib/api-key/byok.ts | 11 +++++++++-- apps/sim/providers/index.ts | 5 +++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/apps/sim/lib/api-key/byok.ts b/apps/sim/lib/api-key/byok.ts index c750b5aff8e..0cebbad2c99 100644 --- a/apps/sim/lib/api-key/byok.ts +++ b/apps/sim/lib/api-key/byok.ts @@ -104,8 +104,9 @@ export async function getApiKeyWithBYOK( userId?: string | null ): Promise { // Unified hosted-key path (flag-gated). 
For any provider with a hosting config: - // BYOK workspace key wins; otherwise acquire a platform key through the shared - // hosted-key framework with no rate limiting. Falls through to the legacy + // workspace BYOK key wins, then a user-provided key (never billed via the pool), + // otherwise acquire a platform key through the shared hosted-key framework with no + // rate limiting. Mirrors tool hosted-key precedence. Falls through to the legacy // per-provider logic when the flag is off or no platform keys are configured, // keeping flag-off behavior identical. if (isHosted && workspaceId) { @@ -117,6 +118,12 @@ export async function getApiKeyWithBYOK( return byokResult } + // A user-supplied key takes precedence over the platform pool — use it as-is + // and never bill it through hosted-key metrics/cost. + if (userProvidedKey) { + return { apiKey: userProvidedKey, isBYOK: false } + } + const acquired = await getHostedKeyRateLimiter().acquireKey( hosting.byokProviderId, hosting.envKeyPrefix, diff --git a/apps/sim/providers/index.ts b/apps/sim/providers/index.ts index 7871de860ad..a1ea57a6383 100644 --- a/apps/sim/providers/index.ts +++ b/apps/sim/providers/index.ts @@ -229,8 +229,9 @@ export async function executeProviderRequest( if (isBYOK) { zeroCostForBYOK(response) } else if (hostedKeyEnvVar) { - // Hosted key used: record usage now; cost is settled on stream drain - // inside createStreamingExecution (the single streaming cost seam). + // Hosted key used: record usage now; cost is settled on stream drain via + // settleStreamingLlmCost (from createStreamingExecution, or the provider's + // bespoke stream finalizer for gemini). hostedKeyMetrics.recordUsed({ provider: providerId, tool: response.execution.output?.model ?? 
request.model, From 8d6b768c9f9b7ea7f804b0566d006e415f6cf64c Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Wed, 17 Jun 2026 19:41:01 -0700 Subject: [PATCH 4/7] fix(providers): settle hosted-key streaming cost on stream drain, not finalizeTiming --- .../sim/providers/streaming-execution.test.ts | 56 ++++++++++++++++++ apps/sim/providers/streaming-execution.ts | 59 +++++++++++++++---- 2 files changed, 105 insertions(+), 10 deletions(-) diff --git a/apps/sim/providers/streaming-execution.test.ts b/apps/sim/providers/streaming-execution.test.ts index 25e7708b1f0..88df9a6ef78 100644 --- a/apps/sim/providers/streaming-execution.test.ts +++ b/apps/sim/providers/streaming-execution.test.ts @@ -2,6 +2,17 @@ * @vitest-environment node */ import { describe, expect, it, vi } from 'vitest' + +const { mockRecordCostCharged } = vi.hoisted(() => ({ mockRecordCostCharged: vi.fn() })) + +vi.mock('@/providers/utils', () => ({ + calculateCost: vi.fn(() => ({ input: 1, output: 2, total: 3, pricing: {} })), +})) +vi.mock('@/lib/core/config/env-flags', () => ({ getCostMultiplier: () => 1 })) +vi.mock('@/lib/monitoring/metrics', () => ({ + hostedKeyMetrics: { recordCostCharged: mockRecordCostCharged }, +})) + import type { NormalizedBlockOutput } from '@/executor/types' import { createStreamingExecution } from '@/providers/streaming-execution' @@ -27,6 +38,51 @@ describe('createStreamingExecution', () => { const providerStartTime = 1_000 const providerStartTimeISO = new Date(providerStartTime).toISOString() + it('settles hosted-key cost on stream drain even when finalizeTiming is never called (post-tool path)', async () => { + mockRecordCostCharged.mockClear() + // A source stream that closes immediately, mirroring a drained provider stream. 
+ const sourceStream = new ReadableStream({ start: (c) => c.close() }) + + const result = createStreamingExecution({ + model: 'test-model', + providerStartTime, + providerStartTimeISO, + timing: { + kind: 'accumulated', + modelTime: 1, + toolsTime: 0, + firstResponseTime: 1, + iterations: 1, + timeSegments: [], + }, + initialTokens: { input: 0, output: 0, total: 0 }, + initialCost: { input: 0, output: 0, total: 0 }, + hostedKey: { provider: 'openai', envVar: 'OPENAI_API_KEY_1' }, + cached: false, + // Post-tool streaming path: sets final tokens but never calls finalizeTiming. + createStream: ({ output }) => { + output.tokens = { input: 100, output: 50, total: 150 } + return sourceStream + }, + }) + + // Cost not settled until the stream is actually drained. + expect(mockRecordCostCharged).not.toHaveBeenCalled() + + const reader = result.stream.getReader() + while (!(await reader.read()).done) { + // drain + } + + // Settlement ran on drain: cost recomputed from final tokens, metric emitted once. 
+ expect(result.execution.output.cost).toEqual({ input: 1, output: 2, total: 3, pricing: {} }) + expect(mockRecordCostCharged).toHaveBeenCalledTimes(1) + expect(mockRecordCostCharged).toHaveBeenCalledWith(3, { + provider: 'openai', + tool: 'test-model', + }) + }) + it('assembles the simple (no-tools) shape and finalizes timing on drain', () => { const drainTime = 5_000 vi.spyOn(Date, 'now').mockReturnValue(drainTime) diff --git a/apps/sim/providers/streaming-execution.ts b/apps/sim/providers/streaming-execution.ts index 53471c3556e..f5e49ec6e17 100644 --- a/apps/sim/providers/streaming-execution.ts +++ b/apps/sim/providers/streaming-execution.ts @@ -4,6 +4,41 @@ import type { NormalizedBlockOutput, StreamingExecution } from '@/executor/types import type { TimeSegment } from '@/providers/types' import { calculateCost } from '@/providers/utils' +/** + * Passthrough of `source` that invokes `onDrain` exactly once, after the source + * completes naturally (not on error or cancel). Lets the hosted-key cost settle + * when final tokens are known, regardless of whether the provider's drain + * callback finalized timing. 
+ */ +function settleHostedCostOnStreamDrain( + source: ReadableStream, + onDrain: () => void +): ReadableStream { + const reader = source.getReader() + let settled = false + return new ReadableStream({ + async pull(controller) { + try { + const { done, value } = await reader.read() + if (done) { + if (!settled) { + settled = true + onDrain() + } + controller.close() + return + } + controller.enqueue(value) + } catch (error) { + controller.error(error) + } + }, + cancel(reason) { + return reader.cancel(reason) + }, + }) +} + /** * Settle the authoritative streaming LLM cost onto `output.cost` from its final * tokens (the single cost seam shared with the non-streaming path), and — on the @@ -200,19 +235,23 @@ export function createStreamingExecution( } const timingKind = timing.kind - const stream = createStream({ + const baseStream = createStream({ output, - finalizeTiming: () => { - // Hosted-key path: the wrapper owns the authoritative cost — recompute from - // the now-final tokens (with the cost multiplier the per-provider streaming - // paths omit) and emit the hosted-key cost metric exactly once on drain. - if (hostedKey) { - settleStreamingLlmCost(output, model, hostedKey, cached ?? false) - } - finalizeTiming(output, providerStartTime, timingKind) - }, + finalizeTiming: () => finalizeTiming(output, providerStartTime, timingKind), }) + // Settle hosted-key cost on actual stream drain. This must NOT hang off the + // provider's `finalizeTiming` call — the post-tool streaming paths + // (`createStream: ({ output }) => …`) never invoke it — so instead we wrap the + // returned stream and settle once the source completes (final tokens are set by + // the provider's drain callback before the stream closes). Recomputes the + // authoritative cost with the multiplier and emits the cost metric exactly once. + const stream = hostedKey + ? settleHostedCostOnStreamDrain(baseStream, () => + settleStreamingLlmCost(output, model, hostedKey, cached ?? 
false) + ) + : baseStream + return { stream, execution: { From ed64de9790a2c9b36127ce47d38ab537123c70a7 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Wed, 17 Jun 2026 19:52:42 -0700 Subject: [PATCH 5/7] fix(providers): record hosted-key failure when a hosted stream errors mid-drain --- .../sim/providers/streaming-execution.test.ts | 45 ++++++++++++++++++- apps/sim/providers/streaming-execution.ts | 35 ++++++++++----- 2 files changed, 68 insertions(+), 12 deletions(-) diff --git a/apps/sim/providers/streaming-execution.test.ts b/apps/sim/providers/streaming-execution.test.ts index 88df9a6ef78..48dd992c78b 100644 --- a/apps/sim/providers/streaming-execution.test.ts +++ b/apps/sim/providers/streaming-execution.test.ts @@ -3,14 +3,20 @@ */ import { describe, expect, it, vi } from 'vitest' -const { mockRecordCostCharged } = vi.hoisted(() => ({ mockRecordCostCharged: vi.fn() })) +const { mockRecordCostCharged, mockRecordFailed } = vi.hoisted(() => ({ + mockRecordCostCharged: vi.fn(), + mockRecordFailed: vi.fn(), +})) vi.mock('@/providers/utils', () => ({ calculateCost: vi.fn(() => ({ input: 1, output: 2, total: 3, pricing: {} })), })) vi.mock('@/lib/core/config/env-flags', () => ({ getCostMultiplier: () => 1 })) vi.mock('@/lib/monitoring/metrics', () => ({ - hostedKeyMetrics: { recordCostCharged: mockRecordCostCharged }, + hostedKeyMetrics: { recordCostCharged: mockRecordCostCharged, recordFailed: mockRecordFailed }, +})) +vi.mock('@/lib/api-key/hosted-cost', () => ({ + classifyHostedKeyFailure: () => 'other', })) import type { NormalizedBlockOutput } from '@/executor/types' @@ -81,6 +87,41 @@ describe('createStreamingExecution', () => { provider: 'openai', tool: 'test-model', }) + expect(mockRecordFailed).not.toHaveBeenCalled() + }) + + it('records a hosted-key failure (not cost) when the stream errors mid-drain', async () => { + mockRecordCostCharged.mockClear() + mockRecordFailed.mockClear() + const boom = new Error('upstream 500') + const sourceStream = new 
ReadableStream({ + pull: (c) => c.error(boom), + }) + + const result = createStreamingExecution({ + model: 'test-model', + providerStartTime, + providerStartTimeISO, + timing: { kind: 'simple', segmentName: 'test-model' }, + initialTokens: { input: 0, output: 0, total: 0 }, + initialCost: { input: 0, output: 0, total: 0 }, + hostedKey: { provider: 'openai', envVar: 'OPENAI_API_KEY_1' }, + cached: false, + createStream: () => sourceStream, + }) + + const reader = result.stream.getReader() + await expect(reader.read()).rejects.toThrow('upstream 500') + + // Failure recorded once; no cost charged for a failed stream. + expect(mockRecordFailed).toHaveBeenCalledTimes(1) + expect(mockRecordFailed).toHaveBeenCalledWith({ + provider: 'openai', + tool: 'test-model', + key: 'OPENAI_API_KEY_1', + reason: 'other', + }) + expect(mockRecordCostCharged).not.toHaveBeenCalled() }) it('assembles the simple (no-tools) shape and finalizes timing on drain', () => { diff --git a/apps/sim/providers/streaming-execution.ts b/apps/sim/providers/streaming-execution.ts index f5e49ec6e17..6804ca9bca6 100644 --- a/apps/sim/providers/streaming-execution.ts +++ b/apps/sim/providers/streaming-execution.ts @@ -1,3 +1,4 @@ +import { classifyHostedKeyFailure } from '@/lib/api-key/hosted-cost' import { getCostMultiplier } from '@/lib/core/config/env-flags' import { hostedKeyMetrics } from '@/lib/monitoring/metrics' import type { NormalizedBlockOutput, StreamingExecution } from '@/executor/types' @@ -5,24 +6,26 @@ import type { TimeSegment } from '@/providers/types' import { calculateCost } from '@/providers/utils' /** - * Passthrough of `source` that invokes `onDrain` exactly once, after the source - * completes naturally (not on error or cancel). Lets the hosted-key cost settle - * when final tokens are known, regardless of whether the provider's drain - * callback finalized timing. 
+ * Passthrough of `source` that runs exactly one terminal callback: `onDrain` + * when it completes normally (cost is settled then), or `onError` when a read + * errors mid-stream (records the hosted-key failure). A client `cancel` is not a + * key failure, so it runs neither. Lets hosted-key cost/failure metrics stay + * symmetric regardless of whether the provider's drain callback finalized timing. */ function settleHostedCostOnStreamDrain( source: ReadableStream, - onDrain: () => void + onDrain: () => void, + onError: (error: unknown) => void ): ReadableStream { const reader = source.getReader() - let settled = false + let finished = false return new ReadableStream({ async pull(controller) { try { const { done, value } = await reader.read() if (done) { - if (!settled) { - settled = true + if (!finished) { + finished = true onDrain() } controller.close() @@ -30,6 +33,10 @@ function settleHostedCostOnStreamDrain( } controller.enqueue(value) } catch (error) { + if (!finished) { + finished = true + onError(error) + } controller.error(error) } }, @@ -247,8 +254,16 @@ export function createStreamingExecution( // the provider's drain callback before the stream closes). Recomputes the // authoritative cost with the multiplier and emits the cost metric exactly once. const stream = hostedKey - ? settleHostedCostOnStreamDrain(baseStream, () => - settleStreamingLlmCost(output, model, hostedKey, cached ?? false) + ? settleHostedCostOnStreamDrain( + baseStream, + () => settleStreamingLlmCost(output, model, hostedKey, cached ?? 
false), + (error) => + hostedKeyMetrics.recordFailed({ + provider: hostedKey.provider, + tool: model, + key: hostedKey.envVar, + reason: classifyHostedKeyFailure(error), + }) ) : baseStream From c96a2f80c7efa2fb64dae5c32f5c21312801824a Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Thu, 18 Jun 2026 09:57:47 -0700 Subject: [PATCH 6/7] fix(providers): record hosted-stream failures provider-agnostically at the chokepoint (covers gemini) --- apps/sim/providers/index.ts | 20 ++++--- .../sim/providers/streaming-execution.test.ts | 43 +++++++++------ apps/sim/providers/streaming-execution.ts | 55 ++++++++++++------- 3 files changed, 72 insertions(+), 46 deletions(-) diff --git a/apps/sim/providers/index.ts b/apps/sim/providers/index.ts index a1ea57a6383..eb6bde1e6f7 100644 --- a/apps/sim/providers/index.ts +++ b/apps/sim/providers/index.ts @@ -10,6 +10,7 @@ import { uploadLargeFilesToProvider, } from '@/providers/file-attachments.server' import { getProviderExecutor } from '@/providers/registry' +import { recordHostedStreamFailure } from '@/providers/streaming-execution' import type { ProviderId, ProviderRequest, ProviderResponse } from '@/providers/types' import { calculateCost, @@ -229,14 +230,17 @@ export async function executeProviderRequest( if (isBYOK) { zeroCostForBYOK(response) } else if (hostedKeyEnvVar) { - // Hosted key used: record usage now; cost is settled on stream drain via - // settleStreamingLlmCost (from createStreamingExecution, or the provider's - // bespoke stream finalizer for gemini). - hostedKeyMetrics.recordUsed({ - provider: providerId, - tool: response.execution.output?.model ?? request.model, - key: hostedKeyEnvVar, - }) + // Hosted key used: record usage at dispatch. Cost is settled per-provider on + // successful stream drain (createStreamingExecution, or gemini's bespoke + // finalizer); a mid-stream error records a failure here — provider-agnostic, + // so it covers gemini and any non-wrapper stream too. 
+ const model = response.execution.output?.model ?? request.model + hostedKeyMetrics.recordUsed({ provider: providerId, tool: model, key: hostedKeyEnvVar }) + response.stream = recordHostedStreamFailure( + response.stream, + { provider: providerId, envVar: hostedKeyEnvVar }, + model + ) } return response } diff --git a/apps/sim/providers/streaming-execution.test.ts b/apps/sim/providers/streaming-execution.test.ts index 48dd992c78b..87d96d50a42 100644 --- a/apps/sim/providers/streaming-execution.test.ts +++ b/apps/sim/providers/streaming-execution.test.ts @@ -20,7 +20,10 @@ vi.mock('@/lib/api-key/hosted-cost', () => ({ })) import type { NormalizedBlockOutput } from '@/executor/types' -import { createStreamingExecution } from '@/providers/streaming-execution' +import { + createStreamingExecution, + recordHostedStreamFailure, +} from '@/providers/streaming-execution' /** * Builds a fake stream factory mirroring the providers' `createReadableStreamFrom*` @@ -90,27 +93,19 @@ describe('createStreamingExecution', () => { expect(mockRecordFailed).not.toHaveBeenCalled() }) - it('records a hosted-key failure (not cost) when the stream errors mid-drain', async () => { + it('recordHostedStreamFailure records a failure (not cost) when the stream errors', async () => { mockRecordCostCharged.mockClear() mockRecordFailed.mockClear() const boom = new Error('upstream 500') - const sourceStream = new ReadableStream({ - pull: (c) => c.error(boom), - }) + const sourceStream = new ReadableStream({ pull: (c) => c.error(boom) }) - const result = createStreamingExecution({ - model: 'test-model', - providerStartTime, - providerStartTimeISO, - timing: { kind: 'simple', segmentName: 'test-model' }, - initialTokens: { input: 0, output: 0, total: 0 }, - initialCost: { input: 0, output: 0, total: 0 }, - hostedKey: { provider: 'openai', envVar: 'OPENAI_API_KEY_1' }, - cached: false, - createStream: () => sourceStream, - }) + const wrapped = recordHostedStreamFailure( + sourceStream, + { provider: 
'openai', envVar: 'OPENAI_API_KEY_1' }, + 'test-model' + ) - const reader = result.stream.getReader() + const reader = wrapped.getReader() await expect(reader.read()).rejects.toThrow('upstream 500') // Failure recorded once; no cost charged for a failed stream. @@ -124,6 +119,20 @@ describe('createStreamingExecution', () => { expect(mockRecordCostCharged).not.toHaveBeenCalled() }) + it('recordHostedStreamFailure does not record a failure when the stream completes', async () => { + mockRecordFailed.mockClear() + const wrapped = recordHostedStreamFailure( + new ReadableStream({ start: (c) => c.close() }), + { provider: 'openai', envVar: 'OPENAI_API_KEY_1' }, + 'test-model' + ) + const reader = wrapped.getReader() + while (!(await reader.read()).done) { + // drain + } + expect(mockRecordFailed).not.toHaveBeenCalled() + }) + it('assembles the simple (no-tools) shape and finalizes timing on drain', () => { const drainTime = 5_000 vi.spyOn(Date, 'now').mockReturnValue(drainTime) diff --git a/apps/sim/providers/streaming-execution.ts b/apps/sim/providers/streaming-execution.ts index 6804ca9bca6..ca46faabe15 100644 --- a/apps/sim/providers/streaming-execution.ts +++ b/apps/sim/providers/streaming-execution.ts @@ -6,16 +6,13 @@ import type { TimeSegment } from '@/providers/types' import { calculateCost } from '@/providers/utils' /** - * Passthrough of `source` that runs exactly one terminal callback: `onDrain` - * when it completes normally (cost is settled then), or `onError` when a read - * errors mid-stream (records the hosted-key failure). A client `cancel` is not a - * key failure, so it runs neither. Lets hosted-key cost/failure metrics stay - * symmetric regardless of whether the provider's drain callback finalized timing. + * Passthrough of `source` that runs at most one terminal callback: `onDrain` when + * it completes normally, or `onError` when a read errors mid-stream. A client + * `cancel` runs neither (an abort is not a key failure). 
*/ -function settleHostedCostOnStreamDrain( +function tapStreamTermination( source: ReadableStream, - onDrain: () => void, - onError: (error: unknown) => void + callbacks: { onDrain?: () => void; onError?: (error: unknown) => void } ): ReadableStream { const reader = source.getReader() let finished = false @@ -26,7 +23,7 @@ function settleHostedCostOnStreamDrain( if (done) { if (!finished) { finished = true - onDrain() + callbacks.onDrain?.() } controller.close() return @@ -35,7 +32,7 @@ function settleHostedCostOnStreamDrain( } catch (error) { if (!finished) { finished = true - onError(error) + callbacks.onError?.(error) } controller.error(error) } @@ -46,6 +43,29 @@ function settleHostedCostOnStreamDrain( }) } +/** + * Wrap a hosted-key streaming response so a mid-stream read error records a + * hosted-key failure metric. Applied provider-agnostically at the chokepoint + * (`executeProviderRequest`) so it covers every provider — including ones that + * build streams bespoke (gemini) and don't go through {@link createStreamingExecution}. + * Cost on success is settled per-provider; this only handles the failure leg. + */ +export function recordHostedStreamFailure( + source: ReadableStream, + hostedKey: { provider: string; envVar: string }, + model: string +): ReadableStream { + return tapStreamTermination(source, { + onError: (error) => + hostedKeyMetrics.recordFailed({ + provider: hostedKey.provider, + tool: model, + key: hostedKey.envVar, + reason: classifyHostedKeyFailure(error), + }), + }) +} + /** * Settle the authoritative streaming LLM cost onto `output.cost` from its final * tokens (the single cost seam shared with the non-streaming path), and — on the @@ -253,18 +273,11 @@ export function createStreamingExecution( // returned stream and settle once the source completes (final tokens are set by // the provider's drain callback before the stream closes). Recomputes the // authoritative cost with the multiplier and emits the cost metric exactly once. 
+ // Failure on error is handled provider-agnostically in executeProviderRequest. const stream = hostedKey - ? settleHostedCostOnStreamDrain( - baseStream, - () => settleStreamingLlmCost(output, model, hostedKey, cached ?? false), - (error) => - hostedKeyMetrics.recordFailed({ - provider: hostedKey.provider, - tool: model, - key: hostedKey.envVar, - reason: classifyHostedKeyFailure(error), - }) - ) + ? tapStreamTermination(baseStream, { + onDrain: () => settleStreamingLlmCost(output, model, hostedKey, cached ?? false), + }) : baseStream return { From 4d27d1badcb8885c54cf6878a1930040a50ca50b Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Thu, 18 Jun 2026 10:06:41 -0700 Subject: [PATCH 7/7] fix(providers): strip client-supplied hostedKey so it can't skew hosted-key cost/metrics --- apps/sim/providers/index.test.ts | 16 ++++++++++++++++ apps/sim/providers/index.ts | 5 +++++ 2 files changed, 21 insertions(+) diff --git a/apps/sim/providers/index.test.ts b/apps/sim/providers/index.test.ts index 73b62a79abc..cf6fda7e1ec 100644 --- a/apps/sim/providers/index.test.ts +++ b/apps/sim/providers/index.test.ts @@ -85,6 +85,22 @@ describe('executeProviderRequest — BYOK regression', () => { expect(result.cost?.output).toBe(0) }) + it('strips a client-supplied hostedKey so it cannot skew streaming cost/metrics', async () => { + // User-provided key path: no platform pool key acquired (no hostedKeyEnvVar). + mockGetApiKeyWithBYOK.mockResolvedValue({ apiKey: 'sk-user', isBYOK: false }) + mockExecuteRequest.mockResolvedValue(makeAnthropicResponse()) + + await executeProviderRequest('anthropic', { + model: 'claude-opus-4-6', + workspaceId: 'ws-1', + // Untrusted, client-supplied — must not reach the provider/streaming settlement. 
+ hostedKey: { provider: 'anthropic', envVar: 'ANTHROPIC_API_KEY_1' }, + }) + + const passedRequest = mockExecuteRequest.mock.calls[0][0] as { hostedKey?: unknown } + expect(passedRequest.hostedKey).toBeUndefined() + }) + it('zeroes per-segment model cost for BYOK callers so trace aggregation does not re-charge', async () => { mockGetApiKeyWithBYOK.mockResolvedValue({ apiKey: 'sk-byok', isBYOK: true }) mockExecuteRequest.mockResolvedValue(makeAnthropicResponse()) diff --git a/apps/sim/providers/index.ts b/apps/sim/providers/index.ts index eb6bde1e6f7..d09eeecb0f2 100644 --- a/apps/sim/providers/index.ts +++ b/apps/sim/providers/index.ts @@ -36,6 +36,11 @@ function sanitizeRequest(request: ProviderRequest): ProviderRequest { const sanitizedRequest = { ...request } const model = sanitizedRequest.model + // `hostedKey` is server-only: strip any client-supplied value so it can never + // be trusted. executeProviderRequest sets it solely when it acquires a platform + // pool key, which gates streaming cost settlement / hosted-key metrics. + sanitizedRequest.hostedKey = undefined + if (model && !supportsTemperature(model)) { sanitizedRequest.temperature = undefined }