π File detail
bridge/remoteBridgeCore.ts
π― Use case
This file lives under βbridge/β, which covers the bridge between the UI/shell and the agent (IPC, REPL hooks, permissions, session glue). On the API surface it exposes EnvLessBridgeParams, initEnvLessBridgeCore, fetchRemoteCredentials, createCodeSession, and type RemoteCredentials β mainly functions, hooks, or classes. Dependencies touch CCR v2, bun:bundle, and HTTP client. It composes internal code from replBridgeTransport, workSecret, sessionIdCompat, flushGate, and jwtUtils (relative imports).
Generated from folder role, exports, dependency roots, and inline comments β not hand-reviewed for every path.
π§ Inline summary
// biome-ignore-all assist/source/organizeImports: ANT-ONLY import markers must not be reordered /** * Env-less Remote Control bridge core. * * "Env-less" = no Environments API layer. Distinct from "CCR v2" (the
π€ Exports (heuristic)
EnvLessBridgeParamsinitEnvLessBridgeCorefetchRemoteCredentialscreateCodeSessiontype RemoteCredentials
π External import roots
Package roots from from "β¦" (relative paths omitted).
CCR v2bun:bundleaxios
π₯οΈ Source preview
// biome-ignore-all assist/source/organizeImports: ANT-ONLY import markers must not be reordered
/**
* Env-less Remote Control bridge core.
*
* "Env-less" = no Environments API layer. Distinct from "CCR v2" (the
* /worker/* transport protocol) β the env-based path (replBridge.ts) can also
* use CCR v2 transport via CLAUDE_CODE_USE_CCR_V2. This file is about removing
* the poll/dispatch layer, not about which transport protocol is underneath.
*
* Unlike initBridgeCore (env-based, ~2400 lines), this connects directly
* to the session-ingress layer without the Environments API work-dispatch
* layer:
*
* 1. POST /v1/code/sessions (OAuth, no env_id) β session.id
* 2. POST /v1/code/sessions/{id}/bridge (OAuth) β {worker_jwt, expires_in, api_base_url, worker_epoch}
* Each /bridge call bumps epoch β it IS the register. No separate /worker/register.
* 3. createV2ReplTransport(worker_jwt, worker_epoch) β SSE + CCRClient
* 4. createTokenRefreshScheduler β proactive /bridge re-call (new JWT + new epoch)
* 5. 401 on SSE β rebuild transport with fresh /bridge credentials (same seq-num)
*
* No register/poll/ack/stop/heartbeat/deregister environment lifecycle.
* The Environments API historically existed because CCR's /worker/*
* endpoints required a session_id+role=worker JWT that only the work-dispatch
* layer could mint. Server PR #292605 (renamed in #293280) adds the /bridge endpoint as a direct
* OAuthβworker_jwt exchange, making the env layer optional for REPL sessions.
*
* Gated by `tengu_bridge_repl_v2` GrowthBook flag in initReplBridge.ts.
* REPL-only β daemon/print stay on env-based.
*/
import { feature } from 'bun:bundle'
import axios from 'axios'
import {
createV2ReplTransport,
type ReplBridgeTransport,
} from './replBridgeTransport.js'
import { buildCCRv2SdkUrl } from './workSecret.js'
import { toCompatSessionId } from './sessionIdCompat.js'
import { FlushGate } from './flushGate.js'
import { createTokenRefreshScheduler } from './jwtUtils.js'
import { getTrustedDeviceToken } from './trustedDevice.js'
import {
getEnvLessBridgeConfig,
type EnvLessBridgeConfig,
} from './envLessBridgeConfig.js'
import {
handleIngressMessage,
handleServerControlRequest,
makeResultMessage,
isEligibleBridgeMessage,
extractTitleText,
BoundedUUIDSet,
} from './bridgeMessaging.js'
import { logBridgeSkip } from './debugUtils.js'
import { logForDebugging } from '../utils/debug.js'
import { logForDiagnosticsNoPII } from '../utils/diagLogs.js'
import { isInProtectedNamespace } from '../utils/envUtils.js'
import { errorMessage } from '../utils/errors.js'
import { sleep } from '../utils/sleep.js'
import { registerCleanup } from '../utils/cleanupRegistry.js'
import {
type AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
logEvent,
} from '../services/analytics/index.js'
import type { ReplBridgeHandle, BridgeState } from './replBridge.js'
import type { Message } from '../types/message.js'
import type { SDKMessage } from '../entrypoints/agentSdkTypes.js'
import type {
SDKControlRequest,
SDKControlResponse,
} from '../entrypoints/sdk/controlTypes.js'
import type { PermissionMode } from '../utils/permissions/PermissionMode.js'
const ANTHROPIC_VERSION = '2023-06-01'
// Telemetry discriminator for ws_connected. 'initial' is the default and
// never passed to rebuildTransport (which can only be called post-init);
// Exclude<> makes that constraint explicit at both signatures.
type ConnectCause = 'initial' | 'proactive_refresh' | 'auth_401_recovery'
function oauthHeaders(accessToken: string): Record<string, string> {
return {
Authorization: `Bearer ${accessToken}`,
'Content-Type': 'application/json',
'anthropic-version': ANTHROPIC_VERSION,
}
}
export type EnvLessBridgeParams = {
baseUrl: string
orgUUID: string
title: string
getAccessToken: () => string | undefined
onAuth401?: (staleAccessToken: string) => Promise<boolean>
/**
* Converts internal Message[] β SDKMessage[] for writeMessages() and the
* initial-flush/drain paths. Injected rather than imported β mappers.ts
* transitively pulls in src/commands.ts (entire command registry + React
* tree) which would bloat bundles that don't already have it.
*/
toSDKMessages: (messages: Message[]) => SDKMessage[]
initialHistoryCap: number
initialMessages?: Message[]
onInboundMessage?: (msg: SDKMessage) => void | Promise<void>
/**
* Fired on each title-worthy user message seen in writeMessages() until
* the callback returns true (done). Mirrors replBridge.ts's onUserMessage β
* caller derives a title and PATCHes /v1/sessions/{id} so auto-started
* sessions don't stay at the generic fallback. The caller owns the
* derive-at-count-1-and-3 policy; the transport just keeps calling until
* told to stop. sessionId is the raw cse_* β updateBridgeSessionTitle
* retags internally.
*/
onUserMessage?: (text: string, sessionId: string) => boolean
onPermissionResponse?: (response: SDKControlResponse) => void
onInterrupt?: () => void
onSetModel?: (model: string | undefined) => void
onSetMaxThinkingTokens?: (maxTokens: number | null) => void
onSetPermissionMode?: (
mode: PermissionMode,
) => { ok: true } | { ok: false; error: string }
onStateChange?: (state: BridgeState, detail?: string) => void
/**
* When true, skip opening the SSE read stream β only the CCRClient write
* path is activated. Threaded to createV2ReplTransport and
* handleServerControlRequest.
*/
outboundOnly?: boolean
/** Free-form tags for session categorization (e.g. ['ccr-mirror']). */
tags?: string[]
}
/**
* Create a session, fetch a worker JWT, connect the v2 transport.
*
* Returns null on any pre-flight failure (session create failed, /bridge
* failed, transport setup failed). Caller (initReplBridge) surfaces this
* as a generic "initialization failed" state.
*/
export async function initEnvLessBridgeCore(
params: EnvLessBridgeParams,
): Promise<ReplBridgeHandle | null> {
const {
baseUrl,
orgUUID,
title,
getAccessToken,
onAuth401,
toSDKMessages,
initialHistoryCap,
initialMessages,
onInboundMessage,
onUserMessage,
onPermissionResponse,
onInterrupt,
onSetModel,
onSetMaxThinkingTokens,
onSetPermissionMode,
onStateChange,
outboundOnly,
tags,
} = params
const cfg = await getEnvLessBridgeConfig()
// ββ 1. Create session (POST /v1/code/sessions, no env_id) βββββββββββββββ
const accessToken = getAccessToken()
if (!accessToken) {
logForDebugging('[remote-bridge] No OAuth token')
return null
}
const createdSessionId = await withRetry(
() =>
createCodeSession(baseUrl, accessToken, title, cfg.http_timeout_ms, tags),
'createCodeSession',
cfg,
)
if (!createdSessionId) {
onStateChange?.('failed', 'Session creation failed β see debug log')
logBridgeSkip('v2_session_create_failed', undefined, true)
return null
}
const sessionId: string = createdSessionId
logForDebugging(`[remote-bridge] Created session ${sessionId}`)
logForDiagnosticsNoPII('info', 'bridge_repl_v2_session_created')
// ββ 2. Fetch bridge credentials (POST /bridge β worker_jwt, expires_in, api_base_url) ββ
const credentials = await withRetry(
() =>
fetchRemoteCredentials(
sessionId,
baseUrl,
accessToken,
cfg.http_timeout_ms,
),
'fetchRemoteCredentials',
cfg,
)
if (!credentials) {
onStateChange?.('failed', 'Remote credentials fetch failed β see debug log')
logBridgeSkip('v2_remote_creds_failed', undefined, true)
void archiveSession(
sessionId,
baseUrl,
accessToken,
orgUUID,
cfg.http_timeout_ms,
)
return null
}
logForDebugging(
`[remote-bridge] Fetched bridge credentials (expires_in=${credentials.expires_in}s)`,
)
// ββ 3. Build v2 transport (SSETransport + CCRClient) ββββββββββββββββββββ
const sessionUrl = buildCCRv2SdkUrl(credentials.api_base_url, sessionId)
logForDebugging(`[remote-bridge] v2 session URL: ${sessionUrl}`)
let transport: ReplBridgeTransport
try {
transport = await createV2ReplTransport({
sessionUrl,
ingressToken: credentials.worker_jwt,
sessionId,
epoch: credentials.worker_epoch,
heartbeatIntervalMs: cfg.heartbeat_interval_ms,
heartbeatJitterFraction: cfg.heartbeat_jitter_fraction,
// Per-instance closure β keeps the worker JWT out of
// process.env.CLAUDE_CODE_SESSION_ACCESS_TOKEN, which mcp/client.ts
// reads ungatedly and would otherwise send to user-configured ws/http
// MCP servers. Frozen-at-construction is correct: transport is fully
// rebuilt on refresh (rebuildTransport below).
getAuthToken: () => credentials.worker_jwt,
outboundOnly,
})
} catch (err) {
logForDebugging(
`[remote-bridge] v2 transport setup failed: ${errorMessage(err)}`,
{ level: 'error' },
)
onStateChange?.('failed', `Transport setup failed: ${errorMessage(err)}`)
logBridgeSkip('v2_transport_setup_failed', undefined, true)
void archiveSession(
sessionId,
baseUrl,
accessToken,
orgUUID,
cfg.http_timeout_ms,
)
return null
}
logForDebugging(
`[remote-bridge] v2 transport created (epoch=${credentials.worker_epoch})`,
)
onStateChange?.('ready')
// ββ 4. State ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
// Echo dedup: messages we POST come back on the read stream. Seeded with
// initial message UUIDs so server echoes of flushed history are recognized.
// Both sets cover initial UUIDs β recentPostedUUIDs is a 2000-cap ring buffer
// and could evict them after enough live writes; initialMessageUUIDs is the
// unbounded fallback. Defense-in-depth; mirrors replBridge.ts.
const recentPostedUUIDs = new BoundedUUIDSet(cfg.uuid_dedup_buffer_size)
const initialMessageUUIDs = new Set<string>()
if (initialMessages) {
for (const msg of initialMessages) {
initialMessageUUIDs.add(msg.uuid)
recentPostedUUIDs.add(msg.uuid)
}
}
// Defensive dedup for re-delivered inbound prompts (seq-num negotiation
// edge cases, server history replay after transport swap).
const recentInboundUUIDs = new BoundedUUIDSet(cfg.uuid_dedup_buffer_size)
// FlushGate: queue live writes while the history flush POST is in flight,
// so the server receives [history..., live...] in order.
const flushGate = new FlushGate<Message>()
let initialFlushDone = false
let tornDown = false
let authRecoveryInFlight = false
// Latch for onUserMessage β flips true when the callback returns true
// (policy says "done deriving"). sessionId is const (no re-create path β
// rebuildTransport swaps JWT/epoch, same session), so no reset needed.
let userMessageCallbackDone = !onUserMessage
// Telemetry: why did onConnect fire? Set by rebuildTransport before
// wireTransportCallbacks; read asynchronously by onConnect. Race-safe
// because authRecoveryInFlight serializes rebuild callers, and a fresh
// initEnvLessBridgeCore() call gets a fresh closure defaulting to 'initial'.
let connectCause: ConnectCause = 'initial'
// Deadline for onConnect after transport.connect(). Cleared by onConnect
// (connected) and onClose (got a close β not silent). If neither fires
// before cfg.connect_timeout_ms, onConnectTimeout emits β the only
// signal for the `started β (silence)` gap.
let connectDeadline: ReturnType<typeof setTimeout> | undefined
function onConnectTimeout(cause: ConnectCause): void {
if (tornDown) return
logEvent('tengu_bridge_repl_connect_timeout', {
v2: true,
elapsed_ms: cfg.connect_timeout_ms,
cause:
cause as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
})
}
// ββ 5. JWT refresh scheduler ββββββββββββββββββββββββββββββββββββββββββββ
// Schedule a callback 5min before expiry (per response.expires_in). On fire,
// re-fetch /bridge with OAuth β rebuild transport with fresh credentials.
// Each /bridge call bumps epoch server-side, so a JWT-only swap would leave
// the old CCRClient heartbeating with a stale epoch β 409 within 20s.
// JWT is opaque β do not decode.
const refresh = createTokenRefreshScheduler({
refreshBufferMs: cfg.token_refresh_buffer_ms,
getAccessToken: async () => {
// Unconditionally refresh OAuth before calling /bridge β getAccessToken()
// returns expired tokens as non-null strings (doesn't check expiresAt),
// so truthiness doesn't mean valid. Pass the stale token to onAuth401
// so handleOAuth401Error's keychain-comparison can detect parallel refresh.
const stale = getAccessToken()
if (onAuth401) await onAuth401(stale ?? '')
return getAccessToken() ?? stale
},
onRefresh: (sid, oauthToken) => {
void (async () => {
// Laptop wake: overdue proactive timer + SSE 401 fire ~simultaneously.
// Claim the flag BEFORE the /bridge fetch so the other path skips
// entirely β prevents double epoch bump (each /bridge call bumps; if
// both fetch, the first rebuild gets a stale epoch and 409s).
if (authRecoveryInFlight || tornDown) {
logForDebugging(
'[remote-bridge] Recovery already in flight, skipping proactive refresh',
)
return
}
authRecoveryInFlight = true
try {
const fresh = await withRetry(
() =>
fetchRemoteCredentials(
sid,
baseUrl,
oauthToken,
cfg.http_timeout_ms,
),
'fetchRemoteCredentials (proactive)',
cfg,
)
if (!fresh || tornDown) return
await rebuildTransport(fresh, 'proactive_refresh')
logForDebugging(
'[remote-bridge] Transport rebuilt (proactive refresh)',
)
} catch (err) {
logForDebugging(
`[remote-bridge] Proactive refresh rebuild failed: ${errorMessage(err)}`,
{ level: 'error' },
)
logForDiagnosticsNoPII(
'error',
'bridge_repl_v2_proactive_refresh_failed',
)
if (!tornDown) {
onStateChange?.('failed', `Refresh failed: ${errorMessage(err)}`)
}
} finally {
authRecoveryInFlight = false
}
})()
},
label: 'remote',
})
refresh.scheduleFromExpiresIn(sessionId, credentials.expires_in)
// ββ 6. Wire callbacks (extracted so transport-rebuild can re-wire) ββββββ
function wireTransportCallbacks(): void {
transport.setOnConnect(() => {
clearTimeout(connectDeadline)
logForDebugging('[remote-bridge] v2 transport connected')
logForDiagnosticsNoPII('info', 'bridge_repl_v2_transport_connected')
logEvent('tengu_bridge_repl_ws_connected', {
v2: true,
cause:
connectCause as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
})
if (!initialFlushDone && initialMessages && initialMessages.length > 0) {
initialFlushDone = true
// Capture current transport β if 401/teardown happens mid-flush,
// the stale .finally() must not drain the gate or signal connected.
// (Same guard pattern as replBridge.ts:1119.)
const flushTransport = transport
void flushHistory(initialMessages)
.catch(e =>
logForDebugging(`[remote-bridge] flushHistory failed: ${e}`),
)
.finally(() => {
// authRecoveryInFlight catches the v1-vs-v2 asymmetry: v1 nulls
// transport synchronously in setOnClose (replBridge.ts:1175), so
// transport !== flushTransport trips immediately. v2 doesn't null β
// transport reassigned only at rebuildTransport:346, 3 awaits deep.
// authRecoveryInFlight is set synchronously at rebuildTransport entry.
if (
transport !== flushTransport ||
tornDown ||
authRecoveryInFlight
) {
return
}
drainFlushGate()
onStateChange?.('connected')
})
} else if (!flushGate.active) {
onStateChange?.('connected')
}
})
transport.setOnData((data: string) => {
handleIngressMessage(
data,
recentPostedUUIDs,
recentInboundUUIDs,
onInboundMessage,
// Remote client answered the permission prompt β the turn resumes.
// Without this the server stays on requires_action until the next
// user message or turn-end result.
onPermissionResponse
? res => {
transport.reportState('running')
onPermissionResponse(res)
}
: undefined,
req =>
handleServerControlRequest(req, {
transport,
sessionId,
onInterrupt,
onSetModel,
onSetMaxThinkingTokens,
onSetPermissionMode,
outboundOnly,
}),
)
})
transport.setOnClose((code?: number) => {
clearTimeout(connectDeadline)
if (tornDown) return
logForDebugging(`[remote-bridge] v2 transport closed (code=${code})`)
logEvent('tengu_bridge_repl_ws_closed', { code, v2: true })
// onClose fires only for TERMINAL failures: 401 (JWT invalid),
// 4090 (CCR epoch mismatch), 4091 (CCR init failed), or SSE 10-min
// reconnect budget exhausted. Transient disconnects are handled
// transparently inside SSETransport. 401 we can recover from (fetch
// fresh JWT, rebuild transport); all other codes are dead-ends.
if (code === 401 && !authRecoveryInFlight) {
void recoverFromAuthFailure()
return
}
onStateChange?.('failed', `Transport closed (code ${code})`)
})
}
// ββ 7. Transport rebuild (shared by proactive refresh + 401 recovery) ββ
// Every /bridge call bumps epoch server-side. Both refresh paths must
// rebuild the transport with the new epoch β a JWT-only swap leaves the
// old CCRClient heartbeating stale epoch β 409. SSE resumes from the old
// transport's high-water-mark seq-num so no server-side replay.
// Caller MUST set authRecoveryInFlight = true before calling (synchronously,
// before any await) and clear it in a finally. This function doesn't manage
// the flag β moving it here would be too late to prevent a double /bridge
// fetch, and each fetch bumps epoch.
async function rebuildTransport(
fresh: RemoteCredentials,
cause: Exclude<ConnectCause, 'initial'>,
): Promise<void> {
connectCause = cause
// Queue writes during rebuild β once /bridge returns, the old transport's
// epoch is stale and its next write/heartbeat 409s. Without this gate,
// writeMessages adds UUIDs to recentPostedUUIDs then writeBatch silently
// no-ops (closed uploader after 409) β permanent silent message loss.
flushGate.start()
try {
const seq = transport.getLastSequenceNum()
transport.close()
transport = await createV2ReplTransport({
sessionUrl: buildCCRv2SdkUrl(fresh.api_base_url, sessionId),
ingressToken: fresh.worker_jwt,
sessionId,
epoch: fresh.worker_epoch,
heartbeatIntervalMs: cfg.heartbeat_interval_ms,
heartbeatJitterFraction: cfg.heartbeat_jitter_fraction,
initialSequenceNum: seq,
getAuthToken: () => fresh.worker_jwt,
outboundOnly,
})
if (tornDown) {
// Teardown fired during the async createV2ReplTransport window.
// Don't wire/connect/schedule β we'd re-arm timers after cancelAll()
// and fire onInboundMessage into a torn-down bridge.
transport.close()
return
}
wireTransportCallbacks()
transport.connect()
connectDeadline = setTimeout(
onConnectTimeout,
cfg.connect_timeout_ms,
connectCause,
)
refresh.scheduleFromExpiresIn(sessionId, fresh.expires_in)
// Drain queued writes into the new uploader. Runs before
// ccr.initialize() resolves (transport.connect() is fire-and-forget),
// but the uploader serializes behind the initial PUT /worker. If
// init fails (4091), events drop β but only recentPostedUUIDs
// (per-instance) is populated, so re-enabling the bridge re-flushes.
drainFlushGate()
} finally {
// End the gate on failure paths too β drainFlushGate already ended
// it on success. Queued messages are dropped (transport still dead).
flushGate.drop()
}
}
// ββ 8. 401 recovery (OAuth refresh + rebuild) βββββββββββββββββββββββββββ
async function recoverFromAuthFailure(): Promise<void> {
// setOnClose already guards `!authRecoveryInFlight` but that check and
// this set must be atomic against onRefresh β claim synchronously before
// any await. Laptop wake fires both paths ~simultaneously.
if (authRecoveryInFlight) return
authRecoveryInFlight = true
onStateChange?.('reconnecting', 'JWT expired β refreshing')
logForDebugging('[remote-bridge] 401 on SSE β attempting JWT refresh')
try {
// Unconditionally try OAuth refresh β getAccessToken() returns expired
// tokens as non-null strings, so !oauthToken doesn't catch expiry.
// Pass the stale token so handleOAuth401Error's keychain-comparison
// can detect if another tab already refreshed.
const stale = getAccessToken()
if (onAuth401) await onAuth401(stale ?? '')
const oauthToken = getAccessToken() ?? stale
if (!oauthToken || tornDown) {
if (!tornDown) {
onStateChange?.('failed', 'JWT refresh failed: no OAuth token')
}
return
}
const fresh = await withRetry(
() =>
fetchRemoteCredentials(
sessionId,
baseUrl,
oauthToken,
cfg.http_timeout_ms,
),
'fetchRemoteCredentials (recovery)',
cfg,
)
if (!fresh || tornDown) {
if (!tornDown) {
onStateChange?.('failed', 'JWT refresh failed after 401')
}
return
}
// If 401 interrupted the initial flush, writeBatch may have silently
// no-op'd on the closed uploader (ccr.close() ran in the SSE wrapper
// before our setOnClose callback). Reset so the new onConnect re-flushes.
// (v1 scopes initialFlushDone inside the per-transport closure at
// replBridge.ts:1027 so it resets naturally; v2 has it at outer scope.)
initialFlushDone = false
await rebuildTransport(fresh, 'auth_401_recovery')
logForDebugging('[remote-bridge] Transport rebuilt after 401')
} catch (err) {
logForDebugging(
`[remote-bridge] 401 recovery failed: ${errorMessage(err)}`,
{ level: 'error' },
)
logForDiagnosticsNoPII('error', 'bridge_repl_v2_jwt_refresh_failed')
if (!tornDown) {
onStateChange?.('failed', `JWT refresh failed: ${errorMessage(err)}`)
}
} finally {
authRecoveryInFlight = false
}
}
wireTransportCallbacks()
// Start flushGate BEFORE connect so writeMessages() during handshake
// queues instead of racing the history POST.
if (initialMessages && initialMessages.length > 0) {
flushGate.start()
}
transport.connect()
connectDeadline = setTimeout(
onConnectTimeout,
cfg.connect_timeout_ms,
connectCause,
)
// ββ 8. History flush + drain helpers ββββββββββββββββββββββββββββββββββββ
function drainFlushGate(): void {
const msgs = flushGate.end()
if (msgs.length === 0) return
for (const msg of msgs) recentPostedUUIDs.add(msg.uuid)
const events = toSDKMessages(msgs).map(m => ({
...m,
session_id: sessionId,
}))
if (msgs.some(m => m.type === 'user')) {
transport.reportState('running')
}
logForDebugging(
`[remote-bridge] Drained ${msgs.length} queued message(s) after flush`,
)
void transport.writeBatch(events)
}
async function flushHistory(msgs: Message[]): Promise<void> {
// v2 always creates a fresh server session (unconditional createCodeSession
// above) β no session reuse, no double-post risk. Unlike v1, we do NOT
// filter by previouslyFlushedUUIDs: that set persists across REPL enable/
// disable cycles (useRef), so it would wrongly suppress history on re-enable.
const eligible = msgs.filter(isEligibleBridgeMessage)
const capped =
initialHistoryCap > 0 && eligible.length > initialHistoryCap
? eligible.slice(-initialHistoryCap)
: eligible
if (capped.length < eligible.length) {
logForDebugging(
`[remote-bridge] Capped initial flush: ${eligible.length} -> ${capped.length} (cap=${initialHistoryCap})`,
)
}
const events = toSDKMessages(capped).map(m => ({
...m,
session_id: sessionId,
}))
if (events.length === 0) return
// Mid-turn init: if Remote Control is enabled while a query is running,
// the last eligible message is a user prompt or tool_result (both 'user'
// type). Without this the init PUT's 'idle' sticks until the next user-
// type message forwards via writeMessages β which for a pure-text turn
// is never (only assistant chunks stream post-init). Check eligible (pre-
// cap), not capped: the cap may truncate to a user message even when the
// actual trailing message is assistant.
if (eligible.at(-1)?.type === 'user') {
transport.reportState('running')
}
logForDebugging(`[remote-bridge] Flushing ${events.length} history events`)
await transport.writeBatch(events)
}
// ββ 9. Teardown βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
// On SIGINT/SIGTERM/β /exit, gracefulShutdown races runCleanupFunctions()
// against a 2s cap before forceExit kills the process. Budget accordingly:
// - archive: teardown_archive_timeout_ms (default 1500, cap 2000)
// - result write: fire-and-forget, archive latency covers the drain
// - 401 retry: only if first archive 401s, shares the same budget
async function teardown(): Promise<void> {
if (tornDown) return
tornDown = true
refresh.cancelAll()
clearTimeout(connectDeadline)
flushGate.drop()
// Fire the result message before archive β transport.write() only awaits
// enqueue (SerialBatchEventUploader resolves once buffered, drain is
// async). Archiving before close() gives the uploader's drain loop a
// window (typical archive β 100-500ms) to POST the result without an
// explicit sleep. close() sets closed=true which interrupts drain at the
// next while-check, so close-before-archive drops the result.
transport.reportState('idle')
void transport.write(makeResultMessage(sessionId))
let token = getAccessToken()
let status = await archiveSession(
sessionId,
baseUrl,
token,
orgUUID,
cfg.teardown_archive_timeout_ms,
)
// Token is usually fresh (refresh scheduler runs 5min before expiry) but
// laptop-wake past the refresh window leaves getAccessToken() returning a
// stale string. Retry once on 401 β onAuth401 (= handleOAuth401Error)
// clears keychain cache + force-refreshes. No proactive refresh on the
// happy path: handleOAuth401Error force-refreshes even valid tokens,
// which would waste budget 99% of the time. try/catch mirrors
// recoverFromAuthFailure: keychain reads can throw (macOS locked after
// wake); an uncaught throw here would skip transport.close + telemetry.
if (status === 401 && onAuth401) {
try {
await onAuth401(token ?? '')
token = getAccessToken()
status = await archiveSession(
sessionId,
baseUrl,
token,
orgUUID,
cfg.teardown_archive_timeout_ms,
)
} catch (err) {
logForDebugging(
`[remote-bridge] Teardown 401 retry threw: ${errorMessage(err)}`,
{ level: 'error' },
)
}
}
transport.close()
const archiveStatus: ArchiveTelemetryStatus =
status === 'no_token'
? 'skipped_no_token'
: status === 'timeout' || status === 'error'
? 'network_error'
: status >= 500
? 'server_5xx'
: status >= 400
? 'server_4xx'
: 'ok'
logForDebugging(`[remote-bridge] Torn down (archive=${status})`)
logForDiagnosticsNoPII('info', 'bridge_repl_v2_teardown')
logEvent(
feature('CCR_MIRROR') && outboundOnly
? 'tengu_ccr_mirror_teardown'
: 'tengu_bridge_repl_teardown',
{
v2: true,
archive_status:
archiveStatus as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
archive_ok: typeof status === 'number' && status < 400,
archive_http_status: typeof status === 'number' ? status : undefined,
archive_timeout: status === 'timeout',
archive_no_token: status === 'no_token',
},
)
}
const unregister = registerCleanup(teardown)
if (feature('CCR_MIRROR') && outboundOnly) {
logEvent('tengu_ccr_mirror_started', {
v2: true,
expires_in_s: credentials.expires_in,
})
} else {
logEvent('tengu_bridge_repl_started', {
has_initial_messages: !!(initialMessages && initialMessages.length > 0),
v2: true,
expires_in_s: credentials.expires_in,
inProtectedNamespace: isInProtectedNamespace(),
})
}
// ββ 10. Handle ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
return {
bridgeSessionId: sessionId,
environmentId: '',
sessionIngressUrl: credentials.api_base_url,
writeMessages(messages) {
const filtered = messages.filter(
m =>
isEligibleBridgeMessage(m) &&
!initialMessageUUIDs.has(m.uuid) &&
!recentPostedUUIDs.has(m.uuid),
)
if (filtered.length === 0) return
// Fire onUserMessage for title derivation. Scan before the flushGate
// check β prompts are title-worthy even if they queue. Keeps calling
// on every title-worthy message until the callback returns true; the
// caller owns the policy (derive at 1st and 3rd, skip if explicit).
if (!userMessageCallbackDone) {
for (const m of filtered) {
const text = extractTitleText(m)
if (text !== undefined && onUserMessage?.(text, sessionId)) {
userMessageCallbackDone = true
break
}
}
}
if (flushGate.enqueue(...filtered)) {
logForDebugging(
`[remote-bridge] Queued ${filtered.length} message(s) during flush`,
)
return
}
for (const msg of filtered) recentPostedUUIDs.add(msg.uuid)
const events = toSDKMessages(filtered).map(m => ({
...m,
session_id: sessionId,
}))
// v2 does not derive worker_status from events server-side (unlike v1
// session-ingress session_status_updater.go). Push it from here so the
// CCR web session list shows Running instead of stuck on Idle. A user
// message in the batch marks turn start. CCRClient.reportState dedupes
// consecutive same-state pushes.
if (filtered.some(m => m.type === 'user')) {
transport.reportState('running')
}
logForDebugging(`[remote-bridge] Sending ${filtered.length} message(s)`)
void transport.writeBatch(events)
},
writeSdkMessages(messages: SDKMessage[]) {
const filtered = messages.filter(
m => !m.uuid || !recentPostedUUIDs.has(m.uuid),
)
if (filtered.length === 0) return
for (const msg of filtered) {
if (msg.uuid) recentPostedUUIDs.add(msg.uuid)
}
const events = filtered.map(m => ({ ...m, session_id: sessionId }))
void transport.writeBatch(events)
},
sendControlRequest(request: SDKControlRequest) {
if (authRecoveryInFlight) {
logForDebugging(
`[remote-bridge] Dropping control_request during 401 recovery: ${request.request_id}`,
)
return
}
const event = { ...request, session_id: sessionId }
if (request.request.subtype === 'can_use_tool') {
transport.reportState('requires_action')
}
void transport.write(event)
logForDebugging(
`[remote-bridge] Sent control_request request_id=${request.request_id}`,
)
},
sendControlResponse(response: SDKControlResponse) {
if (authRecoveryInFlight) {
logForDebugging(
'[remote-bridge] Dropping control_response during 401 recovery',
)
return
}
const event = { ...response, session_id: sessionId }
transport.reportState('running')
void transport.write(event)
logForDebugging('[remote-bridge] Sent control_response')
},
sendControlCancelRequest(requestId: string) {
if (authRecoveryInFlight) {
logForDebugging(
`[remote-bridge] Dropping control_cancel_request during 401 recovery: ${requestId}`,
)
return
}
const event = {
type: 'control_cancel_request' as const,
request_id: requestId,
session_id: sessionId,
}
// Hook/classifier/channel/recheck resolved the permission locally β
// interactiveHandler calls only cancelRequest (no sendResponse) on
// those paths, so without this the server stays on requires_action.
transport.reportState('running')
void transport.write(event)
logForDebugging(
`[remote-bridge] Sent control_cancel_request request_id=${requestId}`,
)
},
sendResult() {
if (authRecoveryInFlight) {
logForDebugging('[remote-bridge] Dropping result during 401 recovery')
return
}
transport.reportState('idle')
void transport.write(makeResultMessage(sessionId))
logForDebugging(`[remote-bridge] Sent result`)
},
async teardown() {
unregister()
await teardown()
},
}
}
// βββ Session API (v2 /code/sessions, no env) βββββββββββββββββββββββββββββββββ
/** Retry an async init call with exponential backoff + jitter. */
async function withRetry<T>(
fn: () => Promise<T | null>,
label: string,
cfg: EnvLessBridgeConfig,
): Promise<T | null> {
const max = cfg.init_retry_max_attempts
for (let attempt = 1; attempt <= max; attempt++) {
const result = await fn()
if (result !== null) return result
if (attempt < max) {
const base = cfg.init_retry_base_delay_ms * 2 ** (attempt - 1)
const jitter =
base * cfg.init_retry_jitter_fraction * (2 * Math.random() - 1)
const delay = Math.min(base + jitter, cfg.init_retry_max_delay_ms)
logForDebugging(
`[remote-bridge] ${label} failed (attempt ${attempt}/${max}), retrying in ${Math.round(delay)}ms`,
)
await sleep(delay)
}
}
return null
}
// Moved to codeSessionApi.ts so the SDK /bridge subpath can bundle them
// without pulling in this file's heavy CLI tree (analytics, transport).
export {
createCodeSession,
type RemoteCredentials,
} from './codeSessionApi.js'
import {
createCodeSession,
fetchRemoteCredentials as fetchRemoteCredentialsRaw,
type RemoteCredentials,
} from './codeSessionApi.js'
import { getBridgeBaseUrlOverride } from './bridgeConfig.js'
// CLI-side wrapper that applies the CLAUDE_BRIDGE_BASE_URL dev override and
// injects the trusted-device token (both are env/GrowthBook reads that the
// SDK-facing codeSessionApi.ts export must stay free of).
export async function fetchRemoteCredentials(
sessionId: string,
baseUrl: string,
accessToken: string,
timeoutMs: number,
): Promise<RemoteCredentials | null> {
const creds = await fetchRemoteCredentialsRaw(
sessionId,
baseUrl,
accessToken,
timeoutMs,
getTrustedDeviceToken(),
)
if (!creds) return null
return getBridgeBaseUrlOverride()
? { ...creds, api_base_url: baseUrl }
: creds
}
type ArchiveStatus = number | 'timeout' | 'error' | 'no_token'
// Single categorical for BQ `GROUP BY archive_status`. The booleans on
// _teardown predate this and are redundant with it (except archive_timeout,
// which distinguishes ECONNABORTED from other network errors β both map to
// 'network_error' here since the dominant cause in a 1.5s window is timeout).
type ArchiveTelemetryStatus =
| 'ok'
| 'skipped_no_token'
| 'network_error'
| 'server_4xx'
| 'server_5xx'
async function archiveSession(
sessionId: string,
baseUrl: string,
accessToken: string | undefined,
orgUUID: string,
timeoutMs: number,
): Promise<ArchiveStatus> {
if (!accessToken) return 'no_token'
// Archive lives at the compat layer (/v1/sessions/*, not /v1/code/sessions).
// compat.parseSessionID only accepts TagSession (session_*), so retag cse_*.
// anthropic-beta + x-organization-uuid are required β without them the
// compat gateway 404s before reaching the handler.
//
// Unlike bridgeMain.ts (which caches compatId in sessionCompatIds to keep
// in-memory titledSessions/logger keys consistent across a mid-session
// gate flip), this compatId is only a server URL path segment β no
// in-memory state. Fresh compute matches whatever the server currently
// validates: if the gate is OFF, the server has been updated to accept
// cse_* and we correctly send it.
const compatId = toCompatSessionId(sessionId)
try {
const response = await axios.post(
`${baseUrl}/v1/sessions/${compatId}/archive`,
{},
{
headers: {
...oauthHeaders(accessToken),
'anthropic-beta': 'ccr-byoc-2025-07-29',
'x-organization-uuid': orgUUID,
},
timeout: timeoutMs,
validateStatus: () => true,
},
)
logForDebugging(
`[remote-bridge] Archive ${compatId} status=${response.status}`,
)
return response.status
} catch (err) {
const msg = errorMessage(err)
logForDebugging(`[remote-bridge] Archive failed: ${msg}`)
return axios.isAxiosError(err) && err.code === 'ECONNABORTED'
? 'timeout'
: 'error'
}
}