📄 File detail
utils/task/diskOutput.ts
🎯 Use case
This file lives under “utils/”, which covers cross-cutting helpers (shell, tempfiles, settings, messages, process input, …). On the API surface it exposes MAX_TASK_OUTPUT_BYTES, MAX_TASK_OUTPUT_BYTES_DISPLAY, getTaskOutputDir, _resetTaskOutputDirForTest, and getTaskOutputPath (and more) — mainly functions, hooks, or classes. Dependencies touch Node filesystem and Node path helpers. It composes internal code from bootstrap, errors, fsOperations, log, and permissions (relative imports).
Generated from folder role, exports, dependency roots, and inline comments — not hand-reviewed for every path.
🧠 Inline summary
import { constants as fsConstants } from 'fs' import { type FileHandle, mkdir, open,
📤 Exports (heuristic)
MAX_TASK_OUTPUT_BYTES, MAX_TASK_OUTPUT_BYTES_DISPLAY, getTaskOutputDir, _resetTaskOutputDirForTest, getTaskOutputPath, DiskTaskOutput, _clearOutputsForTest, appendTaskOutput, flushTaskOutput, evictTaskOutput, getTaskOutputDelta, getTaskOutput, getTaskOutputSize, cleanupTaskOutput, initTaskOutput, initTaskOutputAsSymlink
📚 External import roots
Package roots taken from `from "…"` clauses (relative paths omitted).
fs, path
🖥️ Source preview
import { constants as fsConstants } from 'fs'
import {
type FileHandle,
mkdir,
open,
stat,
symlink,
unlink,
} from 'fs/promises'
import { join } from 'path'
import { getSessionId } from '../../bootstrap/state.js'
import { getErrnoCode } from '../errors.js'
import { readFileRange, tailFile } from '../fsOperations.js'
import { logError } from '../log.js'
import { getProjectTempDir } from '../permissions/filesystem.js'
// SECURITY: task output files are opened with O_NOFOLLOW so that open() fails
// rather than following a symlink. Without it, an attacker inside the sandbox
// could plant symlinks in the tasks directory that point at arbitrary files and
// have Claude Code on the host write to them. O_NOFOLLOW is not available on
// Windows (hence the 0 fallback), but the sandbox attack vector is Unix-only.
const O_NOFOLLOW: number = fsConstants.O_NOFOLLOW ?? 0

/** Default ceiling, in bytes, for a single read of a task output file. */
const DEFAULT_MAX_READ_BYTES = 8 * 1024 ** 2 // 8MB
/**
 * Upper bound on the size of a task output file on disk.
 *
 * Two enforcement paths share this value so they cannot drift apart:
 * in file mode (bash) a watchdog polls the file size and kills the process,
 * and in pipe mode (hooks) DiskTaskOutput drops chunks past the limit.
 */
export const MAX_TASK_OUTPUT_BYTES = 5 * 1024 ** 3

/** Human-readable form of the cap, embedded in the truncation notice. */
export const MAX_TASK_OUTPUT_BYTES_DISPLAY = '5GB'
/** Memoized result of getTaskOutputDir(); undefined until the first call. */
let _taskOutputDir: string | undefined

/**
 * Returns the task output directory for this session:
 * `<project temp dir>/<session id>/tasks`.
 *
 * The project temp directory is used so reads are auto-allowed by
 * checkReadableInternalPath.
 *
 * The session ID is part of the path so concurrent sessions in the same
 * project don't clobber each other's output files. Startup cleanup in one
 * session previously unlinked in-flight output files from other sessions —
 * the writing process's fd keeps the inode alive but reads via path fail
 * ENOENT, and getStdout() returned empty string
 * (inc-4586 / boris-20260309-060423).
 *
 * The session ID is captured at FIRST CALL and then memoized, not re-read on
 * every invocation. /clear calls regenerateSessionId(), which would otherwise
 * make ensureOutputDir() create a new-session path while existing TaskOutput
 * instances still hold old-session paths, so open() would ENOENT. Background
 * bash tasks surviving /clear need their output files to stay reachable.
 */
export function getTaskOutputDir(): string {
  _taskOutputDir ??= join(getProjectTempDir(), getSessionId(), 'tasks')
  return _taskOutputDir
}
/**
 * Test helper — forgets the memoized task output directory so the next
 * getTaskOutputDir() call recomputes it.
 */
export function _resetTaskOutputDirForTest(): void {
  _taskOutputDir = undefined
}
/**
 * Creates the task output directory, including any missing parents.
 * A directory that already exists is not an error (recursive mkdir).
 */
async function ensureOutputDir(): Promise<void> {
  const dir = getTaskOutputDir()
  await mkdir(dir, { recursive: true })
}
/**
 * Builds the path of a task's output file: `<task output dir>/<taskId>.output`.
 *
 * @param taskId Identifier of the task whose output file is wanted.
 * @returns The output file path; the file itself may not exist yet.
 */
export function getTaskOutputPath(taskId: string): string {
  const fileName = `${taskId}.output`
  return join(getTaskOutputDir(), fileName)
}
// Registry of fire-and-forget promises (initTaskOutput, initTaskOutputAsSymlink,
// evictTaskOutput, #drain) so tests can drain them before teardown. This
// prevents the async-ENOENT-after-teardown flake class (#24957, #25065): a
// voided async resumes after preload's afterEach nuked the temp dir → ENOENT →
// unhandled rejection → flaky test failure. Consumers use allSettled so that a
// rejection doesn't short-circuit the drain and leave other ops racing the
// rmSync.
const _pendingOps = new Set<Promise<unknown>>()

/**
 * Registers `p` as pending until it settles, then returns the very same
 * promise so callers can keep awaiting or voiding it.
 */
function track<T>(p: Promise<T>): Promise<T> {
  const untrack = (): void => {
    _pendingOps.delete(p)
  }
  _pendingOps.add(p)
  // Remove on fulfilment and on rejection alike. Supplying both handlers means
  // the bookkeeping chain itself can never become an unhandled rejection.
  void p.then(untrack, untrack)
  return p
}
/**
 * Encapsulates async disk writes for a single task's output.
 *
 * Uses a flat array as a write queue processed by a single drain loop,
 * so each chunk can be GC'd immediately after its write completes.
 * This avoids the memory retention problem of chained .then() closures
 * where every reaction captures its data until the whole chain resolves.
 *
 * Write failures are logged via logError and never thrown to callers;
 * flush() resolves once the current drain has finished either way.
 */
export class DiskTaskOutput {
// Destination file, derived from the task id via getTaskOutputPath().
#path: string
// Handle held by the drain in progress; null whenever no drain has the file open.
#fileHandle: FileHandle | null = null
// Chunks accepted by append() that have not yet been handed to the writer.
#queue: string[] = []
// Running total of appended content.length (UTF-16 code units). Counted when a
// chunk is enqueued, not when its write lands on disk.
#bytesWritten = 0
// Latched once #bytesWritten exceeds MAX_TASK_OUTPUT_BYTES; later appends are dropped.
#capped = false
// Non-null while a drain is in flight; cleared and resolved in #drain()'s finally.
#flushPromise: Promise<void> | null = null
// Resolver paired with #flushPromise.
#flushResolve: (() => void) | null = null
/** @param taskId Task whose output file this instance writes to. */
constructor(taskId: string) {
this.#path = getTaskOutputPath(taskId)
}
/**
 * Queue `content` for writing and start a drain if none is running.
 * Returns immediately; use flush() to wait for the data to reach disk.
 * Once the cap is exceeded, the chunk that crossed it is replaced by a
 * truncation notice and every later call is a no-op.
 */
append(content: string): void {
if (this.#capped) {
return
}
// content.length (UTF-16 code units) undercounts UTF-8 bytes by at most ~3×.
// Acceptable for a coarse disk-fill guard — avoids re-scanning every chunk.
this.#bytesWritten += content.length
if (this.#bytesWritten > MAX_TASK_OUTPUT_BYTES) {
this.#capped = true
this.#queue.push(
`\n[output truncated: exceeded ${MAX_TASK_OUTPUT_BYTES_DISPLAY} disk cap]\n`,
)
} else {
this.#queue.push(content)
}
// Only one drain runs at a time; while it is in flight, new chunks simply
// join the queue it is already consuming.
if (!this.#flushPromise) {
// The executor runs synchronously, so #flushResolve is set before #drain starts.
this.#flushPromise = new Promise<void>(resolve => {
this.#flushResolve = resolve
})
void track(this.#drain())
}
}
/**
 * Resolves when the drain currently in flight has finished, or immediately
 * if none is running. Never rejects: drain errors are logged, not propagated.
 */
flush(): Promise<void> {
return this.#flushPromise ?? Promise.resolve()
}
/**
 * Discards chunks still waiting in the queue. Does not interrupt a write
 * already handed to the file handle and does not reset the cap state.
 */
cancel(): void {
this.#queue.length = 0
}
/**
 * Opens the output file, writes until the queue is empty, then closes the
 * handle. Repeats from the top if more chunks arrived while closing.
 * Errors from open/write/close propagate to #drain().
 */
async #drainAllChunks(): Promise<void> {
while (true) {
try {
if (!this.#fileHandle) {
await ensureOutputDir()
// Append mode, creating the file if needed. Outside Windows numeric flags
// are used so O_NOFOLLOW can be OR-ed in; Windows takes the 'a' string flag.
this.#fileHandle = await open(
this.#path,
process.platform === 'win32'
? 'a'
: fsConstants.O_WRONLY |
fsConstants.O_APPEND |
fsConstants.O_CREAT |
O_NOFOLLOW,
)
}
// Each pass writes everything queued so far; chunks appended during the
// write are picked up by the next pass.
while (true) {
await this.#writeAllChunks()
if (this.#queue.length === 0) {
break
}
}
} finally {
// Always release the handle, including on error. The field is nulled
// before close() is awaited so a retry reopens the file from scratch.
if (this.#fileHandle) {
const fileHandle = this.#fileHandle
this.#fileHandle = null
await fileHandle.close()
}
}
// you could have another .append() while we're waiting for the file to close, so we check the queue again before fully exiting
if (this.#queue.length) {
continue
}
break
}
}
/** Hands the whole current queue to the file handle as one appendFile call. */
#writeAllChunks(): Promise<void> {
// This code is extremely precise.
// You **must not** add an await here!! That will cause memory to balloon as the queue grows.
// It's okay to add an `await` to the caller of this method (e.g. #drainAllChunks) because that won't cause Buffer[] to be kept alive in memory.
return this.#fileHandle!.appendFile(
// This variable needs to get GC'd ASAP.
this.#queueToBuffers(),
)
}
/** Keep this in a separate method so that GC doesn't keep it alive for any longer than it should. */
#queueToBuffers(): Buffer {
// Use .splice to in-place mutate the array, informing the GC it can free it.
const queue = this.#queue.splice(0, this.#queue.length)
// First pass sizes the buffer exactly, second pass encodes into it. Every
// byte of the allocUnsafe buffer is overwritten, so nothing uninitialized
// reaches the file.
let totalLength = 0
for (const str of queue) {
totalLength += Buffer.byteLength(str, 'utf8')
}
const buffer = Buffer.allocUnsafe(totalLength)
let offset = 0
for (const str of queue) {
offset += buffer.write(str, offset, 'utf8')
}
return buffer
}
/**
 * Runs one drain session: drain, retry once on failure if chunks remain,
 * then resolve the flush promise regardless of the outcome.
 */
async #drain(): Promise<void> {
try {
await this.#drainAllChunks()
} catch (e) {
// Transient fs errors (EMFILE on busy CI, EPERM on Windows pending-
// delete) previously rode up through `void this.#drain()` as an
// unhandled rejection while the flush promise resolved anyway — callers
// saw an empty file with no error. Retry once for the transient case
// (queue is intact if open() failed), then log and give up.
// Chunks already spliced out by #queueToBuffers() before a failed write
// are not re-queued, so the retry only covers what is still in #queue.
logError(e)
if (this.#queue.length > 0) {
try {
await this.#drainAllChunks()
} catch (e2) {
logError(e2)
}
}
} finally {
// append() always sets #flushResolve before starting #drain(), so the
// non-null assertion holds. Fields are cleared before resolving so the
// next append() starts a fresh drain session.
const resolve = this.#flushResolve!
this.#flushPromise = null
this.#flushResolve = null
resolve()
}
}
}
/** Live DiskTaskOutput writers, keyed by task id. */
const outputs: Map<string, DiskTaskOutput> = new Map()
/**
 * Test helper — cancels pending writes, waits for in-flight ops, then empties
 * the writer map.
 *
 * backgroundShells.test.ts and other task tests spawn real shells that write
 * through this module without afterEach cleanup; their entries leak into
 * diskOutput.test.ts on the same shard.
 *
 * The pending set is re-checked after every wait because a settling promise
 * may spawn another (initTaskOutputAsSymlink's catch → initTaskOutput).
 * Call this in afterEach BEFORE rmSync to avoid async-ENOENT-after-teardown.
 */
export async function _clearOutputsForTest(): Promise<void> {
  outputs.forEach(writer => writer.cancel())
  while (_pendingOps.size !== 0) {
    const inFlight = Array.from(_pendingOps)
    await Promise.allSettled(inFlight)
  }
  outputs.clear()
}
/**
 * Returns the writer registered for `taskId`, creating and registering a new
 * one on first use.
 */
function getOrCreateOutput(taskId: string): DiskTaskOutput {
  const existing = outputs.get(taskId)
  if (existing) {
    return existing
  }
  const created = new DiskTaskOutput(taskId)
  outputs.set(taskId, created)
  return created
}
/**
 * Queues `content` to be appended to the task's output file.
 * Returns immediately; the write happens asynchronously and the file is
 * created on demand. Use flushTaskOutput() to wait for it.
 */
export function appendTaskOutput(taskId: string, content: string): void {
  const writer = getOrCreateOutput(taskId)
  writer.append(content)
}
/**
 * Waits until every write queued so far for the task has been drained.
 * Call before reading the output file to make sure it is up to date.
 * Resolves immediately when the task has no registered writer.
 */
export async function flushTaskOutput(taskId: string): Promise<void> {
  const writer = outputs.get(taskId)
  if (!writer) {
    return
  }
  await writer.flush()
}
/**
 * Flushes a task's writer and then removes it from the in-memory map.
 * Unlike cleanupTaskOutput, the output file on disk is left in place.
 * Call this when a task completes and its output has been consumed.
 *
 * @returns A tracked promise that settles once the eviction has finished.
 */
export function evictTaskOutput(taskId: string): Promise<void> {
  const flushThenForget = async (): Promise<void> => {
    const writer = outputs.get(taskId)
    if (!writer) {
      return
    }
    await writer.flush()
    outputs.delete(taskId)
  }
  return track(flushThenForget())
}
/**
 * Reads output produced since a previous read.
 *
 * Asks readFileRange for at most `maxBytes` starting at `fromOffset`, so the
 * whole file is never requested at once.
 *
 * @param taskId Task whose output file is read.
 * @param fromOffset Byte offset to start from (normally the last newOffset).
 * @param maxBytes Upper bound on bytes to read in this call.
 * @returns The new content and the offset to pass next time. When nothing
 *   could be read (no result, missing file, or any other error) the content
 *   is empty and the offset is unchanged. Errors other than ENOENT are logged.
 */
export async function getTaskOutputDelta(
  taskId: string,
  fromOffset: number,
  maxBytes: number = DEFAULT_MAX_READ_BYTES,
): Promise<{ content: string; newOffset: number }> {
  const nothingNew = { content: '', newOffset: fromOffset }
  try {
    const chunk = await readFileRange(
      getTaskOutputPath(taskId),
      fromOffset,
      maxBytes,
    )
    return chunk
      ? { content: chunk.content, newOffset: fromOffset + chunk.bytesRead }
      : nothingNew
  } catch (err) {
    // A missing file just means no output yet; anything else is worth logging.
    if (getErrnoCode(err) !== 'ENOENT') {
      logError(err)
    }
    return nothingNew
  }
}
/**
 * Returns the tail of a task's output file, capped at `maxBytes` to avoid
 * loading multi-GB files into memory.
 *
 * When the file is larger than what was read, the result is prefixed with a
 * one-line note giving the amount of earlier output omitted, in KB.
 *
 * @returns The (possibly prefixed) tail, or '' if the file is missing or the
 *   read fails. Errors other than ENOENT are logged.
 */
export async function getTaskOutput(
  taskId: string,
  maxBytes: number = DEFAULT_MAX_READ_BYTES,
): Promise<string> {
  try {
    const tail = await tailFile(getTaskOutputPath(taskId), maxBytes)
    if (tail.bytesTotal > tail.bytesRead) {
      const omittedKb = Math.round((tail.bytesTotal - tail.bytesRead) / 1024)
      return `[${omittedKb}KB of earlier output omitted]\n${tail.content}`
    }
    return tail.content
  } catch (err) {
    if (getErrnoCode(err) !== 'ENOENT') {
      logError(err)
    }
    return ''
  }
}
/**
 * Returns the current size in bytes of a task's output file, which is also
 * the offset at which new output will start.
 *
 * @returns The file size, or 0 if the file is missing or stat fails.
 *   Errors other than ENOENT are logged.
 */
export async function getTaskOutputSize(taskId: string): Promise<number> {
  try {
    const info = await stat(getTaskOutputPath(taskId))
    return info.size
  } catch (err) {
    if (getErrnoCode(err) !== 'ENOENT') {
      logError(err)
    }
    return 0
  }
}
/**
 * Cleans up a task's output: drops queued writes, forgets the writer, and
 * deletes the output file.
 *
 * Never rejects. A missing file is not an error; any other unlink failure is
 * logged.
 */
export async function cleanupTaskOutput(taskId: string): Promise<void> {
  const output = outputs.get(taskId)
  if (output) {
    output.cancel()
    outputs.delete(taskId)
    // cancel() only empties the queue; a drain already in flight may still be
    // about to open the file with O_CREAT or be holding its handle. Unlinking
    // at that point either lets the drain recreate a stray file afterwards or
    // fails on Windows because the file is still open. flush() never rejects,
    // so waiting for the drain to close its handle first is safe.
    await output.flush()
  }
  try {
    await unlink(getTaskOutputPath(taskId))
  } catch (e) {
    // ENOENT: nothing to delete, which is the desired end state anyway.
    if (getErrnoCode(e) !== 'ENOENT') {
      logError(e)
    }
  }
}
/**
 * Initializes the output file for a new task by creating it empty, so the
 * path exists before anything is written.
 *
 * @returns A tracked promise for the output file path. It rejects if the
 *   directory or file cannot be created, including when something already
 *   exists at the path (exclusive create).
 */
export function initTaskOutput(taskId: string): Promise<string> {
  const createEmptyFile = async (): Promise<string> => {
    await ensureOutputDir()
    const outputPath = getTaskOutputPath(taskId)
    // SECURITY: O_NOFOLLOW prevents symlink-following attacks from the sandbox.
    // O_EXCL ensures a new file is created and the open fails if something
    // already exists at this path.
    // On Windows, use string flags — numeric O_EXCL can produce EINVAL through libuv.
    const exclusiveCreateFlags =
      process.platform === 'win32'
        ? 'wx'
        : fsConstants.O_WRONLY |
          fsConstants.O_CREAT |
          fsConstants.O_EXCL |
          O_NOFOLLOW
    const handle = await open(outputPath, exclusiveCreateFlags)
    await handle.close()
    return outputPath
  }
  return track(createEmptyFile())
}
/**
 * Initializes a task's output file as a symlink to another file
 * (e.g., an agent transcript).
 *
 * Tries to create the symlink first; if that fails, removes whatever is at
 * the output path and retries once. If the symlink still cannot be created,
 * the error is logged and a regular empty output file is created instead via
 * initTaskOutput.
 *
 * @param taskId Task whose output path becomes the symlink.
 * @param targetPath File the symlink should point to.
 * @returns A tracked promise for the output file path.
 */
export function initTaskOutputAsSymlink(
  taskId: string,
  targetPath: string,
): Promise<string> {
  return track(
    (async () => {
      try {
        await ensureOutputDir()
        const outputPath = getTaskOutputPath(taskId)
        try {
          await symlink(targetPath, outputPath)
        } catch {
          // Most likely something already exists at the path: remove it and
          // retry. If there was nothing to remove, the first failure had a
          // different cause; ignore the ENOENT so the retry below reports
          // that real symlink error instead of a misleading unlink ENOENT.
          try {
            await unlink(outputPath)
          } catch (unlinkError) {
            if (getErrnoCode(unlinkError) !== 'ENOENT') {
              throw unlinkError
            }
          }
          await symlink(targetPath, outputPath)
        }
        return outputPath
      } catch (error) {
        // Best effort: fall back to a plain output file rather than failing.
        logError(error)
        return initTaskOutput(taskId)
      }
    })(),
  )
}