🎯 Use case
Public surface of the query module (engine wiring). On the API surface it exposes the `query` async generator, which delegates to the internal `queryLoop`, and the `QueryParams` type that configures a call. Dependencies touch @anthropic-ai, src, and bun:bundle. It composes internal code from hooks, services, utils, Tool, and types, among other relative imports.
Generated from folder role, exports, dependency roots, and inline comments — not hand-reviewed for every path.
🧠 Inline summary
// biome-ignore-all assist/source/organizeImports: ANT-ONLY import markers must not be reordered import type { ToolResultBlockParam, ToolUseBlock, } from '@anthropic-ai/sdk/resources/index.mjs'
📤 Exports (heuristic)
QueryParams, query
📚 External import roots
Package roots taken from `from "…"` import clauses (relative paths omitted).
@anthropic-ai, src, bun:bundle
🖥️ Source preview
// biome-ignore-all assist/source/organizeImports: ANT-ONLY import markers must not be reordered
import type {
ToolResultBlockParam,
ToolUseBlock,
} from '@anthropic-ai/sdk/resources/index.mjs'
import type { CanUseToolFn } from './hooks/useCanUseTool.js'
import { FallbackTriggeredError } from './services/api/withRetry.js'
import {
calculateTokenWarningState,
isAutoCompactEnabled,
type AutoCompactTrackingState,
} from './services/compact/autoCompact.js'
import { buildPostCompactMessages } from './services/compact/compact.js'
// Optional, feature-gated modules. Each binding holds the module namespace
// when its `feature(...)` flag is on and `null` otherwise, so call sites must
// null-check (or use `?.`) before touching it.
// `feature` is imported from 'bun:bundle' further down in this file; import
// declarations are hoisted, so it is already bound when these lines run.
// NOTE(review): presumably `require` is used instead of a static import so the
// bundler can drop the module from builds where the flag is off — confirm.
/* eslint-disable @typescript-eslint/no-require-imports */
const reactiveCompact = feature('REACTIVE_COMPACT')
? (require('./services/compact/reactiveCompact.js') as typeof import('./services/compact/reactiveCompact.js'))
: null
const contextCollapse = feature('CONTEXT_COLLAPSE')
? (require('./services/contextCollapse/index.js') as typeof import('./services/contextCollapse/index.js'))
: null
/* eslint-enable @typescript-eslint/no-require-imports */
import {
logEvent,
type AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
} from 'src/services/analytics/index.js'
import { ImageSizeError } from './utils/imageValidation.js'
import { ImageResizeError } from './utils/imageResizer.js'
import { findToolByName, type ToolUseContext } from './Tool.js'
import { asSystemPrompt, type SystemPrompt } from './utils/systemPromptType.js'
import type {
AssistantMessage,
AttachmentMessage,
Message,
RequestStartEvent,
StreamEvent,
ToolUseSummaryMessage,
UserMessage,
TombstoneMessage,
} from './types/message.js'
import { logError } from './utils/log.js'
import {
PROMPT_TOO_LONG_ERROR_MESSAGE,
isPromptTooLongMessage,
} from './services/api/errors.js'
import { logAntError, logForDebugging } from './utils/debug.js'
import {
createUserMessage,
createUserInterruptionMessage,
normalizeMessagesForAPI,
createSystemMessage,
createAssistantAPIErrorMessage,
getMessagesAfterCompactBoundary,
createToolUseSummaryMessage,
createMicrocompactBoundaryMessage,
stripSignatureBlocks,
} from './utils/messages.js'
import { generateToolUseSummary } from './services/toolUseSummary/toolUseSummaryGenerator.js'
import { prependUserContext, appendSystemContext } from './utils/api.js'
import {
createAttachmentMessage,
filterDuplicateMemoryAttachments,
getAttachmentMessages,
startRelevantMemoryPrefetch,
} from './utils/attachments.js'
// Feature-gated modules: the binding is the module namespace when the named
// `feature(...)` flag is on, and `null` when it is off.
// `skillPrefetch` is consumed with optional chaining inside the query loop;
// no use of `jobClassifier` is visible in this part of the file.
/* eslint-disable @typescript-eslint/no-require-imports */
const skillPrefetch = feature('EXPERIMENTAL_SKILL_SEARCH')
? (require('./services/skillSearch/prefetch.js') as typeof import('./services/skillSearch/prefetch.js'))
: null
const jobClassifier = feature('TEMPLATES')
? (require('./jobs/classifier.js') as typeof import('./jobs/classifier.js'))
: null
/* eslint-enable @typescript-eslint/no-require-imports */
import {
remove as removeFromQueue,
getCommandsByMaxPriority,
isSlashCommand,
} from './utils/messageQueueManager.js'
import { notifyCommandLifecycle } from './utils/commandLifecycle.js'
import { headlessProfilerCheckpoint } from './utils/headlessProfiler.js'
import {
getRuntimeMainLoopModel,
renderModelName,
} from './utils/model/model.js'
import {
doesMostRecentAssistantMessageExceed200k,
finalContextTokensFromLastResponse,
tokenCountWithEstimation,
} from './utils/tokens.js'
import { ESCALATED_MAX_TOKENS } from './utils/context.js'
import { getFeatureValue_CACHED_MAY_BE_STALE } from './services/analytics/growthbook.js'
import { SLEEP_TOOL_NAME } from './tools/SleepTool/prompt.js'
import { executePostSamplingHooks } from './utils/hooks/postSamplingHooks.js'
import { executeStopFailureHooks } from './utils/hooks.js'
import type { QuerySource } from './constants/querySource.js'
import { createDumpPromptsFetch } from './services/api/dumpPrompts.js'
import { StreamingToolExecutor } from './services/tools/StreamingToolExecutor.js'
import { queryCheckpoint } from './utils/queryProfiler.js'
import { runTools } from './services/tools/toolOrchestration.js'
import { applyToolResultBudget } from './utils/toolResultStorage.js'
import { recordContentReplacement } from './utils/sessionStorage.js'
import { handleStopHooks } from './query/stopHooks.js'
import { buildQueryConfig } from './query/config.js'
import { productionDeps, type QueryDeps } from './query/deps.js'
import type { Terminal, Continue } from './query/transitions.js'
import { feature } from 'bun:bundle'
import {
getCurrentTurnTokenBudget,
getTurnOutputTokens,
incrementBudgetContinuationCount,
} from './bootstrap/state.js'
import { createBudgetTracker, checkTokenBudget } from './query/tokenBudget.js'
import { count } from './utils/array.js'
// Feature-gated modules: the binding is the module namespace when the named
// `feature(...)` flag is on, and `null` when it is off.
// `snipModule` is dereferenced with `!` in the query loop, but only inside an
// `if (feature('HISTORY_SNIP'))` block, i.e. under the same flag that makes it
// non-null here. No use of `taskSummaryModule` is visible in this part of the
// file.
/* eslint-disable @typescript-eslint/no-require-imports */
const snipModule = feature('HISTORY_SNIP')
? (require('./services/compact/snipCompact.js') as typeof import('./services/compact/snipCompact.js'))
: null
const taskSummaryModule = feature('BG_SESSIONS')
? (require('./utils/taskSummary.js') as typeof import('./utils/taskSummary.js'))
: null
/* eslint-enable @typescript-eslint/no-require-imports */
/**
 * Produces a synthetic error `tool_result` for every `tool_use` block found
 * in the given assistant messages.
 *
 * Messages are visited in order, and within each message the tool uses are
 * visited in content order. Every yielded user message carries `errorMessage`
 * both as the `tool_result` content and as `toolUseResult`, and points back at
 * the originating assistant message through `sourceToolAssistantUUID`.
 *
 * @param assistantMessages - Assistant messages to scan for `tool_use` blocks.
 * @param errorMessage - Text reported as the (error) result of each tool use.
 */
function* yieldMissingToolResultBlocks(
  assistantMessages: AssistantMessage[],
  errorMessage: string,
) {
  for (const { uuid, message } of assistantMessages) {
    // Keep only the tool invocations from the mixed content list.
    const pendingToolUses = message.content.filter(
      block => block.type === 'tool_use',
    ) as ToolUseBlock[]

    // One error result per tool use, tied to it by `tool_use_id`.
    for (const { id } of pendingToolUses) {
      yield createUserMessage({
        content: [
          {
            type: 'tool_result',
            content: errorMessage,
            is_error: true,
            tool_use_id: id,
          },
        ],
        toolUseResult: errorMessage,
        sourceToolAssistantUUID: uuid,
      })
    }
  }
}
/**
 * The rules of thinking are lengthy and fortuitous. They require plenty of thinking
 * of most long duration and deep meditation for a wizard to wrap one's noggin around.
 *
 * The rules follow:
 * 1. A message that contains a thinking or redacted_thinking block must be part of a query whose max_thinking_length > 0
 * 2. A thinking block may not be the last block in a message
 * 3. Thinking blocks must be preserved for the duration of an assistant trajectory (a single turn, or if that turn includes a tool_use block then also its subsequent tool_result and the following assistant message)
 *
 * Heed these rules well, young wizard. For they are the rules of thinking, and
 * the rules of thinking are the rules of the universe. If ye do not heed these
 * rules, ye will be punished with an entire day of debugging and hair pulling.
 *
 * NOTE: these rules are about thinking blocks in general; they do not describe
 * the constant declared directly below.
 */
// Recovery limit for max_output_tokens errors.
// NOTE(review): presumably the cap that State.maxOutputTokensRecoveryCount is
// compared against; that comparison is not visible in this part of the file —
// confirm before relying on it.
const MAX_OUTPUT_TOKENS_RECOVERY_LIMIT = 3
/**
 * Type guard: true when `msg` is an assistant message whose `apiError` is
 * `'max_output_tokens'`.
 *
 * The streaming loop uses this to hold such a message back from SDK callers
 * until it is known whether the recovery loop can continue. Yielding it early
 * would leak an intermediate error to SDK callers (e.g. cowork/desktop) that
 * terminate the session on any `error` field — the recovery loop would keep
 * running with nobody listening.
 *
 * Mirrors reactiveCompact.isWithheldPromptTooLong.
 *
 * @param msg - A streamed message or event; `undefined` is accepted and
 *   yields `false`.
 * @returns Whether `msg` is a withheld max-output-tokens assistant error.
 */
function isWithheldMaxOutputTokens(
  msg: Message | StreamEvent | undefined,
): msg is AssistantMessage {
  // Anything that is missing or not an assistant message can never qualify.
  if (msg?.type !== 'assistant') {
    return false
  }
  return msg.apiError === 'max_output_tokens'
}
/**
 * Inputs for a single `query()` call. `queryLoop` reads these once at entry
 * and never reassigns them; per-iteration values live in its own loop state.
 */
export type QueryParams = {
  /** Conversation history the loop starts from. */
  messages: Message[]
  /** Base system prompt; `systemContext` is appended to it before each model call. */
  systemPrompt: SystemPrompt
  /** Key/value context prepended to the outgoing messages on each model call. */
  userContext: Record<string, string>
  /** Key/value context appended to `systemPrompt`. */
  systemContext: Record<string, string>
  /** Permission callback handed to the tool executor. */
  canUseTool: CanUseToolFn
  /** Initial tool-use context; the loop derives updated copies from it. */
  toolUseContext: ToolUseContext
  /** Model to switch to and retry with when the model call raises FallbackTriggeredError. */
  fallbackModel?: string
  /** Identifies where the query came from; forwarded to compaction and the model call. */
  querySource: QuerySource
  /** Initial max-output-tokens override forwarded to the model call. */
  maxOutputTokensOverride?: number
  /**
   * NOTE(review): presumably an upper bound on loop turns; the enforcement is
   * not visible in this part of the file — confirm.
   */
  maxTurns?: number
  /** Forwarded unchanged to the model call options. */
  skipCacheWrite?: boolean
  // API task_budget (output_config.task_budget, beta task-budgets-2026-03-13).
  // Distinct from the tokenBudget +500k auto-continue feature. `total` is the
  // budget for the whole agentic turn; `remaining` is computed per iteration
  // from cumulative API usage. See configureTaskBudgetParams in claude.ts.
  taskBudget?: { total: number }
  /** Injectable dependencies; `productionDeps()` is used when omitted. */
  deps?: QueryDeps
}
// -- query loop state
/**
 * Mutable state handed from one `queryLoop` iteration to the next. The loop
 * destructures it at the top of every iteration and replaces it wholesale
 * (`state = { ... }`) when it continues.
 */
type State = {
  // --- Conversation and tool context ---
  /** Messages the next iteration works from. */
  messages: Message[]
  /** Tool-use context carried into the next iteration. */
  toolUseContext: ToolUseContext
  /** Starts at 1 for the first iteration. */
  turnCount: number

  // --- Compaction ---
  /** Auto-compact tracking passed to the autocompact step; undefined initially. */
  autoCompactTracking: AutoCompactTrackingState | undefined
  /** Starts false. */
  hasAttemptedReactiveCompact: boolean

  // --- max_output_tokens handling ---
  /** Starts at 0. */
  maxOutputTokensRecoveryCount: number
  /** Forwarded to the model call as its max-output-tokens override. */
  maxOutputTokensOverride: number | undefined

  // --- Deferred work and hooks ---
  /** In-flight tool-use summary, if any; undefined initially. */
  pendingToolUseSummary: Promise<ToolUseSummaryMessage | null> | undefined
  /** Undefined initially. */
  stopHookActive: boolean | undefined

  // Why the previous iteration continued. Undefined on first iteration.
  // Lets tests assert recovery paths fired without inspecting message contents.
  transition: Continue | undefined
}
/**
 * Public entry point for a query. Delegates all streaming to `queryLoop`,
 * re-yielding everything it yields, and afterwards reports the commands the
 * loop consumed as `'completed'`.
 *
 * @param params - Inputs for the query; passed through to `queryLoop` as-is.
 * @returns The terminal value `queryLoop` finished with.
 */
export async function* query(
  params: QueryParams,
): AsyncGenerator<
  | StreamEvent
  | RequestStartEvent
  | Message
  | TombstoneMessage
  | ToolUseSummaryMessage,
  Terminal
> {
  // queryLoop appends to this list; it is read only after the loop finishes.
  const commandUuidsToComplete: string[] = []
  const outcome = yield* queryLoop(params, commandUuidsToComplete)

  // Reached only when queryLoop returned normally. A throw propagates through
  // yield* and skips this, as does .return() (a Return completion closes both
  // generators). The resulting started-without-completed signal on failure is
  // the same asymmetry as print.ts's drainCommandQueue.
  for (const commandUuid of commandUuidsToComplete) {
    notifyCommandLifecycle(commandUuid, 'completed')
  }
  return outcome
}
async function* queryLoop(
params: QueryParams,
consumedCommandUuids: string[],
): AsyncGenerator<
| StreamEvent
| RequestStartEvent
| Message
| TombstoneMessage
| ToolUseSummaryMessage,
Terminal
> {
// Immutable params — never reassigned during the query loop.
const {
systemPrompt,
userContext,
systemContext,
canUseTool,
fallbackModel,
querySource,
maxTurns,
skipCacheWrite,
} = params
const deps = params.deps ?? productionDeps()
// Mutable cross-iteration state. The loop body destructures this at the top
// of each iteration so reads stay bare-name (`messages`, `toolUseContext`).
// Continue sites write `state = { ... }` instead of 9 separate assignments.
let state: State = {
messages: params.messages,
toolUseContext: params.toolUseContext,
maxOutputTokensOverride: params.maxOutputTokensOverride,
autoCompactTracking: undefined,
stopHookActive: undefined,
maxOutputTokensRecoveryCount: 0,
hasAttemptedReactiveCompact: false,
turnCount: 1,
pendingToolUseSummary: undefined,
transition: undefined,
}
const budgetTracker = feature('TOKEN_BUDGET') ? createBudgetTracker() : null
// task_budget.remaining tracking across compaction boundaries. Undefined
// until first compact fires — while context is uncompacted the server can
// see the full history and handles the countdown from {total} itself (see
// api/api/sampling/prompt/renderer.py:292). After a compact, the server sees
// only the summary and would under-count spend; remaining tells it the
// pre-compact final window that got summarized away. Cumulative across
// multiple compacts: each subtracts the final context at that compact's
// trigger point. Loop-local (not on State) to avoid touching the 7 continue
// sites.
let taskBudgetRemaining: number | undefined = undefined
// Snapshot immutable env/statsig/session state once at entry. See QueryConfig
// for what's included and why feature() gates are intentionally excluded.
const config = buildQueryConfig()
// Fired once per user turn — the prompt is invariant across loop iterations,
// so per-iteration firing would ask sideQuery the same question N times.
// Consume point polls settledAt (never blocks). `using` disposes on all
// generator exit paths — see MemoryPrefetch for dispose/telemetry semantics.
using pendingMemoryPrefetch = startRelevantMemoryPrefetch(
state.messages,
state.toolUseContext,
)
// eslint-disable-next-line no-constant-condition
while (true) {
// Destructure state at the top of each iteration. toolUseContext alone
// is reassigned within an iteration (queryTracking, messages updates);
// the rest are read-only between continue sites.
let { toolUseContext } = state
const {
messages,
autoCompactTracking,
maxOutputTokensRecoveryCount,
hasAttemptedReactiveCompact,
maxOutputTokensOverride,
pendingToolUseSummary,
stopHookActive,
turnCount,
} = state
// Skill discovery prefetch — per-iteration (uses findWritePivot guard
// that returns early on non-write iterations). Discovery runs while the
// model streams and tools execute; awaited post-tools alongside the
// memory prefetch consume. Replaces the blocking assistant_turn path
// that ran inside getAttachmentMessages (97% of those calls found
// nothing in prod). Turn-0 user-input discovery still blocks in
// userInputAttachments — that's the one signal where there's no prior
// work to hide under.
const pendingSkillPrefetch = skillPrefetch?.startSkillDiscoveryPrefetch(
null,
messages,
toolUseContext,
)
yield { type: 'stream_request_start' }
queryCheckpoint('query_fn_entry')
// Record query start for headless latency tracking (skip for subagents)
if (!toolUseContext.agentId) {
headlessProfilerCheckpoint('query_started')
}
// Initialize or increment query chain tracking
const queryTracking = toolUseContext.queryTracking
? {
chainId: toolUseContext.queryTracking.chainId,
depth: toolUseContext.queryTracking.depth + 1,
}
: {
chainId: deps.uuid(),
depth: 0,
}
const queryChainIdForAnalytics =
queryTracking.chainId as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS
toolUseContext = {
...toolUseContext,
queryTracking,
}
let messagesForQuery = [...getMessagesAfterCompactBoundary(messages)]
let tracking = autoCompactTracking
// Enforce per-message budget on aggregate tool result size. Runs BEFORE
// microcompact — cached MC operates purely by tool_use_id (never inspects
// content), so content replacement is invisible to it and the two compose
// cleanly. No-ops when contentReplacementState is undefined (feature off).
// Persist only for querySources that read records back on resume: agentId
// routes to sidechain file (AgentTool resume) or session file (/resume).
// Ephemeral runForkedAgent callers (agent_summary etc.) don't persist.
const persistReplacements =
querySource.startsWith('agent:') ||
querySource.startsWith('repl_main_thread')
messagesForQuery = await applyToolResultBudget(
messagesForQuery,
toolUseContext.contentReplacementState,
persistReplacements
? records =>
void recordContentReplacement(
records,
toolUseContext.agentId,
).catch(logError)
: undefined,
new Set(
toolUseContext.options.tools
.filter(t => !Number.isFinite(t.maxResultSizeChars))
.map(t => t.name),
),
)
// Apply snip before microcompact (both may run — they are not mutually exclusive).
// snipTokensFreed is plumbed to autocompact so its threshold check reflects
// what snip removed; tokenCountWithEstimation alone can't see it (reads usage
// from the protected-tail assistant, which survives snip unchanged).
let snipTokensFreed = 0
if (feature('HISTORY_SNIP')) {
queryCheckpoint('query_snip_start')
const snipResult = snipModule!.snipCompactIfNeeded(messagesForQuery)
messagesForQuery = snipResult.messages
snipTokensFreed = snipResult.tokensFreed
if (snipResult.boundaryMessage) {
yield snipResult.boundaryMessage
}
queryCheckpoint('query_snip_end')
}
// Apply microcompact before autocompact
queryCheckpoint('query_microcompact_start')
const microcompactResult = await deps.microcompact(
messagesForQuery,
toolUseContext,
querySource,
)
messagesForQuery = microcompactResult.messages
// For cached microcompact (cache editing), defer boundary message until after
// the API response so we can use actual cache_deleted_input_tokens.
// Gated behind feature() so the string is eliminated from external builds.
const pendingCacheEdits = feature('CACHED_MICROCOMPACT')
? microcompactResult.compactionInfo?.pendingCacheEdits
: undefined
queryCheckpoint('query_microcompact_end')
// Project the collapsed context view and maybe commit more collapses.
// Runs BEFORE autocompact so that if collapse gets us under the
// autocompact threshold, autocompact is a no-op and we keep granular
// context instead of a single summary.
//
// Nothing is yielded — the collapsed view is a read-time projection
// over the REPL's full history. Summary messages live in the collapse
// store, not the REPL array. This is what makes collapses persist
// across turns: projectView() replays the commit log on every entry.
// Within a turn, the view flows forward via state.messages at the
// continue site (query.ts:1192), and the next projectView() no-ops
// because the archived messages are already gone from its input.
if (feature('CONTEXT_COLLAPSE') && contextCollapse) {
const collapseResult = await contextCollapse.applyCollapsesIfNeeded(
messagesForQuery,
toolUseContext,
querySource,
)
messagesForQuery = collapseResult.messages
}
const fullSystemPrompt = asSystemPrompt(
appendSystemContext(systemPrompt, systemContext),
)
queryCheckpoint('query_autocompact_start')
const { compactionResult, consecutiveFailures } = await deps.autocompact(
messagesForQuery,
toolUseContext,
{
systemPrompt,
userContext,
systemContext,
toolUseContext,
forkContextMessages: messagesForQuery,
},
querySource,
tracking,
snipTokensFreed,
)
queryCheckpoint('query_autocompact_end')
if (compactionResult) {
const {
preCompactTokenCount,
postCompactTokenCount,
truePostCompactTokenCount,
compactionUsage,
} = compactionResult
logEvent('tengu_auto_compact_succeeded', {
originalMessageCount: messages.length,
compactedMessageCount:
compactionResult.summaryMessages.length +
compactionResult.attachments.length +
compactionResult.hookResults.length,
preCompactTokenCount,
postCompactTokenCount,
truePostCompactTokenCount,
compactionInputTokens: compactionUsage?.input_tokens,
compactionOutputTokens: compactionUsage?.output_tokens,
compactionCacheReadTokens:
compactionUsage?.cache_read_input_tokens ?? 0,
compactionCacheCreationTokens:
compactionUsage?.cache_creation_input_tokens ?? 0,
compactionTotalTokens: compactionUsage
? compactionUsage.input_tokens +
(compactionUsage.cache_creation_input_tokens ?? 0) +
(compactionUsage.cache_read_input_tokens ?? 0) +
compactionUsage.output_tokens
: 0,
queryChainId: queryChainIdForAnalytics,
queryDepth: queryTracking.depth,
})
// task_budget: capture pre-compact final context window before
// messagesForQuery is replaced with postCompactMessages below.
// iterations[-1] is the authoritative final window (post server tool
// loops); see #304930.
if (params.taskBudget) {
const preCompactContext =
finalContextTokensFromLastResponse(messagesForQuery)
taskBudgetRemaining = Math.max(
0,
(taskBudgetRemaining ?? params.taskBudget.total) - preCompactContext,
)
}
// Reset on every compact so turnCounter/turnId reflect the MOST RECENT
// compact. recompactionInfo (autoCompact.ts:190) already captured the
// old values for turnsSincePreviousCompact/previousCompactTurnId before
// the call, so this reset doesn't lose those.
tracking = {
compacted: true,
turnId: deps.uuid(),
turnCounter: 0,
consecutiveFailures: 0,
}
const postCompactMessages = buildPostCompactMessages(compactionResult)
for (const message of postCompactMessages) {
yield message
}
// Continue on with the current query call using the post compact messages
messagesForQuery = postCompactMessages
} else if (consecutiveFailures !== undefined) {
// Autocompact failed — propagate failure count so the circuit breaker
// can stop retrying on the next iteration.
tracking = {
...(tracking ?? { compacted: false, turnId: '', turnCounter: 0 }),
consecutiveFailures,
}
}
//TODO: no need to set toolUseContext.messages during set-up since it is updated here
toolUseContext = {
...toolUseContext,
messages: messagesForQuery,
}
const assistantMessages: AssistantMessage[] = []
const toolResults: (UserMessage | AttachmentMessage)[] = []
// @see https://docs.claude.com/en/docs/build-with-claude/tool-use
// Note: stop_reason === 'tool_use' is unreliable -- it's not always set correctly.
// Set during streaming whenever a tool_use block arrives — the sole
// loop-exit signal. If false after streaming, we're done (modulo stop-hook retry).
const toolUseBlocks: ToolUseBlock[] = []
let needsFollowUp = false
queryCheckpoint('query_setup_start')
const useStreamingToolExecution = config.gates.streamingToolExecution
let streamingToolExecutor = useStreamingToolExecution
? new StreamingToolExecutor(
toolUseContext.options.tools,
canUseTool,
toolUseContext,
)
: null
const appState = toolUseContext.getAppState()
const permissionMode = appState.toolPermissionContext.mode
let currentModel = getRuntimeMainLoopModel({
permissionMode,
mainLoopModel: toolUseContext.options.mainLoopModel,
exceeds200kTokens:
permissionMode === 'plan' &&
doesMostRecentAssistantMessageExceed200k(messagesForQuery),
})
queryCheckpoint('query_setup_end')
// Create fetch wrapper once per query session to avoid memory retention.
// Each call to createDumpPromptsFetch creates a closure that captures the request body.
// Creating it once means only the latest request body is retained (~700KB),
// instead of all request bodies from the session (~500MB for long sessions).
// Note: agentId is effectively constant during a query() call - it only changes
// between queries (e.g., /clear command or session resume).
const dumpPromptsFetch = config.gates.isAnt
? createDumpPromptsFetch(toolUseContext.agentId ?? config.sessionId)
: undefined
// Block if we've hit the hard blocking limit (only applies when auto-compact is OFF)
// This reserves space so users can still run /compact manually
// Skip this check if compaction just happened - the compaction result is already
// validated to be under the threshold, and tokenCountWithEstimation would use
// stale input_tokens from kept messages that reflect pre-compaction context size.
// Same staleness applies to snip: subtract snipTokensFreed (otherwise we'd
// falsely block in the window where snip brought us under autocompact threshold
// but the stale usage is still above blocking limit — before this PR that
// window never existed because autocompact always fired on the stale count).
// Also skip for compact/session_memory queries — these are forked agents that
// inherit the full conversation and would deadlock if blocked here (the compact
// agent needs to run to REDUCE the token count).
// Also skip when reactive compact is enabled and automatic compaction is
// allowed — the preempt's synthetic error returns before the API call,
// so reactive compact would never see a prompt-too-long to react to.
// Widened to walrus so RC can act as fallback when proactive fails.
//
// Same skip for context-collapse: its recoverFromOverflow drains
// staged collapses on a REAL API 413, then falls through to
// reactiveCompact. A synthetic preempt here would return before the
// API call and starve both recovery paths. The isAutoCompactEnabled()
// conjunct preserves the user's explicit "no automatic anything"
// config — if they set DISABLE_AUTO_COMPACT, they get the preempt.
let collapseOwnsIt = false
if (feature('CONTEXT_COLLAPSE')) {
collapseOwnsIt =
(contextCollapse?.isContextCollapseEnabled() ?? false) &&
isAutoCompactEnabled()
}
// Hoist media-recovery gate once per turn. Withholding (inside the
// stream loop) and recovery (after) must agree; CACHED_MAY_BE_STALE can
// flip during the 5-30s stream, and withhold-without-recover would eat
// the message. PTL doesn't hoist because its withholding is ungated —
// it predates the experiment and is already the control-arm baseline.
const mediaRecoveryEnabled =
reactiveCompact?.isReactiveCompactEnabled() ?? false
if (
!compactionResult &&
querySource !== 'compact' &&
querySource !== 'session_memory' &&
!(
reactiveCompact?.isReactiveCompactEnabled() && isAutoCompactEnabled()
) &&
!collapseOwnsIt
) {
const { isAtBlockingLimit } = calculateTokenWarningState(
tokenCountWithEstimation(messagesForQuery) - snipTokensFreed,
toolUseContext.options.mainLoopModel,
)
if (isAtBlockingLimit) {
yield createAssistantAPIErrorMessage({
content: PROMPT_TOO_LONG_ERROR_MESSAGE,
error: 'invalid_request',
})
return { reason: 'blocking_limit' }
}
}
let attemptWithFallback = true
queryCheckpoint('query_api_loop_start')
try {
while (attemptWithFallback) {
attemptWithFallback = false
try {
let streamingFallbackOccured = false
queryCheckpoint('query_api_streaming_start')
for await (const message of deps.callModel({
messages: prependUserContext(messagesForQuery, userContext),
systemPrompt: fullSystemPrompt,
thinkingConfig: toolUseContext.options.thinkingConfig,
tools: toolUseContext.options.tools,
signal: toolUseContext.abortController.signal,
options: {
async getToolPermissionContext() {
const appState = toolUseContext.getAppState()
return appState.toolPermissionContext
},
model: currentModel,
...(config.gates.fastModeEnabled && {
fastMode: appState.fastMode,
}),
toolChoice: undefined,
isNonInteractiveSession:
toolUseContext.options.isNonInteractiveSession,
fallbackModel,
onStreamingFallback: () => {
streamingFallbackOccured = true
},
querySource,
agents: toolUseContext.options.agentDefinitions.activeAgents,
allowedAgentTypes:
toolUseContext.options.agentDefinitions.allowedAgentTypes,
hasAppendSystemPrompt:
!!toolUseContext.options.appendSystemPrompt,
maxOutputTokensOverride,
fetchOverride: dumpPromptsFetch,
mcpTools: appState.mcp.tools,
hasPendingMcpServers: appState.mcp.clients.some(
c => c.type === 'pending',
),
queryTracking,
effortValue: appState.effortValue,
advisorModel: appState.advisorModel,
skipCacheWrite,
agentId: toolUseContext.agentId,
addNotification: toolUseContext.addNotification,
...(params.taskBudget && {
taskBudget: {
total: params.taskBudget.total,
...(taskBudgetRemaining !== undefined && {
remaining: taskBudgetRemaining,
}),
},
}),
},
})) {
// We won't use the tool_calls from the first attempt
// We could.. but then we'd have to merge assistant messages
// with different ids and double up on full the tool_results
if (streamingFallbackOccured) {
// Yield tombstones for orphaned messages so they're removed from UI and transcript.
// These partial messages (especially thinking blocks) have invalid signatures
// that would cause "thinking blocks cannot be modified" API errors.
for (const msg of assistantMessages) {
yield { type: 'tombstone' as const, message: msg }
}
logEvent('tengu_orphaned_messages_tombstoned', {
orphanedMessageCount: assistantMessages.length,
queryChainId: queryChainIdForAnalytics,
queryDepth: queryTracking.depth,
})
assistantMessages.length = 0
toolResults.length = 0
toolUseBlocks.length = 0
needsFollowUp = false
// Discard pending results from the failed streaming attempt and create
// a fresh executor. This prevents orphan tool_results (with old tool_use_ids)
// from being yielded after the fallback response arrives.
if (streamingToolExecutor) {
streamingToolExecutor.discard()
streamingToolExecutor = new StreamingToolExecutor(
toolUseContext.options.tools,
canUseTool,
toolUseContext,
)
}
}
// Backfill tool_use inputs on a cloned message before yield so
// SDK stream output and transcript serialization see legacy/derived
// fields. The original `message` is left untouched for
// assistantMessages.push below — it flows back to the API and
// mutating it would break prompt caching (byte mismatch).
let yieldMessage: typeof message = message
if (message.type === 'assistant') {
let clonedContent: typeof message.message.content | undefined
for (let i = 0; i < message.message.content.length; i++) {
const block = message.message.content[i]!
if (
block.type === 'tool_use' &&
typeof block.input === 'object' &&
block.input !== null
) {
const tool = findToolByName(
toolUseContext.options.tools,
block.name,
)
if (tool?.backfillObservableInput) {
const originalInput = block.input as Record<string, unknown>
const inputCopy = { ...originalInput }
tool.backfillObservableInput(inputCopy)
// Only yield a clone when backfill ADDED fields; skip if
// it only OVERWROTE existing ones (e.g. file tools
// expanding file_path). Overwrites change the serialized
// transcript and break VCR fixture hashes on resume,
// while adding nothing the SDK stream needs — hooks get
// the expanded path via toolExecution.ts separately.
const addedFields = Object.keys(inputCopy).some(
k => !(k in originalInput),
)
if (addedFields) {
clonedContent ??= [...message.message.content]
clonedContent[i] = { ...block, input: inputCopy }
}
}
}
}
if (clonedContent) {
yieldMessage = {
...message,
message: { ...message.message, content: clonedContent },
}
}
}
// Withhold recoverable errors (prompt-too-long, max-output-tokens)
// until we know whether recovery (collapse drain / reactive
// compact / truncation retry) can succeed. Still pushed to
// assistantMessages so the recovery checks below find them.
// Either subsystem's withhold is sufficient — they're
// independent so turning one off doesn't break the other's
// recovery path.
//
// feature() only works in if/ternary conditions (bun:bundle
// tree-shaking constraint), so the collapse check is nested
// rather than composed.
let withheld = false
if (feature('CONTEXT_COLLAPSE')) {
if (
contextCollapse?.isWithheldPromptTooLong(
message,
isPromptTooLongMessage,
querySource,
)
) {
withheld = true
}
}
if (reactiveCompact?.isWithheldPromptTooLong(message)) {
withheld = true
}
if (
mediaRecoveryEnabled &&
reactiveCompact?.isWithheldMediaSizeError(message)
) {
withheld = true
}
if (isWithheldMaxOutputTokens(message)) {
withheld = true
}
if (!withheld) {
yield yieldMessage
}
if (message.type === 'assistant') {
assistantMessages.push(message)
const msgToolUseBlocks = message.message.content.filter(
content => content.type === 'tool_use',
) as ToolUseBlock[]
if (msgToolUseBlocks.length > 0) {
toolUseBlocks.push(...msgToolUseBlocks)
needsFollowUp = true
}
if (
streamingToolExecutor &&
!toolUseContext.abortController.signal.aborted
) {
for (const toolBlock of msgToolUseBlocks) {
streamingToolExecutor.addTool(toolBlock, message)
}
}
}
if (
streamingToolExecutor &&
!toolUseContext.abortController.signal.aborted
) {
for (const result of streamingToolExecutor.getCompletedResults()) {
if (result.message) {
yield result.message
toolResults.push(
...normalizeMessagesForAPI(
[result.message],
toolUseContext.options.tools,
).filter(_ => _.type === 'user'),
)
}
}
}
}
queryCheckpoint('query_api_streaming_end')
// Yield deferred microcompact boundary message using actual API-reported
// token deletion count instead of client-side estimates.
// Entire block gated behind feature() so the excluded string
// is eliminated from external builds.
if (feature('CACHED_MICROCOMPACT') && pendingCacheEdits) {
const lastAssistant = assistantMessages.at(-1)
// The API field is cumulative/sticky across requests, so we
// subtract the baseline captured before this request to get the delta.
const usage = lastAssistant?.message.usage
const cumulativeDeleted = usage
? ((usage as unknown as Record<string, number>)
.cache_deleted_input_tokens ?? 0)
: 0
const deletedTokens = Math.max(
0,
cumulativeDeleted - pendingCacheEdits.baselineCacheDeletedTokens,
)
if (deletedTokens > 0) {
yield createMicrocompactBoundaryMessage(
pendingCacheEdits.trigger,
0,
deletedTokens,
pendingCacheEdits.deletedToolIds,
[],
)
}
}
} catch (innerError) {
if (innerError instanceof FallbackTriggeredError && fallbackModel) {
// Fallback was triggered - switch model and retry
currentModel = fallbackModel
attemptWithFallback = true
// Clear assistant messages since we'll retry the entire request
yield* yieldMissingToolResultBlocks(
assistantMessages,
'Model fallback triggered',
)
assistantMessages.length = 0
toolResults.length = 0
toolUseBlocks.length = 0
needsFollowUp = false
// Discard pending results from the failed attempt and create a
// fresh executor. This prevents orphan tool_results (with old
// tool_use_ids) from leaking into the retry.
if (streamingToolExecutor) {
streamingToolExecutor.discard()
streamingToolExecutor = new StreamingToolExecutor(
toolUseContext.options.tools,
canUseTool,
toolUseContext,
)
}
// Update tool use context with new model
toolUseContext.options.mainLoopModel = fallbackModel
// Thinking signatures are model-bound: replaying a protected-thinking
// block (e.g. capybara) to an unprotected fallback (e.g. opus) 400s.
// Strip before retry so the fallback model gets clean history.
if (process.env.USER_TYPE === 'ant') {
messagesForQuery = stripSignatureBlocks(messagesForQuery)
}
// Log the fallback event
logEvent('tengu_model_fallback_triggered', {
original_model:
innerError.originalModel as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
fallback_model:
fallbackModel as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
entrypoint:
'cli' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
queryChainId: queryChainIdForAnalytics,
queryDepth: queryTracking.depth,
})
// Yield system message about fallback — use 'warning' level so
// users see the notification without needing verbose mode
yield createSystemMessage(
`Switched to ${renderModelName(innerError.fallbackModel)} due to high demand for ${renderModelName(innerError.originalModel)}`,
'warning',
)
continue
}
throw innerError
}
}
} catch (error) {
logError(error)
const errorMessage =
error instanceof Error ? error.message : String(error)
logEvent('tengu_query_error', {
assistantMessages: assistantMessages.length,
toolUses: assistantMessages.flatMap(_ =>
_.message.content.filter(content => content.type === 'tool_use'),
).length,
queryChainId: queryChainIdForAnalytics,
queryDepth: queryTracking.depth,
})
// Handle image size/resize errors with user-friendly messages
if (
error instanceof ImageSizeError ||
error instanceof ImageResizeError
) {
yield createAssistantAPIErrorMessage({
content: error.message,
})
return { reason: 'image_error' }
}
// Generally queryModelWithStreaming should not throw errors but instead
// yield them as synthetic assistant messages. However if it does throw
// due to a bug, we may end up in a state where we have already emitted
// a tool_use block but will stop before emitting the tool_result.
yield* yieldMissingToolResultBlocks(assistantMessages, errorMessage)
// Surface the real error instead of a misleading "[Request interrupted
// by user]" — this path is a model/runtime failure, not a user action.
// SDK consumers were seeing phantom interrupts on e.g. Node 18's missing
// Array.prototype.with(), masking the actual cause.
yield createAssistantAPIErrorMessage({
content: errorMessage,
})
// To help track down bugs, log loudly for ants
logAntError('Query error', error)
return { reason: 'model_error', error }
}
// Execute post-sampling hooks after model response is complete
if (assistantMessages.length > 0) {
void executePostSamplingHooks(
[...messagesForQuery, ...assistantMessages],
systemPrompt,
userContext,
systemContext,
toolUseContext,
querySource,
)
}
// We need to handle a streaming abort before anything else.
// When using streamingToolExecutor, we must consume getRemainingResults() so the
// executor can generate synthetic tool_result blocks for queued/in-progress tools.
// Without this, tool_use blocks would lack matching tool_result blocks.
if (toolUseContext.abortController.signal.aborted) {
if (streamingToolExecutor) {
// Consume remaining results - executor generates synthetic tool_results for
// aborted tools since it checks the abort signal in executeTool()
for await (const update of streamingToolExecutor.getRemainingResults()) {
if (update.message) {
yield update.message
}
}
} else {
yield* yieldMissingToolResultBlocks(
assistantMessages,
'Interrupted by user',
)
}
// chicago MCP: auto-unhide + lock release on interrupt. Same cleanup
// as the natural turn-end path in stopHooks.ts. Main thread only —
// see stopHooks.ts for the subagent-releasing-main's-lock rationale.
if (feature('CHICAGO_MCP') && !toolUseContext.agentId) {
try {
const { cleanupComputerUseAfterTurn } = await import(
'./utils/computerUse/cleanup.js'
)
await cleanupComputerUseAfterTurn(toolUseContext)
} catch {
// Failures are silent — this is dogfooding cleanup, not critical path
}
}
// Skip the interruption message for submit-interrupts — the queued
// user message that follows provides sufficient context.
if (toolUseContext.abortController.signal.reason !== 'interrupt') {
yield createUserInterruptionMessage({
toolUse: false,
})
}
return { reason: 'aborted_streaming' }
}
// Yield tool use summary from previous turn — haiku (~1s) resolved during model streaming (5-30s)
if (pendingToolUseSummary) {
const summary = await pendingToolUseSummary
if (summary) {
yield summary
}
}
if (!needsFollowUp) {
const lastMessage = assistantMessages.at(-1)
// Prompt-too-long recovery: the streaming loop withheld the error
// (see withheldByCollapse / withheldByReactive above). Try collapse
// drain first (cheap, keeps granular context), then reactive compact
// (full summary). Single-shot on each — if a retry still 413's,
// the next stage handles it or the error surfaces.
const isWithheld413 =
lastMessage?.type === 'assistant' &&
lastMessage.isApiErrorMessage &&
isPromptTooLongMessage(lastMessage)
// Media-size rejections (image/PDF/many-image) are recoverable via
// reactive compact's strip-retry. Unlike PTL, media errors skip the
// collapse drain — collapse doesn't strip images. mediaRecoveryEnabled
// is the hoisted gate from before the stream loop (same value as the
// withholding check — these two must agree or a withheld message is
// lost). If the oversized media is in the preserved tail, the
// post-compact turn will media-error again; hasAttemptedReactiveCompact
// prevents a spiral and the error surfaces.
const isWithheldMedia =
mediaRecoveryEnabled &&
reactiveCompact?.isWithheldMediaSizeError(lastMessage)
if (isWithheld413) {
// First: drain all staged context-collapses. Gated on the PREVIOUS
// transition not being collapse_drain_retry — if we already drained
// and the retry still 413'd, fall through to reactive compact.
if (
feature('CONTEXT_COLLAPSE') &&
contextCollapse &&
state.transition?.reason !== 'collapse_drain_retry'
) {
const drained = contextCollapse.recoverFromOverflow(
messagesForQuery,
querySource,
)
if (drained.committed > 0) {
const next: State = {
messages: drained.messages,
toolUseContext,
autoCompactTracking: tracking,
maxOutputTokensRecoveryCount,
hasAttemptedReactiveCompact,
maxOutputTokensOverride: undefined,
pendingToolUseSummary: undefined,
stopHookActive: undefined,
turnCount,
transition: {
reason: 'collapse_drain_retry',
committed: drained.committed,
},
}
state = next
continue
}
}
}
if ((isWithheld413 || isWithheldMedia) && reactiveCompact) {
const compacted = await reactiveCompact.tryReactiveCompact({
hasAttempted: hasAttemptedReactiveCompact,
querySource,
aborted: toolUseContext.abortController.signal.aborted,
messages: messagesForQuery,
cacheSafeParams: {
systemPrompt,
userContext,
systemContext,
toolUseContext,
forkContextMessages: messagesForQuery,
},
})
if (compacted) {
// task_budget: same carryover as the proactive path above.
// messagesForQuery still holds the pre-compact array here (the
// 413-failed attempt's input).
if (params.taskBudget) {
const preCompactContext =
finalContextTokensFromLastResponse(messagesForQuery)
taskBudgetRemaining = Math.max(
0,
(taskBudgetRemaining ?? params.taskBudget.total) -
preCompactContext,
)
}
const postCompactMessages = buildPostCompactMessages(compacted)
for (const msg of postCompactMessages) {
yield msg
}
const next: State = {
messages: postCompactMessages,
toolUseContext,
autoCompactTracking: undefined,
maxOutputTokensRecoveryCount,
hasAttemptedReactiveCompact: true,
maxOutputTokensOverride: undefined,
pendingToolUseSummary: undefined,
stopHookActive: undefined,
turnCount,
transition: { reason: 'reactive_compact_retry' },
}
state = next
continue
}
// No recovery — surface the withheld error and exit. Do NOT fall
// through to stop hooks: the model never produced a valid response,
// so hooks have nothing meaningful to evaluate. Running stop hooks
// on prompt-too-long creates a death spiral: error → hook blocking
// → retry → error → … (the hook injects more tokens each cycle).
yield lastMessage
void executeStopFailureHooks(lastMessage, toolUseContext)
return { reason: isWithheldMedia ? 'image_error' : 'prompt_too_long' }
} else if (feature('CONTEXT_COLLAPSE') && isWithheld413) {
// reactiveCompact compiled out but contextCollapse withheld and
// couldn't recover (staged queue empty/stale). Surface. Same
// early-return rationale — don't fall through to stop hooks.
yield lastMessage
void executeStopFailureHooks(lastMessage, toolUseContext)
return { reason: 'prompt_too_long' }
}
// Check for max_output_tokens and inject recovery message. The error
// was withheld from the stream above; only surface it if recovery
// exhausts.
if (isWithheldMaxOutputTokens(lastMessage)) {
// Escalating retry: if we used the capped 8k default and hit the
// limit, retry the SAME request at 64k — no meta message, no
// multi-turn dance. This fires once per turn (guarded by the
// override check), then falls through to multi-turn recovery if
// 64k also hits the cap.
// 3P default: false (not validated on Bedrock/Vertex)
const capEnabled = getFeatureValue_CACHED_MAY_BE_STALE(
'tengu_otk_slot_v1',
false,
)
if (
capEnabled &&
maxOutputTokensOverride === undefined &&
!process.env.CLAUDE_CODE_MAX_OUTPUT_TOKENS
) {
logEvent('tengu_max_tokens_escalate', {
escalatedTo: ESCALATED_MAX_TOKENS,
})
const next: State = {
messages: messagesForQuery,
toolUseContext,
autoCompactTracking: tracking,
maxOutputTokensRecoveryCount,
hasAttemptedReactiveCompact,
maxOutputTokensOverride: ESCALATED_MAX_TOKENS,
pendingToolUseSummary: undefined,
stopHookActive: undefined,
turnCount,
transition: { reason: 'max_output_tokens_escalate' },
}
state = next
continue
}
if (maxOutputTokensRecoveryCount < MAX_OUTPUT_TOKENS_RECOVERY_LIMIT) {
const recoveryMessage = createUserMessage({
content:
`Output token limit hit. Resume directly — no apology, no recap of what you were doing. ` +
`Pick up mid-thought if that is where the cut happened. Break remaining work into smaller pieces.`,
isMeta: true,
})
const next: State = {
messages: [
...messagesForQuery,
...assistantMessages,
recoveryMessage,
],
toolUseContext,
autoCompactTracking: tracking,
maxOutputTokensRecoveryCount: maxOutputTokensRecoveryCount + 1,
hasAttemptedReactiveCompact,
maxOutputTokensOverride: undefined,
pendingToolUseSummary: undefined,
stopHookActive: undefined,
turnCount,
transition: {
reason: 'max_output_tokens_recovery',
attempt: maxOutputTokensRecoveryCount + 1,
},
}
state = next
continue
}
// Recovery exhausted — surface the withheld error now.
yield lastMessage
}
// Skip stop hooks when the last message is an API error (rate limit,
// prompt-too-long, auth failure, etc.). The model never produced a
// real response — hooks evaluating it create a death spiral:
// error → hook blocking → retry → error → …
if (lastMessage?.isApiErrorMessage) {
void executeStopFailureHooks(lastMessage, toolUseContext)
return { reason: 'completed' }
}
const stopHookResult = yield* handleStopHooks(
messagesForQuery,
assistantMessages,
systemPrompt,
userContext,
systemContext,
toolUseContext,
querySource,
stopHookActive,
)
if (stopHookResult.preventContinuation) {
return { reason: 'stop_hook_prevented' }
}
if (stopHookResult.blockingErrors.length > 0) {
const next: State = {
messages: [
...messagesForQuery,
...assistantMessages,
...stopHookResult.blockingErrors,
],
toolUseContext,
autoCompactTracking: tracking,
maxOutputTokensRecoveryCount: 0,
// Preserve the reactive compact guard — if compact already ran and
// couldn't recover from prompt-too-long, retrying after a stop-hook
// blocking error will produce the same result. Resetting to false
// here caused an infinite loop: compact → still too long → error →
// stop hook blocking → compact → … burning thousands of API calls.
hasAttemptedReactiveCompact,
maxOutputTokensOverride: undefined,
pendingToolUseSummary: undefined,
stopHookActive: true,
turnCount,
transition: { reason: 'stop_hook_blocking' },
}
state = next
continue
}
if (feature('TOKEN_BUDGET')) {
const decision = checkTokenBudget(
budgetTracker!,
toolUseContext.agentId,
getCurrentTurnTokenBudget(),
getTurnOutputTokens(),
)
if (decision.action === 'continue') {
incrementBudgetContinuationCount()
logForDebugging(
`Token budget continuation #${decision.continuationCount}: ${decision.pct}% (${decision.turnTokens.toLocaleString()} / ${decision.budget.toLocaleString()})`,
)
state = {
messages: [
...messagesForQuery,
...assistantMessages,
createUserMessage({
content: decision.nudgeMessage,
isMeta: true,
}),
],
toolUseContext,
autoCompactTracking: tracking,
maxOutputTokensRecoveryCount: 0,
hasAttemptedReactiveCompact: false,
maxOutputTokensOverride: undefined,
pendingToolUseSummary: undefined,
stopHookActive: undefined,
turnCount,
transition: { reason: 'token_budget_continuation' },
}
continue
}
if (decision.completionEvent) {
if (decision.completionEvent.diminishingReturns) {
logForDebugging(
`Token budget early stop: diminishing returns at ${decision.completionEvent.pct}%`,
)
}
logEvent('tengu_token_budget_completed', {
...decision.completionEvent,
queryChainId: queryChainIdForAnalytics,
queryDepth: queryTracking.depth,
})
}
}
return { reason: 'completed' }
}
let shouldPreventContinuation = false
let updatedToolUseContext = toolUseContext
queryCheckpoint('query_tool_execution_start')
if (streamingToolExecutor) {
logEvent('tengu_streaming_tool_execution_used', {
tool_count: toolUseBlocks.length,
queryChainId: queryChainIdForAnalytics,
queryDepth: queryTracking.depth,
})
} else {
logEvent('tengu_streaming_tool_execution_not_used', {
tool_count: toolUseBlocks.length,
queryChainId: queryChainIdForAnalytics,
queryDepth: queryTracking.depth,
})
}
const toolUpdates = streamingToolExecutor
? streamingToolExecutor.getRemainingResults()
: runTools(toolUseBlocks, assistantMessages, canUseTool, toolUseContext)
for await (const update of toolUpdates) {
if (update.message) {
yield update.message
if (
update.message.type === 'attachment' &&
update.message.attachment.type === 'hook_stopped_continuation'
) {
shouldPreventContinuation = true
}
toolResults.push(
...normalizeMessagesForAPI(
[update.message],
toolUseContext.options.tools,
).filter(_ => _.type === 'user'),
)
}
if (update.newContext) {
updatedToolUseContext = {
...update.newContext,
queryTracking,
}
}
}
queryCheckpoint('query_tool_execution_end')
// Generate tool use summary after tool batch completes — carried into the next loop iteration via pendingToolUseSummary on the next State
let nextPendingToolUseSummary:
| Promise<ToolUseSummaryMessage | null>
| undefined
if (
config.gates.emitToolUseSummaries &&
toolUseBlocks.length > 0 &&
!toolUseContext.abortController.signal.aborted &&
!toolUseContext.agentId // subagents don't surface in mobile UI — skip the Haiku call
) {
// Extract the last assistant text block for context
const lastAssistantMessage = assistantMessages.at(-1)
let lastAssistantText: string | undefined
if (lastAssistantMessage) {
const textBlocks = lastAssistantMessage.message.content.filter(
block => block.type === 'text',
)
if (textBlocks.length > 0) {
const lastTextBlock = textBlocks.at(-1)
if (lastTextBlock && 'text' in lastTextBlock) {
lastAssistantText = lastTextBlock.text
}
}
}
// Collect tool info for summary generation
const toolUseIds = toolUseBlocks.map(block => block.id)
const toolInfoForSummary = toolUseBlocks.map(block => {
// Find the corresponding tool result
const toolResult = toolResults.find(
result =>
result.type === 'user' &&
Array.isArray(result.message.content) &&
result.message.content.some(
content =>
content.type === 'tool_result' &&
content.tool_use_id === block.id,
),
)
const resultContent =
toolResult?.type === 'user' &&
Array.isArray(toolResult.message.content)
? toolResult.message.content.find(
(c): c is ToolResultBlockParam =>
c.type === 'tool_result' && c.tool_use_id === block.id,
)
: undefined
return {
name: block.name,
input: block.input,
output:
resultContent && 'content' in resultContent
? resultContent.content
: null,
}
})
// Fire off summary generation without blocking the next API call
nextPendingToolUseSummary = generateToolUseSummary({
tools: toolInfoForSummary,
signal: toolUseContext.abortController.signal,
isNonInteractiveSession: toolUseContext.options.isNonInteractiveSession,
lastAssistantText,
})
.then(summary => {
if (summary) {
return createToolUseSummaryMessage(summary, toolUseIds)
}
return null
})
.catch(() => null)
}
// We were aborted during tool calls
if (toolUseContext.abortController.signal.aborted) {
// chicago MCP: auto-unhide + lock release when aborted mid-tool-call.
// This is the most likely Ctrl+C path for CU (e.g. slow screenshot).
// Main thread only — see stopHooks.ts for the subagent rationale.
if (feature('CHICAGO_MCP') && !toolUseContext.agentId) {
try {
const { cleanupComputerUseAfterTurn } = await import(
'./utils/computerUse/cleanup.js'
)
await cleanupComputerUseAfterTurn(toolUseContext)
} catch {
// Failures are silent — this is dogfooding cleanup, not critical path
}
}
// Skip the interruption message for submit-interrupts — the queued
// user message that follows provides sufficient context.
if (toolUseContext.abortController.signal.reason !== 'interrupt') {
yield createUserInterruptionMessage({
toolUse: true,
})
}
// Check maxTurns before returning when aborted
const nextTurnCountOnAbort = turnCount + 1
if (maxTurns && nextTurnCountOnAbort > maxTurns) {
yield createAttachmentMessage({
type: 'max_turns_reached',
maxTurns,
turnCount: nextTurnCountOnAbort,
})
}
return { reason: 'aborted_tools' }
}
// If a hook indicated to prevent continuation, stop here
if (shouldPreventContinuation) {
return { reason: 'hook_stopped' }
}
if (tracking?.compacted) {
tracking.turnCounter++
logEvent('tengu_post_autocompact_turn', {
turnId:
tracking.turnId as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
turnCounter: tracking.turnCounter,
queryChainId: queryChainIdForAnalytics,
queryDepth: queryTracking.depth,
})
}
// Be careful to do this after tool calls are done, because the API
// will error if we interleave tool_result messages with regular user messages.
// Instrumentation: Track message count before attachments
logEvent('tengu_query_before_attachments', {
messagesForQueryCount: messagesForQuery.length,
assistantMessagesCount: assistantMessages.length,
toolResultsCount: toolResults.length,
queryChainId: queryChainIdForAnalytics,
queryDepth: queryTracking.depth,
})
// Get queued commands snapshot before processing attachments.
// These will be sent as attachments so Claude can respond to them in the current turn.
//
// Drain pending notifications. LocalShellTask completions are 'next'
// (when MONITOR_TOOL is on) and drain without Sleep. Other task types
// (agent/workflow/framework) still default to 'later' — the Sleep flush
// covers those. If all task types move to 'next', this branch could go.
//
// Slash commands are excluded from mid-turn drain — they must go through
// processSlashCommand after the turn ends (via useQueueProcessor), not be
// sent to the model as text. Bash-mode commands are already excluded by
// INLINE_NOTIFICATION_MODES in getQueuedCommandAttachments.
//
// Agent scoping: the queue is a process-global singleton shared by the
// coordinator and all in-process subagents. Each loop drains only what's
// addressed to it — main thread drains agentId===undefined, subagents
// drain their own agentId. User prompts (mode:'prompt') still go to main
// only; subagents never see the prompt stream.
// eslint-disable-next-line custom-rules/require-tool-match-name -- ToolUseBlock.name has no aliases
const sleepRan = toolUseBlocks.some(b => b.name === SLEEP_TOOL_NAME)
const isMainThread =
querySource.startsWith('repl_main_thread') || querySource === 'sdk'
const currentAgentId = toolUseContext.agentId
const queuedCommandsSnapshot = getCommandsByMaxPriority(
sleepRan ? 'later' : 'next',
).filter(cmd => {
if (isSlashCommand(cmd)) return false
if (isMainThread) return cmd.agentId === undefined
// Subagents only drain task-notifications addressed to them — never
// user prompts, even if someone stamps an agentId on one.
return cmd.mode === 'task-notification' && cmd.agentId === currentAgentId
})
for await (const attachment of getAttachmentMessages(
null,
updatedToolUseContext,
null,
queuedCommandsSnapshot,
[...messagesForQuery, ...assistantMessages, ...toolResults],
querySource,
)) {
yield attachment
toolResults.push(attachment)
}
// Memory prefetch consume: only if settled and not already consumed on
// an earlier iteration. If not settled yet, skip (zero-wait) and retry
// next iteration — the prefetch gets as many chances as there are loop
// iterations before the turn ends. readFileState (cumulative across
// iterations) filters out memories the model already Read/Wrote/Edited
// — including in earlier iterations, which the per-iteration
// toolUseBlocks array would miss.
if (
pendingMemoryPrefetch &&
pendingMemoryPrefetch.settledAt !== null &&
pendingMemoryPrefetch.consumedOnIteration === -1
) {
const memoryAttachments = filterDuplicateMemoryAttachments(
await pendingMemoryPrefetch.promise,
toolUseContext.readFileState,
)
for (const memAttachment of memoryAttachments) {
const msg = createAttachmentMessage(memAttachment)
yield msg
toolResults.push(msg)
}
pendingMemoryPrefetch.consumedOnIteration = turnCount - 1
}
// Inject prefetched skill discovery. collectSkillDiscoveryPrefetch emits
// hidden_by_main_turn — true when the prefetch resolved before this point
// (should be >98% at AKI@250ms / Haiku@573ms vs turn durations of 2-30s).
if (skillPrefetch && pendingSkillPrefetch) {
const skillAttachments =
await skillPrefetch.collectSkillDiscoveryPrefetch(pendingSkillPrefetch)
for (const att of skillAttachments) {
const msg = createAttachmentMessage(att)
yield msg
toolResults.push(msg)
}
}
// Remove only commands that were actually consumed as attachments.
// Prompt and task-notification commands are converted to attachments above.
const consumedCommands = queuedCommandsSnapshot.filter(
cmd => cmd.mode === 'prompt' || cmd.mode === 'task-notification',
)
if (consumedCommands.length > 0) {
for (const cmd of consumedCommands) {
if (cmd.uuid) {
consumedCommandUuids.push(cmd.uuid)
notifyCommandLifecycle(cmd.uuid, 'started')
}
}
removeFromQueue(consumedCommands)
}
// Instrumentation: Track file change attachments after they're added
const fileChangeAttachmentCount = count(
toolResults,
tr =>
tr.type === 'attachment' && tr.attachment.type === 'edited_text_file',
)
logEvent('tengu_query_after_attachments', {
totalToolResultsCount: toolResults.length,
fileChangeAttachmentCount,
queryChainId: queryChainIdForAnalytics,
queryDepth: queryTracking.depth,
})
// Refresh tools between turns so newly-connected MCP servers become available
if (updatedToolUseContext.options.refreshTools) {
const refreshedTools = updatedToolUseContext.options.refreshTools()
if (refreshedTools !== updatedToolUseContext.options.tools) {
updatedToolUseContext = {
...updatedToolUseContext,
options: {
...updatedToolUseContext.options,
tools: refreshedTools,
},
}
}
}
const toolUseContextWithQueryTracking = {
...updatedToolUseContext,
queryTracking,
}
// Each time we have tool results and are about to loop again with them, that's a turn
const nextTurnCount = turnCount + 1
// Periodic task summary for `claude ps` — fires mid-turn so a
// long-running agent still refreshes what it's working on. Gated
// only on !agentId so every top-level conversation (REPL, SDK, HFI,
// remote) generates summaries; subagents/forks don't.
if (feature('BG_SESSIONS')) {
if (
!toolUseContext.agentId &&
taskSummaryModule!.shouldGenerateTaskSummary()
) {
taskSummaryModule!.maybeGenerateTaskSummary({
systemPrompt,
userContext,
systemContext,
toolUseContext,
forkContextMessages: [
...messagesForQuery,
...assistantMessages,
...toolResults,
],
})
}
}
// Check if we've reached the max turns limit
if (maxTurns && nextTurnCount > maxTurns) {
yield createAttachmentMessage({
type: 'max_turns_reached',
maxTurns,
turnCount: nextTurnCount,
})
return { reason: 'max_turns', turnCount: nextTurnCount }
}
queryCheckpoint('query_recursive_call')
const next: State = {
messages: [...messagesForQuery, ...assistantMessages, ...toolResults],
toolUseContext: toolUseContextWithQueryTracking,
autoCompactTracking: tracking,
turnCount: nextTurnCount,
maxOutputTokensRecoveryCount: 0,
hasAttemptedReactiveCompact: false,
pendingToolUseSummary: nextPendingToolUseSummary,
maxOutputTokensOverride: undefined,
stopHookActive,
transition: { reason: 'next_turn' },
}
state = next
} // while (true)
}