src — queue

Module: src-queue Cohesion: 0.80 Members: 0

The src/queue module provides queue implementations for asynchronous task processing, with support for priority-based ordering and disk persistence. It is a foundational component for background tasks, event processing, or any scenario that requires reliable, ordered execution of work items.

Module Overview

The queue module is built around an inheritance hierarchy, starting with a basic FIFO queue and extending it with priority and persistence capabilities. It also includes a singleton pattern for easy global access to queue instances.

Key Capabilities:

Architecture Diagram

The module's core components are structured as an inheritance chain, allowing for progressive enhancement of queue functionality:

classDiagram
    direction LR
    class Queue {
        +enqueue(data: T): QueueItem<T>
        +dequeue(): QueueItem<T>
        +setProcessor(processor: (item: T) => Promise<unknown>): void
        +processQueue(): Promise<void>
        +getStats(): QueueStats
        +dispose(): void
    }
    class PriorityQueue {
        +enqueuePriority(data: T, priority: PriorityLevel): PriorityItem<T>
        +updatePriority(id: string, priority: PriorityLevel): boolean
        -insertByPriority(item: PriorityItem<T>): void
        -applyFairScheduling(): void
    }
    class PersistentQueue {
        +load(): boolean
        +save(): boolean
        +backup(): string | null
        +restoreFromBackup(path: string): boolean
        -markDirty(): void
        -startAutoSave(): void
    }
    class QueueSingleton {
        +getQueue(): Queue
        +getPriorityQueue(): PriorityQueue
        +getPersistentQueue(): PersistentQueue
        +resetQueues(): void
    }

    Queue <|-- PriorityQueue : extends
    PriorityQueue <|-- PersistentQueue : extends
    QueueSingleton ..> Queue : manages
    QueueSingleton ..> PriorityQueue : manages
    QueueSingleton ..> PersistentQueue : manages

Core Queue Implementations

1. Queue (Base FIFO Queue)

The Queue class is the fundamental building block, providing a generic FIFO (First-In, First-Out) queue with asynchronous processing capabilities.

Purpose: To manage a list of tasks or data items, process them sequentially or concurrently, and handle retries for failed operations.

Key Features:

Usage Pattern:

  1. Instantiate: const myQueue = new Queue(options);
  2. Set Processor: Define how items are processed:
    myQueue.setProcessor(async (taskData: MyTaskType) => {
      // Perform async operation with taskData
      console.log(`Processing task: ${taskData.id}`);
      await someExternalService.doWork(taskData);
      return 'success';
    });

  3. Enqueue Items: Add tasks to the queue:
    myQueue.enqueue({ id: 'task1', payload: 'data' });
    myQueue.enqueueMany([{ id: 'task2', payload: 'more data' }, { id: 'task3', payload: 'even more' }]);

  4. Listen to Events: React to queue activity:
    myQueue.on('processed', (item, result) => console.log(`Item ${item.id} processed: ${result}`));
    myQueue.on('error', (item, error) => console.error(`Item ${item.id} failed: ${error.message}`));
    myQueue.on('drain', () => console.log('Queue is empty and all items processed.'));

  5. Manual Processing (if autoProcess is false):
    await myQueue.processQueue();

Core Methods & Properties:

Execution Flow (autoProcess enabled):

  1. enqueue() is called.
  2. If options.autoProcess is true and a processor is set, processQueue() is invoked.
  3. processQueue() manages concurrency, repeatedly calling dequeue() and processItem().
  4. processItem() executes the processor function. On success, processedCount is incremented. On failure, if item.attempts < options.maxRetries, the item is put back at the front of the queue (items.unshift(item)) after retryDelay, so it is the next item to be dequeued; otherwise, failedCount is incremented.
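
The flow above can be sketched in a few lines. This is a minimal illustration, not the module's actual Queue class: SketchQueue, SketchItem and SketchOptions are hypothetical names, concurrency is fixed at one item at a time, and the point at which attempts is incremented is an assumption.

```typescript
// Hypothetical, simplified stand-in for the module's Queue (not the real class).
interface SketchItem<T> {
  id: string;
  data: T;
  attempts: number;
}

interface SketchOptions {
  maxRetries: number; // in this sketch: upper bound on total attempts per item
  retryDelay: number; // milliseconds to wait before an item is retried
}

class SketchQueue<T> {
  private items: SketchItem<T>[] = [];
  private nextId = 1;
  private readonly processor: (data: T) => Promise<unknown>;
  private readonly options: SketchOptions;
  processedCount = 0;
  failedCount = 0;

  constructor(processor: (data: T) => Promise<unknown>, options: SketchOptions) {
    this.processor = processor;
    this.options = options;
  }

  enqueue(data: T): SketchItem<T> {
    const item: SketchItem<T> = { id: `item-${this.nextId++}`, data, attempts: 0 };
    this.items.push(item);
    return item;
  }

  // Drains the queue one item at a time (the real module also manages concurrency).
  async processQueue(): Promise<void> {
    let item: SketchItem<T> | undefined;
    while ((item = this.items.shift()) !== undefined) {
      await this.processItem(item);
    }
  }

  private async processItem(item: SketchItem<T>): Promise<void> {
    item.attempts += 1; // assumption: counted before the processor runs
    try {
      await this.processor(item.data);
      this.processedCount += 1;
    } catch {
      if (item.attempts < this.options.maxRetries) {
        // Wait, then put the item back at the front so it is dequeued next.
        await new Promise<void>((resolve) => setTimeout(resolve, this.options.retryDelay));
        this.items.unshift(item);
      } else {
        this.failedCount += 1;
      }
    }
  }
}
```

In this sketch, with maxRetries set to 2, an item whose processor always throws is attempted twice and then counted in failedCount.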

2. PriorityQueue

The PriorityQueue class extends Queue, introducing the concept of item priority.

Purpose: To ensure that more important tasks are processed before less important ones, even if they were enqueued later.

Key Features:

Usage Pattern:

  1. Instantiate: const myPriorityQueue = new PriorityQueue(options);
  2. Enqueue with Priority:
    myPriorityQueue.enqueuePriority({ id: 'urgent', payload: 'critical data' }, 'critical');
    myPriorityQueue.enqueuePriority({ id: 'background', payload: 'low priority' }, 'low');
    myPriorityQueue.enqueue({ id: 'default', payload: 'normal priority' }); // Uses defaultPriority

  3. Manage Priorities:
    myPriorityQueue.updatePriority('background', 'normal');
    myPriorityQueue.escalate('background'); // Boosts to 'high'

  4. Get Priority-Specific Stats:
    const priorityStats = myPriorityQueue.getPriorityStats();
    console.log(priorityStats.byPriority.critical); // Count of critical items
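
To illustrate how priority ordering can coexist with FIFO order inside one level, here is a minimal sketch of an ordered insert. It is not the module's private insertByPriority implementation: the set of levels (critical, high, normal, low), their ranking, and the names PRIORITY_RANK, PrioritySketchItem and insertByPrioritySketch are assumptions made for the example.

```typescript
// Assumed set of levels; the source shows these four names but not their full definition.
type PriorityLevelSketch = "critical" | "high" | "normal" | "low";

// Assumed ranking: a lower number is served first.
const PRIORITY_RANK: Record<PriorityLevelSketch, number> = {
  critical: 0,
  high: 1,
  normal: 2,
  low: 3,
};

interface PrioritySketchItem<T> {
  id: string;
  data: T;
  priority: PriorityLevelSketch;
}

// Inserts the item in front of the first strictly lower-priority item.
// Items of equal priority keep their arrival (FIFO) order.
function insertByPrioritySketch<T>(
  items: PrioritySketchItem<T>[],
  item: PrioritySketchItem<T>,
): void {
  const rank = PRIORITY_RANK[item.priority];
  const index = items.findIndex((queued) => PRIORITY_RANK[queued.priority] > rank);
  if (index === -1) {
    items.push(item); // nothing with lower priority is queued: append at the end
  } else {
    items.splice(index, 0, item);
  }
}
```

A linear scan keeps the sketch short; for large queues a binary heap or one bucket per level would avoid the O(n) insert, at the cost of more bookkeeping.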

Core Methods & Properties:

3. PersistentQueue

The PersistentQueue class extends PriorityQueue, adding the crucial ability to save and load the queue's state to/from disk.

Purpose: To provide durable queues that can survive application restarts, ensuring no tasks are lost and processing can resume from where it left off.

Key Features:

Usage Pattern:

  1. Instantiate:
    const myPersistentQueue = new PersistentQueue<MyTaskType>({
      filename: 'my-app-tasks.json',
      autoSave: true,
      storageDir: '/var/lib/my-app/queues'
    });
    // Upon instantiation, it will automatically try to load from 'my-app-tasks.json'

  2. Operations: All PriorityQueue operations (enqueue, dequeue, updatePriority) will now automatically trigger a save if autoSave is enabled or mark the queue as dirty for interval saving.
  3. Manual Save/Load:
    myPersistentQueue.save();
    myPersistentQueue.load();

  4. Backup/Restore:
    const backupPath = myPersistentQueue.backup();
    if (backupPath) console.log(`Backup created at: ${backupPath}`);

    const latestBackup = myPersistentQueue.listBackups()[0];
    if (latestBackup) myPersistentQueue.restoreFromBackup(latestBackup);

  5. Cleanup:
    myPersistentQueue.deleteStorage(); // Deletes the main queue file
    myPersistentQueue.dispose(); // Stops auto-save, saves if dirty, clears in-memory queue
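
The dirty-flag behavior mentioned in the steps above (mark the queue dirty on mutation, save on an interval, save once more on dispose) could look roughly like the following. This is a sketch under assumptions, not the module's code: AutoSaveSketch and flushIfDirty are hypothetical names, and the save callback stands in for whatever writes the queue file.

```typescript
// Hypothetical helper showing the dirty-flag pattern; not part of the module.
class AutoSaveSketch {
  private dirty = false;
  private timer: ReturnType<typeof setInterval> | null = null;
  private readonly save: () => void;

  constructor(save: () => void) {
    this.save = save; // stand-in for the code that writes the queue to disk
  }

  // Called after every mutation (enqueue, dequeue, priority change).
  markDirty(): void {
    this.dirty = true;
  }

  // Saves only when something changed since the last save.
  flushIfDirty(): boolean {
    if (!this.dirty) return false;
    this.save();
    this.dirty = false;
    return true;
  }

  startAutoSave(intervalMs: number): void {
    this.stopAutoSave(); // never run two timers at once
    this.timer = setInterval(() => this.flushIfDirty(), intervalMs);
  }

  stopAutoSave(): void {
    if (this.timer !== null) {
      clearInterval(this.timer);
      this.timer = null;
    }
  }

  // Mirrors the documented dispose() order: stop auto-save, then save if dirty.
  dispose(): void {
    this.stopAutoSave();
    this.flushIfDirty();
  }
}
```

Batching writes this way means several mutations between two ticks cost a single save, while an unchanged queue costs none.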

Core Methods & Properties:

Persistence Format (SerializedQueue): The queue data is stored as a JSON object with a version, createdAt, lastSavedAt, an array of items (each SerializedQueueItem), and stats. This structured format allows for potential future migrations and easier debugging.
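
As an illustration of that format, the sketch below defines a possible shape for the file and a round trip through JSON. Only the top-level field names (version, createdAt, lastSavedAt, items, stats) come from the description above; the field types, the per-item fields, the version number 1, and the function names are assumptions.

```typescript
// Assumed per-item shape; the real SerializedQueueItem may differ.
interface SerializedQueueItemSketch {
  id: string;
  data: unknown;
  priority: string;
  attempts: number;
}

// Assumed stats shape, based on the counters named in this document.
interface QueueStatsSketch {
  processedCount: number;
  failedCount: number;
}

interface SerializedQueueSketch {
  version: number;
  createdAt: string; // ISO 8601 timestamp (assumed encoding)
  lastSavedAt: string; // ISO 8601 timestamp (assumed encoding)
  items: SerializedQueueItemSketch[];
  stats: QueueStatsSketch;
}

const SKETCH_FORMAT_VERSION = 1; // hypothetical version number

function serializeQueueSketch(
  items: SerializedQueueItemSketch[],
  stats: QueueStatsSketch,
  createdAt: string,
  now: Date = new Date(),
): string {
  const snapshot: SerializedQueueSketch = {
    version: SKETCH_FORMAT_VERSION,
    createdAt,
    lastSavedAt: now.toISOString(),
    items,
    stats,
  };
  return JSON.stringify(snapshot, null, 2);
}

// Rejects files written by an unknown format version instead of guessing.
function parseQueueSketch(json: string): SerializedQueueSketch {
  const parsed = JSON.parse(json) as Partial<SerializedQueueSketch>;
  if (parsed.version !== SKETCH_FORMAT_VERSION) {
    throw new Error(`Unsupported queue file version: ${String(parsed.version)}`);
  }
  if (!Array.isArray(parsed.items)) {
    throw new Error("Queue file has no items array");
  }
  return parsed as SerializedQueueSketch;
}
```

Checking the version on load is what makes later migrations practical: a newer reader can branch on the stored number before touching the items.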

Singleton Management (src/queue/queue-singleton.ts)

This module provides a convenient way to access globally shared instances of Queue, PriorityQueue, and PersistentQueue. This is useful for scenarios where different parts of an application need to interact with the same queue instance without explicitly passing it around.

Key Functions:

Usage:

import { getPersistentQueue, resetQueues } from './queue-singleton';

// Get the global persistent queue instance
const taskQueue = getPersistentQueue<MyTaskType>({
  filename: 'global-tasks.json',
  autoSave: true,
});

taskQueue.enqueuePriority({ id: 'global-task-1', data: 'important' }, 'high');

// Later, in another part of the application
const anotherReference = getPersistentQueue<MyTaskType>(); // Gets the *same* instance
anotherReference.enqueue({ id: 'global-task-2', data: 'less important' });

// When shutting down or for testing
resetQueues();
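
For readers wondering how such accessors usually work, here is a minimal lazy-singleton sketch. It does not reproduce queue-singleton.ts: PersistentQueueSketch, getPersistentQueueSketch and resetQueuesSketch are hypothetical stand-ins, and the choice to ignore options on later calls is an assumption of the sketch, not documented behavior.

```typescript
interface SketchQueueOptions {
  filename?: string;
  autoSave?: boolean;
}

// Tiny stand-in for the real PersistentQueue, just enough to show the pattern.
class PersistentQueueSketch<T> {
  readonly options: SketchQueueOptions;
  readonly items: T[] = [];

  constructor(options: SketchQueueOptions = {}) {
    this.options = options;
  }

  enqueue(data: T): void {
    this.items.push(data);
  }

  dispose(): void {
    this.items.length = 0;
  }
}

// Module-level slot holding the single shared instance.
let sharedPersistentQueue: PersistentQueueSketch<unknown> | null = null;

// Creates the instance on first use; later calls return the same object.
// In this sketch, options passed after the first call are ignored.
function getPersistentQueueSketch<T>(options?: SketchQueueOptions): PersistentQueueSketch<T> {
  if (sharedPersistentQueue === null) {
    sharedPersistentQueue = new PersistentQueueSketch<unknown>(options);
  }
  return sharedPersistentQueue as PersistentQueueSketch<T>;
}

// Disposes and forgets the instance so the next getter call builds a fresh one.
function resetQueuesSketch(): void {
  sharedPersistentQueue?.dispose();
  sharedPersistentQueue = null;
}
```

The type parameter on the getter is a convenience cast: nothing stops two callers from requesting the shared instance with different item types, so callers need to agree on one.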

Integration and Dependencies

Contributing and Extending