File detail
cli/transports/WorkerStateUploader.ts
🧩 .ts · 132 lines · 💾 3,879 bytes · text
← Back to All Files
🎯 Use case
This file lives under “cli/”, which covers the CLI transport, NDJSON/streaming I/O, and command handlers. On the API surface it exposes WorkerStateUploader — mainly types, interfaces, or factory objects. It composes internal code from utils (relative imports).
Generated from folder role, exports, dependency roots, and inline comments — not hand-reviewed for every path.
🧠 Inline summary
import { sleep } from '../../utils/sleep.js' /** * Coalescing uploader for PUT /worker (session state + metadata). *
📤 Exports (heuristic)
WorkerStateUploader
🖥️ Source preview
import { sleep } from '../../utils/sleep.js'
/**
* Coalescing uploader for PUT /worker (session state + metadata).
*
* - 1 in-flight PUT + 1 pending patch
* - New calls coalesce into pending (never grows beyond 1 slot)
* - On success: send pending if exists
* - On failure: exponential backoff (clamped), retries indefinitely
* until success or close(). Absorbs any pending patches before each retry.
* - No backpressure needed — naturally bounded at 2 slots
*
* Coalescing rules:
* - Top-level keys (worker_status, external_metadata) — last value wins
* - Inside external_metadata / internal_metadata — RFC 7396 merge:
* keys are added/overwritten, null values preserved (server deletes)
*/
/** Construction options for WorkerStateUploader. */
type WorkerStateUploaderConfig = {
/**
 * Performs one upload attempt for the given (possibly coalesced) body.
 * Resolve `true` when the upload succeeded; resolve `false` to make the
 * uploader wait for the backoff delay and try again.
 */
send: (body: Record<string, unknown>) => Promise<boolean>
/** Base delay for exponential backoff (ms); used after the first failure and doubled for each further consecutive failure */
baseDelayMs: number
/** Max delay cap (ms); applies to the exponential part only, jitter is added on top */
maxDelayMs: number
/** Random jitter range added to retry delay (ms); a value in [0, jitterMs) is drawn per retry */
jitterMs: number
}
/**
 * Coalescing uploader for PUT /worker (session state + metadata).
 *
 * At most one upload is in flight at a time, and at most one pending patch is
 * buffered behind it. Patches enqueued while an upload is running are merged
 * into that single pending slot, so memory use stays bounded no matter how
 * fast callers enqueue.
 *
 * A failed attempt (the `send` callback resolves `false`, rejects, or throws)
 * is retried indefinitely with clamped exponential backoff plus jitter, until
 * an attempt succeeds or `close()` is called.
 */
export class WorkerStateUploader {
  /** Promise of the upload currently running, or null when idle. */
  private inflight: Promise<void> | null = null
  /** Coalesced patch waiting for the in-flight upload to finish, if any. */
  private pending: Record<string, unknown> | null = null
  /** Set by close(); once true no new work is accepted or started. */
  private closed = false
  private readonly config: WorkerStateUploaderConfig

  constructor(config: WorkerStateUploaderConfig) {
    this.config = config
  }

  /**
   * Enqueue a patch to PUT /worker. Coalesces with any existing pending
   * patch. Fire-and-forget — callers don't need to await.
   *
   * Calls made after close() are ignored.
   *
   * @param patch Partial worker state to upload. When nothing is pending the
   *   object is stored by reference until it is sent or merged.
   */
  enqueue(patch: Record<string, unknown>): void {
    if (this.closed) return
    this.pending = this.pending ? coalescePatches(this.pending, patch) : patch
    void this.drain()
  }

  /**
   * Stop the uploader: the pending patch is discarded and later enqueue()
   * calls become no-ops. An attempt or backoff sleep that is already running
   * is not interrupted; the retry loop exits the next time it checks the flag.
   */
  close(): void {
    this.closed = true
    this.pending = null
  }

  /**
   * Start uploading the pending patch unless an upload is already running,
   * the uploader is closed, or there is nothing to send. When the upload
   * settles, it re-runs itself to pick up whatever accumulated meanwhile.
   */
  private async drain(): Promise<void> {
    if (this.inflight || this.closed) return
    if (!this.pending) return
    const payload = this.pending
    this.pending = null

    const release = (): void => {
      this.inflight = null
      if (this.pending && !this.closed) {
        void this.drain()
      }
    }
    // Release the slot on rejection as well as fulfilment: otherwise a single
    // unexpected rejection would leave `inflight` set forever and silently
    // stall every later enqueue().
    this.inflight = this.sendWithRetry(payload).then(release, release)
  }

  /** Retries indefinitely with exponential backoff until success or close(). */
  private async sendWithRetry(payload: Record<string, unknown>): Promise<void> {
    let current = payload
    let failures = 0
    while (!this.closed) {
      const ok = await this.attemptSend(current)
      if (ok) return
      failures++
      await sleep(this.retryDelay(failures))
      // Absorb any patches that arrived during the failed attempt or the
      // backoff sleep, so the retry carries the latest state.
      if (this.pending && !this.closed) {
        current = coalescePatches(current, this.pending)
        this.pending = null
      }
    }
  }

  /**
   * Run one upload attempt. A rejected promise or synchronous throw from the
   * `send` callback is reported as a failed attempt (`false`) so it goes
   * through the same backoff/retry path as an explicit `false` result.
   */
  private async attemptSend(body: Record<string, unknown>): Promise<boolean> {
    try {
      return await this.config.send(body)
    } catch {
      return false
    }
  }

  /**
   * Delay before the next retry, in ms: baseDelayMs doubled for every failure
   * after the first, capped at maxDelayMs, plus a random jitter in
   * [0, jitterMs).
   *
   * @param failures Number of consecutive failed attempts so far (>= 1).
   */
  private retryDelay(failures: number): number {
    const exponential = Math.min(
      this.config.baseDelayMs * 2 ** (failures - 1),
      this.config.maxDelayMs,
    )
    const jitter = Math.random() * this.config.jitterMs
    return exponential + jitter
  }
}
/**
 * Combine two PUT /worker patches into one new patch; neither input is
 * modified.
 *
 * - Ordinary top-level keys: the value from `overlay` replaces the one from
 *   `base` (last value wins).
 * - `external_metadata` / `internal_metadata`: when both sides hold an
 *   object, the two are merged one level deep in the style of RFC 7396.
 *   Overlay entries are added or overwrite base entries, and explicit `null`
 *   entries are kept so they reach the server as deletes. In every other case
 *   the overlay value simply replaces the base value.
 */
function coalescePatches(
  base: Record<string, unknown>,
  overlay: Record<string, unknown>,
): Record<string, unknown> {
  const result: Record<string, unknown> = { ...base }

  for (const field of Object.keys(overlay)) {
    const incoming = overlay[field]

    const isMetadataField =
      field === 'external_metadata' || field === 'internal_metadata'
    if (!isMetadataField) {
      result[field] = incoming
      continue
    }

    const existing = result[field]
    const bothAreObjects =
      typeof existing === 'object' &&
      existing !== null &&
      typeof incoming === 'object' &&
      incoming !== null
    if (!bothAreObjects) {
      result[field] = incoming
      continue
    }

    // Shallow merge: incoming entries take precedence, nulls pass through.
    result[field] = {
      ...(existing as Record<string, unknown>),
      ...(incoming as Record<string, unknown>),
    }
  }

  return result
}