alepha@docs:~/docs/packages/alepha/queue$
cat core.md
3 min read
Last commit:

#Alepha - Queue

#Installation

Part of the alepha package. Import from alepha/queue.

npm install alepha

#Overview

Provides asynchronous message queuing and processing capabilities through declarative queue descriptors.

The queue module enables reliable background job processing and message passing using the $queue descriptor on class properties. It supports schema validation, automatic retries, and multiple queue backends for building scalable, decoupled applications with robust error handling.

#API Reference

#Primitives

Primitives are functions that define and configure various aspects of your application. They follow the convention of starting with $ and return configured primitive instances.

For more details, see the Primitives documentation.

#$consumer()

Creates a consumer primitive to process messages from a specific queue.

Provides a dedicated message consumer that connects to a queue and processes messages with custom handler logic, enabling scalable architectures where multiple consumers can process messages from the same queue.

Key Features

  • Seamless integration with any $queue primitive
  • Full type safety inherited from queue schema
  • Automatic worker management for background processing
  • Built-in error handling and retry mechanisms
  • Support for multiple consumers per queue for horizontal scaling

Common Use Cases

  • Email sending and notification services
  • Image and media processing workers
  • Data synchronization and background jobs
ts
 1class EmailService { 2  emailQueue = $queue({ 3    name: "emails", 4    schema: t.object({ 5      to: t.text(), 6      subject: t.text(), 7      body: t.text() 8    }) 9  });10 11  emailConsumer = $consumer({12    queue: this.emailQueue,13    handler: async (message) => {14      const { to, subject, body } = message.payload;15      await this.sendEmail(to, subject, body);16    }17  });18 19  async sendWelcomeEmail(userEmail: string) {20    await this.emailQueue.push({21      to: userEmail,22      subject: "Welcome!",23      body: "Thanks for joining."24    });25  }26}

#$queue()

Creates a queue primitive for asynchronous message processing with background workers.

The $queue primitive enables powerful asynchronous communication patterns in your application. It provides type-safe message queuing with automatic worker processing, making it perfect for decoupling components and handling background tasks efficiently.

Background Processing

  • Automatic worker threads for non-blocking message processing
  • Built-in retry mechanisms and error handling
  • Dead letter queues for failed message handling
  • Graceful shutdown and worker lifecycle management

Type Safety

  • Full TypeScript support with schema validation using TypeBox
  • Type-safe message payloads with automatic inference
  • Runtime validation of all queued messages
  • Compile-time errors for invalid message structures

Storage Flexibility

  • Memory provider for development and testing
  • Redis provider for production scalability and persistence
  • Custom provider support for specialized backends
  • Automatic failover and connection pooling

Performance & Scalability

  • Batch processing support for high-throughput scenarios
  • Horizontal scaling with distributed queue backends
  • Configurable concurrency and worker pools
  • Efficient serialization and message routing

Reliability

  • Message persistence across application restarts
  • Automatic retry with exponential backoff
  • Dead letter handling for permanently failed messages
  • Comprehensive logging and monitoring integration
typescript
 1const emailQueue = $queue({ 2  name: "email-notifications", 3  schema: t.object({ 4    to: t.text(), 5    subject: t.text(), 6    body: t.text(), 7    priority: t.optional(t.enum(["high", "normal"])) 8  }), 9  handler: async (message) => {10    await emailService.send(message.payload);11    console.log(`Email sent to ${message.payload.to}`);12  }13});14 15// Push messages for background processing16await emailQueue.push({17  to: "user@example.com",18  subject: "Welcome!",19  body: "Welcome to our platform",20  priority: "high"21});
typescript
 1const imageQueue = $queue({ 2  name: "image-processing", 3  provider: RedisQueueProvider, 4  schema: t.object({ 5    imageId: t.text(), 6    operations: t.array(t.enum(["resize", "compress", "thumbnail"])) 7  }), 8  handler: async (message) => { 9    for (const op of message.payload.operations) {10      await processImage(message.payload.imageId, op);11    }12  }13});14 15// Batch processing multiple images16await imageQueue.push(17  { imageId: "img1", operations: ["resize", "thumbnail"] },18  { imageId: "img2", operations: ["compress"] },19  { imageId: "img3", operations: ["resize", "compress", "thumbnail"] }20);
typescript
 1const taskQueue = $queue({ 2  name: "dev-tasks", 3  provider: "memory", 4  schema: t.object({ 5    taskType: t.enum(["cleanup", "backup", "report"]), 6    data: t.record(t.text(), t.any()) 7  }), 8  handler: async (message) => { 9    switch (message.payload.taskType) {10      case "cleanup":11        await performCleanup(message.payload.data);12        break;13      case "backup":14        await createBackup(message.payload.data);15        break;16      case "report":17        await generateReport(message.payload.data);18        break;19    }20  }21});
On This Page
No headings found...
ready
mainTypeScript
UTF-8packages_alepha_queue_core.md