File detail
services/api/sessionIngress.ts
🎯 Use case
This file lives under “services/”, which covers long-lived services (LSP, MCP, OAuth, tool execution, memory, compaction, voice, settings sync, …). On the API surface it exposes appendSessionLog, getSessionLogs, getSessionLogsViaOAuth, getTeleportEvents, clearSession, and clearAllSessions — all plain functions. Its external dependencies are the axios HTTP client and a type-only import from crypto. It composes internal code from constants, types, and utils (relative imports).
Generated from folder role, exports, dependency roots, and inline comments — not hand-reviewed for every path.
🧠 Inline summary
import axios, { type AxiosError } from 'axios' import type { UUID } from 'crypto' import { getOauthConfig } from '../../constants/oauth.js' import type { Entry, TranscriptMessage } from '../../types/logs.js' import { logForDebugging } from '../../utils/debug.js'
📤 Exports (heuristic)
appendSessionLog, getSessionLogs, getSessionLogsViaOAuth, getTeleportEvents, clearSession, clearAllSessions
External import roots
Package roots taken from `from "…"` clauses (relative paths omitted).
axios, crypto
🖥️ Source preview
import axios, { type AxiosError } from 'axios'
import type { UUID } from 'crypto'
import { getOauthConfig } from '../../constants/oauth.js'
import type { Entry, TranscriptMessage } from '../../types/logs.js'
import { logForDebugging } from '../../utils/debug.js'
import { logForDiagnosticsNoPII } from '../../utils/diagLogs.js'
import { isEnvTruthy } from '../../utils/envUtils.js'
import { logError } from '../../utils/log.js'
import { sequential } from '../../utils/sequential.js'
import { getSessionIngressAuthToken } from '../../utils/sessionIngressAuth.js'
import { sleep } from '../../utils/sleep.js'
import { jsonStringify } from '../../utils/slowOperations.js'
import { getOAuthHeaders } from '../../utils/teleport/api.js'
/**
 * Shape this module assumes for an error response body from the ingress
 * endpoint. Every field is optional, so readers must tolerate any of them
 * being absent.
 */
interface SessionIngressError {
  error?: {
    // Human-readable description; included in the 409 conflict error log.
    message?: string
    type?: string
  }
}
// Module-level state

// Most recent entry UUID known for each session id. Sent as the Last-Uuid
// header on the next append for that session.
const lastUuidMap = new Map<string, UUID>()

// Attempt budget for persisting one entry, and the delay before the first
// retry (later retries back off from this value).
const MAX_RETRIES = 10
const BASE_DELAY_MS = 500

// Per-session sequential wrappers to prevent concurrent log writes
const sequentialAppendBySession = new Map<
  string,
  (
    entry: TranscriptMessage,
    url: string,
    headers: Record<string, string>,
  ) => Promise<boolean>
>()
/**
 * Returns the sequential append wrapper for a session, creating and caching
 * it on first use. Routing every append for a session through the same
 * wrapper means they are processed one at a time.
 */
function getOrCreateSequentialAppend(sessionId: string) {
  const cached = sequentialAppendBySession.get(sessionId)
  if (cached) {
    return cached
  }

  // The session id is bound here, so callers pass only the per-call values.
  const created = sequential(
    async (
      entry: TranscriptMessage,
      url: string,
      headers: Record<string, string>,
    ) => {
      return await appendSessionLogImpl(sessionId, entry, url, headers)
    },
  )
  sequentialAppendBySession.set(sessionId, created)
  return created
}
/**
 * Internal implementation of appendSessionLog with retry logic.
 *
 * Makes up to MAX_RETRIES PUT attempts for a single entry:
 * - 200/201: records entry.uuid as the session's last UUID and succeeds.
 * - 409: if the server's x-last-uuid header equals entry.uuid, the entry was
 *   already stored and this counts as success. Otherwise the server's last
 *   UUID is adopted (from the header, or by re-fetching the session logs when
 *   the header is absent) and the append is retried immediately, without
 *   backoff. Gives up if no UUID can be determined.
 * - 401: fails immediately (non-retryable).
 * - Any other status below 500, and anything axios throws (network errors,
 *   5xx), is retried with exponential backoff capped at 8000 ms.
 *
 * @param sessionId Key under which the last known UUID is cached.
 * @param entry Transcript entry to persist; sent as the PUT body.
 * @param url Endpoint to PUT to (also used for the 409 re-fetch).
 * @param headers Base request headers; Last-Uuid is added to a copy.
 * @returns true once the entry is known to be stored, false otherwise.
 */
async function appendSessionLogImpl(
  sessionId: string,
  entry: TranscriptMessage,
  url: string,
  headers: Record<string, string>,
): Promise<boolean> {
  for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) {
    // Set when a 409 was reconciled by adopting the server's last UUID. Such
    // an attempt is retried right away instead of sleeping, but it still has
    // to pass the retries-exhausted check below so the final attempt does
    // not fail silently.
    let conflictReconciled = false

    try {
      const lastUuid = lastUuidMap.get(sessionId)
      const requestHeaders = { ...headers }
      if (lastUuid) {
        requestHeaders['Last-Uuid'] = lastUuid
      }

      const response = await axios.put(url, entry, {
        headers: requestHeaders,
        // Only 5xx rejects; every status below 500 is handled here.
        validateStatus: status => status < 500,
      })

      if (response.status === 200 || response.status === 201) {
        lastUuidMap.set(sessionId, entry.uuid)
        logForDebugging(
          `Successfully persisted session log entry for session ${sessionId}`,
        )
        return true
      }

      if (response.status === 409) {
        // Check if our entry was actually stored (server returned 409 but
        // entry exists). This handles the scenario where the entry was stored
        // but the client received an error response, leaving lastUuidMap
        // stale.
        const serverLastUuid = response.headers['x-last-uuid']
        if (serverLastUuid === entry.uuid) {
          // Our entry IS the last entry on the server - it was stored
          // successfully on an earlier attempt.
          lastUuidMap.set(sessionId, entry.uuid)
          logForDebugging(
            `Session entry ${entry.uuid} already present on server, recovering from stale state`,
          )
          logForDiagnosticsNoPII('info', 'session_persist_recovered_from_409')
          return true
        }

        // Another writer (e.g. in-flight request from a killed process)
        // advanced the server's chain. Adopt the server's last UUID from the
        // response header, or re-fetch the session to discover it.
        if (serverLastUuid) {
          lastUuidMap.set(sessionId, serverLastUuid as UUID)
          logForDebugging(
            `Session 409: adopting server lastUuid=${serverLastUuid} from header, retrying entry ${entry.uuid}`,
          )
        } else {
          // Server didn't return x-last-uuid (e.g. v1 endpoint). Re-fetch
          // the session to discover the current head of the append chain.
          const logs = await fetchSessionLogsFromUrl(sessionId, url, headers)
          const adoptedUuid = findLastUuid(logs)
          if (!logs || !adoptedUuid) {
            // Can't determine server state — give up. The body may be empty
            // or null, so guard every level of the lookup.
            const errorData = response.data as
              | SessionIngressError
              | null
              | undefined
            const errorMessage =
              errorData?.error?.message || 'Concurrent modification detected'
            logError(
              new Error(
                `Session persistence conflict: UUID mismatch for session ${sessionId}, entry ${entry.uuid}. ${errorMessage}`,
              ),
            )
            logForDiagnosticsNoPII(
              'error',
              'session_persist_fail_concurrent_modification',
            )
            return false
          }
          lastUuidMap.set(sessionId, adoptedUuid)
          logForDebugging(
            `Session 409: re-fetched ${logs.length} entries, adopting lastUuid=${adoptedUuid}, retrying entry ${entry.uuid}`,
          )
        }
        logForDiagnosticsNoPII('info', 'session_persist_409_adopt_server_uuid')
        conflictReconciled = true
      } else if (response.status === 401) {
        logForDebugging('Session token expired or invalid')
        logForDiagnosticsNoPII('error', 'session_persist_fail_bad_token')
        return false // Non-retryable
      } else {
        // Other 4xx (429, etc.) - retryable
        logForDebugging(
          `Failed to persist session log: ${response.status} ${response.statusText}`,
        )
        logForDiagnosticsNoPII('error', 'session_persist_fail_status', {
          status: response.status,
          attempt,
        })
      }
    } catch (error) {
      // Network errors, 5xx - retryable
      const axiosError = error as AxiosError<SessionIngressError>
      logError(new Error(`Error persisting session log: ${axiosError.message}`))
      logForDiagnosticsNoPII('error', 'session_persist_fail_status', {
        status: axiosError.status,
        attempt,
      })
    }

    if (attempt === MAX_RETRIES) {
      logForDebugging(`Remote persistence failed after ${MAX_RETRIES} attempts`)
      logForDiagnosticsNoPII(
        'error',
        'session_persist_error_retries_exhausted',
        { attempt },
      )
      return false
    }

    if (conflictReconciled) {
      continue // retry immediately with the updated lastUuid
    }

    const delayMs = Math.min(BASE_DELAY_MS * Math.pow(2, attempt - 1), 8000)
    logForDebugging(
      `Remote persistence attempt ${attempt}/${MAX_RETRIES} failed, retrying in ${delayMs}ms…`,
    )
    await sleep(delayMs)
  }

  // Unreachable in practice: the final attempt always returns inside the loop.
  return false
}
/**
 * Append a log entry to the session, authenticating with the session
 * ingress token as a Bearer credential.
 *
 * The write goes through the session's sequential wrapper, so appends for
 * the same session are processed one at a time.
 *
 * @param sessionId Session the entry belongs to.
 * @param entry Transcript entry to persist.
 * @param url Endpoint the entry is sent to.
 * @returns false when no token is available; otherwise the result of the
 *   queued append.
 */
export async function appendSessionLog(
  sessionId: string,
  entry: TranscriptMessage,
  url: string,
): Promise<boolean> {
  const token = getSessionIngressAuthToken()
  if (token) {
    const authHeaders: Record<string, string> = {
      Authorization: `Bearer ${token}`,
      'Content-Type': 'application/json',
    }
    const appendInOrder = getOrCreateSequentialAppend(sessionId)
    return appendInOrder(entry, url, authHeaders)
  }

  logForDebugging('No session token available for session persistence')
  logForDiagnosticsNoPII('error', 'session_persist_fail_jwt_no_token')
  return false
}
/**
 * Get all session logs for hydration.
 *
 * As a side effect, caches the UUID of the newest entry that has one, so the
 * next append for this session sends it as Last-Uuid. The scan walks
 * backward past trailing entries without a uuid, the same way the 409
 * recovery path finds the head of the append chain.
 *
 * @param sessionId Session whose logs are fetched.
 * @param url Endpoint to fetch the logs from.
 * @returns The fetched entries, or null when no token is available or the
 *   fetch fails.
 */
export async function getSessionLogs(
  sessionId: string,
  url: string,
): Promise<Entry[] | null> {
  const sessionToken = getSessionIngressAuthToken()
  if (!sessionToken) {
    logForDebugging('No session token available for fetching session logs')
    logForDiagnosticsNoPII('error', 'session_get_fail_no_token')
    return null
  }

  const headers = { Authorization: `Bearer ${sessionToken}` }
  const logs = await fetchSessionLogsFromUrl(sessionId, url, headers)

  // findLastUuid returns undefined for null or empty logs, and for logs in
  // which no entry carries a uuid; the cached value is left alone then.
  const lastUuid = findLastUuid(logs)
  if (lastUuid) {
    lastUuidMap.set(sessionId, lastUuid)
  }
  return logs
}
/**
 * Get all session logs for hydration via OAuth.
 * Used for teleporting sessions from the Sessions API.
 *
 * @param sessionId Session whose logs are fetched.
 * @param accessToken OAuth access token used to build the request headers.
 * @param orgUUID Sent as the x-organization-uuid header.
 * @returns Whatever the shared fetch helper returns for this endpoint.
 */
export async function getSessionLogsViaOAuth(
  sessionId: string,
  accessToken: string,
  orgUUID: string,
): Promise<Entry[] | null> {
  const endpoint = `${getOauthConfig().BASE_API_URL}/v1/session_ingress/session/${sessionId}`
  logForDebugging(`[session-ingress] Fetching session logs from: ${endpoint}`)

  return fetchSessionLogsFromUrl(sessionId, endpoint, {
    ...getOAuthHeaders(accessToken),
    'x-organization-uuid': orgUUID,
  })
}
/**
 * Response shape from GET /v1/code/sessions/{id}/teleport-events.
 * WorkerEvent.payload IS the Entry (TranscriptMessage struct) — the CLI
 * writes it via AddWorkerEvent, the server stores it opaque, we read it
 * back here.
 */
type TeleportEventsResponse = {
  // One page of events.
  data: Array<{
    event_id: string
    event_type: string
    is_compaction: boolean
    // Events with a null payload are skipped when pages are collected.
    payload: Entry | null
    created_at: string
  }>
  // Unset when there are no more pages — this IS the end-of-stream
  // signal (no separate has_more field).
  next_cursor?: string
}
/**
 * Get worker events (transcript) via the CCR v2 Sessions API. Replaces
 * getSessionLogsViaOAuth once session-ingress is retired.
 *
 * The server dispatches per-session: Spanner for v2-native sessions,
 * threadstore for pre-backfill session_* IDs. The cursor is opaque to us —
 * echo it back until next_cursor is unset.
 *
 * Paginated (500/page default, server max 1000). session-ingress's one-shot
 * 50k is gone; we loop.
 *
 * @param sessionId Session whose events are fetched.
 * @param accessToken OAuth access token used to build the request headers.
 * @param orgUUID Sent as the x-organization-uuid header.
 * @returns All non-null payloads in page order. Returns null on a request
 *   failure (network error or 5xx), on a 404 for the first page, on any
 *   other non-200 status, or on a malformed response body. A 404 after the
 *   first page returns what was collected so far.
 * @throws Error when the server responds 401, with a message asking the user
 *   to log in again.
 */
export async function getTeleportEvents(
  sessionId: string,
  accessToken: string,
  orgUUID: string,
): Promise<Entry[] | null> {
  const baseUrl = `${getOauthConfig().BASE_API_URL}/v1/code/sessions/${sessionId}/teleport-events`
  const headers = {
    ...getOAuthHeaders(accessToken),
    'x-organization-uuid': orgUUID,
  }

  logForDebugging(`[teleport] Fetching events from: ${baseUrl}`)

  const all: Entry[] = []
  let cursor: string | undefined
  let pages = 0
  // True only once the server signals end-of-stream. Tracked separately from
  // the page count so a stream that ends cleanly on exactly the last allowed
  // page is not reported as truncated.
  let reachedEnd = false

  // Infinite-loop guard: 1000/page × 100 pages = 100k events. Larger than
  // session-ingress's 50k one-shot. If we hit this, something's wrong
  // (server not advancing cursor) — bail rather than hang.
  const maxPages = 100

  while (pages < maxPages) {
    const params: Record<string, string | number> = { limit: 1000 }
    if (cursor !== undefined) {
      params.cursor = cursor
    }

    let response
    try {
      response = await axios.get<TeleportEventsResponse>(baseUrl, {
        headers,
        params,
        timeout: 20000,
        // Only 5xx rejects; every status below 500 is handled below.
        validateStatus: status => status < 500,
      })
    } catch (e) {
      const err = e as AxiosError
      logError(new Error(`Teleport events fetch failed: ${err.message}`))
      logForDiagnosticsNoPII('error', 'teleport_events_fetch_fail')
      return null
    }

    if (response.status === 404) {
      // 404 on page 0 is ambiguous during the migration window:
      //   (a) Session genuinely not found (not in Spanner AND not in
      //       threadstore) — nothing to fetch.
      //   (b) Route-level 404: endpoint not deployed yet, or session is
      //       a threadstore session not yet backfilled into Spanner.
      // We can't tell them apart from the response alone. Returning null
      // lets the caller fall back to session-ingress, which will correctly
      // return empty for case (a) and data for case (b). Once the backfill
      // is complete and session-ingress is gone, the fallback also returns
      // null — same "Failed to fetch session logs" error as today.
      //
      // 404 mid-pagination (pages > 0) means session was deleted between
      // pages — return what we have.
      logForDebugging(
        `[teleport] Session ${sessionId} not found (page ${pages})`,
      )
      logForDiagnosticsNoPII('warn', 'teleport_events_not_found')
      return pages === 0 ? null : all
    }

    if (response.status === 401) {
      logForDiagnosticsNoPII('error', 'teleport_events_bad_token')
      throw new Error(
        'Your session has expired. Please run /login to sign in again.',
      )
    }

    if (response.status !== 200) {
      logError(
        new Error(
          `Teleport events returned ${response.status}: ${jsonStringify(response.data)}`,
        ),
      )
      logForDiagnosticsNoPII('error', 'teleport_events_bad_status')
      return null
    }

    const { data, next_cursor } = response.data
    if (!Array.isArray(data)) {
      logError(
        new Error(
          `Teleport events invalid response shape: ${jsonStringify(response.data)}`,
        ),
      )
      logForDiagnosticsNoPII('error', 'teleport_events_invalid_shape')
      return null
    }

    // payload IS the Entry. null payload happens for threadstore non-generic
    // events (server skips them) or encryption failures — skip here too.
    for (const ev of data) {
      if (ev.payload !== null) {
        all.push(ev.payload)
      }
    }

    pages++

    // == null covers both `null` and `undefined` — the proto omits the
    // field at end-of-stream, but some serializers emit `null`. Strict
    // `=== undefined` would loop forever on `null` (cursor=null in query
    // params stringifies to "null", which the server rejects or echoes).
    if (next_cursor == null) {
      reachedEnd = true
      break
    }
    cursor = next_cursor
  }

  // Every other way out of the loop returns or throws, so getting here
  // without reachedEnd means the page cap stopped us with more pages pending.
  if (!reachedEnd) {
    // Don't fail — return what we have. Better to teleport with a
    // truncated transcript than not at all.
    logError(
      new Error(`Teleport events hit page cap (${maxPages}) for ${sessionId}`),
    )
    logForDiagnosticsNoPII('warn', 'teleport_events_page_cap')
  }

  logForDebugging(
    `[teleport] Fetched ${all.length} events over ${pages} page(s) for ${sessionId}`,
  )
  return all
}
/**
 * Shared implementation for fetching session logs from a URL.
 *
 * Sends `after_last_compact: true` as a query parameter when the
 * CLAUDE_AFTER_LAST_COMPACT environment variable is truthy.
 *
 * @param sessionId Used only in log messages.
 * @param url Endpoint to GET.
 * @param headers Request headers, passed through unchanged.
 * @returns The `loglines` array on 200, an empty array on 404, and null for
 *   a malformed body, any other status, or a request failure.
 */
async function fetchSessionLogsFromUrl(
  sessionId: string,
  url: string,
  headers: Record<string, string>,
): Promise<Entry[] | null> {
  try {
    const query = isEnvTruthy(process.env.CLAUDE_AFTER_LAST_COMPACT)
      ? { after_last_compact: true }
      : undefined
    const response = await axios.get(url, {
      headers,
      timeout: 20000,
      // Only 5xx rejects; every status below 500 is handled in the switch.
      validateStatus: status => status < 500,
      params: query,
    })

    switch (response.status) {
      case 200: {
        const body = response.data
        // Validate the response structure before trusting it.
        if (body && typeof body === 'object' && Array.isArray(body.loglines)) {
          const entries = body.loglines as Entry[]
          logForDebugging(
            `Fetched ${entries.length} session logs for session ${sessionId}`,
          )
          return entries
        }
        logError(
          new Error(
            `Invalid session logs response format: ${jsonStringify(body)}`,
          ),
        )
        logForDiagnosticsNoPII('error', 'session_get_fail_invalid_response')
        return null
      }

      case 404:
        logForDebugging(`No existing logs for session ${sessionId}`)
        logForDiagnosticsNoPII('warn', 'session_get_no_logs_for_session')
        return []

      case 401:
        logForDebugging('Auth token expired or invalid')
        logForDiagnosticsNoPII('error', 'session_get_fail_bad_token')
        // NOTE(review): this throw is inside the try, so the catch below
        // intercepts it: the message is logged and null is returned instead
        // of the error reaching the caller. Confirm whether propagating the
        // error was intended before relying on either behavior.
        throw new Error(
          'Your session has expired. Please run /login to sign in again.',
        )

      default:
        logForDebugging(
          `Failed to fetch session logs: ${response.status} ${response.statusText}`,
        )
        logForDiagnosticsNoPII('error', 'session_get_fail_status', {
          status: response.status,
        })
        return null
    }
  } catch (error) {
    const axiosError = error as AxiosError<SessionIngressError>
    logError(new Error(`Error fetching session logs: ${axiosError.message}`))
    logForDiagnosticsNoPII('error', 'session_get_fail_status', {
      status: axiosError.status,
    })
    return null
  }
}
/**
 * Scan entries from newest to oldest and return the first truthy uuid found.
 * Some entry types (SummaryMessage, TagMessage) don't have one, so the last
 * element alone is not a reliable source.
 *
 * @param logs Entries in stored order, or null when none could be fetched.
 * @returns The uuid of the last entry that has one, or undefined if logs is
 *   null or no entry carries a uuid.
 */
function findLastUuid(logs: Entry[] | null): UUID | undefined {
  if (logs) {
    // Iterate over a reversed copy so the caller's array is left untouched.
    for (const candidate of [...logs].reverse()) {
      if ('uuid' in candidate && candidate.uuid) {
        return candidate.uuid as UUID
      }
    }
  }
  return undefined
}
/**
 * Clear cached state for a session: its last known UUID and its sequential
 * append wrapper.
 */
export function clearSession(sessionId: string): void {
  for (const cache of [lastUuidMap, sequentialAppendBySession]) {
    cache.delete(sessionId)
  }
}
/**
 * Clear all cached session state (all sessions).
 * Use this on /clear to free sub-agent session entries.
 */
export function clearAllSessions(): void {
  for (const cache of [lastUuidMap, sequentialAppendBySession]) {
    cache.clear()
  }
}