Event-Driven Architecture

Building Reactive Systems with Message Queues

Introduction to Event-Driven Architecture

In our previous lectures, we explored message queue concepts and RabbitMQ implementation. Today, we'll dive into event-driven architecture (EDA), a powerful design paradigm that leverages message queues to create responsive, scalable, and loosely coupled systems.

Event-driven architecture represents a significant shift from traditional request-response models. Instead of services directly calling each other and waiting for responses, services communicate by producing and consuming events, creating systems that are more resilient, scalable, and adaptable to change.

Core Concepts of Event-Driven Architecture

What is an Event?

An event is a significant change in state or a notable occurrence within a system. Events are immutable records of something that has happened in the past.

Examples of Events:

  • Business Events: OrderPlaced, PaymentReceived, ShipmentDelivered
  • System Events: UserLoggedIn, DatabaseConnectionLost, CPUThresholdExceeded
  • User Interface Events: ButtonClicked, FormSubmitted, PageViewed
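Because an event records something that has already happened, it is usually treated as read-only once created. Here is a minimal sketch of that idea in Node.js. The `createEvent` helper is hypothetical (it is not part of any library) and is only meant to show immutability in practice:

```javascript
const { randomUUID } = require('node:crypto');

// Hypothetical helper: builds an event and freezes it so it cannot be changed later.
function createEvent(eventType, data) {
  return Object.freeze({
    eventId: randomUUID(),
    eventType,                           // past-tense name, e.g. "OrderPlaced"
    timestamp: new Date().toISOString(),
    data: Object.freeze({ ...data })     // shallow freeze of a copy of the payload
  });
}

const orderPlaced = createEvent('OrderPlaced', { orderId: 'ORD-1' });
console.log(Object.isFrozen(orderPlaced), orderPlaced.eventType);
```

Note that `Object.freeze` is shallow, so deeply nested payloads would need a recursive freeze or a copy-on-read convention.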

Key Components

A typical event-driven architecture consists of several key components:

                    graph LR
                    A[Event Producer] -->|publishes event| B[Event Channel]
                    B -->|delivers event| C[Event Consumer]
                    B -->|delivers event| D[Event Consumer]
                    B -->|delivers event| E[Event Consumer]
                    style B fill:#f9f9f9,stroke:#333,stroke-width:2px
                

Analogy: The Newspaper System

Think of event-driven architecture like a newspaper distribution system:

  • Reporters (Event Producers): Discover and write about newsworthy events
  • Newspaper Company (Event Channel): Compiles, prints, and distributes the news
  • Readers (Event Consumers): Receive the newspaper and react to the news in different ways
  • Archives (Event Store): Maintain a historical record of all published news

Just as different readers may react differently to the same news story, different event consumers may take different actions based on the same event. And just as yesterday's newspaper becomes a historical record, events in a system represent an immutable history of what has occurred.

Benefits of Event-Driven Architecture

Loose Coupling

Event producers and consumers are decoupled, allowing them to evolve independently. Producers don't need to know who consumes their events, and consumers don't need to know who produced them.

                    graph TD
                    subgraph "Tightly Coupled Architecture"
                    A1[Service A] -->|direct call| B1[Service B]
                    A1 -->|direct call| C1[Service C]
                    A1 -->|direct call| D1[Service D]
                    end
                    
                    subgraph "Event-Driven Architecture"
                    A2[Service A] -->|publishes event| E[Event Channel]
                    E -->|consumes event| B2[Service B]
                    E -->|consumes event| C2[Service C]
                    E -->|consumes event| D2[Service D]
                    end
                    
                    style E fill:#f9f9f9,stroke:#333,stroke-width:2px
                

In a tightly coupled system, Service A needs to know about and directly call each downstream service. If a new service is added, Service A must be modified. In an event-driven system, Service A simply publishes events, and any number of services can consume those events without Service A knowing about them.
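To make the difference concrete, here is a small in-process sketch that uses Node's built-in `EventEmitter` as a stand-in for the event channel. This is an assumption for illustration only: `EventEmitter` is synchronous and lives inside one process, whereas a broker such as RabbitMQ plays this role across services. The consumer names are made up.

```javascript
const { EventEmitter } = require('node:events');

// In-process stand-in for an event channel.
const channel = new EventEmitter();
const log = [];

// Consumers register interest; the producer never references them.
channel.on('OrderPlaced', (event) => log.push(`email: confirm ${event.orderId}`));
channel.on('OrderPlaced', (event) => log.push(`analytics: record ${event.orderId}`));

// Producer: publishes the event and moves on.
function placeOrder(orderId) {
  channel.emit('OrderPlaced', { orderId });
}

placeOrder('ORD-1');

// A new consumer is added later without touching placeOrder.
channel.on('OrderPlaced', (event) => log.push(`loyalty: award points for ${event.orderId}`));
placeOrder('ORD-2');

console.log(log);
```

The second order reaches three consumers even though `placeOrder` was never modified.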

Scalability and Resilience

Event-driven systems can scale more efficiently because:

Flexibility and Extensibility

Adding new functionality often requires only adding new event consumers, without modifying existing components. This makes the system more adaptable to changing requirements.

Real-time Responsiveness

Events can trigger immediate reactions, enabling real-time updates, notifications, and responsive user experiences.

Audit Trail and Time Travel

When events are stored (event sourcing), they provide a complete history of all changes in the system, enabling powerful auditing, debugging, and even "time travel" capabilities.

Common Event-Driven Patterns

Event Notification

The simplest pattern: a system notifies other systems that something has happened. The event typically carries only an identifier (such as an order ID) rather than the details of what changed.

                    graph LR
                    A[Order Service] -->|"OrderCreated(orderId: 123)"| B[Event Channel]
                    B -->|notification| C[Email Service]
                    B -->|notification| D[Analytics Service]
                    style B fill:#f9f9f9,stroke:#333,stroke-width:2px
                

In this pattern, consumers usually need to request additional information from the source system or a database to take meaningful action.

Event-Carried State Transfer

Events contain sufficient information (state) for consumers to process them without needing to request additional data.

                    graph LR
                    A[Order Service] -->|"OrderCreated(orderId: 123, customer: {name, email}, items: [...])"| B[Event Channel]
                    B -->|complete data| C[Email Service]
                    B -->|complete data| D[Analytics Service]
                    style B fill:#f9f9f9,stroke:#333,stroke-width:2px
                

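This pattern reduces the runtime coupling between services by eliminating the need for follow-up API calls. The trade-off is larger events and consumers that hold copies of data which may become stale.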
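The two patterns can be compared side by side from the consumer's point of view. The sketch below is illustrative only: `orderStore` and `fetchOrder` are hypothetical stand-ins for the Order Service's API or database, and the customer data is invented.

```javascript
// Hypothetical in-memory store standing in for the Order Service.
const orderStore = new Map([
  ['123', { orderId: '123', customer: { name: 'Ada', email: 'ada@example.com' } }]
]);
let lookups = 0;

// Stands in for a remote call back to the source system.
function fetchOrder(orderId) {
  lookups += 1;
  return orderStore.get(orderId);
}

// Event notification: the event carries only an identifier, so the consumer looks up the rest.
function onOrderCreatedNotification(event) {
  const order = fetchOrder(event.orderId);
  return `Send confirmation to ${order.customer.email}`;
}

// Event-carried state transfer: the event already contains what the consumer needs.
function onOrderCreatedWithState(event) {
  return `Send confirmation to ${event.customer.email}`;
}

const thinEvent = { orderId: '123' };
const fullEvent = { orderId: '123', customer: { name: 'Ada', email: 'ada@example.com' } };

const viaNotification = onOrderCreatedNotification(thinEvent);
const viaStateTransfer = onOrderCreatedWithState(fullEvent);
console.log(viaNotification, viaStateTransfer, lookups);
```

Both handlers produce the same result, but only the notification handler had to call back to the source.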

Event Sourcing

Instead of storing the current state of entities, the system stores all events that led to that state. The current state can be reconstructed by replaying these events.

                    graph TD
                    A[User Actions] -->|generate| B[Events]
                    B -->|stored in| C[Event Store]
                    C -->|replay events| D[Current State]
                    C -->|replay events| E[Audit Trail]
                    C -->|replay events| F[Analytics]
                    style C fill:#f9f9f9,stroke:#333,stroke-width:2px
                

Event sourcing provides a complete audit trail and enables powerful time-based queries and analytics.
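Replaying events is essentially a fold over the event log. Here is a minimal sketch, assuming a hypothetical set of stock events for a single product (these event names are not part of our e-commerce example):

```javascript
// Hypothetical event store: an append-only array of events for one product.
const eventStore = [
  { type: 'StockReceived', quantity: 50 },
  { type: 'StockReserved', quantity: 2 },
  { type: 'StockReserved', quantity: 5 },
  { type: 'StockReleased', quantity: 2 }
];

// Pure function: applies one event to the previous state and returns the next state.
function apply(state, event) {
  switch (event.type) {
    case 'StockReceived': return { available: state.available + event.quantity };
    case 'StockReserved': return { available: state.available - event.quantity };
    case 'StockReleased': return { available: state.available + event.quantity };
    default: return state; // unknown event types are ignored
  }
}

// Replaying all events yields the current state; replaying a prefix yields a past state.
function replay(events, upTo = events.length) {
  return events.slice(0, upTo).reduce(apply, { available: 0 });
}

const current = replay(eventStore);
const afterTwoEvents = replay(eventStore, 2);
console.log(current, afterTwoEvents);
```

Replaying only a prefix of the log is what makes "time travel" queries possible: `afterTwoEvents` is the state as it was after the second event.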

Command Query Responsibility Segregation (CQRS)

CQRS separates write operations (commands) from read operations (queries), potentially using different models, databases, and even programming languages for each.

                    graph TD
                    A[User] -->|Commands| B[Command Handler]
                    A -->|Queries| C[Query Handler]
                    B -->|writes events| D[Event Store]
                    D -->|updates| E[Read Model]
                    C -->|reads from| E
                    style D fill:#f9f9f9,stroke:#333,stroke-width:2px
                

CQRS is often combined with event sourcing, where commands generate events that update the read model.
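The flow in the diagram can be sketched in a single file. This is a simplified illustration, not a production design: the store and read model are in-memory, the command and event names are hypothetical, and the projection runs synchronously, whereas real systems often update read models asynchronously.

```javascript
const eventStore = [];          // write side: append-only list of events
const readModel = new Map();    // read side: orderId -> summary shaped for queries

// Projection: updates the read model from each stored event.
function project(event) {
  if (event.type === 'OrderPlaced') {
    readModel.set(event.orderId, { orderId: event.orderId, status: 'PLACED', total: event.total });
  } else if (event.type === 'OrderPaid') {
    const summary = readModel.get(event.orderId);
    if (summary) summary.status = 'PAID';
  }
}

// Appends an event to the store and feeds it to the projection.
function append(event) {
  eventStore.push(event);
  project(event);
}

// Command handler: validates the command, then records an event. It never reads the read model.
function handleCommand(command) {
  if (command.type === 'PlaceOrder') {
    if (!(command.total > 0)) throw new Error('total must be positive');
    append({ type: 'OrderPlaced', orderId: command.orderId, total: command.total });
  } else if (command.type === 'PayOrder') {
    append({ type: 'OrderPaid', orderId: command.orderId });
  }
}

// Query handler: reads only from the read model.
function getOrderSummary(orderId) {
  return readModel.get(orderId) ?? null;
}

handleCommand({ type: 'PlaceOrder', orderId: 'ORD-1', total: 40 });
handleCommand({ type: 'PayOrder', orderId: 'ORD-1' });
console.log(getOrderSummary('ORD-1'));
```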

Saga Pattern

A saga manages a business transaction that spans multiple services as a sequence of local steps driven by events. If a later step fails, compensating actions undo the steps that have already completed.

                    graph LR
                    A[Order Service] -->|OrderCreated| B[Payment Service]
                    B -->|PaymentSucceeded| C[Inventory Service]
                    C -->|ItemsReserved| D[Shipping Service]
                    D -->|OrderShipped| E[Notification Service]
                    
                    B -.->|PaymentFailed| F[Order Cancellation]
                    C -.->|InsufficientStock| F
                    F -.->|compensating action| B
                    
                    style F fill:#ffdddd,stroke:#ff0000,stroke-width:1px
                

The saga pattern helps maintain data consistency across services without requiring distributed transactions.
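The compensation logic is easier to see in code. The diagram above shows a choreographed saga, where services react to each other's events. The sketch below uses a small in-process orchestrator instead, purely because it fits in one file. The step names and the `runSaga` helper are hypothetical.

```javascript
// Runs steps in order; if one throws, runs the compensations of completed steps in reverse.
function runSaga(steps, context) {
  const completed = [];
  for (const step of steps) {
    try {
      step.action(context);
      completed.push(step);
    } catch (error) {
      for (const done of completed.reverse()) {
        done.compensate(context);
      }
      return { status: 'ROLLED_BACK', failedStep: step.name, reason: error.message };
    }
  }
  return { status: 'COMPLETED' };
}

const trace = [];
const steps = [
  {
    name: 'chargePayment',
    action: () => trace.push('payment charged'),
    compensate: () => trace.push('payment refunded')
  },
  {
    name: 'reserveInventory',
    action: (ctx) => {
      if (ctx.outOfStock) throw new Error('InsufficientStock');
      trace.push('items reserved');
    },
    compensate: () => trace.push('items released')
  }
];

const result = runSaga(steps, { outOfStock: true });
console.log(result, trace);
```

When the inventory step fails, only the payment step has completed, so only its compensation (the refund) runs.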

Designing Good Events

Event Naming Conventions

Events should be named in past tense, as they represent something that has already happened:

Event Structure

Well-designed events typically include:

Example Event Structure

{
  "eventId": "e7f45ce3-2c3b-4af4-8ce0-843a3e58a3f9",  // Unique identifier
  "eventType": "OrderPlaced",                          // Type of event
  "timestamp": "2025-05-05T14:30:00Z",                 // When it happened
  "producer": "order-service",                         // Who produced it
  "version": "1.0",                                   // Schema version
  "correlationId": "c9b5f789-3a5b-46f3-8dd4-40f12d41f202", // For tracing
  "data": {                                           // Event payload
    "orderId": "ORD-12345",
    "customerId": "CUST-6789",
    "items": [
      { "productId": "PROD-101", "quantity": 2, "price": 25.99 },
      { "productId": "PROD-205", "quantity": 1, "price": 59.99 }
    ],
    "totalAmount": 111.97,
    "shippingAddress": {
      "street": "123 Main St",
      "city": "Boston",
      "state": "MA",
      "zipCode": "02108"
    }
  }
}
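Consumers often check the envelope before acting on an event. Here is a minimal sketch of such a check. The required fields are taken from the example structure above, while the `validateEvent` helper and its PascalCase rule are assumptions for illustration:

```javascript
// Envelope fields taken from the example event structure.
const REQUIRED_FIELDS = ['eventId', 'eventType', 'timestamp', 'producer', 'version', 'data'];

// Returns a list of problems; an empty list means the envelope looks well-formed.
function validateEvent(event) {
  const problems = [];
  for (const field of REQUIRED_FIELDS) {
    if (event[field] === undefined || event[field] === null) {
      problems.push(`missing field: ${field}`);
    }
  }
  if (typeof event.timestamp === 'string' && Number.isNaN(Date.parse(event.timestamp))) {
    problems.push('timestamp is not a parseable date');
  }
  if (typeof event.eventType === 'string' && !/^[A-Z][A-Za-z]+$/.test(event.eventType)) {
    problems.push('eventType should be PascalCase, e.g. OrderPlaced');
  }
  return problems;
}

const sampleEvent = {
  eventId: 'e7f45ce3-2c3b-4af4-8ce0-843a3e58a3f9',
  eventType: 'OrderPlaced',
  timestamp: '2025-05-05T14:30:00Z',
  producer: 'order-service',
  version: '1.0',
  data: { orderId: 'ORD-12345' }
};

console.log(validateEvent(sampleEvent));
console.log(validateEvent({ eventType: 'order_placed', timestamp: 'yesterday' }));
```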
                

Event Schema Evolution

As your system evolves, event schemas will need to change. Consider these strategies:
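One commonly used strategy is upcasting: converting old event versions to the current shape when they are read, so handlers only deal with one version. The sketch below assumes a hypothetical version 2.0 that renames `zipCode` to `postalCode` and adds a `currency` field. Neither change is part of our actual example schema.

```javascript
// Hypothetical schema change: 2.0 renames zipCode -> postalCode and adds currency.
function upcastOrderPlaced(event) {
  if (event.version === '2.0') return event;   // already current
  if (event.version !== '1.0') {
    throw new Error(`unsupported version: ${event.version}`);
  }
  const { zipCode, ...restOfAddress } = event.data.shippingAddress;
  return {
    ...event,
    version: '2.0',
    data: {
      ...event.data,
      currency: 'USD',                          // assumed default for old events
      shippingAddress: { ...restOfAddress, postalCode: zipCode }
    }
  };
}

const v1 = {
  eventType: 'OrderPlaced',
  version: '1.0',
  data: { orderId: 'ORD-12345', shippingAddress: { city: 'Boston', zipCode: '02108' } }
};
const v2 = upcastOrderPlaced(v1);
console.log(v2.version, v2.data.shippingAddress);
```

The stored version 1.0 event is left untouched; the function returns a new object, which keeps the event log immutable.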

Event Size Considerations

Consider what data to include in events:

Implementing Event-Driven Architecture with Node.js and RabbitMQ

Let's implement a simple e-commerce system using event-driven architecture with Node.js and RabbitMQ.

Project Structure

event-driven-demo/
  ├── common/
  │   ├── events.js         # Event definitions
  │   └── rabbit-client.js  # Shared RabbitMQ client
  ├── services/
  │   ├── order-service/    # Handles orders, publishes OrderPlaced events
  │   ├── payment-service/  # Processes payments, publishes PaymentProcessed events
  │   ├── inventory-service/ # Updates inventory, publishes InventoryUpdated events
  │   └── notification-service/ # Sends emails, SMS based on various events
  └── package.json
            

Shared RabbitMQ Client

common/rabbit-client.js

const amqp = require('amqplib');

class RabbitClient {
  constructor() {
    this.connection = null;
    this.channel = null;
    this.EXCHANGE_NAME = 'events';
    this.EXCHANGE_TYPE = 'topic';
  }

  async connect() {
    try {
      this.connection = await amqp.connect('amqp://localhost');
      this.channel = await this.connection.createChannel();
      
      // Declare the events exchange
      await this.channel.assertExchange(this.EXCHANGE_NAME, this.EXCHANGE_TYPE, {
        durable: true
      });
      
      console.log('Connected to RabbitMQ');
      return this.channel;
    } catch (error) {
      console.error('Error connecting to RabbitMQ:', error);
      throw error;
    }
  }

  async publishEvent(eventType, data) {
    if (!this.channel) {
      await this.connect();
    }
    
    const event = {
      eventId: generateUUID(),
      eventType,
      timestamp: new Date().toISOString(),
      producer: process.env.SERVICE_NAME || 'unknown-service',
      version: '1.0',
      correlationId: data.correlationId || generateUUID(),
      data
    };
    
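    // Convert PascalCase to dot.notation (OrderPlaced -> order.placed).
    // This must produce the same key that subscribeToEvent binds to.
    const routingKey = eventType.replace(/([A-Z])/g, '.$1').toLowerCase().substring(1);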
    const success = this.channel.publish(
      this.EXCHANGE_NAME,
      routingKey,
      Buffer.from(JSON.stringify(event)),
      { persistent: true }
    );
    
    console.log(`Published event ${eventType} with routing key ${routingKey}`);
    return success;
  }

  async subscribeToEvent(eventType, handler) {
    if (!this.channel) {
      await this.connect();
    }
    
    // Create a queue for this service
    const serviceName = process.env.SERVICE_NAME || 'unknown-service';
    const queueName = `${serviceName}.${eventType.toLowerCase()}`;
    
    await this.channel.assertQueue(queueName, {
      durable: true
    });
    
    // Convert PascalCase to dot.notation for routing key (OrderPlaced -> order.placed)
    const routingKey = eventType.replace(/([A-Z])/g, '.$1').toLowerCase().substring(1);
    
    // Bind queue to the exchange with the routing key
    await this.channel.bindQueue(queueName, this.EXCHANGE_NAME, routingKey);
    
    // Consume messages
    this.channel.consume(queueName, async (msg) => {
      if (msg !== null) {
        try {
          const eventData = JSON.parse(msg.content.toString());
          console.log(`Received ${eventType} event`);
          
          // Call the handler with the event data
          await handler(eventData);
          
          // Acknowledge the message
          this.channel.ack(msg);
        } catch (error) {
          console.error(`Error processing ${eventType} event:`, error);
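          // Reject the message without requeueing (third argument is requeue = false),
          // which avoids an endless redelivery loop; configure a dead-letter exchange
          // on the queue if failed messages need to be kept
          this.channel.nack(msg, false, false);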
        }
      }
    });
    
    console.log(`Subscribed to ${eventType} events`);
  }

  async close() {
    if (this.channel) {
      await this.channel.close();
    }
    if (this.connection) {
      await this.connection.close();
    }
  }
}

// Helper function to generate UUIDs (crypto.randomUUID is built into Node.js 14.17+)
const { randomUUID } = require('crypto');

function generateUUID() {
  return randomUUID();
}

module.exports = new RabbitClient();
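Publishing and subscribing must derive the routing key in exactly the same way, otherwise the queue binding never matches and events are silently dropped by the exchange. One way to guarantee this is to put the conversion in a single helper. The `toRoutingKey` name below is a suggestion and not part of the client above:

```javascript
// Converts a PascalCase event type into a dot-separated routing key.
// The capitals are what mark the word boundaries, so lowercase only AFTER inserting the dots.
function toRoutingKey(eventType) {
  return eventType.replace(/([A-Z])/g, '.$1').toLowerCase().substring(1);
}

const keys = ['OrderPlaced', 'PaymentProcessed', 'ProductOutOfStock'].map(toRoutingKey);
console.log(keys);
```

With keys in this form, a consumer bound to a topic exchange can also use wildcards: in RabbitMQ, `*` matches exactly one word and `#` matches zero or more words, so a binding of `order.*` would receive every two-word order event.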
                

Event Definitions

common/events.js

// Define event types as constants
const EventTypes = {
  // Order events
  ORDER_PLACED: 'OrderPlaced',
  ORDER_UPDATED: 'OrderUpdated',
  ORDER_CANCELLED: 'OrderCancelled',
  
  // Payment events
  PAYMENT_PROCESSED: 'PaymentProcessed',
  PAYMENT_FAILED: 'PaymentFailed',
  
  // Inventory events
  INVENTORY_UPDATED: 'InventoryUpdated',
  INVENTORY_RESERVED: 'InventoryReserved',
  INVENTORY_RELEASED: 'InventoryReleased',
  PRODUCT_OUT_OF_STOCK: 'ProductOutOfStock',
  
  // Notification events
  NOTIFICATION_SENT: 'NotificationSent',
  NOTIFICATION_FAILED: 'NotificationFailed'
};

module.exports = {
  EventTypes
};
                

Order Service Implementation

services/order-service/index.js

const express = require('express');
const bodyParser = require('body-parser');
const rabbitClient = require('../../common/rabbit-client');
const { EventTypes } = require('../../common/events');

// Set service name
process.env.SERVICE_NAME = 'order-service';

const app = express();
app.use(bodyParser.json());

// In-memory store for orders (would be a database in a real app)
const orders = {};

// Create a new order
app.post('/orders', async (req, res) => {
  try {
    const { customerId, items } = req.body;
    
    if (!customerId || !items || !Array.isArray(items) || items.length === 0) {
      return res.status(400).json({ error: 'Invalid order data' });
    }
    
    // Create order
    const orderId = `ORD-${Date.now()}`;
    const totalAmount = items.reduce((sum, item) => sum + (item.price * item.quantity), 0);
    
    const order = {
      orderId,
      customerId,
      items,
      totalAmount,
      status: 'CREATED',
      createdAt: new Date().toISOString()
    };
    
    // Save order
    orders[orderId] = order;
    
    // Publish OrderPlaced event
    await rabbitClient.publishEvent(EventTypes.ORDER_PLACED, order);
    
    res.status(201).json({ 
      message: 'Order created successfully',
      orderId,
      status: order.status
    });
  } catch (error) {
    console.error('Error creating order:', error);
    res.status(500).json({ error: 'Failed to create order' });
  }
});

// Get order by ID
app.get('/orders/:orderId', (req, res) => {
  const { orderId } = req.params;
  const order = orders[orderId];
  
  if (!order) {
    return res.status(404).json({ error: 'Order not found' });
  }
  
  res.json(order);
});

// Listen for PaymentProcessed events
rabbitClient.connect().then(() => {
  rabbitClient.subscribeToEvent(EventTypes.PAYMENT_PROCESSED, async (event) => {
    const { orderId, status } = event.data;
    
    if (orders[orderId]) {
      // Update order status
      orders[orderId].status = status === 'SUCCESS' ? 'PAID' : 'PAYMENT_FAILED';
      console.log(`Updated order ${orderId} status to ${orders[orderId].status}`);
      
      // Publish OrderUpdated event
      await rabbitClient.publishEvent(EventTypes.ORDER_UPDATED, {
        orderId,
        status: orders[orderId].status,
        updatedAt: new Date().toISOString()
      });
    }
  });
  
  // Also listen for inventory events to update order status
  rabbitClient.subscribeToEvent(EventTypes.INVENTORY_RESERVED, async (event) => {
    const { orderId } = event.data;
    
    if (orders[orderId] && orders[orderId].status === 'PAID') {
      orders[orderId].status = 'READY_FOR_SHIPMENT';
      console.log(`Updated order ${orderId} status to READY_FOR_SHIPMENT`);
      
      await rabbitClient.publishEvent(EventTypes.ORDER_UPDATED, {
        orderId,
        status: orders[orderId].status,
        updatedAt: new Date().toISOString()
      });
    }
  });
  
  rabbitClient.subscribeToEvent(EventTypes.PRODUCT_OUT_OF_STOCK, async (event) => {
    const { orderId, productId } = event.data;
    
    if (orders[orderId]) {
      orders[orderId].status = 'INVENTORY_ISSUE';
      console.log(`Updated order ${orderId} status due to inventory issue with product ${productId}`);
      
      await rabbitClient.publishEvent(EventTypes.ORDER_UPDATED, {
        orderId,
        status: orders[orderId].status,
        productId,
        updatedAt: new Date().toISOString()
      });
    }
  });
});

// Start server
const PORT = process.env.PORT || 3001;
app.listen(PORT, () => {
  console.log(`Order service listening on port ${PORT}`);
});
                

Payment Service Implementation

services/payment-service/index.js

const rabbitClient = require('../../common/rabbit-client');
const { EventTypes } = require('../../common/events');

// Set service name
process.env.SERVICE_NAME = 'payment-service';

// Connect to RabbitMQ
rabbitClient.connect().then(() => {
  // Subscribe to OrderPlaced events
  rabbitClient.subscribeToEvent(EventTypes.ORDER_PLACED, async (event) => {
    const order = event.data;
    console.log(`Processing payment for order ${order.orderId}`);
    
    try {
      // Simulate payment processing
      const paymentSuccess = Math.random() > 0.2; // 80% chance of success
      const processingTime = Math.floor(Math.random() * 2000) + 1000; // 1-3 seconds
      
      await new Promise(resolve => setTimeout(resolve, processingTime));
      
      // Payment result
      const paymentResult = {
        orderId: order.orderId,
        customerId: order.customerId,
        amount: order.totalAmount,
        status: paymentSuccess ? 'SUCCESS' : 'FAILED',
        transactionId: `TXN-${Date.now()}`,
        processedAt: new Date().toISOString()
      };
      
      // Publish appropriate event based on payment result
      if (paymentSuccess) {
        await rabbitClient.publishEvent(EventTypes.PAYMENT_PROCESSED, paymentResult);
        console.log(`Payment successful for order ${order.orderId}`);
      } else {
        await rabbitClient.publishEvent(EventTypes.PAYMENT_FAILED, paymentResult);
        console.log(`Payment failed for order ${order.orderId}`);
      }
    } catch (error) {
      console.error(`Error processing payment for order ${order.orderId}:`, error);
      
      // Publish failure event
      await rabbitClient.publishEvent(EventTypes.PAYMENT_FAILED, {
        orderId: order.orderId,
        customerId: order.customerId,
        amount: order.totalAmount,
        status: 'ERROR',
        error: error.message,
        processedAt: new Date().toISOString()
      });
    }
  });
  
  console.log('Payment service started');
});
                

Inventory Service Implementation

services/inventory-service/index.js

const rabbitClient = require('../../common/rabbit-client');
const { EventTypes } = require('../../common/events');

// Set service name
process.env.SERVICE_NAME = 'inventory-service';

// Simulate inventory database
const inventory = {
  'PROD-101': { name: 'Wireless Headphones', quantity: 50 },
  'PROD-102': { name: 'Smartphone Case', quantity: 100 },
  'PROD-103': { name: 'USB-C Cable', quantity: 150 },
  'PROD-104': { name: 'Bluetooth Speaker', quantity: 30 },
  'PROD-105': { name: 'Smart Watch', quantity: 25 }
};

// Connect to RabbitMQ
rabbitClient.connect().then(() => {
  // Subscribe to PaymentProcessed events
  rabbitClient.subscribeToEvent(EventTypes.PAYMENT_PROCESSED, async (event) => {
    const { orderId, status } = event.data;
    
    if (status !== 'SUCCESS') {
      console.log(`Skipping inventory check for order ${orderId} due to payment status ${status}`);
      return;
    }
    
    console.log(`Checking inventory for order ${orderId}`);
    
    // Get order details (in a real app, this would be a database query)
    // Here we're simulating by making a request to the order service API
    try {
      const orderResponse = await fetch(`http://localhost:3001/orders/${orderId}`);
      
      if (!orderResponse.ok) {
        throw new Error(`Failed to get order details: ${orderResponse.statusText}`);
      }
      
      const order = await orderResponse.json();
      let allItemsAvailable = true;
      let outOfStockItem = null;
      
      // Check inventory for each item
      for (const item of order.items) {
        const { productId, quantity } = item;
        
        if (!inventory[productId] || inventory[productId].quantity < quantity) {
          allItemsAvailable = false;
          outOfStockItem = productId;
          break;
        }
      }
      
      if (allItemsAvailable) {
        // Update inventory
        for (const item of order.items) {
          const { productId, quantity } = item;
          inventory[productId].quantity -= quantity;
          
          // Publish inventory update event
          await rabbitClient.publishEvent(EventTypes.INVENTORY_UPDATED, {
            productId,
            newQuantity: inventory[productId].quantity,
            updatedAt: new Date().toISOString()
          });
        }
        
        // Publish inventory reserved event
        await rabbitClient.publishEvent(EventTypes.INVENTORY_RESERVED, {
          orderId,
          items: order.items,
          reservedAt: new Date().toISOString()
        });
        
        console.log(`Inventory reserved for order ${orderId}`);
      } else {
        // Publish out of stock event
        await rabbitClient.publishEvent(EventTypes.PRODUCT_OUT_OF_STOCK, {
          orderId,
          productId: outOfStockItem,
          notifiedAt: new Date().toISOString()
        });
        
        console.log(`Product ${outOfStockItem} is out of stock for order ${orderId}`);
      }
    } catch (error) {
      console.error(`Error processing inventory for order ${orderId}:`, error);
    }
  });
  
  // Subscribe to OrderCancelled events to release inventory
  rabbitClient.subscribeToEvent(EventTypes.ORDER_CANCELLED, async (event) => {
    const { orderId, items } = event.data;
    
    if (!items || !Array.isArray(items)) {
      console.log(`No items to release for cancelled order ${orderId}`);
      return;
    }
    
    // Release inventory
    for (const item of items) {
      const { productId, quantity } = item;
      
      if (inventory[productId]) {
        inventory[productId].quantity += quantity;
        
        // Publish inventory update event
        await rabbitClient.publishEvent(EventTypes.INVENTORY_UPDATED, {
          productId,
          newQuantity: inventory[productId].quantity,
          updatedAt: new Date().toISOString()
        });
      }
    }
    
    // Publish inventory released event
    await rabbitClient.publishEvent(EventTypes.INVENTORY_RELEASED, {
      orderId,
      items,
      releasedAt: new Date().toISOString()
    });
    
    console.log(`Inventory released for cancelled order ${orderId}`);
  });
  
  console.log('Inventory service started');
});
                

Notification Service Implementation

services/notification-service/index.js

const rabbitClient = require('../../common/rabbit-client');
const { EventTypes } = require('../../common/events');

// Set service name
process.env.SERVICE_NAME = 'notification-service';

// Simulated notification functions
const notificationMethods = {
  sendEmail: async (recipient, subject, content) => {
    console.log(`EMAIL to ${recipient}: ${subject}`);
    console.log(`Content: ${content}`);
    return { success: true, method: 'email' };
  },
  
  sendSMS: async (phoneNumber, message) => {
    console.log(`SMS to ${phoneNumber}: ${message}`);
    return { success: true, method: 'sms' };
  },
  
  sendPushNotification: async (userId, title, body) => {
    console.log(`PUSH NOTIFICATION to ${userId}: ${title}`);
    console.log(`Body: ${body}`);
    return { success: true, method: 'push' };
  }
};

// Connect to RabbitMQ
rabbitClient.connect().then(() => {
  // Subscribe to OrderPlaced events
  rabbitClient.subscribeToEvent(EventTypes.ORDER_PLACED, async (event) => {
    const order = event.data;
    
    try {
      // Simulate customer data lookup
      const customer = {
        id: order.customerId,
        email: `customer-${order.customerId}@example.com`,
        phone: '+1234567890'
      };
      
      // Send order confirmation email
      await notificationMethods.sendEmail(
        customer.email,
        `Order Confirmation #${order.orderId}`,
        `Thank you for your order! Your order #${order.orderId} has been received and is being processed.`
      );
      
      // Publish notification sent event
      await rabbitClient.publishEvent(EventTypes.NOTIFICATION_SENT, {
        notificationType: 'ORDER_CONFIRMATION',
        recipient: customer.email,
        orderId: order.orderId,
        method: 'email',
        sentAt: new Date().toISOString()
      });
    } catch (error) {
      console.error(`Error sending order confirmation for ${order.orderId}:`, error);
      
      // Publish notification failed event
      await rabbitClient.publishEvent(EventTypes.NOTIFICATION_FAILED, {
        notificationType: 'ORDER_CONFIRMATION',
        orderId: order.orderId,
        error: error.message,
        failedAt: new Date().toISOString()
      });
    }
  });
  
  // Subscribe to PaymentProcessed events
  rabbitClient.subscribeToEvent(EventTypes.PAYMENT_PROCESSED, async (event) => {
    const payment = event.data;
    
    try {
      // Simulate customer data lookup
      const customer = {
        id: payment.customerId,
        email: `customer-${payment.customerId}@example.com`,
        phone: '+1234567890'
      };
      
      // Send payment confirmation email
      await notificationMethods.sendEmail(
        customer.email,
        `Payment Confirmation for Order #${payment.orderId}`,
        `Your payment of $${payment.amount.toFixed(2)} for order #${payment.orderId} has been processed successfully.`
      );
      
      // Publish notification sent event
      await rabbitClient.publishEvent(EventTypes.NOTIFICATION_SENT, {
        notificationType: 'PAYMENT_CONFIRMATION',
        recipient: customer.email,
        orderId: payment.orderId,
        method: 'email',
        sentAt: new Date().toISOString()
      });
    } catch (error) {
      console.error(`Error sending payment confirmation for ${payment.orderId}:`, error);
      
      // Publish notification failed event
      await rabbitClient.publishEvent(EventTypes.NOTIFICATION_FAILED, {
        notificationType: 'PAYMENT_CONFIRMATION',
        orderId: payment.orderId,
        error: error.message,
        failedAt: new Date().toISOString()
      });
    }
  });
  
  // Subscribe to PaymentFailed events
  rabbitClient.subscribeToEvent(EventTypes.PAYMENT_FAILED, async (event) => {
    const payment = event.data;
    
    try {
      // Simulate customer data lookup
      const customer = {
        id: payment.customerId,
        email: `customer-${payment.customerId}@example.com`,
        phone: '+1234567890'
      };
      
      // Send payment failure email
      await notificationMethods.sendEmail(
        customer.email,
        `Payment Failed for Order #${payment.orderId}`,
        `We were unable to process your payment of $${payment.amount.toFixed(2)} for order #${payment.orderId}. Please update your payment information.`
      );
      
      // Also send SMS for urgent notification
      await notificationMethods.sendSMS(
        customer.phone,
        `Payment failed for your recent order #${payment.orderId}. Please check your email for details.`
      );
      
      // Publish notification sent event
      await rabbitClient.publishEvent(EventTypes.NOTIFICATION_SENT, {
        notificationType: 'PAYMENT_FAILURE',
        recipient: customer.email,
        orderId: payment.orderId,
        methods: ['email', 'sms'],
        sentAt: new Date().toISOString()
      });
    } catch (error) {
      console.error(`Error sending payment failure notification for ${payment.orderId}:`, error);
      
      // Publish notification failed event
      await rabbitClient.publishEvent(EventTypes.NOTIFICATION_FAILED, {
        notificationType: 'PAYMENT_FAILURE',
        orderId: payment.orderId,
        error: error.message,
        failedAt: new Date().toISOString()
      });
    }
  });
  
  // Subscribe to ProductOutOfStock events
  rabbitClient.subscribeToEvent(EventTypes.PRODUCT_OUT_OF_STOCK, async (event) => {
    const { orderId, productId } = event.data;
    
    try {
      // Notify store administrator
      await notificationMethods.sendEmail(
        'admin@store.com',
        `Inventory Alert: Product Out of Stock`,
        `Product ${productId} is out of stock and was requested in order #${orderId}. Please restock this item.`
      );
      
      // Publish notification sent event
      await rabbitClient.publishEvent(EventTypes.NOTIFICATION_SENT, {
        notificationType: 'INVENTORY_ALERT',
        recipient: 'admin@store.com',
        productId,
        orderId,
        method: 'email',
        sentAt: new Date().toISOString()
      });
    } catch (error) {
      console.error(`Error sending inventory alert for ${productId}:`, error);
      
      // Publish notification failed event
      await rabbitClient.publishEvent(EventTypes.NOTIFICATION_FAILED, {
        notificationType: 'INVENTORY_ALERT',
        productId,
        orderId,
        error: error.message,
        failedAt: new Date().toISOString()
      });
    }
  });
  
  console.log('Notification service started');
});
                

Running the Services

To run this event-driven architecture example:

  1. Make sure RabbitMQ is running
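  2. Install dependencies: npm install express body-parser amqplib (the inventory service calls fetch, which is built into Node.js 18 and later; on older versions, also install node-fetch@2 and require it in that service)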
  3. Start each service in a separate terminal:
    • node services/order-service/index.js
    • node services/payment-service/index.js
    • node services/inventory-service/index.js
    • node services/notification-service/index.js
  4. Create a new order by sending a POST request to http://localhost:3001/orders with order data
  5. Observe as events flow through the system, triggering various actions in each service
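For step 4, a small script can send the request. The sketch below assumes Node.js 18 or later (for the built-in `fetch`) and an order service listening on port 3001. The payload values are illustrative, but the field names match what the POST /orders handler reads.

```javascript
// Example payload; customerId and items are the fields the order service validates.
const orderRequest = {
  customerId: 'CUST-6789',
  items: [
    { productId: 'PROD-101', quantity: 2, price: 25.99 },
    { productId: 'PROD-103', quantity: 1, price: 9.99 }
  ]
};

// Sends the order and resolves with the parsed JSON response.
// Requires the order service to be running.
async function placeOrder(baseUrl = 'http://localhost:3001') {
  const response = await fetch(`${baseUrl}/orders`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(orderRequest)
  });
  return response.json();
}

// Usage (not invoked here): placeOrder().then(console.log).catch(console.error);
```

Both products exist in the inventory service's simulated stock, so a successful payment should lead to an InventoryReserved event.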

Visualizing Event Flow in the System

Let's visualize how events flow through our e-commerce system when a customer places an order:

                    sequenceDiagram
                    participant C as Client
                    participant OS as Order Service
                    participant PS as Payment Service
                    participant IS as Inventory Service
                    participant NS as Notification Service
                    
                    C->>OS: POST /orders (Create Order)
                    OS->>OS: Save Order
                    OS->>+PS: Publish OrderPlaced
                    OS-->>C: Return OrderId
                    
                    PS->>PS: Process Payment
                    PS->>+IS: Publish PaymentProcessed
                    PS->>+NS: Publish PaymentProcessed
                    
                    NS->>NS: Send Payment Confirmation
                    NS->>NS: Publish NotificationSent
                    
                    IS->>IS: Reserve Inventory
                    IS->>+OS: Publish InventoryReserved
                    
                    OS->>OS: Update Order Status
                    OS->>+NS: Publish OrderUpdated
                    
                    NS->>NS: Send Order Status Update
                    NS->>NS: Publish NotificationSent
                

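This sequence diagram shows how a single user action (placing an order) triggers a cascade of events that flow through the system, with each service reacting to events and potentially publishing new events. The diagram is simplified: in the code above, the order service also consumes PaymentProcessed, the notification service also consumes OrderPlaced, and the notification service has no OrderUpdated handler yet, so the final "Send Order Status Update" step would need one to be added.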

Challenges and Considerations

Event Ordering

In distributed systems, events might not always arrive in the order they were generated. Strategies to handle this include:
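One widely used strategy is to attach a per-entity sequence number to each event and have consumers buffer anything that arrives early. The sketch below illustrates the idea. The `entityId` and `sequence` fields and the `SequencedConsumer` class are assumptions, not part of the event structure defined earlier.

```javascript
// Buffers out-of-order events per entity and releases them in sequence order.
class SequencedConsumer {
  constructor(handler) {
    this.handler = handler;
    this.nextExpected = new Map();  // entityId -> next sequence number to process
    this.pending = new Map();       // entityId -> Map(sequence -> event)
  }

  receive(event) {
    const { entityId, sequence } = event;
    const expected = this.nextExpected.get(entityId) ?? 1;
    if (sequence < expected) return;             // already processed: ignore
    if (!this.pending.has(entityId)) this.pending.set(entityId, new Map());
    const buffer = this.pending.get(entityId);
    buffer.set(sequence, event);

    // Drain every consecutive event that is now ready.
    let next = expected;
    while (buffer.has(next)) {
      this.handler(buffer.get(next));
      buffer.delete(next);
      next += 1;
    }
    this.nextExpected.set(entityId, next);
  }
}

const processed = [];
const consumer = new SequencedConsumer((event) => processed.push(event.type));
consumer.receive({ entityId: 'ORD-1', sequence: 2, type: 'OrderPaid' });    // arrives early, buffered
consumer.receive({ entityId: 'ORD-1', sequence: 1, type: 'OrderPlaced' });  // releases 1, then 2
consumer.receive({ entityId: 'ORD-1', sequence: 3, type: 'OrderShipped' });
console.log(processed);
```

A real implementation would also need a timeout or gap-detection policy, since a missing sequence number would otherwise block that entity's events indefinitely.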

Event Versioning

As your system evolves, event schemas will change. Options for handling this include:

Event Consistency

Ensuring consistency across services can be challenging:
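One part of the answer is making consumers idempotent. With acknowledgement-based delivery, a message can be redelivered, so a handler may see the same event more than once. Here is a minimal sketch that deduplicates on `eventId`. The in-memory `Set` is an assumption for brevity; a real service would persist the processed IDs alongside its own data.

```javascript
// Wraps a handler so that each eventId is processed at most once.
function makeIdempotent(handler, seen = new Set()) {
  return function handleOnce(event) {
    if (seen.has(event.eventId)) return false;  // duplicate delivery: skip
    handler(event);
    seen.add(event.eventId);                     // mark only after the handler succeeds
    return true;
  };
}

let stock = 10;
const reserve = makeIdempotent((event) => { stock -= event.data.quantity; });

const reservation = { eventId: 'e-1', data: { quantity: 2 } };
const firstDelivery = reserve(reservation);
const redelivery = reserve(reservation);   // same event delivered again
console.log(stock, firstDelivery, redelivery);
```

Without the wrapper, the redelivered event would decrement the stock twice.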

Debugging and Tracing

Debugging distributed event flows can be complex:

Monitoring Event-Driven Systems

Effective monitoring is crucial for event-driven systems. Key aspects to monitor include:

Key Metrics

Monitoring Tools

Alerting Strategies

Set up alerts for conditions like:

Real-World Applications

Event-driven architecture is used in many domains and industries:

Financial Services

E-commerce

IoT (Internet of Things)

Social Media

Practice Activities

Activity 1: Event Storming

Event storming is a workshop format for exploring complex business domains. In small groups:

  1. Choose a domain (e.g., food delivery, car rental, etc.)
  2. Identify all significant events that can happen in that domain
  3. Arrange them in chronological order
  4. Identify the commands (actions) that trigger each event
  5. Identify the actors who issue those commands
  6. Discuss potential services and their responsibilities

Activity 2: Implement a Simple Chat System

Build a simple chat system using Node.js and RabbitMQ:

  1. Create a chat service that publishes messages to a topic exchange
  2. Create a user service that manages user presence
  3. Create a notification service that sends emails for mentions
  4. Create a history service that stores all messages
  5. Define the events that flow between these services

Activity 3: Add Event Sourcing

Extend one of the services from our e-commerce example to use event sourcing:

  1. Create an event store (can be a simple array or database)
  2. Modify the service to store events rather than current state
  3. Implement a projection that rebuilds state from events
  4. Add the ability to replay events from any point in time

Summary

Event-driven architecture provides a powerful approach to building distributed systems:

By embracing event-driven architecture, you can build systems that are more resilient, scalable, and adaptable to change—essential qualities for modern applications.

Coming Up Next

In our next lecture, we'll explore WebAssembly (WASM) and how it can be used to enhance web applications with near-native performance. We'll learn about WASM concepts, tools for working with it, and practical use cases in web development.

Additional Resources