
#$queue

#Import

typescript
import { $queue } from "alepha/queue";

#Overview

Creates a queue primitive for asynchronous message processing with background workers.

The $queue primitive provides type-safe message queuing with automatic worker processing. It is suited to decoupling components and to moving slow work, such as sending email or processing images, into background tasks.

Background Processing

  • Automatic worker threads for non-blocking message processing
  • Built-in retry mechanisms and error handling
  • Dead letter queues for failed message handling
  • Graceful shutdown and worker lifecycle management

Type Safety

  • Full TypeScript support with schema validation using TypeBox
  • Type-safe message payloads with automatic inference
  • Runtime validation of all queued messages
  • Compile-time errors for invalid message structures
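
The combination of runtime validation and compile-time inference listed above can be sketched without any library. The helpers below (`text`, `object`, `Infer`, `pushEmail`) are hypothetical stand-ins for a schema library such as TypeBox and are not part of Alepha's API; they only illustrate how a single schema value can both check payloads at runtime and produce a static type.

```typescript
// Hypothetical mini schema helpers; a stand-in for TypeBox, not Alepha's API.
type Validator<T> = (value: unknown) => value is T;
type Infer<V> = V extends Validator<infer T> ? T : never;

const text: Validator<string> = (value: unknown): value is string =>
  typeof value === "string";

function object<S extends Record<string, Validator<any>>>(
  shape: S,
): Validator<{ [K in keyof S]: Infer<S[K]> }> {
  return (value: unknown): value is { [K in keyof S]: Infer<S[K]> } => {
    if (typeof value !== "object" || value === null) return false;
    const record = value as Record<string, unknown>;
    // Every declared field must pass its own validator.
    return Object.entries(shape).every(([key, check]) => check(record[key]));
  };
}

// One schema value drives both the runtime check and the static type.
const isEmail = object({ to: text, subject: text, body: text });
type Email = Infer<typeof isEmail>; // { to: string; subject: string; body: string }

const outbox: Email[] = [];

function pushEmail(payload: unknown): void {
  if (!isEmail(payload)) throw new TypeError("Invalid message payload");
  outbox.push(payload); // narrowed to Email by the type guard
}
```

Calling `pushEmail({ to: "a", subject: 42, body: "b" })` throws at runtime, while code that reads `outbox[0].subject` is checked as a `string` at compile time.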

Storage Flexibility

  • Memory provider for development and testing
  • Redis provider for production scalability and persistence
  • Custom provider support for specialized backends
  • Automatic failover and connection pooling
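
The idea behind swappable storage can be illustrated with a small backend contract. This is only a sketch: `QueueBackend`, `MemoryBackend`, and `roundTrip` are hypothetical names chosen for illustration, and Alepha's actual `QueueProvider` interface may look different.

```typescript
// Hypothetical backend contract; Alepha's real QueueProvider may differ.
interface QueueBackend {
  push(queue: string, message: string): Promise<void>;
  pop(queue: string): Promise<string | undefined>;
}

// In-memory backend: one FIFO array per queue name, lost on restart.
class MemoryBackend implements QueueBackend {
  private readonly queues = new Map<string, string[]>();

  async push(queue: string, message: string): Promise<void> {
    const items = this.queues.get(queue) ?? [];
    items.push(message);
    this.queues.set(queue, items);
  }

  async pop(queue: string): Promise<string | undefined> {
    return this.queues.get(queue)?.shift();
  }
}

// Code written against the interface works with any backend.
async function roundTrip<T>(
  backend: QueueBackend,
  queue: string,
  payload: T,
): Promise<T | undefined> {
  await backend.push(queue, JSON.stringify(payload));
  const raw = await backend.pop(queue);
  return raw === undefined ? undefined : (JSON.parse(raw) as T);
}
```

A persistent backend would implement the same two methods against an external store, so the calling code would not need to change.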

Performance & Scalability

  • Batch processing support for high-throughput scenarios
  • Horizontal scaling with distributed queue backends
  • Configurable concurrency and worker pools
  • Efficient serialization and message routing
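
Configurable concurrency usually means capping how many handlers run at the same time. The sketch below shows one common way to do that with a fixed number of worker loops sharing a cursor. `processWithConcurrency` is a hypothetical helper, not an Alepha function, and this page does not document how Alepha schedules its workers.

```typescript
// Hypothetical helper: run `handler` over all messages with at most
// `concurrency` handlers in flight at any time.
async function processWithConcurrency<T>(
  messages: readonly T[],
  concurrency: number,
  handler: (message: T) => Promise<void>,
): Promise<void> {
  let next = 0; // shared cursor; safe because JavaScript runs one callback at a time

  const worker = async (): Promise<void> => {
    while (next < messages.length) {
      const message = messages[next++] as T;
      await handler(message);
    }
  };

  // Start the workers; each pulls the next message as soon as it is free.
  const workerCount = Math.min(concurrency, messages.length);
  const workers = Array.from({ length: workerCount }, () => worker());
  await Promise.all(workers);
}
```

With `concurrency` set to 2 and five messages, two handlers start immediately and the remaining three are picked up as earlier ones finish. If a handler rejects, the returned promise rejects as well, so error handling would need to be added around `handler` in real use.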

Reliability

  • Message persistence across application restarts
  • Automatic retry with exponential backoff
  • Dead letter handling for permanently failed messages
  • Comprehensive logging and monitoring integration
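
Retry with exponential backoff and dead letter handling can be sketched as follows. The option names (`maxAttempts`, `baseDelayMs`), the doubling schedule, and the plain array used as a dead letter store are assumptions made for illustration; this page does not specify Alepha's actual retry settings.

```typescript
// Hypothetical retry settings; not taken from Alepha's API.
interface RetryOptions {
  maxAttempts: number;
  baseDelayMs: number;
}

// Delay before the retry that follows failed attempt `attempt` (1-based):
// base, 2 x base, 4 x base, ...
function backoffDelay(attempt: number, baseDelayMs: number): number {
  return baseDelayMs * 2 ** (attempt - 1);
}

// Returns true if the handler eventually succeeded, false if the message
// was moved to the dead letter list after exhausting all attempts.
async function processWithRetry<T>(
  message: T,
  handler: (message: T) => Promise<void>,
  deadLetters: T[],
  options: RetryOptions,
): Promise<boolean> {
  for (let attempt = 1; attempt <= options.maxAttempts; attempt++) {
    try {
      await handler(message);
      return true;
    } catch {
      if (attempt === options.maxAttempts) break;
      const delay = backoffDelay(attempt, options.baseDelayMs);
      await new Promise<void>((resolve) => setTimeout(resolve, delay));
    }
  }
  deadLetters.push(message); // permanently failed; keep for inspection
  return false;
}
```

With `baseDelayMs: 100`, the waits between attempts are 100 ms, 200 ms, 400 ms, and so on. Production systems often add random jitter to these delays so that many failing messages do not retry at the same moment.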

#Options

| Option | Type | Required | Description |
| --- | --- | --- | --- |
| name | `string` | No | Unique name for the queue |
| description | `string` | No | Human-readable description of the queue's purpose |
| provider | `"memory" \| Service<QueueProvider>` | No | Queue storage provider configuration |
| schema | `T` | Yes | TypeBox schema defining the structure of messages in this queue |
| handler | `Object` | No | Message handler function that processes queue messages |

#Examples

Basic notification queue

typescript
const emailQueue = $queue({
  name: "email-notifications",
  schema: t.object({
    to: t.text(),
    subject: t.text(),
    body: t.text(),
    priority: t.optional(t.enum(["high", "normal"]))
  }),
  handler: async (message) => {
    await emailService.send(message.payload);
    console.log(`Email sent to ${message.payload.to}`);
  }
});

// Push messages for background processing
await emailQueue.push({
  to: "[email protected]",
  subject: "Welcome!",
  body: "Welcome to our platform",
  priority: "high"
});

Batch processing with Redis

typescript
const imageQueue = $queue({
  name: "image-processing",
  provider: RedisQueueProvider,
  schema: t.object({
    imageId: t.text(),
    operations: t.array(t.enum(["resize", "compress", "thumbnail"]))
  }),
  handler: async (message) => {
    for (const op of message.payload.operations) {
      await processImage(message.payload.imageId, op);
    }
  }
});

// Batch processing multiple images
await imageQueue.push(
  { imageId: "img1", operations: ["resize", "thumbnail"] },
  { imageId: "img2", operations: ["compress"] },
  { imageId: "img3", operations: ["resize", "compress", "thumbnail"] }
);

Development with memory provider

typescript
const taskQueue = $queue({
  name: "dev-tasks",
  provider: "memory",
  schema: t.object({
    taskType: t.enum(["cleanup", "backup", "report"]),
    data: t.record(t.text(), t.any())
  }),
  handler: async (message) => {
    switch (message.payload.taskType) {
      case "cleanup":
        await performCleanup(message.payload.data);
        break;
      case "backup":
        await createBackup(message.payload.data);
        break;
      case "report":
        await generateReport(message.payload.data);
        break;
    }
  }
});