alepha@docs:~/docs/reference/primitives$
cat $queue.md2 min read
#$queue
#Import
typescript
1import { $queue } from "alepha/queue";
#Overview
Creates a queue primitive for asynchronous message processing with background workers.
The $queue primitive enables powerful asynchronous communication patterns in your application. It provides type-safe message queuing with automatic worker processing, making it perfect for decoupling components and handling background tasks efficiently.
Background Processing
- Automatic worker threads for non-blocking message processing
- Built-in retry mechanisms and error handling
- Dead letter queues for failed message handling
- Graceful shutdown and worker lifecycle management
Type Safety
- Full TypeScript support with schema validation using TypeBox
- Type-safe message payloads with automatic inference
- Runtime validation of all queued messages
- Compile-time errors for invalid message structures
Storage Flexibility
- Memory provider for development and testing
- Redis provider for production scalability and persistence
- Custom provider support for specialized backends
- Automatic failover and connection pooling
Performance & Scalability
- Batch processing support for high-throughput scenarios
- Horizontal scaling with distributed queue backends
- Configurable concurrency and worker pools
- Efficient serialization and message routing
Reliability
- Message persistence across application restarts
- Automatic retry with exponential backoff
- Dead letter handling for permanently failed messages
- Comprehensive logging and monitoring integration
#Options
| Option | Type | Required | Description |
|---|---|---|---|
name |
string |
No | Unique name for the queue |
description |
string |
No | Human-readable description of the queue's purpose |
provider |
"memory" | Service<QueueProvider> |
No | Queue storage provider configuration |
schema |
T |
Yes | TypeBox schema defining the structure of messages in this queue |
handler |
Object |
No | Message handler function that processes queue messages |
#Examples
Basic notification queue
typescript
1const emailQueue = $queue({ 2 name: "email-notifications", 3 schema: t.object({ 4 to: t.text(), 5 subject: t.text(), 6 body: t.text(), 7 priority: t.optional(t.enum(["high", "normal"])) 8 }), 9 handler: async (message) => {10 await emailService.send(message.payload);11 console.log(`Email sent to ${message.payload.to}`);12 }13});14 15// Push messages for background processing16await emailQueue.push({17 to: "[email protected]",18 subject: "Welcome!",19 body: "Welcome to our platform",20 priority: "high"21});
Batch processing with Redis
typescript
1const imageQueue = $queue({ 2 name: "image-processing", 3 provider: RedisQueueProvider, 4 schema: t.object({ 5 imageId: t.text(), 6 operations: t.array(t.enum(["resize", "compress", "thumbnail"])) 7 }), 8 handler: async (message) => { 9 for (const op of message.payload.operations) {10 await processImage(message.payload.imageId, op);11 }12 }13});14 15// Batch processing multiple images16await imageQueue.push(17 { imageId: "img1", operations: ["resize", "thumbnail"] },18 { imageId: "img2", operations: ["compress"] },19 { imageId: "img3", operations: ["resize", "compress", "thumbnail"] }20);
Development with memory provider
typescript
1const taskQueue = $queue({ 2 name: "dev-tasks", 3 provider: "memory", 4 schema: t.object({ 5 taskType: t.enum(["cleanup", "backup", "report"]), 6 data: t.record(t.text(), t.any()) 7 }), 8 handler: async (message) => { 9 switch (message.payload.taskType) {10 case "cleanup":11 await performCleanup(message.payload.data);12 break;13 case "backup":14 await createBackup(message.payload.data);15 break;16 case "report":17 await generateReport(message.payload.data);18 break;19 }20 }21});