src — concurrency
The src/concurrency module provides essential utilities for managing asynchronous operations, ensuring order, preventing race conditions, and applying backpressure in multi-session or multi-channel environments. It offers two distinct, yet complementary, concurrency control systems:
- Session Lanes (SessionLane, LaneManager): A simpler, session-based queueing system primarily focused on ensuring strict sequential processing of operations for a given session.
- Lane Queue (LaneQueue): An "Enterprise-grade" system that implements a "Default Serial, Explicit Parallel" pattern, offering more granular control over task execution within a session, including explicit parallel execution, retries, and timeouts.
This documentation will detail each system, its purpose, how it works, and how to use it effectively.
1. Session Lanes (FIFO/Priority Queues per Session)
The SessionLane and LaneManager system is designed to prevent message interleaving and ensure that operations related to a specific session are processed in a defined order (FIFO or by priority). This is crucial for maintaining state consistency when multiple requests for the same session might be in flight simultaneously.
1.1 Core Concepts
- SessionLane: Represents a single processing queue for a specific session. It holds LaneItems and processes them one by one using a registered processor function.
- LaneItem: The unit of work enqueued into a SessionLane. It contains the payload (the actual message/request), a unique id, an enqueuedAt timestamp, a priority, and optional metadata.
- LaneManager: A central registry that manages multiple SessionLane instances. It acts as a factory for lanes, ensuring that each session ID maps to a unique SessionLane.
1.2 How it Works
- Lane Creation: When an operation needs to be processed for a sessionId, LaneManager.getLane(sessionId) is called. If a lane for that session doesn't exist, a new SessionLane is created and registered.
- Processor Assignment: A processor function (an async function that takes a LaneItem and returns a Promise) must be set for the SessionLane. This function defines how each item in the lane will be processed. The LaneManager can also set a defaultProcessor for all new lanes.
- Enqueueing: SessionLane.enqueue(payload, options) adds a new LaneItem to the lane's internal queue. Items are ordered either strictly FIFO (config.fifo: true) or by priority (lower number = higher priority).
- Processing Loop:
  - When an item is enqueued, if config.autoStart is true and the lane is idle, SessionLane.startProcessing() is called.
  - SessionLane.processQueue() continuously shifts items from the queue and executes the registered processor for each.
  - Processing occurs sequentially: one item completes before the next begins.
  - A processingTimeoutMs can be configured to prevent long-running tasks from blocking the lane indefinitely.
- Status and Events: SessionLane emits events (enqueue, start, complete, error, drain, pause, resume) to signal lifecycle changes of items and the lane itself. It also exposes its status (idle, processing, paused, draining) and queue length.
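The processing loop described above can be sketched in a few lines. This is a minimal, self-contained illustration and not the module's actual implementation: MiniLane, its drain() method, and the errorCount field are hypothetical names, and events, pausing, and timeouts are left out.

```typescript
// Hypothetical stand-in for SessionLane; every name here is an assumption.
interface LaneItem<T> {
  id: string;
  payload: T;
  enqueuedAt: number;
  priority: number;
}

type Processor<T> = (item: LaneItem<T>) => Promise<unknown>;

class MiniLane<T> {
  private readonly queue: LaneItem<T>[] = [];
  private readonly processor: Processor<T>;
  private readonly fifo: boolean;
  private processing = false;
  private loop: Promise<void> = Promise.resolve();
  private nextId = 0;
  errorCount = 0;

  constructor(processor: Processor<T>, fifo = true) {
    this.processor = processor;
    this.fifo = fifo;
  }

  enqueue(payload: T, priority = 0): LaneItem<T> {
    const item: LaneItem<T> = {
      id: `item-${this.nextId++}`,
      payload,
      enqueuedAt: Date.now(),
      priority,
    };
    // In priority mode a lower number wins; ties keep arrival order.
    const index = this.fifo
      ? -1
      : this.queue.findIndex((queued) => queued.priority > priority);
    if (index === -1) this.queue.push(item);
    else this.queue.splice(index, 0, item);
    // Auto-start the loop only when the lane is idle.
    if (!this.processing) this.loop = this.processQueue();
    return item;
  }

  // Resolves once the current processing loop has emptied the queue.
  drain(): Promise<void> {
    return this.loop;
  }

  private async processQueue(): Promise<void> {
    this.processing = true;
    while (this.queue.length > 0) {
      const item = this.queue.shift() as LaneItem<T>;
      try {
        await this.processor(item); // one item completes before the next begins
      } catch {
        this.errorCount++; // a fuller lane would emit an "error" event here
      }
    }
    this.processing = false;
  }
}

const order: string[] = [];
const demoLane = new MiniLane<string>(async (item) => {
  await new Promise<void>((resolve) => setTimeout(resolve, 5));
  order.push(item.payload);
});
demoLane.enqueue("first");
demoLane.enqueue("second");
demoLane.drain().then(() => console.log(order.join(" -> "))); // first -> second
```

Note that in this sketch an item that arrives while the lane is idle starts immediately, so priority only reorders items that are still waiting.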
Simplified Architecture
graph TD
A[LaneManager] --> B{SessionLane (sessionId)};
B -- Manages --> C[Queue: LaneItem<T>];
C -- Processes --> D[Processor: (item: LaneItem<T>) => Promise<unknown>];
D -- Emits Events --> B;
1.3 Key Components & Usage
SessionLane Class
- constructor(config?: Partial<…>): Initializes a lane with optional configuration.
- setProcessor(processor: (item: LaneItem<T>) => Promise<unknown>): Defines the asynchronous function that will process each LaneItem. This is mandatory for the lane to function.
- enqueue(payload: T, options?: { priority?: number; metadata?: Record<…> }): Adds a payload to the lane's queue. Returns the created LaneItem.
- startProcessing(): Manually starts the processing loop if autoStart is false or the lane is paused.
- pause() / resume(): Controls the processing flow of the lane.
- drain(): Asynchronously waits for all currently pending items to be processed.
- clear(): Removes all pending items from the queue.
- getStatus() / getQueueLength() / getCurrentItem() / getStats(): Provide insights into the lane's current state.
LaneManager Class
- constructor(config?: Partial<…>): Initializes the manager with default lane configuration.
- setDefaultProcessor(processor: (item: LaneItem<T>) => Promise<unknown>): Sets a processor that will be automatically assigned to any new SessionLane created by this manager.
- getLane(sessionId: string): Retrieves or creates a SessionLane for the given sessionId.
- enqueue(sessionId: string, payload: T, options?: ...): A convenience method to enqueue a payload directly to a session's lane via the manager.
- removeLane(sessionId: string): Removes a lane and clears its pending items.
- pauseAll() / resumeAll() / drainAll() / clearAll(): Global operations across all managed lanes.
- getStats(): Provides aggregated statistics across all lanes.
Singleton Access
- getLaneManager(config?: Partial<…>): Provides access to a global singleton LaneManager instance. This is the recommended way to interact with the manager across your application.
- resetLaneManager(): Clears all lanes from the singleton manager and resets the instance. Useful for testing or application shutdown.
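A get/reset singleton pair of this kind is commonly built on a lazily created module-level instance. The sketch below shows that pattern under stated assumptions: Registry, getRegistry, resetRegistry, and the maxLanes option are hypothetical stand-ins, and whether the real getLaneManager ignores config on later calls is not confirmed by this document.

```typescript
// Hypothetical registry standing in for LaneManager.
class Registry {
  readonly lanes = new Map<string, string[]>();
  readonly maxLanes: number;

  constructor(config: { maxLanes?: number } = {}) {
    this.maxLanes = config.maxLanes ?? 100;
  }

  // Retrieve the lane for a session, creating it on first use.
  getLane(sessionId: string): string[] {
    let lane = this.lanes.get(sessionId);
    if (lane === undefined) {
      lane = [];
      this.lanes.set(sessionId, lane);
    }
    return lane;
  }
}

let instance: Registry | null = null;

// Lazily creates the shared instance; in this sketch, config only
// takes effect on the call that actually creates it.
function getRegistry(config?: { maxLanes?: number }): Registry {
  if (instance === null) instance = new Registry(config);
  return instance;
}

// Clears all lanes and drops the instance so the next call starts fresh.
function resetRegistry(): void {
  if (instance !== null) instance.lanes.clear();
  instance = null;
}
```

Calling the reset function between test cases keeps lanes created by one test from leaking into the next.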
Convenience Functions
- withLane(sessionId: string, payload: T, processor: (payload: T) => Promise<…>, config?: Partial<…>): A high-level function to execute a single processor function for a payload within a specific sessionId's lane. It handles setting up the processor and enqueueing the item, returning a Promise that resolves with the processor's result.
- createLanedFunction(getSessionId: (payload: T) => string, processor: (payload: T) => Promise<…>, config?: Partial<…>): Creates a reusable async function that automatically enqueues its calls into the appropriate session lane. The getSessionId function determines which lane to use based on the input payload. This is ideal for wrapping existing functions that need lane-ordered execution.
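To make the idea behind a laned function concrete, here is a minimal sketch that serializes calls per key by chaining promises. It is one possible technique, not necessarily how createLanedFunction is implemented; makeLaned and the sleep helper are hypothetical names.

```typescript
// Hypothetical helper: calls that share a key run strictly one after another.
function makeLaned<T, R>(
  getKey: (payload: T) => string,
  processor: (payload: T) => Promise<R>,
): (payload: T) => Promise<R> {
  // The tail of each key's chain; tails never reject.
  const tails = new Map<string, Promise<void>>();

  return (payload: T): Promise<R> => {
    const key = getKey(payload);
    const previous = tails.get(key) ?? Promise.resolve();
    // Start only after the previous call for this key has settled.
    const result = previous.then(() => processor(payload));
    const tail: Promise<void> = result
      .then(
        () => undefined,
        () => undefined, // a failed call must not block later ones
      )
      .then(() => {
        // Drop the entry once this call is the last one in its chain.
        if (tails.get(key) === tail) tails.delete(key);
      });
    tails.set(key, tail);
    return result;
  };
}

const sleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

const finished: string[] = [];
const lanedSave = makeLaned<{ userId: string; step: string; ms: number }, string>(
  (payload) => payload.userId,
  async (payload) => {
    await sleep(payload.ms);
    finished.push(payload.step);
    return payload.step;
  },
);
```

With this wrapper, a slow call for one user delays only that user's later calls; calls for other users proceed independently.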
1.4 Example Usage
import { getLaneManager, createLanedFunction } from './concurrency/lanes.js';
interface UserAction {
  userId: string;
  action: string;
  data: any;
}
// 1. Using LaneManager directly
const manager = getLaneManager<UserAction>();
manager.setDefaultProcessor(async (item) => {
  console.log(`[${item.id}] Processing action for user ${item.payload.userId}: ${item.payload.action}`);
  await new Promise(resolve => setTimeout(resolve, 100)); // Simulate async work
  return `Processed: ${item.payload.action}`;
});
manager.enqueue('user-123', { userId: 'user-123', action: 'updateProfile', data: { name: 'Alice' } });
manager.enqueue('user-456', { userId: 'user-456', action: 'fetchData', data: {} });
manager.enqueue('user-123', { userId: 'user-123', action: 'sendNotification', data: { message: 'Welcome!' } });
// Operations for user-123 will be processed in order.
// 2. Using createLanedFunction for a reusable API
const processUserAction = createLanedFunction<UserAction, string>(
  (payload) => payload.userId, // How to get the session ID from the payload
  async (payload) => {
    console.log(`[LanedFn] Processing action for user ${payload.userId}: ${payload.action}`);
    await new Promise(resolve => setTimeout(resolve, 50));
    return `Completed: ${payload.action}`;
  }
);
async function runLanedFunctionExample() {
  const p1 = processUserAction({ userId: 'user-789', action: 'login', data: {} });
  const p2 = processUserAction({ userId: 'user-101', action: 'viewPage', data: { page: '/home' } });
  const p3 = processUserAction({ userId: 'user-789', action: 'logout', data: {} });
  await Promise.all([p1, p2, p3]);
  console.log('All laned functions completed.');
}
runLanedFunctionExample();
2. Lane Queue (Default Serial, Explicit Parallel)
The LaneQueue system provides a more sophisticated concurrency model built around the "Default Serial, Explicit Parallel" pattern. Tasks within a session's lane execute serially by default, but can be explicitly marked as parallel if they are safe to run concurrently with other parallel tasks in the same lane. This is ideal for scenarios where some operations (e.g., writes, state modifications) must be strictly ordered, while others (e.g., reads, idempotent operations) can benefit from parallel execution.
2.1 Core Concepts
- LaneQueue: The central manager for all session lanes. It orchestrates task distribution and execution.
- Lane: Represents a session's execution context. Each Lane maintains separate pending and running task queues.
- Task: The unit of work. Each task encapsulates an async function (fn), TaskOptions (e.g., parallel, priority, timeout, retries), and its current status.
- "Default Serial, Explicit Parallel":
  - Serial Tasks: If options.parallel is false (default), a task will only start if no other tasks (serial or parallel) are currently running in its lane. This ensures strict ordering for state-modifying operations.
  - Parallel Tasks: If options.parallel is true, a task can run concurrently with other parallel tasks, up to a maxParallel limit defined in the LaneQueueConfig. However, a serial task will still block all parallel tasks from starting if it's currently running.
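The serial/parallel rule can be expressed as a pure selection function over a lane's pending and running tasks. The following is a sketch under stated assumptions, not the module's getNextTasks: selectNext and both interfaces are hypothetical, the pending list is assumed to be sorted already, and treating a waiting serial task as a barrier that later tasks cannot overtake is a design choice of this sketch that the document does not confirm.

```typescript
// Hypothetical task shapes, reduced to what the selection rule needs.
interface PendingTask {
  id: string;
  parallel: boolean;
}

interface RunningTask {
  id: string;
  parallel: boolean;
}

// Returns the pending tasks that may start now.
// Assumes `pending` is already sorted in the order tasks should start.
function selectNext(
  pending: PendingTask[],
  running: RunningTask[],
  maxParallel: number,
): PendingTask[] {
  // A running serial task blocks everything else in the lane.
  if (running.some((task) => !task.parallel)) return [];

  const selected: PendingTask[] = [];
  let freeSlots = maxParallel - running.length;

  for (const task of pending) {
    if (!task.parallel) {
      // A serial task starts only on an empty lane, and then runs alone.
      if (running.length === 0 && selected.length === 0) selected.push(task);
      // Assumption: tasks queued behind a serial task do not overtake it.
      break;
    }
    if (freeSlots <= 0) break;
    selected.push(task);
    freeSlots--;
  }
  return selected;
}

const p = (id: string): PendingTask => ({ id, parallel: true });
const s = (id: string): PendingTask => ({ id, parallel: false });
const ids = (tasks: PendingTask[]): string => tasks.map((task) => task.id).join(",");

console.log(ids(selectNext([p("p1"), p("p2"), p("p3")], [], 2))); // p1,p2
console.log(ids(selectNext([s("s1"), p("p1")], [], 2))); // s1
```

Keeping the rule in a pure function like this makes it straightforward to unit test independently of timers and promises.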
2.2 How it Works
- Enqueueing: LaneQueue.enqueue(laneId, fn, options) creates a Task object and adds it to the pending queue of the specified laneId. Tasks are sorted by priority (higher value = higher priority).
- Lane Processing (processLane):
  - Each Lane has an asynchronous processLane loop that runs when tasks are enqueued or completed.
  - It checks if the lane is paused or already processing.
  - It calls getNextTasks to determine which tasks from the pending queue can be moved to running.
- Task Selection (getNextTasks): This is the core logic for "Default Serial, Explicit Parallel":
  - If there are any tasks currently running in the lane, no new serial tasks can start.
  - If there are no running tasks, the highest priority serial task can start. If a serial task starts, no other tasks (serial or parallel) can start until it completes.
  - Parallel tasks can start if the maxParallel limit for the lane has not been reached, and no serial task is currently running.
- Task Execution (executeTask):
  - A task is moved from pending to running.
  - It's executed using executeWithTimeout to enforce the configured timeout.
  - If the task fails and options.idempotent is true and retries are available, it will be retried after a retryDelay.
  - Upon completion (success or final failure), the task is removed from running, and processLane is called again to continue processing.
- Events and Stats: LaneQueue emits events (task:enqueued, task:started, task:completed, task:failed, task:cancelled, lane:created, lane:paused, lane:resumed, lane:drained) for detailed monitoring. It also maintains LaneStats for each lane and global statistics.
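The execution step (timeout enforcement plus retries for idempotent tasks) can be sketched as follows. This is an illustration only: withTimeout, runTask, and the RunOptions field names are hypothetical, and the real executeWithTimeout may behave differently.

```typescript
const sleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Rejects if fn has not settled within timeoutMs.
// Note: this does not cancel the underlying work; it only stops waiting for it.
function withTimeout<T>(fn: () => Promise<T>, timeoutMs: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(
      () => reject(new Error(`Task timed out after ${timeoutMs}ms`)),
      timeoutMs,
    );
    Promise.resolve()
      .then(fn)
      .then(
        (value) => {
          clearTimeout(timer);
          resolve(value);
        },
        (error) => {
          clearTimeout(timer);
          reject(error);
        },
      );
  });
}

// Hypothetical option names; the real TaskOptions shape may differ.
interface RunOptions {
  timeoutMs: number;
  retries: number;
  retryDelayMs: number;
  idempotent: boolean;
}

// Runs fn with a per-attempt timeout; only idempotent tasks are retried.
async function runTask<T>(fn: () => Promise<T>, options: RunOptions): Promise<T> {
  const maxAttempts = options.idempotent ? options.retries + 1 : 1;
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await withTimeout(fn, options.timeoutMs);
    } catch (error) {
      lastError = error;
      if (attempt < maxAttempts) await sleep(options.retryDelayMs);
    }
  }
  throw lastError;
}
```

Gating retries on an idempotent flag avoids repeating a side effect (for example, a write) that may have partially succeeded before the failure.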
Simplified Architecture
graph TD
A[LaneQueue] --> B{Lane (laneId)};
B -- Enqueues --> C[Pending Tasks (sorted by priority)];
B -- Runs --> D[Running Tasks];
C -- getNextTasks --> D;
D -- executeTask --> E[Task fn() with timeout/retries];
E -- Emits Events --> A;
2.3 Key Components & Usage
LaneQueue Class
- constructor(config?: Partial<…>): Initializes the queue with optional configuration, including maxParallel, defaultTimeout, defaultRetries, etc.
- enqueue(laneId: string, fn: () => Promise<…>, options?: TaskOptions): The primary method to add a task. fn is the asynchronous function to execute. options allow specifying parallel, priority, timeout, idempotent, retries, etc. Returns a Promise that resolves with the task's result.
- pause(laneId: string) / resume(laneId: string): Controls the processing flow for a specific lane.
- cancelPending(laneId: string): Rejects and removes all tasks currently in the pending queue for a lane.
- getLane(laneId: string) / listLanes(): Retrieve lane information.
- getStats(laneId: string) / getGlobalStats(): Access performance and status statistics.
- removeLane(laneId: string): Removes a lane and cancels its pending tasks.
- clear(): Clears all lanes and their pending tasks.
- formatStatus(): Returns a human-readable string of the current queue status.
Singleton Access
- getLaneQueue(config?: Partial<…>): Provides access to a global singleton LaneQueue instance.
- resetLaneQueue(): Clears all lanes from the singleton queue and resets the instance.
2.4 Example Usage
import { getLaneQueue } from './concurrency/lane-queue.js';
const queue = getLaneQueue({ maxParallel: 2 }); // Allow up to 2 parallel tasks per lane
async function runLaneQueueExample() {
  console.log('--- LaneQueue Example ---');
  // Serial task (default) - will block other tasks in 'session-A'
  const serialTask = queue.enqueue('session-A', async () => {
    console.log('[session-A] Serial task START');
    await new Promise(resolve => setTimeout(resolve, 200));
    console.log('[session-A] Serial task END');
    return 'Serial Done';
  });
  // Parallel tasks - will run concurrently if no serial task is active
  const parallelTask1 = queue.enqueue('session-A', async () => {
    console.log('[session-A] Parallel 1 START');
    await new Promise(resolve => setTimeout(resolve, 100));
    console.log('[session-A] Parallel 1 END');
    return 'Parallel 1 Done';
  }, { parallel: true });
  const parallelTask2 = queue.enqueue('session-A', async () => {
    console.log('[session-A] Parallel 2 START');
    await new Promise(resolve => setTimeout(resolve, 150));
    console.log('[session-A] Parallel 2 END');
    return 'Parallel 2 Done';
  }, { parallel: true });
  const parallelTask3 = queue.enqueue('session-A', async () => {
    console.log('[session-A] Parallel 3 START (will wait for one of the first two to finish)');
    await new Promise(resolve => setTimeout(resolve, 50));
    console.log('[session-A] Parallel 3 END');
    return 'Parallel 3 Done';
  }, { parallel: true });
  // Task for another session - runs independently
  const otherSessionTask = queue.enqueue('session-B', async () => {
    console.log('[session-B] Independent task START');
    await new Promise(resolve => setTimeout(resolve, 100));
    console.log('[session-B] Independent task END');
    return 'Session B Done';
  });
  try {
    const results = await Promise.all([
      serialTask,
      parallelTask1,
      parallelTask2,
      parallelTask3,
      otherSessionTask
    ]);
    console.log('All tasks completed:', results);
  } catch (error) {
    console.error('A task failed:', error);
  }
  console.log('\n' + queue.formatStatus());
}
runLaneQueueExample();
3. Choosing the Right Concurrency System
- Use SessionLane / LaneManager when:
  - You need a simple, strict FIFO or priority-based queue for operations related to a specific session.
  - All operations within a session are inherently serial and modifying shared state.
  - You primarily need to prevent message interleaving.
  - The withLane or createLanedFunction convenience wrappers fit your use case well.
- Use LaneQueue when:
  - You need more fine-grained control over concurrency within a session.
  - Some operations can safely run in parallel (e.g., read-only API calls) while others must be strictly serial (e.g., database writes).
  - You require features like task timeouts, automatic retries for idempotent operations, or explicit priority management.
  - You want to manage backpressure by limiting maxParallel tasks or maxPending tasks per lane.
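To illustrate what bounding both running and waiting work means for backpressure, here is a small sketch of a lane that rejects new tasks once its waiting list is full. BoundedLane is hypothetical, and rejecting on overflow is an assumption of this sketch; this document does not state how LaneQueue reacts when maxPending is exceeded.

```typescript
// Hypothetical lane with a cap on running tasks and on waiting tasks.
class BoundedLane {
  private readonly maxParallel: number;
  private readonly maxPending: number;
  private readonly waiting: Array<() => void> = [];
  private active = 0;

  constructor(maxParallel: number, maxPending: number) {
    this.maxParallel = maxParallel;
    this.maxPending = maxPending;
  }

  enqueue<T>(fn: () => Promise<T>): Promise<T> {
    const laneBusy = this.active >= this.maxParallel;
    // Assumption: when the waiting list is full, fail fast so callers can back off.
    if (laneBusy && this.waiting.length >= this.maxPending) {
      return Promise.reject(new Error("Lane is full"));
    }
    return new Promise<T>((resolve, reject) => {
      const start = (): void => {
        this.active++;
        Promise.resolve()
          .then(fn)
          .then(
            (value) => {
              this.release();
              resolve(value);
            },
            (error) => {
              this.release();
              reject(error);
            },
          );
      };
      if (laneBusy) this.waiting.push(start);
      else start();
    });
  }

  // Frees a slot and starts the next waiting task, if any.
  private release(): void {
    this.active--;
    const next = this.waiting.shift();
    if (next) next();
  }
}
```

Failing fast is only one option; an alternative design would have the caller await free capacity instead of receiving an error.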
4. Integration with the Codebase
Both concurrency systems are designed as foundational utilities.
- LaneQueue Integration: The LaneQueue is actively used by the src/channels/core.ts module. Specifically, enqueueMessage utilizes LaneQueue.enqueue to manage message processing for channels, ensuring that messages within a channel are handled according to the "Default Serial, Explicit Parallel" model. This prevents race conditions and ensures message order where necessary.
  - src/channels/core.ts → getChannelLaneQueue() → LaneQueue
  - src/channels/core.ts → enqueueMessage() → LaneQueue.enqueue()
  - src/channels/core.ts → resetChannelLaneQueue() → LaneQueue.clear()
- SessionLane / LaneManager Integration: While LaneQueue is used for core channel messaging, the SessionLane system provides a more general-purpose session-based queueing mechanism. Its convenience functions (withLane, createLanedFunction) are particularly useful for wrapping business logic that needs to guarantee sequential execution for specific entities (e.g., user sessions, document IDs) without needing the explicit parallel/serial distinction of LaneQueue.
  - The onError event of SessionLane is observed by various error boundary components (e.g., components/error-boundaries/tool-error-boundary.tsx, ui/components/ErrorBoundary.tsx, components/error-boundaries/file-error-boundary.tsx), suggesting that lane processing errors are captured and handled at the UI level.
This module serves as a robust foundation for managing complex asynchronous workflows, providing developers with the tools to build reliable and performant concurrent systems.