src — streaming
The src/streaming module provides a comprehensive suite of utilities for handling asynchronous data streams, with a particular focus on real-time processing of Large Language Model (LLM) outputs, robust error handling, and user feedback mechanisms. It aims to make stream consumption efficient, resilient, and developer-friendly, abstracting away complexities like backpressure, retries, and incremental UI updates.
Core Concepts
Before diving into the components, understanding a few core concepts is helpful:
- Chunk/Delta: In the context of LLMs, a "chunk" or "delta" refers to a small, incremental piece of data received from a streaming API. These are often partial words, tokens, or fragments of tool calls.
- StreamEvent: A normalized, type-safe representation of a processed chunk, indicating its type (e.g., content, tool_call, error, done).
- Backpressure: A mechanism to prevent a fast producer from overwhelming a slow consumer by signaling the producer to slow down or pause.
- Retry Policy: Strategies for automatically re-attempting failed operations, often with exponential backoff.
- Circuit Breaker: A pattern to prevent repeated attempts to a failing service, allowing it to recover before further requests are sent.
- Tool Phases: A structured way to track and report the lifecycle (start, update, success, fail) of an asynchronous tool execution, enabling real-time feedback.
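To make the StreamEvent concept concrete, here is a minimal sketch of a discriminated union and a consumer that switches on its type field. The type name SketchStreamEvent and its fields are assumptions for illustration only; the module's actual StreamEvent definition may differ.

```typescript
// Hypothetical event shape for illustration; not the module's real StreamEvent.
type SketchStreamEvent =
  | { type: 'content'; content: string }
  | { type: 'tool_call'; toolCall: { name: string; arguments: string } }
  | { type: 'error'; error: string }
  | { type: 'done' };

// The `type` discriminant lets the compiler narrow each branch,
// so `event.content` is only accessible in the 'content' case.
function describeEvent(event: SketchStreamEvent): string {
  switch (event.type) {
    case 'content':
      return `content(${event.content.length} chars)`;
    case 'tool_call':
      return `tool_call(${event.toolCall.name})`;
    case 'error':
      return `error(${event.error})`;
    case 'done':
      return 'done';
  }
}
```

A consumer written this way fails to compile if a new event type is added without a matching case, which is the main benefit of normalizing raw chunks into typed events.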
Architecture Overview
The module's architecture is designed to handle various streaming needs, from low-level LLM delta processing to high-level tool execution feedback. The StreamHandler and ChunkProcessor form the core pipeline for LLM output.
graph TD
A[Raw LLM Delta Stream] --> B{StreamHandler};
B -- Pipes to --> C{ChunkProcessor};
C -- Processes, Batches, Applies Backpressure --> D["StreamEvent[]"];
D --> E["Consumer (e.g., UI, ChunkHandler)"];
C -- Emits Metrics, Flow Hints --> F[Monitoring/Feedback];
B -- Can create --> G[Node.js Transform Stream];
G --> H[Node.js Stream Consumers];
- Raw LLM Delta Stream: The initial input, typically an AsyncIterable of raw JSON objects (e.g., ChatDelta from chunk-processor.ts) directly from an LLM API.
- StreamHandler: The primary orchestrator for consuming LLM streams. It manages global timeouts, integrates with Node.js Transform streams, and wraps the ChunkProcessor.
- ChunkProcessor: A high-performance component responsible for:
  - Parsing raw LLM deltas.
  - Applying sanitization.
  - Batching small content chunks for efficiency.
  - Managing internal backpressure (pendingEvents).
  - Tracking per-chunk timeouts.
  - Accumulating tool call deltas.
  - Collecting detailed streaming metrics.
  - Emitting StreamEvents.
- StreamEvent[]: The output of the ChunkProcessor, a normalized array of events that can be consumed by various parts of the application.
- Consumer: Any component that needs to react to the processed stream events, such as a UI rendering the content, or a ChunkHandler accumulating the full response.
- Monitoring/Feedback: ChunkProcessor provides StreamingMetrics and FlowHints, allowing for real-time performance monitoring and adaptive UI adjustments.
- Node.js Transform Stream: StreamHandler can expose a Transform stream interface, enabling integration with standard Node.js stream pipelines.
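The batching step mentioned above can be pictured with a small sketch. ContentBatcherSketch and minBatchChars are hypothetical names chosen for illustration; the real ChunkProcessor likely also considers timing and other signals that are left out here.

```typescript
// Hypothetical helper, not part of the module's API.
class ContentBatcherSketch {
  private readonly parts: string[] = [];
  private size = 0;
  private readonly minBatchChars: number;

  constructor(minBatchChars: number) {
    this.minBatchChars = minBatchChars;
  }

  /** Buffers a fragment; returns a batch once enough characters have accumulated, else null. */
  push(fragment: string): string | null {
    this.parts.push(fragment);
    this.size += fragment.length;
    return this.size >= this.minBatchChars ? this.flush() : null;
  }

  /** Emits whatever is buffered (possibly an empty string) and resets the buffer for reuse. */
  flush(): string {
    const batch = this.parts.join('');
    this.parts.length = 0; // reuse the same array instead of allocating a new one
    this.size = 0;
    return batch;
  }
}
```

Emitting one event per batch rather than per token reduces event overhead for the consumer; calling flush() at the end of the stream ensures no trailing content is lost.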
Key Components
StreamHandler (src/streaming/stream-handler.ts)
The StreamHandler acts as the main entry point for consuming LLM-like asynchronous streams. It wraps a ChunkProcessor and provides a higher-level interface for stream management, including global timeouts and Node.js stream compatibility.
Purpose: To provide a robust and flexible way to consume and process streaming data, particularly from LLM APIs, with built-in error handling, timeouts, and integration capabilities.
Key Features:
- Orchestration: Manages the lifecycle of a streaming operation, including starting/clearing global timeouts.
- ChunkProcessor Integration: Instantiates and configures a ChunkProcessor to handle the low-level delta processing.
- Timeout Management: Supports both per-chunk timeouts (delegated to ChunkProcessor) and a global stream timeout.
- Node.js Stream Compatibility: Provides createTransformStream() and processReadableStream() methods to integrate with Node.js Readable and Transform streams.
- Event Emission: Emits flowHint, progress, complete, and globalTimeout events to provide real-time feedback.
- Accumulation: Offers getAccumulated() to retrieve the final content and tool calls.
Usage Example:
import { StreamHandler } from './streaming/stream-handler.js';
import { StreamChunk } from './streaming/types.js'; // Assuming types.ts defines StreamChunk

// Mock LLM stream: two content deltas, then a tool call split across two deltas.
async function* mockLLMStream(): AsyncIterable<StreamChunk> {
  yield { choices: [{ delta: { content: 'Hello' } }] };
  await new Promise(resolve => setTimeout(resolve, 50));
  yield { choices: [{ delta: { content: ' world!' } }] };
  await new Promise(resolve => setTimeout(resolve, 100));
  yield { choices: [{ delta: { tool_calls: [{ index: 0, function: { name: 'my_tool' } }] } }] };
  await new Promise(resolve => setTimeout(resolve, 50));
  yield { choices: [{ delta: { tool_calls: [{ index: 0, function: { arguments: '{"param":"value"}' } }] } }] };
  await new Promise(resolve => setTimeout(resolve, 50));
}

async function processMyStream() {
  const handler = new StreamHandler({
    globalTimeoutMs: 5000,
    processorOptions: {
      enableBatching: true,
      adaptiveThrottle: true,
    },
  });

  // Feedback events emitted while the stream is being consumed.
  handler.on('flowHint', (hint) => console.log('Flow Hint:', hint.state));
  handler.on('progress', (progress) => console.log('Progress:', progress.progress));
  handler.on('complete', (data) => console.log('Stream Complete:', data.metricsSummary));
  handler.on('globalTimeout', () => console.error('Global stream timed out!'));

  try {
    for await (const event of handler.handleStream(mockLLMStream())) {
      if (event.type === 'content') {
        process.stdout.write(event.content);
      } else if (event.type === 'tool_call') {
        console.log('\nTool Call:', event.toolCall);
      } else if (event.type === 'error') {
        console.error('\nStream Error:', event.error);
      }
    }
    console.log('\nFinal Accumulated Content:', handler.getAccumulated().content);
    console.log('Final Tool Calls:', handler.getAccumulated().toolCalls);
  } catch (e) {
    console.error('Stream processing failed:', e);
  }
}

processMyStream();
ChunkProcessor (src/streaming/chunk-processor.ts)
The ChunkProcessor is a highly optimized component for processing raw LLM deltas into a stream of StreamEvents. It focuses on performance, memory efficiency, and providing detailed insights into the streaming process.
Purpose: To efficiently transform raw, often fragmented, LLM output into a structured, sanitized, and manageable stream of events, while providing advanced flow control and performance monitoring.
Key Features:
- Delta Processing: Takes ChatDelta objects (raw LLM API responses) and extracts content and tool call information.
- Content Sanitization: Applies sanitizeLLMOutput to clean up content.
- Tool Call Accumulation: Reconstructs complete CodeBuddyToolCall objects from fragmented deltas. Supports extracting commentary-style tool calls if native ones are not found.
- Batching: Batches small content chunks to reduce event overhead and improve rendering performance.
- Backpressure: Manages an internal pendingEvents queue to prevent overwhelming consumers, emitting FlowHints.
- Timeouts: Implements per-chunk timeouts to detect stalled streams.
- Adaptive Render Throttling: Adjusts the renderThrottleMs dynamically based on reported render durations, ensuring smooth UI updates.
- Metrics: Collects extensive StreamingMetrics (latency, throughput, jitter, percentiles) for performance analysis.
- Memory Efficiency: Uses reusable buffers (contentBuffer, rawContentBuffer, pendingBatch) to minimize garbage collection.
Key Methods:
- processDelta(chunk: ChatDelta): The core method for processing a single raw LLM delta.
- startChunkTimeout() / clearChunkTimeout(): Manage the per-chunk timeout.
- drainPendingEvents(): Releases queued events when backpressure is relieved.
- getThrottledContent(content: string): Provides content for rendering, respecting the adaptive throttle.
- reportRenderDuration(durationMs: number): Allows consumers to provide feedback for adaptive throttling.
- getToolCalls(): Retrieves all accumulated tool calls.
- getMetrics() / getMetricsSummary(): Provide detailed performance statistics.
- getProgressIndicator(): Returns a UI-friendly progress object.
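Tool call accumulation is the least obvious of these responsibilities, so a minimal sketch may help. It assumes deltas follow the shape used in the StreamHandler usage example (an index plus optional function.name and function.arguments fragments); the interfaces and the accumulateToolCallsSketch function are hypothetical and do not reflect the real CodeBuddyToolCall type.

```typescript
// Hypothetical shapes for illustration; the module's own types may differ.
interface ToolCallDeltaSketch {
  index: number;
  id?: string;
  function?: { name?: string; arguments?: string };
}

interface AccumulatedToolCallSketch {
  id: string;
  name: string;
  arguments: string; // JSON text, concatenated from fragments
}

// Merges fragments that share an index into one complete call per index.
function accumulateToolCallsSketch(deltas: ToolCallDeltaSketch[]): AccumulatedToolCallSketch[] {
  const byIndex = new Map<number, AccumulatedToolCallSketch>();
  for (const delta of deltas) {
    let call = byIndex.get(delta.index);
    if (!call) {
      call = { id: '', name: '', arguments: '' };
      byIndex.set(delta.index, call);
    }
    if (delta.id) call.id = delta.id;
    if (delta.function?.name) call.name += delta.function.name;
    if (delta.function?.arguments) call.arguments += delta.function.arguments;
  }
  // Return calls ordered by their index in the stream.
  return Array.from(byIndex.entries())
    .sort((a, b) => a[0] - b[0])
    .map((entry) => entry[1]);
}
```

The arguments string is only valid JSON once the final fragment has arrived, so a consumer would typically parse it after the stream signals completion rather than after each delta.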
ChunkHandler (src/streaming/chunk-handler.ts)
The ChunkHandler provides a type-safe way to process and accumulate various types of Chunks (content, tool calls, errors, etc.). It's designed to build a complete response from a stream of discrete events.
Purpose: To provide a structured and extensible way to consume Chunk objects, accumulate their content, and emit specific events for different chunk types.
Key Features:
- Type-Safe Chunks: Defines ChunkType and the Chunk interface for a clear data structure.
- Accumulation: Gathers content, toolCalls, toolResults, tokenCount, and errors into an AccumulatedContent object.
- Validation: Can validate incoming chunk structures (validateChunks option).
- Event Emission: Emits events for each chunk type (content, toolCall, error, done, etc.) and a general chunk event.
- Content Overflow Handling: Prevents excessive content accumulation with maxContentLength.
- Custom Handlers: Allows defining custom logic for specific chunk types via typeHandlers.
- Static Factories: Provides createContentChunk, createToolCallChunk, etc., for easy chunk creation.
Usage Example:
import { ChunkHandler } from './streaming/chunk-handler.js';

const handler = new ChunkHandler({
  maxContentLength: 100,
  validateChunks: true,
  typeHandlers: {
    error: (chunk) => console.error('Custom Error Handler:', chunk.error),
  },
});

// Per-type events fire as each chunk is handled.
handler.on('content', ({ content, total }) => console.log(`Content: "${content}" (Total: "${total}")`));
handler.on('toolCall', ({ toolCall }) => console.log('Tool Call:', toolCall));
handler.on('done', ({ accumulated }) => console.log('Final Accumulated:', accumulated));
handler.on('validationError', ({ errors }) => console.error('Validation Errors:', errors));

handler.handle(ChunkHandler.createContentChunk('First part. '));
handler.handle(ChunkHandler.createContentChunk('Second part. '));
handler.handle(ChunkHandler.createToolCallChunk('1', 'my_tool', '{"arg":1}'));
handler.handle(ChunkHandler.createErrorChunk('Something went wrong!'));
handler.handle(ChunkHandler.createDoneChunk());

console.log('Final Content:', handler.getContent());
console.log('Has Errors:', handler.hasErrors());
BackpressureController (src/streaming/backpressure.ts)
A generic backpressure mechanism for controlling the flow of data in any stream-like scenario. It buffers items and signals when a producer should pause or resume.
Purpose: To prevent memory exhaustion and ensure graceful handling of slow consumers by regulating the rate at which data is produced.
Key Features:
- High/Low Water Marks: Defines thresholds for pausing (highWaterMark) and resuming (lowWaterMark) the producer.
- Buffer Management: push() adds items, pull() removes them.
- State Management: Tracks flowing, paused, drained, and overflow states.
- Overflow Strategies: Configurable behavior when highWaterMark is exceeded (block, drop, error).
- waitForDrain(): An async method for producers to await buffer space.
- Event Emission: Emits pause, resume, drain, overflow, and drop events.
- Statistics: Tracks BackpressureStats such as totalChunks, droppedChunks, and avgDrainTime.
- process() / apply(): Convenience methods for integrating with AsyncIterables.
Usage Example:
import { BackpressureController } from './streaming/backpressure.js';

async function producer(controller: BackpressureController<number>) {
  for (let i = 0; i < 200; i++) {
    console.log(`Producer: Pushing ${i}`);
    const accepted = await controller.push(i);
    if (!accepted) {
      console.log(`Producer: Chunk ${i} was dropped or blocked.`);
      if (controller.getState() === 'paused') {
        console.log('Producer: Waiting for drain...');
        await controller.waitForDrain();
        console.log('Producer: Resumed after drain.');
        // The push was blocked rather than dropped, so retry it now that there is space.
        await controller.push(i);
      }
    }
    await new Promise(resolve => setTimeout(resolve, 10)); // Simulate work
  }
}

async function consumer(controller: BackpressureController<number>) {
  while (true) {
    const chunk = controller.pull();
    if (chunk !== undefined) {
      console.log(`Consumer: Pulled ${chunk}. Buffer size: ${controller.getBufferSize()}`);
      await new Promise(resolve => setTimeout(resolve, 50)); // Simulate slow work
    } else if (controller.isEmpty() && controller.getState() === 'drained') {
      console.log('Consumer: Buffer drained, stopping.');
      break;
    } else {
      await new Promise(resolve => setTimeout(resolve, 10)); // Wait for more data
    }
  }
}

const controller = new BackpressureController<number>({
  highWaterMark: 10,
  lowWaterMark: 5,
  overflowStrategy: 'block',
});

controller.on('pause', ({ bufferSize }) => console.warn(`Controller PAUSED. Buffer: ${bufferSize}`));
controller.on('resume', ({ bufferSize }) => console.info(`Controller RESUMED. Buffer: ${bufferSize}`));
controller.on('drain', () => console.log('Controller DRAINED.'));

// Run both sides concurrently and surface any failure instead of leaving the promise unhandled.
Promise.all([producer(controller), consumer(controller)]).catch(console.error);
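The pause and resume signals in this example come from the gap between the two water marks. The sketch below shows that hysteresis in isolation. WaterMarkGateSketch is a hypothetical class, and the exact comparisons (pause at or above the high mark, resume at or below the low mark) are assumptions rather than confirmed behavior of BackpressureController.

```typescript
// Hypothetical illustration of high/low water mark hysteresis.
class WaterMarkGateSketch {
  private paused = false;
  private readonly high: number;
  private readonly low: number;

  constructor(highWaterMark: number, lowWaterMark: number) {
    if (lowWaterMark >= highWaterMark) {
      throw new RangeError('lowWaterMark must be below highWaterMark');
    }
    this.high = highWaterMark;
    this.low = lowWaterMark;
  }

  /** Call after every push/pull with the current buffer size; returns the signal to emit, if any. */
  update(bufferSize: number): 'pause' | 'resume' | null {
    if (!this.paused && bufferSize >= this.high) {
      this.paused = true;
      return 'pause';
    }
    if (this.paused && bufferSize <= this.low) {
      this.paused = false;
      return 'resume';
    }
    return null; // between the marks: keep the current state
  }
}
```

Keeping the resume threshold below the pause threshold avoids rapid pause/resume flapping when the buffer size hovers around a single limit.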
StreamProcessor (src/streaming/stream-processor.ts)
A generic stream processor for any AsyncIterable, providing fundamental stream management capabilities like buffering, pause/resume, abort, and basic retry logic. This is a more general-purpose utility compared to ChunkProcessor which is specialized for LLM deltas.
Purpose: To provide a foundational layer for processing generic asynchronous data streams with common control flow mechanisms.
Key Features:
- Generic AsyncIterable Processing: Works with any AsyncIterable.
- Buffering & Backpressure: Uses maxBufferSize to manage an internal buffer and waitForBufferDrain() to apply backpressure.
- Pause/Resume: Allows explicit pausing and resuming of stream consumption.
- Abort: Supports aborting the stream processing via an AbortController.
- Retry Logic: processWithRetry() provides automatic retry with exponential backoff for transient errors.
- Transformation: Supports an optional transform function for each chunk.
- Statistics: Tracks chunksProcessed, bytesProcessed, errors, and retries.
- Event Emission: Emits start, chunk, pause, resume, abort, complete, error, backpressure, retry, and transformError events.
- Collection/Reduction: collect() and reduce() convenience methods for common stream operations.
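As a rough illustration of the exponential backoff mentioned under Retry Logic, the delay before each retry can be computed as below. The function name, parameter names, and default values are assumptions for this sketch, not the options processWithRetry() actually accepts, and jitter is omitted to keep the result deterministic.

```typescript
// Hypothetical helper: delay before retry number `attempt` (1-based), capped at maxDelayMs.
function backoffDelayMsSketch(
  attempt: number,
  initialDelayMs: number = 100,
  factor: number = 2,
  maxDelayMs: number = 10000,
): number {
  if (attempt < 1) throw new RangeError('attempt must be >= 1');
  const uncapped = initialDelayMs * Math.pow(factor, attempt - 1);
  return Math.min(maxDelayMs, uncapped);
}

// Delays for the first four retries with the defaults above.
const firstDelaysSketch = [1, 2, 3, 4].map((attempt) => backoffDelayMsSketch(attempt));
console.log(firstDelaysSketch); // [ 100, 200, 400, 800 ]
```

The cap matters for long retry sequences: without it, the tenth retry with these defaults would wait over 50 seconds.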
StreamTransformer (src/streaming/stream-transformer.ts)
A static utility class offering a rich set of composable functions for transforming, filtering, and combining AsyncIterables.
Purpose: To provide a functional and declarative API for common stream manipulation patterns, making complex stream pipelines easier to build and reason about.
Key Features (Static Methods):
- Mapping & Filtering: map, filter.
- Limiting: take, skip, takeWhile, skipWhile.
- Structure: flatten, batch.
- Flow Control: debounce, throttle, delay.
- Buffering: buffer (with custom flush condition).
- Combining Streams: merge, concat, zip.
- Duplication: tee (creates multiple independent iterables from one source).
- Pipelining: pipe (chains multiple transformations).
- Aggregation: reduce, collect.
Usage Example:
import { StreamTransformer } from './streaming/stream-transformer.js';

async function* numberSource(count: number) {
  for (let i = 0; i < count; i++) {
    await new Promise(resolve => setTimeout(resolve, 10));
    yield i;
  }
}

async function demonstrateTransforms() {
  console.log('--- Map and Filter ---');
  const mappedAndFiltered = StreamTransformer.pipe(
    numberSource(10),
    (s) => StreamTransformer.map(s as AsyncIterable<number>, n => n * 2),
    (s) => StreamTransformer.filter(s as AsyncIterable<number>, n => n % 4 === 0)
  );
  console.log(await StreamTransformer.collect(mappedAndFiltered)); // [0, 4, 8, 12, 16]

  console.log('--- Batch ---');
  const batched = StreamTransformer.batch(numberSource(10), 3);
  for await (const b of batched) {
    console.log(b); // [0,1,2], [3,4,5], [6,7,8], [9]
  }

  console.log('--- Merge ---');
  async function* sourceA() { yield 'A1'; await new Promise(r => setTimeout(r, 50)); yield 'A2'; }
  async function* sourceB() { await new Promise(r => setTimeout(r, 20)); yield 'B1'; await new Promise(r => setTimeout(r, 50)); yield 'B2'; }
  // Items arrive interleaved by time; with these delays the likely order is
  // [ 'A1', 'B1', 'A2', 'B2' ], but it is not guaranteed.
  console.log(await StreamTransformer.collect(StreamTransformer.merge(sourceA(), sourceB())));
}

demonstrateTransforms();
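For intuition about how such transformers compose, a batch-style operator can be written as a small async generator. batchSketch below is a hypothetical stand-in, not the actual StreamTransformer.batch implementation.

```typescript
// Hypothetical re-implementation of a batch operator over any AsyncIterable.
async function* batchSketch<T>(source: AsyncIterable<T>, size: number): AsyncGenerator<T[]> {
  if (size < 1) throw new RangeError('size must be >= 1');
  let current: T[] = [];
  for await (const item of source) {
    current.push(item);
    if (current.length === size) {
      yield current;
      current = []; // start a fresh array so the yielded batch is never mutated
    }
  }
  // Emit the final, possibly smaller, batch.
  if (current.length > 0) yield current;
}
```

Because the generator only pulls from the source when its consumer asks for the next batch, backpressure propagates naturally through a chain of such operators.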
RetryManager & CircuitBreaker (src/streaming/retry-policy.ts)
This module provides robust error handling strategies: automatic retries with exponential backoff and the circuit breaker pattern. RetryManager orchestrates both.
Purpose: To enhance the resilience of asynchronous operations by automatically handling transient failures and preventing cascading failures to unstable services.
Key Components:
- RetryConfig: Defines parameters for retry attempts (max attempts, delays, jitter, retryable/non-retryable errors).
- retry(operation: () => Promise<T>, config?: Partial<RetryConfig>): A standalone function to execute an operation with retry logic. retryOrThrow is a convenience wrapper.
- CircuitBreakerConfig: Defines parameters for the circuit breaker (failure thresholds, reset timeouts, success thresholds).
- CircuitBreaker: An EventEmitter that manages the state (closed, open, half-open) of a service. It prevents requests to a failing service when open and allows controlled probes when half-open.
- RetryManager: A singleton (getRetryManager()) that manages multiple CircuitBreaker instances (one per serviceId) and combines retry logic with circuit breaking. It provides execute() and executeOrThrow() methods.
- Decorators: withRetry, withCircuitBreaker, and withRetryAndCircuitBreaker provide a declarative way to apply these policies to functions.
Usage Example:
import { getRetryManager, CircuitOpenError } from './streaming/retry-policy.js';

let failureCount = 0;

async function unreliableOperation(shouldFail: boolean): Promise<string> {
  if (shouldFail && failureCount < 3) { // Fail first 3 times
    failureCount++;
    console.log(`Operation failed (attempt ${failureCount})`);
    throw new Error('Transient network error');
  }
  console.log('Operation succeeded!');
  return 'Data from service';
}

async function demonstrateRetry() {
  const manager = getRetryManager();
  const serviceId = 'my-api-service';

  manager.on('circuit-state-change', (id, from, to) =>
    console.warn(`Circuit for ${id} changed from ${from} to ${to}`)
  );
  manager.on('circuit-failure', (id, error) =>
    console.error(`Circuit for ${id} recorded failure: ${error.message}`)
  );

  try {
    console.log('\n--- Demonstrating Retry ---');
    failureCount = 0;
    const result = await manager.executeOrThrow(
      serviceId,
      () => unreliableOperation(true),
      { maxAttempts: 5, initialDelayMs: 100 }
    );
    console.log('Retry Result:', result);

    console.log('\n--- Demonstrating Circuit Breaker ---');
    // Reset failure count for circuit breaker demo
    failureCount = 0;
    for (let i = 0; i < 10; i++) {
      try {
        console.log(`Request ${i + 1}. Circuit state: ${manager.getCircuitState(serviceId)}`);
        await manager.executeOrThrow(serviceId, () => unreliableOperation(true), { maxAttempts: 1 });
      } catch (e) {
        if (e instanceof CircuitOpenError) {
          console.error(`Request ${i + 1} rejected by circuit breaker.`);
        } else {
          // Caught values are `unknown` under strict TypeScript, so narrow before reading .message.
          console.error(`Request ${i + 1} failed: ${e instanceof Error ? e.message : String(e)}`);
        }
      }
      await new Promise(resolve => setTimeout(resolve, 500)); // Wait a bit
    }
    console.log('Final Circuit Stats:', manager.getCircuitStats(serviceId));
  } catch (e) {
    console.error('Overall operation failed:', e instanceof Error ? e.message : String(e));
  } finally {
    manager.resetAllCircuits();
  }
}

demonstrateRetry();
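The closed, open, and half-open states can be pictured as a small state machine. The sketch below is a simplified illustration under stated assumptions: CircuitSketch and its method names are hypothetical, time is passed in explicitly to keep it deterministic, and a single successful probe closes the circuit, whereas the real CircuitBreakerConfig also supports success thresholds.

```typescript
type CircuitStateSketch = 'closed' | 'open' | 'half-open';

// Hypothetical, simplified circuit breaker; not the module's CircuitBreaker class.
class CircuitSketch {
  private state: CircuitStateSketch = 'closed';
  private failures = 0;
  private openedAt = 0;
  private readonly failureThreshold: number;
  private readonly resetTimeoutMs: number;

  constructor(failureThreshold: number, resetTimeoutMs: number) {
    this.failureThreshold = failureThreshold;
    this.resetTimeoutMs = resetTimeoutMs;
  }

  /** Returns true if a request may proceed at time `now` (milliseconds). */
  canRequest(now: number): boolean {
    if (this.state === 'open' && now - this.openedAt >= this.resetTimeoutMs) {
      this.state = 'half-open'; // reset timeout elapsed: allow a probe request
    }
    return this.state !== 'open';
  }

  recordSuccess(): void {
    this.failures = 0;
    this.state = 'closed';
  }

  recordFailure(now: number): void {
    this.failures++;
    // A failed probe, or too many consecutive failures, (re)opens the circuit.
    if (this.state === 'half-open' || this.failures >= this.failureThreshold) {
      this.state = 'open';
      this.openedAt = now;
    }
  }

  getState(): CircuitStateSketch {
    return this.state;
  }
}
```

While the circuit is open, callers fail fast instead of waiting on a service that is already struggling, which is what the CircuitOpenError branch in the usage example handles.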
MarkdownChunker (src/streaming/markdown-chunker.ts)
A specialized utility for intelligently chunking streaming markdown content, ensuring that code blocks are not split mid-way unless absolutely necessary.
Purpose: To enable incremental display or processing of markdown content from a stream while preserving the structural integrity of code blocks, improving readability and usability.
Key Features:
- Block-Aware Splitting: Tracks BlockState (e.g., inCodeBlock, fence, language) to make informed splitting decisions.
- Code Block Preservation: Prioritizes splitting outside code blocks. If a hard maximum is reached inside a code block, it will force-split by closing the current fence and reopening it in the next chunk.
- Configurable Thresholds: softMaxChars (preferred split point) and hardMaxChars (absolute maximum before force-splitting).
- Preferred Breaks: Uses preferredBreaks (e.g., \n\n, \n, .) to find natural split points.
- write() / flush(): Methods for incremental input and final buffer flushing.
- ChunkResult: Provides information about the chunk, including whether it was force-split and if a fence needs to be reopened.
- Convenience Functions: chunkMarkdown, hasUnclosedCodeBlock, countCodeBlocks, fixUnclosedCodeBlocks, createStreamingChunker.
Usage Example:
import { MarkdownChunker, createStreamingChunker } from './streaming/markdown-chunker.js';

const longMarkdown = `
# Introduction
This is some introductory text. It's quite long and might need to be chunked.
\`\`\`typescript
function helloWorld() {
  console.log("Hello, world!");
  // This is a very long line that might cause a split if not handled carefully.
  // Another long line to ensure the chunker has enough content to work with.
  // Yet another line.
}
\`\`\`
More text after the code block.
`;

console.log('--- Streaming Chunker ---');
const streamingChunker = createStreamingChunker(
  (chunk) => {
    console.log(`Chunk (forceSplit: ${chunk.forceSplit}, reopen: ${chunk.reopenFence}):\n---\n${chunk.content}\n---`);
  },
  { softMaxChars: 100, hardMaxChars: 200, preserveCodeBlocks: true }
);
// Feed the document in three pieces to simulate streamed input.
streamingChunker.write(longMarkdown.slice(0, 150));
streamingChunker.write(longMarkdown.slice(150, 300));
streamingChunker.write(longMarkdown.slice(300));
streamingChunker.flush();

console.log('\n--- Single-shot Chunker ---');
const chunks = MarkdownChunker.chunkMarkdown(longMarkdown, {
  softMaxChars: 100,
  hardMaxChars: 200,
  preserveCodeBlocks: true,
});
chunks.forEach((chunk, i) => {
  console.log(`Chunk ${i + 1} (forceSplit: ${chunk.forceSplit}, reopen: ${chunk.reopenFence}):\n---\n${chunk.content}\n---`);
});
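Block-aware splitting depends on knowing whether the text seen so far ends inside a code fence. The following sketch shows one way to detect that by scanning lines for fence markers. hasUnclosedFenceSketch is a hypothetical function loosely following CommonMark fence rules; it is not the module's hasUnclosedCodeBlock and ignores edge cases such as fences nested in block quotes or lists.

```typescript
// Hypothetical detector: true if the markdown ends inside an open fenced code block.
function hasUnclosedFenceSketch(markdown: string): boolean {
  let openMarker: string | null = null;
  for (const line of markdown.split(/\r?\n/)) {
    // A fence is 3+ backticks or tildes, indented by at most 3 spaces.
    const match = /^ {0,3}(`{3,}|~{3,})(.*)$/.exec(line);
    if (!match) continue;
    const marker = match[1] ?? '';
    const rest = match[2] ?? '';
    if (openMarker === null) {
      openMarker = marker; // opening fence (may carry an info string such as a language)
    } else if (
      marker.charAt(0) === openMarker.charAt(0) &&
      marker.length >= openMarker.length &&
      rest.trim() === ''
    ) {
      openMarker = null; // closing fence: same character, at least as long, no info string
    }
  }
  return openMarker !== null;
}
```

A streaming chunker could consult a check like this before emitting a chunk, and close and reopen the fence only when the hard limit forces a split inside a code block.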
ProgressTracker (src/streaming/progress-tracker.ts)
A utility for tracking and reporting progress across multiple, weighted stages of a long-running operation.
Purpose: To provide a structured way to monitor and communicate the progress of complex, multi-step processes, offering real-time updates and estimated time remaining.
Key Features:
- Multi-Stage Tracking: Defines ProgressStages with name and weight.
- Weighted Progress: Calculates totalProgress based on the progress and weight of each stage.
- Time Estimation: Can estimate estimatedTimeRemaining based on historical progress.
- Event Emission: Emits progress events with ProgressUpdate objects.
- Control Methods: start(), updateProgress(), completeStage(), failStage().
- Throttling: updateIntervalMs controls how frequently progress events are emitted.
- Convenience Functions: createSimpleTracker, calculateIterationProgress.
Usage Example:
import { ProgressTracker } from './streaming/progress-tracker.js';

async function longRunningTask() {
  const tracker = new ProgressTracker({
    stages: [
      { name: 'Initialization', weight: 10 },
      { name: 'Processing Data', weight: 70 },
      { name: 'Finalization', weight: 20 },
    ],
    estimateTime: true,
    updateIntervalMs: 50,
  });

  tracker.on('progress', (update) => {
    console.log(
      `Total: ${update.totalProgress}% | Stage: ${update.currentStage} (${update.stageProgress}%) | Message: ${update.message} | ETA: ${update.estimatedTimeRemaining?.toFixed(0) || 'N/A'}ms`
    );
  });

  tracker.start();

  // Stage 1: Initialization
  for (let i = 0; i <= 100; i += 20) {
    tracker.updateProgress(i, 'Initialization', `Initializing step ${i / 20}`);
    await new Promise(resolve => setTimeout(resolve, 100));
  }
  tracker.completeStage('Initialization');

  // Stage 2: Processing Data
  for (let i = 0; i <= 100; i += 10) {
    tracker.updateProgress(i, 'Processing Data', `Processing batch ${i / 10}`);
    await new Promise(resolve => setTimeout(resolve, 200));
  }
  tracker.completeStage('Processing Data');

  // Stage 3: Finalization
  for (let i = 0; i <= 100; i += 25) {
    tracker.updateProgress(i, 'Finalization', `Finalizing step ${i / 25}`);
    await new Promise(resolve => setTimeout(resolve, 50));
  }
  tracker.completeStage('Finalization');

  console.log('Task Completed!');
}

longRunningTask();
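The weighted total reported in each update can be understood as a weighted average of per-stage progress. The sketch below assumes that formula; StageSketch and totalProgressSketch are hypothetical names, and the tracker's actual rounding or clamping behavior is not shown in this document.

```typescript
// Hypothetical stage shape: progress is a percentage from 0 to 100.
interface StageSketch {
  name: string;
  weight: number;
  progress: number;
}

// Weighted average of stage progress, normalized by the sum of weights.
function totalProgressSketch(stages: StageSketch[]): number {
  const totalWeight = stages.reduce((sum, stage) => sum + stage.weight, 0);
  if (totalWeight === 0) return 0;
  const weighted = stages.reduce((sum, stage) => sum + stage.weight * stage.progress, 0);
  return weighted / totalWeight;
}

// Using the weights from the example above: first stage done, second halfway, third not started.
const midTaskProgressSketch = totalProgressSketch([
  { name: 'Initialization', weight: 10, progress: 100 },
  { name: 'Processing Data', weight: 70, progress: 50 },
  { name: 'Finalization', weight: 20, progress: 0 },
]);
console.log(midTaskProgressSketch); // 45
```

Normalizing by the weight sum means the weights do not have to add up to 100; only their ratios matter.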
Tool Streaming Convenience Functions (src/streaming/index.ts)
The index.ts file exports all components and provides high-level convenience functions for integrating tool execution with streaming progress and results. These functions leverage ToolPhaseManager and ToolPhaseThrottler (not detailed in the provided source, but part of the module).
Purpose: To simplify the process of reporting real-time progress and outcomes for asynchronous tool calls, making it easier to build interactive agents or UIs.
Key Functions:
- createToolStream(toolCallId, toolName, onPhase?): Creates an object with an emitter and helper methods (start, update, success, fail, cleanup) to manage the lifecycle of a tool call's streaming progress.
- streamedOperation(toolCallId, toolName, operation, onPhase?): Wraps an async function, automatically emitting start, update (via a provided callback), success, or fail events.
- streamedIteration(toolCallId, toolName, items, processor, onPhase?): Iterates over an array of items, calling a processor for each, and automatically updates progress.
Usage Example:
import { streamedOperation, streamedIteration } from './streaming/index.js';

async function myToolFunction(
  updateProgress: (progress: number, message?: string) => void
): Promise<string> {
  updateProgress(10, 'Starting complex calculation...');
  await new Promise(resolve => setTimeout(resolve, 200));
  updateProgress(50, 'Halfway through...');
  await new Promise(resolve => setTimeout(resolve, 300));
  updateProgress(90, 'Almost done...');
  await new Promise(resolve => setTimeout(resolve, 100));
  return 'Calculation complete!';
}

async function processItems(item: string, index: number): Promise<string> {
  console.log(`Processing item ${item} (index ${index})...`);
  await new Promise(resolve => setTimeout(resolve, 150));
  return `Processed: ${item}`;
}

async function demonstrateToolStreaming() {
  console.log('--- Streamed Operation ---');
  try {
    const result = await streamedOperation(
      'tool-call-123',
      'ComplexTool',
      myToolFunction,
      (event) => console.log(`[ToolPhase] ${event.phase}: ${event.message || ''} (${event.progress || 0}%)`)
    );
    console.log('Streamed Operation Result:', result);
  } catch (error) {
    // Caught values are `unknown` under strict TypeScript, so narrow before reading .message.
    console.error('Streamed Operation Failed:', error instanceof Error ? error.message : String(error));
  }

  console.log('\n--- Streamed Iteration ---');
  try {
    const items = ['itemA', 'itemB', 'itemC'];
    const results = await streamedIteration(
      'tool-call-456',
      'BatchProcessor',
      items,
      processItems,
      (event) => console.log(`[ToolPhase] ${event.phase}: ${event.message || ''} (${event.progress || 0}%)`)
    );
    console.log('Streamed Iteration Results:', results);
  } catch (error) {
    console.error('Streamed Iteration Failed:', error instanceof Error ? error.message : String(error));
  }
}

demonstrateToolStreaming();
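Conceptually, a wrapper like streamedOperation brackets the operation with phase events. The sketch below illustrates that pattern only; withPhasesSketch and PhaseEventSketch are hypothetical, omit the toolCallId and toolName parameters, and make no claim about how ToolPhaseManager or ToolPhaseThrottler work internally.

```typescript
// Hypothetical event shape mirroring the fields read in the example above.
type PhaseEventSketch = {
  phase: 'start' | 'update' | 'success' | 'fail';
  progress?: number;
  message?: string;
};

// Runs `operation`, reporting start, updates, and the final outcome through `onPhase`.
async function withPhasesSketch<T>(
  operation: (update: (progress: number, message?: string) => void) => Promise<T>,
  onPhase: (event: PhaseEventSketch) => void,
): Promise<T> {
  onPhase({ phase: 'start' });
  try {
    const result = await operation((progress, message) =>
      onPhase({ phase: 'update', progress, message }),
    );
    onPhase({ phase: 'success', progress: 100 });
    return result;
  } catch (error) {
    onPhase({ phase: 'fail', message: error instanceof Error ? error.message : String(error) });
    throw error; // rethrow so the caller still sees the failure
  }
}
```

Rethrowing after emitting the fail phase keeps error handling in the caller's hands while still guaranteeing that observers see exactly one terminal event.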
Integration Points
The src/streaming module is designed to integrate broadly across the codebase:
- LLM Providers (src/providers/*-provider.ts): The retry utility from retry-policy.ts is used by gemini-provider.ts and local-llm-provider.ts to make API calls more resilient. StreamHandler and ChunkProcessor are likely used to consume the actual streaming responses from these providers.
- CodeBuddy Client (src/codebuddy/client.ts): geminiChat uses retry for its operations. The ChunkProcessor's ability to extract CodeBuddyToolCalls is crucial here.
- Agent Logic (src/agent/codebuddy-agent.ts): Agents would use the createToolStream, streamedOperation, and streamedIteration functions to report progress and results of their internal tool executions. They might also consume StreamEvents from StreamHandler to process LLM responses.
- UI/Frontend: The StreamHandler and ChunkProcessor are designed to provide real-time feedback (progress, flowHint, getThrottledContent, reportRenderDuration) for building responsive user interfaces that display streaming LLM output.
- Utilities (src/utils/sanitize.ts, src/utils/retry.ts): ChunkProcessor uses sanitizeLLMOutput. The retry functions are also re-exported or used by src/utils/retry.ts.
- Memory Management (src/memory/memory-flush.ts): The buffer transformer from stream-transformer.ts might be used for memory flushing strategies.
- Sandbox/Runtime (src/sandbox/e2b-sandbox.ts, src/scripting/runtime.ts): The destroy method of a Transform stream (created by StreamHandler) might be called by sandbox environments. Functions wrapped with withRetry or withCircuitBreaker could be runtime functions.
Contributing to the Module
When contributing to the src/streaming module, consider the following:
- Performance: Many components, especially ChunkProcessor, are optimized for high performance and low memory allocation. Be mindful of creating new objects in hot paths. Reuse arrays (.length = 0) where possible.
- Genericity: Components like StreamProcessor, StreamTransformer, and BackpressureController are designed to be generic. Avoid coupling them to LLM-specific types unless absolutely necessary.
- Testability: Each component should be independently testable. The existing unit tests (tests/unit/streaming.test.ts) provide good examples.
- Event-Driven: Many components extend EventEmitter. Ensure events are clearly defined, documented, and emitted consistently.
- Error Handling: Robust error handling is critical. Ensure that errors are caught, propagated, and reported appropriately, especially in streaming contexts.
- Asynchronous Nature: All components deal with Promises and AsyncIterables. Understand asynchronous patterns and potential pitfalls (e.g., race conditions, unhandled promises).
- Mermaid Diagrams: If adding new complex interactions, consider a small, focused Mermaid diagram to illustrate the flow.