File detail
cli/transports/SerialBatchEventUploader.ts
.ts · 276 lines · 9,089 bytes · text
← Back to All Files · Use case
This file lives under "cli/", which covers the CLI transport, NDJSON/streaming I/O, and command handlers. On the API surface it exposes RetryableError and SerialBatchEventUploader — mainly types, interfaces, or factory objects. It composes internal code from utils (relative imports).
Generated from folder role, exports, dependency roots, and inline comments — not hand-reviewed for every path.
Inline summary
import { jsonStringify } from '../../utils/slowOperations.js' /** * Serial ordered event uploader with batching, retry, and backpressure. *
Exports (heuristic)
RetryableError, SerialBatchEventUploader
Source preview
import { jsonStringify } from '../../utils/slowOperations.js'
/**
* Serial ordered event uploader with batching, retry, and backpressure.
*
* - enqueue() adds events to a pending buffer
* - At most 1 POST in-flight at a time
* - Drains up to maxBatchSize items per POST
* - New events accumulate while in-flight
* - On failure: exponential backoff (clamped), retries indefinitely
* until success or close() — unless maxConsecutiveFailures is set,
* in which case the failing batch is dropped and drain advances
* - flush() blocks until pending is empty and kicks drain if needed
* - Backpressure: enqueue() blocks when maxQueueSize is reached
*/
/**
 * Throw from config.send() to make the uploader wait a server-supplied
 * duration before retrying (e.g. 429 with Retry-After). When retryAfterMs
 * is set, it overrides exponential backoff for that attempt — clamped to
 * [baseDelayMs, maxDelayMs] and jittered so a misbehaving server can
 * neither hot-loop nor stall the client, and many sessions sharing a rate
 * limit don't all pounce at the same instant. Without retryAfterMs, behaves
 * like any other thrown error (exponential backoff).
 */
export class RetryableError extends Error {
  /**
   * @param message Human-readable description of the failure.
   * @param retryAfterMs Optional server-suggested wait, in milliseconds,
   *   before the next attempt.
   */
  constructor(
    message: string,
    readonly retryAfterMs?: number,
  ) {
    super(message)
    // Without this, `name` is inherited as "Error", so logs and
    // String(err) cannot tell this apart from a generic failure.
    this.name = 'RetryableError'
  }
}
/** Construction options for SerialBatchEventUploader. */
type SerialBatchEventUploaderConfig<T> = {
  // ── Transport ──────────────────────────────────────────────────────────

  /** Performs the actual HTTP call; the caller decides the payload format. */
  send: (batch: T[]) => Promise<void>

  // ── Batching ───────────────────────────────────────────────────────────

  /** Upper bound on items per POST. A value of 1 disables batching. */
  maxBatchSize: number

  /**
   * Upper bound on serialized JSON bytes per POST. The first item of a
   * batch is always included whatever its size; each further item is added
   * only while the running byte total stays within this limit.
   * Leave undefined to batch by item count alone.
   */
  maxBatchBytes?: number

  // ── Queueing ───────────────────────────────────────────────────────────

  /** Number of pending items at which enqueue() starts blocking. */
  maxQueueSize: number

  // ── Retry timing ───────────────────────────────────────────────────────

  /** Starting delay for exponential backoff, in milliseconds. */
  baseDelayMs: number

  /** Ceiling applied to the backoff delay, in milliseconds. */
  maxDelayMs: number

  /** Width of the random jitter added on top of each retry delay (ms). */
  jitterMs: number

  // ── Giving up ──────────────────────────────────────────────────────────

  /**
   * Number of consecutive send() failures after which the failing batch is
   * dropped and draining moves on to the next pending items with the
   * failure counter reset. Undefined (the default) means retry forever.
   */
  maxConsecutiveFailures?: number

  /** Invoked when a batch is dropped for reaching maxConsecutiveFailures. */
  onBatchDropped?: (batchSize: number, failures: number) => void
}
/**
 * Serial, ordered event uploader with batching, retry and backpressure.
 *
 * Events are appended to a pending queue and drained by a single loop that
 * keeps at most one config.send() call in flight. Failed batches are put
 * back at the front of the queue and retried after a backoff delay, so
 * ordering is preserved.
 */
export class SerialBatchEventUploader<T> {
  private pending: T[] = []
  private pendingAtClose = 0
  private draining = false
  private closed = false
  private backpressureResolvers: Array<() => void> = []
  /** Wakes the current backoff sleep early; null when not sleeping. */
  private sleepResolve: (() => void) | null = null
  /** Timer backing the current backoff sleep; null when not sleeping. */
  private sleepTimer: ReturnType<typeof setTimeout> | null = null
  private flushResolvers: Array<() => void> = []
  private droppedBatches = 0
  private readonly config: SerialBatchEventUploaderConfig<T>

  constructor(config: SerialBatchEventUploaderConfig<T>) {
    this.config = config
  }

  /**
   * Monotonic count of batches dropped via maxConsecutiveFailures. Callers
   * can snapshot before flush() and compare after to detect silent drops
   * (flush() resolves normally even when batches were dropped).
   */
  get droppedBatchCount(): number {
    return this.droppedBatches
  }

  /**
   * Pending queue depth. After close(), returns the count at close time —
   * close() clears the queue but shutdown diagnostics may read this after.
   */
  get pendingCount(): number {
    return this.closed ? this.pendingAtClose : this.pending.length
  }

  /**
   * Add events to the pending buffer. Returns immediately if space is
   * available. Blocks (awaits) if the buffer is full — caller pauses
   * until drain frees space.
   *
   * A single call carrying more than maxQueueSize items is admitted as soon
   * as the queue is empty: it can never fit under the limit, so waiting for
   * "enough" space would block forever.
   *
   * No-op once the uploader is closed; a call blocked on backpressure
   * returns without enqueuing when close() is invoked.
   *
   * @param events One event or an array of events.
   */
  async enqueue(events: T | T[]): Promise<void> {
    if (this.closed) return
    const items = Array.isArray(events) ? events : [events]
    if (items.length === 0) return

    // Backpressure: wait until there's space. The `pending.length > 0`
    // term lets an oversized call through once the queue has drained.
    while (
      !this.closed &&
      this.pending.length > 0 &&
      this.pending.length + items.length > this.config.maxQueueSize
    ) {
      await new Promise<void>(resolve => {
        this.backpressureResolvers.push(resolve)
      })
    }
    if (this.closed) return

    this.pending.push(...items)
    void this.drain()
  }

  /**
   * Block until all pending events have been sent (or dropped).
   * Used at turn boundaries and graceful shutdown. Kicks the drain loop if
   * it is not already running.
   */
  flush(): Promise<void> {
    if (this.pending.length === 0 && !this.draining) {
      return Promise.resolve()
    }
    void this.drain()
    return new Promise<void>(resolve => {
      this.flushResolvers.push(resolve)
    })
  }

  /**
   * Drop pending events and stop processing.
   * Resolves any blocked enqueue() and flush() callers and cancels a
   * backoff sleep in progress. Idempotent.
   */
  close(): void {
    if (this.closed) return
    this.closed = true
    this.pendingAtClose = this.pending.length
    this.pending = []
    // Waking the sleep also clears its timer, so nothing keeps the event
    // loop alive after close().
    this.sleepResolve?.()
    this.sleepResolve = null
    this.releaseBackpressure()
    this.releaseFlushWaiters()
  }

  /**
   * Drain loop. At most one instance runs at a time (guarded by
   * this.draining). Sends batches serially. On failure, backs off and
   * retries — indefinitely, or until maxConsecutiveFailures is reached, at
   * which point the batch is dropped and the failure counter resets.
   */
  private async drain(): Promise<void> {
    if (this.draining || this.closed) return
    this.draining = true
    let failures = 0
    try {
      while (this.pending.length > 0 && !this.closed) {
        const batch = this.takeBatch()
        if (batch.length === 0) continue
        try {
          await this.config.send(batch)
          failures = 0
        } catch (err) {
          // close() ran while this send was in flight: the queue has been
          // discarded, so do not resurrect the batch or start a backoff
          // sleep that nothing would cut short.
          if (this.closed) break

          failures++
          if (
            this.config.maxConsecutiveFailures !== undefined &&
            failures >= this.config.maxConsecutiveFailures
          ) {
            this.droppedBatches++
            this.config.onBatchDropped?.(batch.length, failures)
            failures = 0
            this.releaseBackpressure()
            continue
          }
          // Re-queue the failed batch at the front. Use concat (single
          // allocation) instead of unshift(...batch) which shifts every
          // pending item batch.length times. Only hit on failure path.
          this.pending = batch.concat(this.pending)
          const retryAfterMs =
            err instanceof RetryableError ? err.retryAfterMs : undefined
          await this.sleep(this.retryDelay(failures, retryAfterMs))
          continue
        }
        // Release backpressure waiters if space opened up
        this.releaseBackpressure()
      }
    } finally {
      this.draining = false
      // Notify flush waiters once there is nothing left to wait for: the
      // queue is empty, or the uploader was closed.
      if (this.closed || this.pending.length === 0) {
        this.releaseFlushWaiters()
      }
    }
  }

  /**
   * Pull the next batch from pending. Respects both maxBatchSize and
   * maxBatchBytes. The first item is always taken; subsequent items only
   * if adding them keeps the cumulative JSON size under maxBatchBytes.
   *
   * Un-serializable items (BigInt, circular refs, throwing toJSON) are
   * dropped in place — they can never be sent and leaving them at
   * pending[0] would poison the queue and hang flush() forever.
   *
   * @returns The removed items; may be empty if every candidate item was
   *   un-serializable.
   */
  private takeBatch(): T[] {
    const { maxBatchSize, maxBatchBytes } = this.config
    if (maxBatchBytes === undefined) {
      return this.pending.splice(0, maxBatchSize)
    }
    let bytes = 0
    let count = 0
    while (count < this.pending.length && count < maxBatchSize) {
      let itemBytes: number
      try {
        itemBytes = Buffer.byteLength(jsonStringify(this.pending[count]))
      } catch {
        // Remove the bad item; `count` now points at its successor.
        this.pending.splice(count, 1)
        continue
      }
      if (count > 0 && bytes + itemBytes > maxBatchBytes) break
      bytes += itemBytes
      count++
    }
    return this.pending.splice(0, count)
  }

  /**
   * Compute the wait before the next attempt.
   *
   * @param failures Consecutive failure count for the current batch (>= 1).
   * @param retryAfterMs Optional server hint; when usable it replaces the
   *   exponential term.
   * @returns Delay in ms: the clamped base term plus random jitter in
   *   [0, jitterMs).
   */
  private retryDelay(failures: number, retryAfterMs?: number): number {
    const { baseDelayMs, maxDelayMs, jitterMs } = this.config
    const jitter = Math.random() * jitterMs
    // A NaN hint would propagate through min/max and make setTimeout fire
    // almost immediately; treat it as "no hint".
    if (retryAfterMs !== undefined && !Number.isNaN(retryAfterMs)) {
      // Jitter on top of the server's hint prevents thundering herd when
      // many sessions share a rate limit and all receive the same
      // Retry-After. Clamp first, then spread — same shape as the
      // exponential path (effective ceiling is maxDelayMs + jitterMs).
      const clamped = Math.max(baseDelayMs, Math.min(retryAfterMs, maxDelayMs))
      return clamped + jitter
    }
    // Cap the exponent so 2 ** n stays finite; otherwise baseDelayMs = 0
    // would yield 0 * Infinity = NaN after enough failures.
    const exponent = Math.min(failures - 1, 1023)
    const exponential = Math.min(baseDelayMs * 2 ** exponent, maxDelayMs)
    return exponential + jitter
  }

  /** Wake every enqueue() caller blocked on backpressure to re-check space. */
  private releaseBackpressure(): void {
    const resolvers = this.backpressureResolvers
    this.backpressureResolvers = []
    for (const resolve of resolvers) resolve()
  }

  /** Resolve every outstanding flush() promise. */
  private releaseFlushWaiters(): void {
    const resolvers = this.flushResolvers
    this.flushResolvers = []
    for (const resolve of resolvers) resolve()
  }

  /**
   * Wait for `ms` milliseconds, or until close() wakes the sleep early via
   * this.sleepResolve. Either way the timer is cleared on wake-up.
   */
  private sleep(ms: number): Promise<void> {
    if (this.closed) return Promise.resolve()
    return new Promise<void>(resolve => {
      const wake = (): void => {
        if (this.sleepTimer !== null) {
          clearTimeout(this.sleepTimer)
          this.sleepTimer = null
        }
        this.sleepResolve = null
        resolve()
      }
      this.sleepResolve = wake
      this.sleepTimer = setTimeout(wake, ms)
    })
  }
}