π File detail
services/mcp/client.ts
π― Use case
This file lives under βservices/β, which covers long-lived services (LSP, MCP, OAuth, tool execution, memory, compaction, voice, settings sync, β¦). On the API surface it exposes McpAuthError, McpToolCallError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, isMcpSessionExpiredError, clearMcpAuthCache, and createClaudeAiProxyFetch (and more) β mainly functions, hooks, or classes. Dependencies touch bun:bundle, @anthropic-ai, @modelcontextprotocol, and lodash-es. It composes internal code from bootstrap, commands, constants, state, and Tool (relative imports).
Generated from folder role, exports, dependency roots, and inline comments β not hand-reviewed for every path.
π§ Inline summary
import { feature } from 'bun:bundle' import type { Base64ImageSource, ContentBlockParam, MessageParam,
π€ Exports (heuristic)
McpAuthErrorMcpToolCallError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHSisMcpSessionExpiredErrorclearMcpAuthCachecreateClaudeAiProxyFetchwrapFetchWithTimeoutgetMcpServerConnectionBatchSizegetServerCacheKeyconnectToServerclearServerCacheensureConnectedClientareMcpConfigsEqualmcpToolInputToAutoClassifierInputfetchToolsForClientfetchResourcesForClientfetchCommandsForClientcallIdeRpcreconnectMcpServerImplgetMcpToolsCommandsAndResourcesprefetchAllMcpResourcestransformResultContentMCPResultTypeTransformedMCPResultinferCompactSchematransformMCPResultprocessMCPResultcallMCPToolWithUrlElicitationRetrysetupSdkMcpClients
π External import roots
Package roots from from "β¦" (relative paths omitted).
bun:bundle@anthropic-ai@modelcontextprotocollodash-esp-mapsrcfspath
π₯οΈ Source preview
import { feature } from 'bun:bundle'
import type {
Base64ImageSource,
ContentBlockParam,
MessageParam,
} from '@anthropic-ai/sdk/resources/index.mjs'
import { Client } from '@modelcontextprotocol/sdk/client/index.js'
import {
SSEClientTransport,
type SSEClientTransportOptions,
} from '@modelcontextprotocol/sdk/client/sse.js'
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js'
import {
StreamableHTTPClientTransport,
type StreamableHTTPClientTransportOptions,
} from '@modelcontextprotocol/sdk/client/streamableHttp.js'
import {
createFetchWithInit,
type FetchLike,
type Transport,
} from '@modelcontextprotocol/sdk/shared/transport.js'
import {
CallToolResultSchema,
ElicitRequestSchema,
type ElicitRequestURLParams,
type ElicitResult,
ErrorCode,
type JSONRPCMessage,
type ListPromptsResult,
ListPromptsResultSchema,
ListResourcesResultSchema,
ListRootsRequestSchema,
type ListToolsResult,
ListToolsResultSchema,
McpError,
type PromptMessage,
type ResourceLink,
} from '@modelcontextprotocol/sdk/types.js'
import mapValues from 'lodash-es/mapValues.js'
import memoize from 'lodash-es/memoize.js'
import zipObject from 'lodash-es/zipObject.js'
import pMap from 'p-map'
import { getOriginalCwd, getSessionId } from '../../bootstrap/state.js'
import type { Command } from '../../commands.js'
import { getOauthConfig } from '../../constants/oauth.js'
import { PRODUCT_URL } from '../../constants/product.js'
import type { AppState } from '../../state/AppState.js'
import {
type Tool,
type ToolCallProgress,
toolMatchesName,
} from '../../Tool.js'
import { ListMcpResourcesTool } from '../../tools/ListMcpResourcesTool/ListMcpResourcesTool.js'
import { type MCPProgress, MCPTool } from '../../tools/MCPTool/MCPTool.js'
import { createMcpAuthTool } from '../../tools/McpAuthTool/McpAuthTool.js'
import { ReadMcpResourceTool } from '../../tools/ReadMcpResourceTool/ReadMcpResourceTool.js'
import { createAbortController } from '../../utils/abortController.js'
import { count } from '../../utils/array.js'
import {
checkAndRefreshOAuthTokenIfNeeded,
getClaudeAIOAuthTokens,
handleOAuth401Error,
} from '../../utils/auth.js'
import { registerCleanup } from '../../utils/cleanupRegistry.js'
import { detectCodeIndexingFromMcpServerName } from '../../utils/codeIndexing.js'
import { logForDebugging } from '../../utils/debug.js'
import { isEnvDefinedFalsy, isEnvTruthy } from '../../utils/envUtils.js'
import {
errorMessage,
TelemetrySafeError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
} from '../../utils/errors.js'
import { getMCPUserAgent } from '../../utils/http.js'
import { maybeNotifyIDEConnected } from '../../utils/ide.js'
import { maybeResizeAndDownsampleImageBuffer } from '../../utils/imageResizer.js'
import { logMCPDebug, logMCPError } from '../../utils/log.js'
import {
getBinaryBlobSavedMessage,
getFormatDescription,
getLargeOutputInstructions,
persistBinaryContent,
} from '../../utils/mcpOutputStorage.js'
import {
getContentSizeEstimate,
type MCPToolResult,
mcpContentNeedsTruncation,
truncateMcpContentIfNeeded,
} from '../../utils/mcpValidation.js'
import { WebSocketTransport } from '../../utils/mcpWebSocketTransport.js'
import { memoizeWithLRU } from '../../utils/memoize.js'
import { getWebSocketTLSOptions } from '../../utils/mtls.js'
import {
getProxyFetchOptions,
getWebSocketProxyAgent,
getWebSocketProxyUrl,
} from '../../utils/proxy.js'
import { recursivelySanitizeUnicode } from '../../utils/sanitization.js'
import { getSessionIngressAuthToken } from '../../utils/sessionIngressAuth.js'
import { subprocessEnv } from '../../utils/subprocessEnv.js'
import {
isPersistError,
persistToolResult,
} from '../../utils/toolResultStorage.js'
import {
type AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
logEvent,
} from '../analytics/index.js'
import {
type ElicitationWaitingState,
runElicitationHooks,
runElicitationResultHooks,
} from './elicitationHandler.js'
import { buildMcpToolName } from './mcpStringUtils.js'
import { normalizeNameForMCP } from './normalization.js'
import { getLoggingSafeMcpBaseUrl } from './utils.js'
/* eslint-disable @typescript-eslint/no-require-imports */
const fetchMcpSkillsForClient = feature('MCP_SKILLS')
? (
require('../../skills/mcpSkills.js') as typeof import('../../skills/mcpSkills.js')
).fetchMcpSkillsForClient
: null
import { UnauthorizedError } from '@modelcontextprotocol/sdk/client/auth.js'
import type { AssistantMessage } from 'src/types/message.js'
/* eslint-enable @typescript-eslint/no-require-imports */
import { classifyMcpToolForCollapse } from '../../tools/MCPTool/classifyForCollapse.js'
import { clearKeychainCache } from '../../utils/secureStorage/macOsKeychainHelpers.js'
import { sleep } from '../../utils/sleep.js'
import {
ClaudeAuthProvider,
hasMcpDiscoveryButNoToken,
wrapFetchWithStepUpDetection,
} from './auth.js'
import { markClaudeAiMcpConnected } from './claudeai.js'
import { getAllMcpConfigs, isMcpServerDisabled } from './config.js'
import { getMcpServerHeaders } from './headersHelper.js'
import { SdkControlClientTransport } from './SdkControlTransport.js'
import type {
ConnectedMCPServer,
MCPServerConnection,
McpSdkServerConfig,
ScopedMcpServerConfig,
ServerResource,
} from './types.js'
/**
* Custom error class to indicate that an MCP tool call failed due to
* authentication issues (e.g., expired OAuth token returning 401).
* This error should be caught at the tool execution layer to update
* the client's status to 'needs-auth'.
*/
export class McpAuthError extends Error {
serverName: string
constructor(serverName: string, message: string) {
super(message)
this.name = 'McpAuthError'
this.serverName = serverName
}
}
/**
* Thrown when an MCP session has expired and the connection cache has been cleared.
* The caller should get a fresh client via ensureConnectedClient and retry.
*/
class McpSessionExpiredError extends Error {
constructor(serverName: string) {
super(`MCP server "${serverName}" session expired`)
this.name = 'McpSessionExpiredError'
}
}
/**
* Thrown when an MCP tool returns `isError: true`. Carries the result's `_meta`
* so SDK consumers can still receive it β per the MCP spec, `_meta` is on the
* base Result type and is valid on error results.
*/
export class McpToolCallError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS extends TelemetrySafeError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS {
constructor(
message: string,
telemetryMessage: string,
readonly mcpMeta?: { _meta?: Record<string, unknown> },
) {
super(message, telemetryMessage)
this.name = 'McpToolCallError'
}
}
/**
* Detects whether an error is an MCP "Session not found" error (HTTP 404 + JSON-RPC code -32001).
* Per the MCP spec, servers return 404 when a session ID is no longer valid.
* We check both signals to avoid false positives from generic 404s (wrong URL, server gone, etc.).
*/
export function isMcpSessionExpiredError(error: Error): boolean {
const httpStatus =
'code' in error ? (error as Error & { code?: number }).code : undefined
if (httpStatus !== 404) {
return false
}
// The SDK embeds the response body text in the error message.
// MCP servers return: {"error":{"code":-32001,"message":"Session not found"},...}
// Check for the JSON-RPC error code to distinguish from generic web server 404s.
return (
error.message.includes('"code":-32001') ||
error.message.includes('"code": -32001')
)
}
/**
* Default timeout for MCP tool calls (effectively infinite - ~27.8 hours).
*/
const DEFAULT_MCP_TOOL_TIMEOUT_MS = 100_000_000
/**
* Cap on MCP tool descriptions and server instructions sent to the model.
* OpenAPI-generated MCP servers have been observed dumping 15-60KB of endpoint
* docs into tool.description; this caps the p95 tail without losing the intent.
*/
const MAX_MCP_DESCRIPTION_LENGTH = 2048
/**
* Gets the timeout for MCP tool calls in milliseconds.
* Uses MCP_TOOL_TIMEOUT environment variable if set, otherwise defaults to ~27.8 hours.
*/
function getMcpToolTimeoutMs(): number {
return (
parseInt(process.env.MCP_TOOL_TIMEOUT || '', 10) ||
DEFAULT_MCP_TOOL_TIMEOUT_MS
)
}
import { isClaudeInChromeMCPServer } from '../../utils/claudeInChrome/common.js'
// Lazy: toolRendering.tsx pulls React/ink; only needed when Claude-in-Chrome MCP server is connected
/* eslint-disable @typescript-eslint/no-require-imports */
const claudeInChromeToolRendering =
(): typeof import('../../utils/claudeInChrome/toolRendering.js') =>
require('../../utils/claudeInChrome/toolRendering.js')
// Lazy: wrapper.tsx β hostAdapter.ts β executor.ts pulls both native modules
// (@ant/computer-use-input + @ant/computer-use-swift). Runtime-gated by
// GrowthBook tengu_malort_pedway (see gates.ts).
const computerUseWrapper = feature('CHICAGO_MCP')
? (): typeof import('../../utils/computerUse/wrapper.js') =>
require('../../utils/computerUse/wrapper.js')
: undefined
const isComputerUseMCPServer = feature('CHICAGO_MCP')
? (
require('../../utils/computerUse/common.js') as typeof import('../../utils/computerUse/common.js')
).isComputerUseMCPServer
: undefined
import { mkdir, readFile, unlink, writeFile } from 'fs/promises'
import { dirname, join } from 'path'
import { getClaudeConfigHomeDir } from '../../utils/envUtils.js'
/* eslint-enable @typescript-eslint/no-require-imports */
import { jsonParse, jsonStringify } from '../../utils/slowOperations.js'
const MCP_AUTH_CACHE_TTL_MS = 15 * 60 * 1000 // 15 min
type McpAuthCacheData = Record<string, { timestamp: number }>
function getMcpAuthCachePath(): string {
return join(getClaudeConfigHomeDir(), 'mcp-needs-auth-cache.json')
}
// Memoized so N concurrent isMcpAuthCached() calls during batched connection
// share a single file read instead of N reads of the same file. Invalidated
// on write (setMcpAuthCacheEntry) and clear (clearMcpAuthCache). Not using
// lodash memoize because we need to null out the cache, not delete by key.
let authCachePromise: Promise<McpAuthCacheData> | null = null
function getMcpAuthCache(): Promise<McpAuthCacheData> {
if (!authCachePromise) {
authCachePromise = readFile(getMcpAuthCachePath(), 'utf-8')
.then(data => jsonParse(data) as McpAuthCacheData)
.catch(() => ({}))
}
return authCachePromise
}
async function isMcpAuthCached(serverId: string): Promise<boolean> {
const cache = await getMcpAuthCache()
const entry = cache[serverId]
if (!entry) {
return false
}
return Date.now() - entry.timestamp < MCP_AUTH_CACHE_TTL_MS
}
// Serialize cache writes through a promise chain to prevent concurrent
// read-modify-write races when multiple servers return 401 in the same batch
let writeChain = Promise.resolve()
function setMcpAuthCacheEntry(serverId: string): void {
writeChain = writeChain
.then(async () => {
const cache = await getMcpAuthCache()
cache[serverId] = { timestamp: Date.now() }
const cachePath = getMcpAuthCachePath()
await mkdir(dirname(cachePath), { recursive: true })
await writeFile(cachePath, jsonStringify(cache))
// Invalidate the read cache so subsequent reads see the new entry.
// Safe because writeChain serializes writes: the next write's
// getMcpAuthCache() call will re-read the file with this entry present.
authCachePromise = null
})
.catch(() => {
// Best-effort cache write
})
}
export function clearMcpAuthCache(): void {
authCachePromise = null
void unlink(getMcpAuthCachePath()).catch(() => {
// Cache file may not exist
})
}
/**
* Spread-ready analytics field for the server's base URL. Calls
* getLoggingSafeMcpBaseUrl once (not twice like the inline ternary it replaces).
* Typed as AnalyticsMetadata since the URL is query-stripped and safe to log.
*/
function mcpBaseUrlAnalytics(serverRef: ScopedMcpServerConfig): {
mcpServerBaseUrl?: AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS
} {
const url = getLoggingSafeMcpBaseUrl(serverRef)
return url
? {
mcpServerBaseUrl:
url as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
}
: {}
}
/**
* Shared handler for sse/http/claudeai-proxy auth failures during connect:
* emits tengu_mcp_server_needs_auth, caches the needs-auth entry, and returns
* the needs-auth connection result.
*/
function handleRemoteAuthFailure(
name: string,
serverRef: ScopedMcpServerConfig,
transportType: 'sse' | 'http' | 'claudeai-proxy',
): MCPServerConnection {
logEvent('tengu_mcp_server_needs_auth', {
transportType:
transportType as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
...mcpBaseUrlAnalytics(serverRef),
})
const label: Record<typeof transportType, string> = {
sse: 'SSE',
http: 'HTTP',
'claudeai-proxy': 'claude.ai proxy',
}
logMCPDebug(
name,
`Authentication required for ${label[transportType]} server`,
)
setMcpAuthCacheEntry(name)
return { name, type: 'needs-auth', config: serverRef }
}
/**
* Fetch wrapper for claude.ai proxy connections. Attaches the OAuth bearer
* token and retries once on 401 via handleOAuth401Error (force-refresh).
*
* The Anthropic API path has this retry (withRetry.ts, grove.ts) to handle
* memoize-cache staleness and clock drift. Without the same here, a single
* stale token mass-401s every claude.ai connector and sticks them all in the
* 15-min needs-auth cache.
*/
export function createClaudeAiProxyFetch(innerFetch: FetchLike): FetchLike {
return async (url, init) => {
const doRequest = async () => {
await checkAndRefreshOAuthTokenIfNeeded()
const currentTokens = getClaudeAIOAuthTokens()
if (!currentTokens) {
throw new Error('No claude.ai OAuth token available')
}
// eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
const headers = new Headers(init?.headers)
headers.set('Authorization', `Bearer ${currentTokens.accessToken}`)
const response = await innerFetch(url, { ...init, headers })
// Return the exact token that was sent. Reading getClaudeAIOAuthTokens()
// again after the request is wrong under concurrent 401s: another
// connector's handleOAuth401Error clears the memoize cache, so we'd read
// the NEW token from keychain, pass it to handleOAuth401Error, which
// finds same-as-keychain β returns false β skips retry. Same pattern as
// bridgeApi.ts withOAuthRetry (token passed as fn param).
return { response, sentToken: currentTokens.accessToken }
}
const { response, sentToken } = await doRequest()
if (response.status !== 401) {
return response
}
// handleOAuth401Error returns true only if the token actually changed
// (keychain had a newer one, or force-refresh succeeded). Gate retry on
// that β otherwise we double round-trip time for every connector whose
// downstream service genuinely needs auth (the common case: 30+ servers
// with "MCP server requires authentication but no OAuth token configured").
const tokenChanged = await handleOAuth401Error(sentToken).catch(() => false)
logEvent('tengu_mcp_claudeai_proxy_401', {
tokenChanged:
tokenChanged as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
})
if (!tokenChanged) {
// ELOCKED contention: another connector may have won the lockfile and refreshed β check if token changed underneath us
const now = getClaudeAIOAuthTokens()?.accessToken
if (!now || now === sentToken) {
return response
}
}
try {
return (await doRequest()).response
} catch {
// Retry itself failed (network error). Return the original 401 so the
// outer handler can classify it.
return response
}
}
}
// Minimal interface for WebSocket instances passed to mcpWebSocketTransport
type WsClientLike = {
readonly readyState: number
close(): void
send(data: string): void
}
/**
* Create a ws.WebSocket client with the MCP protocol.
* Bun's ws shim types lack the 3-arg constructor (url, protocols, options)
* that the real ws package supports, so we cast the constructor here.
*/
async function createNodeWsClient(
url: string,
options: Record<string, unknown>,
): Promise<WsClientLike> {
const wsModule = await import('ws')
const WS = wsModule.default as unknown as new (
url: string,
protocols: string[],
options: Record<string, unknown>,
) => WsClientLike
return new WS(url, ['mcp'], options)
}
const IMAGE_MIME_TYPES = new Set([
'image/jpeg',
'image/png',
'image/gif',
'image/webp',
])
function getConnectionTimeoutMs(): number {
return parseInt(process.env.MCP_TIMEOUT || '', 10) || 30000
}
/**
* Default timeout for individual MCP requests (auth, tool calls, etc.)
*/
const MCP_REQUEST_TIMEOUT_MS = 60000
/**
* MCP Streamable HTTP spec requires clients to advertise acceptance of both
* JSON and SSE on every POST. Servers that enforce this strictly reject
* requests without it (HTTP 406).
* https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#sending-messages-to-the-server
*/
const MCP_STREAMABLE_HTTP_ACCEPT = 'application/json, text/event-stream'
/**
* Wraps a fetch function to apply a fresh timeout signal to each request.
* This avoids the bug where a single AbortSignal.timeout() created at connection
* time becomes stale after 60 seconds, causing all subsequent requests to fail
* immediately with "The operation timed out." Uses a 60-second timeout.
*
* Also ensures the Accept header required by the MCP Streamable HTTP spec is
* present on POSTs. The MCP SDK sets this inside StreamableHTTPClientTransport.send(),
* but it is attached to a Headers instance that passes through an object spread here,
* and some runtimes/agents have been observed dropping it before it reaches the wire.
* See https://github.com/anthropics/claude-agent-sdk-typescript/issues/202.
* Normalizing here (the last wrapper before fetch()) guarantees it is sent.
*
* GET requests are excluded from the timeout since, for MCP transports, they are
* long-lived SSE streams meant to stay open indefinitely. (Auth-related GETs use
* a separate fetch wrapper with its own timeout in auth.ts.)
*
* @param baseFetch - The fetch function to wrap
*/
export function wrapFetchWithTimeout(baseFetch: FetchLike): FetchLike {
return async (url: string | URL, init?: RequestInit) => {
const method = (init?.method ?? 'GET').toUpperCase()
// Skip timeout for GET requests - in MCP transports, these are long-lived SSE streams.
// (OAuth discovery GETs in auth.ts use a separate createAuthFetch() with its own timeout.)
if (method === 'GET') {
return baseFetch(url, init)
}
// Normalize headers and guarantee the Streamable-HTTP Accept value. new Headers()
// accepts HeadersInit | undefined and copies from plain objects, tuple arrays,
// and existing Headers instances β so whatever shape the SDK handed us, the
// Accept value survives the spread below as an own property of a concrete object.
// eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
const headers = new Headers(init?.headers)
if (!headers.has('accept')) {
headers.set('accept', MCP_STREAMABLE_HTTP_ACCEPT)
}
// Use setTimeout instead of AbortSignal.timeout() so we can clearTimeout on
// completion. AbortSignal.timeout's internal timer is only released when the
// signal is GC'd, which in Bun is lazy β ~2.4KB of native memory per request
// lingers for the full 60s even when the request completes in milliseconds.
const controller = new AbortController()
const timer = setTimeout(
c =>
c.abort(new DOMException('The operation timed out.', 'TimeoutError')),
MCP_REQUEST_TIMEOUT_MS,
controller,
)
timer.unref?.()
const parentSignal = init?.signal
const abort = () => controller.abort(parentSignal?.reason)
parentSignal?.addEventListener('abort', abort)
if (parentSignal?.aborted) {
controller.abort(parentSignal.reason)
}
const cleanup = () => {
clearTimeout(timer)
parentSignal?.removeEventListener('abort', abort)
}
try {
const response = await baseFetch(url, {
...init,
headers,
signal: controller.signal,
})
cleanup()
return response
} catch (error) {
cleanup()
throw error
}
}
}
export function getMcpServerConnectionBatchSize(): number {
return parseInt(process.env.MCP_SERVER_CONNECTION_BATCH_SIZE || '', 10) || 3
}
function getRemoteMcpServerConnectionBatchSize(): number {
return (
parseInt(process.env.MCP_REMOTE_SERVER_CONNECTION_BATCH_SIZE || '', 10) ||
20
)
}
function isLocalMcpServer(config: ScopedMcpServerConfig): boolean {
return !config.type || config.type === 'stdio' || config.type === 'sdk'
}
// For the IDE MCP servers, we only include specific tools
const ALLOWED_IDE_TOOLS = ['mcp__ide__executeCode', 'mcp__ide__getDiagnostics']
function isIncludedMcpTool(tool: Tool): boolean {
return (
!tool.name.startsWith('mcp__ide__') || ALLOWED_IDE_TOOLS.includes(tool.name)
)
}
/**
* Generates the cache key for a server connection
* @param name Server name
* @param serverRef Server configuration
* @returns Cache key string
*/
export function getServerCacheKey(
name: string,
serverRef: ScopedMcpServerConfig,
): string {
return `${name}-${jsonStringify(serverRef)}`
}
/**
* TODO (ollie): The memoization here increases complexity by a lot, and im not sure it really improves performance
* Attempts to connect to a single MCP server
* @param name Server name
* @param serverRef Scoped server configuration
* @returns A wrapped client (either connected or failed)
*/
export const connectToServer = memoize(
async (
name: string,
serverRef: ScopedMcpServerConfig,
serverStats?: {
totalServers: number
stdioCount: number
sseCount: number
httpCount: number
sseIdeCount: number
wsIdeCount: number
},
): Promise<MCPServerConnection> => {
const connectStartTime = Date.now()
let inProcessServer:
| { connect(t: Transport): Promise<void>; close(): Promise<void> }
| undefined
try {
let transport
// If we have the session ingress JWT, we will connect via the session ingress rather than
// to remote MCP's directly.
const sessionIngressToken = getSessionIngressAuthToken()
if (serverRef.type === 'sse') {
// Create an auth provider for this server
const authProvider = new ClaudeAuthProvider(name, serverRef)
// Get combined headers (static + dynamic)
const combinedHeaders = await getMcpServerHeaders(name, serverRef)
// Use the auth provider with SSEClientTransport
const transportOptions: SSEClientTransportOptions = {
authProvider,
// Use fresh timeout per request to avoid stale AbortSignal bug.
// Step-up detection wraps innermost so the 403 is seen before the
// SDK's handler calls auth() β tokens().
fetch: wrapFetchWithTimeout(
wrapFetchWithStepUpDetection(createFetchWithInit(), authProvider),
),
requestInit: {
headers: {
'User-Agent': getMCPUserAgent(),
...combinedHeaders,
},
},
}
// IMPORTANT: Always set eventSourceInit with a fetch that does NOT use the
// timeout wrapper. The EventSource connection is long-lived (stays open indefinitely
// to receive server-sent events), so applying a 60-second timeout would kill it.
// The timeout is only meant for individual API requests (POST, auth refresh), not
// the persistent SSE stream.
transportOptions.eventSourceInit = {
fetch: async (url: string | URL, init?: RequestInit) => {
// Get auth headers from the auth provider
const authHeaders: Record<string, string> = {}
const tokens = await authProvider.tokens()
if (tokens) {
authHeaders.Authorization = `Bearer ${tokens.access_token}`
}
const proxyOptions = getProxyFetchOptions()
// eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
return fetch(url, {
...init,
...proxyOptions,
headers: {
'User-Agent': getMCPUserAgent(),
...authHeaders,
...init?.headers,
...combinedHeaders,
Accept: 'text/event-stream',
},
})
},
}
transport = new SSEClientTransport(
new URL(serverRef.url),
transportOptions,
)
logMCPDebug(name, `SSE transport initialized, awaiting connection`)
} else if (serverRef.type === 'sse-ide') {
logMCPDebug(name, `Setting up SSE-IDE transport to ${serverRef.url}`)
// IDE servers don't need authentication
// TODO: Use the auth token provided in the lockfile
const proxyOptions = getProxyFetchOptions()
const transportOptions: SSEClientTransportOptions =
proxyOptions.dispatcher
? {
eventSourceInit: {
fetch: async (url: string | URL, init?: RequestInit) => {
// eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
return fetch(url, {
...init,
...proxyOptions,
headers: {
'User-Agent': getMCPUserAgent(),
...init?.headers,
},
})
},
},
}
: {}
transport = new SSEClientTransport(
new URL(serverRef.url),
Object.keys(transportOptions).length > 0
? transportOptions
: undefined,
)
} else if (serverRef.type === 'ws-ide') {
const tlsOptions = getWebSocketTLSOptions()
const wsHeaders = {
'User-Agent': getMCPUserAgent(),
...(serverRef.authToken && {
'X-Claude-Code-Ide-Authorization': serverRef.authToken,
}),
}
let wsClient: WsClientLike
if (typeof Bun !== 'undefined') {
// Bun's WebSocket supports headers/proxy/tls options but the DOM typings don't
// eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
wsClient = new globalThis.WebSocket(serverRef.url, {
protocols: ['mcp'],
headers: wsHeaders,
proxy: getWebSocketProxyUrl(serverRef.url),
tls: tlsOptions || undefined,
} as unknown as string[])
} else {
wsClient = await createNodeWsClient(serverRef.url, {
headers: wsHeaders,
agent: getWebSocketProxyAgent(serverRef.url),
...(tlsOptions || {}),
})
}
transport = new WebSocketTransport(wsClient)
} else if (serverRef.type === 'ws') {
logMCPDebug(
name,
`Initializing WebSocket transport to ${serverRef.url}`,
)
const combinedHeaders = await getMcpServerHeaders(name, serverRef)
const tlsOptions = getWebSocketTLSOptions()
const wsHeaders = {
'User-Agent': getMCPUserAgent(),
...(sessionIngressToken && {
Authorization: `Bearer ${sessionIngressToken}`,
}),
...combinedHeaders,
}
// Redact sensitive headers before logging
const wsHeadersForLogging = mapValues(wsHeaders, (value, key) =>
key.toLowerCase() === 'authorization' ? '[REDACTED]' : value,
)
logMCPDebug(
name,
`WebSocket transport options: ${jsonStringify({
url: serverRef.url,
headers: wsHeadersForLogging,
hasSessionAuth: !!sessionIngressToken,
})}`,
)
let wsClient: WsClientLike
if (typeof Bun !== 'undefined') {
// Bun's WebSocket supports headers/proxy/tls options but the DOM typings don't
// eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
wsClient = new globalThis.WebSocket(serverRef.url, {
protocols: ['mcp'],
headers: wsHeaders,
proxy: getWebSocketProxyUrl(serverRef.url),
tls: tlsOptions || undefined,
} as unknown as string[])
} else {
wsClient = await createNodeWsClient(serverRef.url, {
headers: wsHeaders,
agent: getWebSocketProxyAgent(serverRef.url),
...(tlsOptions || {}),
})
}
transport = new WebSocketTransport(wsClient)
} else if (serverRef.type === 'http') {
logMCPDebug(name, `Initializing HTTP transport to ${serverRef.url}`)
logMCPDebug(
name,
`Node version: ${process.version}, Platform: ${process.platform}`,
)
logMCPDebug(
name,
`Environment: ${jsonStringify({
NODE_OPTIONS: process.env.NODE_OPTIONS || 'not set',
UV_THREADPOOL_SIZE: process.env.UV_THREADPOOL_SIZE || 'default',
HTTP_PROXY: process.env.HTTP_PROXY || 'not set',
HTTPS_PROXY: process.env.HTTPS_PROXY || 'not set',
NO_PROXY: process.env.NO_PROXY || 'not set',
})}`,
)
// Create an auth provider for this server
const authProvider = new ClaudeAuthProvider(name, serverRef)
// Get combined headers (static + dynamic)
const combinedHeaders = await getMcpServerHeaders(name, serverRef)
// Check if this server has stored OAuth tokens. If so, the SDK's
// authProvider will set Authorization β don't override with the
// session ingress token (SDK merges requestInit AFTER authProvider).
// CCR proxy URLs (ccr_shttp_mcp) have no stored OAuth, so they still
// get the ingress token. See PR #24454 discussion.
const hasOAuthTokens = !!(await authProvider.tokens())
// Use the auth provider with StreamableHTTPClientTransport
const proxyOptions = getProxyFetchOptions()
logMCPDebug(
name,
`Proxy options: ${proxyOptions.dispatcher ? 'custom dispatcher' : 'default'}`,
)
const transportOptions: StreamableHTTPClientTransportOptions = {
authProvider,
// Use fresh timeout per request to avoid stale AbortSignal bug.
// Step-up detection wraps innermost so the 403 is seen before the
// SDK's handler calls auth() β tokens().
fetch: wrapFetchWithTimeout(
wrapFetchWithStepUpDetection(createFetchWithInit(), authProvider),
),
requestInit: {
...proxyOptions,
headers: {
'User-Agent': getMCPUserAgent(),
...(sessionIngressToken &&
!hasOAuthTokens && {
Authorization: `Bearer ${sessionIngressToken}`,
}),
...combinedHeaders,
},
},
}
// Redact sensitive headers before logging
const headersForLogging = transportOptions.requestInit?.headers
? mapValues(
transportOptions.requestInit.headers as Record<string, string>,
(value, key) =>
key.toLowerCase() === 'authorization' ? '[REDACTED]' : value,
)
: undefined
logMCPDebug(
name,
`HTTP transport options: ${jsonStringify({
url: serverRef.url,
headers: headersForLogging,
hasAuthProvider: !!authProvider,
timeoutMs: MCP_REQUEST_TIMEOUT_MS,
})}`,
)
transport = new StreamableHTTPClientTransport(
new URL(serverRef.url),
transportOptions,
)
logMCPDebug(name, `HTTP transport created successfully`)
} else if (serverRef.type === 'sdk') {
throw new Error('SDK servers should be handled in print.ts')
} else if (serverRef.type === 'claudeai-proxy') {
logMCPDebug(
name,
`Initializing claude.ai proxy transport for server ${serverRef.id}`,
)
const tokens = getClaudeAIOAuthTokens()
if (!tokens) {
throw new Error('No claude.ai OAuth token found')
}
const oauthConfig = getOauthConfig()
const proxyUrl = `${oauthConfig.MCP_PROXY_URL}${oauthConfig.MCP_PROXY_PATH.replace('{server_id}', serverRef.id)}`
logMCPDebug(name, `Using claude.ai proxy at ${proxyUrl}`)
// eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
const fetchWithAuth = createClaudeAiProxyFetch(globalThis.fetch)
const proxyOptions = getProxyFetchOptions()
const transportOptions: StreamableHTTPClientTransportOptions = {
// Wrap fetchWithAuth with fresh timeout per request
fetch: wrapFetchWithTimeout(fetchWithAuth),
requestInit: {
...proxyOptions,
headers: {
'User-Agent': getMCPUserAgent(),
'X-Mcp-Client-Session-Id': getSessionId(),
},
},
}
transport = new StreamableHTTPClientTransport(
new URL(proxyUrl),
transportOptions,
)
logMCPDebug(name, `claude.ai proxy transport created successfully`)
} else if (
(serverRef.type === 'stdio' || !serverRef.type) &&
isClaudeInChromeMCPServer(name)
) {
// Run the Chrome MCP server in-process to avoid spawning a ~325 MB subprocess
const { createChromeContext } = await import(
'../../utils/claudeInChrome/mcpServer.js'
)
const { createClaudeForChromeMcpServer } = await import(
'@ant/claude-for-chrome-mcp'
)
const { createLinkedTransportPair } = await import(
'./InProcessTransport.js'
)
const context = createChromeContext(serverRef.env)
inProcessServer = createClaudeForChromeMcpServer(context)
const [clientTransport, serverTransport] = createLinkedTransportPair()
await inProcessServer.connect(serverTransport)
transport = clientTransport
logMCPDebug(name, `In-process Chrome MCP server started`)
} else if (
feature('CHICAGO_MCP') &&
(serverRef.type === 'stdio' || !serverRef.type) &&
isComputerUseMCPServer!(name)
) {
// Run the Computer Use MCP server in-process β same rationale as
// Chrome above. The package's CallTool handler is a stub; real
// dispatch goes through wrapper.tsx's .call() override.
const { createComputerUseMcpServerForCli } = await import(
'../../utils/computerUse/mcpServer.js'
)
const { createLinkedTransportPair } = await import(
'./InProcessTransport.js'
)
inProcessServer = await createComputerUseMcpServerForCli()
const [clientTransport, serverTransport] = createLinkedTransportPair()
await inProcessServer.connect(serverTransport)
transport = clientTransport
logMCPDebug(name, `In-process Computer Use MCP server started`)
} else if (serverRef.type === 'stdio' || !serverRef.type) {
const finalCommand =
process.env.CLAUDE_CODE_SHELL_PREFIX || serverRef.command
const finalArgs = process.env.CLAUDE_CODE_SHELL_PREFIX
? [[serverRef.command, ...serverRef.args].join(' ')]
: serverRef.args
transport = new StdioClientTransport({
command: finalCommand,
args: finalArgs,
env: {
...subprocessEnv(),
...serverRef.env,
} as Record<string, string>,
stderr: 'pipe', // prevents error output from the MCP server from printing to the UI
})
} else {
throw new Error(`Unsupported server type: ${serverRef.type}`)
}
// Set up stderr logging for stdio transport before connecting in case there are any stderr
// outputs emitted during the connection start (this can be useful for debugging failed connections).
// Store handler reference for cleanup to prevent memory leaks
let stderrHandler: ((data: Buffer) => void) | undefined
let stderrOutput = ''
if (serverRef.type === 'stdio' || !serverRef.type) {
const stdioTransport = transport as StdioClientTransport
if (stdioTransport.stderr) {
stderrHandler = (data: Buffer) => {
// Cap stderr accumulation to prevent unbounded memory growth
if (stderrOutput.length < 64 * 1024 * 1024) {
try {
stderrOutput += data.toString()
} catch {
// Ignore errors from exceeding max string length
}
}
}
stdioTransport.stderr.on('data', stderrHandler)
}
}
const client = new Client(
{
name: 'claude-code',
title: 'Claude Code',
version: MACRO.VERSION ?? 'unknown',
description: "Anthropic's agentic coding tool",
websiteUrl: PRODUCT_URL,
},
{
capabilities: {
roots: {},
// Empty object declares the capability. Sending {form:{},url:{}}
// breaks Java MCP SDK servers (Spring AI) whose Elicitation class
// has zero fields and fails on unknown properties.
elicitation: {},
},
},
)
// Add debug logging for client events if available
if (serverRef.type === 'http') {
logMCPDebug(name, `Client created, setting up request handler`)
}
client.setRequestHandler(ListRootsRequestSchema, async () => {
logMCPDebug(name, `Received ListRoots request from server`)
return {
roots: [
{
uri: `file://${getOriginalCwd()}`,
},
],
}
})
// Add a timeout to connection attempts to prevent tests from hanging indefinitely
logMCPDebug(
name,
`Starting connection with timeout of ${getConnectionTimeoutMs()}ms`,
)
// For HTTP transport, try a basic connectivity test first
if (serverRef.type === 'http') {
logMCPDebug(name, `Testing basic HTTP connectivity to ${serverRef.url}`)
try {
const testUrl = new URL(serverRef.url)
logMCPDebug(
name,
`Parsed URL: host=${testUrl.hostname}, port=${testUrl.port || 'default'}, protocol=${testUrl.protocol}`,
)
// Log DNS resolution attempt
if (
testUrl.hostname === '127.0.0.1' ||
testUrl.hostname === 'localhost'
) {
logMCPDebug(name, `Using loopback address: ${testUrl.hostname}`)
}
} catch (urlError) {
logMCPDebug(name, `Failed to parse URL: ${urlError}`)
}
}
const connectPromise = client.connect(transport)
const timeoutPromise = new Promise<never>((_, reject) => {
const timeoutId = setTimeout(() => {
const elapsed = Date.now() - connectStartTime
logMCPDebug(
name,
`Connection timeout triggered after ${elapsed}ms (limit: ${getConnectionTimeoutMs()}ms)`,
)
if (inProcessServer) {
inProcessServer.close().catch(() => {})
}
transport.close().catch(() => {})
reject(
new TelemetrySafeError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS(
`MCP server "${name}" connection timed out after ${getConnectionTimeoutMs()}ms`,
'MCP connection timeout',
),
)
}, getConnectionTimeoutMs())
// Clean up timeout if connect resolves or rejects
connectPromise.then(
() => {
clearTimeout(timeoutId)
},
_error => {
clearTimeout(timeoutId)
},
)
})
try {
await Promise.race([connectPromise, timeoutPromise])
if (stderrOutput) {
logMCPError(name, `Server stderr: ${stderrOutput}`)
stderrOutput = '' // Release accumulated string to prevent memory growth
}
const elapsed = Date.now() - connectStartTime
logMCPDebug(
name,
`Successfully connected (transport: ${serverRef.type || 'stdio'}) in ${elapsed}ms`,
)
} catch (error) {
const elapsed = Date.now() - connectStartTime
// SSE-specific error logging
if (serverRef.type === 'sse' && error instanceof Error) {
logMCPDebug(
name,
`SSE Connection failed after ${elapsed}ms: ${jsonStringify({
url: serverRef.url,
error: error.message,
errorType: error.constructor.name,
stack: error.stack,
})}`,
)
logMCPError(name, error)
if (error instanceof UnauthorizedError) {
return handleRemoteAuthFailure(name, serverRef, 'sse')
}
} else if (serverRef.type === 'http' && error instanceof Error) {
const errorObj = error as Error & {
cause?: unknown
code?: string
errno?: string | number
syscall?: string
}
logMCPDebug(
name,
`HTTP Connection failed after ${elapsed}ms: ${error.message} (code: ${errorObj.code || 'none'}, errno: ${errorObj.errno || 'none'})`,
)
logMCPError(name, error)
if (error instanceof UnauthorizedError) {
return handleRemoteAuthFailure(name, serverRef, 'http')
}
} else if (
serverRef.type === 'claudeai-proxy' &&
error instanceof Error
) {
logMCPDebug(
name,
`claude.ai proxy connection failed after ${elapsed}ms: ${error.message}`,
)
logMCPError(name, error)
// StreamableHTTPError has a `code` property with the HTTP status
const errorCode = (error as Error & { code?: number }).code
if (errorCode === 401) {
return handleRemoteAuthFailure(name, serverRef, 'claudeai-proxy')
}
} else if (
serverRef.type === 'sse-ide' ||
serverRef.type === 'ws-ide'
) {
logEvent('tengu_mcp_ide_server_connection_failed', {
connectionDurationMs: elapsed,
})
}
if (inProcessServer) {
inProcessServer.close().catch(() => {})
}
transport.close().catch(() => {})
if (stderrOutput) {
logMCPError(name, `Server stderr: ${stderrOutput}`)
}
throw error
}
const capabilities = client.getServerCapabilities()
const serverVersion = client.getServerVersion()
const rawInstructions = client.getInstructions()
let instructions = rawInstructions
if (
rawInstructions &&
rawInstructions.length > MAX_MCP_DESCRIPTION_LENGTH
) {
instructions =
rawInstructions.slice(0, MAX_MCP_DESCRIPTION_LENGTH) + 'β¦ [truncated]'
logMCPDebug(
name,
`Server instructions truncated from ${rawInstructions.length} to ${MAX_MCP_DESCRIPTION_LENGTH} chars`,
)
}
// Log successful connection details
logMCPDebug(
name,
`Connection established with capabilities: ${jsonStringify({
hasTools: !!capabilities?.tools,
hasPrompts: !!capabilities?.prompts,
hasResources: !!capabilities?.resources,
hasResourceSubscribe: !!capabilities?.resources?.subscribe,
serverVersion: serverVersion || 'unknown',
})}`,
)
logForDebugging(
`[MCP] Server "${name}" connected with subscribe=${!!capabilities?.resources?.subscribe}`,
)
// Register default elicitation handler that returns cancel during the
// window before registerElicitationHandler overwrites it in
// onConnectionAttempt (useManageMCPConnections).
client.setRequestHandler(ElicitRequestSchema, async request => {
logMCPDebug(
name,
`Elicitation request received during initialization: ${jsonStringify(request)}`,
)
return { action: 'cancel' as const }
})
if (serverRef.type === 'sse-ide' || serverRef.type === 'ws-ide') {
const ideConnectionDurationMs = Date.now() - connectStartTime
logEvent('tengu_mcp_ide_server_connection_succeeded', {
connectionDurationMs: ideConnectionDurationMs,
serverVersion:
serverVersion as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
})
try {
void maybeNotifyIDEConnected(client)
} catch (error) {
logMCPError(
name,
`Failed to send ide_connected notification: ${error}`,
)
}
}
// Enhanced connection drop detection and logging for all transport types
const connectionStartTime = Date.now()
let hasErrorOccurred = false
// Store original handlers
const originalOnerror = client.onerror
const originalOnclose = client.onclose
// The SDK's transport calls onerror on connection failures but doesn't call onclose,
// which CC uses to trigger reconnection. We bridge this gap by tracking consecutive
// terminal errors and manually closing after MAX_ERRORS_BEFORE_RECONNECT failures.
let consecutiveConnectionErrors = 0
const MAX_ERRORS_BEFORE_RECONNECT = 3
// Guard against re-entry: close() aborts in-flight streams which may fire
// onerror again before the close chain completes.
let hasTriggeredClose = false
// client.close() β transport.close() β transport.onclose β SDK's _onclose():
// rejects all pending request handlers (so hung callTool() promises fail with
// McpError -32000 "Connection closed") and then invokes our client.onclose
// handler below (which clears the memo cache so the next call reconnects).
// Calling client.onclose?.() directly would only clear the cache β pending
// tool calls would stay hung.
const closeTransportAndRejectPending = (reason: string) => {
if (hasTriggeredClose) return
hasTriggeredClose = true
logMCPDebug(name, `Closing transport (${reason})`)
void client.close().catch(e => {
logMCPDebug(name, `Error during close: ${errorMessage(e)}`)
})
}
const isTerminalConnectionError = (msg: string): boolean => {
return (
msg.includes('ECONNRESET') ||
msg.includes('ETIMEDOUT') ||
msg.includes('EPIPE') ||
msg.includes('EHOSTUNREACH') ||
msg.includes('ECONNREFUSED') ||
msg.includes('Body Timeout Error') ||
msg.includes('terminated') ||
// SDK SSE reconnection intermediate errors β may be wrapped around the
// actual network error, so the substrings above won't match
msg.includes('SSE stream disconnected') ||
msg.includes('Failed to reconnect SSE stream')
)
}
// Enhanced error handler with detailed logging
client.onerror = (error: Error) => {
const uptime = Date.now() - connectionStartTime
hasErrorOccurred = true
const transportType = serverRef.type || 'stdio'
// Log the connection drop with context
logMCPDebug(
name,
`${transportType.toUpperCase()} connection dropped after ${Math.floor(uptime / 1000)}s uptime`,
)
// Log specific error details for debugging
if (error.message) {
if (error.message.includes('ECONNRESET')) {
logMCPDebug(
name,
`Connection reset - server may have crashed or restarted`,
)
} else if (error.message.includes('ETIMEDOUT')) {
logMCPDebug(
name,
`Connection timeout - network issue or server unresponsive`,
)
} else if (error.message.includes('ECONNREFUSED')) {
logMCPDebug(name, `Connection refused - server may be down`)
} else if (error.message.includes('EPIPE')) {
logMCPDebug(
name,
`Broken pipe - server closed connection unexpectedly`,
)
} else if (error.message.includes('EHOSTUNREACH')) {
logMCPDebug(name, `Host unreachable - network connectivity issue`)
} else if (error.message.includes('ESRCH')) {
logMCPDebug(
name,
`Process not found - stdio server process terminated`,
)
} else if (error.message.includes('spawn')) {
logMCPDebug(
name,
`Failed to spawn process - check command and permissions`,
)
} else {
logMCPDebug(name, `Connection error: ${error.message}`)
}
}
// For HTTP transports, detect session expiry (404 + JSON-RPC -32001)
// and close the transport so pending tool calls reject and the next
// call reconnects with a fresh session ID.
if (
(transportType === 'http' || transportType === 'claudeai-proxy') &&
isMcpSessionExpiredError(error)
) {
logMCPDebug(
name,
`MCP session expired (server returned 404 with session-not-found), triggering reconnection`,
)
closeTransportAndRejectPending('session expired')
if (originalOnerror) {
originalOnerror(error)
}
return
}
// For remote transports (SSE/HTTP), track terminal connection errors
// and trigger reconnection via close if we see repeated failures.
if (
transportType === 'sse' ||
transportType === 'http' ||
transportType === 'claudeai-proxy'
) {
// The SDK's StreamableHTTP transport fires this after exhausting its
// own SSE reconnect attempts (default maxRetries: 2) β but it never
// calls onclose, so pending callTool() promises hang indefinitely.
// This is the definitive "transport gave up" signal.
if (error.message.includes('Maximum reconnection attempts')) {
closeTransportAndRejectPending('SSE reconnection exhausted')
if (originalOnerror) {
originalOnerror(error)
}
return
}
if (isTerminalConnectionError(error.message)) {
consecutiveConnectionErrors++
logMCPDebug(
name,
`Terminal connection error ${consecutiveConnectionErrors}/${MAX_ERRORS_BEFORE_RECONNECT}`,
)
if (consecutiveConnectionErrors >= MAX_ERRORS_BEFORE_RECONNECT) {
consecutiveConnectionErrors = 0
closeTransportAndRejectPending('max consecutive terminal errors')
}
} else {
// Non-terminal error (e.g., transient issue), reset counter
consecutiveConnectionErrors = 0
}
}
// Call original handler
if (originalOnerror) {
originalOnerror(error)
}
}
// Enhanced close handler with connection drop context
client.onclose = () => {
const uptime = Date.now() - connectionStartTime
const transportType = serverRef.type ?? 'unknown'
logMCPDebug(
name,
`${transportType.toUpperCase()} connection closed after ${Math.floor(uptime / 1000)}s (${hasErrorOccurred ? 'with errors' : 'cleanly'})`,
)
// Clear the memoization cache so next operation reconnects
const key = getServerCacheKey(name, serverRef)
// Also clear fetch caches (keyed by server name). Reconnection
// creates a new connection object; without clearing, the next
// fetch would return stale tools/resources from the old connection.
fetchToolsForClient.cache.delete(name)
fetchResourcesForClient.cache.delete(name)
fetchCommandsForClient.cache.delete(name)
if (feature('MCP_SKILLS')) {
fetchMcpSkillsForClient!.cache.delete(name)
}
connectToServer.cache.delete(key)
logMCPDebug(name, `Cleared connection cache for reconnection`)
if (originalOnclose) {
originalOnclose()
}
}
const cleanup = async () => {
// In-process servers (e.g. Chrome MCP) don't have child processes or stderr
if (inProcessServer) {
try {
await inProcessServer.close()
} catch (error) {
logMCPDebug(name, `Error closing in-process server: ${error}`)
}
try {
await client.close()
} catch (error) {
logMCPDebug(name, `Error closing client: ${error}`)
}
return
}
// Remove stderr event listener to prevent memory leaks
if (stderrHandler && (serverRef.type === 'stdio' || !serverRef.type)) {
const stdioTransport = transport as StdioClientTransport
stdioTransport.stderr?.off('data', stderrHandler)
}
// For stdio transports, explicitly terminate the child process with proper signals
// NOTE: StdioClientTransport.close() only sends an abort signal, but many MCP servers
// (especially Docker containers) need explicit SIGINT/SIGTERM signals to trigger graceful shutdown
if (serverRef.type === 'stdio') {
try {
const stdioTransport = transport as StdioClientTransport
const childPid = stdioTransport.pid
if (childPid) {
logMCPDebug(name, 'Sending SIGINT to MCP server process')
// First try SIGINT (like Ctrl+C)
try {
process.kill(childPid, 'SIGINT')
} catch (error) {
logMCPDebug(name, `Error sending SIGINT: ${error}`)
return
}
// Wait for graceful shutdown with rapid escalation (total 500ms to keep CLI responsive)
await new Promise<void>(async resolve => {
let resolved = false
// Set up a timer to check if process still exists
const checkInterval = setInterval(() => {
try {
// process.kill(pid, 0) checks if process exists without killing it
process.kill(childPid, 0)
} catch {
// Process no longer exists
if (!resolved) {
resolved = true
clearInterval(checkInterval)
clearTimeout(failsafeTimeout)
logMCPDebug(name, 'MCP server process exited cleanly')
resolve()
}
}
}, 50)
// Absolute failsafe: clear interval after 600ms no matter what
const failsafeTimeout = setTimeout(() => {
if (!resolved) {
resolved = true
clearInterval(checkInterval)
logMCPDebug(
name,
'Cleanup timeout reached, stopping process monitoring',
)
resolve()
}
}, 600)
try {
// Wait 100ms for SIGINT to work (usually much faster)
await sleep(100)
if (!resolved) {
// Check if process still exists
try {
process.kill(childPid, 0)
// Process still exists, SIGINT failed, try SIGTERM
logMCPDebug(
name,
'SIGINT failed, sending SIGTERM to MCP server process',
)
try {
process.kill(childPid, 'SIGTERM')
} catch (termError) {
logMCPDebug(name, `Error sending SIGTERM: ${termError}`)
resolved = true
clearInterval(checkInterval)
clearTimeout(failsafeTimeout)
resolve()
return
}
} catch {
// Process already exited
resolved = true
clearInterval(checkInterval)
clearTimeout(failsafeTimeout)
resolve()
return
}
// Wait 400ms for SIGTERM to work (slower than SIGINT, often used for cleanup)
await sleep(400)
if (!resolved) {
// Check if process still exists
try {
process.kill(childPid, 0)
// Process still exists, SIGTERM failed, force kill with SIGKILL
logMCPDebug(
name,
'SIGTERM failed, sending SIGKILL to MCP server process',
)
try {
process.kill(childPid, 'SIGKILL')
} catch (killError) {
logMCPDebug(
name,
`Error sending SIGKILL: ${killError}`,
)
}
} catch {
// Process already exited
resolved = true
clearInterval(checkInterval)
clearTimeout(failsafeTimeout)
resolve()
}
}
}
// Final timeout - always resolve after 500ms max (total cleanup time)
if (!resolved) {
resolved = true
clearInterval(checkInterval)
clearTimeout(failsafeTimeout)
resolve()
}
} catch {
// Handle any errors in the escalation sequence
if (!resolved) {
resolved = true
clearInterval(checkInterval)
clearTimeout(failsafeTimeout)
resolve()
}
}
})
}
} catch (processError) {
logMCPDebug(name, `Error terminating process: ${processError}`)
}
}
// Close the client connection (which also closes the transport)
try {
await client.close()
} catch (error) {
logMCPDebug(name, `Error closing client: ${error}`)
}
}
// Register cleanup for all transport types - even network transports might need cleanup
// This ensures all MCP servers get properly terminated, not just stdio ones
const cleanupUnregister = registerCleanup(cleanup)
// Create the wrapped cleanup that includes unregistering
const wrappedCleanup = async () => {
cleanupUnregister?.()
await cleanup()
}
const connectionDurationMs = Date.now() - connectStartTime
logEvent('tengu_mcp_server_connection_succeeded', {
connectionDurationMs,
transportType: (serverRef.type ??
'stdio') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
totalServers: serverStats?.totalServers,
stdioCount: serverStats?.stdioCount,
sseCount: serverStats?.sseCount,
httpCount: serverStats?.httpCount,
sseIdeCount: serverStats?.sseIdeCount,
wsIdeCount: serverStats?.wsIdeCount,
...mcpBaseUrlAnalytics(serverRef),
})
return {
name,
client,
type: 'connected' as const,
capabilities: capabilities ?? {},
serverInfo: serverVersion,
instructions,
config: serverRef,
cleanup: wrappedCleanup,
}
} catch (error) {
const connectionDurationMs = Date.now() - connectStartTime
logEvent('tengu_mcp_server_connection_failed', {
connectionDurationMs,
totalServers: serverStats?.totalServers || 1,
stdioCount:
serverStats?.stdioCount || (serverRef.type === 'stdio' ? 1 : 0),
sseCount: serverStats?.sseCount || (serverRef.type === 'sse' ? 1 : 0),
httpCount:
serverStats?.httpCount || (serverRef.type === 'http' ? 1 : 0),
sseIdeCount:
serverStats?.sseIdeCount || (serverRef.type === 'sse-ide' ? 1 : 0),
wsIdeCount:
serverStats?.wsIdeCount || (serverRef.type === 'ws-ide' ? 1 : 0),
transportType: (serverRef.type ??
'stdio') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
...mcpBaseUrlAnalytics(serverRef),
})
logMCPDebug(
name,
`Connection failed after ${connectionDurationMs}ms: ${errorMessage(error)}`,
)
logMCPError(name, `Connection failed: ${errorMessage(error)}`)
if (inProcessServer) {
inProcessServer.close().catch(() => {})
}
return {
name,
type: 'failed' as const,
config: serverRef,
error: errorMessage(error),
}
}
},
getServerCacheKey,
)
/**
* Clears the memoize cache for a specific server
* @param name Server name
* @param serverRef Server configuration
*/
export async function clearServerCache(
name: string,
serverRef: ScopedMcpServerConfig,
): Promise<void> {
const key = getServerCacheKey(name, serverRef)
try {
const wrappedClient = await connectToServer(name, serverRef)
if (wrappedClient.type === 'connected') {
await wrappedClient.cleanup()
}
} catch {
// Ignore errors - server might have failed to connect
}
// Clear from cache (both connection and fetch caches so reconnect
// fetches fresh tools/resources/commands instead of stale ones)
connectToServer.cache.delete(key)
fetchToolsForClient.cache.delete(name)
fetchResourcesForClient.cache.delete(name)
fetchCommandsForClient.cache.delete(name)
if (feature('MCP_SKILLS')) {
fetchMcpSkillsForClient!.cache.delete(name)
}
}
/**
* Ensures a valid connected client for an MCP server.
* For most server types, uses the memoization cache if available, or reconnects
* if the cache was cleared (e.g., after onclose). This ensures tool/resource
* calls always use a valid connection.
*
* SDK MCP servers run in-process and are handled separately via setupSdkMcpClients,
* so they are returned as-is without going through connectToServer.
*
* @param client The connected MCP server client
* @returns Connected MCP server client (same or reconnected)
* @throws Error if server cannot be connected
*/
export async function ensureConnectedClient(
client: ConnectedMCPServer,
): Promise<ConnectedMCPServer> {
// SDK MCP servers run in-process and are handled separately via setupSdkMcpClients
if (client.config.type === 'sdk') {
return client
}
const connectedClient = await connectToServer(client.name, client.config)
if (connectedClient.type !== 'connected') {
throw new TelemetrySafeError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS(
`MCP server "${client.name}" is not connected`,
'MCP server not connected',
)
}
return connectedClient
}
/**
* Compares two MCP server configurations to determine if they are equivalent.
* Used to detect when a server needs to be reconnected due to config changes.
*/
export function areMcpConfigsEqual(
a: ScopedMcpServerConfig,
b: ScopedMcpServerConfig,
): boolean {
// Quick type check first
if (a.type !== b.type) return false
// Compare by serializing - this handles all config variations
// We exclude 'scope' from comparison since it's metadata, not connection config
const { scope: _scopeA, ...configA } = a
const { scope: _scopeB, ...configB } = b
return jsonStringify(configA) === jsonStringify(configB)
}
// Max cache size for fetch* caches. Keyed by server name (stable across
// reconnects), bounded to prevent unbounded growth with many MCP servers.
const MCP_FETCH_CACHE_SIZE = 20
/**
* Encode MCP tool input for the auto-mode security classifier.
* Exported so the auto-mode eval scripts can mirror production encoding
* for `mcp__*` tool stubs without duplicating this logic.
*/
export function mcpToolInputToAutoClassifierInput(
input: Record<string, unknown>,
toolName: string,
): string {
const keys = Object.keys(input)
return keys.length > 0
? keys.map(k => `${k}=${String(input[k])}`).join(' ')
: toolName
}
export const fetchToolsForClient = memoizeWithLRU(
async (client: MCPServerConnection): Promise<Tool[]> => {
if (client.type !== 'connected') return []
try {
if (!client.capabilities?.tools) {
return []
}
const result = (await client.client.request(
{ method: 'tools/list' },
ListToolsResultSchema,
)) as ListToolsResult
// Sanitize tool data from MCP server
const toolsToProcess = recursivelySanitizeUnicode(result.tools)
// Check if we should skip the mcp__ prefix for SDK MCP servers
const skipPrefix =
client.config.type === 'sdk' &&
isEnvTruthy(process.env.CLAUDE_AGENT_SDK_MCP_NO_PREFIX)
// Convert MCP tools to our Tool format
return toolsToProcess
.map((tool): Tool => {
const fullyQualifiedName = buildMcpToolName(client.name, tool.name)
return {
...MCPTool,
// In skip-prefix mode, use the original name for model invocation so MCP tools
// can override builtins by name. mcpInfo is used for permission checking.
name: skipPrefix ? tool.name : fullyQualifiedName,
mcpInfo: { serverName: client.name, toolName: tool.name },
isMcp: true,
// Collapse whitespace: _meta is open to external MCP servers, and
// a newline here would inject orphan lines into the deferred-tool
// list (formatDeferredToolLine joins on '\n').
searchHint:
typeof tool._meta?.['anthropic/searchHint'] === 'string'
? tool._meta['anthropic/searchHint']
.replace(/\s+/g, ' ')
.trim() || undefined
: undefined,
alwaysLoad: tool._meta?.['anthropic/alwaysLoad'] === true,
async description() {
return tool.description ?? ''
},
async prompt() {
const desc = tool.description ?? ''
return desc.length > MAX_MCP_DESCRIPTION_LENGTH
? desc.slice(0, MAX_MCP_DESCRIPTION_LENGTH) + 'β¦ [truncated]'
: desc
},
isConcurrencySafe() {
return tool.annotations?.readOnlyHint ?? false
},
isReadOnly() {
return tool.annotations?.readOnlyHint ?? false
},
toAutoClassifierInput(input) {
return mcpToolInputToAutoClassifierInput(input, tool.name)
},
isDestructive() {
return tool.annotations?.destructiveHint ?? false
},
isOpenWorld() {
return tool.annotations?.openWorldHint ?? false
},
isSearchOrReadCommand() {
return classifyMcpToolForCollapse(client.name, tool.name)
},
inputJSONSchema: tool.inputSchema as Tool['inputJSONSchema'],
async checkPermissions() {
return {
behavior: 'passthrough' as const,
message: 'MCPTool requires permission.',
suggestions: [
{
type: 'addRules' as const,
rules: [
{
toolName: fullyQualifiedName,
ruleContent: undefined,
},
],
behavior: 'allow' as const,
destination: 'localSettings' as const,
},
],
}
},
async call(
args: Record<string, unknown>,
context,
_canUseTool,
parentMessage,
onProgress?: ToolCallProgress<MCPProgress>,
) {
const toolUseId = extractToolUseId(parentMessage)
const meta = toolUseId
? { 'claudecode/toolUseId': toolUseId }
: {}
// Emit progress when tool starts
if (onProgress && toolUseId) {
onProgress({
toolUseID: toolUseId,
data: {
type: 'mcp_progress',
status: 'started',
serverName: client.name,
toolName: tool.name,
},
})
}
const startTime = Date.now()
const MAX_SESSION_RETRIES = 1
for (let attempt = 0; ; attempt++) {
try {
const connectedClient = await ensureConnectedClient(client)
const mcpResult = await callMCPToolWithUrlElicitationRetry({
client: connectedClient,
clientConnection: client,
tool: tool.name,
args,
meta,
signal: context.abortController.signal,
setAppState: context.setAppState,
onProgress:
onProgress && toolUseId
? progressData => {
onProgress({
toolUseID: toolUseId,
data: progressData,
})
}
: undefined,
handleElicitation: context.handleElicitation,
})
// Emit progress when tool completes successfully
if (onProgress && toolUseId) {
onProgress({
toolUseID: toolUseId,
data: {
type: 'mcp_progress',
status: 'completed',
serverName: client.name,
toolName: tool.name,
elapsedTimeMs: Date.now() - startTime,
},
})
}
return {
data: mcpResult.content,
...((mcpResult._meta || mcpResult.structuredContent) && {
mcpMeta: {
...(mcpResult._meta && {
_meta: mcpResult._meta,
}),
...(mcpResult.structuredContent && {
structuredContent: mcpResult.structuredContent,
}),
},
}),
}
} catch (error) {
// Session expired β the connection cache has been
// cleared, so retry with a fresh client.
if (
error instanceof McpSessionExpiredError &&
attempt < MAX_SESSION_RETRIES
) {
logMCPDebug(
client.name,
`Retrying tool '${tool.name}' after session recovery`,
)
continue
}
// Emit progress when tool fails
if (onProgress && toolUseId) {
onProgress({
toolUseID: toolUseId,
data: {
type: 'mcp_progress',
status: 'failed',
serverName: client.name,
toolName: tool.name,
elapsedTimeMs: Date.now() - startTime,
},
})
}
// Wrap MCP SDK errors so telemetry gets useful context
// instead of just "Error" or "McpError" (the constructor
// name). MCP SDK errors are protocol-level messages and
// don't contain user file paths or code.
if (
error instanceof Error &&
!(
error instanceof
TelemetrySafeError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS
)
) {
const name = error.constructor.name
if (name === 'Error') {
throw new TelemetrySafeError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS(
error.message,
error.message.slice(0, 200),
)
}
// McpError has a numeric `code` with the JSON-RPC error
// code (e.g. -32000 ConnectionClosed, -32001 RequestTimeout)
if (
name === 'McpError' &&
'code' in error &&
typeof error.code === 'number'
) {
throw new TelemetrySafeError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS(
error.message,
`McpError ${error.code}`,
)
}
}
throw error
}
}
},
userFacingName() {
// Prefer title annotation if available, otherwise use tool name
const displayName = tool.annotations?.title || tool.name
return `${client.name} - ${displayName} (MCP)`
},
...(isClaudeInChromeMCPServer(client.name) &&
(client.config.type === 'stdio' || !client.config.type)
? claudeInChromeToolRendering().getClaudeInChromeMCPToolOverrides(
tool.name,
)
: {}),
...(feature('CHICAGO_MCP') &&
(client.config.type === 'stdio' || !client.config.type) &&
isComputerUseMCPServer!(client.name)
? computerUseWrapper!().getComputerUseMCPToolOverrides(tool.name)
: {}),
}
})
.filter(isIncludedMcpTool)
} catch (error) {
logMCPError(client.name, `Failed to fetch tools: ${errorMessage(error)}`)
return []
}
},
(client: MCPServerConnection) => client.name,
MCP_FETCH_CACHE_SIZE,
)
export const fetchResourcesForClient = memoizeWithLRU(
async (client: MCPServerConnection): Promise<ServerResource[]> => {
if (client.type !== 'connected') return []
try {
if (!client.capabilities?.resources) {
return []
}
const result = await client.client.request(
{ method: 'resources/list' },
ListResourcesResultSchema,
)
if (!result.resources) return []
// Add server name to each resource
return result.resources.map(resource => ({
...resource,
server: client.name,
}))
} catch (error) {
logMCPError(
client.name,
`Failed to fetch resources: ${errorMessage(error)}`,
)
return []
}
},
(client: MCPServerConnection) => client.name,
MCP_FETCH_CACHE_SIZE,
)
export const fetchCommandsForClient = memoizeWithLRU(
async (client: MCPServerConnection): Promise<Command[]> => {
if (client.type !== 'connected') return []
try {
if (!client.capabilities?.prompts) {
return []
}
// Request prompts list from client
const result = (await client.client.request(
{ method: 'prompts/list' },
ListPromptsResultSchema,
)) as ListPromptsResult
if (!result.prompts) return []
// Sanitize prompt data from MCP server
const promptsToProcess = recursivelySanitizeUnicode(result.prompts)
// Convert MCP prompts to our Command format
return promptsToProcess.map(prompt => {
const argNames = Object.values(prompt.arguments ?? {}).map(k => k.name)
return {
type: 'prompt' as const,
name: 'mcp__' + normalizeNameForMCP(client.name) + '__' + prompt.name,
description: prompt.description ?? '',
hasUserSpecifiedDescription: !!prompt.description,
contentLength: 0, // Dynamic MCP content
isEnabled: () => true,
isHidden: false,
isMcp: true,
progressMessage: 'running',
userFacingName() {
// Use prompt.name (programmatic identifier) not prompt.title (display name)
// to avoid spaces breaking slash command parsing
return `${client.name}:${prompt.name} (MCP)`
},
argNames,
source: 'mcp',
async getPromptForCommand(args: string) {
const argsArray = args.split(' ')
try {
const connectedClient = await ensureConnectedClient(client)
const result = await connectedClient.client.getPrompt({
name: prompt.name,
arguments: zipObject(argNames, argsArray),
})
const transformed = await Promise.all(
result.messages.map(message =>
transformResultContent(message.content, connectedClient.name),
),
)
return transformed.flat()
} catch (error) {
logMCPError(
client.name,
`Error running command '${prompt.name}': ${errorMessage(error)}`,
)
throw error
}
},
}
})
} catch (error) {
logMCPError(
client.name,
`Failed to fetch commands: ${errorMessage(error)}`,
)
return []
}
},
(client: MCPServerConnection) => client.name,
MCP_FETCH_CACHE_SIZE,
)
/**
* Call an IDE tool directly as an RPC
* @param toolName The name of the tool to call
* @param args The arguments to pass to the tool
* @param client The IDE client to use for the RPC call
* @returns The result of the tool call
*/
export async function callIdeRpc(
toolName: string,
args: Record<string, unknown>,
client: ConnectedMCPServer,
): Promise<string | ContentBlockParam[] | undefined> {
const result = await callMCPTool({
client,
tool: toolName,
args,
signal: createAbortController().signal,
})
return result.content
}
/**
* Note: This should not be called by UI components directly, they should use the reconnectMcpServer
* function from useManageMcpConnections.
* @param name Server name
* @param config Server configuration
* @returns Object containing the client connection and its resources
*/
export async function reconnectMcpServerImpl(
name: string,
config: ScopedMcpServerConfig,
): Promise<{
client: MCPServerConnection
tools: Tool[]
commands: Command[]
resources?: ServerResource[]
}> {
try {
// Invalidate the keychain cache so we read fresh credentials from disk.
// This is necessary when another process (e.g. the VS Code extension host)
// has modified stored tokens (cleared auth, saved new OAuth tokens) and then
// asks the CLI subprocess to reconnect. Without this, the subprocess would
// use stale cached data and never notice the tokens were removed.
clearKeychainCache()
await clearServerCache(name, config)
const client = await connectToServer(name, config)
if (client.type !== 'connected') {
return {
client,
tools: [],
commands: [],
}
}
if (config.type === 'claudeai-proxy') {
markClaudeAiMcpConnected(name)
}
const supportsResources = !!client.capabilities?.resources
const [tools, mcpCommands, mcpSkills, resources] = await Promise.all([
fetchToolsForClient(client),
fetchCommandsForClient(client),
feature('MCP_SKILLS') && supportsResources
? fetchMcpSkillsForClient!(client)
: Promise.resolve([]),
supportsResources ? fetchResourcesForClient(client) : Promise.resolve([]),
])
const commands = [...mcpCommands, ...mcpSkills]
// Check if we need to add resource tools
const resourceTools: Tool[] = []
if (supportsResources) {
// Only add resource tools if no other server has them
const hasResourceTools = [ListMcpResourcesTool, ReadMcpResourceTool].some(
tool => tools.some(t => toolMatchesName(t, tool.name)),
)
if (!hasResourceTools) {
resourceTools.push(ListMcpResourcesTool, ReadMcpResourceTool)
}
}
return {
client,
tools: [...tools, ...resourceTools],
commands,
resources: resources.length > 0 ? resources : undefined,
}
} catch (error) {
// Handle errors gracefully - connection might have closed during fetch
logMCPError(name, `Error during reconnection: ${errorMessage(error)}`)
// Return with failed status
return {
client: { name, type: 'failed' as const, config },
tools: [],
commands: [],
}
}
}
// Replaced 2026-03: previous implementation ran fixed-size sequential batches
// (await batch 1 fully, then start batch 2). That meant one slow server in
// batch N held up ALL servers in batch N+1, even if the other 19 slots were
// idle. pMap frees each slot as soon as its server completes, so a single
// slow server only occupies one slot instead of blocking an entire batch
// boundary. Same concurrency ceiling, same results, better scheduling.
async function processBatched<T>(
items: T[],
concurrency: number,
processor: (item: T) => Promise<void>,
): Promise<void> {
await pMap(items, processor, { concurrency })
}
export async function getMcpToolsCommandsAndResources(
onConnectionAttempt: (params: {
client: MCPServerConnection
tools: Tool[]
commands: Command[]
resources?: ServerResource[]
}) => void,
mcpConfigs?: Record<string, ScopedMcpServerConfig>,
): Promise<void> {
let resourceToolsAdded = false
const allConfigEntries = Object.entries(
mcpConfigs ?? (await getAllMcpConfigs()).servers,
)
// Partition into disabled and active entries β disabled servers should
// never generate HTTP connections or flow through batch processing
const configEntries: typeof allConfigEntries = []
for (const entry of allConfigEntries) {
if (isMcpServerDisabled(entry[0])) {
onConnectionAttempt({
client: { name: entry[0], type: 'disabled', config: entry[1] },
tools: [],
commands: [],
})
} else {
configEntries.push(entry)
}
}
// Calculate transport counts for logging
const totalServers = configEntries.length
const stdioCount = count(configEntries, ([_, c]) => c.type === 'stdio')
const sseCount = count(configEntries, ([_, c]) => c.type === 'sse')
const httpCount = count(configEntries, ([_, c]) => c.type === 'http')
const sseIdeCount = count(configEntries, ([_, c]) => c.type === 'sse-ide')
const wsIdeCount = count(configEntries, ([_, c]) => c.type === 'ws-ide')
// Split servers by type: local (stdio/sdk) need lower concurrency due to
// process spawning, remote servers can connect with higher concurrency
const localServers = configEntries.filter(([_, config]) =>
isLocalMcpServer(config),
)
const remoteServers = configEntries.filter(
([_, config]) => !isLocalMcpServer(config),
)
const serverStats = {
totalServers,
stdioCount,
sseCount,
httpCount,
sseIdeCount,
wsIdeCount,
}
const processServer = async ([name, config]: [
string,
ScopedMcpServerConfig,
]): Promise<void> => {
try {
// Check if server is disabled - if so, just add it to state without connecting
if (isMcpServerDisabled(name)) {
onConnectionAttempt({
client: {
name,
type: 'disabled',
config,
},
tools: [],
commands: [],
})
return
}
// Skip connection for servers that recently returned 401 (15min TTL),
// or that we have probed before but hold no token for. The second
// check closes the gap the TTL leaves open: without it, every 15min
// we re-probe servers that cannot succeed until the user runs /mcp.
// Each probe is a network round-trip for connect-401 plus OAuth
// discovery, and print mode awaits the whole batch (main.tsx:3503).
if (
(config.type === 'claudeai-proxy' ||
config.type === 'http' ||
config.type === 'sse') &&
((await isMcpAuthCached(name)) ||
((config.type === 'http' || config.type === 'sse') &&
hasMcpDiscoveryButNoToken(name, config)))
) {
logMCPDebug(name, `Skipping connection (cached needs-auth)`)
onConnectionAttempt({
client: { name, type: 'needs-auth' as const, config },
tools: [createMcpAuthTool(name, config)],
commands: [],
})
return
}
const client = await connectToServer(name, config, serverStats)
if (client.type !== 'connected') {
onConnectionAttempt({
client,
tools:
client.type === 'needs-auth'
? [createMcpAuthTool(name, config)]
: [],
commands: [],
})
return
}
if (config.type === 'claudeai-proxy') {
markClaudeAiMcpConnected(name)
}
const supportsResources = !!client.capabilities?.resources
const [tools, mcpCommands, mcpSkills, resources] = await Promise.all([
fetchToolsForClient(client),
fetchCommandsForClient(client),
// Discover skills from skill:// resources
feature('MCP_SKILLS') && supportsResources
? fetchMcpSkillsForClient!(client)
: Promise.resolve([]),
// Fetch resources if supported
supportsResources
? fetchResourcesForClient(client)
: Promise.resolve([]),
])
const commands = [...mcpCommands, ...mcpSkills]
// If this server resources and we haven't added resource tools yet,
// include our resource tools with this client's tools
const resourceTools: Tool[] = []
if (supportsResources && !resourceToolsAdded) {
resourceToolsAdded = true
resourceTools.push(ListMcpResourcesTool, ReadMcpResourceTool)
}
onConnectionAttempt({
client,
tools: [...tools, ...resourceTools],
commands,
resources: resources.length > 0 ? resources : undefined,
})
} catch (error) {
// Handle errors gracefully - connection might have closed during fetch
logMCPError(
name,
`Error fetching tools/commands/resources: ${errorMessage(error)}`,
)
// Still update with the client but no tools/commands
onConnectionAttempt({
client: { name, type: 'failed' as const, config },
tools: [],
commands: [],
})
}
}
// Process both groups concurrently, each with their own concurrency limits:
// - Local servers (stdio/sdk): lower concurrency to avoid process spawning resource contention
// - Remote servers: higher concurrency since they're just network connections
await Promise.all([
processBatched(
localServers,
getMcpServerConnectionBatchSize(),
processServer,
),
processBatched(
remoteServers,
getRemoteMcpServerConnectionBatchSize(),
processServer,
),
])
}
// Not memoized: called only 2-3 times at startup/reconfig. The inner work
// (connectToServer, fetch*ForClient) is already cached. Memoizing here by
// mcpConfigs object ref leaked β main.tsx creates fresh config objects each call.
export function prefetchAllMcpResources(
mcpConfigs: Record<string, ScopedMcpServerConfig>,
): Promise<{
clients: MCPServerConnection[]
tools: Tool[]
commands: Command[]
}> {
return new Promise(resolve => {
let pendingCount = 0
let completedCount = 0
pendingCount = Object.keys(mcpConfigs).length
if (pendingCount === 0) {
void resolve({
clients: [],
tools: [],
commands: [],
})
return
}
const clients: MCPServerConnection[] = []
const tools: Tool[] = []
const commands: Command[] = []
getMcpToolsCommandsAndResources(result => {
clients.push(result.client)
tools.push(...result.tools)
commands.push(...result.commands)
completedCount++
if (completedCount >= pendingCount) {
const commandsMetadataLength = commands.reduce((sum, command) => {
const commandMetadataLength =
command.name.length +
(command.description ?? '').length +
(command.argumentHint ?? '').length
return sum + commandMetadataLength
}, 0)
logEvent('tengu_mcp_tools_commands_loaded', {
tools_count: tools.length,
commands_count: commands.length,
commands_metadata_length: commandsMetadataLength,
})
void resolve({
clients,
tools,
commands,
})
}
}, mcpConfigs).catch(error => {
logMCPError(
'prefetchAllMcpResources',
`Failed to get MCP resources: ${errorMessage(error)}`,
)
// Still resolve with empty results
void resolve({
clients: [],
tools: [],
commands: [],
})
})
})
}
/**
* Transform result content from an MCP tool or MCP prompt into message blocks
*/
export async function transformResultContent(
resultContent: PromptMessage['content'],
serverName: string,
): Promise<Array<ContentBlockParam>> {
switch (resultContent.type) {
case 'text':
return [
{
type: 'text',
text: resultContent.text,
},
]
case 'audio': {
const audioData = resultContent as {
type: 'audio'
data: string
mimeType?: string
}
return await persistBlobToTextBlock(
Buffer.from(audioData.data, 'base64'),
audioData.mimeType,
serverName,
`[Audio from ${serverName}] `,
)
}
case 'image': {
// Resize and compress image data, enforcing API dimension limits
const imageBuffer = Buffer.from(String(resultContent.data), 'base64')
const ext = resultContent.mimeType?.split('/')[1] || 'png'
const resized = await maybeResizeAndDownsampleImageBuffer(
imageBuffer,
imageBuffer.length,
ext,
)
return [
{
type: 'image',
source: {
data: resized.buffer.toString('base64'),
media_type:
`image/${resized.mediaType}` as Base64ImageSource['media_type'],
type: 'base64',
},
},
]
}
case 'resource': {
const resource = resultContent.resource
const prefix = `[Resource from ${serverName} at ${resource.uri}] `
if ('text' in resource) {
return [
{
type: 'text',
text: `${prefix}${resource.text}`,
},
]
} else if ('blob' in resource) {
const isImage = IMAGE_MIME_TYPES.has(resource.mimeType ?? '')
if (isImage) {
// Resize and compress image blob, enforcing API dimension limits
const imageBuffer = Buffer.from(resource.blob, 'base64')
const ext = resource.mimeType?.split('/')[1] || 'png'
const resized = await maybeResizeAndDownsampleImageBuffer(
imageBuffer,
imageBuffer.length,
ext,
)
const content: MessageParam['content'] = []
if (prefix) {
content.push({
type: 'text',
text: prefix,
})
}
content.push({
type: 'image',
source: {
data: resized.buffer.toString('base64'),
media_type:
`image/${resized.mediaType}` as Base64ImageSource['media_type'],
type: 'base64',
},
})
return content
} else {
return await persistBlobToTextBlock(
Buffer.from(resource.blob, 'base64'),
resource.mimeType,
serverName,
prefix,
)
}
}
return []
}
case 'resource_link': {
const resourceLink = resultContent as ResourceLink
let text = `[Resource link: ${resourceLink.name}] ${resourceLink.uri}`
if (resourceLink.description) {
text += ` (${resourceLink.description})`
}
return [
{
type: 'text',
text,
},
]
}
default:
return []
}
}
/**
* Decode base64 binary content, write it to disk with the proper extension,
* and return a small text block with the file path. Replaces the old behavior
* of dumping raw base64 into the context.
*/
async function persistBlobToTextBlock(
bytes: Buffer,
mimeType: string | undefined,
serverName: string,
sourceDescription: string,
): Promise<Array<ContentBlockParam>> {
const persistId = `mcp-${normalizeNameForMCP(serverName)}-blob-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`
const result = await persistBinaryContent(bytes, mimeType, persistId)
if ('error' in result) {
return [
{
type: 'text',
text: `${sourceDescription}Binary content (${mimeType || 'unknown type'}, ${bytes.length} bytes) could not be saved to disk: ${result.error}`,
},
]
}
return [
{
type: 'text',
text: getBinaryBlobSavedMessage(
result.filepath,
mimeType,
result.size,
sourceDescription,
),
},
]
}
/**
* Processes MCP tool result into a normalized format.
*/
export type MCPResultType = 'toolResult' | 'structuredContent' | 'contentArray'
export type TransformedMCPResult = {
content: MCPToolResult
type: MCPResultType
schema?: string
}
/**
* Generates a compact, jq-friendly type signature for a value.
* e.g. "{title: string, items: [{id: number, name: string}]}"
*/
export function inferCompactSchema(value: unknown, depth = 2): string {
if (value === null) return 'null'
if (Array.isArray(value)) {
if (value.length === 0) return '[]'
return `[${inferCompactSchema(value[0], depth - 1)}]`
}
if (typeof value === 'object') {
if (depth <= 0) return '{...}'
const entries = Object.entries(value).slice(0, 10)
const props = entries.map(
([k, v]) => `${k}: ${inferCompactSchema(v, depth - 1)}`,
)
const suffix = Object.keys(value).length > 10 ? ', ...' : ''
return `{${props.join(', ')}${suffix}}`
}
return typeof value
}
export async function transformMCPResult(
result: unknown,
tool: string, // Tool name for validation (e.g., "search")
name: string, // Server name for transformation (e.g., "slack")
): Promise<TransformedMCPResult> {
if (result && typeof result === 'object') {
if ('toolResult' in result) {
return {
content: String(result.toolResult),
type: 'toolResult',
}
}
if (
'structuredContent' in result &&
result.structuredContent !== undefined
) {
return {
content: jsonStringify(result.structuredContent),
type: 'structuredContent',
schema: inferCompactSchema(result.structuredContent),
}
}
if ('content' in result && Array.isArray(result.content)) {
const transformedContent = (
await Promise.all(
result.content.map(item => transformResultContent(item, name)),
)
).flat()
return {
content: transformedContent,
type: 'contentArray',
schema: inferCompactSchema(transformedContent),
}
}
}
const errorMessage = `MCP server "${name}" tool "${tool}": unexpected response format`
logMCPError(name, errorMessage)
throw new TelemetrySafeError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS(
errorMessage,
'MCP tool unexpected response format',
)
}
/**
* Check if MCP content contains any image blocks.
* Used to decide whether to persist to file (images should use truncation instead
* to preserve image compression and viewability).
*/
function contentContainsImages(content: MCPToolResult): boolean {
if (!content || typeof content === 'string') {
return false
}
return content.some(block => block.type === 'image')
}
export async function processMCPResult(
result: unknown,
tool: string, // Tool name for validation (e.g., "search")
name: string, // Server name for IDE check and transformation (e.g., "slack")
): Promise<MCPToolResult> {
const { content, type, schema } = await transformMCPResult(result, tool, name)
// IDE tools are not going to the model directly, so we don't need to
// handle large output.
if (name === 'ide') {
return content
}
// Check if content needs truncation (i.e., is too large)
if (!(await mcpContentNeedsTruncation(content))) {
return content
}
const sizeEstimateTokens = getContentSizeEstimate(content)
// If large output files feature is disabled, fall back to old truncation behavior
if (isEnvDefinedFalsy(process.env.ENABLE_MCP_LARGE_OUTPUT_FILES)) {
logEvent('tengu_mcp_large_result_handled', {
outcome: 'truncated',
reason: 'env_disabled',
sizeEstimateTokens,
} as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
return await truncateMcpContentIfNeeded(content)
}
// Save large output to file and return instructions for reading it
// Content is guaranteed to exist at this point (we checked mcpContentNeedsTruncation)
if (!content) {
return content
}
// If content contains images, fall back to truncation - persisting images as JSON
// defeats the image compression logic and makes them non-viewable
if (contentContainsImages(content)) {
logEvent('tengu_mcp_large_result_handled', {
outcome: 'truncated',
reason: 'contains_images',
sizeEstimateTokens,
} as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
return await truncateMcpContentIfNeeded(content)
}
// Generate a unique ID for the persisted file (server__tool-timestamp)
const timestamp = Date.now()
const persistId = `mcp-${normalizeNameForMCP(name)}-${normalizeNameForMCP(tool)}-${timestamp}`
// Convert to string for persistence (persistToolResult expects string or specific block types)
const contentStr =
typeof content === 'string' ? content : jsonStringify(content, null, 2)
const persistResult = await persistToolResult(contentStr, persistId)
if (isPersistError(persistResult)) {
// If file save failed, fall back to returning truncated content info
const contentLength = contentStr.length
logEvent('tengu_mcp_large_result_handled', {
outcome: 'truncated',
reason: 'persist_failed',
sizeEstimateTokens,
} as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
return `Error: result (${contentLength.toLocaleString()} characters) exceeds maximum allowed tokens. Failed to save output to file: ${persistResult.error}. If this MCP server provides pagination or filtering tools, use them to retrieve specific portions of the data.`
}
logEvent('tengu_mcp_large_result_handled', {
outcome: 'persisted',
reason: 'file_saved',
sizeEstimateTokens,
persistedSizeChars: persistResult.originalSize,
} as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
const formatDescription = getFormatDescription(type, schema)
return getLargeOutputInstructions(
persistResult.filepath,
persistResult.originalSize,
formatDescription,
)
}
/**
* Call an MCP tool, handling UrlElicitationRequiredError (-32042) by
* displaying the URL elicitation to the user, waiting for the completion
* notification, and retrying the tool call.
*/
type MCPToolCallResult = {
content: MCPToolResult
_meta?: Record<string, unknown>
structuredContent?: Record<string, unknown>
}
/** @internal Exported for testing. */
export async function callMCPToolWithUrlElicitationRetry({
client: connectedClient,
clientConnection,
tool,
args,
meta,
signal,
setAppState,
onProgress,
callToolFn = callMCPTool,
handleElicitation,
}: {
client: ConnectedMCPServer
clientConnection: MCPServerConnection
tool: string
args: Record<string, unknown>
meta?: Record<string, unknown>
signal: AbortSignal
setAppState: (f: (prev: AppState) => AppState) => void
onProgress?: (data: MCPProgress) => void
/** Injectable for testing. Defaults to callMCPTool. */
callToolFn?: (opts: {
client: ConnectedMCPServer
tool: string
args: Record<string, unknown>
meta?: Record<string, unknown>
signal: AbortSignal
onProgress?: (data: MCPProgress) => void
}) => Promise<MCPToolCallResult>
/** Handler for URL elicitations when no hook handles them.
* In print/SDK mode, delegates to structuredIO. In REPL, falls back to queue. */
handleElicitation?: (
serverName: string,
params: ElicitRequestURLParams,
signal: AbortSignal,
) => Promise<ElicitResult>
}): Promise<MCPToolCallResult> {
const MAX_URL_ELICITATION_RETRIES = 3
for (let attempt = 0; ; attempt++) {
try {
return await callToolFn({
client: connectedClient,
tool,
args,
meta,
signal,
onProgress,
})
} catch (error) {
// The MCP SDK's Protocol creates plain McpError (not UrlElicitationRequiredError)
// for error responses, so we check the error code instead of instanceof.
if (
!(error instanceof McpError) ||
error.code !== ErrorCode.UrlElicitationRequired
) {
throw error
}
// Limit the number of URL elicitation retries
if (attempt >= MAX_URL_ELICITATION_RETRIES) {
throw error
}
const errorData = error.data
const rawElicitations =
errorData != null &&
typeof errorData === 'object' &&
'elicitations' in errorData &&
Array.isArray(errorData.elicitations)
? (errorData.elicitations as unknown[])
: []
// Validate each element has the required fields for ElicitRequestURLParams
const elicitations = rawElicitations.filter(
(e): e is ElicitRequestURLParams => {
if (e == null || typeof e !== 'object') return false
const obj = e as Record<string, unknown>
return (
obj.mode === 'url' &&
typeof obj.url === 'string' &&
typeof obj.elicitationId === 'string' &&
typeof obj.message === 'string'
)
},
)
const serverName =
clientConnection.type === 'connected'
? clientConnection.name
: 'unknown'
if (elicitations.length === 0) {
logMCPDebug(
serverName,
`Tool '${tool}' returned -32042 but no valid elicitations in error data`,
)
throw error
}
logMCPDebug(
serverName,
`Tool '${tool}' requires URL elicitation (error -32042, attempt ${attempt + 1}), processing ${elicitations.length} elicitation(s)`,
)
// Process each URL elicitation from the error.
// The completion notification handler (in registerElicitationHandler) sets
// `completed: true` on the matching queue event; the dialog reacts to this flag.
for (const elicitation of elicitations) {
const { elicitationId } = elicitation
// Run elicitation hooks β they can resolve URL elicitations programmatically
const hookResponse = await runElicitationHooks(
serverName,
elicitation,
signal,
)
if (hookResponse) {
logMCPDebug(
serverName,
`URL elicitation ${elicitationId} resolved by hook: ${jsonStringify(hookResponse)}`,
)
if (hookResponse.action !== 'accept') {
return {
content: `URL elicitation was ${hookResponse.action === 'decline' ? 'declined' : hookResponse.action + 'ed'} by a hook. The tool "${tool}" could not complete because it requires the user to open a URL.`,
}
}
// Hook accepted β skip the UI and proceed to retry
continue
}
// Resolve the URL elicitation via callback (print/SDK mode) or queue (REPL mode).
let userResult: ElicitResult
if (handleElicitation) {
// Print/SDK mode: delegate to structuredIO which sends a control request
userResult = await handleElicitation(serverName, elicitation, signal)
} else {
// REPL mode: queue for ElicitationDialog with two-phase consent/waiting flow
const waitingState: ElicitationWaitingState = {
actionLabel: 'Retry now',
showCancel: true,
}
userResult = await new Promise<ElicitResult>(resolve => {
const onAbort = () => {
void resolve({ action: 'cancel' })
}
if (signal.aborted) {
onAbort()
return
}
signal.addEventListener('abort', onAbort, { once: true })
setAppState(prev => ({
...prev,
elicitation: {
queue: [
...prev.elicitation.queue,
{
serverName,
requestId: `error-elicit-${elicitationId}`,
params: elicitation,
signal,
waitingState,
respond: result => {
// Phase 1 consent: accept is a no-op (doesn't resolve retry Promise)
if (result.action === 'accept') {
return
}
// Decline or cancel: resolve the retry Promise
signal.removeEventListener('abort', onAbort)
void resolve(result)
},
onWaitingDismiss: action => {
signal.removeEventListener('abort', onAbort)
if (action === 'retry') {
void resolve({ action: 'accept' })
} else {
void resolve({ action: 'cancel' })
}
},
},
],
},
}))
})
}
// Run ElicitationResult hooks β they can modify or block the response
const finalResult = await runElicitationResultHooks(
serverName,
userResult,
signal,
'url',
elicitationId,
)
if (finalResult.action !== 'accept') {
logMCPDebug(
serverName,
`User ${finalResult.action === 'decline' ? 'declined' : finalResult.action + 'ed'} URL elicitation ${elicitationId}`,
)
return {
content: `URL elicitation was ${finalResult.action === 'decline' ? 'declined' : finalResult.action + 'ed'} by the user. The tool "${tool}" could not complete because it requires the user to open a URL.`,
}
}
logMCPDebug(
serverName,
`Elicitation ${elicitationId} completed, retrying tool call`,
)
}
// Loop back to retry the tool call
}
}
}
async function callMCPTool({
client: { client, name, config },
tool,
args,
meta,
signal,
onProgress,
}: {
client: ConnectedMCPServer
tool: string
args: Record<string, unknown>
meta?: Record<string, unknown>
signal: AbortSignal
onProgress?: (data: MCPProgress) => void
}): Promise<{
content: MCPToolResult
_meta?: Record<string, unknown>
structuredContent?: Record<string, unknown>
}> {
const toolStartTime = Date.now()
let progressInterval: NodeJS.Timeout | undefined
try {
logMCPDebug(name, `Calling MCP tool: ${tool}`)
// Set up progress logging for long-running tools (every 30 seconds)
progressInterval = setInterval(
(startTime, name, tool) => {
const elapsed = Date.now() - startTime
const elapsedSeconds = Math.floor(elapsed / 1000)
const duration = `${elapsedSeconds}s`
logMCPDebug(name, `Tool '${tool}' still running (${duration} elapsed)`)
},
30000, // Log every 30 seconds
toolStartTime,
name,
tool,
)
// Use Promise.race with our own timeout to handle cases where SDK's
// internal timeout doesn't work (e.g., SSE stream breaks mid-request)
const timeoutMs = getMcpToolTimeoutMs()
let timeoutId: NodeJS.Timeout | undefined
const timeoutPromise = new Promise<never>((_, reject) => {
timeoutId = setTimeout(
(reject, name, tool, timeoutMs) => {
reject(
new TelemetrySafeError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS(
`MCP server "${name}" tool "${tool}" timed out after ${Math.floor(timeoutMs / 1000)}s`,
'MCP tool timeout',
),
)
},
timeoutMs,
reject,
name,
tool,
timeoutMs,
)
})
const result = await Promise.race([
client.callTool(
{
name: tool,
arguments: args,
_meta: meta,
},
CallToolResultSchema,
{
signal,
timeout: timeoutMs,
onprogress: onProgress
? sdkProgress => {
onProgress({
type: 'mcp_progress',
status: 'progress',
serverName: name,
toolName: tool,
progress: sdkProgress.progress,
total: sdkProgress.total,
progressMessage: sdkProgress.message,
})
}
: undefined,
},
),
timeoutPromise,
]).finally(() => {
if (timeoutId) {
clearTimeout(timeoutId)
}
})
if ('isError' in result && result.isError) {
let errorDetails = 'Unknown error'
if (
'content' in result &&
Array.isArray(result.content) &&
result.content.length > 0
) {
const firstContent = result.content[0]
if (
firstContent &&
typeof firstContent === 'object' &&
'text' in firstContent
) {
errorDetails = firstContent.text
}
} else if ('error' in result) {
// Fallback for legacy error format
errorDetails = String(result.error)
}
logMCPError(name, errorDetails)
throw new McpToolCallError_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS(
errorDetails,
'MCP tool returned error',
'_meta' in result && result._meta ? { _meta: result._meta } : undefined,
)
}
const elapsed = Date.now() - toolStartTime
const duration =
elapsed < 1000
? `${elapsed}ms`
: elapsed < 60000
? `${Math.floor(elapsed / 1000)}s`
: `${Math.floor(elapsed / 60000)}m ${Math.floor((elapsed % 60000) / 1000)}s`
logMCPDebug(name, `Tool '${tool}' completed successfully in ${duration}`)
// Log code indexing tool usage
const codeIndexingTool = detectCodeIndexingFromMcpServerName(name)
if (codeIndexingTool) {
logEvent('tengu_code_indexing_tool_used', {
tool: codeIndexingTool as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
source:
'mcp' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
success: true,
})
}
const content = await processMCPResult(result, tool, name)
return {
content,
_meta: result._meta as Record<string, unknown> | undefined,
structuredContent: result.structuredContent as
| Record<string, unknown>
| undefined,
}
} catch (e) {
// Clear intervals on error
if (progressInterval !== undefined) {
clearInterval(progressInterval)
}
const elapsed = Date.now() - toolStartTime
if (e instanceof Error && e.name !== 'AbortError') {
logMCPDebug(
name,
`Tool '${tool}' failed after ${Math.floor(elapsed / 1000)}s: ${e.message}`,
)
}
// Check for 401 errors indicating expired/invalid OAuth tokens
// The MCP SDK's StreamableHTTPError has a `code` property with the HTTP status
if (e instanceof Error) {
const errorCode = 'code' in e ? (e.code as number | undefined) : undefined
if (errorCode === 401 || e instanceof UnauthorizedError) {
logMCPDebug(
name,
`Tool call returned 401 Unauthorized - token may have expired`,
)
logEvent('tengu_mcp_tool_call_auth_error', {})
throw new McpAuthError(
name,
`MCP server "${name}" requires re-authorization (token expired)`,
)
}
// Check for session expiry β two error shapes can surface here:
// 1. Direct 404 + JSON-RPC -32001 from the server (StreamableHTTPError)
// 2. -32000 "Connection closed" (McpError) β the SDK closes the transport
// after the onerror handler fires, so the pending callTool() rejects
// with this derived error instead of the original 404.
// In both cases, clear the connection cache so the next tool call
// creates a fresh session.
const isSessionExpired = isMcpSessionExpiredError(e)
const isConnectionClosedOnHttp =
'code' in e &&
(e as Error & { code?: number }).code === -32000 &&
e.message.includes('Connection closed') &&
(config.type === 'http' || config.type === 'claudeai-proxy')
if (isSessionExpired || isConnectionClosedOnHttp) {
logMCPDebug(
name,
`MCP session expired during tool call (${isSessionExpired ? '404/-32001' : 'connection closed'}), clearing connection cache for re-initialization`,
)
logEvent('tengu_mcp_session_expired', {})
await clearServerCache(name, config)
throw new McpSessionExpiredError(name)
}
}
// When the users hits esc, avoid logspew
if (!(e instanceof Error) || e.name !== 'AbortError') {
throw e
}
return { content: undefined }
} finally {
// Always clear intervals
if (progressInterval !== undefined) {
clearInterval(progressInterval)
}
}
}
function extractToolUseId(message: AssistantMessage): string | undefined {
if (message.message.content[0]?.type !== 'tool_use') {
return undefined
}
return message.message.content[0].id
}
/**
* Sets up SDK MCP clients by creating transports and connecting them.
* This is used for SDK MCP servers that run in the same process as the SDK.
*
* @param sdkMcpConfigs - The SDK MCP server configurations
* @param sendMcpMessage - Callback to send MCP messages through the control channel
* @returns Connected clients, their tools, and transport map for message routing
*/
export async function setupSdkMcpClients(
sdkMcpConfigs: Record<string, McpSdkServerConfig>,
sendMcpMessage: (
serverName: string,
message: JSONRPCMessage,
) => Promise<JSONRPCMessage>,
): Promise<{
clients: MCPServerConnection[]
tools: Tool[]
}> {
const clients: MCPServerConnection[] = []
const tools: Tool[] = []
// Connect to all servers in parallel
const results = await Promise.allSettled(
Object.entries(sdkMcpConfigs).map(async ([name, config]) => {
const transport = new SdkControlClientTransport(name, sendMcpMessage)
const client = new Client(
{
name: 'claude-code',
title: 'Claude Code',
version: MACRO.VERSION ?? 'unknown',
description: "Anthropic's agentic coding tool",
websiteUrl: PRODUCT_URL,
},
{
capabilities: {},
},
)
try {
// Connect the client
await client.connect(transport)
// Get capabilities from the server
const capabilities = client.getServerCapabilities()
// Create the connected client object
const connectedClient: MCPServerConnection = {
type: 'connected',
name,
capabilities: capabilities || {},
client,
config: { ...config, scope: 'dynamic' as const },
cleanup: async () => {
await client.close()
},
}
// Fetch tools if the server has them
const serverTools: Tool[] = []
if (capabilities?.tools) {
const sdkTools = await fetchToolsForClient(connectedClient)
serverTools.push(...sdkTools)
}
return {
client: connectedClient,
tools: serverTools,
}
} catch (error) {
// If connection fails, return failed server
logMCPError(name, `Failed to connect SDK MCP server: ${error}`)
return {
client: {
type: 'failed' as const,
name,
config: { ...config, scope: 'user' as const },
},
tools: [],
}
}
}),
)
// Process results and collect clients and tools
for (const result of results) {
if (result.status === 'fulfilled') {
clients.push(result.value.client)
tools.push(...result.value.tools)
}
// If rejected (unexpected), the error was already logged inside the promise
}
return { clients, tools }
}