tests — concurrency
This document provides an overview of the concurrency primitives tested within the tests/concurrency module. While this module primarily contains integration and unit tests, it serves as a comprehensive guide to understanding the design, functionality, and usage of the LaneQueue and SessionLane/LaneManager systems.
These concurrency modules are designed to manage asynchronous tasks, ensuring ordered execution, resource isolation, and robust error handling in complex, multi-task environments.
Concurrency Primitives Overview
The tests/concurrency module validates two distinct, yet conceptually related, concurrency management systems:
- LaneQueue: A high-level, globally-aware queue system that manages tasks across multiple named "lanes" with a global concurrency limit. It supports priority, retries, and cancellation.
- SessionLane and LaneManager: A more granular system for managing individual task queues (SessionLane) and orchestrating multiple such lanes (LaneManager), often used for session-specific or entity-specific task ordering.
While both systems utilize the concept of "lanes" for task isolation, they address different levels of concurrency management. LaneQueue focuses on global resource limits and cross-lane coordination, whereas SessionLane and LaneManager prioritize strict ordering and lifecycle management within specific logical units.
LaneQueue Module Documentation
The LaneQueue module (src/concurrency/lane-queue.js) provides a robust mechanism for managing asynchronous tasks across multiple isolated "lanes" while adhering to a global concurrency limit. It's ideal for scenarios where tasks need to be grouped logically (e.g., by user session, resource ID) but also need to share a limited pool of execution resources.
Purpose
LaneQueue ensures that:
- Tasks within different lanes can run concurrently without blocking each other, up to a global maxParallel limit.
- Tasks within the same lane are processed in a defined order (e.g., by priority).
- Transient failures can be automatically retried for idempotent operations.
- Tasks can be cancelled or managed through their lifecycle.
Key Features
- Multi-lane Isolation: Tasks enqueued to different lane names (lane-A, lane-B) are treated as independent streams, allowing parallel execution across lanes.
- Global Concurrency Limit: The maxParallel option limits the total number of tasks executing concurrently across all lanes.
- Priority Ordering: Tasks within a single lane can be assigned a priority (lower number = higher priority) to influence their execution order.
- Retry Logic: Idempotent tasks can be configured with retries and retryDelay to automatically handle transient errors.
- Cancellation: Pending tasks within a specific lane can be cancelled, rejecting their promises.
- Global Statistics: Provides aggregated statistics on total, completed, and failed tasks across all managed lanes.
- Timeout: Tasks can be configured with a defaultTimeout to prevent indefinite hangs.
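One way to picture the global maxParallel limit is as a counting limiter shared by every lane. The sketch below is an illustration only, not the code in lane-queue.js: createLimiter and demoLimiter are hypothetical names, and lane bookkeeping, priorities, and retries are left out.

```typescript
// Hypothetical helper: caps how many tasks run at the same time.
function createLimiter(maxParallel: number) {
  let active = 0;
  const waiting: Array<() => void> = [];

  const release = (): void => {
    const next = waiting.shift();
    if (next) {
      next(); // hand the freed slot directly to the next waiter
    } else {
      active--;
    }
  };

  return async function run<T>(task: () => Promise<T>): Promise<T> {
    if (active >= maxParallel) {
      // No free slot: wait until a finishing task hands one over.
      await new Promise<void>((resolve) => waiting.push(resolve));
    } else {
      active++;
    }
    try {
      return await task();
    } finally {
      release();
    }
  };
}

// Runs five tasks through a limiter of two and reports the peak concurrency seen.
async function demoLimiter(): Promise<number> {
  const run = createLimiter(2);
  let running = 0;
  let peak = 0;
  const task = async (): Promise<void> => {
    running++;
    peak = Math.max(peak, running);
    await Promise.resolve(); // yield so other tasks get a chance to start
    await Promise.resolve();
    running--;
  };
  await Promise.all([run(task), run(task), run(task), run(task), run(task)]);
  return peak;
}
```

Handing the slot straight to the next waiter, rather than decrementing and re-incrementing the counter, avoids a window in which a newly arriving task could slip past the limit.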
Usage Patterns
Initialization
A LaneQueue instance is created with configuration options:
import { LaneQueue } from '../../src/concurrency/lane-queue.js';
const queue = new LaneQueue({
  maxParallel: 2, // Max 2 tasks running concurrently across all lanes
  defaultTimeout: 5000 // Default timeout for tasks
});
The resetLaneQueue() function is available for clearing the singleton instance, primarily used in testing.
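The effect of a per-task timeout such as defaultTimeout can be sketched with a small wrapper. This is an assumption about the general technique, not the module's implementation: withTimeout is a hypothetical name, and the error message is invented for the example.

```typescript
// Hypothetical helper: rejects if `task` has not settled within `timeoutMs`.
function withTimeout<T>(task: () => Promise<T>, timeoutMs: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(
      () => reject(new Error(`Task timed out after ${timeoutMs}ms`)),
      timeoutMs,
    );
    task().then(
      (value) => {
        clearTimeout(timer); // settled in time: cancel the pending timeout
        resolve(value);
      },
      (error) => {
        clearTimeout(timer);
        reject(error);
      },
    );
  });
}
```

Note that a wrapper like this only stops waiting for the task; the underlying work keeps running unless it supports its own cancellation.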
Enqueuing Tasks
Tasks are enqueued to a specific laneName and are represented by an async function. Options can be provided for priority, retries, and idempotency.
// Enqueue a task to 'lane-A'
const resultA = await queue.enqueue('lane-A', async () => {
  // Simulate async work
  await new Promise(r => setTimeout(r, 50));
  return 'Task A completed';
});
// Enqueue a task with an explicit priority to 'priority-lane'
const resultP = await queue.enqueue('priority-lane', async () => {
  return 'High priority task';
}, { priority: 10 }); // Priority value (see Key Features for how values are ordered)
// Enqueue an idempotent task with retries
let attempts = 0;
const retryResult = await queue.enqueue('retry-lane', async () => {
  attempts++;
  if (attempts < 3) {
    throw new Error('Transient failure');
  }
  return 'Success after retries';
}, { idempotent: true, retries: 3, retryDelay: 100 });
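The retry behaviour in the last example can be approximated by a small loop. The sketch below is not taken from lane-queue.js: retryAsync and RetryOptions are hypothetical names, and it assumes retries counts the extra attempts made after the first failure.

```typescript
interface RetryOptions {
  retries: number;    // assumed meaning: extra attempts after the first failure
  retryDelay: number; // milliseconds to wait between attempts
}

// Hypothetical helper: re-runs `task` until it succeeds or attempts run out.
async function retryAsync<T>(
  task: () => Promise<T>,
  { retries, retryDelay }: RetryOptions,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await task();
    } catch (error) {
      lastError = error;
      if (attempt < retries) {
        // Wait before the next attempt; no wait after the final failure.
        await new Promise<void>((resolve) => setTimeout(resolve, retryDelay));
      }
    }
  }
  throw lastError;
}
```

Because the task may run several times, this pattern is only safe for operations that are idempotent, which is presumably why the queue asks callers to opt in with idempotent: true.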
Managing Lanes
Individual lanes can be paused, resumed, or have their pending tasks cancelled.
// Pause a specific lane
queue.pause('my-lane');
// Resume a specific lane
queue.resume('my-lane');
// Cancel all pending tasks in a lane
const cancelledCount = queue.cancelPending('my-lane'); // Returns number of tasks cancelled
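Cancelling pending work amounts to rejecting the promises of tasks that have not started yet and reporting how many were dropped. The following sketch models only a paused lane, so tasks are stored and never run; PausedLaneSketch and the 'Task cancelled' message are hypothetical and not taken from the module.

```typescript
interface PendingEntry<T> {
  task: () => Promise<T>;
  resolve: (value: T) => void;
  reject: (reason: Error) => void;
}

// Hypothetical stand-in for one paused lane: tasks are stored but never started.
class PausedLaneSketch<T> {
  private pending: PendingEntry<T>[] = [];

  enqueue(task: () => Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      this.pending.push({ task, resolve, reject });
    });
  }

  // Rejects every pending promise and returns how many were cancelled.
  cancelPending(): number {
    const cancelled = this.pending.splice(0);
    for (const entry of cancelled) {
      entry.reject(new Error('Task cancelled'));
    }
    return cancelled.length;
  }
}
```

Callers awaiting a cancelled task see a rejection, so they should be prepared to catch it.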
Monitoring and Cleanup
Global statistics and a formatted status string are available. The queue should be cleared when no longer needed.
// Get global statistics
const stats = queue.getGlobalStats();
console.log(`Total tasks: ${stats.totalTasks}, Completed: ${stats.completedTasks}`);
// Get a human-readable status string
const statusString = queue.formatStatus();
console.log(statusString);
// Clear all tasks and lanes
queue.clear();
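Aggregated statistics can be thought of as a field-by-field sum over per-lane counters. The sketch below borrows the field names from the getGlobalStats() return shape listed in the API reference; the per-lane records and the aggregateStats function are hypothetical, and the real module may track these numbers differently.

```typescript
// Field names follow the getGlobalStats() shape; the per-lane record is an assumption.
interface LaneStats {
  totalTasks: number;
  completedTasks: number;
  failedTasks: number;
  activeTasks: number;
  pendingTasks: number;
  cancelledTasks: number;
}

// Hypothetical helper: sums each counter across all lanes.
function aggregateStats(lanes: LaneStats[]): LaneStats {
  const totals: LaneStats = {
    totalTasks: 0,
    completedTasks: 0,
    failedTasks: 0,
    activeTasks: 0,
    pendingTasks: 0,
    cancelledTasks: 0,
  };
  for (const stats of lanes) {
    totals.totalTasks += stats.totalTasks;
    totals.completedTasks += stats.completedTasks;
    totals.failedTasks += stats.failedTasks;
    totals.activeTasks += stats.activeTasks;
    totals.pendingTasks += stats.pendingTasks;
    totals.cancelledTasks += stats.cancelledTasks;
  }
  return totals;
}
```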
API Reference (from tests)
- new LaneQueue(options: { maxParallel?: number; defaultTimeout?: number }): Creates a new LaneQueue instance.
- enqueue(laneName: string, task: () => Promise, options?: { priority?: number; idempotent?: boolean; retries?: number; retryDelay?: number }): Promise: Adds a task to the specified lane.
- pause(laneName: string): Pauses processing for a specific lane.
- resume(laneName: string): Resumes processing for a specific lane.
- cancelPending(laneName: string): number: Cancels all pending tasks in a lane, returning the count of cancelled tasks.
- getGlobalStats(): { totalTasks: number; completedTasks: number; failedTasks: number; activeTasks: number; pendingTasks: number; cancelledTasks: number }: Returns aggregated statistics across all lanes.
- formatStatus(): string: Returns a formatted string representing the current status of all lanes.
- clear(): Clears all tasks and lanes managed by this LaneQueue instance.
- resetLaneQueue(): (Utility) Resets the internal singleton instance of LaneQueue.
SessionLane and LaneManager Module Documentation
The SessionLane and LaneManager modules (src/concurrency/lanes.js) provide a more granular approach to managing task queues, particularly useful for scenarios where strict ordering and lifecycle management are required for individual "sessions" or entities.
SessionLane
A SessionLane represents a single, independent queue of tasks. It processes items sequentially (FIFO by default, or by priority) and provides detailed control over its lifecycle and events.
Purpose
SessionLane ensures that:
- Tasks for a specific session are processed one after another, maintaining order.
- The state and progress of an individual session's tasks can be monitored.
- Custom processing logic can be applied to each task.
Key Features
- FIFO or Priority Queue: By default, tasks are processed in First-In, First-Out order. Can be configured for priority-based processing.
- Event Emitter: Emits events (enqueue, start, complete, drain, error) for lifecycle monitoring.
- Custom Processor: A setProcessor function defines how each enqueued item is handled.
- Lifecycle Control: Processing can be controlled explicitly with startProcessing(), pause(), resume(), and clear().
- Queue Limits: Can enforce a maxQueueSize to prevent unbounded growth.
- Current Item Tracking: Provides access to the currently processing item.
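The difference between FIFO and priority-based queueing comes down to where a new item is inserted. The sketch below is a hypothetical illustration, not SessionLane's code: it assumes lower numbers are served first (as the LaneQueue feature list states) and that items with equal priority keep their arrival order. If the module orders priorities the other way, the comparison would be flipped.

```typescript
interface QueuedItem<T> {
  payload: T;
  priority: number;
}

// Assumption: lower number = served first; ties keep arrival order (FIFO).
function insertByPriority<T>(queue: QueuedItem<T>[], item: QueuedItem<T>): void {
  // Find the first queued item that should run after the new one.
  const index = queue.findIndex((queued) => queued.priority > item.priority);
  queue.splice(index === -1 ? queue.length : index, 0, item);
}
```

With every item given the same priority, this insertion degenerates to a plain push, which is the FIFO case.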
Usage Patterns
Initialization and Processor Setup
A SessionLane is created, and its processing logic is defined using setProcessor.
import { SessionLane } from '../../src/concurrency/lanes.js';
const lane = new SessionLane({ autoStart: false, maxQueueSize: 10 });
const results: string[] = [];
lane.setProcessor(async (item) => {
  // Simulate async work
  await new Promise(r => setTimeout(r, 10));
  results.push(item.payload as string);
});
// Listen for events
lane.on('complete', () => console.log('Task completed in lane'));
lane.on('error', (err) => console.error('Lane error:', err.message));
Enqueuing and Processing Tasks
Tasks are enqueued with a payload. If autoStart is false, startProcessing() must be called.
lane.enqueue('task-1');
lane.enqueue('task-2', { priority: 5 }); // For priority-based lanes
lane.enqueue('task-3');
lane.startProcessing(); // Start processing if autoStart was false
await lane.drain(); // Wait for all tasks in this lane to complete
expect(results).toEqual(['task-1', 'task-2', 'task-3']); // If FIFO
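The enqueue, start, and drain flow above can be modelled with a very small sequential queue. The following is a hypothetical miniature, not the SessionLane class: MiniLane passes the raw payload to its processor, has no events, priorities, or pause support, and stops at the first processor error.

```typescript
type Processor<T> = (payload: T) => Promise<void>;

// Hypothetical miniature of a sequential lane.
class MiniLane<T> {
  private queue: T[] = [];
  private active = false;
  private idle: Promise<void> = Promise.resolve();
  private processor: Processor<T>;

  constructor(processor: Processor<T>) {
    this.processor = processor;
  }

  enqueue(payload: T): void {
    this.queue.push(payload);
  }

  // Begins processing unless a processing loop is already running.
  startProcessing(): void {
    if (this.active) return;
    this.active = true;
    this.idle = this.processAll();
  }

  // Resolves once the current processing loop has emptied the queue.
  drain(): Promise<void> {
    return this.idle;
  }

  private async processAll(): Promise<void> {
    try {
      while (this.queue.length > 0) {
        const payload = this.queue.shift() as T;
        await this.processor(payload); // one item at a time, in order
      }
    } finally {
      this.active = false;
    }
  }
}
```

Awaiting each item before taking the next is what gives the lane its ordering guarantee; items enqueued while the loop is running are picked up by the same loop.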
Lifecycle Management
lane.pause();
console.log(lane.getStatus()); // 'paused'
lane.resume();
console.log(lane.getStatus()); // 'active'
const clearedItems = lane.clear(); // Clears all pending items
LaneManager
The LaneManager acts as a central registry and orchestrator for multiple SessionLane instances. It provides a singleton pattern and convenience methods for managing tasks across different lanes.
Purpose
LaneManager simplifies the management of multiple SessionLanes by:
- Providing a single point of access for creating and retrieving lanes by name.
- Allowing global operations (e.g., pauseAll, drainAll).
- Offering a default processor for newly created lanes.
- Ensuring a singleton instance across the application.
Key Features
- Lane Creation/Retrieval: getLane(name) creates a new SessionLane if one doesn't exist for the given name, or returns the existing one.
- Singleton Instance: getLaneManager() ensures only one LaneManager instance exists globally. resetLaneManager() is for testing.
- Default Processor: setDefaultProcessor() sets a common processing function for all new lanes.
- Global Operations: pauseAll(), resumeAll(), drainAll(), clearAll() affect all managed lanes.
- Statistics: getStats() provides an overview of all lanes and their pending tasks.
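The get-or-create and singleton behaviour described above can be sketched with a Map and a module-level instance. Everything here is hypothetical (LaneStub, LaneRegistry, getRegistry, resetRegistry) and stands in for SessionLane, LaneManager, getLaneManager, and resetLaneManager without claiming to match their internals.

```typescript
// LaneStub stands in for SessionLane in this sketch.
class LaneStub {
  name: string;
  pending: string[] = [];

  constructor(name: string) {
    this.name = name;
  }
}

// Hypothetical registry illustrating get-or-create by lane name.
class LaneRegistry {
  private lanes = new Map<string, LaneStub>();

  getLane(name: string): LaneStub {
    let lane = this.lanes.get(name);
    if (!lane) {
      lane = new LaneStub(name); // first request for this name creates the lane
      this.lanes.set(name, lane);
    }
    return lane;
  }

  hasLane(name: string): boolean {
    return this.lanes.has(name);
  }

  getTotalPending(): number {
    let total = 0;
    this.lanes.forEach((lane) => {
      total += lane.pending.length;
    });
    return total;
  }
}

let instance: LaneRegistry | null = null;

// Lazily creates the single shared registry.
function getRegistry(): LaneRegistry {
  if (!instance) instance = new LaneRegistry();
  return instance;
}

// Drops the shared registry so the next getRegistry() call starts fresh.
function resetRegistry(): void {
  instance = null;
}
```

A reset function like this is what lets each test start from an empty set of lanes even though production code shares one instance.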
Usage Patterns
Accessing the Singleton and Managing Lanes
import { getLaneManager, resetLaneManager } from '../../src/concurrency/lanes.js';
resetLaneManager(); // For clean state in tests
const manager = getLaneManager();
// Set a default processor for all new lanes
manager.setDefaultProcessor(async (item) => {
  console.log(`Processing item for lane ${item.laneName}: ${item.payload}`);
});
// Get or create lanes
const lane1 = manager.getLane('session-1');
const lane2 = manager.getLane('session-2');
manager.enqueue('session-1', 'message-A');
manager.enqueue('session-2', 'message-B');
await manager.drainAll(); // Wait for all tasks in all lanes to complete
Global Control
manager.pauseAll();
console.log(manager.getLane('session-1').getStatus()); // 'paused'
manager.resumeAll();
manager.clearAll(); // Clears all tasks from all lanes and removes lanes
Monitoring
const stats = manager.getStats();
console.log(`Total lanes: ${stats.totalLanes}, Total pending: ${stats.totalPending}`);
console.log(stats.lanes['session-1']); // Lane-specific stats
Higher-Order Utilities
The lanes module also provides convenience functions for integrating lane-based concurrency directly into your application logic.
withLane
Executes a given function within the context of a specific lane, ensuring its execution is ordered relative to other tasks in that lane.
import { withLane } from '../../src/concurrency/lanes.js';
async function processData(sessionId: string, data: any) {
  return await withLane(sessionId, data, async (payload) => {
    // This function will be enqueued and processed by the session's lane
    console.log(`Processing data for ${sessionId}: ${payload}`);
    await new Promise(r => setTimeout(r, 100));
    return `Processed: ${payload}`;
  });
}
// Calls to processData for the same sessionId will be ordered
const p1 = processData('user-123', 'request-1');
const p2 = processData('user-123', 'request-2');
const p3 = processData('user-456', 'request-A'); // This will run in parallel with user-123 tasks
await Promise.all([p1, p2, p3]);
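The ordering guarantee in this example (sequential within a lane, parallel across lanes) can be reproduced by chaining each call onto the previous promise for the same lane name. This is a minimal sketch of that idea, not how withLane is implemented: runInLane and laneTails are hypothetical, and the map is never cleaned up.

```typescript
// Tail of each lane's promise chain, keyed by lane name (hypothetical module state).
const laneTails = new Map<string, Promise<unknown>>();

// Hypothetical helper: runs `fn` after all earlier calls for the same lane have settled.
function runInLane<T, R>(
  laneName: string,
  payload: T,
  fn: (payload: T) => Promise<R>,
): Promise<R> {
  const previous = laneTails.get(laneName) ?? Promise.resolve();
  const result = previous.then(() => fn(payload));
  // Store a never-rejecting tail so one failure does not block later tasks in the lane.
  laneTails.set(laneName, result.catch(() => undefined));
  return result;
}
```

Calls that use different lane names chain onto different tails, so they never wait on each other.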
createLanedFunction
Creates a new function that automatically enqueues its calls into a specific lane based on a provided getLaneName resolver. This is useful for creating "laned" versions of existing functions.
import { createLanedFunction } from '../../src/concurrency/lanes.js';
interface MyPayload { session: string; value: string; }
const myLanedProcessor = createLanedFunction<MyPayload, string>(
  (payload) => payload.session, // Resolver to get the lane name from the payload
  async (payload) => {
    // This is the actual processing logic
    await new Promise(r => setTimeout(r, 50));
    return `Processed ${payload.value} for ${payload.session}`;
  }
);
// Calls to myLanedProcessor will be automatically routed to the correct lane
const r1 = myLanedProcessor({ session: 'user-X', value: 'data-1' });
const r2 = myLanedProcessor({ session: 'user-X', value: 'data-2' });
const r3 = myLanedProcessor({ session: 'user-Y', value: 'data-A' });
await Promise.all([r1, r2, r3]);
// r1 and r2 for 'user-X' will be processed sequentially
// r3 for 'user-Y' will be processed in parallel
API Reference (from tests)
SessionLane
- new SessionLane(options?: { autoStart?: boolean; maxQueueSize?: number; fifo?: boolean }): Creates a new lane.
- setProcessor(processor: (item: LaneItem) => Promise): Sets the function to process enqueued items.
- enqueue(payload: T, options?: { priority?: number }): LaneItem: Adds an item to the lane's queue.
- drain(): Promise: Waits for all items in the queue to be processed.
- on(event: 'enqueue' | 'start' | 'complete' | 'drain' | 'error', listener: Function): Registers event listeners.
- getStatus(): 'idle' | 'active' | 'paused': Returns the current status of the lane.
- getQueueLength(): number: Returns the number of pending items in the queue.
- getCurrentItem(): LaneItem | undefined: Returns the item currently being processed.
- startProcessing(): Explicitly starts processing the queue.
- pause(): Pauses the lane's processing.
- resume(): Resumes the lane's processing.
- clear(): LaneItem[]: Clears all pending items from the queue, returning the cleared items.
LaneManager
- new LaneManager(options?: { autoStart?: boolean; maxQueueSize?: number; fifo?: boolean }): Creates a new manager.
- getLane(name: string): SessionLane: Retrieves or creates a SessionLane by name.
- hasLane(name: string): boolean: Checks if a lane with the given name exists.
- removeLane(name: string): Removes a lane from the manager.
- setDefaultProcessor(processor: (item: LaneItem) => Promise): Sets the default processor for new lanes.
- enqueue(laneName: string, payload: T, options?: { priority?: number }): LaneItem: Enqueues an item to a specific lane.
- getTotalPending(): number: Returns the total number of pending items across all lanes.
- pauseAll(): Pauses all managed lanes.
- resumeAll(): Resumes all managed lanes.
- drainAll(): Promise: Waits for all tasks in all managed lanes to complete.
- getStats(): { totalLanes: number; totalPending: number; lanes: { [key: string]: { pending: number; status: string } } }: Returns statistics for all lanes.
- clearAll(): Clears all tasks from all lanes and removes the lanes.
- getActiveSessions(): string[]: Returns an array of names of active lanes.
Utilities
- getLaneManager(): LaneManager: Returns the singleton LaneManager instance.
- resetLaneManager(): Resets the singleton LaneManager instance (primarily for testing).
- withLane(laneName: string, payload: T, fn: (payload: T) => Promise, options?: { priority?: number }): Promise: Executes a function within a specific lane.
- createLanedFunction(getLaneName: (payload: P) => string, fn: (payload: P) => Promise, options?: { priority?: number }): (payload: P) => Promise: Creates a function that automatically enqueues its calls to a lane.
Architectural Relationship
The SessionLane and LaneManager components form a cohesive system for managing ordered task execution within logical "sessions." The LaneManager acts as the central hub, providing access to and control over individual SessionLane instances. Higher-order utilities like withLane and createLanedFunction abstract away the direct interaction with LaneManager, making it easier to integrate lane-based concurrency into application logic.
The LaneQueue module, while sharing the "lane" concept, operates as a separate, parallel concurrency system. It focuses on global resource management and cross-lane concurrency limits, rather than the strict in-lane ordering and eventing provided by SessionLane.
graph TD
subgraph Lane Management System
A[LaneManager Singleton] --> B{getLane(name)}
B --> C[SessionLane Instance 1]
B --> D[SessionLane Instance 2]
B --> E[...]
F[withLane()] --> A
G[createLanedFunction()] --> A
end
subgraph Global Concurrency System
H[LaneQueue Instance]
end
style A fill:#f9f,stroke:#333,stroke-width:2px
style H fill:#ccf,stroke:#333,stroke-width:2px
Explanation:
- The LaneManager is a singleton that orchestrates multiple SessionLane instances.
- withLane and createLanedFunction are convenience wrappers that interact with the LaneManager to enqueue tasks into appropriate SessionLanes.
- LaneQueue is a distinct concurrency primitive, managing its own set of lanes and global concurrency limits, independent of LaneManager and SessionLane.
Contributing and Extending
When contributing to or extending these concurrency modules:
- LaneQueue: Consider its global impact. Changes to maxParallel or retry logic affect the entire application's task throughput. New features should align with global resource management and cross-lane isolation.
- SessionLane: Focus on the behavior of a single queue. Enhancements might include new event types, more sophisticated queueing algorithms (if fifo: false is expanded), or advanced error handling within a single lane's context.
- LaneManager: This is the orchestration layer for SessionLanes. Extensions here might involve new ways to group or prioritize lanes, or more comprehensive global statistics. Ensure changes maintain the singleton pattern and its reset mechanism.
- Utilities (withLane, createLanedFunction): These are designed for developer convenience. New utilities should simplify common concurrency patterns using LaneManager without exposing its internal complexities.
Always ensure that changes are thoroughly tested, especially considering the asynchronous and concurrent nature of these modules. The existing test suite provides a strong foundation for verifying correct behavior under various conditions.