src — concurrency

Module: src-concurrency Cohesion: 0.80 Members: 0

src — concurrency

The src/concurrency module provides essential utilities for managing asynchronous operations, ensuring order, preventing race conditions, and applying backpressure in multi-session or multi-channel environments. It offers two distinct, yet complementary, concurrency control systems:

  1. Session Lanes (SessionLane, LaneManager): A simpler, session-based queueing system primarily focused on ensuring strict sequential processing of operations for a given session.
  2. Lane Queue (LaneQueue): An "Enterprise-grade" system that implements a "Default Serial, Explicit Parallel" pattern, offering more granular control over task execution within a session, including explicit parallel execution, retries, and timeouts.

This documentation will detail each system, its purpose, how it works, and how to use it effectively.


1. Session Lanes (FIFO/Priority Queues per Session)

The SessionLane and LaneManager system is designed to prevent message interleaving and ensure that operations related to a specific session are processed in a defined order (FIFO or by priority). This is crucial for maintaining state consistency when multiple requests for the same session might be in flight simultaneously.

1.1 Core Concepts

1.2 How it Works

  1. Lane Creation: When an operation needs to be processed for a sessionId, LaneManager.getLane(sessionId) is called. If a lane for that session doesn't exist, a new SessionLane is created and registered.
  2. Processor Assignment: A processor function (an async function that takes a LaneItem and returns a Promise) must be set for the SessionLane. This function defines how each item in the lane will be processed. The LaneManager can also set a defaultProcessor for all new lanes.
  3. Enqueueing: SessionLane.enqueue(payload, options) adds a new LaneItem to the lane's internal queue. Items are ordered either strictly FIFO (config.fifo: true) or by priority (lower number = higher priority).
  4. Processing Loop:

  1. Status and Events: SessionLane emits events (enqueue, start, complete, error, drain, pause, resume) to signal lifecycle changes of items and the lane itself. It also exposes its status (idle, processing, paused, draining) and queue length.

Simplified Architecture

graph TD
    A[LaneManager] --> B{SessionLane (sessionId)};
    B -- Manages --> C[Queue: LaneItem<T>];
    C -- Processes --> D[Processor: (item: LaneItem<T>) => Promise<unknown>];
    D -- Emits Events --> B;

1.3 Key Components & Usage

SessionLane Class

LaneManager Class

Singleton Access

Convenience Functions

1.4 Example Usage

import { getLaneManager, createLanedFunction } from &#39;./concurrency/lanes.js&#39;;

interface UserAction {
  userId: string;
  action: string;
  data: any;
}

class="hl-cmt">// 1. Using LaneManager directly
const manager = getLaneManager<UserAction>();
manager.setDefaultProcessor(async (item) => {
  console.log(`[${item.id}] Processing action for user ${item.payload.userId}: ${item.payload.action}`);
  await new Promise(resolve => setTimeout(resolve, 100)); class="hl-cmt">// Simulate async work
  return `Processed: ${item.payload.action}`;
});

manager.enqueue(&#39;user-123&#39;, { userId: &#39;user-123&#39;, action: &#39;updateProfile&#39;, data: { name: &#39;Alice&#39; } });
manager.enqueue(&#39;user-456&#39;, { userId: &#39;user-456&#39;, action: &#39;fetchData&#39;, data: {} });
manager.enqueue(&#39;user-123&#39;, { userId: &#39;user-123&#39;, action: &#39;sendNotification&#39;, data: { message: &#39;Welcome!&#39; } });
class="hl-cmt">// Operations for user-123 will be processed in order.

class="hl-cmt">// 2. Using createLanedFunction for a reusable API
const processUserAction = createLanedFunction<UserAction, string>(
  (payload) => payload.userId, class="hl-cmt">// How to get the session ID from the payload
  async (payload) => {
    console.log(`[LanedFn] Processing action for user ${payload.userId}: ${payload.action}`);
    await new Promise(resolve => setTimeout(resolve, 50));
    return `Completed: ${payload.action}`;
  }
);

async function runLanedFunctionExample() {
  const p1 = processUserAction({ userId: &#39;user-789&#39;, action: &#39;login&#39;, data: {} });
  const p2 = processUserAction({ userId: &#39;user-101&#39;, action: &#39;viewPage&#39;, data: { page: &#39;/home&#39; } });
  const p3 = processUserAction({ userId: &#39;user-789&#39;, action: &#39;logout&#39;, data: {} });

  await Promise.all([p1, p2, p3]);
  console.log(&#39;All laned functions completed.&#39;);
}

runLanedFunctionExample();

2. Lane Queue (Default Serial, Explicit Parallel)

The LaneQueue system provides a more sophisticated concurrency model, Advanced enterprise architecture for "Default Serial, Explicit Parallel" pattern. This means that tasks within a session's lane execute serially by default, but can be explicitly marked as parallel if they are safe to run concurrently with other parallel tasks in the same lane. This is ideal for scenarios where some operations (e.g., writes, state modifications) must be strictly ordered, while others (e.g., reads, idempotent operations) can benefit from parallel execution.

2.1 Core Concepts

2.2 How it Works

  1. Enqueueing: LaneQueue.enqueue(laneId, fn, options) creates a Task object and adds it to the pending queue of the specified laneId. Tasks are sorted by priority (higher value = higher priority).
  2. Lane Processing (processLane):

  1. Task Selection (getNextTasks): This is the core logic for "Default Serial, Explicit Parallel":

  1. Task Execution (executeTask):

  1. Events and Stats: LaneQueue emits events (task:enqueued, task:started, task:completed, task:failed, task:cancelled, lane:created, lane:paused, lane:resumed, lane:drained) for detailed monitoring. It also maintains LaneStats for each lane and global statistics.

Simplified Architecture

graph TD
    A[LaneQueue] --> B{Lane (laneId)};
    B -- Enqueues --> C[Pending Tasks (sorted by priority)];
    B -- Runs --> D[Running Tasks];
    C -- getNextTasks --> D;
    D -- executeTask --> E[Task fn() with timeout/retries];
    E -- Emits Events --> A;

2.3 Key Components & Usage

LaneQueue Class

Singleton Access

2.4 Example Usage

import { getLaneQueue } from &#39;./concurrency/lane-queue.js&#39;;

const queue = getLaneQueue({ maxParallel: 2 }); class="hl-cmt">// Allow up to 2 parallel tasks per lane

async function runLaneQueueExample() {
  console.log(&#39;--- LaneQueue Example ---&#39;);

  class="hl-cmt">// Serial task (default) - will block other tasks in &#39;session-A&#39;
  const serialTask = queue.enqueue(&#39;session-A&#39;, async () => {
    console.log(&#39;[session-A] Serial task START&#39;);
    await new Promise(resolve => setTimeout(resolve, 200));
    console.log(&#39;[session-A] Serial task END&#39;);
    return &#39;Serial Done&#39;;
  });

  class="hl-cmt">// Parallel tasks - will run concurrently if no serial task is active
  const parallelTask1 = queue.enqueue(&#39;session-A&#39;, async () => {
    console.log(&#39;[session-A] Parallel 1 START&#39;);
    await new Promise(resolve => setTimeout(resolve, 100));
    console.log(&#39;[session-A] Parallel 1 END&#39;);
    return &#39;Parallel 1 Done&#39;;
  }, { parallel: true });

  const parallelTask2 = queue.enqueue(&#39;session-A&#39;, async () => {
    console.log(&#39;[session-A] Parallel 2 START&#39;);
    await new Promise(resolve => setTimeout(resolve, 150));
    console.log(&#39;[session-A] Parallel 2 END&#39;);
    return &#39;Parallel 2 Done&#39;;
  }, { parallel: true });

  const parallelTask3 = queue.enqueue(&#39;session-A&#39;, async () => {
    console.log(&#39;[session-A] Parallel 3 START (will wait for one of the first two to finish)&#39;);
    await new Promise(resolve => setTimeout(resolve, 50));
    console.log(&#39;[session-A] Parallel 3 END&#39;);
    return &#39;Parallel 3 Done&#39;;
  }, { parallel: true });

  class="hl-cmt">// Task for another session - runs independently
  const otherSessionTask = queue.enqueue(&#39;session-B&#39;, async () => {
    console.log(&#39;[session-B] Independent task START&#39;);
    await new Promise(resolve => setTimeout(resolve, 100));
    console.log(&#39;[session-B] Independent task END&#39;);
    return &#39;Session B Done&#39;;
  });

  try {
    const results = await Promise.all([
      serialTask,
      parallelTask1,
      parallelTask2,
      parallelTask3,
      otherSessionTask
    ]);
    console.log(&#39;All tasks completed:&#39;, results);
  } catch (error) {
    console.error(&#39;A task failed:&#39;, error);
  }

  console.log(&#39;\n&#39; + queue.formatStatus());
}

runLaneQueueExample();

3. Choosing the Right Concurrency System

4. Integration with the Codebase

Both concurrency systems are designed as foundational utilities.

This module serves as a robust foundation for managing complex asynchronous workflows, providing developers with the tools to build reliable and performant concurrent systems.