
#$consumer

#Import

```typescript
import { $consumer } from "alepha/queue";
```

#Overview

Creates a consumer primitive to process messages from a specific queue.

A consumer connects to a queue and runs custom handler logic for each message. Several consumers can process messages from the same queue, which allows the processing side to scale independently of the code that pushes messages.
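To make the idea of several consumers sharing one queue concrete, here is a minimal in-memory sketch. It is not Alepha's implementation: `InMemoryQueue`, its `consume` method, and the round-robin dispatch are assumptions chosen for illustration, and handlers are synchronous to keep the sketch short. What it shows is that each message is handled by exactly one of the registered consumers.

```typescript
// Hypothetical in-memory stand-in for a queue; not Alepha's implementation.
type Message<T> = { payload: T };
type Handler<T> = (message: Message<T>) => void;

class InMemoryQueue<T> {
  private handlers: Handler<T>[] = [];
  private next = 0;

  // Register one more consumer on this queue.
  consume(handler: Handler<T>): void {
    this.handlers.push(handler);
  }

  // Deliver each message to exactly one consumer, chosen round-robin.
  push(payload: T): void {
    if (this.handlers.length === 0) {
      throw new Error("no consumer registered");
    }
    const handler = this.handlers[this.next % this.handlers.length];
    this.next += 1;
    handler({ payload });
  }
}

const emails = new InMemoryQueue<{ to: string }>();
const handledBy: string[] = [];

// Two consumers attached to the same queue.
emails.consume((m) => handledBy.push(`worker-a:${m.payload.to}`));
emails.consume((m) => handledBy.push(`worker-b:${m.payload.to}`));

for (const to of ["a@example.com", "b@example.com", "c@example.com"]) {
  emails.push({ to });
}
```

After the loop, `handledBy` holds one entry per message, alternating between the two workers. A real queue backend would typically buffer messages and dispatch them asynchronously rather than throwing when no consumer is attached.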

Key Features

  • Seamless integration with any $queue primitive
  • Full type safety inherited from queue schema
  • Automatic worker management for background processing
  • Built-in error handling and retry mechanisms
  • Support for multiple consumers per queue for horizontal scaling
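The retry behaviour listed above is not specified on this page, so the following is only a sketch of the general technique, not Alepha's actual policy. `withRetry` and `maxAttempts` are hypothetical names: the wrapper calls a handler again after a failure, up to a fixed number of attempts, and rethrows the last error if every attempt fails.

```typescript
// Hypothetical helper; Alepha's real retry policy and options are not shown in this document.
type Message<T> = { payload: T };
type Handler<T> = (message: Message<T>) => Promise<void>;

function withRetry<T>(handler: Handler<T>, maxAttempts: number): Handler<T> {
  return async (message) => {
    let lastError: unknown;
    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        await handler(message);
        return; // success: stop retrying
      } catch (error) {
        lastError = error; // remember the failure and try again
      }
    }
    throw lastError; // all attempts failed: surface the last error
  };
}

// A handler that fails on its first two calls, then succeeds.
let attempts = 0;
const flaky: Handler<{ to: string }> = async () => {
  attempts += 1;
  if (attempts < 3) {
    throw new Error("temporary failure");
  }
};

const reliable = withRetry(flaky, 5);
```

Calling `reliable(...)` here resolves on the third attempt. Production retry logic usually adds a delay or backoff between attempts, which this sketch leaves out.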

Common Use Cases

  • Email sending and notification services
  • Image and media processing workers
  • Data synchronization and background jobs

#Options

| Option | Type | Required | Description |
| --- | --- | --- | --- |
| `queue` | `QueuePrimitive<T>` | Yes | The queue primitive that this consumer will process messages from |
| `handler` | Object | Yes | Message handler function that processes individual messages from the queue |
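The two options are linked through the type parameter `T`: the payload type of the queue determines what the handler receives. The sketch below illustrates that relationship with simplified, hypothetical shapes (`QueueLike`, `ConsumerOptionsSketch`, and `consumeSketch` are not Alepha types), assuming the handler is passed an object with a `payload` field as in the example further down.

```typescript
// Hypothetical shapes for illustration only; not copied from Alepha's source.
interface QueueLike<T> {
  name: string;
  push(payload: T): void;
}

interface ConsumerOptionsSketch<T> {
  queue: QueueLike<T>;
  handler: (message: { payload: T }) => void;
}

// Returns a function that feeds one payload to the handler.
function consumeSketch<T>(options: ConsumerOptionsSketch<T>): (payload: T) => void {
  return (payload) => options.handler({ payload });
}

const pushed: { to: string; subject: string }[] = [];
const emailQueue: QueueLike<{ to: string; subject: string }> = {
  name: "emails",
  push: (payload) => {
    pushed.push(payload);
  },
};

const seen: string[] = [];
const deliver = consumeSketch({
  queue: emailQueue,
  handler: (message) => {
    // message.payload is inferred as { to: string; subject: string } from the queue.
    seen.push(`${message.payload.to}: ${message.payload.subject}`);
  },
});

deliver({ to: "a@example.com", subject: "Welcome!" });
```

Because `T` is inferred from `queue`, a typo such as `message.payload.subjekt` in the handler would be rejected at compile time without any extra annotations.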

#Examples

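```ts
// `t` is assumed to be exported from the core "alepha" package.
import { t } from "alepha";
import { $consumer, $queue } from "alepha/queue";

class EmailService {
  // Queue whose schema defines the shape of every message payload.
  emailQueue = $queue({
    name: "emails",
    schema: t.object({
      to: t.text(),
      subject: t.text(),
      body: t.text()
    })
  });

  // Consumer bound to the queue; the payload type follows the schema.
  emailConsumer = $consumer({
    queue: this.emailQueue,
    handler: async (message) => {
      const { to, subject, body } = message.payload;
      await this.sendEmail(to, subject, body);
    }
  });

  async sendWelcomeEmail(userEmail: string) {
    await this.emailQueue.push({
      to: userEmail,
      subject: "Welcome!",
      body: "Thanks for joining."
    });
  }

  // Placeholder so the example is complete; replace with a real mail transport.
  private async sendEmail(to: string, subject: string, body: string) {
    console.log(`Sending "${subject}" to ${to} (${body.length} characters)`);
  }
}
```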