Alepha Topic

A publish-subscribe (pub/sub) messaging interface for eventing.

Installation

This package is part of the Alepha framework and can be installed via the all-in-one package:

npm install alepha

Module

Generic interface for pub/sub messaging. Gives you the ability to create topics and subscribers. This module provides only a memory implementation of the topic provider.

This module can be imported and used as follows:

import { Alepha, run } from "alepha";
import { AlephaTopic } from "alepha/topic";

const alepha = Alepha.create()
  .with(AlephaTopic);

run(alepha);

API Reference

Descriptors

Descriptors are functions that define and configure various aspects of your application. They follow the convention of starting with $ and return configured descriptor instances.

For more details, see the Descriptors documentation.

$subscriber()

Creates a subscriber descriptor to listen for messages from a specific topic.

This descriptor creates a dedicated message subscriber that connects to a topic and processes its messages using a custom handler function. Subscribers provide a clean way to separate message publishing from consumption, enabling scalable pub/sub architectures where multiple subscribers can react to the same events independently.

Key Features

  • Topic Integration: Seamlessly connects to any $topic descriptor
  • Type Safety: Full TypeScript support inherited from the connected topic's schema
  • Dedicated Processing: Isolated message processing logic separate from the topic
  • Real-time Processing: Immediate message delivery when events are published
  • Error Isolation: Subscriber errors don't affect other subscribers or the topic
  • Scalability: Multiple subscribers can listen to the same topic independently

Use Cases

Perfect for creating specialized event handlers:

  • Notification services for user events
  • Analytics and logging systems
  • Data synchronization between services
  • Real-time UI updates
  • Event-driven workflow triggers
  • Audit and compliance logging

Basic subscriber setup:

import { $topic, $subscriber } from "alepha/topic";
import { t } from "alepha";

class UserActivityService {
  // Define the topic
  userEvents = $topic({
    name: "user-activity",
    schema: {
      payload: t.object({
        userId: t.string(),
        action: t.enum(["login", "logout", "purchase"]),
        timestamp: t.number(),
        metadata: t.optional(t.record(t.string(), t.any()))
      })
    }
  });

  // Create a dedicated subscriber for this topic
  activityLogger = $subscriber({
    topic: this.userEvents,
    handler: async (message) => {
      const { userId, action, timestamp } = message.payload;

      await this.auditLogger.log({
        event: 'user_activity',
        userId,
        action,
        timestamp,
        source: 'user-activity-topic'
      });

      this.log.info(`User ${userId} performed ${action} at ${new Date(timestamp).toISOString()}`);
    }
  });

  async trackUserLogin(userId: string, metadata: Record<string, any>) {
    // Publish to topic - subscriber will automatically process it
    await this.userEvents.publish({
      userId,
      action: "login",
      timestamp: Date.now(),
      metadata
    });
  }
}

Multiple specialized subscribers for different concerns:

class OrderEventHandlers {
  orderEvents = $topic({
    name: "order-events",
    schema: {
      payload: t.object({
        orderId: t.string(),
        customerId: t.string(),
        status: t.union([
          t.literal("created"),
          t.literal("paid"),
          t.literal("shipped"),
          t.literal("delivered")
        ]),
        data: t.optional(t.record(t.string(), t.any()))
      })
    }
  });

  // Analytics subscriber
  analyticsSubscriber = $subscriber({
    topic: this.orderEvents,
    handler: async (message) => {
      await this.analytics.track('order_status_changed', {
        orderId: message.payload.orderId,
        customerId: message.payload.customerId,
        status: message.payload.status,
        timestamp: Date.now()
      });
    }
  });

  // Email notification subscriber
  emailSubscriber = $subscriber({
    topic: this.orderEvents,
    handler: async (message) => {
      const { customerId, orderId, status } = message.payload;

      const templates = {
        'paid': 'order-confirmation',
        'shipped': 'order-shipped',
        'delivered': 'order-delivered'
      };

      const template = templates[status];
      if (template) {
        await this.emailService.send({
          customerId,
          template,
          data: { orderId, status }
        });
      }
    }
  });

  // Inventory management subscriber
  inventorySubscriber = $subscriber({
    topic: this.orderEvents,
    handler: async (message) => {
      if (message.payload.status === 'paid') {
        await this.inventoryService.reserveItems(message.payload.orderId);
      } else if (message.payload.status === 'delivered') {
        await this.inventoryService.confirmDelivery(message.payload.orderId);
      }
    }
  });
}

Subscriber with advanced error handling and filtering:

class NotificationSubscriber {
  systemEvents = $topic({
    name: "system-events",
    schema: {
      payload: t.object({
        eventType: t.string(),
        severity: t.enum(["info", "warning", "error"]),
        serviceId: t.string(),
        message: t.string(),
        data: t.optional(t.record(t.string(), t.any()))
      })
    }
  });

  alertSubscriber = $subscriber({
    topic: this.systemEvents,
    handler: async (message) => {
      const { eventType, severity, serviceId, message: eventMessage, data } = message.payload;

      try {
        // Only process error events for alerting
        if (severity !== 'error') {
          return;
        }

        // Log the event
        this.logger.error(`System alert from ${serviceId}`, {
          eventType,
          message: eventMessage,
          data
        });

        // Send alerts based on service criticality
        const criticalServices = ['payment', 'auth', 'database'];
        const isCritical = criticalServices.includes(serviceId);

        if (isCritical) {
          // Immediate alert for critical services
          await this.alertService.sendImmediate({
            title: `Critical Error in ${serviceId}`,
            message: eventMessage,
            severity: 'critical',
            metadata: { eventType, serviceId, data }
          });
        } else {
          // Queue non-critical alerts for batching
          await this.alertService.queueAlert({
            title: `Error in ${serviceId}`,
            message: eventMessage,
            severity: 'error',
            metadata: { eventType, serviceId, data }
          });
        }

        // Update service health status
        await this.healthMonitor.recordError(serviceId, eventType);

      } catch (error) {
        // Log subscriber errors but don't re-throw
        // This prevents one failing subscriber from affecting others
        this.log.error(`Alert subscriber failed`, {
          originalEvent: { eventType, serviceId, severity },
          subscriberError: error.message
        });
      }
    }
  });
}

Subscriber for real-time data aggregation:

class MetricsAggregator {
  userActivityTopic = $topic({
    name: "user-metrics",
    schema: {
      payload: t.object({
        userId: t.string(),
        sessionId: t.string(),
        eventType: t.string(),
        timestamp: t.number(),
        duration: t.optional(t.number()),
        metadata: t.optional(t.record(t.string(), t.any()))
      })
    }
  });

  metricsSubscriber = $subscriber({
    topic: this.userActivityTopic,
    handler: async (message) => {
      const { userId, sessionId, eventType, timestamp, duration, metadata } = message.payload;

      // Update real-time metrics
      await Promise.all([
        // Update user activity counters
        this.metricsStore.increment(`user:${userId}:events:${eventType}`, 1),
        this.metricsStore.increment(`global:events:${eventType}`, 1),

        // Track session activity
        this.sessionStore.updateActivity(sessionId, timestamp),

        // Record duration metrics if provided
        duration ? this.metricsStore.recordDuration(`events:${eventType}:duration`, duration) : Promise.resolve(),

        // Update time-based aggregations
        this.timeSeriesStore.addPoint({
          metric: `user_activity.${eventType}`,
          timestamp,
          value: 1,
          tags: { userId, sessionId }
        })
      ]);

      // Trigger real-time dashboard updates
      await this.dashboardService.updateRealTimeStats({
        eventType,
        userId,
        timestamp
      });

      this.logger.debug(`Processed metrics for ${eventType}`, {
        userId,
        eventType,
        timestamp
      });
    }
  });
}

$topic()

Creates a topic descriptor for pub/sub messaging and event-driven architecture.

This descriptor provides a powerful publish/subscribe system that enables decoupled communication between different parts of your application. Topics allow multiple publishers to send messages and multiple subscribers to receive them, creating flexible event-driven architectures with support for real-time messaging and asynchronous event processing.

Key Features

  • Publish/Subscribe Pattern: Decoupled communication between publishers and subscribers
  • Multiple Subscribers: One-to-many message distribution with automatic fan-out
  • Type-Safe Messages: Full TypeScript support with schema validation using TypeBox
  • Real-time Processing: Immediate message delivery to active subscribers
  • Event Filtering: Subscribe to specific message types using filter functions
  • Timeout Support: Wait for specific messages with configurable timeouts
  • Multiple Backends: Support for in-memory, Redis, and custom topic providers
  • Error Resilience: Built-in error handling and message processing recovery

Use Cases

Perfect for event-driven architectures and real-time communication:

  • User activity notifications
  • Real-time chat and messaging systems
  • System event broadcasting
  • Microservice communication
  • Live data updates and synchronization
  • Application state change notifications
  • Webhook and external API event handling

Basic topic with publish/subscribe:

import { $topic } from "alepha/topic";
import { t } from "alepha";

class NotificationService {
  userActivity = $topic({
    name: "user-activity",
    schema: {
      payload: t.object({
        userId: t.string(),
        action: t.enum(["login", "logout", "purchase"]),
        timestamp: t.number(),
        metadata: t.optional(t.record(t.string(), t.any()))
      })
    },
    handler: async (message) => {
      // This subscriber runs automatically for all messages
      console.log(`User ${message.payload.userId} performed ${message.payload.action}`);
    }
  });

  async trackUserLogin(userId: string) {
    // Publish event - all subscribers will receive it
    await this.userActivity.publish({
      userId,
      action: "login",
      timestamp: Date.now(),
      metadata: { source: "web", ip: "192.168.1.1" }
    });
  }

  async setupAdditionalSubscriber() {
    // Add another subscriber dynamically
    await this.userActivity.subscribe(async (message) => {
      if (message.payload.action === "purchase") {
        await this.sendPurchaseConfirmation(message.payload.userId);
      }
    });
  }
}

Real-time chat system with multiple subscribers:

class ChatService {
  messagesTopic = $topic({
    name: "chat-messages",
    description: "Real-time chat messages for all rooms",
    schema: {
      payload: t.object({
        messageId: t.string(),
        roomId: t.string(),
        userId: t.string(),
        content: t.string(),
        timestamp: t.number(),
        messageType: t.enum(["text", "image", "file"])
      })
    }
  });

  async sendMessage(roomId: string, userId: string, content: string) {
    await this.messagesTopic.publish({
      messageId: generateId(),
      roomId,
      userId,
      content,
      timestamp: Date.now(),
      messageType: "text"
    });
  }

  // Different services can subscribe to the same topic
  async setupMessageLogging() {
    await this.messagesTopic.subscribe(async (message) => {
      // Log all messages for compliance
      await this.auditLogger.log({
        action: "message_sent",
        roomId: message.payload.roomId,
        userId: message.payload.userId,
        timestamp: message.payload.timestamp
      });
    });
  }

  async setupNotificationService() {
    await this.messagesTopic.subscribe(async (message) => {
      // Send push notifications to offline users
      const offlineUsers = await this.getOfflineUsersInRoom(message.payload.roomId);
      await this.sendPushNotifications(offlineUsers, {
        title: `New message in ${message.payload.roomId}`,
        body: message.payload.content
      });
    });
  }
}

Event filtering and waiting for specific messages:

class OrderService {
  orderEvents = $topic({
    name: "order-events",
    schema: {
      payload: t.object({
        orderId: t.string(),
        status: t.union([
          t.literal("created"),
          t.literal("paid"),
          t.literal("shipped"),
          t.literal("delivered"),
          t.literal("cancelled")
        ]),
        timestamp: t.number(),
        data: t.optional(t.record(t.string(), t.any()))
      })
    }
  });

  async processOrder(orderId: string) {
    // Publish order created event
    await this.orderEvents.publish({
      orderId,
      status: "created",
      timestamp: Date.now()
    });

    // Wait for payment confirmation with timeout
    try {
      const paymentEvent = await this.orderEvents.wait({
        timeout: [5, "minutes"],
        filter: (message) =>
          message.payload.orderId === orderId &&
          message.payload.status === "paid"
      });

      console.log(`Order ${orderId} was paid at ${paymentEvent.payload.timestamp}`);

      // Continue with shipping...
      await this.initiateShipping(orderId);

    } catch (error) {
      if (error instanceof TopicTimeoutError) {
        console.log(`Payment timeout for order ${orderId}`);
        await this.cancelOrder(orderId);
      }
    }
  }

  async setupOrderTracking() {
    // Subscribe only to shipping events
    await this.orderEvents.subscribe(async (message) => {
      if (message.payload.status === "shipped") {
        await this.updateTrackingInfo(message.payload.orderId, message.payload.data);
        await this.notifyCustomer(message.payload.orderId, "Your order has shipped!");
      }
    });
  }
}

Redis-backed topic for distributed systems:

class DistributedEventSystem {
  systemEvents = $topic({
    name: "system-events",
    provider: RedisTopicProvider,  // Use Redis for cross-service communication
    schema: {
      payload: t.object({
        eventType: t.string(),
        serviceId: t.string(),
        data: t.record(t.string(), t.any()),
        timestamp: t.number(),
        correlationId: t.optional(t.string())
      })
    },
    handler: async (message) => {
      // Central event handler for all system events
      await this.processSystemEvent(message.payload);
    }
  });

  async publishServiceHealth(serviceId: string, healthy: boolean) {
    await this.systemEvents.publish({
      eventType: "service.health",
      serviceId,
      data: { healthy, checkedAt: new Date().toISOString() },
      timestamp: Date.now()
    });
  }

  async setupHealthMonitoring() {
    await this.systemEvents.subscribe(async (message) => {
      if (message.payload.eventType === "service.health") {
        await this.updateServiceStatus(
          message.payload.serviceId,
          message.payload.data.healthy
        );

        if (!message.payload.data.healthy) {
          await this.alertOnCall(`Service ${message.payload.serviceId} is down`);
        }
      }
    });
  }
}

Table of contents