src — queue
src — queue
The src/queue module provides a robust and flexible set of queue implementations designed for asynchronous task processing, supporting priority-based ordering and disk persistence. It's a foundational component for managing background tasks, event processing, or any scenario requiring reliable, ordered execution of data.
Module Overview
The queue module is built around an inheritance hierarchy, starting with a basic FIFO queue and extending it with priority and persistence capabilities. It also includes a singleton pattern for easy global access to queue instances.
Key Capabilities:
- Basic FIFO Queue (
Queue): Handles simple first-in, first-out task processing with configurable concurrency, retries, and event emission. - Priority Queue (
PriorityQueue): Extends the basic queue to process items based on defined priority levels (low, normal, high, critical), including fair scheduling to prevent starvation. - Persistent Queue (
PersistentQueue): Further extends the priority queue with file-system persistence, ensuring queue state survives application restarts and providing backup/restore functionalities. - Singleton Access: Provides convenient global access to pre-configured queue instances.
Architecture Diagram
The module's core components are structured as an inheritance chain, allowing for progressive enhancement of queue functionality:
classDiagram
direction LR
class Queue {
+enqueue(data: T): QueueItem<T>
+dequeue(): QueueItem<T>
+setProcessor(processor: (item: T) => Promise<unknown>): void
+processQueue(): Promise<void>
+getStats(): QueueStats
+dispose(): void
}
class PriorityQueue {
+enqueuePriority(data: T, priority: PriorityLevel): PriorityItem<T>
+updatePriority(id: string, priority: PriorityLevel): boolean
-insertByPriority(item: PriorityItem<T>): void
-applyFairScheduling(): void
}
class PersistentQueue {
+load(): boolean
+save(): boolean
+backup(): string | null
+restoreFromBackup(path: string): boolean
-markDirty(): void
-startAutoSave(): void
}
class QueueSingleton {
+getQueue(): Queue
+getPriorityQueue(): PriorityQueue
+getPersistentQueue(): PersistentQueue
+resetQueues(): void
}
Queue <|-- PriorityQueue : extends
PriorityQueue <|-- PersistentQueue : extends
QueueSingleton ..> Queue : manages
QueueSingleton ..> PriorityQueue : manages
QueueSingleton ..> PersistentQueue : manages
Core Queue Implementations
1. Queue (Base FIFO Queue)
The Queue class is the fundamental building block, providing a generic FIFO (First-In, First-Out) queue with asynchronous processing capabilities.
Purpose: To manage a list of tasks or data items, process them sequentially or concurrently, and handle retries for failed operations.
Key Features:
- FIFO Ordering: Items are processed in the order they are enqueued.
- Asynchronous Processing: Supports a
processorfunction that returns aPromise, allowing for non-blocking task execution. - Concurrency Control: Configurable
concurrencyfor parallel processing of items. - Retry Mechanism: Items can be retried up to
maxRetriestimes with aretryDelayif the processor fails. - Event-Driven: Emits events for various lifecycle stages (
enqueue,dequeue,process,processed,error,retry,full,empty,drain). - Statistics: Tracks
processedCount,failedCount, andavgProcessingTime.
Usage Pattern:
- Instantiate:
const myQueue = new Queue(options); - Set Processor: Define how items are processed:
myQueue.setProcessor(async (taskData: MyTaskType) => {
class="hl-cmt">// Perform async operation with taskData
console.log(`Processing task: ${taskData.id}`);
await someExternalService.doWork(taskData);
return 39;success39;;
});
- Enqueue Items: Add tasks to the queue:
myQueue.enqueue({ id: 39;task139;, payload: 39;data39; });
myQueue.enqueueMany([{ id: 39;task239;, payload: 39;more data39; }, { id: 39;task339;, payload: 39;even more39; }]);
- Listen to Events: React to queue activity:
myQueue.on(39;processed39;, (item, result) => console.log(`Item ${item.id} processed: ${result}`));
myQueue.on(39;error39;, (item, error) => console.error(`Item ${item.id} failed: ${error.message}`));
myQueue.on(39;drain39;, () => console.log(39;Queue is empty and all items processed.39;));
- Manual Processing (if
autoProcessis false):
await myQueue.processQueue();
Core Methods & Properties:
constructor(options?: QueueOptions): Initializes the queue.enqueue(data: T, metadata?: Record: Adds an item.): QueueItem | null dequeue(): QueueItem: Removes and returns the next item.| undefined setProcessor(processor: (item: T) => Promise: Sets the async function to process items.): void processQueue(): Promise: Starts processing items. Automatically called ifautoProcessis true and a processor is set.processItem(item: QueueItem(protected): Handles individual item processing, including retries.): Promise getStats(): QueueStats: Returns current queue statistics.dispose(): void: Clears the queue and removes all event listeners.
Execution Flow (autoProcess enabled):
enqueue()is called.- If
options.autoProcessis true and aprocessoris set,processQueue()is invoked. processQueue()manages concurrency, repeatedly callingdequeue()andprocessItem().processItem()executes theprocessorfunction. On success,processedCountis incremented. On failure, ifitem.attempts < options.maxRetries, the item is re-enqueued (usingitems.unshift(item)) after aretryDelay. Otherwise,failedCountis incremented.
2. PriorityQueue
The PriorityQueue class extends Queue, introducing the concept of item priority.
Purpose: To ensure that more important tasks are processed before less important ones, even if they were enqueued later.
Key Features:
- Priority Levels: Supports
low,normal,high, andcriticalpriority levels. - Priority-based Ordering: Items are stored and dequeued based on their
priorityValue. - Fair Scheduling: An optional mechanism (
fairScheduling: true) that boosts the priority of items that have been waiting for too long (maxWaitTime), preventing starvation of lower-priority tasks. - Priority Management: Methods to
updatePriority,escalate, anddeescalatean item's priority.
Usage Pattern:
- Instantiate:
const myPriorityQueue = new PriorityQueue(options); - Enqueue with Priority:
myPriorityQueue.enqueuePriority({ id: 39;urgent39;, payload: 39;critical data39; }, 39;critical39;);
myPriorityQueue.enqueuePriority({ id: 39;background39;, payload: 39;low priority39; }, 39;low39;);
myPriorityQueue.enqueue({ id: 39;default39;, payload: 39;normal priority39; }); class="hl-cmt">// Uses defaultPriority
- Manage Priorities:
myPriorityQueue.updatePriority(39;background39;, 39;normal39;);
myPriorityQueue.escalate(39;background39;); class="hl-cmt">// Boosts to 39;high39;
- Get Priority-Specific Stats:
const priorityStats = myPriorityQueue.getPriorityStats();
console.log(priorityStats.byPriority.critical); class="hl-cmt">// Count of critical items
Core Methods & Properties:
constructor(options?: PriorityQueueOptions): Initializes the priority queue.enqueuePriority(data: T, priority?: PriorityLevel, metadata?: Record: Adds an item with a specified priority. Overrides): PriorityItem | null enqueueto usedefaultPriority.insertByPriority(item: PriorityItem(protected): Inserts an item into the) itemsarray at the correct position based on itspriorityValue. This method also callsapplyFairScheduling()if enabled.applyFairScheduling()(protected): Iterates through items and boostspriorityValuefor those exceedingmaxWaitTime.updatePriority(id: string, priority: PriorityLevel): boolean: Changes an item's priority and re-sorts the queue.escalate(id: string): boolean: Increases an item's priority by one level.deescalate(id: string): boolean: Decreases an item's priority by one level.getPriorityStats(): Returns statistics specific to priority levels.
3. PersistentQueue
The PersistentQueue class extends PriorityQueue, adding the crucial ability to save and load the queue's state to/from disk.
Purpose: To provide durable queues that can survive application restarts, ensuring no tasks are lost and processing can resume from where it left off.
Key Features:
- File-based Persistence: Stores queue items and basic stats in a JSON file on the local filesystem.
- Auto-Save: Can be configured to automatically save the queue state on every modification (
autoSave: true) or at a fixedsaveInterval. - Recovery: Automatically loads the queue state from disk upon instantiation.
- Backup & Restore: Provides methods to create backups and restore from them.
- Configurable Storage: Allows specifying
storageDirandfilename.
Usage Pattern:
- Instantiate:
const myPersistentQueue = new PersistentQueue<MyTaskType>({
filename: 39;my-app-tasks.json39;,
autoSave: true,
storageDir: 39;/var/lib/my-app/queues39;
});
class="hl-cmt">// Upon instantiation, it will automatically try to load from 39;my-app-tasks.json39;
- Operations: All
PriorityQueueoperations (enqueue, dequeue, updatePriority) will now automatically trigger a save ifautoSaveis enabled or mark the queue as dirty for interval saving. - Manual Save/Load:
myPersistentQueue.save();
myPersistentQueue.load();
- Backup/Restore:
const backupPath = myPersistentQueue.backup();
if (backupPath) console.log(`Backup created at: ${backupPath}`);
const latestBackup = myPersistentQueue.listBackups()[0];
if (latestBackup) myPersistentQueue.restoreFromBackup(latestBackup);
- Cleanup:
myPersistentQueue.deleteStorage(); class="hl-cmt">// Deletes the main queue file
myPersistentQueue.dispose(); class="hl-cmt">// Stops auto-save, saves if dirty, clears in-memory queue
Core Methods & Properties:
constructor(options?: PersistentQueueOptions): Initializes the persistent queue. CallsensureStorageDir(),load(), andstartAutoSave().ensureStorageDir()(protected): Creates thestorageDirif it doesn't exist.load(): boolean: Reads and deserializes queue data fromstorageFilePath.save(): boolean: Serializes and writes current queue data tostorageFilePath.startAutoSave()(protected): Initiates thesetIntervalfor periodic saving ifautoSaveis false andsaveIntervalis set.stopAutoSave()(protected): Clears the auto-save timer.markDirty()(protected): Sets an internal flag indicating the queue needs saving. IfautoSaveis true, it immediately callssave().- Overrides:
enqueuePriority,dequeue,removeById,clear,updatePriorityall callmarkDirty()after their superclass operation. deleteStorage(): boolean: Deletes the queue's persistence file.backup(): string | null: Creates a timestamped backup of the queue.restoreFromBackup(backupPath: string): boolean: Loads queue state from a specified backup file.dispose(): Overrides the basedisposeto stop auto-save and perform a final save if dirty.
Persistence Format (SerializedQueue):
The queue data is stored as a JSON object with a version, createdAt, lastSavedAt, an array of items (each SerializedQueueItem), and stats. This structured format allows for potential future migrations and easier debugging.
Singleton Management (src/queue/queue-singleton.ts)
This module provides a convenient way to access globally shared instances of Queue, PriorityQueue, and PersistentQueue. This is useful for scenarios where different parts of an application need to interact with the same queue instance without explicitly passing it around.
Key Functions:
getQueue: Returns a singleton instance of(options?: QueueOptions): Queue Queue. If called for the first time, it initializes it with the provided options.getPriorityQueue: Returns a singleton instance of(options?: PriorityQueueOptions): PriorityQueue PriorityQueue.getPersistentQueue: Returns a singleton instance of(options?: PersistentQueueOptions): PersistentQueue PersistentQueue.resetQueues(): void: Disposes of all singleton queue instances and sets them tonull. This is primarily useful for testing or when a complete reset of the queue system is required.getQueuesSummary(): Provides a snapshot of statistics for all initialized singleton queues.
Usage:
import { getPersistentQueue, resetQueues } from 39;./queue-singleton39;;
class="hl-cmt">// Get the global persistent queue instance
const taskQueue = getPersistentQueue<MyTaskType>({
filename: 39;global-tasks.json39;,
autoSave: true,
});
taskQueue.enqueuePriority({ id: 39;global-task-139;, data: 39;important39; }, 39;high39;);
class="hl-cmt">// Later, in another part of the application
const anotherReference = getPersistentQueue<MyTaskType>(); class="hl-cmt">// Gets the *same* instance
anotherReference.enqueue({ id: 39;global-task-239;, data: 39;less important39; });
class="hl-cmt">// When shutting down or for testing
resetQueues();
Integration and Dependencies
events(Node.js built-in): All queue classes extendEventEmitterto provide a rich eventing system for monitoring queue activity.fs,path,os(Node.js built-ins): Used byPersistentQueuefor file system operations (reading, writing, creating directories, determining home directory for default storage).logger(../utils/logger.js): Used byPersistentQueueto log warnings, e.g., for queue version mismatches during loading.
Contributing and Extending
- Extending Functionality: The inheritance model makes it straightforward to extend existing queue types. For example, you could create a
NetworkQueuethat extendsPersistentQueueto synchronize its state with a remote server. - Custom Processors: The
setProcessormethod is the primary extension point for defining custom task logic. - Event Listeners: Leverage the extensive event system (
on,once,emit) to integrate queue events into other parts of the application (e.g., UI updates, notifications, metrics collection). - Persistence Format: If the
SerializedQueueformat needs to change in the future, ensure backward compatibility or implement migration logic within theload()method ofPersistentQueue. TheQUEUE_VERSIONconstant is provided for this purpose. - Performance: For very high-throughput scenarios, consider the impact of
fairScheduling(which re-sorts the queue) andautoSave(which performs disk I/O on every modification). AdjustsaveIntervalandconcurrencyas needed.