src — streaming

Module: src-streaming Cohesion: 0.80 Members: 0


The src/streaming module provides a comprehensive suite of utilities for handling asynchronous data streams, with a particular focus on real-time processing of Large Language Model (LLM) outputs, robust error handling, and user feedback mechanisms. It aims to make stream consumption efficient, resilient, and developer-friendly, abstracting away complexities like backpressure, retries, and incremental UI updates.

Core Concepts

Before diving into the components, understanding a few core concepts is helpful:

Architecture Overview

The module's architecture is designed to handle various streaming needs, from low-level LLM delta processing to high-level tool execution feedback. The StreamHandler and ChunkProcessor form the core pipeline for LLM output.

graph TD
    A[Raw LLM Delta Stream] --> B{StreamHandler};
    B -- Pipes to --> C{ChunkProcessor};
    C -- Processes, Batches, Applies Backpressure --> D["StreamEvent[]"];
    D --> E["Consumer (e.g., UI, ChunkHandler)"];
    C -- Emits Metrics, Flow Hints --> F[Monitoring/Feedback];
    B -- Can create --> G[Node.js Transform Stream];
    G --> H[Node.js Stream Consumers];

  1. Raw LLM Delta Stream: This is the initial input, typically an AsyncIterable of raw JSON objects (e.g., ChatDelta from chunk-processor.ts) directly from an LLM API.
  2. StreamHandler: The primary orchestrator for consuming LLM streams. It manages global timeouts, integrates with Node.js Transform streams, and wraps the ChunkProcessor.
  3. ChunkProcessor: A high-performance component responsible for processing and batching deltas, applying backpressure, and emitting metrics and flow hints.
  4. StreamEvent[]: The output of the ChunkProcessor, a normalized array of events that can be consumed by various parts of the application.
  5. Consumer: Any component that needs to react to the processed stream events, such as a UI rendering the content, or a ChunkHandler accumulating the full response.
  6. Monitoring/Feedback: ChunkProcessor provides StreamingMetrics and FlowHints, allowing for real-time performance monitoring and adaptive UI adjustments.
  7. Node.js Transform Stream: StreamHandler can expose a Transform stream interface, enabling integration with standard Node.js stream pipelines.
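To illustrate what the Node.js Transform Stream integration described above makes possible, here is a minimal sketch built only on Node's own node:stream APIs (Readable.from, an object-mode Transform, and pipeline from node:stream/promises). It does not use StreamHandler's Transform interface, whose API is not shown on this page; the SketchEvent type and the contentOnly and collectText helpers are hypothetical.

```typescript
import { Readable, Transform, Writable, type TransformCallback } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Hypothetical event shape, loosely modeled on the StreamEvent objects on this page.
type SketchEvent = { type: 'content'; content: string } | { type: 'done' };

// Hypothetical object-mode Transform: forwards the text of content events, drops the rest.
function contentOnly(): Transform {
  return new Transform({
    objectMode: true,
    transform(event: SketchEvent, _encoding: BufferEncoding, callback: TransformCallback) {
      if (event.type === 'content') {
        callback(null, event.content); // pass the text downstream
      } else {
        callback(); // consume the event without emitting anything
      }
    },
  });
}

// Runs events through Readable -> Transform -> Writable and returns the concatenated text.
async function collectText(events: SketchEvent[]): Promise<string> {
  let text = '';
  await pipeline(
    Readable.from(events), // Readable.from defaults to object mode
    contentOnly(),
    new Writable({
      objectMode: true,
      write(chunk: string, _encoding: BufferEncoding, callback: (error?: Error | null) => void) {
        text += chunk;
        callback();
      },
    }),
  );
  return text;
}
```

Using pipeline rather than chained .pipe() calls means an error in any stage rejects the returned promise and tears down every stream in the chain.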

Key Components

StreamHandler (src/streaming/stream-handler.ts)

The StreamHandler acts as the main entry point for consuming LLM-like asynchronous streams. It wraps a ChunkProcessor and provides a higher-level interface for stream management, including global timeouts and Node.js stream compatibility.

Purpose: To provide a robust and flexible way to consume and process streaming data, particularly from LLM APIs, with built-in error handling, timeouts, and integration capabilities.

Key Features:

Usage Example:

import { StreamHandler } from './streaming/stream-handler.js';
import type { StreamChunk } from './streaming/types.js'; // Assuming types.ts defines StreamChunk

async function* mockLLMStream(): AsyncIterable<StreamChunk> {
  yield { choices: [{ delta: { content: 'Hello' } }] };
  await new Promise(resolve => setTimeout(resolve, 50));
  yield { choices: [{ delta: { content: ' world!' } }] };
  await new Promise(resolve => setTimeout(resolve, 100));
  yield { choices: [{ delta: { tool_calls: [{ index: 0, function: { name: 'my_tool' } }] }] };
  await new Promise(resolve => setTimeout(resolve, 50));
  yield { choices: [{ delta: { tool_calls: [{ index: 0, function: { arguments: '{"param":"value"}' } }] }] };
  await new Promise(resolve => setTimeout(resolve, 50));
}

async function processMyStream() {
  const handler = new StreamHandler({
    globalTimeoutMs: 5000,
    processorOptions: {
      enableBatching: true,
      adaptiveThrottle: true,
    },
  });

  handler.on('flowHint', (hint) => console.log('Flow Hint:', hint.state));
  handler.on('progress', (progress) => console.log('Progress:', progress.progress));
  handler.on('complete', (data) => console.log('Stream Complete:', data.metricsSummary));
  handler.on('globalTimeout', () => console.error('Global stream timed out!'));

  try {
    for await (const event of handler.handleStream(mockLLMStream())) {
      if (event.type === 'content') {
        process.stdout.write(event.content);
      } else if (event.type === 'tool_call') {
        console.log('\nTool Call:', event.toolCall);
      } else if (event.type === 'error') {
        console.error('\nStream Error:', event.error);
      }
    }
    console.log('\nFinal Accumulated Content:', handler.getAccumulated().content);
    console.log('Final Tool Calls:', handler.getAccumulated().toolCalls);
  } catch (e) {
    console.error('Stream processing failed:', e);
  }
}

processMyStream();
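A global timeout bounds the whole stream rather than the gap between chunks. The idea can be sketched without assuming anything about StreamHandler's internals: the hypothetical withGlobalTimeout wrapper below races each next() call against the time remaining before a single deadline and throws a hypothetical GlobalTimeoutError once that deadline passes. StreamHandler itself reports timeouts through its globalTimeout event, as in the example above.

```typescript
// Hypothetical error type for this sketch only.
class GlobalTimeoutError extends Error {
  constructor(timeoutMs: number) {
    super(`Stream exceeded its global timeout of ${timeoutMs} ms`);
    this.name = 'GlobalTimeoutError';
  }
}

// Hypothetical wrapper: yields items from `source` until it ends, or throws once
// `timeoutMs` has elapsed since iteration started (one deadline, not per chunk).
async function* withGlobalTimeout<T>(source: AsyncIterable<T>, timeoutMs: number): AsyncGenerator<T> {
  const deadline = Date.now() + timeoutMs;
  const iterator = source[Symbol.asyncIterator]();
  try {
    while (true) {
      const remaining = deadline - Date.now();
      if (remaining <= 0) throw new GlobalTimeoutError(timeoutMs);

      let timer: ReturnType<typeof setTimeout> | undefined;
      const timeout = new Promise<never>((_, reject) => {
        timer = setTimeout(() => reject(new GlobalTimeoutError(timeoutMs)), remaining);
      });

      let result: IteratorResult<T>;
      try {
        // Whichever settles first wins: the next item or the deadline.
        result = await Promise.race([iterator.next(), timeout]);
      } finally {
        clearTimeout(timer); // don't leave the timer running while the consumer works
      }
      if (result.done) return;
      yield result.value;
    }
  } finally {
    // Ask the source to clean up, but don't await: a source stuck inside a
    // pending next() would only handle this request after that step completes.
    void iterator.return?.().catch(() => undefined);
  }
}
```

Because the deadline is computed once, time the consumer spends between items also counts against it, which is what distinguishes a global timeout from a per-chunk idle timeout.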

ChunkProcessor (src/streaming/chunk-processor.ts)

The ChunkProcessor is a highly optimized component for processing raw LLM deltas into a stream of StreamEvents. It focuses on performance, memory efficiency, and providing detailed insights into the streaming process.

Purpose: To efficiently transform raw, often fragmented, LLM output into a structured, sanitized, and manageable stream of events, while providing advanced flow control and performance monitoring.
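The core normalization step can be sketched as follows. This is a simplified illustration, not ChunkProcessor's implementation: ChatDeltaSketch mirrors the mock deltas in the StreamHandler example (an OpenAI-style choices[0].delta with content and indexed tool_calls), and DeltaAccumulator is a hypothetical name. It shows the detail that makes tool-call streaming fiddly: argument JSON arrives in fragments that must be concatenated per index before it can be parsed. Batching, backpressure, sanitization, and metrics are omitted.

```typescript
// Hypothetical shapes mirroring the mock deltas in the StreamHandler example.
interface ToolCallDelta {
  index: number;
  id?: string;
  function?: { name?: string; arguments?: string };
}
interface ChatDeltaSketch {
  choices: { delta: { content?: string; tool_calls?: ToolCallDelta[] } }[];
}

type NormalizedEvent =
  | { type: 'content'; content: string }
  | { type: 'tool_call_delta'; index: number };

interface AccumulatedToolCall {
  id?: string;
  name: string;
  arguments: string;
}

// Hypothetical accumulator, not the real ChunkProcessor.
class DeltaAccumulator {
  content = '';
  readonly toolCalls = new Map<number, AccumulatedToolCall>();

  // Turns one raw delta into zero or more normalized events and updates running totals.
  process(delta: ChatDeltaSketch): NormalizedEvent[] {
    const events: NormalizedEvent[] = [];
    const d = delta.choices[0]?.delta;
    if (!d) return events;

    if (d.content) {
      this.content += d.content;
      events.push({ type: 'content', content: d.content });
    }
    for (const tc of d.tool_calls ?? []) {
      // Fragments of one call share an index; this sketch concatenates both the name
      // and the argument JSON, which is only parseable once every fragment has arrived.
      const call: AccumulatedToolCall = this.toolCalls.get(tc.index) ?? { name: '', arguments: '' };
      if (tc.id) call.id = tc.id;
      if (tc.function?.name) call.name += tc.function.name;
      if (tc.function?.arguments) call.arguments += tc.function.arguments;
      this.toolCalls.set(tc.index, call);
      events.push({ type: 'tool_call_delta', index: tc.index });
    }
    return events;
  }
}
```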

Key Features:

Key Methods:

ChunkHandler (src/streaming/chunk-handler.ts)

The ChunkHandler provides a type-safe way to process and accumulate various types of Chunks (content, tool calls, errors, etc.). It's designed to build a complete response from a stream of discrete events.

Purpose: To provide a structured and extensible way to consume Chunk objects, accumulate their content, and emit specific events for different chunk types.
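A minimal sketch of type-safe accumulation, assuming chunks form a discriminated union keyed on a type field; the SketchChunk and Accumulated types and the accumulateChunks function are hypothetical, and the real Chunk type in chunk-handler.ts may use different fields. The never assignment in the default branch makes the compiler reject the function if a new chunk kind is added without a matching case.

```typescript
// Hypothetical chunk union for illustration only.
type SketchChunk =
  | { type: 'content'; content: string }
  | { type: 'tool_call'; id: string; name: string; arguments: string }
  | { type: 'error'; error: string }
  | { type: 'done' };

interface Accumulated {
  content: string;
  toolCalls: { id: string; name: string; arguments: string }[];
  errors: string[];
  done: boolean;
}

// Folds a sequence of chunks into one accumulated response.
function accumulateChunks(chunks: Iterable<SketchChunk>): Accumulated {
  const acc: Accumulated = { content: '', toolCalls: [], errors: [], done: false };
  for (const chunk of chunks) {
    switch (chunk.type) {
      case 'content':
        acc.content += chunk.content;
        break;
      case 'tool_call':
        acc.toolCalls.push({ id: chunk.id, name: chunk.name, arguments: chunk.arguments });
        break;
      case 'error':
        acc.errors.push(chunk.error);
        break;
      case 'done':
        acc.done = true;
        break;
      default: {
        // Compile-time exhaustiveness check: an unhandled chunk type is a type error here.
        const unreachable: never = chunk;
        throw new Error(`Unhandled chunk: ${JSON.stringify(unreachable)}`);
      }
    }
  }
  return acc;
}
```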

Key Features:

Usage Example:

import { ChunkHandler, Chunk } from './streaming/chunk-handler.js';

const handler = new ChunkHandler({
  maxContentLength: 100,
  validateChunks: true,
  typeHandlers: {
    error: (chunk) => console.error('Custom Error Handler:', chunk.error),
  },
});

handler.on('content', ({ content, total }) => console.log(`Content: "${content}" (Total: "${total}")`));
handler.on('toolCall', ({ toolCall }) => console.log('Tool Call:', toolCall));
handler.on('done', ({ accumulated }) => console.log('Final Accumulated:', accumulated));
handler.on('validationError', ({ errors }) => console.error('Validation Errors:', errors));

handler.handle(ChunkHandler.createContentChunk('First part. '));
handler.handle(ChunkHandler.createContentChunk('Second part. '));
handler.handle(ChunkHandler.createToolCallChunk('1', 'my_tool', '{"arg":1}'));
handler.handle(ChunkHandler.createErrorChunk('Something went wrong!'));
handler.handle(ChunkHandler.createDoneChunk());

console.log('Final Content:', handler.getContent());
console.log('Has Errors:', handler.hasErrors());

BackpressureController (src/streaming/backpressure.ts)

A generic backpressure mechanism for controlling the flow of data in any stream-like scenario. It buffers items and signals when a producer should pause or resume.

Purpose: To prevent memory exhaustion and ensure graceful handling of slow consumers by regulating the rate at which data is produced.
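The pause/resume signaling can be sketched with two thresholds. The hypothetical WatermarkBuffer below assumes highWaterMark/lowWaterMark semantics like those used in the example that follows (pause once the buffer reaches the high mark, resume once it falls to the low mark) and omits overflow strategies such as 'block'. The gap between the two marks provides hysteresis, so the producer does not flip between paused and flowing on every single item.

```typescript
type FlowState = 'flowing' | 'paused';

// Hypothetical two-threshold buffer, not the real BackpressureController.
class WatermarkBuffer<T> {
  private readonly items: T[] = [];
  private state: FlowState = 'flowing';
  private readonly highWaterMark: number;
  private readonly lowWaterMark: number;

  constructor(highWaterMark: number, lowWaterMark: number) {
    if (lowWaterMark >= highWaterMark) {
      throw new RangeError('lowWaterMark must be below highWaterMark');
    }
    this.highWaterMark = highWaterMark;
    this.lowWaterMark = lowWaterMark;
  }

  // Buffers the item; returns false once the high-water mark is reached (producer should pause).
  push(item: T): boolean {
    this.items.push(item);
    if (this.items.length >= this.highWaterMark) this.state = 'paused';
    return this.state === 'flowing';
  }

  // Removes the oldest item; flow resumes only after dropping to the low-water mark.
  pull(): T | undefined {
    const item = this.items.shift();
    if (this.state === 'paused' && this.items.length <= this.lowWaterMark) this.state = 'flowing';
    return item;
  }

  getState(): FlowState {
    return this.state;
  }

  size(): number {
    return this.items.length;
  }
}
```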

Key Features:

Usage Example:

import { BackpressureController } from './streaming/backpressure.js';

// Set by the producer after its last push, so the consumer knows when to stop.
let producerDone = false;

async function producer(controller: BackpressureController<number>) {
  for (let i = 0; i < 200; i++) {
    console.log(`Producer: Pushing ${i}`);
    const accepted = await controller.push(i);
    if (!accepted) {
      console.log(`Producer: Chunk ${i} was dropped or blocked.`);
      if (controller.getState() === 'paused') {
        console.log('Producer: Waiting for drain...');
        await controller.waitForDrain();
        console.log('Producer: Resumed after drain.');
        // Re-push the blocked chunk now that the buffer has drained
        await controller.push(i);
      }
    }
    await new Promise(resolve => setTimeout(resolve, 10)); // Simulate work
  }
  producerDone = true;
}

async function consumer(controller: BackpressureController<number>) {
  // Stop only once the producer has finished AND the buffer is empty; a 'drained'
  // state on its own does not say whether more data is still coming.
  while (!producerDone || !controller.isEmpty()) {
    const chunk = controller.pull();
    if (chunk !== undefined) {
      console.log(`Consumer: Pulled ${chunk}. Buffer size: ${controller.getBufferSize()}`);
      await new Promise(resolve => setTimeout(resolve, 50)); // Simulate slow work
    } else {
      await new Promise(resolve => setTimeout(resolve, 10)); // Wait for more data
    }
  }
  console.log('Consumer: Producer finished and buffer is empty, stopping.');
}

const controller = new BackpressureController<number>({
  highWaterMark: 10,
  lowWaterMark: 5,
  overflowStrategy: 'block',
});

controller.on('pause', ({ bufferSize }) => console.warn(`Controller PAUSED. Buffer: ${bufferSize}`));
controller.on('resume', ({ bufferSize }) => console.info(`Controller RESUMED. Buffer: ${bufferSize}`));
controller.on('drain', () => console.log('Controller DRAINED.'));

Promise.all([producer(controller), consumer(controller)]);

StreamProcessor (src/streaming/stream-processor.ts)

A generic stream processor for any AsyncIterable, providing fundamental stream management capabilities like buffering, pause/resume, abort, and basic retry logic. It is more general-purpose than ChunkProcessor, which is specialized for LLM deltas.

Purpose: To provide a foundational layer for processing generic asynchronous data streams with common control flow mechanisms.
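Two of those control-flow mechanisms, pause/resume and abort, can be sketched for an arbitrary AsyncIterable as follows. PausableProcessor is hypothetical and assumes abort is signalled through the standard AbortSignal; StreamProcessor's actual API, its buffering, and its retry logic are not reproduced here.

```typescript
// Hypothetical processor showing pause/resume gating and AbortSignal support.
class PausableProcessor<T> {
  private paused = false;
  private waiters: Array<() => void> = [];

  pause(): void {
    this.paused = true;
  }

  resume(): void {
    this.paused = false;
    // Wake every consumer parked in waitWhilePaused().
    for (const wake of this.waiters.splice(0)) wake();
  }

  private waitWhilePaused(signal?: AbortSignal): Promise<void> {
    if (!this.paused || signal?.aborted) return Promise.resolve();
    return new Promise<void>((resolve) => {
      this.waiters.push(() => resolve());
      // An abort while paused should not leave the loop stuck until resume().
      signal?.addEventListener('abort', () => resolve(), { once: true });
    });
  }

  async *process(source: AsyncIterable<T>, signal?: AbortSignal): AsyncGenerator<T> {
    for await (const item of source) {
      if (signal?.aborted) return; // leaving for-await also calls the source's return()
      await this.waitWhilePaused(signal);
      if (signal?.aborted) return;
      yield item;
    }
  }
}
```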

Key Features:

StreamTransformer (src/streaming/stream-transformer.ts)

A static utility class offering a rich set of composable functions for transforming, filtering, and combining AsyncIterables.

Purpose: To provide a functional and declarative API for common stream manipulation patterns, making complex stream pipelines easier to build and reason about.

Key Features (Static Methods):

Usage Example:

import { StreamTransformer } from './streaming/stream-transformer.js';

async function* numberSource(count: number) {
  for (let i = 0; i < count; i++) {
    await new Promise(resolve => setTimeout(resolve, 10));
    yield i;
  }
}

async function demonstrateTransforms() {
  console.log('--- Map and Filter ---');
  const mappedAndFiltered = StreamTransformer.pipe(
    numberSource(10),
    (s) => StreamTransformer.map(s as AsyncIterable<number>, n => n * 2),
    (s) => StreamTransformer.filter(s as AsyncIterable<number>, n => n % 4 === 0)
  );
  console.log(await StreamTransformer.collect(mappedAndFiltered)); // [0, 4, 8, 12, 16]

  console.log('--- Batch ---');
  const batched = StreamTransformer.batch(numberSource(10), 3);
  for await (const b of batched) {
    console.log(b); // [0,1,2], [3,4,5], [6,7,8], [9]
  }

  console.log('--- Merge ---');
  // A1 at ~0 ms, B1 at ~20 ms, A2 at ~50 ms, B2 at ~70 ms
  async function* sourceA() { yield 'A1'; await new Promise(r => setTimeout(r, 50)); yield 'A2'; }
  async function* sourceB() { await new Promise(r => setTimeout(r, 20)); yield 'B1'; await new Promise(r => setTimeout(r, 50)); yield 'B2'; }
  console.log(await StreamTransformer.collect(StreamTransformer.merge(sourceA(), sourceB()))); // [ 'A1', 'B1', 'A2', 'B2' ] (order depends on timing)
}

demonstrateTransforms();
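For readers who want to see what such operators reduce to, here are hypothetical standalone versions of map, filter, batch, and collect written as async generators. The real StreamTransformer static methods may differ in signature and behavior (for example, in how they propagate errors or handle cancellation).

```typescript
// Hypothetical standalone operators; not the StreamTransformer API itself.
async function* map<T, U>(source: AsyncIterable<T>, fn: (item: T) => U | Promise<U>): AsyncGenerator<U> {
  for await (const item of source) yield await fn(item);
}

async function* filter<T>(source: AsyncIterable<T>, predicate: (item: T) => boolean): AsyncGenerator<T> {
  for await (const item of source) {
    if (predicate(item)) yield item;
  }
}

async function* batch<T>(source: AsyncIterable<T>, size: number): AsyncGenerator<T[]> {
  if (size < 1) throw new RangeError('batch size must be at least 1');
  let current: T[] = [];
  for await (const item of source) {
    current.push(item);
    if (current.length === size) {
      yield current;
      current = [];
    }
  }
  if (current.length > 0) yield current; // emit the final partial batch
}

async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const item of source) out.push(item);
  return out;
}
```

Because each operator is lazy and pulls from its upstream only when asked, composing them never buffers more than one batch at a time.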

RetryManager & CircuitBreaker (src/streaming/retry-policy.ts)

This module provides robust error handling strategies: automatic retries with exponential backoff and the circuit breaker pattern. RetryManager orchestrates both.

Purpose: To enhance the resilience of asynchronous operations by automatically handling transient failures and preventing cascading failures to unstable services.
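The retry half can be sketched as follows. initialDelayMs and maxAttempts appear in the example below; maxDelayMs, multiplier, and the full-jitter strategy (a uniformly random delay between zero and the capped exponential delay) are assumptions, not documented RetryManager options, and backoffDelay, jitteredDelay, and retry are hypothetical helpers.

```typescript
// Hypothetical options: maxDelayMs and multiplier are assumptions for this sketch.
interface BackoffOptions {
  initialDelayMs: number;
  maxDelayMs: number;
  multiplier: number;
}

// Capped exponential delay before retry number `attempt` (1 = first retry).
function backoffDelay(attempt: number, opts: BackoffOptions): number {
  const exponential = opts.initialDelayMs * opts.multiplier ** (attempt - 1);
  return Math.min(exponential, opts.maxDelayMs);
}

// "Full jitter": a uniform random delay in [0, cappedDelay), so clients that failed
// together do not all retry at the same instant.
function jitteredDelay(attempt: number, opts: BackoffOptions, random: () => number = Math.random): number {
  return Math.floor(random() * backoffDelay(attempt, opts));
}

// Calls fn up to maxAttempts times, sleeping a jittered backoff between attempts.
async function retry<T>(fn: () => Promise<T>, maxAttempts: number, opts: BackoffOptions): Promise<T> {
  if (maxAttempts < 1) throw new RangeError('maxAttempts must be at least 1');
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      if (attempt < maxAttempts) {
        await new Promise((resolve) => setTimeout(resolve, jitteredDelay(attempt, opts)));
      }
    }
  }
  throw lastError;
}
```

With initialDelayMs 100, multiplier 2, and maxDelayMs 1000, the un-jittered delays for retries 1 to 5 are 100, 200, 400, 800, and 1000 ms.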

Key Components:

Usage Example:

import { getRetryManager, CircuitOpenError } from './streaming/retry-policy.js';

let failureCount = 0;
async function unreliableOperation(shouldFail: boolean): Promise<string> {
  if (shouldFail && failureCount < 3) { // Fail the first 3 times
    failureCount++;
    console.log(`Operation failed (attempt ${failureCount})`);
    throw new Error('Transient network error');
  }
  console.log('Operation succeeded!');
  return 'Data from service';
}

// Caught values are `unknown` in strict TypeScript, so narrow before reading .message
const messageOf = (e: unknown) => (e instanceof Error ? e.message : String(e));

async function demonstrateRetry() {
  const manager = getRetryManager();
  const serviceId = 'my-api-service';

  manager.on('circuit-state-change', (id, from, to) =>
    console.warn(`Circuit for ${id} changed from ${from} to ${to}`)
  );
  manager.on('circuit-failure', (id, error) =>
    console.error(`Circuit for ${id} recorded failure: ${error.message}`)
  );

  try {
    console.log('\n--- Demonstrating Retry ---');
    failureCount = 0;
    const result = await manager.executeOrThrow(
      serviceId,
      () => unreliableOperation(true),
      { maxAttempts: 5, initialDelayMs: 100 }
    );
    console.log('Retry Result:', result);

    console.log('\n--- Demonstrating Circuit Breaker ---');
    // Reset failure count for circuit breaker demo
    failureCount = 0;
    for (let i = 0; i < 10; i++) {
      try {
        console.log(`Request ${i + 1}. Circuit state: ${manager.getCircuitState(serviceId)}`);
        await manager.executeOrThrow(serviceId, () => unreliableOperation(true), { maxAttempts: 1 });
      } catch (e) {
        if (e instanceof CircuitOpenError) {
          console.error(`Request ${i + 1} rejected by circuit breaker.`);
        } else {
          console.error(`Request ${i + 1} failed: ${messageOf(e)}`);
        }
      }
      await new Promise(resolve => setTimeout(resolve, 500)); // Wait a bit
    }
    console.log('Final Circuit Stats:', manager.getCircuitStats(serviceId));
  } catch (e) {
    console.error('Overall operation failed:', messageOf(e));
  } finally {
    manager.resetAllCircuits();
  }
}

demonstrateRetry();
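The circuit-breaker half can be sketched as a three-state machine (closed, open, half-open). SketchCircuitBreaker is hypothetical: it assumes a consecutive-failure threshold and a fixed reset timeout, and, unlike many production breakers, it lets every request through while half-open instead of admitting a single trial request. The injectable clock makes the state transitions testable without real waiting.

```typescript
// Hypothetical breaker for illustration; RetryManager's real thresholds and states may differ.
type CircuitState = 'closed' | 'open' | 'half-open';

class SketchCircuitBreaker {
  private state: CircuitState = 'closed';
  private consecutiveFailures = 0;
  private openedAt = 0;
  private readonly failureThreshold: number;
  private readonly resetTimeoutMs: number;
  private readonly now: () => number;

  constructor(failureThreshold: number, resetTimeoutMs: number, now: () => number = Date.now) {
    this.failureThreshold = failureThreshold;
    this.resetTimeoutMs = resetTimeoutMs;
    this.now = now;
  }

  // An open circuit turns half-open once the reset timeout has elapsed.
  getState(): CircuitState {
    if (this.state === 'open' && this.now() - this.openedAt >= this.resetTimeoutMs) {
      this.state = 'half-open';
    }
    return this.state;
  }

  // Callers should fail fast (as CircuitOpenError does above) when this returns false.
  canRequest(): boolean {
    return this.getState() !== 'open';
  }

  recordSuccess(): void {
    this.consecutiveFailures = 0;
    this.state = 'closed';
  }

  recordFailure(): void {
    this.consecutiveFailures++;
    // A failure while half-open re-opens immediately; otherwise open at the threshold.
    if (this.getState() === 'half-open' || this.consecutiveFailures >= this.failureThreshold) {
      this.state = 'open';
      this.openedAt = this.now();
    }
  }
}
```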

MarkdownChunker (src/streaming/markdown-chunker.ts)

A specialized utility for intelligently chunking streaming markdown content, ensuring that code blocks are not split mid-way unless absolutely necessary.

Purpose: To enable incremental display or processing of markdown content from a stream while preserving the structural integrity of code blocks, improving readability and usability.

Key Features:

Usage Example:

import { MarkdownChunker, createStreamingChunker } from './streaming/markdown-chunker.js';

const longMarkdown = `
# Introduction

This is some introductory text. It's quite long and might need to be chunked.

\`\`\`typescript
function helloWorld() {
  console.log("Hello, world!");
  // This is a very long line that might cause a split if not handled carefully.
  // Another long line to ensure the chunker has enough content to work with.
  // Yet another line.
}
\`\`\`

More text after the code block.
`;

console.log('--- Streaming Chunker ---');
const streamingChunker = createStreamingChunker(
  (chunk) => {
    console.log(`Chunk (forceSplit: ${chunk.forceSplit}, reopen: ${chunk.reopenFence}):\n---\n${chunk.content}\n---`);
  },
  { softMaxChars: 100, hardMaxChars: 200, preserveCodeBlocks: true }
);

streamingChunker.write(longMarkdown.slice(0, 150));
streamingChunker.write(longMarkdown.slice(150, 300));
streamingChunker.write(longMarkdown.slice(300));
streamingChunker.flush();

console.log('\n--- Single-shot Chunker ---');
const chunks = MarkdownChunker.chunkMarkdown(longMarkdown, {
  softMaxChars: 100,
  hardMaxChars: 200,
  preserveCodeBlocks: true,
});
chunks.forEach((chunk, i) => {
  console.log(`Chunk ${i + 1} (forceSplit: ${chunk.forceSplit}, reopen: ${chunk.reopenFence}):\n---\n${chunk.content}\n---`);
});
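The fence-preserving rule can be sketched in a simplified form: outside a code block, split at a line boundary once softMaxChars would be exceeded; inside a block, keep going until hardMaxChars, then close the fence, split, and reopen the same fence at the start of the next chunk. The forceSplit, reopenFence, softMaxChars, and hardMaxChars names mirror the example above, but their exact semantics in MarkdownChunker are assumptions, and chunkMarkdownSketch is hypothetical. It recognizes only triple-backtick fences, not ~~~ fences or longer backtick runs.

```typescript
const FENCE = '`'.repeat(3); // a triple-backtick code fence marker

interface MdChunk {
  content: string;
  forceSplit: boolean; // chunk was cut inside a code block (fence closed artificially)
  reopenFence: boolean; // chunk starts by reopening a fence cut in the previous chunk
}

// Hypothetical simplified chunker; splits only at line boundaries.
function chunkMarkdownSketch(text: string, softMaxChars: number, hardMaxChars: number): MdChunk[] {
  const chunks: MdChunk[] = [];
  let lines: string[] = [];
  let size = 0;
  let openFence: string | null = null; // opening fence line while inside a code block
  let reopenFence = false;

  const emit = (forceSplit: boolean): void => {
    if (lines.length === 0) return;
    chunks.push({ content: lines.join('\n'), forceSplit, reopenFence });
    lines = [];
    size = 0;
    reopenFence = false;
  };

  for (const line of text.split('\n')) {
    const nextSize = size + line.length + 1;
    if (openFence === null && nextSize > softMaxChars) {
      // Outside a code block every line boundary is a safe split point.
      emit(false);
    } else if (openFence !== null && nextSize > hardMaxChars) {
      // Inside a code block past the hard limit: close it, split, then reopen it.
      lines.push(FENCE);
      emit(true);
      lines.push(openFence);
      size = openFence.length + 1;
      reopenFence = true;
    }
    lines.push(line);
    size += line.length + 1;
    if (line.trimStart().startsWith(FENCE)) {
      openFence = openFence === null ? line.trim() : null; // toggle code-block state
    }
  }
  emit(false);
  return chunks;
}
```

Every chunk this produces contains a balanced number of fence lines, so each one renders as valid markdown on its own.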

ProgressTracker (src/streaming/progress-tracker.ts)

A utility for tracking and reporting progress across multiple, weighted stages of a long-running operation.

Purpose: To provide a structured way to monitor and communicate the progress of complex, multi-step processes, offering real-time updates and estimated time remaining.

Key Features:

Usage Example:

import { ProgressTracker } from './streaming/progress-tracker.js';

async function longRunningTask() {
  const tracker = new ProgressTracker({
    stages: [
      { name: 'Initialization', weight: 10 },
      { name: 'Processing Data', weight: 70 },
      { name: 'Finalization', weight: 20 },
    ],
    estimateTime: true,
    updateIntervalMs: 50,
  });

  tracker.on('progress', (update) => {
    console.log(
      `Total: ${update.totalProgress}% | Stage: ${update.currentStage} (${update.stageProgress}%) | Message: ${update.message} | ETA: ${update.estimatedTimeRemaining?.toFixed(0) || 'N/A'}ms`
    );
  });

  tracker.start();

  // Stage 1: Initialization
  for (let i = 0; i <= 100; i += 20) {
    tracker.updateProgress(i, 'Initialization', `Initializing step ${i / 20}`);
    await new Promise(resolve => setTimeout(resolve, 100));
  }
  tracker.completeStage('Initialization');

  // Stage 2: Processing Data
  for (let i = 0; i <= 100; i += 10) {
    tracker.updateProgress(i, 'Processing Data', `Processing batch ${i / 10}`);
    await new Promise(resolve => setTimeout(resolve, 200));
  }
  tracker.completeStage('Processing Data');

  // Stage 3: Finalization
  for (let i = 0; i <= 100; i += 25) {
    tracker.updateProgress(i, 'Finalization', `Finalizing step ${i / 25}`);
    await new Promise(resolve => setTimeout(resolve, 50));
  }
  tracker.completeStage('Finalization');

  console.log('Task Completed!');
}

longRunningTask();
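Weighted stage progress commonly reduces to a weighted average, and a simple ETA to linear extrapolation. The hypothetical totalProgress and estimateRemainingMs helpers below assume both; ProgressTracker's actual formulas are not shown on this page. With the weights from the example above (10, 70, 20), finishing Initialization and reaching 50% of Processing Data gives (10 × 100 + 70 × 50 + 20 × 0) / 100 = 45% overall.

```typescript
// Hypothetical stage snapshot; progress is a percentage in 0..100.
interface StageSketch {
  name: string;
  weight: number;
  progress: number;
}

// Overall progress as the weight-averaged stage progress: sum(w_i * p_i) / sum(w_i).
function totalProgress(stages: StageSketch[]): number {
  const totalWeight = stages.reduce((sum, s) => sum + s.weight, 0);
  if (totalWeight === 0) return 0;
  const weighted = stages.reduce((sum, s) => sum + s.weight * s.progress, 0);
  return weighted / totalWeight;
}

// Linear ETA: assumes the remaining work proceeds at the average rate observed so far.
function estimateRemainingMs(elapsedMs: number, progressPercent: number): number | undefined {
  if (progressPercent <= 0) return undefined; // no rate information yet
  return (elapsedMs * (100 - progressPercent)) / progressPercent;
}
```

For instance, 900 ms elapsed at 45% extrapolates to 900 × 55 / 45 = 1100 ms remaining. Linear extrapolation is only as good as the weights: if a stage's weight misrepresents its real duration, the ETA drifts when that stage starts.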

Tool Streaming Convenience Functions (src/streaming/index.ts)

The index.ts file exports all components and provides high-level convenience functions for integrating tool execution with streaming progress and results. These functions build on ToolPhaseManager and ToolPhaseThrottler, which are part of the module but are not covered on this page.

Purpose: To simplify the process of reporting real-time progress and outcomes for asynchronous tool calls, making it easier to build interactive agents or UIs.

Key Functions:

Usage Example:

import { streamedOperation, streamedIteration } from './streaming/index.js';

async function myToolFunction(
  updateProgress: (progress: number, message?: string) => void
): Promise<string> {
  updateProgress(10, 'Starting complex calculation...');
  await new Promise(resolve => setTimeout(resolve, 200));
  updateProgress(50, 'Halfway through...');
  await new Promise(resolve => setTimeout(resolve, 300));
  updateProgress(90, 'Almost done...');
  await new Promise(resolve => setTimeout(resolve, 100));
  return 'Calculation complete!';
}

async function processItems(item: string, index: number): Promise<string> {
  console.log(`Processing item ${item} (index ${index})...`);
  await new Promise(resolve => setTimeout(resolve, 150));
  return `Processed: ${item}`;
}

async function demonstrateToolStreaming() {
  console.log('--- Streamed Operation ---');
  try {
    const result = await streamedOperation(
      'tool-call-123',
      'ComplexTool',
      myToolFunction,
      (event) => console.log(`[ToolPhase] ${event.phase}: ${event.message || ''} (${event.progress || 0}%)`)
    );
    console.log('Streamed Operation Result:', result);
  } catch (error) {
    // Caught values are `unknown` in strict TypeScript
    console.error('Streamed Operation Failed:', error instanceof Error ? error.message : String(error));
  }

  console.log('\n--- Streamed Iteration ---');
  try {
    const items = ['itemA', 'itemB', 'itemC'];
    const results = await streamedIteration(
      'tool-call-456',
      'BatchProcessor',
      items,
      processItems,
      (event) => console.log(`[ToolPhase] ${event.phase}: ${event.message || ''} (${event.progress || 0}%)`)
    );
    console.log('Streamed Iteration Results:', results);
  } catch (error) {
    console.error('Streamed Iteration Failed:', error instanceof Error ? error.message : String(error));
  }
}

demonstrateToolStreaming();
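A rough sketch of what an iteration helper in the spirit of streamedIteration has to do: process items in order, report progress as the fraction of items completed, and surface a failure as an event before rethrowing it. iterateWithProgress, PhaseEvent, and the phase names start, progress, complete, and error are hypothetical; the module's real ToolPhaseManager phases and its throttling of updates are not reproduced.

```typescript
// Hypothetical event shape; the phase names are assumptions, not the module's ToolPhase values.
interface PhaseEvent {
  toolCallId: string;
  toolName: string;
  phase: 'start' | 'progress' | 'complete' | 'error';
  message?: string;
  progress?: number; // 0..100
}

// Hypothetical helper: runs processItem over items sequentially, emitting phase events.
async function iterateWithProgress<T, R>(
  toolCallId: string,
  toolName: string,
  items: readonly T[],
  processItem: (item: T, index: number) => Promise<R>,
  onEvent: (event: PhaseEvent) => void,
): Promise<R[]> {
  const base = { toolCallId, toolName };
  onEvent({ ...base, phase: 'start', progress: 0 });
  const results: R[] = [];
  try {
    for (const [index, item] of items.entries()) {
      results.push(await processItem(item, index));
      // Progress is the share of items finished so far.
      const progress = Math.round(((index + 1) / items.length) * 100);
      onEvent({ ...base, phase: 'progress', progress, message: `Processed ${index + 1}/${items.length}` });
    }
  } catch (error) {
    // Surface the failure as an event, then let the caller handle the rejection.
    onEvent({ ...base, phase: 'error', message: error instanceof Error ? error.message : String(error) });
    throw error;
  }
  onEvent({ ...base, phase: 'complete', progress: 100 });
  return results;
}
```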

Integration Points

The src/streaming module is designed to integrate broadly across the codebase:

Contributing to the Module

When contributing to the src/streaming module, consider the following: