tests — concurrency

Module: tests-concurrency Cohesion: 0.80 Members: 0

This document gives an overview of the concurrency primitives exercised by the tests/concurrency module. Although the module primarily contains integration and unit tests, those tests also document the design, behavior, and usage of the LaneQueue and SessionLane/LaneManager systems.

These concurrency modules are designed to manage asynchronous tasks, ensuring ordered execution, resource isolation, and robust error handling in complex, multi-task environments.

Concurrency Primitives Overview

The tests/concurrency module validates two distinct, yet conceptually related, concurrency management systems:

  1. LaneQueue: A high-level, globally-aware queue system that manages tasks across multiple named "lanes" with a global concurrency limit. It supports priority, retries, and cancellation.
  2. SessionLane and LaneManager: A more granular system for managing individual task queues (SessionLane) and orchestrating multiple such lanes (LaneManager), often used for session-specific or entity-specific task ordering.

While both systems utilize the concept of "lanes" for task isolation, they address different levels of concurrency management. LaneQueue focuses on global resource limits and cross-lane coordination, whereas SessionLane and LaneManager prioritize strict ordering and lifecycle management within specific logical units.


LaneQueue Module Documentation

The LaneQueue module (src/concurrency/lane-queue.js) provides a robust mechanism for managing asynchronous tasks across multiple isolated "lanes" while adhering to a global concurrency limit. It's ideal for scenarios where tasks need to be grouped logically (e.g., by user session, resource ID) but also need to share a limited pool of execution resources.
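To make the idea of a shared execution pool concrete, the following is a minimal sketch of a global concurrency limit applied across named lanes. MiniLaneQueue, pendingCount, and peak are hypothetical names invented for this illustration; this is not the LaneQueue implementation and it omits priority, retries, and timeouts.

```typescript
// Hypothetical sketch: NOT the real LaneQueue, just a global limiter with lane labels.
type Task<T> = () => Promise<T>;

class MiniLaneQueue {
  private running = 0;
  // Pending entries stay in arrival order; each one remembers its lane name.
  private pending: Array<{ lane: string; run: () => void }> = [];
  public peak = 0; // highest number of tasks observed running at once

  constructor(private maxParallel: number) {}

  enqueue<T>(lane: string, task: Task<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const run = () => {
        this.running++;
        this.peak = Math.max(this.peak, this.running);
        task()
          .then(resolve, reject)
          // The previous step never rejects, so this always runs.
          .then(() => {
            this.running--;
            this.pump();
          });
      };
      this.pending.push({ lane, run });
      this.pump();
    });
  }

  // Number of tasks still waiting in one lane.
  pendingCount(lane: string): number {
    return this.pending.filter((entry) => entry.lane === lane).length;
  }

  // Start waiting tasks while a global slot is free.
  private pump(): void {
    while (this.running < this.maxParallel && this.pending.length > 0) {
      this.pending.shift()!.run();
    }
  }
}
```

In this sketch the lane name is only a label used for bookkeeping; the limit itself is shared by every lane, which is the property the paragraph above describes.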

Purpose

LaneQueue ensures that:

Key Features

Usage Patterns

Initialization

A LaneQueue instance is created with configuration options:

import { LaneQueue } from '../../src/concurrency/lane-queue.js';

const queue = new LaneQueue({
  maxParallel: 2,       // Max 2 tasks running concurrently across all lanes
  defaultTimeout: 5000  // Default timeout for tasks
});

The resetLaneQueue() function is available for clearing the singleton instance, primarily used in testing.

Enqueuing Tasks

Tasks are enqueued to a specific laneName and are represented by an async function. Options can be provided for priority, retries, and idempotency.

// Enqueue a task to 'lane-A'
const resultA = await queue.enqueue('lane-A', async () => {
  // Simulate async work
  await new Promise(r => setTimeout(r, 50));
  return 'Task A completed';
});

// Enqueue a higher priority task to 'priority-lane'
const resultP = await queue.enqueue('priority-lane', async () => {
  return 'High priority task';
}, { priority: 10 }); // Higher number = higher priority

// Enqueue an idempotent task with retries
let attempts = 0;
const retryResult = await queue.enqueue('retry-lane', async () => {
  attempts++;
  if (attempts < 3) {
    throw new Error('Transient failure');
  }
  return 'Success after retries';
}, { idempotent: true, retries: 3, retryDelay: 100 });
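The retry options in the last call can be pictured as a loop around the task. The helper below is a hypothetical sketch, assuming that retries counts additional attempts after the first one and that retryDelay is a fixed pause in milliseconds; the source does not confirm either detail, and runWithRetries is not part of the module.

```typescript
// Hypothetical helper: the real retry semantics of LaneQueue may differ.
async function runWithRetries<T>(
  task: () => Promise<T>,
  retries: number,
  retryDelay: number
): Promise<T> {
  let lastError: unknown;
  // One initial attempt plus up to `retries` additional attempts (assumption).
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await task();
    } catch (err) {
      lastError = err;
      if (attempt < retries) {
        // Wait a fixed delay before the next attempt.
        await new Promise<void>((r) => setTimeout(r, retryDelay));
      }
    }
  }
  // Every attempt failed: surface the last error to the caller.
  throw lastError;
}
```

Under these assumptions, a task that fails twice and then succeeds resolves on its third attempt, and a task that always fails rejects with its final error once the retry budget is used up.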

Managing Lanes

Individual lanes can be paused, resumed, or have their pending tasks cancelled.

// Pause a specific lane
queue.pause('my-lane');

// Resume a specific lane
queue.resume('my-lane');

// Cancel all pending tasks in a lane
const cancelledCount = queue.cancelPending('my-lane'); // Returns number of tasks cancelled
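One way to picture these three operations is a single lane with a paused flag and a list of tasks that have not started. The sketch below is an illustration only: PausableLane is a hypothetical class, and rejecting cancelled tasks with an Error is an assumption about behavior the source does not describe.

```typescript
// Hypothetical single-lane sketch; not the library's implementation.
class PausableLane {
  private pending: Array<{ run: () => Promise<void>; cancel: () => void }> = [];
  private paused = false;
  private busy = false;

  enqueue<T>(task: () => Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      this.pending.push({
        run: () => task().then(resolve, reject),
        cancel: () => reject(new Error("cancelled")), // assumption: cancelled tasks reject
      });
      void this.pump();
    });
  }

  pause(): void {
    this.paused = true;
  }

  resume(): void {
    this.paused = false;
    void this.pump();
  }

  // Reject everything that has not started and return how many were dropped.
  cancelPending(): number {
    const dropped = this.pending.splice(0);
    dropped.forEach((entry) => entry.cancel());
    return dropped.length;
  }

  // Run waiting tasks one at a time until the lane is paused or empty.
  private async pump(): Promise<void> {
    if (this.busy) return;
    this.busy = true;
    while (!this.paused && this.pending.length > 0) {
      await this.pending.shift()!.run();
    }
    this.busy = false;
  }
}
```

A task that is already running is not interrupted by pause() in this sketch; only tasks that have not started are held back or cancelled.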

Monitoring and Cleanup

Global statistics and a formatted status string are available. The queue should be cleared when no longer needed.

// Get global statistics
const stats = queue.getGlobalStats();
console.log(`Total tasks: ${stats.totalTasks}, Completed: ${stats.completedTasks}`);

// Get a human-readable status string
const statusString = queue.formatStatus();
console.log(statusString);

// Clear all tasks and lanes
queue.clear();

API Reference (from tests)


SessionLane and LaneManager Module Documentation

The SessionLane and LaneManager modules (src/concurrency/lanes.js) provide a more granular approach to managing task queues, particularly useful for scenarios where strict ordering and lifecycle management are required for individual "sessions" or entities.

SessionLane

A SessionLane represents a single, independent queue of tasks. It processes items sequentially (FIFO by default, or by priority) and provides detailed control over its lifecycle and events.
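The difference between FIFO and priority ordering can be sketched with a small buffer. PriorityBuffer is a hypothetical name, and the rule "higher number runs first, ties keep arrival order" is an assumption borrowed from the LaneQueue example; the source does not state how SessionLane orders priorities.

```typescript
// Hypothetical ordering sketch; SessionLane's actual rules are not confirmed by the source.
interface QueuedItem<P> {
  payload: P;
  priority: number;
  seq: number; // arrival counter used to break ties
}

class PriorityBuffer<P> {
  private items: QueuedItem<P>[] = [];
  private nextSeq = 0;

  push(payload: P, priority = 0): void {
    this.items.push({ payload, priority, seq: this.nextSeq++ });
    // Higher priority first; equal priorities keep arrival (FIFO) order.
    this.items.sort((a, b) => b.priority - a.priority || a.seq - b.seq);
  }

  // Remove and return the next payload, or undefined when empty.
  shift(): P | undefined {
    return this.items.shift()?.payload;
  }
}
```

With every priority left at the default, the buffer behaves as a plain FIFO queue, which matches the default behavior described above.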

Purpose

SessionLane ensures that:

Key Features

Usage Patterns

Initialization and Processor Setup

A SessionLane is created, and its processing logic is defined using setProcessor.

import { SessionLane } from '../../src/concurrency/lanes.js';

const lane = new SessionLane({ autoStart: false, maxQueueSize: 10 });
const results: string[] = [];

lane.setProcessor(async (item) => {
  // Simulate async work
  await new Promise(r => setTimeout(r, 10));
  results.push(item.payload as string);
});

// Listen for events
lane.on('complete', () => console.log('Task completed in lane'));
lane.on('error', (err) => console.error('Lane error:', err.message));

Enqueuing and Processing Tasks

Tasks are enqueued with a payload. If autoStart is false, startProcessing() must be called.

lane.enqueue('task-1');
lane.enqueue('task-2', { priority: 5 }); // For priority-based lanes
lane.enqueue('task-3');

lane.startProcessing(); // Start processing if autoStart was false

await lane.drain(); // Wait for all tasks in this lane to complete
expect(results).toEqual(['task-1', 'task-2', 'task-3']); // If FIFO

Lifecycle Management

lane.pause();
console.log(lane.getStatus()); // 'paused'

lane.resume();
console.log(lane.getStatus()); // 'active'

const clearedItems = lane.clear(); // Clears all pending items

LaneManager

The LaneManager acts as a central registry and orchestrator for multiple SessionLane instances. It is accessed as a singleton through getLaneManager() and offers convenience methods for managing tasks across different lanes.

Purpose

LaneManager simplifies the management of multiple SessionLanes by:

Key Features

Usage Patterns

Accessing the Singleton and Managing Lanes

import { getLaneManager, resetLaneManager } from '../../src/concurrency/lanes.js';

resetLaneManager(); // For clean state in tests
const manager = getLaneManager();

// Set a default processor for all new lanes
manager.setDefaultProcessor(async (item) => {
  console.log(`Processing item for lane ${item.laneName}: ${item.payload}`);
});

// Get or create lanes
const lane1 = manager.getLane('session-1');
const lane2 = manager.getLane('session-2');

manager.enqueue('session-1', 'message-A');
manager.enqueue('session-2', 'message-B');

await manager.drainAll(); // Wait for all tasks in all lanes to complete
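The get-or-create and reset behavior used above can be sketched with a Map and a module-level variable. Every name here (TinyLane, TinyRegistry, getTinyRegistry, resetTinyRegistry) is hypothetical and chosen to avoid confusion with the real exports; the sketch stores payloads instead of processing them.

```typescript
// Hypothetical registry sketch; it mirrors the shape of the calls above, not their implementation.
class TinyLane {
  readonly items: string[] = [];
  constructor(readonly name: string) {}
}

class TinyRegistry {
  private lanes = new Map<string, TinyLane>();

  // Return the existing lane for this name, creating it on first use.
  getLane(name: string): TinyLane {
    let lane = this.lanes.get(name);
    if (!lane) {
      lane = new TinyLane(name);
      this.lanes.set(name, lane);
    }
    return lane;
  }

  enqueue(name: string, payload: string): void {
    this.getLane(name).items.push(payload);
  }

  get size(): number {
    return this.lanes.size;
  }
}

let registryInstance: TinyRegistry | undefined;

// Lazily create the shared instance.
function getTinyRegistry(): TinyRegistry {
  if (!registryInstance) {
    registryInstance = new TinyRegistry();
  }
  return registryInstance;
}

// Drop the shared instance so the next caller starts from a clean state.
function resetTinyRegistry(): void {
  registryInstance = undefined;
}
```

Resetting before each test, as the example above does with resetLaneManager(), keeps lanes created by one test from leaking into the next.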

Global Control

manager.pauseAll();
console.log(manager.getLane('session-1').getStatus()); // 'paused'

manager.resumeAll();
manager.clearAll(); // Clears all tasks from all lanes and removes lanes

Monitoring

const stats = manager.getStats();
console.log(`Total lanes: ${stats.totalLanes}, Total pending: ${stats.totalPending}`);
console.log(stats.lanes['session-1']); // Lane-specific stats

Higher-Order Utilities

The lanes module also provides convenience functions for integrating lane-based concurrency directly into your application logic.

withLane

Executes a given function within the context of a specific lane, ensuring its execution is ordered relative to other tasks in that lane.

import { withLane } from '../../src/concurrency/lanes.js';

async function processData(sessionId: string, data: any) {
  return await withLane(sessionId, data, async (payload) => {
    // This function will be enqueued and processed by the session's lane
    console.log(`Processing data for ${sessionId}: ${payload}`);
    await new Promise(r => setTimeout(r, 100));
    return `Processed: ${payload}`;
  });
}

// Calls to processData for the same sessionId will be ordered
const p1 = processData('user-123', 'request-1');
const p2 = processData('user-123', 'request-2');
const p3 = processData('user-456', 'request-A'); // This will run in parallel with user-123 tasks

await Promise.all([p1, p2, p3]);
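The ordering guarantee shown above can be approximated by chaining promises per lane name. The sketch below is an assumption about how such a helper could work, not how withLane is implemented; withLaneSketch and laneTails are hypothetical names, and the real helper presumably goes through LaneManager instead.

```typescript
// Hypothetical sketch of per-lane ordering via promise chaining.
const laneTails = new Map<string, Promise<unknown>>();

function withLaneSketch<P, R>(
  laneName: string,
  payload: P,
  fn: (payload: P) => Promise<R>
): Promise<R> {
  const previous = laneTails.get(laneName) ?? Promise.resolve();
  // Start only after the previous task in this lane settles, success or failure.
  const next = previous.then(
    () => fn(payload),
    () => fn(payload)
  );
  // Store a tail that never rejects, so one failure does not block the lane.
  laneTails.set(laneName, next.catch(() => undefined));
  return next;
}
```

Because each lane name has its own chain, calls for different names never wait on each other, while calls for the same name run strictly one after another.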

createLanedFunction

Creates a new function that automatically enqueues its calls into a specific lane based on a provided getLaneName resolver. This is useful for creating "laned" versions of existing functions.

import { createLanedFunction } from '../../src/concurrency/lanes.js';

interface MyPayload { session: string; value: string; }

const myLanedProcessor = createLanedFunction<MyPayload, string>(
  (payload) => payload.session, // Resolver to get the lane name from the payload
  async (payload) => {
    // This is the actual processing logic
    await new Promise(r => setTimeout(r, 50));
    return `Processed ${payload.value} for ${payload.session}`;
  }
);

// Calls to myLanedProcessor will be automatically routed to the correct lane
const r1 = myLanedProcessor({ session: 'user-X', value: 'data-1' });
const r2 = myLanedProcessor({ session: 'user-X', value: 'data-2' });
const r3 = myLanedProcessor({ session: 'user-Y', value: 'data-A' });

await Promise.all([r1, r2, r3]);
// r1 and r2 for 'user-X' will be processed sequentially
// r3 for 'user-Y' will be processed in parallel
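Conceptually, a laned function is a closure that resolves the lane name and hands the call to a scheduler. The sketch below makes that scheduler an explicit third parameter so the wrapper can be shown in isolation; this parameter, the Scheduler type, and createLanedSketch are all hypothetical. The real createLanedFunction takes two arguments, and its scheduling is assumed here to be delegated internally.

```typescript
// Hypothetical sketch: the explicit scheduler argument is NOT part of the real API.
type Scheduler = <R>(laneName: string, run: () => Promise<R>) => Promise<R>;

function createLanedSketch<P, R>(
  getLaneName: (payload: P) => string,
  fn: (payload: P) => Promise<R>,
  schedule: Scheduler
): (payload: P) => Promise<R> {
  // Each call resolves its lane from the payload, then defers to the scheduler.
  return (payload) => schedule(getLaneName(payload), () => fn(payload));
}

// A trivial scheduler that records the lane and runs the task immediately.
const routedLanes: string[] = [];
const immediateScheduler: Scheduler = (laneName, run) => {
  routedLanes.push(laneName);
  return run();
};

interface SketchPayload {
  session: string;
  value: string;
}

const lanedSketch = createLanedSketch<SketchPayload, string>(
  (payload) => payload.session,
  async (payload) => `Processed ${payload.value} for ${payload.session}`,
  immediateScheduler
);
```

Swapping immediateScheduler for a scheduler that serializes work per lane name would give the sequential-per-session behavior described in the comments above.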

API Reference (from tests)

SessionLane

LaneManager

Utilities


Architectural Relationship

The SessionLane and LaneManager components form a cohesive system for managing ordered task execution within logical "sessions." The LaneManager acts as the central hub, providing access to and control over individual SessionLane instances. Higher-order utilities like withLane and createLanedFunction abstract away the direct interaction with LaneManager, making it easier to integrate lane-based concurrency into application logic.

The LaneQueue module, while sharing the "lane" concept, operates as a separate, parallel concurrency system. It focuses on global resource management and cross-lane concurrency limits, rather than the strict in-lane ordering and eventing provided by SessionLane.

graph TD
    subgraph Lane Management System
        A[LaneManager Singleton] --> B{"getLane(name)"}
        B --> C[SessionLane Instance 1]
        B --> D[SessionLane Instance 2]
        B --> E[...]

        F["withLane()"] --> A
        G["createLanedFunction()"] --> A
    end

    subgraph Global Concurrency System
        H[LaneQueue Instance]
    end

    style A fill:#f9f,stroke:#333,stroke-width:2px
    style H fill:#ccf,stroke:#333,stroke-width:2px

Explanation:

Contributing and Extending

When contributing to or extending these concurrency modules:

Always ensure that changes are thoroughly tested, especially considering the asynchronous and concurrent nature of these modules. The existing test suite provides a strong foundation for verifying correct behavior under various conditions.