📄 File detail
services/api/claude.ts
🎯 Use case
This file lives under ‘services/’, which covers long-lived services (LSP, MCP, OAuth, tool execution, memory, compaction, voice, settings sync, …). On the API surface it exposes getExtraBodyParams, getPromptCachingEnabled, getCacheControl, configureTaskBudgetParams, and getAPIMetadata (and more) — mainly functions, hooks, or classes. Dependencies touch @anthropic-ai, crypto, src, and bun:bundle. It composes internal code from constants, Tool, tools, types, and utils (relative imports).
Generated from folder role, exports, dependency roots, and inline comments — not hand-reviewed for every path.
🧠 Inline summary
import type { BetaContentBlock, BetaContentBlockParam, BetaImageBlockParam, BetaJSONOutputFormat,
📤 Exports (heuristic)
getExtraBodyParams, getPromptCachingEnabled, getCacheControl, configureTaskBudgetParams, getAPIMetadata, verifyApiKey, userMessageToMessageParam, assistantMessageToMessageParam, Options, queryModelWithoutStreaming, stripExcessMediaItems, cleanupStream, updateUsage, accumulateUsage, addCacheBreakpoints, buildSystemPromptBlocks, queryHaiku, queryWithModel, MAX_NON_STREAMING_TOKENS, adjustParamsForNonStreaming, getMaxOutputTokensForModel
🔗 External import roots
Package roots from `from "…"` imports (relative paths omitted).
@anthropic-ai, crypto, src, bun:bundle
🖥️ Source preview
import type {
BetaContentBlock,
BetaContentBlockParam,
BetaImageBlockParam,
BetaJSONOutputFormat,
BetaMessage,
BetaMessageDeltaUsage,
BetaMessageStreamParams,
BetaOutputConfig,
BetaRawMessageStreamEvent,
BetaRequestDocumentBlock,
BetaStopReason,
BetaToolChoiceAuto,
BetaToolChoiceTool,
BetaToolResultBlockParam,
BetaToolUnion,
BetaUsage,
BetaMessageParam as MessageParam,
} from '@anthropic-ai/sdk/resources/beta/messages/messages.mjs'
import type { TextBlockParam } from '@anthropic-ai/sdk/resources/index.mjs'
import type { Stream } from '@anthropic-ai/sdk/streaming.mjs'
import { randomUUID } from 'crypto'
import {
getAPIProvider,
isFirstPartyAnthropicBaseUrl,
} from 'src/utils/model/providers.js'
import {
getAttributionHeader,
getCLISyspromptPrefix,
} from '../../constants/system.js'
import {
getEmptyToolPermissionContext,
type QueryChainTracking,
type Tool,
type ToolPermissionContext,
type Tools,
toolMatchesName,
} from '../../Tool.js'
import type { AgentDefinition } from '../../tools/AgentTool/loadAgentsDir.js'
import {
type ConnectorTextBlock,
type ConnectorTextDelta,
isConnectorTextBlock,
} from '../../types/connectorText.js'
import type {
AssistantMessage,
Message,
StreamEvent,
SystemAPIErrorMessage,
UserMessage,
} from '../../types/message.js'
import {
type CacheScope,
logAPIPrefix,
splitSysPromptPrefix,
toolToAPISchema,
} from '../../utils/api.js'
import { getOauthAccountInfo } from '../../utils/auth.js'
import {
getBedrockExtraBodyParamsBetas,
getMergedBetas,
getModelBetas,
} from '../../utils/betas.js'
import { getOrCreateUserID } from '../../utils/config.js'
import {
CAPPED_DEFAULT_MAX_TOKENS,
getModelMaxOutputTokens,
getSonnet1mExpTreatmentEnabled,
} from '../../utils/context.js'
import { resolveAppliedEffort } from '../../utils/effort.js'
import { isEnvTruthy } from '../../utils/envUtils.js'
import { errorMessage } from '../../utils/errors.js'
import { computeFingerprintFromMessages } from '../../utils/fingerprint.js'
import { captureAPIRequest, logError } from '../../utils/log.js'
import {
createAssistantAPIErrorMessage,
createUserMessage,
ensureToolResultPairing,
normalizeContentFromAPI,
normalizeMessagesForAPI,
stripAdvisorBlocks,
stripCallerFieldFromAssistantMessage,
stripToolReferenceBlocksFromUserMessage,
} from '../../utils/messages.js'
import {
getDefaultOpusModel,
getDefaultSonnetModel,
getSmallFastModel,
isNonCustomOpusModel,
} from '../../utils/model/model.js'
import {
asSystemPrompt,
type SystemPrompt,
} from '../../utils/systemPromptType.js'
import { tokenCountFromLastAPIResponse } from '../../utils/tokens.js'
import { getDynamicConfig_BLOCKS_ON_INIT } from '../analytics/growthbook.js'
import {
currentLimits,
extractQuotaStatusFromError,
extractQuotaStatusFromHeaders,
} from '../claudeAiLimits.js'
import { getAPIContextManagement } from '../compact/apiMicrocompact.js'
/* eslint-disable @typescript-eslint/no-require-imports */
// Load the auto-mode state module only when the TRANSCRIPT_CLASSIFIER feature
// flag (from 'bun:bundle') is on; otherwise this binding is null. `feature` is
// imported a few lines below. ES import declarations are hoisted, so it is
// already in scope here.
// NOTE(review): presumably `require` (rather than a static import) is used so
// the module can be left out of builds where the flag is off. Confirm against
// the bundler configuration.
const autoModeStateModule = feature('TRANSCRIPT_CLASSIFIER')
  ? (require('../../utils/permissions/autoModeState.js') as typeof import('../../utils/permissions/autoModeState.js'))
  : null
import { feature } from 'bun:bundle'
import type { ClientOptions } from '@anthropic-ai/sdk'
import {
APIConnectionTimeoutError,
APIError,
APIUserAbortError,
} from '@anthropic-ai/sdk/error'
import {
getAfkModeHeaderLatched,
getCacheEditingHeaderLatched,
getFastModeHeaderLatched,
getLastApiCompletionTimestamp,
getPromptCache1hAllowlist,
getPromptCache1hEligible,
getSessionId,
getThinkingClearLatched,
setAfkModeHeaderLatched,
setCacheEditingHeaderLatched,
setFastModeHeaderLatched,
setLastMainRequestId,
setPromptCache1hAllowlist,
setPromptCache1hEligible,
setThinkingClearLatched,
} from 'src/bootstrap/state.js'
import {
AFK_MODE_BETA_HEADER,
CONTEXT_1M_BETA_HEADER,
CONTEXT_MANAGEMENT_BETA_HEADER,
EFFORT_BETA_HEADER,
FAST_MODE_BETA_HEADER,
PROMPT_CACHING_SCOPE_BETA_HEADER,
REDACT_THINKING_BETA_HEADER,
STRUCTURED_OUTPUTS_BETA_HEADER,
TASK_BUDGETS_BETA_HEADER,
} from 'src/constants/betas.js'
import type { QuerySource } from 'src/constants/querySource.js'
import type { Notification } from 'src/context/notifications.js'
import { addToTotalSessionCost } from 'src/cost-tracker.js'
import { getFeatureValue_CACHED_MAY_BE_STALE } from 'src/services/analytics/growthbook.js'
import type { AgentId } from 'src/types/ids.js'
import {
ADVISOR_TOOL_INSTRUCTIONS,
getExperimentAdvisorModels,
isAdvisorEnabled,
isValidAdvisorModel,
modelSupportsAdvisor,
} from 'src/utils/advisor.js'
import { getAgentContext } from 'src/utils/agentContext.js'
import { isClaudeAISubscriber } from 'src/utils/auth.js'
import {
getToolSearchBetaHeader,
modelSupportsStructuredOutputs,
shouldIncludeFirstPartyOnlyBetas,
shouldUseGlobalCacheScope,
} from 'src/utils/betas.js'
import { CLAUDE_IN_CHROME_MCP_SERVER_NAME } from 'src/utils/claudeInChrome/common.js'
import { CHROME_TOOL_SEARCH_INSTRUCTIONS } from 'src/utils/claudeInChrome/prompt.js'
import { getMaxThinkingTokensForModel } from 'src/utils/context.js'
import { logForDebugging } from 'src/utils/debug.js'
import { logForDiagnosticsNoPII } from 'src/utils/diagLogs.js'
import { type EffortValue, modelSupportsEffort } from 'src/utils/effort.js'
import {
isFastModeAvailable,
isFastModeCooldown,
isFastModeEnabled,
isFastModeSupportedByModel,
} from 'src/utils/fastMode.js'
import { returnValue } from 'src/utils/generators.js'
import { headlessProfilerCheckpoint } from 'src/utils/headlessProfiler.js'
import { isMcpInstructionsDeltaEnabled } from 'src/utils/mcpInstructionsDelta.js'
import { calculateUSDCost } from 'src/utils/modelCost.js'
import { endQueryProfile, queryCheckpoint } from 'src/utils/queryProfiler.js'
import {
modelSupportsAdaptiveThinking,
modelSupportsThinking,
type ThinkingConfig,
} from 'src/utils/thinking.js'
import {
extractDiscoveredToolNames,
isDeferredToolsDeltaEnabled,
isToolSearchEnabled,
} from 'src/utils/toolSearch.js'
import { API_MAX_MEDIA_PER_REQUEST } from '../../constants/apiLimits.js'
import { ADVISOR_BETA_HEADER } from '../../constants/betas.js'
import {
formatDeferredToolLine,
isDeferredTool,
TOOL_SEARCH_TOOL_NAME,
} from '../../tools/ToolSearchTool/prompt.js'
import { count } from '../../utils/array.js'
import { insertBlockAfterToolResults } from '../../utils/contentArray.js'
import { validateBoundedIntEnvVar } from '../../utils/envValidation.js'
import { safeParseJSON } from '../../utils/json.js'
import { getInferenceProfileBackingModel } from '../../utils/model/bedrock.js'
import {
normalizeModelStringForAPI,
parseUserSpecifiedModel,
} from '../../utils/model/model.js'
import {
startSessionActivity,
stopSessionActivity,
} from '../../utils/sessionActivity.js'
import { jsonStringify } from '../../utils/slowOperations.js'
import {
isBetaTracingEnabled,
type LLMRequestNewContext,
startLLMRequestSpan,
} from '../../utils/telemetry/sessionTracing.js'
/* eslint-enable @typescript-eslint/no-require-imports */
import {
type AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
logEvent,
} from '../analytics/index.js'
import {
consumePendingCacheEdits,
getPinnedCacheEdits,
markToolsSentToAPIState,
pinCacheEdits,
} from '../compact/microCompact.js'
import { getInitializationStatus } from '../lsp/manager.js'
import { isToolFromMcpServer } from '../mcp/utils.js'
import { withStreamingVCR, withVCR } from '../vcr.js'
import { CLIENT_REQUEST_ID_HEADER, getAnthropicClient } from './client.js'
import {
API_ERROR_MESSAGE_PREFIX,
CUSTOM_OFF_SWITCH_MESSAGE,
getAssistantMessageFromError,
getErrorMessageIfRefusal,
} from './errors.js'
import {
EMPTY_USAGE,
type GlobalCacheStrategy,
logAPIError,
logAPIQuery,
logAPISuccessAndDuration,
type NonNullableUsage,
} from './logging.js'
import {
CACHE_TTL_1HOUR_MS,
checkResponseForCacheBreak,
recordPromptState,
} from './promptCacheBreakDetection.js'
import {
CannotRetryError,
FallbackTriggeredError,
is529Error,
type RetryContext,
withRetry,
} from './withRetry.js'
/** Any value that can be represented in JSON. */
type JsonValue = string | number | boolean | null | JsonObject | JsonArray
/** A JSON array; every element is itself a JSON value. */
type JsonArray = Array<JsonValue>
/** A JSON object: string keys mapping to JSON values. */
interface JsonObject {
  [key: string]: JsonValue
}
/**
 * Assemble the extra body parameters for the API request, based on the
 * CLAUDE_CODE_EXTRA_BODY environment variable if present and on any beta
 * headers (primarily for Bedrock requests).
 *
 * - CLAUDE_CODE_EXTRA_BODY must parse to a JSON object. Anything else, or a
 *   parse failure, is logged and ignored.
 * - When the ANTI_DISTILLATION_CC feature is on and the CLI / first-party /
 *   GrowthBook checks pass, `anti_distillation: ['fake_tools']` is set.
 * - `betaHeaders` are appended to any `anthropic_beta` array supplied via the
 *   env var, skipping headers already present. A non-array `anthropic_beta`
 *   from the env var is replaced.
 *
 * The `anthropic_beta` array in the result is always freshly allocated. It
 * never aliases the caller's `betaHeaders` or the cached parsed env value.
 *
 * @param betaHeaders - An array of beta headers to include in the request.
 * @returns A JSON object representing the extra body parameters.
 */
export function getExtraBodyParams(betaHeaders?: string[]): JsonObject {
  // Parse user's extra body parameters first
  const extraBodyStr = process.env.CLAUDE_CODE_EXTRA_BODY
  let result: JsonObject = {}
  if (extraBodyStr) {
    try {
      // Parse as JSON, which can be null, boolean, number, string, array or object
      const parsed = safeParseJSON(extraBodyStr)
      // We expect an object with key-value pairs to spread into API parameters
      if (parsed && typeof parsed === 'object' && !Array.isArray(parsed)) {
        // Shallow clone: safeParseJSON is LRU-cached and returns the same
        // object reference for the same string. Mutating `result` below
        // would poison the cache, causing stale values to persist.
        result = { ...(parsed as JsonObject) }
      } else {
        logForDebugging(
          `CLAUDE_CODE_EXTRA_BODY env var must be a JSON object, but was given ${extraBodyStr}`,
          { level: 'error' },
        )
      }
    } catch (error) {
      logForDebugging(
        `Error parsing CLAUDE_CODE_EXTRA_BODY: ${errorMessage(error)}`,
        { level: 'error' },
      )
    }
  }

  // Anti-distillation: send fake_tools opt-in for 1P CLI only
  if (
    feature('ANTI_DISTILLATION_CC')
      ? process.env.CLAUDE_CODE_ENTRYPOINT === 'cli' &&
        shouldIncludeFirstPartyOnlyBetas() &&
        getFeatureValue_CACHED_MAY_BE_STALE(
          'tengu_anti_distill_fake_tool_injection',
          false,
        )
      : false
  ) {
    result.anti_distillation = ['fake_tools']
  }

  // Merge beta headers into any user-supplied anthropic_beta array.
  if (betaHeaders && betaHeaders.length > 0) {
    const existingHeaders = Array.isArray(result.anthropic_beta)
      ? (result.anthropic_beta as string[])
      : []
    const newHeaders = betaHeaders.filter(
      header => !existingHeaders.includes(header),
    )
    // Always allocate a fresh array. Storing `betaHeaders` itself would let
    // later mutation of the caller's array or of the returned body leak into
    // the other.
    result.anthropic_beta = [...existingHeaders, ...newHeaders]
  }
  return result
}
/**
 * Whether prompt caching should be used for `model`.
 *
 * DISABLE_PROMPT_CACHING turns caching off for every model. The
 * DISABLE_PROMPT_CACHING_HAIKU / _SONNET / _OPUS variables turn it off only
 * when `model` equals the current small/fast, default Sonnet, or default
 * Opus model, respectively. Each model lookup runs only if its variable is set.
 */
export function getPromptCachingEnabled(model: string): boolean {
  const disabled =
    isEnvTruthy(process.env.DISABLE_PROMPT_CACHING) ||
    (isEnvTruthy(process.env.DISABLE_PROMPT_CACHING_HAIKU) &&
      model === getSmallFastModel()) ||
    (isEnvTruthy(process.env.DISABLE_PROMPT_CACHING_SONNET) &&
      model === getDefaultSonnetModel()) ||
    (isEnvTruthy(process.env.DISABLE_PROMPT_CACHING_OPUS) &&
      model === getDefaultOpusModel())
  return !disabled
}
/**
 * Build the `cache_control` marker attached to cacheable prompt blocks.
 *
 * The marker is always `type: 'ephemeral'`. It gains `ttl: '1h'` when
 * should1hCacheTTL() approves `querySource`. `scope` is copied only when it
 * is `'global'`.
 */
export function getCacheControl({
  scope,
  querySource,
}: {
  scope?: CacheScope
  querySource?: QuerySource
} = {}): {
  type: 'ephemeral'
  ttl?: '1h'
  scope?: CacheScope
} {
  const control: { type: 'ephemeral'; ttl?: '1h'; scope?: CacheScope } = {
    type: 'ephemeral',
  }
  if (should1hCacheTTL(querySource)) {
    control.ttl = '1h'
  }
  if (scope === 'global') {
    control.scope = scope
  }
  return control
}
/**
 * Decide whether prompt-cache markers should request the 1-hour TTL.
 *
 * Bedrock users who set ENABLE_PROMPT_CACHING_1H_BEDROCK always get it. They
 * manage their own billing and have no GrowthBook config, so no gating applies.
 *
 * Everyone else needs both of the following:
 * 1. Eligibility: an ant user, or a Claude.ai subscriber not currently using
 *    overage.
 * 2. A `querySource` matching the GrowthBook `tengu_prompt_cache_1h_config`
 *    allowlist (shape `{ allowlist: string[] }`). A trailing '*' makes a
 *    pattern a prefix match, e.g.
 *    - ["repl_main_thread*", "sdk"] → main thread + SDK only
 *    - ["repl_main_thread*", "sdk", "agent:*"] → also subagents
 *    - ["*"] → all sources
 *
 * Eligibility and the allowlist are each latched into bootstrap state on first
 * use. A mid-session flip would change the TTL and bust the server-side prompt
 * cache (~20K tokens per flip). It would also mix TTLs when GrowthBook's disk
 * cache updates mid-request.
 */
function should1hCacheTTL(querySource?: QuerySource): boolean {
  const bedrockOptIn =
    getAPIProvider() === 'bedrock' &&
    isEnvTruthy(process.env.ENABLE_PROMPT_CACHING_1H_BEDROCK)
  if (bedrockOptIn) {
    return true
  }

  let eligible = getPromptCache1hEligible()
  if (eligible === null) {
    eligible =
      process.env.USER_TYPE === 'ant' ||
      (isClaudeAISubscriber() && !currentLimits.isUsingOverage)
    setPromptCache1hEligible(eligible)
  }
  if (!eligible) {
    return false
  }

  let patterns = getPromptCache1hAllowlist()
  if (patterns === null) {
    patterns =
      getFeatureValue_CACHED_MAY_BE_STALE<{
        allowlist?: string[]
      }>('tengu_prompt_cache_1h_config', {}).allowlist ?? []
    setPromptCache1hAllowlist(patterns)
  }

  if (querySource === undefined) {
    return false
  }
  const source = querySource
  return patterns.some(pattern => {
    if (!pattern.endsWith('*')) {
      return source === pattern
    }
    return source.startsWith(pattern.slice(0, -1))
  })
}
/**
 * Apply the requested effort to an outgoing request by mutating its parts in
 * place.
 *
 * Does nothing when `model` does not support effort, or when `outputConfig`
 * already carries an `effort` key. Otherwise:
 * - `undefined`: only the effort beta header is added.
 * - a string level: written to `outputConfig.effort`, and the beta header is added.
 * - a numeric value: for ant users only, merged into
 *   `extraBodyParams.anthropic_internal.effort_override`. Ignored for others.
 *
 * The beta header is added at most once, matching configureTaskBudgetParams.
 *
 * @param effortValue - Requested effort (string level or numeric override).
 * @param outputConfig - Request `output_config`; may receive `effort`.
 * @param extraBodyParams - Extra body params; may receive `anthropic_internal`.
 * @param betas - Beta header list; may receive EFFORT_BETA_HEADER.
 * @param model - Model the request targets.
 */
function configureEffortParams(
  effortValue: EffortValue | undefined,
  outputConfig: BetaOutputConfig,
  extraBodyParams: Record<string, unknown>,
  betas: string[],
  model: string,
): void {
  if (!modelSupportsEffort(model) || 'effort' in outputConfig) {
    return
  }
  // Avoid sending the same beta header twice when the caller already added it.
  const addEffortBeta = (): void => {
    if (!betas.includes(EFFORT_BETA_HEADER)) {
      betas.push(EFFORT_BETA_HEADER)
    }
  }
  if (effortValue === undefined) {
    addEffortBeta()
  } else if (typeof effortValue === 'string') {
    // Send string effort level as is
    outputConfig.effort = effortValue
    addEffortBeta()
  } else if (process.env.USER_TYPE === 'ant') {
    // Numeric effort override - ant-only (uses anthropic_internal)
    const existingInternal =
      (extraBodyParams.anthropic_internal as Record<string, unknown>) || {}
    extraBodyParams.anthropic_internal = {
      ...existingInternal,
      effort_override: effortValue,
    }
  }
}
/**
 * Wire shape of `output_config.task_budget`: the API-side token budget the
 * model is made aware of.
 *
 * The Stainless SDK's BetaOutputConfig does not yet include `task_budget`, so
 * the shape is declared locally and intersected onto BetaOutputConfig where
 * needed. The API validates it on receipt; see
 * api/api/schemas/messages/request/output_config.py:12-39 in the monorepo.
 * Beta: task-budgets-2026-03-13 (EAP, claude-strudel-eap only as of Mar 2026).
 */
interface TaskBudgetParam {
  type: 'tokens'
  total: number
  remaining?: number
}
/**
 * Attach `output_config.task_budget` and its beta header to a request.
 *
 * This is a no-op in three cases: no budget is given, `outputConfig` already
 * has a `task_budget` key, or first-party-only betas are not allowed.
 * `remaining` is copied only when the caller provided it.
 * TASK_BUDGETS_BETA_HEADER is added to `betas` at most once.
 */
export function configureTaskBudgetParams(
  taskBudget: Options['taskBudget'],
  outputConfig: BetaOutputConfig & { task_budget?: TaskBudgetParam },
  betas: string[],
): void {
  if (!taskBudget) return
  if ('task_budget' in outputConfig) return
  if (!shouldIncludeFirstPartyOnlyBetas()) return

  const budget: TaskBudgetParam = { type: 'tokens', total: taskBudget.total }
  if (taskBudget.remaining !== undefined) {
    budget.remaining = taskBudget.remaining
  }
  outputConfig.task_budget = budget

  if (betas.includes(TASK_BUDGETS_BETA_HEADER)) return
  betas.push(TASK_BUDGETS_BETA_HEADER)
}
/**
 * Build the `metadata` object sent with API requests.
 *
 * `user_id` is a JSON string built from three sources:
 * - any extra fields from CLAUDE_CODE_EXTRA_METADATA (must be a JSON object;
 *   otherwise it is logged and ignored);
 * - `device_id`;
 * - `account_uuid` (the OAuth account UUID, or '' when none is available);
 * - `session_id`.
 * The built-in fields are written after the extras, so they win on key clashes.
 */
export function getAPIMetadata() {
  // https://docs.google.com/document/d/1dURO9ycXXQCBS0V4Vhl4poDBRgkelFc5t2BNPoEgH5Q/edit?tab=t.0#heading=h.5g7nec5b09w5
  const extraStr = process.env.CLAUDE_CODE_EXTRA_METADATA
  let extra: JsonObject = {}
  if (extraStr) {
    const parsed = safeParseJSON(extraStr, false)
    const isPlainObject =
      parsed !== null && typeof parsed === 'object' && !Array.isArray(parsed)
    if (isPlainObject) {
      extra = parsed as JsonObject
    } else {
      logForDebugging(
        `CLAUDE_CODE_EXTRA_METADATA env var must be a JSON object, but was given ${extraStr}`,
        { level: 'error' },
      )
    }
  }

  const identity = {
    ...extra,
    device_id: getOrCreateUserID(),
    account_uuid: getOauthAccountInfo()?.accountUuid ?? '',
    session_id: getSessionId(),
  }
  return { user_id: jsonStringify(identity) }
}
/**
 * Check that `apiKey` is accepted by sending a one-token request to the
 * small/fast model.
 *
 * Non-interactive (print-mode) sessions skip the check and resolve `true`.
 * Resolves `false` only when the failure message contains the API's
 * `invalid x-api-key` authentication error body. Every other failure is
 * logged and rethrown, after unwrapping CannotRetryError.
 */
export async function verifyApiKey(
  apiKey: string,
  isNonInteractiveSession: boolean,
): Promise<boolean> {
  if (isNonInteractiveSession) {
    return true
  }

  try {
    // WARNING: if you change this to use a non-Haiku model, this request will fail in 1P unless it uses getCLISyspromptPrefix.
    const model = getSmallFastModel()
    const betas = getModelBetas(model)
    const verification = withRetry(
      () =>
        getAnthropicClient({
          apiKey,
          maxRetries: 3,
          model,
          source: 'verify_api_key',
        }),
      async anthropic => {
        const messages: MessageParam[] = [{ role: 'user', content: 'test' }]
        // biome-ignore lint/plugin: API key verification is intentionally a minimal direct call
        await anthropic.beta.messages.create({
          model,
          max_tokens: 1,
          messages,
          temperature: 1,
          ...(betas.length > 0 && { betas }),
          metadata: getAPIMetadata(),
          ...getExtraBodyParams(),
        })
        return true
      },
      // Key verification uses fewer retries than a regular query.
      { maxRetries: 2, model, thinkingConfig: { type: 'disabled' } },
    )
    return await returnValue(verification)
  } catch (caught) {
    const error =
      caught instanceof CannotRetryError ? caught.originalError : caught
    logError(error)
    const invalidKeyBody =
      '{"type":"error","error":{"type":"authentication_error","message":"invalid x-api-key"}}'
    if (error instanceof Error && error.message.includes(invalidKeyBody)) {
      return false
    }
    throw error
  }
}
/**
 * Convert a user message into an API `MessageParam`.
 *
 * With `addCache`:
 * - string content becomes a single text block;
 * - for array content, only the final block is considered;
 * - that block receives `cache_control` when `enablePromptCaching` is true.
 *
 * Without `addCache`, array content is shallow-copied into a new array.
 * In-place edits such as insertCacheEditsBlock's splice therefore cannot
 * contaminate the stored message. Without the copy, repeated
 * addCacheBreakpoints calls would each splice in duplicate cache_edits.
 */
export function userMessageToMessageParam(
  message: UserMessage,
  addCache = false,
  enablePromptCaching: boolean,
  querySource?: QuerySource,
): MessageParam {
  const { content } = message.message
  if (!addCache) {
    return {
      role: 'user',
      content: Array.isArray(content) ? [...content] : content,
    }
  }
  if (typeof content === 'string') {
    return {
      role: 'user',
      content: [
        {
          type: 'text',
          text: content,
          ...(enablePromptCaching && {
            cache_control: getCacheControl({ querySource }),
          }),
        },
      ],
    }
  }
  const lastIndex = content.length - 1
  return {
    role: 'user',
    content: content.map((block, index) => ({
      ...block,
      ...(index === lastIndex && enablePromptCaching
        ? { cache_control: getCacheControl({ querySource }) }
        : {}),
    })),
  }
}
/**
 * Convert an assistant message into an API `MessageParam`.
 *
 * With `addCache`, string content becomes a single text block. For array
 * content, only the final block is considered. It receives `cache_control`
 * (when `enablePromptCaching` is true) unless it is one of:
 * - a thinking or redacted_thinking block;
 * - a connector-text block, when CONNECTOR_TEXT is on.
 *
 * Without `addCache`, the original content is passed through by reference.
 */
export function assistantMessageToMessageParam(
  message: AssistantMessage,
  addCache = false,
  enablePromptCaching: boolean,
  querySource?: QuerySource,
): MessageParam {
  const { content } = message.message
  if (!addCache) {
    return { role: 'assistant', content }
  }
  if (typeof content === 'string') {
    return {
      role: 'assistant',
      content: [
        {
          type: 'text',
          text: content,
          ...(enablePromptCaching && {
            cache_control: getCacheControl({ querySource }),
          }),
        },
      ],
    }
  }
  const lastIndex = content.length - 1
  return {
    role: 'assistant',
    content: content.map((block, index) => {
      const takesCacheControl =
        index === lastIndex &&
        block.type !== 'thinking' &&
        block.type !== 'redacted_thinking' &&
        (feature('CONNECTOR_TEXT') ? !isConnectorTextBlock(block) : true)
      return {
        ...block,
        ...(takesCacheControl && enablePromptCaching
          ? { cache_control: getCacheControl({ querySource }) }
          : {}),
      }
    }),
  }
}
/**
 * Options accepted by the model query entry points
 * (queryModelWithoutStreaming / queryModelWithStreaming).
 */
export type Options = {
  // --- Model selection ---
  model: string
  fallbackModel?: string
  advisorModel?: string
  fastMode?: boolean

  // --- Request shaping ---
  toolChoice?: BetaToolChoiceTool | BetaToolChoiceAuto | undefined
  maxOutputTokensOverride?: number
  temperatureOverride?: number
  effortValue?: EffortValue
  outputFormat?: BetaJSONOutputFormat
  // API-side task budget (output_config.task_budget). Distinct from the
  // tokenBudget.ts +500k auto-continue feature: this one is sent to the API
  // so the model can pace itself. `remaining` is computed by the caller
  // (query.ts decrements across the agentic loop).
  taskBudget?: { total: number; remaining?: number }

  // --- Tools, permissions, and agents ---
  getToolPermissionContext: () => Promise<ToolPermissionContext>
  extraToolSchemas?: BetaToolUnion[]
  mcpTools: Tools
  // While true, ToolSearch stays available even with no deferred tools, so
  // tools from still-connecting MCP servers can be discovered later.
  hasPendingMcpServers?: boolean
  agents: AgentDefinition[]
  allowedAgentTypes?: string[]

  // --- Prompt caching ---
  enablePromptCaching?: boolean
  skipCacheWrite?: boolean

  // --- Query context ---
  querySource: QuerySource
  isNonInteractiveSession: boolean
  hasAppendSystemPrompt: boolean
  queryTracking?: QueryChainTracking
  agentId?: AgentId // Only set for subagents

  // --- Transport and callbacks ---
  fetchOverride?: ClientOptions['fetch']
  onStreamingFallback?: () => void
  addNotification?: (notif: Notification) => void
}
/**
 * Run one model query and resolve with its final assistant message.
 *
 * The whole stream from queryModel (wrapped by withStreamingVCR) is drained
 * even after an assistant message appears, because logAPISuccessAndDuration
 * is only reached after the last yield. If several assistant messages are
 * yielded, the last one wins.
 *
 * @throws APIUserAbortError when no assistant message arrived and `signal` was aborted.
 * @throws Error('No assistant message found') when no assistant message arrived otherwise.
 */
export async function queryModelWithoutStreaming({
  messages,
  systemPrompt,
  thinkingConfig,
  tools,
  signal,
  options,
}: {
  messages: Message[]
  systemPrompt: SystemPrompt
  thinkingConfig: ThinkingConfig
  tools: Tools
  signal: AbortSignal
  options: Options
}): Promise<AssistantMessage> {
  const stream = withStreamingVCR(messages, async function* () {
    yield* queryModel(
      messages,
      systemPrompt,
      thinkingConfig,
      tools,
      signal,
      options,
    )
  })

  let lastAssistant: AssistantMessage | undefined
  for await (const event of stream) {
    if (event.type === 'assistant') {
      lastAssistant = event
    }
  }

  if (lastAssistant) {
    return lastAssistant
  }
  // Surface cancellation as an abort error so callers can handle it gracefully.
  if (signal.aborted) {
    throw new APIUserAbortError()
  }
  throw new Error('No assistant message found')
}
/**
 * Streaming counterpart of queryModelWithoutStreaming. It re-yields every
 * event produced by queryModel, routed through withStreamingVCR.
 */
export async function* queryModelWithStreaming({
  messages,
  systemPrompt,
  thinkingConfig,
  tools,
  signal,
  options,
}: {
  messages: Message[]
  systemPrompt: SystemPrompt
  thinkingConfig: ThinkingConfig
  tools: Tools
  signal: AbortSignal
  options: Options
}): AsyncGenerator<
  StreamEvent | AssistantMessage | SystemAPIErrorMessage,
  void
> {
  const source = withStreamingVCR(messages, async function* () {
    yield* queryModel(
      messages,
      systemPrompt,
      thinkingConfig,
      tools,
      signal,
      options,
    )
  })
  return yield* source
}
/**
 * True when `tool` is an LSP tool (truthy `isLsp`) and LSP initialization is
 * still `'pending'` or `'not-started'`. Such a tool is sent with
 * `defer_loading: true` until LSP is ready.
 */
function shouldDeferLspTool(tool: Tool): boolean {
  const isLspTool = 'isLsp' in tool && Boolean(tool.isLsp)
  if (!isLspTool) {
    return false
  }
  const { status } = getInitializationStatus()
  return status === 'pending' || status === 'not-started'
}
/**
 * Per-attempt timeout for non-streaming fallback requests, in milliseconds.
 * Reads API_TIMEOUT_MS when set, so slow backends and the streaming path
 * share the same ceiling.
 *
 * Remote sessions (CLAUDE_CODE_REMOTE) default to 120s to stay under CCR's
 * container idle-kill (~5min). A hung fallback to a wedged backend then
 * surfaces a clean APIConnectionTimeoutError instead of stalling past SIGKILL.
 *
 * Otherwise the default is 300s: long enough for slow backends without
 * approaching the API's 10-minute non-streaming boundary.
 *
 * API_TIMEOUT_MS is read with parseInt and honored only when positive.
 * Empty, non-numeric, zero, or negative values fall back to the defaults,
 * since a negative timeout is never a usable request deadline.
 */
function getNonstreamingFallbackTimeoutMs(): number {
  const override = parseInt(process.env.API_TIMEOUT_MS || '', 10)
  // NaN fails `> 0`, so this one check also rejects unparseable input.
  if (override > 0) return override
  return isEnvTruthy(process.env.CLAUDE_CODE_REMOTE) ? 120_000 : 300_000
}
/**
 * Helper generator for non-streaming API requests.
 * Encapsulates the common pattern of creating a withRetry generator,
 * iterating to yield system messages, and returning the final BetaMessage.
 *
 * For each attempt, the operation handed to withRetry:
 * 1. builds request params with `paramsFromContext(context)`;
 * 2. reports them via `captureRequest` and `onAttempt` (with the
 *    pre-adjustment `max_tokens`);
 * 3. passes them through `adjustParamsForNonStreaming` with
 *    `MAX_NON_STREAMING_TOKENS`;
 * 4. calls `beta.messages.create` with the caller's abort signal and a
 *    per-attempt timeout from getNonstreamingFallbackTimeoutMs().
 * Errors other than APIUserAbortError are logged
 * (`tengu_nonstreaming_fallback_error`) and then rethrown to withRetry.
 *
 * @param clientOptions - Model, optional fetch override, and source tag used
 *   to build the client (always with SDK-level `maxRetries: 0`).
 * @param retryOptions - Forwarded to withRetry. `fastMode` is forwarded only
 *   when isFastModeEnabled() returns true.
 * @param paramsFromContext - Builds the request params for a retry context.
 * @param onAttempt - Called before each request with the attempt number, the
 *   start timestamp (Date.now()), and the unadjusted `max_tokens`.
 * @param captureRequest - Called with the unadjusted params of each attempt.
 * @param originatingRequestId - See the inline doc on the parameter.
 * @returns Yields the `'system'` messages emitted by withRetry, then returns
 *   the final BetaMessage.
 */
export async function* executeNonStreamingRequest(
  clientOptions: {
    model: string
    fetchOverride?: Options['fetchOverride']
    source: string
  },
  retryOptions: {
    model: string
    fallbackModel?: string
    thinkingConfig: ThinkingConfig
    fastMode?: boolean
    signal: AbortSignal
    initialConsecutive529Errors?: number
    querySource?: QuerySource
  },
  paramsFromContext: (context: RetryContext) => BetaMessageStreamParams,
  onAttempt: (attempt: number, start: number, maxOutputTokens: number) => void,
  captureRequest: (params: BetaMessageStreamParams) => void,
  /**
   * Request ID of the failed streaming attempt this fallback is recovering
   * from. Emitted in tengu_nonstreaming_fallback_error for funnel correlation.
   */
  originatingRequestId?: string | null,
): AsyncGenerator<SystemAPIErrorMessage, BetaMessage> {
  // Computed once, so every attempt shares the same per-request timeout.
  const fallbackTimeoutMs = getNonstreamingFallbackTimeoutMs()
  const generator = withRetry(
    // Retries are driven by withRetry; the SDK client is built with maxRetries: 0.
    () =>
      getAnthropicClient({
        maxRetries: 0,
        model: clientOptions.model,
        fetchOverride: clientOptions.fetchOverride,
        source: clientOptions.source,
      }),
    async (anthropic, attempt, context) => {
      const start = Date.now()
      const retryParams = paramsFromContext(context)
      // Callers see the params before the non-streaming adjustment below.
      captureRequest(retryParams)
      onAttempt(attempt, start, retryParams.max_tokens)
      const adjustedParams = adjustParamsForNonStreaming(
        retryParams,
        MAX_NON_STREAMING_TOKENS,
      )
      try {
        // biome-ignore lint/plugin: non-streaming API call
        return await anthropic.beta.messages.create(
          {
            ...adjustedParams,
            model: normalizeModelStringForAPI(adjustedParams.model),
          },
          {
            signal: retryOptions.signal,
            timeout: fallbackTimeoutMs,
          },
        )
      } catch (err) {
        // User aborts are not errors, so re-throw them immediately without logging.
        if (err instanceof APIUserAbortError) throw err
        // Instrumentation: record when the non-streaming request errors (including
        // timeouts). Lets us distinguish "fallback hung past container kill"
        // (no event) from "fallback hit the bounded timeout" (this event).
        logForDiagnosticsNoPII('error', 'cli_nonstreaming_fallback_error')
        logEvent('tengu_nonstreaming_fallback_error', {
          model:
            clientOptions.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
          error:
            err instanceof Error
              ? (err.name as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
              : ('unknown' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS),
          attempt,
          timeout_ms: fallbackTimeoutMs,
          request_id: (originatingRequestId ??
            'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
        })
        throw err
      }
    },
    {
      model: retryOptions.model,
      fallbackModel: retryOptions.fallbackModel,
      thinkingConfig: retryOptions.thinkingConfig,
      // Only pass fastMode through when the fast-mode feature is enabled.
      ...(isFastModeEnabled() && { fastMode: retryOptions.fastMode }),
      signal: retryOptions.signal,
      initialConsecutive529Errors: retryOptions.initialConsecutive529Errors,
      querySource: retryOptions.querySource,
    },
  )
  // Drain the retry generator by hand. Forward its 'system' messages to our
  // caller and keep its final return value.
  // NOTE(review): the cast assumes that return value is the BetaMessage
  // resolved by the operation above.
  let e
  do {
    e = await generator.next()
    if (!e.done && e.value.type === 'system') {
      yield e.value
    }
  } while (!e.done)
  return e.value as BetaMessage
}
/**
 * Return the `requestId` of the newest assistant message in `messages` that
 * has a non-empty one, or `undefined` if none does.
 *
 * Analytics uses this to link consecutive API requests (cache-hit-rate joins,
 * incremental token tracking). Deriving it from the message array rather than
 * global state keeps each query chain (main thread, subagent, teammate)
 * independent. Rollback/undo is also reflected automatically, because removed
 * messages are simply absent.
 */
function getPreviousRequestIdFromMessages(
  messages: Message[],
): string | undefined {
  let index = messages.length
  while (index-- > 0) {
    const candidate = messages[index]!
    if (candidate.type === 'assistant' && candidate.requestId) {
      return candidate.requestId
    }
  }
  return undefined
}
/**
 * Type guard for the block kinds counted as media by stripExcessMediaItems:
 * images and documents.
 */
function isMedia(
  block: BetaContentBlockParam,
): block is BetaImageBlockParam | BetaRequestDocumentBlock {
  switch (block.type) {
    case 'image':
    case 'document':
      return true
    default:
      return false
  }
}
/** Type guard for tool_result blocks, whose `content` may itself hold media. */
function isToolResult(
  block: BetaContentBlockParam,
): block is BetaToolResultBlockParam {
  const { type } = block
  return type === 'tool_result'
}
/**
 * Ensures messages contain at most `limit` media items (images + documents).
 * Both top-level blocks and media nested inside tool_result content count.
 *
 * Media are removed oldest-first. Messages are walked in order, and within a
 * message blocks are walked in content order, so an earlier top-level image
 * is dropped before a later tool_result's nested images.
 *
 * The input array is never mutated. Messages and tool_result blocks that lose
 * nothing are returned by reference; changed ones are shallow-copied.
 *
 * @returns `messages` itself when nothing needs stripping, otherwise a new array.
 */
export function stripExcessMediaItems(
  messages: (UserMessage | AssistantMessage)[],
  limit: number,
): (UserMessage | AssistantMessage)[] {
  // Pass 1: count every media item, including media nested in tool_results.
  let mediaCount = 0
  for (const msg of messages) {
    if (!Array.isArray(msg.message.content)) continue
    for (const block of msg.message.content) {
      if (isMedia(block)) mediaCount++
      if (isToolResult(block) && Array.isArray(block.content)) {
        for (const nested of block.content) {
          if (isMedia(nested)) mediaCount++
        }
      }
    }
  }

  let toRemove = mediaCount - limit
  // `!(x > 0)` rather than `x <= 0`, so a NaN limit also leaves input untouched.
  if (!(toRemove > 0)) return messages

  // Pass 2: one in-order walk over messages and their blocks, dropping media
  // until the excess is gone. Handling top-level and nested media in the same
  // walk keeps "oldest first" true inside a single message too.
  return messages.map(msg => {
    const content = msg.message.content
    if (toRemove <= 0 || !Array.isArray(content)) return msg
    let changed = false
    const kept: unknown[] = []
    for (const block of content) {
      if (toRemove > 0 && isMedia(block)) {
        toRemove--
        changed = true
        continue
      }
      if (toRemove > 0 && isToolResult(block) && Array.isArray(block.content)) {
        const filtered = block.content.filter(nested => {
          if (toRemove > 0 && isMedia(nested)) {
            toRemove--
            return false
          }
          return true
        })
        if (filtered.length !== block.content.length) {
          changed = true
          kept.push({ ...block, content: filtered })
          continue
        }
      }
      kept.push(block)
    }
    return changed
      ? { ...msg, message: { ...msg.message, content: kept } }
      : msg
  }) as (UserMessage | AssistantMessage)[]
}
async function* queryModel(
messages: Message[],
systemPrompt: SystemPrompt,
thinkingConfig: ThinkingConfig,
tools: Tools,
signal: AbortSignal,
options: Options,
): AsyncGenerator<
StreamEvent | AssistantMessage | SystemAPIErrorMessage,
void
> {
// Check cheap conditions first β the off-switch await blocks on GrowthBook
// init (~10ms). For non-Opus models (haiku, sonnet) this skips the await
// entirely. Subscribers don't hit this path at all.
if (
!isClaudeAISubscriber() &&
isNonCustomOpusModel(options.model) &&
(
await getDynamicConfig_BLOCKS_ON_INIT<{ activated: boolean }>(
'tengu-off-switch',
{
activated: false,
},
)
).activated
) {
logEvent('tengu_off_switch_query', {})
yield getAssistantMessageFromError(
new Error(CUSTOM_OFF_SWITCH_MESSAGE),
options.model,
)
return
}
// Derive previous request ID from the last assistant message in this query chain.
// This is scoped per message array (main thread, subagent, teammate each have their own),
// so concurrent agents don't clobber each other's request chain tracking.
// Also naturally handles rollback/undo since removed messages won't be in the array.
const previousRequestId = getPreviousRequestIdFromMessages(messages)
const resolvedModel =
getAPIProvider() === 'bedrock' &&
options.model.includes('application-inference-profile')
? ((await getInferenceProfileBackingModel(options.model)) ??
options.model)
: options.model
queryCheckpoint('query_tool_schema_build_start')
const isAgenticQuery =
options.querySource.startsWith('repl_main_thread') ||
options.querySource.startsWith('agent:') ||
options.querySource === 'sdk' ||
options.querySource === 'hook_agent' ||
options.querySource === 'verification_agent'
const betas = getMergedBetas(options.model, { isAgenticQuery })
// Always send the advisor beta header when advisor is enabled, so
// non-agentic queries (compact, side_question, extract_memories, etc.)
// can parse advisor server_tool_use blocks already in the conversation history.
if (isAdvisorEnabled()) {
betas.push(ADVISOR_BETA_HEADER)
}
let advisorModel: string | undefined
if (isAgenticQuery && isAdvisorEnabled()) {
let advisorOption = options.advisorModel
const advisorExperiment = getExperimentAdvisorModels()
if (advisorExperiment !== undefined) {
if (
normalizeModelStringForAPI(advisorExperiment.baseModel) ===
normalizeModelStringForAPI(options.model)
) {
// Override the advisor model if the base model matches. We
// should only have experiment models if the user cannot
// configure it themselves.
advisorOption = advisorExperiment.advisorModel
}
}
if (advisorOption) {
const normalizedAdvisorModel = normalizeModelStringForAPI(
parseUserSpecifiedModel(advisorOption),
)
if (!modelSupportsAdvisor(options.model)) {
logForDebugging(
`[AdvisorTool] Skipping advisor - base model ${options.model} does not support advisor`,
)
} else if (!isValidAdvisorModel(normalizedAdvisorModel)) {
logForDebugging(
`[AdvisorTool] Skipping advisor - ${normalizedAdvisorModel} is not a valid advisor model`,
)
} else {
advisorModel = normalizedAdvisorModel
logForDebugging(
`[AdvisorTool] Server-side tool enabled with ${advisorModel} as the advisor model`,
)
}
}
}
// Check if tool search is enabled (checks mode, model support, and threshold for auto mode)
// This is async because it may need to calculate MCP tool description sizes for TstAuto mode
let useToolSearch = await isToolSearchEnabled(
options.model,
tools,
options.getToolPermissionContext,
options.agents,
'query',
)
// Precompute once β isDeferredTool does 2 GrowthBook lookups per call
const deferredToolNames = new Set<string>()
if (useToolSearch) {
for (const t of tools) {
if (isDeferredTool(t)) deferredToolNames.add(t.name)
}
}
// Even if tool search mode is enabled, skip if there are no deferred tools
// AND no MCP servers are still connecting. When servers are pending, keep
// ToolSearch available so the model can discover tools after they connect.
if (
useToolSearch &&
deferredToolNames.size === 0 &&
!options.hasPendingMcpServers
) {
logForDebugging(
'Tool search disabled: no deferred tools available to search',
)
useToolSearch = false
}
// Filter out ToolSearchTool if tool search is not enabled for this model
// ToolSearchTool returns tool_reference blocks which unsupported models can't handle
let filteredTools: Tools
if (useToolSearch) {
// Dynamic tool loading: Only include deferred tools that have been discovered
// via tool_reference blocks in the message history. This eliminates the need
// to predeclare all deferred tools upfront and removes limits on tool quantity.
const discoveredToolNames = extractDiscoveredToolNames(messages)
filteredTools = tools.filter(tool => {
// Always include non-deferred tools
if (!deferredToolNames.has(tool.name)) return true
// Always include ToolSearchTool (so it can discover more tools)
if (toolMatchesName(tool, TOOL_SEARCH_TOOL_NAME)) return true
// Only include deferred tools that have been discovered
return discoveredToolNames.has(tool.name)
})
} else {
filteredTools = tools.filter(
t => !toolMatchesName(t, TOOL_SEARCH_TOOL_NAME),
)
}
// Add tool search beta header if enabled - required for defer_loading to be accepted
// Header differs by provider: 1P/Foundry use advanced-tool-use, Vertex/Bedrock use tool-search-tool
// For Bedrock, this header must go in extraBodyParams, not the betas array
const toolSearchHeader = useToolSearch ? getToolSearchBetaHeader() : null
if (toolSearchHeader && getAPIProvider() !== 'bedrock') {
if (!betas.includes(toolSearchHeader)) {
betas.push(toolSearchHeader)
}
}
// Determine if cached microcompact is enabled for this model.
// Computed once here (in async context) and captured by paramsFromContext.
// The beta header is also captured here to avoid a top-level import of the
// ant-only CACHE_EDITING_BETA_HEADER constant.
let cachedMCEnabled = false
let cacheEditingBetaHeader = ''
if (feature('CACHED_MICROCOMPACT')) {
const {
isCachedMicrocompactEnabled,
isModelSupportedForCacheEditing,
getCachedMCConfig,
} = await import('../compact/cachedMicrocompact.js')
const betas = await import('src/constants/betas.js')
cacheEditingBetaHeader = betas.CACHE_EDITING_BETA_HEADER
const featureEnabled = isCachedMicrocompactEnabled()
const modelSupported = isModelSupportedForCacheEditing(options.model)
cachedMCEnabled = featureEnabled && modelSupported
const config = getCachedMCConfig()
logForDebugging(
`Cached MC gate: enabled=${featureEnabled} modelSupported=${modelSupported} model=${options.model} supportedModels=${jsonStringify(config.supportedModels)}`,
)
}
const useGlobalCacheFeature = shouldUseGlobalCacheScope()
const willDefer = (t: Tool) =>
useToolSearch && (deferredToolNames.has(t.name) || shouldDeferLspTool(t))
// MCP tools are per-user β dynamic tool section β can't globally cache.
// Only gate when an MCP tool will actually render (not defer_loading).
const needsToolBasedCacheMarker =
useGlobalCacheFeature &&
filteredTools.some(t => t.isMcp === true && !willDefer(t))
// Ensure prompt_caching_scope beta header is present when global cache is enabled.
if (
useGlobalCacheFeature &&
!betas.includes(PROMPT_CACHING_SCOPE_BETA_HEADER)
) {
betas.push(PROMPT_CACHING_SCOPE_BETA_HEADER)
}
// Determine global cache strategy for logging
const globalCacheStrategy: GlobalCacheStrategy = useGlobalCacheFeature
? needsToolBasedCacheMarker
? 'none'
: 'system_prompt'
: 'none'
// Build tool schemas, adding defer_loading for MCP tools when tool search is enabled
// Note: We pass the full `tools` list (not filteredTools) to toolToAPISchema so that
// ToolSearchTool's prompt can list ALL available MCP tools. The filtering only affects
// which tools are actually sent to the API, not what the model sees in tool descriptions.
const toolSchemas = await Promise.all(
filteredTools.map(tool =>
toolToAPISchema(tool, {
getToolPermissionContext: options.getToolPermissionContext,
tools,
agents: options.agents,
allowedAgentTypes: options.allowedAgentTypes,
model: options.model,
deferLoading: willDefer(tool),
}),
),
)
if (useToolSearch) {
const includedDeferredTools = count(filteredTools, t =>
deferredToolNames.has(t.name),
)
logForDebugging(
`Dynamic tool loading: ${includedDeferredTools}/${deferredToolNames.size} deferred tools included`,
)
}
queryCheckpoint('query_tool_schema_build_end')
// Normalize messages before building system prompt (needed for fingerprinting)
// Instrumentation: Track message count before normalization
logEvent('tengu_api_before_normalize', {
preNormalizedMessageCount: messages.length,
})
queryCheckpoint('query_message_normalization_start')
let messagesForAPI = normalizeMessagesForAPI(messages, filteredTools)
queryCheckpoint('query_message_normalization_end')
// Model-specific post-processing: strip tool-search-specific fields if the
// selected model doesn't support tool search.
//
// Why is this needed in addition to normalizeMessagesForAPI?
// - normalizeMessagesForAPI uses isToolSearchEnabledNoModelCheck() because it's
// called from ~20 places (analytics, feedback, sharing, etc.), many of which
// don't have model context. Adding model to its signature would be a large refactor.
// - This post-processing uses the model-aware isToolSearchEnabled() check
// - This handles mid-conversation model switching (e.g., Sonnet β Haiku) where
// stale tool-search fields from the previous model would cause 400 errors
//
// Note: For assistant messages, normalizeMessagesForAPI already normalized the
// tool inputs, so stripCallerFieldFromAssistantMessage only needs to remove the
// 'caller' field (not re-normalize inputs).
if (!useToolSearch) {
messagesForAPI = messagesForAPI.map(msg => {
switch (msg.type) {
case 'user':
// Strip tool_reference blocks from tool_result content
return stripToolReferenceBlocksFromUserMessage(msg)
case 'assistant':
// Strip 'caller' field from tool_use blocks
return stripCallerFieldFromAssistantMessage(msg)
default:
return msg
}
})
}
// Repair tool_use/tool_result pairing mismatches that can occur when resuming
// remote/teleport sessions. Inserts synthetic error tool_results for orphaned
// tool_uses and strips orphaned tool_results referencing non-existent tool_uses.
messagesForAPI = ensureToolResultPairing(messagesForAPI)
// Strip advisor blocks β the API rejects them without the beta header.
if (!betas.includes(ADVISOR_BETA_HEADER)) {
messagesForAPI = stripAdvisorBlocks(messagesForAPI)
}
// Strip excess media items before making the API call.
// The API rejects requests with >100 media items but returns a confusing error.
// Rather than erroring (which is hard to recover from in Cowork/CCD), we
// silently drop the oldest media items to stay within the limit.
messagesForAPI = stripExcessMediaItems(
messagesForAPI,
API_MAX_MEDIA_PER_REQUEST,
)
// Instrumentation: Track message count after normalization
logEvent('tengu_api_after_normalize', {
postNormalizedMessageCount: messagesForAPI.length,
})
// Compute fingerprint from first user message for attribution.
// Must run BEFORE injecting synthetic messages (e.g. deferred tool names)
// so the fingerprint reflects the actual user input.
const fingerprint = computeFingerprintFromMessages(messagesForAPI)
// When the delta attachment is enabled, deferred tools are announced
// via persisted deferred_tools_delta attachments instead of this
// ephemeral prepend (which busts cache whenever the pool changes).
if (useToolSearch && !isDeferredToolsDeltaEnabled()) {
const deferredToolList = tools
.filter(t => deferredToolNames.has(t.name))
.map(formatDeferredToolLine)
.sort()
.join('\n')
if (deferredToolList) {
messagesForAPI = [
createUserMessage({
content: `<available-deferred-tools>\n${deferredToolList}\n</available-deferred-tools>`,
isMeta: true,
}),
...messagesForAPI,
]
}
}
// Chrome tool-search instructions: when the delta attachment is enabled,
// these are carried as a client-side block in mcp_instructions_delta
// (attachments.ts) instead of here. This per-request sys-prompt append
// busts the prompt cache when chrome connects late.
const hasChromeTools = filteredTools.some(t =>
isToolFromMcpServer(t.name, CLAUDE_IN_CHROME_MCP_SERVER_NAME),
)
const injectChromeHere =
useToolSearch && hasChromeTools && !isMcpInstructionsDeltaEnabled()
// filter(Boolean) works by converting each element to a boolean - empty strings become false and are filtered out.
systemPrompt = asSystemPrompt(
[
getAttributionHeader(fingerprint),
getCLISyspromptPrefix({
isNonInteractive: options.isNonInteractiveSession,
hasAppendSystemPrompt: options.hasAppendSystemPrompt,
}),
...systemPrompt,
...(advisorModel ? [ADVISOR_TOOL_INSTRUCTIONS] : []),
...(injectChromeHere ? [CHROME_TOOL_SEARCH_INSTRUCTIONS] : []),
].filter(Boolean),
)
// Prepend system prompt block for easy API identification
logAPIPrefix(systemPrompt)
const enablePromptCaching =
options.enablePromptCaching ?? getPromptCachingEnabled(options.model)
const system = buildSystemPromptBlocks(systemPrompt, enablePromptCaching, {
skipGlobalCacheForSystemPrompt: needsToolBasedCacheMarker,
querySource: options.querySource,
})
const useBetas = betas.length > 0
// Build minimal context for detailed tracing (when beta tracing is enabled)
// Note: The actual new_context message extraction is done in sessionTracing.ts using
// hash-based tracking per querySource (agent) from the messagesForAPI array
const extraToolSchemas = [...(options.extraToolSchemas ?? [])]
if (advisorModel) {
// Server tools must be in the tools array by API contract. Appended after
// toolSchemas (which carries the cache_control marker) so toggling /advisor
// only churns the small suffix, not the cached prefix.
extraToolSchemas.push({
type: 'advisor_20260301',
name: 'advisor',
model: advisorModel,
} as unknown as BetaToolUnion)
}
const allTools = [...toolSchemas, ...extraToolSchemas]
const isFastMode =
isFastModeEnabled() &&
isFastModeAvailable() &&
!isFastModeCooldown() &&
isFastModeSupportedByModel(options.model) &&
!!options.fastMode
// Sticky-on latches for dynamic beta headers. Each header, once first
// sent, keeps being sent for the rest of the session so mid-session
// toggles don't change the server-side cache key and bust ~50-70K tokens.
// Latches are cleared on /clear and /compact via clearBetaHeaderLatches().
// Per-call gates (isAgenticQuery, querySource===repl_main_thread) stay
// per-call so non-agentic queries keep their own stable header set.
let afkHeaderLatched = getAfkModeHeaderLatched() === true
if (feature('TRANSCRIPT_CLASSIFIER')) {
if (
!afkHeaderLatched &&
isAgenticQuery &&
shouldIncludeFirstPartyOnlyBetas() &&
(autoModeStateModule?.isAutoModeActive() ?? false)
) {
afkHeaderLatched = true
setAfkModeHeaderLatched(true)
}
}
let fastModeHeaderLatched = getFastModeHeaderLatched() === true
if (!fastModeHeaderLatched && isFastMode) {
fastModeHeaderLatched = true
setFastModeHeaderLatched(true)
}
let cacheEditingHeaderLatched = getCacheEditingHeaderLatched() === true
if (feature('CACHED_MICROCOMPACT')) {
if (
!cacheEditingHeaderLatched &&
cachedMCEnabled &&
getAPIProvider() === 'firstParty' &&
options.querySource === 'repl_main_thread'
) {
cacheEditingHeaderLatched = true
setCacheEditingHeaderLatched(true)
}
}
// Only latch from agentic queries so a classifier call doesn't flip the
// main thread's context_management mid-turn.
let thinkingClearLatched = getThinkingClearLatched() === true
if (!thinkingClearLatched && isAgenticQuery) {
const lastCompletion = getLastApiCompletionTimestamp()
if (
lastCompletion !== null &&
Date.now() - lastCompletion > CACHE_TTL_1HOUR_MS
) {
thinkingClearLatched = true
setThinkingClearLatched(true)
}
}
const effort = resolveAppliedEffort(options.model, options.effortValue)
if (feature('PROMPT_CACHE_BREAK_DETECTION')) {
// Exclude defer_loading tools from the hash -- the API strips them from the
// prompt, so they never affect the actual cache key. Including them creates
// false-positive "tool schemas changed" breaks when tools are discovered or
// MCP servers reconnect.
const toolsForCacheDetection = allTools.filter(
t => !('defer_loading' in t && t.defer_loading),
)
// Capture everything that could affect the server-side cache key.
// Pass latched header values (not live state) so break detection
// reflects what we actually send, not what the user toggled.
recordPromptState({
system,
toolSchemas: toolsForCacheDetection,
querySource: options.querySource,
model: options.model,
agentId: options.agentId,
fastMode: fastModeHeaderLatched,
globalCacheStrategy,
betas,
autoModeActive: afkHeaderLatched,
isUsingOverage: currentLimits.isUsingOverage ?? false,
cachedMCEnabled: cacheEditingHeaderLatched,
effortValue: effort,
extraBodyParams: getExtraBodyParams(),
})
}
const newContext: LLMRequestNewContext | undefined = isBetaTracingEnabled()
? {
systemPrompt: systemPrompt.join('\n\n'),
querySource: options.querySource,
tools: jsonStringify(allTools),
}
: undefined
// Capture the span so we can pass it to endLLMRequestSpan later
// This ensures responses are matched to the correct request when multiple requests run in parallel
const llmSpan = startLLMRequestSpan(
options.model,
newContext,
messagesForAPI,
isFastMode,
)
const startIncludingRetries = Date.now()
let start = Date.now()
let attemptNumber = 0
const attemptStartTimes: number[] = []
let stream: Stream<BetaRawMessageStreamEvent> | undefined = undefined
let streamRequestId: string | null | undefined = undefined
let clientRequestId: string | undefined = undefined
// eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins -- Response is available in Node 18+ and is used by the SDK
let streamResponse: Response | undefined = undefined
// Release all stream resources to prevent native memory leaks.
// The Response object holds native TLS/socket buffers that live outside the
// V8 heap (observed on the Node.js/npm path; see GH #32920), so we must
// explicitly cancel and release it regardless of how the generator exits.
function releaseStreamResources(): void {
  // Tear down the SDK stream wrapper first, then drop our handle to it so a
  // second call is a no-op for the stream.
  cleanupStream(stream)
  stream = undefined

  // Nothing more to do if no Response was ever captured (or it was already
  // released by an earlier call).
  const response = streamResponse
  if (!response) {
    return
  }

  // Best-effort cancel of the underlying body; any rejection from cancel()
  // is intentionally swallowed because this runs on every exit path.
  response.body?.cancel().catch(() => {})
  streamResponse = undefined
}
// Consume pending cache edits ONCE before paramsFromContext is defined.
// paramsFromContext is called multiple times (logging, retries), so consuming
// inside it would cause the first call to steal edits from subsequent calls.
const consumedCacheEdits = cachedMCEnabled ? consumePendingCacheEdits() : null
const consumedPinnedEdits = cachedMCEnabled ? getPinnedCacheEdits() : []
// Capture the betas sent in the last API request, including the ones that
// were dynamically added, so we can log and send it to telemetry.
let lastRequestBetas: string[] | undefined
/**
 * Builds the complete `beta.messages.create` request body for one attempt.
 *
 * Invoked once up front to derive log scalars and again for every withRetry
 * attempt, so it must not consume one-shot state: pending cache edits are
 * consumed before this closure is defined and only captured here. Each call
 * starts from a fresh copy of the session-level `betas`, appends per-call
 * headers, and records the final list in `lastRequestBetas` for telemetry.
 *
 * @param retryContext - per-attempt context; this closure reads its `model`,
 *   `fastMode`, and optional `maxTokensOverride` fields.
 * @returns the request params (the caller adds `stream: true`).
 *
 * NOTE(review): several lookups here use `retryContext.model` (1M beta,
 * Bedrock betas, prompt caching) while the `model` sent and the effort,
 * thinking, structured-output and fast-mode checks use `options.model`.
 * Presumably intentional (fallback may be handled elsewhere) — confirm.
 */
const paramsFromContext = (retryContext: RetryContext) => {
  // Copy so per-call additions never mutate the session-level `betas` array.
  const betasParams = [...betas]
  // Append 1M beta dynamically for the Sonnet 1M experiment.
  if (
    !betasParams.includes(CONTEXT_1M_BETA_HEADER) &&
    getSonnet1mExpTreatmentEnabled(retryContext.model)
  ) {
    betasParams.push(CONTEXT_1M_BETA_HEADER)
  }
  // For Bedrock, include both model-based betas and dynamically-added tool search header
  // (these are passed through getExtraBodyParams rather than the betas array).
  const bedrockBetas =
    getAPIProvider() === 'bedrock'
      ? [
          ...getBedrockExtraBodyParamsBetas(retryContext.model),
          ...(toolSearchHeader ? [toolSearchHeader] : []),
        ]
      : []
  const extraBodyParams = getExtraBodyParams(bedrockBetas)
  // Start from any output_config already present in extraBodyParams (shallow
  // copy); effort, task budget and structured-output format are merged below.
  const outputConfig: BetaOutputConfig = {
    ...((extraBodyParams.output_config as BetaOutputConfig) ?? {}),
  }
  configureEffortParams(
    effort,
    outputConfig,
    extraBodyParams,
    betasParams,
    options.model,
  )
  configureTaskBudgetParams(
    options.taskBudget,
    outputConfig as BetaOutputConfig & { task_budget?: TaskBudgetParam },
    betasParams,
  )
  // Merge outputFormat into extraBodyParams.output_config alongside effort
  // Requires structured-outputs beta header per SDK (see parse() in messages.mjs)
  // An existing `format` (e.g. from extraBodyParams) is left untouched.
  if (options.outputFormat && !('format' in outputConfig)) {
    outputConfig.format = options.outputFormat as BetaJSONOutputFormat
    // Add beta header if not already present and provider supports it
    if (
      modelSupportsStructuredOutputs(options.model) &&
      !betasParams.includes(STRUCTURED_OUTPUTS_BETA_HEADER)
    ) {
      betasParams.push(STRUCTURED_OUTPUTS_BETA_HEADER)
    }
  }
  // Retry context gets preference because it tries to course correct if we exceed the context window limit
  // NOTE(review): `||` means an override of 0 falls through to the next
  // source, and `?.` is redundant since retryContext is a required param.
  const maxOutputTokens =
    retryContext?.maxTokensOverride ||
    options.maxOutputTokensOverride ||
    getMaxOutputTokensForModel(options.model)
  // Thinking is requested unless the config disables it or the env switch is set.
  // NOTE(review): this does not consult modelSupportsThinking(), so for a
  // model without thinking support no `thinking` block is sent yet
  // `temperature` is still omitted below — confirm this is intended.
  const hasThinking =
    thinkingConfig.type !== 'disabled' &&
    !isEnvTruthy(process.env.CLAUDE_CODE_DISABLE_THINKING)
  let thinking: BetaMessageStreamParams['thinking'] | undefined = undefined
  // IMPORTANT: Do not change the adaptive-vs-budget thinking selection below
  // without notifying the model launch DRI and research. This is a sensitive
  // setting that can greatly affect model quality and bashing.
  if (hasThinking && modelSupportsThinking(options.model)) {
    if (
      !isEnvTruthy(process.env.CLAUDE_CODE_DISABLE_ADAPTIVE_THINKING) &&
      modelSupportsAdaptiveThinking(options.model)
    ) {
      // For models that support adaptive thinking, always use adaptive
      // thinking without a budget.
      thinking = {
        type: 'adaptive',
      } satisfies BetaMessageStreamParams['thinking']
    } else {
      // For models that do not support adaptive thinking, use the default
      // thinking budget unless explicitly specified.
      let thinkingBudget = getMaxThinkingTokensForModel(options.model)
      if (
        thinkingConfig.type === 'enabled' &&
        thinkingConfig.budgetTokens !== undefined
      ) {
        thinkingBudget = thinkingConfig.budgetTokens
      }
      // Keep the thinking budget strictly below max_tokens.
      thinkingBudget = Math.min(maxOutputTokens - 1, thinkingBudget)
      thinking = {
        budget_tokens: thinkingBudget,
        type: 'enabled',
      } satisfies BetaMessageStreamParams['thinking']
    }
  }
  // Get API context management strategies if enabled
  const contextManagement = getAPIContextManagement({
    hasThinking,
    isRedactThinkingActive: betasParams.includes(REDACT_THINKING_BETA_HEADER),
    clearAllThinking: thinkingClearLatched,
  })
  const enablePromptCaching =
    options.enablePromptCaching ?? getPromptCachingEnabled(retryContext.model)
  // Fast mode: header is latched session-stable (cache-safe), but
  // `speed='fast'` stays dynamic so cooldown still suppresses the actual
  // fast-mode request without changing the cache key.
  let speed: BetaMessageStreamParams['speed']
  const isFastModeForRetry =
    isFastModeEnabled() &&
    isFastModeAvailable() &&
    !isFastModeCooldown() &&
    isFastModeSupportedByModel(options.model) &&
    !!retryContext.fastMode
  if (isFastModeForRetry) {
    speed = 'fast'
  }
  if (fastModeHeaderLatched && !betasParams.includes(FAST_MODE_BETA_HEADER)) {
    betasParams.push(FAST_MODE_BETA_HEADER)
  }
  // AFK mode beta: latched once auto mode is first activated. Still gated
  // by isAgenticQuery per-call so classifiers/compaction don't get it.
  if (feature('TRANSCRIPT_CLASSIFIER')) {
    if (
      afkHeaderLatched &&
      shouldIncludeFirstPartyOnlyBetas() &&
      isAgenticQuery &&
      !betasParams.includes(AFK_MODE_BETA_HEADER)
    ) {
      betasParams.push(AFK_MODE_BETA_HEADER)
    }
  }
  // Cache editing beta: header is latched session-stable; useCachedMC
  // (controls cache_edits body behavior) stays live so edits stop when
  // the feature disables but the header doesn't flip.
  const useCachedMC =
    cachedMCEnabled &&
    getAPIProvider() === 'firstParty' &&
    options.querySource === 'repl_main_thread'
  if (
    cacheEditingHeaderLatched &&
    getAPIProvider() === 'firstParty' &&
    options.querySource === 'repl_main_thread' &&
    !betasParams.includes(cacheEditingBetaHeader)
  ) {
    betasParams.push(cacheEditingBetaHeader)
    logForDebugging(
      'Cache editing beta header enabled for cached microcompact',
    )
  }
  // Only send temperature when thinking is disabled — the API requires
  // temperature: 1 when thinking is enabled, which is already the default.
  const temperature = !hasThinking
    ? (options.temperatureOverride ?? 1)
    : undefined
  // Record exactly what this attempt sends so logging/telemetry can report it.
  lastRequestBetas = betasParams
  return {
    model: normalizeModelStringForAPI(options.model),
    messages: addCacheBreakpoints(
      messagesForAPI,
      enablePromptCaching,
      options.querySource,
      useCachedMC,
      consumedCacheEdits,
      consumedPinnedEdits,
      options.skipCacheWrite,
    ),
    system,
    tools: allTools,
    tool_choice: options.toolChoice,
    // NOTE(review): `useBetas` reflects the session-level `betas` length, not
    // `betasParams`; if `betas` were empty, headers pushed above would not be
    // sent. Presumably `betas` is never empty in practice — confirm.
    ...(useBetas && { betas: betasParams }),
    metadata: getAPIMetadata(),
    max_tokens: maxOutputTokens,
    thinking,
    ...(temperature !== undefined && { temperature }),
    ...(contextManagement &&
      useBetas &&
      betasParams.includes(CONTEXT_MANAGEMENT_BETA_HEADER) && {
        context_management: contextManagement,
      }),
    // Spread after the explicit fields above, so any matching keys in
    // extraBodyParams override them; output_config and speed below win again.
    ...extraBodyParams,
    ...(Object.keys(outputConfig).length > 0 && {
      output_config: outputConfig,
    }),
    ...(speed !== undefined && { speed }),
  }
}
// Compute log scalars synchronously so the fire-and-forget .then() closure
// captures only primitives instead of paramsFromContext's full closure scope
// (messagesForAPI, system, allTools, betas β the entire request-building
// context), which would otherwise be pinned until the promise resolves.
{
const queryParams = paramsFromContext({
model: options.model,
thinkingConfig,
})
const logMessagesLength = queryParams.messages.length
const logBetas = useBetas ? (queryParams.betas ?? []) : []
const logThinkingType = queryParams.thinking?.type ?? 'disabled'
const logEffortValue = queryParams.output_config?.effort
void options.getToolPermissionContext().then(permissionContext => {
logAPIQuery({
model: options.model,
messagesLength: logMessagesLength,
temperature: options.temperatureOverride ?? 1,
betas: logBetas,
permissionMode: permissionContext.mode,
querySource: options.querySource,
queryTracking: options.queryTracking,
thinkingType: logThinkingType,
effortValue: logEffortValue,
fastMode: isFastMode,
previousRequestId,
})
})
}
const newMessages: AssistantMessage[] = []
let ttftMs = 0
let partialMessage: BetaMessage | undefined = undefined
const contentBlocks: (BetaContentBlock | ConnectorTextBlock)[] = []
let usage: NonNullableUsage = EMPTY_USAGE
let costUSD = 0
let stopReason: BetaStopReason | null = null
let didFallBackToNonStreaming = false
let fallbackMessage: AssistantMessage | undefined
let maxOutputTokens = 0
let responseHeaders: globalThis.Headers | undefined = undefined
let research: unknown = undefined
let isFastModeRequest = isFastMode // Keep separate state as it may change if falling back
let isAdvisorInProgress = false
try {
queryCheckpoint('query_client_creation_start')
const generator = withRetry(
() =>
getAnthropicClient({
maxRetries: 0, // Disabled auto-retry in favor of manual implementation
model: options.model,
fetchOverride: options.fetchOverride,
source: options.querySource,
}),
async (anthropic, attempt, context) => {
attemptNumber = attempt
isFastModeRequest = context.fastMode ?? false
start = Date.now()
attemptStartTimes.push(start)
// Client has been created by withRetry's getClient() call. This fires
// once per attempt; on retries the client is usually cached (withRetry
// only calls getClient() again after auth errors), so the delta from
// client_creation_start is meaningful on attempt 1.
queryCheckpoint('query_client_creation_end')
const params = paramsFromContext(context)
captureAPIRequest(params, options.querySource) // Capture for bug reports
maxOutputTokens = params.max_tokens
// Fire immediately before the fetch is dispatched. .withResponse() below
// awaits until response headers arrive, so this MUST be before the await
// or the "Network TTFB" phase measurement is wrong.
queryCheckpoint('query_api_request_sent')
if (!options.agentId) {
headlessProfilerCheckpoint('api_request_sent')
}
// Generate and track client request ID so timeouts (which return no
// server request ID) can still be correlated with server logs.
// First-party only β 3P providers don't log it (inc-4029 class).
clientRequestId =
getAPIProvider() === 'firstParty' && isFirstPartyAnthropicBaseUrl()
? randomUUID()
: undefined
// Use raw stream instead of BetaMessageStream to avoid O(nΒ²) partial JSON parsing
// BetaMessageStream calls partialParse() on every input_json_delta, which we don't need
// since we handle tool input accumulation ourselves
// biome-ignore lint/plugin: main conversation loop handles attribution separately
const result = await anthropic.beta.messages
.create(
{ ...params, stream: true },
{
signal,
...(clientRequestId && {
headers: { [CLIENT_REQUEST_ID_HEADER]: clientRequestId },
}),
},
)
.withResponse()
queryCheckpoint('query_response_headers_received')
streamRequestId = result.request_id
streamResponse = result.response
return result.data
},
{
model: options.model,
fallbackModel: options.fallbackModel,
thinkingConfig,
...(isFastModeEnabled() ? { fastMode: isFastMode } : false),
signal,
querySource: options.querySource,
},
)
let e
do {
e = await generator.next()
// yield API error messages (the stream has a 'controller' property, error messages don't)
if (!('controller' in e.value)) {
yield e.value
}
} while (!e.done)
stream = e.value as Stream<BetaRawMessageStreamEvent>
// reset state
newMessages.length = 0
ttftMs = 0
partialMessage = undefined
contentBlocks.length = 0
usage = EMPTY_USAGE
stopReason = null
isAdvisorInProgress = false
// Streaming idle timeout watchdog: abort the stream if no chunks arrive
// for STREAM_IDLE_TIMEOUT_MS. Unlike the stall detection below (which only
// fires when the *next* chunk arrives), this uses setTimeout to actively
// kill hung streams. Without this, a silently dropped connection can hang
// the session indefinitely since the SDK's request timeout only covers the
// initial fetch(), not the streaming body.
const streamWatchdogEnabled = isEnvTruthy(
process.env.CLAUDE_ENABLE_STREAM_WATCHDOG,
)
const STREAM_IDLE_TIMEOUT_MS =
parseInt(process.env.CLAUDE_STREAM_IDLE_TIMEOUT_MS || '', 10) || 90_000
const STREAM_IDLE_WARNING_MS = STREAM_IDLE_TIMEOUT_MS / 2
let streamIdleAborted = false
// performance.now() snapshot when watchdog fires, for measuring abort propagation delay
let streamWatchdogFiredAt: number | null = null
let streamIdleWarningTimer: ReturnType<typeof setTimeout> | null = null
let streamIdleTimer: ReturnType<typeof setTimeout> | null = null
function clearStreamIdleTimers(): void {
  // Snapshot both handles, forget them, then cancel whichever were armed.
  // Resetting to null unconditionally is equivalent to the guarded form.
  const pendingWarning = streamIdleWarningTimer
  const pendingAbort = streamIdleTimer
  streamIdleWarningTimer = null
  streamIdleTimer = null
  if (pendingWarning !== null) clearTimeout(pendingWarning)
  if (pendingAbort !== null) clearTimeout(pendingAbort)
}
/**
 * (Re)arms the streaming idle watchdog. Any previously scheduled timers are
 * cancelled first; when the watchdog is enabled, a warning timer (half the
 * timeout) and an abort timer (full timeout) are scheduled.
 */
function resetStreamIdleTimer(): void {
  clearStreamIdleTimers()
  if (streamWatchdogEnabled) {
    // Warning only: the threshold is forwarded as the callback argument so
    // the message reports the configured warning interval.
    streamIdleWarningTimer = setTimeout(
      (thresholdMs: number) => {
        logForDebugging(
          `Streaming idle warning: no chunks received for ${thresholdMs / 1000}s`,
          { level: 'warn' },
        )
        logForDiagnosticsNoPII('warn', 'cli_streaming_idle_warning')
      },
      STREAM_IDLE_WARNING_MS,
      STREAM_IDLE_WARNING_MS,
    )

    // Hard timeout: mark the abort, timestamp it, report it, then release
    // the stream resources.
    const abortIdleStream = (): void => {
      streamIdleAborted = true
      streamWatchdogFiredAt = performance.now()
      const timeoutSeconds = STREAM_IDLE_TIMEOUT_MS / 1000
      logForDebugging(
        `Streaming idle timeout: no chunks received for ${timeoutSeconds}s, aborting stream`,
        { level: 'error' },
      )
      logForDiagnosticsNoPII('error', 'cli_streaming_idle_timeout')
      const requestIdForLog = streamRequestId ?? 'unknown'
      logEvent('tengu_streaming_idle_timeout', {
        model:
          options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
        request_id:
          requestIdForLog as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
        timeout_ms: STREAM_IDLE_TIMEOUT_MS,
      })
      releaseStreamResources()
    }
    streamIdleTimer = setTimeout(abortIdleStream, STREAM_IDLE_TIMEOUT_MS)
  }
}
resetStreamIdleTimer()
startSessionActivity('api_call')
try {
// stream in and accumulate state
let isFirstChunk = true
let lastEventTime: number | null = null // Set after first chunk to avoid measuring TTFB as a stall
const STALL_THRESHOLD_MS = 30_000 // 30 seconds
let totalStallTime = 0
let stallCount = 0
for await (const part of stream) {
resetStreamIdleTimer()
const now = Date.now()
// Detect and log streaming stalls (only after first event to avoid counting TTFB)
if (lastEventTime !== null) {
const timeSinceLastEvent = now - lastEventTime
if (timeSinceLastEvent > STALL_THRESHOLD_MS) {
stallCount++
totalStallTime += timeSinceLastEvent
logForDebugging(
`Streaming stall detected: ${(timeSinceLastEvent / 1000).toFixed(1)}s gap between events (stall #${stallCount})`,
{ level: 'warn' },
)
logEvent('tengu_streaming_stall', {
stall_duration_ms: timeSinceLastEvent,
stall_count: stallCount,
total_stall_time_ms: totalStallTime,
event_type:
part.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
model:
options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
request_id: (streamRequestId ??
'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
})
}
}
lastEventTime = now
if (isFirstChunk) {
logForDebugging('Stream started - received first chunk')
queryCheckpoint('query_first_chunk_received')
if (!options.agentId) {
headlessProfilerCheckpoint('first_chunk')
}
endQueryProfile()
isFirstChunk = false
}
switch (part.type) {
case 'message_start': {
partialMessage = part.message
ttftMs = Date.now() - start
usage = updateUsage(usage, part.message?.usage)
// Capture research from message_start if available (internal only).
// Always overwrite with the latest value.
if (
process.env.USER_TYPE === 'ant' &&
'research' in (part.message as unknown as Record<string, unknown>)
) {
research = (part.message as unknown as Record<string, unknown>)
.research
}
break
}
case 'content_block_start':
switch (part.content_block.type) {
case 'tool_use':
contentBlocks[part.index] = {
...part.content_block,
input: '',
}
break
case 'server_tool_use':
contentBlocks[part.index] = {
...part.content_block,
input: '' as unknown as { [key: string]: unknown },
}
if ((part.content_block.name as string) === 'advisor') {
isAdvisorInProgress = true
logForDebugging(`[AdvisorTool] Advisor tool called`)
logEvent('tengu_advisor_tool_call', {
model:
options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
advisor_model: (advisorModel ??
'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
})
}
break
case 'text':
contentBlocks[part.index] = {
...part.content_block,
// awkwardly, the sdk sometimes returns text as part of a
// content_block_start message, then returns the same text
// again in a content_block_delta message. we ignore it here
// since there doesn't seem to be a way to detect when a
// content_block_delta message duplicates the text.
text: '',
}
break
case 'thinking':
contentBlocks[part.index] = {
...part.content_block,
// also awkward
thinking: '',
// initialize signature to ensure field exists even if signature_delta never arrives
signature: '',
}
break
default:
// even more awkwardly, the sdk mutates the contents of text blocks
// as it works. we want the blocks to be immutable, so that we can
// accumulate state ourselves.
contentBlocks[part.index] = { ...part.content_block }
if (
(part.content_block.type as string) === 'advisor_tool_result'
) {
isAdvisorInProgress = false
logForDebugging(`[AdvisorTool] Advisor tool result received`)
}
break
}
break
case 'content_block_delta': {
const contentBlock = contentBlocks[part.index]
const delta = part.delta as typeof part.delta | ConnectorTextDelta
if (!contentBlock) {
logEvent('tengu_streaming_error', {
error_type:
'content_block_not_found_delta' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
part_type:
part.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
part_index: part.index,
})
throw new RangeError('Content block not found')
}
if (
feature('CONNECTOR_TEXT') &&
delta.type === 'connector_text_delta'
) {
if (contentBlock.type !== 'connector_text') {
logEvent('tengu_streaming_error', {
error_type:
'content_block_type_mismatch_connector_text' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
expected_type:
'connector_text' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
actual_type:
contentBlock.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
})
throw new Error('Content block is not a connector_text block')
}
contentBlock.connector_text += delta.connector_text
} else {
switch (delta.type) {
case 'citations_delta':
// TODO: handle citations
break
case 'input_json_delta':
if (
contentBlock.type !== 'tool_use' &&
contentBlock.type !== 'server_tool_use'
) {
logEvent('tengu_streaming_error', {
error_type:
'content_block_type_mismatch_input_json' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
expected_type:
'tool_use' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
actual_type:
contentBlock.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
})
throw new Error('Content block is not a input_json block')
}
if (typeof contentBlock.input !== 'string') {
logEvent('tengu_streaming_error', {
error_type:
'content_block_input_not_string' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
input_type:
typeof contentBlock.input as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
})
throw new Error('Content block input is not a string')
}
contentBlock.input += delta.partial_json
break
case 'text_delta':
if (contentBlock.type !== 'text') {
logEvent('tengu_streaming_error', {
error_type:
'content_block_type_mismatch_text' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
expected_type:
'text' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
actual_type:
contentBlock.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
})
throw new Error('Content block is not a text block')
}
contentBlock.text += delta.text
break
case 'signature_delta':
if (
feature('CONNECTOR_TEXT') &&
contentBlock.type === 'connector_text'
) {
contentBlock.signature = delta.signature
break
}
if (contentBlock.type !== 'thinking') {
logEvent('tengu_streaming_error', {
error_type:
'content_block_type_mismatch_thinking_signature' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
expected_type:
'thinking' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
actual_type:
contentBlock.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
})
throw new Error('Content block is not a thinking block')
}
contentBlock.signature = delta.signature
break
case 'thinking_delta':
if (contentBlock.type !== 'thinking') {
logEvent('tengu_streaming_error', {
error_type:
'content_block_type_mismatch_thinking_delta' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
expected_type:
'thinking' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
actual_type:
contentBlock.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
})
throw new Error('Content block is not a thinking block')
}
contentBlock.thinking += delta.thinking
break
}
}
// Capture research from content_block_delta if available (internal only).
// Always overwrite with the latest value.
if (process.env.USER_TYPE === 'ant' && 'research' in part) {
research = (part as { research: unknown }).research
}
break
}
case 'content_block_stop': {
const contentBlock = contentBlocks[part.index]
if (!contentBlock) {
logEvent('tengu_streaming_error', {
error_type:
'content_block_not_found_stop' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
part_type:
part.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
part_index: part.index,
})
throw new RangeError('Content block not found')
}
if (!partialMessage) {
logEvent('tengu_streaming_error', {
error_type:
'partial_message_not_found' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
part_type:
part.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
})
throw new Error('Message not found')
}
const m: AssistantMessage = {
message: {
...partialMessage,
content: normalizeContentFromAPI(
[contentBlock] as BetaContentBlock[],
tools,
options.agentId,
),
},
requestId: streamRequestId ?? undefined,
type: 'assistant',
uuid: randomUUID(),
timestamp: new Date().toISOString(),
...(process.env.USER_TYPE === 'ant' &&
research !== undefined && { research }),
...(advisorModel && { advisorModel }),
}
newMessages.push(m)
yield m
break
}
case 'message_delta': {
usage = updateUsage(usage, part.usage)
// Capture research from message_delta if available (internal only).
// Always overwrite with the latest value. Also write back to
// already-yielded messages since message_delta arrives after
// content_block_stop.
if (
process.env.USER_TYPE === 'ant' &&
'research' in (part as unknown as Record<string, unknown>)
) {
research = (part as unknown as Record<string, unknown>).research
for (const msg of newMessages) {
msg.research = research
}
}
// Write final usage and stop_reason back to the last yielded
// message. Messages are created at content_block_stop from
// partialMessage, which was set at message_start before any tokens
// were generated (output_tokens: 0, stop_reason: null).
// message_delta arrives after content_block_stop with the real
// values.
//
// IMPORTANT: Use direct property mutation, not object replacement.
// The transcript write queue holds a reference to message.message
// and serializes it lazily (100ms flush interval). Object
// replacement ({ ...lastMsg.message, usage }) would disconnect
// the queued reference; direct mutation ensures the transcript
// captures the final values.
stopReason = part.delta.stop_reason
const lastMsg = newMessages.at(-1)
if (lastMsg) {
lastMsg.message.usage = usage
lastMsg.message.stop_reason = stopReason
}
// Update cost
const costUSDForPart = calculateUSDCost(resolvedModel, usage)
costUSD += addToTotalSessionCost(
costUSDForPart,
usage,
options.model,
)
const refusalMessage = getErrorMessageIfRefusal(
part.delta.stop_reason,
options.model,
)
if (refusalMessage) {
yield refusalMessage
}
if (stopReason === 'max_tokens') {
logEvent('tengu_max_tokens_reached', {
max_tokens: maxOutputTokens,
})
yield createAssistantAPIErrorMessage({
content: `${API_ERROR_MESSAGE_PREFIX}: Claude's response exceeded the ${
maxOutputTokens
} output token maximum. To configure this behavior, set the CLAUDE_CODE_MAX_OUTPUT_TOKENS environment variable.`,
apiError: 'max_output_tokens',
error: 'max_output_tokens',
})
}
if (stopReason === 'model_context_window_exceeded') {
logEvent('tengu_context_window_exceeded', {
max_tokens: maxOutputTokens,
output_tokens: usage.output_tokens,
})
// Reuse the max_output_tokens recovery path β from the model's
// perspective, both mean "response was cut off, continue from
// where you left off."
yield createAssistantAPIErrorMessage({
content: `${API_ERROR_MESSAGE_PREFIX}: The model has reached its context window limit.`,
apiError: 'max_output_tokens',
error: 'max_output_tokens',
})
}
break
}
case 'message_stop':
break
}
yield {
type: 'stream_event',
event: part,
...(part.type === 'message_start' ? { ttftMs } : undefined),
}
}
// Clear the idle timeout watchdog now that the stream loop has exited
clearStreamIdleTimers()
// If the stream was aborted by our idle timeout watchdog, fall back to
// non-streaming retry rather than treating it as a completed stream.
if (streamIdleAborted) {
// Instrumentation: proves the for-await exited after the watchdog fired
// (vs. hung forever). exit_delay_ms measures abort propagation latency:
// 0-10ms = abort worked; >>1000ms = something else woke the loop.
const exitDelayMs =
streamWatchdogFiredAt !== null
? Math.round(performance.now() - streamWatchdogFiredAt)
: -1
logForDiagnosticsNoPII(
'info',
'cli_stream_loop_exited_after_watchdog_clean',
)
logEvent('tengu_stream_loop_exited_after_watchdog', {
request_id: (streamRequestId ??
'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
exit_delay_ms: exitDelayMs,
exit_path:
'clean' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
model:
options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
})
// Prevent double-emit: this throw lands in the catch block below,
// whose exit_path='error' probe guards on streamWatchdogFiredAt.
streamWatchdogFiredAt = null
throw new Error('Stream idle timeout - no chunks received')
}
// Detect when the stream completed without producing any assistant messages.
// This covers two proxy failure modes:
// 1. No events at all (!partialMessage): proxy returned 200 with non-SSE body
// 2. Partial events (partialMessage set but no content blocks completed AND
// no stop_reason received): proxy returned message_start but stream ended
// before content_block_stop and before message_delta with stop_reason
// BetaMessageStream had the first check in _endRequest() but the raw Stream
// does not - without it the generator silently returns no assistant messages,
// causing "Execution error" in -p mode.
// Note: We must check stopReason to avoid false positives. For example, with
// structured output (--json-schema), the model calls a StructuredOutput tool
// on turn 1, then on turn 2 responds with end_turn and no content blocks.
// That's a legitimate empty response, not an incomplete stream.
if (!partialMessage || (newMessages.length === 0 && !stopReason)) {
logForDebugging(
!partialMessage
? 'Stream completed without receiving message_start event - triggering non-streaming fallback'
: 'Stream completed with message_start but no content blocks completed - triggering non-streaming fallback',
{ level: 'error' },
)
logEvent('tengu_stream_no_events', {
model:
options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
request_id: (streamRequestId ??
'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
})
throw new Error('Stream ended without receiving any events')
}
// Log summary if any stalls occurred during streaming
if (stallCount > 0) {
logForDebugging(
`Streaming completed with ${stallCount} stall(s), total stall time: ${(totalStallTime / 1000).toFixed(1)}s`,
{ level: 'warn' },
)
logEvent('tengu_streaming_stall_summary', {
stall_count: stallCount,
total_stall_time_ms: totalStallTime,
model:
options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
request_id: (streamRequestId ??
'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
})
}
// Check if the cache actually broke based on response tokens
if (feature('PROMPT_CACHE_BREAK_DETECTION')) {
void checkResponseForCacheBreak(
options.querySource,
usage.cache_read_input_tokens,
usage.cache_creation_input_tokens,
messages,
options.agentId,
streamRequestId,
)
}
// Process fallback percentage header and quota status if available
// streamResponse is set when the stream is created in the withRetry callback above
// TypeScript's control flow analysis can't track that streamResponse is set in the callback
// eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
const resp = streamResponse as unknown as Response | undefined
if (resp) {
extractQuotaStatusFromHeaders(resp.headers)
// Store headers for gateway detection
responseHeaders = resp.headers
}
} catch (streamingError) {
// Clear the idle timeout watchdog on error path too
clearStreamIdleTimers()
// Instrumentation: if the watchdog had already fired and the for-await
// threw (rather than exiting cleanly), record that the loop DID exit and
// how long after the watchdog. Distinguishes true hangs from error exits.
if (streamIdleAborted && streamWatchdogFiredAt !== null) {
const exitDelayMs = Math.round(
performance.now() - streamWatchdogFiredAt,
)
logForDiagnosticsNoPII(
'info',
'cli_stream_loop_exited_after_watchdog_error',
)
logEvent('tengu_stream_loop_exited_after_watchdog', {
request_id: (streamRequestId ??
'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
exit_delay_ms: exitDelayMs,
exit_path:
'error' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
error_name:
streamingError instanceof Error
? (streamingError.name as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
: ('unknown' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS),
model:
options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
})
}
if (streamingError instanceof APIUserAbortError) {
// Check if the abort signal was triggered by the user (ESC key)
// If the signal is aborted, it's a user-initiated abort
// If not, it's likely a timeout from the SDK
if (signal.aborted) {
// This is a real user abort (ESC key was pressed)
logForDebugging(
`Streaming aborted by user: ${errorMessage(streamingError)}`,
)
if (isAdvisorInProgress) {
logEvent('tengu_advisor_tool_interrupted', {
model:
options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
advisor_model: (advisorModel ??
'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
})
}
throw streamingError
} else {
// The SDK threw APIUserAbortError but our signal wasn't aborted
// This means it's a timeout from the SDK's internal timeout
logForDebugging(
`Streaming timeout (SDK abort): ${streamingError.message}`,
{ level: 'error' },
)
// Throw a more specific error for timeout
throw new APIConnectionTimeoutError({ message: 'Request timed out' })
}
}
// When the flag is enabled, skip the non-streaming fallback and let the
// error propagate to withRetry. The mid-stream fallback causes double tool
// execution when streaming tool execution is active: the partial stream
// starts a tool, then the non-streaming retry produces the same tool_use
// and runs it again. See inc-4258.
const disableFallback =
isEnvTruthy(process.env.CLAUDE_CODE_DISABLE_NONSTREAMING_FALLBACK) ||
getFeatureValue_CACHED_MAY_BE_STALE(
'tengu_disable_streaming_to_non_streaming_fallback',
false,
)
if (disableFallback) {
logForDebugging(
`Error streaming (non-streaming fallback disabled): ${errorMessage(streamingError)}`,
{ level: 'error' },
)
logEvent('tengu_streaming_fallback_to_non_streaming', {
model:
options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
error:
streamingError instanceof Error
? (streamingError.name as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
: (String(
streamingError,
) as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS),
attemptNumber,
maxOutputTokens,
thinkingType:
thinkingConfig.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
fallback_disabled: true,
request_id: (streamRequestId ??
'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
fallback_cause: (streamIdleAborted
? 'watchdog'
: 'other') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
})
throw streamingError
}
logForDebugging(
`Error streaming, falling back to non-streaming mode: ${errorMessage(streamingError)}`,
{ level: 'error' },
)
didFallBackToNonStreaming = true
if (options.onStreamingFallback) {
options.onStreamingFallback()
}
logEvent('tengu_streaming_fallback_to_non_streaming', {
model:
options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
error:
streamingError instanceof Error
? (streamingError.name as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
: (String(
streamingError,
) as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS),
attemptNumber,
maxOutputTokens,
thinkingType:
thinkingConfig.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
fallback_disabled: false,
request_id: (streamRequestId ??
'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
fallback_cause: (streamIdleAborted
? 'watchdog'
: 'other') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
})
// Fall back to non-streaming mode with retries.
// If the streaming failure was itself a 529, count it toward the
// consecutive-529 budget so total 529s-before-model-fallback is the
// same whether the overload was hit in streaming or non-streaming mode.
// This is a speculative fix for https://github.com/anthropics/claude-code/issues/1513
// Instrumentation: proves executeNonStreamingRequest was entered (vs. the
// fallback event firing but the call itself hanging at dispatch).
logForDiagnosticsNoPII('info', 'cli_nonstreaming_fallback_started')
logEvent('tengu_nonstreaming_fallback_started', {
request_id: (streamRequestId ??
'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
model:
options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
fallback_cause: (streamIdleAborted
? 'watchdog'
: 'other') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
})
const result = yield* executeNonStreamingRequest(
{ model: options.model, source: options.querySource },
{
model: options.model,
fallbackModel: options.fallbackModel,
thinkingConfig,
...(isFastModeEnabled() && { fastMode: isFastMode }),
signal,
initialConsecutive529Errors: is529Error(streamingError) ? 1 : 0,
querySource: options.querySource,
},
paramsFromContext,
(attempt, _startTime, tokens) => {
attemptNumber = attempt
maxOutputTokens = tokens
},
params => captureAPIRequest(params, options.querySource),
streamRequestId,
)
const m: AssistantMessage = {
message: {
...result,
content: normalizeContentFromAPI(
result.content,
tools,
options.agentId,
),
},
requestId: streamRequestId ?? undefined,
type: 'assistant',
uuid: randomUUID(),
timestamp: new Date().toISOString(),
...(process.env.USER_TYPE === 'ant' &&
research !== undefined && {
research,
}),
...(advisorModel && {
advisorModel,
}),
}
newMessages.push(m)
fallbackMessage = m
yield m
} finally {
clearStreamIdleTimers()
}
} catch (errorFromRetry) {
// FallbackTriggeredError must propagate to query.ts, which performs the
// actual model switch. Swallowing it here would turn the fallback into a
// no-op β the user would just see "Model fallback triggered: X -> Y" as
// an error message with no actual retry on the fallback model.
if (errorFromRetry instanceof FallbackTriggeredError) {
throw errorFromRetry
}
// Check if this is a 404 error during stream creation that should trigger
// non-streaming fallback. This handles gateways that return 404 for streaming
// endpoints but work fine with non-streaming. Before v2.1.8, BetaMessageStream
// threw 404s during iteration (caught by inner catch with fallback), but now
// with raw streams, 404s are thrown during creation (caught here).
const is404StreamCreationError =
!didFallBackToNonStreaming &&
errorFromRetry instanceof CannotRetryError &&
errorFromRetry.originalError instanceof APIError &&
errorFromRetry.originalError.status === 404
if (is404StreamCreationError) {
// 404 is thrown at .withResponse() before streamRequestId is assigned,
// and CannotRetryError means every retry failed β so grab the failed
// request's ID from the error header instead.
const failedRequestId =
(errorFromRetry.originalError as APIError).requestID ?? 'unknown'
logForDebugging(
'Streaming endpoint returned 404, falling back to non-streaming mode',
{ level: 'warn' },
)
didFallBackToNonStreaming = true
if (options.onStreamingFallback) {
options.onStreamingFallback()
}
logEvent('tengu_streaming_fallback_to_non_streaming', {
model:
options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
error:
'404_stream_creation' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
attemptNumber,
maxOutputTokens,
thinkingType:
thinkingConfig.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
request_id:
failedRequestId as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
fallback_cause:
'404_stream_creation' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
})
try {
// Fall back to non-streaming mode
const result = yield* executeNonStreamingRequest(
{ model: options.model, source: options.querySource },
{
model: options.model,
fallbackModel: options.fallbackModel,
thinkingConfig,
...(isFastModeEnabled() && { fastMode: isFastMode }),
signal,
},
paramsFromContext,
(attempt, _startTime, tokens) => {
attemptNumber = attempt
maxOutputTokens = tokens
},
params => captureAPIRequest(params, options.querySource),
failedRequestId,
)
const m: AssistantMessage = {
message: {
...result,
content: normalizeContentFromAPI(
result.content,
tools,
options.agentId,
),
},
requestId: streamRequestId ?? undefined,
type: 'assistant',
uuid: randomUUID(),
timestamp: new Date().toISOString(),
...(process.env.USER_TYPE === 'ant' &&
research !== undefined && { research }),
...(advisorModel && { advisorModel }),
}
newMessages.push(m)
fallbackMessage = m
yield m
// Continue to success logging below
} catch (fallbackError) {
// Propagate model-fallback signal to query.ts (see comment above).
if (fallbackError instanceof FallbackTriggeredError) {
throw fallbackError
}
// Fallback also failed, handle as normal error
logForDebugging(
`Non-streaming fallback also failed: ${errorMessage(fallbackError)}`,
{ level: 'error' },
)
let error = fallbackError
let errorModel = options.model
if (fallbackError instanceof CannotRetryError) {
error = fallbackError.originalError
errorModel = fallbackError.retryContext.model
}
if (error instanceof APIError) {
extractQuotaStatusFromError(error)
}
const requestId =
streamRequestId ||
(error instanceof APIError ? error.requestID : undefined) ||
(error instanceof APIError
? (error.error as { request_id?: string })?.request_id
: undefined)
logAPIError({
error,
model: errorModel,
messageCount: messagesForAPI.length,
messageTokens: tokenCountFromLastAPIResponse(messagesForAPI),
durationMs: Date.now() - start,
durationMsIncludingRetries: Date.now() - startIncludingRetries,
attempt: attemptNumber,
requestId,
clientRequestId,
didFallBackToNonStreaming,
queryTracking: options.queryTracking,
querySource: options.querySource,
llmSpan,
fastMode: isFastModeRequest,
previousRequestId,
})
if (error instanceof APIUserAbortError) {
releaseStreamResources()
return
}
yield getAssistantMessageFromError(error, errorModel, {
messages,
messagesForAPI,
})
releaseStreamResources()
return
}
} else {
// Original error handling for non-404 errors
logForDebugging(`Error in API request: ${errorMessage(errorFromRetry)}`, {
level: 'error',
})
let error = errorFromRetry
let errorModel = options.model
if (errorFromRetry instanceof CannotRetryError) {
error = errorFromRetry.originalError
errorModel = errorFromRetry.retryContext.model
}
// Extract quota status from error headers if it's a rate limit error
if (error instanceof APIError) {
extractQuotaStatusFromError(error)
}
// Extract requestId from stream, error header, or error body
const requestId =
streamRequestId ||
(error instanceof APIError ? error.requestID : undefined) ||
(error instanceof APIError
? (error.error as { request_id?: string })?.request_id
: undefined)
logAPIError({
error,
model: errorModel,
messageCount: messagesForAPI.length,
messageTokens: tokenCountFromLastAPIResponse(messagesForAPI),
durationMs: Date.now() - start,
durationMsIncludingRetries: Date.now() - startIncludingRetries,
attempt: attemptNumber,
requestId,
clientRequestId,
didFallBackToNonStreaming,
queryTracking: options.queryTracking,
querySource: options.querySource,
llmSpan,
fastMode: isFastModeRequest,
previousRequestId,
})
// Don't yield an assistant error message for user aborts
// The interruption message is handled in query.ts
if (error instanceof APIUserAbortError) {
releaseStreamResources()
return
}
yield getAssistantMessageFromError(error, errorModel, {
messages,
messagesForAPI,
})
releaseStreamResources()
return
}
} finally {
stopSessionActivity('api_call')
// Must be in the finally block: if the generator is terminated early
// via .return() (e.g. consumer breaks out of for-await-of, or query.ts
// encounters an abort), code after the try/finally never executes.
// Without this, the Response object's native TLS/socket buffers leak
// until the generator itself is GC'd (see GH #32920).
releaseStreamResources()
// Non-streaming fallback cost: the streaming path tracks cost in the
// message_delta handler before any yield. Fallback pushes to newMessages
// then yields, so tracking must be here to survive .return() at the yield.
if (fallbackMessage) {
const fallbackUsage = fallbackMessage.message.usage
usage = updateUsage(EMPTY_USAGE, fallbackUsage)
stopReason = fallbackMessage.message.stop_reason
const fallbackCost = calculateUSDCost(resolvedModel, fallbackUsage)
costUSD += addToTotalSessionCost(
fallbackCost,
fallbackUsage,
options.model,
)
}
}
// Mark all registered tools as sent to API so they become eligible for deletion
if (feature('CACHED_MICROCOMPACT') && cachedMCEnabled) {
markToolsSentToAPIState()
}
// Track the last requestId for the main conversation chain so shutdown
// can send a cache eviction hint to inference. Exclude backgrounded
// sessions (Ctrl+B) which share the repl_main_thread querySource but
// run inside an agent context β they are independent conversation chains
// whose cache should not be evicted when the foreground session clears.
if (
streamRequestId &&
!getAgentContext() &&
(options.querySource.startsWith('repl_main_thread') ||
options.querySource === 'sdk')
) {
setLastMainRequestId(streamRequestId)
}
// Precompute scalars so the fire-and-forget .then() closure doesn't pin the
// full messagesForAPI array (the entire conversation up to the context window
// limit) until getToolPermissionContext() resolves.
const logMessageCount = messagesForAPI.length
const logMessageTokens = tokenCountFromLastAPIResponse(messagesForAPI)
void options.getToolPermissionContext().then(permissionContext => {
logAPISuccessAndDuration({
model:
newMessages[0]?.message.model ?? partialMessage?.model ?? options.model,
preNormalizedModel: options.model,
usage,
start,
startIncludingRetries,
attempt: attemptNumber,
messageCount: logMessageCount,
messageTokens: logMessageTokens,
requestId: streamRequestId ?? null,
stopReason,
ttftMs,
didFallBackToNonStreaming,
querySource: options.querySource,
headers: responseHeaders,
costUSD,
queryTracking: options.queryTracking,
permissionMode: permissionContext.mode,
// Pass newMessages for beta tracing - extraction happens in logging.ts
// only when beta tracing is enabled
newMessages,
llmSpan,
globalCacheStrategy,
requestSetupMs: start - startIncludingRetries,
attemptStartTimes,
fastMode: isFastModeRequest,
previousRequestId,
betas: lastRequestBetas,
})
})
// Defensive: also release on normal completion (no-op if finally already ran).
releaseStreamResources()
}
/**
 * Releases a raw SDK stream by aborting its AbortController, to prevent
 * memory leaks from lingering stream resources.
 *
 * Safe to call with `undefined`, and safe to call repeatedly: an
 * already-aborted controller is left alone. Any error raised while touching
 * the controller is swallowed, since this is best-effort cleanup.
 *
 * @param stream - The stream to release, or `undefined` if none was created.
 * @internal Exported for testing
 */
export function cleanupStream(
  stream: Stream<BetaRawMessageStreamEvent> | undefined,
): void {
  if (stream) {
    try {
      const controller = stream.controller
      // Skip the abort when it has already happened.
      if (!controller.signal.aborted) {
        controller.abort()
      }
    } catch {
      // Best effort: the stream may already be closed.
    }
  }
}
/**
 * Updates usage statistics with new values from streaming API events.
 * Note: Anthropic's streaming API provides cumulative usage totals, not incremental deltas.
 * Each event contains the complete usage up to that point in the stream.
 *
 * Input-related tokens (input_tokens, cache_creation_input_tokens, cache_read_input_tokens,
 * the cache_creation ephemeral_1h/ephemeral_5m breakdown, and cache_deleted_input_tokens)
 * are typically set in message_start and remain constant. message_delta events may send
 * explicit 0 values for these fields, which should not overwrite the values from message_start.
 * We only update these fields if they have a non-null, non-zero value.
 *
 * Other fields (output_tokens, server_tool_use counts, iterations, speed) take the incoming
 * value whenever it is non-nullish. service_tier and inference_geo are always carried over
 * from `usage` unchanged.
 *
 * @param usage - Usage accumulated so far; not mutated.
 * @param partUsage - Usage payload from the current stream event. When absent, a shallow
 *   copy of `usage` is returned.
 * @returns A new usage object.
 */
export function updateUsage(
  usage: Readonly<NonNullableUsage>,
  partUsage: BetaMessageDeltaUsage | undefined,
): NonNullableUsage {
  if (!partUsage) {
    return { ...usage }
  }
  // Cumulative token counts never legitimately drop back to 0 mid-stream, so a
  // null/undefined/0 incoming value means "not reported": keep the current one.
  const keepPositive = (
    incoming: number | null | undefined,
    current: number,
  ): number => (incoming != null && incoming > 0 ? incoming : current)
  // SDK type BetaMessageDeltaUsage is missing cache_creation, but it's real!
  // speed is likewise read through this BetaUsage view.
  const fullPartUsage = partUsage as BetaUsage
  return {
    input_tokens: keepPositive(partUsage.input_tokens, usage.input_tokens),
    cache_creation_input_tokens: keepPositive(
      partUsage.cache_creation_input_tokens,
      usage.cache_creation_input_tokens,
    ),
    cache_read_input_tokens: keepPositive(
      partUsage.cache_read_input_tokens,
      usage.cache_read_input_tokens,
    ),
    output_tokens: partUsage.output_tokens ?? usage.output_tokens,
    server_tool_use: {
      web_search_requests:
        partUsage.server_tool_use?.web_search_requests ??
        usage.server_tool_use.web_search_requests,
      web_fetch_requests:
        partUsage.server_tool_use?.web_fetch_requests ??
        usage.server_tool_use.web_fetch_requests,
    },
    service_tier: usage.service_tier,
    cache_creation: {
      // Same > 0 guard as the other input-token fields: a message_delta that
      // reports a zeroed breakdown must not wipe the message_start values
      // (which would leave the breakdown inconsistent with
      // cache_creation_input_tokens above).
      ephemeral_1h_input_tokens: keepPositive(
        fullPartUsage.cache_creation?.ephemeral_1h_input_tokens,
        usage.cache_creation.ephemeral_1h_input_tokens,
      ),
      ephemeral_5m_input_tokens: keepPositive(
        fullPartUsage.cache_creation?.ephemeral_5m_input_tokens,
        usage.cache_creation.ephemeral_5m_input_tokens,
      ),
    },
    // cache_deleted_input_tokens: returned by the API when cache editing
    // deletes KV cache content, but not in SDK types. Kept off NonNullableUsage
    // so the string is eliminated from external builds by dead code elimination
    // (it must therefore only appear inside this feature() branch).
    // Uses the same > 0 guard as other token fields to prevent message_delta
    // from overwriting the real value with 0.
    ...(feature('CACHED_MICROCOMPACT')
      ? {
          cache_deleted_input_tokens: keepPositive(
            (partUsage as unknown as { cache_deleted_input_tokens?: number })
              .cache_deleted_input_tokens,
            (usage as unknown as { cache_deleted_input_tokens?: number })
              .cache_deleted_input_tokens ?? 0,
          ),
        }
      : {}),
    inference_geo: usage.inference_geo,
    iterations: partUsage.iterations ?? usage.iterations,
    speed: fullPartUsage.speed ?? usage.speed,
  }
}
/**
 * Folds one message's usage into a running total across assistant turns.
 *
 * Every token counter and server-tool counter is the sum of the two inputs.
 * The descriptive fields (service_tier, inference_geo, iterations, speed) are
 * taken from `messageUsage`, so the most recent message wins.
 *
 * @param totalUsage - Running total so far; not mutated.
 * @param messageUsage - Usage of the message being added; not mutated.
 * @returns A new usage object.
 */
export function accumulateUsage(
  totalUsage: Readonly<NonNullableUsage>,
  messageUsage: Readonly<NonNullableUsage>,
): NonNullableUsage {
  // Adds the same numeric field from the running total and the new message.
  const sum = (field: (u: Readonly<NonNullableUsage>) => number): number =>
    field(totalUsage) + field(messageUsage)
  return {
    input_tokens: sum(u => u.input_tokens),
    cache_creation_input_tokens: sum(u => u.cache_creation_input_tokens),
    cache_read_input_tokens: sum(u => u.cache_read_input_tokens),
    output_tokens: sum(u => u.output_tokens),
    server_tool_use: {
      web_search_requests: sum(u => u.server_tool_use.web_search_requests),
      web_fetch_requests: sum(u => u.server_tool_use.web_fetch_requests),
    },
    // Most recent message wins for descriptive fields.
    service_tier: messageUsage.service_tier,
    cache_creation: {
      ephemeral_1h_input_tokens: sum(
        u => u.cache_creation.ephemeral_1h_input_tokens,
      ),
      ephemeral_5m_input_tokens: sum(
        u => u.cache_creation.ephemeral_5m_input_tokens,
      ),
    },
    // cache_deleted_input_tokens is deliberately kept off NonNullableUsage so
    // the string only appears inside this feature() branch and is dropped
    // from external builds. A missing value counts as 0.
    ...(feature('CACHED_MICROCOMPACT')
      ? {
          cache_deleted_input_tokens: sum(
            u =>
              (u as unknown as { cache_deleted_input_tokens?: number })
                .cache_deleted_input_tokens ?? 0,
          ),
        }
      : {}),
    inference_geo: messageUsage.inference_geo,
    iterations: messageUsage.iterations,
    speed: messageUsage.speed,
  }
}
/**
 * Type guard for tool_result content blocks: true only for a non-null object
 * that has a `type` key equal to 'tool_result' and also has a `tool_use_id`
 * key (the key's value is not inspected).
 */
function isToolResultBlock(
  block: unknown,
): block is { type: 'tool_result'; tool_use_id: string } {
  if (typeof block !== 'object' || block === null || !('type' in block)) {
    return false
  }
  if ((block as { type: string }).type !== 'tool_result') {
    return false
  }
  return 'tool_use_id' in block
}
/** A `cache_edits` content block listing cache entries to delete, by reference. */
type CachedMCEditsBlock = {
  type: 'cache_edits'
  edits: Array<{ type: 'delete'; cache_reference: string }>
}
/** A cache_edits block remembered together with the message index it was inserted at. */
type CachedMCPinnedEdits = {
  userMessageIndex: number
  block: CachedMCEditsBlock
}
/**
 * Converts internal user/assistant messages into API `MessageParam`s and
 * places prompt-cache markers on them. Exported for testing cache_reference
 * placement constraints.
 *
 * Steps, in order:
 * 1. Logs a `tengu_api_cache_breakpoints` analytics event.
 * 2. Converts every message, requesting a cache marker (`addCache`) on
 *    exactly one of them: the last message, or the second-to-last when
 *    `skipCacheWrite` is set. If that index is negative (too few messages),
 *    no message is marked.
 * 3. Returns immediately when `useCachedMC` is false.
 * 4. Re-inserts each pinned `cache_edits` block at its recorded index (only
 *    if that result entry is a user message), then inserts `newCacheEdits`
 *    into the last user message and pins it via `pinCacheEdits`. Deletions
 *    are deduplicated by `cache_reference` across all blocks inserted in this
 *    call; a block left with zero edits is not inserted.
 * 5. When `enablePromptCaching` is set, adds `cache_reference` (copied from
 *    `tool_use_id`) to tool_result blocks of user messages that come strictly
 *    before the last message containing a block with a `cache_control` key.
 *
 * @param messages - Conversation to convert, in order.
 * @param enablePromptCaching - Forwarded to the per-message converters; also
 *   gates step 5.
 * @param querySource - Forwarded to the per-message converters.
 * @param useCachedMC - When false, steps 4 and 5 are skipped.
 * @param newCacheEdits - Deletions to add this turn (step 4); ignored when
 *   null/undefined.
 * @param pinnedEdits - Previously pinned cache_edits blocks to re-send at
 *   their original positions.
 * @param skipCacheWrite - Shift the single marker to the second-to-last
 *   message (fire-and-forget forks).
 * @returns The converted messages. In steps 4-5, user entries of the result
 *   may have `content` reassigned (string content is wrapped into a text
 *   block array) and cache_edits blocks inserted into their content arrays.
 */
export function addCacheBreakpoints(
  messages: (UserMessage | AssistantMessage)[],
  enablePromptCaching: boolean,
  querySource?: QuerySource,
  useCachedMC = false,
  newCacheEdits?: CachedMCEditsBlock | null,
  pinnedEdits?: CachedMCPinnedEdits[],
  skipCacheWrite = false,
): MessageParam[] {
  logEvent('tengu_api_cache_breakpoints', {
    totalMessageCount: messages.length,
    cachingEnabled: enablePromptCaching,
    skipCacheWrite,
  })
  // Exactly one message-level cache_control marker per request. Mycro's
  // turn-to-turn eviction (page_manager/index.rs: Index::insert) frees
  // local-attention KV pages at any cached prefix position NOT in
  // cache_store_int_token_boundaries. With two markers the second-to-last
  // position is protected and its locals survive an extra turn even though
  // nothing will ever resume from there — with one marker they're freed
  // immediately. For fire-and-forget forks (skipCacheWrite) we shift the
  // marker to the second-to-last message: that's the last shared-prefix
  // point, so the write is a no-op merge on mycro (entry already exists)
  // and the fork doesn't leave its own tail in the KVCC. Dense pages are
  // refcounted and survive via the new hash either way.
  // If markerIndex is negative (fewer messages than needed), no message
  // receives a marker.
  const markerIndex = skipCacheWrite ? messages.length - 2 : messages.length - 1
  const result = messages.map((msg, index) => {
    const addCache = index === markerIndex
    if (msg.type === 'user') {
      return userMessageToMessageParam(
        msg,
        addCache,
        enablePromptCaching,
        querySource,
      )
    }
    return assistantMessageToMessageParam(
      msg,
      addCache,
      enablePromptCaching,
      querySource,
    )
  })
  if (!useCachedMC) {
    return result
  }
  // Track all cache_references being deleted to prevent duplicates across blocks.
  const seenDeleteRefs = new Set<string>()
  // Helper to deduplicate a cache_edits block against already-seen deletions.
  // Returns a shallow copy with filtered edits and records the kept refs in
  // seenDeleteRefs, so the order in which blocks are processed matters.
  const deduplicateEdits = (block: CachedMCEditsBlock): CachedMCEditsBlock => {
    const uniqueEdits = block.edits.filter(edit => {
      if (seenDeleteRefs.has(edit.cache_reference)) {
        return false
      }
      seenDeleteRefs.add(edit.cache_reference)
      return true
    })
    return { ...block, edits: uniqueEdits }
  }
  // Re-insert all previously-pinned cache_edits at their original positions.
  // Entries whose index is out of range or not a user message are skipped.
  for (const pinned of pinnedEdits ?? []) {
    const msg = result[pinned.userMessageIndex]
    if (msg && msg.role === 'user') {
      if (!Array.isArray(msg.content)) {
        // Wrap non-array content (treated as a string) so blocks can be inserted.
        msg.content = [{ type: 'text', text: msg.content as string }]
      }
      const dedupedBlock = deduplicateEdits(pinned.block)
      if (dedupedBlock.edits.length > 0) {
        insertBlockAfterToolResults(msg.content, dedupedBlock)
      }
    }
  }
  // Insert new cache_edits into the last user message and pin them
  if (newCacheEdits && result.length > 0) {
    const dedupedNewEdits = deduplicateEdits(newCacheEdits)
    if (dedupedNewEdits.edits.length > 0) {
      // Walk backwards to find the last user message.
      for (let i = result.length - 1; i >= 0; i--) {
        const msg = result[i]
        if (msg && msg.role === 'user') {
          if (!Array.isArray(msg.content)) {
            // Wrap non-array content (treated as a string) so blocks can be inserted.
            msg.content = [{ type: 'text', text: msg.content as string }]
          }
          insertBlockAfterToolResults(msg.content, dedupedNewEdits)
          // Pin so this block is re-sent at the same position in future calls.
          // NOTE: the original (not deduplicated) block is pinned; duplicates
          // are filtered again by deduplicateEdits when it is re-inserted.
          pinCacheEdits(i, newCacheEdits)
          logForDebugging(
            `Added cache_edits block with ${dedupedNewEdits.edits.length} deletion(s) to message[${i}]: ${dedupedNewEdits.edits.map(e => e.cache_reference).join(', ')}`,
          )
          break
        }
      }
    }
  }
  // Add cache_reference to tool_result blocks that are within the cached prefix.
  // Must be done AFTER cache_edits insertion since that modifies content arrays.
  if (enablePromptCaching) {
    // Find the last message (user or assistant) containing any block with a
    // cache_control key.
    let lastCCMsg = -1
    for (let i = 0; i < result.length; i++) {
      const msg = result[i]!
      if (Array.isArray(msg.content)) {
        for (const block of msg.content) {
          if (block && typeof block === 'object' && 'cache_control' in block) {
            lastCCMsg = i
          }
        }
      }
    }
    // Add cache_reference to tool_result blocks that are strictly before
    // the last cache_control marker. The API requires cache_reference to
    // appear "before or on" the last cache_control — we use strict "before"
    // to avoid edge cases where cache_edits splicing shifts block indices.
    //
    // Create new objects instead of mutating in-place to avoid contaminating
    // blocks reused by secondary queries that use models without cache_editing support.
    if (lastCCMsg >= 0) {
      for (let i = 0; i < lastCCMsg; i++) {
        const msg = result[i]!
        if (msg.role !== 'user' || !Array.isArray(msg.content)) {
          continue
        }
        // Copy the content array lazily, only once a tool_result is found.
        let cloned = false
        for (let j = 0; j < msg.content.length; j++) {
          const block = msg.content[j]
          if (block && isToolResultBlock(block)) {
            if (!cloned) {
              msg.content = [...msg.content]
              cloned = true
            }
            msg.content[j] = Object.assign({}, block, {
              cache_reference: block.tool_use_id,
            })
          }
        }
      }
    }
  }
  return result
}
/**
 * Turns a system prompt into the API's text blocks, one per segment produced
 * by `splitSysPromptPrefix`. When prompt caching is enabled, a segment with a
 * non-null cache scope also gets a `cache_control` built by `getCacheControl`.
 *
 * @param systemPrompt - The system prompt to split.
 * @param enablePromptCaching - Whether to attach cache_control at all.
 * @param options.skipGlobalCacheForSystemPrompt - Forwarded to the splitter.
 * @param options.querySource - Forwarded to `getCacheControl`.
 */
export function buildSystemPromptBlocks(
  systemPrompt: SystemPrompt,
  enablePromptCaching: boolean,
  options?: {
    skipGlobalCacheForSystemPrompt?: boolean
    querySource?: QuerySource
  },
): TextBlockParam[] {
  // IMPORTANT: Do not add any more blocks for caching or you will get a 400
  const segments = splitSysPromptPrefix(systemPrompt, {
    skipGlobalCacheForSystemPrompt: options?.skipGlobalCacheForSystemPrompt,
  })
  const querySource = options?.querySource
  return segments.map(({ text, cacheScope }) => ({
    type: 'text' as const,
    text,
    ...(enablePromptCaching && cacheScope !== null
      ? { cache_control: getCacheControl({ scope: cacheScope, querySource }) }
      : {}),
  }))
}
type HaikuOptions = Omit<Options, 'model' | 'getToolPermissionContext'>
/**
 * Single-turn, non-streaming query against the small/fast model (as chosen by
 * `getSmallFastModel`): no tools, thinking disabled, prompt caching off unless
 * `options.enablePromptCaching` is set.
 *
 * Delegates to `queryWithModel`, which runs the identical pipeline (same VCR
 * fixture key, same request options); the only difference is the forced
 * model. Keeping one implementation prevents the two paths from drifting.
 *
 * @returns The assistant message produced by the non-streaming call.
 */
export async function queryHaiku({
  systemPrompt = asSystemPrompt([]),
  userPrompt,
  outputFormat,
  signal,
  options,
}: {
  systemPrompt: SystemPrompt
  userPrompt: string
  outputFormat?: BetaJSONOutputFormat
  signal: AbortSignal
  options: HaikuOptions
}): Promise<AssistantMessage> {
  // The model override is applied after spreading caller options so it
  // always wins, exactly as the previous inline implementation did.
  return queryWithModel({
    systemPrompt,
    userPrompt,
    outputFormat,
    signal,
    options: {
      ...options,
      model: getSmallFastModel(),
    },
  })
}
type QueryWithModelOptions = Omit<Options, 'getToolPermissionContext'>
/**
 * Sends a single user prompt to the model named in `options` through the
 * regular Claude Code request pipeline (`queryModelWithoutStreaming`), so
 * authentication, betas and headers are applied, unlike a direct API call.
 * The request has no tools, thinking disabled, an empty tool-permission
 * context, and prompt caching off unless `options.enablePromptCaching` is set.
 * The call is wrapped in `withVCR`, keyed on the system prompt and user prompt.
 *
 * @returns The assistant message from the non-streaming call.
 */
export async function queryWithModel({
  systemPrompt = asSystemPrompt([]),
  userPrompt,
  outputFormat,
  signal,
  options,
}: {
  systemPrompt: SystemPrompt
  userPrompt: string
  outputFormat?: BetaJSONOutputFormat
  signal: AbortSignal
  options: QueryWithModelOptions
}): Promise<AssistantMessage> {
  const runQuery = async () => {
    const response = await queryModelWithoutStreaming({
      messages: [createUserMessage({ content: userPrompt })],
      systemPrompt,
      thinkingConfig: { type: 'disabled' },
      tools: [],
      signal,
      options: {
        ...options,
        enablePromptCaching: options.enablePromptCaching ?? false,
        outputFormat,
        getToolPermissionContext: async () => getEmptyToolPermissionContext(),
      },
    })
    return [response]
  }
  const [message] = await withVCR(
    [
      createUserMessage({
        content: systemPrompt.map(text => ({ type: 'text', text })),
      }),
      createUserMessage({ content: userPrompt }),
    ],
    runQuery,
  )
  return message! as AssistantMessage
}
// Non-streaming requests have a 10-minute maximum per the docs:
// https://platform.claude.com/docs/en/api/errors#long-requests
// The SDK's 21333-token cap is derived from 10 min × 128k tokens/hour, but we
// bypass it by setting a client-level timeout, so we can cap higher.
export const MAX_NON_STREAMING_TOKENS = 64_000
/**
 * Caps `max_tokens` for the non-streaming fallback and, if extended thinking
 * is enabled with a non-zero budget, lowers `thinking.budget_tokens` so the
 * API constraint `max_tokens > thinking.budget_tokens` still holds.
 * The input object is not mutated; a shallow copy is returned.
 *
 * @param params - The parameters that will be sent to the API
 * @param maxTokensCap - The maximum allowed tokens (MAX_NON_STREAMING_TOKENS)
 * @returns A copy of `params` with `max_tokens` capped and the thinking
 *   budget clamped to at most `max_tokens - 1` when applicable
 */
export function adjustParamsForNonStreaming<
  T extends {
    max_tokens: number
    thinking?: BetaMessageStreamParams['thinking']
  },
>(params: T, maxTokensCap: number): T {
  const limit = Math.min(params.max_tokens, maxTokensCap)
  const thinking = params.thinking
  // Nothing to clamp unless thinking is enabled with a truthy budget.
  if (thinking?.type !== 'enabled' || !thinking.budget_tokens) {
    return { ...params, max_tokens: limit }
  }
  return {
    ...params,
    thinking: {
      ...thinking,
      // Budget must stay at least 1 below max_tokens.
      budget_tokens: Math.min(thinking.budget_tokens, limit - 1),
    },
    max_tokens: limit,
  }
}
/**
 * Reads the `tengu_otk_slot_v1` feature flag (possibly stale cached value)
 * that gates the slot-reservation output-token cap. Falls back to false when
 * no value is available. Per the original note, the 3P default is off because
 * the cap has not been validated on Bedrock/Vertex.
 */
function isMaxTokensCapEnabled(): boolean {
  const capEnabled = getFeatureValue_CACHED_MAY_BE_STALE(
    'tengu_otk_slot_v1',
    false,
  )
  return capEnabled
}
/**
 * Resolves the effective max output tokens for `model`.
 *
 * Starts from the model's native default. When the slot-reservation cap is
 * enabled, that default is lowered to at most CAPPED_DEFAULT_MAX_TOKENS.
 * Finally, `CLAUDE_CODE_MAX_OUTPUT_TOKENS` is validated against the model's
 * upper limit, and its `effective` value is returned, so an explicit env-var
 * override still wins over the cap.
 */
export function getMaxOutputTokensForModel(model: string): number {
  const { default: nativeDefault, upperLimit } = getModelMaxOutputTokens(model)
  // Slot-reservation cap: drop the default to 8k for all models. BQ p99
  // output = 4,911 tokens; 32k/64k defaults over-reserve 8-16× slot capacity.
  // Requests hitting the cap get one clean retry at 64k (query.ts
  // max_output_tokens_escalate). Math.min keeps models with lower native
  // defaults (e.g. claude-3-opus at 4k) at their native value.
  let defaultTokens = nativeDefault
  if (isMaxTokensCapEnabled()) {
    defaultTokens = Math.min(nativeDefault, CAPPED_DEFAULT_MAX_TOKENS)
  }
  const envResolution = validateBoundedIntEnvVar(
    'CLAUDE_CODE_MAX_OUTPUT_TOKENS',
    process.env.CLAUDE_CODE_MAX_OUTPUT_TOKENS,
    defaultTokens,
    upperLimit,
  )
  return envResolution.effective
}