alepha@docs:~/docs/reference/primitives$
cat $consumer.md1 min read
#$consumer
#Import
typescript
1import { $consumer } from "alepha/queue";
#Overview
Creates a consumer primitive to process messages from a specific queue.
Provides a dedicated message consumer that connects to a queue and processes messages with custom handler logic, enabling scalable architectures where multiple consumers can process messages from the same queue.
Key Features
- Seamless integration with any $queue primitive
- Full type safety inherited from queue schema
- Automatic worker management for background processing
- Built-in error handling and retry mechanisms
- Support for multiple consumers per queue for horizontal scaling
Common Use Cases
- Email sending and notification services
- Image and media processing workers
- Data synchronization and background jobs
#Options
| Option | Type | Required | Description |
|---|---|---|---|
queue |
QueuePrimitive<T> |
Yes | The queue primitive that this consumer will process messages from |
handler |
Object |
Yes | Message handler function that processes individual messages from the queue |
#Examples
ts
1class EmailService { 2 emailQueue = $queue({ 3 name: "emails", 4 schema: t.object({ 5 to: t.text(), 6 subject: t.text(), 7 body: t.text() 8 }) 9 });10 11 emailConsumer = $consumer({12 queue: this.emailQueue,13 handler: async (message) => {14 const { to, subject, body } = message.payload;15 await this.sendEmail(to, subject, body);16 }17 });18 19 async sendWelcomeEmail(userEmail: string) {20 await this.emailQueue.push({21 to: userEmail,22 subject: "Welcome!",23 body: "Thanks for joining."24 });25 }26}