Part of the alepha package. Import from alepha/queue.
npm install alepha
Provides asynchronous message queuing and processing through declarative queue primitives.
The queue module enables reliable background job processing and message passing using the $queue primitive
on class properties. It supports schema validation, automatic retries, and multiple queue backends for
building scalable, decoupled applications with robust error handling.
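The overview mentions automatic retries without describing how they behave. As a rough mental model only, assuming a fixed attempt limit with exponential backoff (the actual retry policy of alepha/queue is not stated here), retrying a failing handler could be sketched as follows. `processWithRetry`, `maxAttempts`, and `baseDelayMs` are hypothetical names, not part of the alepha API.

```typescript
// Hypothetical helper for illustration; not part of the alepha API.
type Handler<T> = (payload: T) => Promise<void>;

// Runs the handler until it succeeds or maxAttempts is reached.
// Resolves with the number of attempts used; rejects with the last error.
async function processWithRetry<T>(
  payload: T,
  handler: Handler<T>,
  maxAttempts: number = 3,
  baseDelayMs: number = 100,
): Promise<number> {
  let lastError: unknown = new Error("maxAttempts must be at least 1");
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      await handler(payload);
      return attempt;
    } catch (error) {
      lastError = error;
      if (attempt < maxAttempts) {
        // Exponential backoff: baseDelayMs, then 2x, 4x, ...
        const delayMs = baseDelayMs * 2 ** (attempt - 1);
        await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
      }
    }
  }
  throw lastError;
}
```

A handler that fails on transient errors and then succeeds resolves normally; one that keeps failing surfaces its last error to the caller, which is where dead-letter handling or logging would typically go.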
Primitives are functions that define and configure various aspects of your application. They follow the convention of starting with $ and return configured primitive instances.
For more details, see the Primitives documentation.
Creates a consumer primitive to process messages from a specific queue.
Provides a dedicated message consumer that connects to a queue and processes messages with custom handler logic, enabling scalable architectures where multiple consumers can process messages from the same queue.
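To illustrate what "multiple consumers processing messages from the same queue" means, here is a minimal in-memory sketch of the competing-consumers pattern. It is not how $consumer is implemented; `InMemoryQueue`, `runConsumer`, and `drainWithConsumers` are hypothetical names used only for this example. The point it shows is that each message is taken by exactly one consumer.

```typescript
// Hypothetical in-memory model; not the alepha implementation.
type QueuedMessage<T> = { payload: T };
type ConsumerHandler<T> = (message: QueuedMessage<T>) => Promise<void>;

class InMemoryQueue<T> {
  private readonly pending: QueuedMessage<T>[] = [];

  push(...payloads: T[]): void {
    for (const payload of payloads) this.pending.push({ payload });
  }

  // Removes and returns the oldest message, or undefined when empty.
  take(): QueuedMessage<T> | undefined {
    return this.pending.shift();
  }
}

// A single consumer keeps taking messages until the queue is empty.
// Resolves with the number of messages this consumer handled.
async function runConsumer<T>(
  queue: InMemoryQueue<T>,
  handler: ConsumerHandler<T>,
): Promise<number> {
  let handled = 0;
  for (let m = queue.take(); m !== undefined; m = queue.take()) {
    await handler(m);
    handled++;
  }
  return handled;
}

// Starts several consumers against the same queue and waits for all of them.
async function drainWithConsumers<T>(
  queue: InMemoryQueue<T>,
  handler: ConsumerHandler<T>,
  consumerCount: number,
): Promise<number[]> {
  const consumers = Array.from({ length: consumerCount }, () =>
    runConsumer(queue, handler),
  );
  return Promise.all(consumers);
}
```

Because `take()` removes a message synchronously before the handler is awaited, two consumers can never receive the same message in this sketch. A real backend has to provide the same guarantee across processes.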
Key Features
Common Use Cases
    class EmailService {
      emailQueue = $queue({
        name: "emails",
        schema: t.object({
          to: t.text(),
          subject: t.text(),
          body: t.text()
        })
      });

      emailConsumer = $consumer({
        queue: this.emailQueue,
        handler: async (message) => {
          const { to, subject, body } = message.payload;
          await this.sendEmail(to, subject, body);
        }
      });

      async sendWelcomeEmail(userEmail: string) {
        await this.emailQueue.push({
          to: userEmail,
          subject: "Welcome!",
          body: "Thanks for joining."
        });
      }
    }
Creates a queue primitive for asynchronous message processing with background workers.
The $queue primitive provides type-safe message queuing with automatic worker processing. It is suited to decoupling components and moving work such as email delivery or image processing into background tasks.
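The idea of a schema-checked queue with a background handler can be sketched without any library. The example below assumes that invalid payloads are rejected at push time, which is an assumption of this sketch and not a documented guarantee of $queue. `ValidatedQueue`, `Validator`, and `isEmail` are hypothetical names, and a hand-written type guard stands in for a `t.object(...)` schema.

```typescript
// Hypothetical stand-ins for illustration; none of these names come from alepha.
type Validator<T> = (value: unknown) => value is T;
type QueueMessage<T> = { payload: T };
type MessageHandler<T> = (message: QueueMessage<T>) => Promise<void>;

class ValidatedQueue<T> {
  private readonly pending: QueueMessage<T>[] = [];
  private readonly isValid: Validator<T>;
  private readonly handler: MessageHandler<T>;

  constructor(isValid: Validator<T>, handler: MessageHandler<T>) {
    this.isValid = isValid;
    this.handler = handler;
  }

  // Validates every payload first, so a bad batch enqueues nothing.
  push(...payloads: T[]): void {
    for (const payload of payloads) {
      if (!this.isValid(payload)) {
        throw new TypeError("Payload does not match the queue schema");
      }
    }
    for (const payload of payloads) this.pending.push({ payload });
  }

  // Processes queued messages in FIFO order; resolves with the number handled.
  async drain(): Promise<number> {
    let handled = 0;
    for (let m = this.pending.shift(); m !== undefined; m = this.pending.shift()) {
      await this.handler(m);
      handled++;
    }
    return handled;
  }
}

type Email = { to: string; subject: string };

// Runtime check that mirrors the Email type.
const isEmail: Validator<Email> = (value): value is Email =>
  typeof value === "object" &&
  value !== null &&
  typeof (value as Record<string, unknown>).to === "string" &&
  typeof (value as Record<string, unknown>).subject === "string";
```

The compile-time type (`Email`) catches mistakes in application code, while the runtime check covers data that arrives from outside the type system, such as a deserialized message from a shared backend.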
Background Processing
Type Safety
Storage Flexibility
Performance & Scalability
Reliability
    const emailQueue = $queue({
      name: "email-notifications",
      schema: t.object({
        to: t.text(),
        subject: t.text(),
        body: t.text(),
        priority: t.optional(t.enum(["high", "normal"]))
      }),
      handler: async (message) => {
        await emailService.send(message.payload);
        console.log(`Email sent to ${message.payload.to}`);
      }
    });

    // Push messages for background processing
    await emailQueue.push({
      to: "user@example.com",
      subject: "Welcome!",
      body: "Welcome to our platform",
      priority: "high"
    });
    const imageQueue = $queue({
      name: "image-processing",
      provider: RedisQueueProvider,
      schema: t.object({
        imageId: t.text(),
        operations: t.array(t.enum(["resize", "compress", "thumbnail"]))
      }),
      handler: async (message) => {
        for (const op of message.payload.operations) {
          await processImage(message.payload.imageId, op);
        }
      }
    });

    // Batch processing multiple images
    await imageQueue.push(
      { imageId: "img1", operations: ["resize", "thumbnail"] },
      { imageId: "img2", operations: ["compress"] },
      { imageId: "img3", operations: ["resize", "compress", "thumbnail"] }
    );
    const taskQueue = $queue({
      name: "dev-tasks",
      provider: "memory",
      schema: t.object({
        taskType: t.enum(["cleanup", "backup", "report"]),
        data: t.record(t.text(), t.any())
      }),
      handler: async (message) => {
        switch (message.payload.taskType) {
          case "cleanup":
            await performCleanup(message.payload.data);
            break;
          case "backup":
            await createBackup(message.payload.data);
            break;
          case "report":
            await generateReport(message.payload.data);
            break;
        }
      }
    });