Introduction to Event-Driven Architecture
In our previous lectures, we explored message queue concepts and RabbitMQ implementation. Today, we'll dive into event-driven architecture (EDA), a powerful design paradigm that leverages message queues to create responsive, scalable, and loosely coupled systems.
Event-driven architecture represents a significant shift from traditional request-response models. Instead of services directly calling each other and waiting for responses, services communicate by producing and consuming events, creating systems that are more resilient, scalable, and adaptable to change.
Core Concepts of Event-Driven Architecture
What is an Event?
An event is a significant change in state or a notable occurrence within a system. Events are immutable records of something that has happened in the past.
Examples of Events:
- Business Events: OrderPlaced, PaymentReceived, ShipmentDelivered
- System Events: UserLoggedIn, DatabaseConnectionLost, CPUThresholdExceeded
- User Interface Events: ButtonClicked, FormSubmitted, PageViewed
Key Components
A typical event-driven architecture consists of several key components:
graph LR
A[Event Producer] -->|publishes event| B[Event Channel]
B -->|delivers event| C[Event Consumer]
B -->|delivers event| D[Event Consumer]
B -->|delivers event| E[Event Consumer]
style B fill:#f9f9f9,stroke:#333,stroke-width:2px
- Event Producers: Systems or components that generate events when something notable happens
- Event Channels: Communication infrastructure (message queues, event buses) that transport events
- Event Consumers: Systems or components that listen for and react to events
- Event Processors: Components that analyze, transform, or enrich events
- Event Stores: Databases optimized for storing and retrieving events (event sourcing)
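To make the roles concrete, here is a minimal in-process sketch that uses Node's built-in EventEmitter as a stand-in for the event channel. The consumer behavior and the `placeOrder` helper are assumptions made for illustration. Unlike a real broker, EventEmitter delivers events synchronously and within a single process.

```javascript
// In-process stand-in for an event channel, built on Node's EventEmitter.
// A real system would use a broker such as RabbitMQ instead.
const { EventEmitter } = require('node:events');

const channel = new EventEmitter();
const received = [];

// Two independent consumers react to the same event type.
channel.on('OrderPlaced', (event) => {
  received.push(`email: confirmation for ${event.data.orderId}`);
});
channel.on('OrderPlaced', (event) => {
  received.push(`analytics: recorded ${event.data.orderId}`);
});

// The producer only knows the channel, not who is listening.
// placeOrder is a hypothetical helper for this sketch.
function placeOrder(orderId) {
  channel.emit('OrderPlaced', {
    eventType: 'OrderPlaced',
    timestamp: new Date().toISOString(),
    data: { orderId }
  });
}

placeOrder('ORD-1');
console.log(received);
```

Adding a third consumer means adding another `channel.on(...)` call; `placeOrder` does not change.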
Analogy: The Newspaper System
Think of event-driven architecture like a newspaper distribution system:
- Reporters (Event Producers): Discover and write about newsworthy events
- Newspaper Company (Event Channel): Compiles, prints, and distributes the news
- Readers (Event Consumers): Receive the newspaper and react to the news in different ways
- Archives (Event Store): Maintain a historical record of all published news
Just as different readers may react differently to the same news story, different event consumers may take different actions based on the same event. And just as yesterday's newspaper becomes a historical record, events in a system represent an immutable history of what has occurred.
Benefits of Event-Driven Architecture
Loose Coupling
Event producers and consumers are decoupled, allowing them to evolve independently. Producers don't need to know who consumes their events, and consumers don't need to know who produced them.
graph TD
subgraph "Tightly Coupled Architecture"
A1[Service A] -->|direct call| B1[Service B]
A1 -->|direct call| C1[Service C]
A1 -->|direct call| D1[Service D]
end
subgraph "Event-Driven Architecture"
A2[Service A] -->|publishes event| E[Event Channel]
E -->|consumes event| B2[Service B]
E -->|consumes event| C2[Service C]
E -->|consumes event| D2[Service D]
end
style E fill:#f9f9f9,stroke:#333,stroke-width:2px
In a tightly coupled system, Service A needs to know about and directly call each downstream service. If a new service is added, Service A must be modified. In an event-driven system, Service A simply publishes events, and any number of services can consume those events without Service A knowing about them.
Scalability and Resilience
Event-driven systems can scale more efficiently because:
- Components can scale independently based on their specific load
- Event processing can be parallelized across multiple consumers
- Event queues act as buffers during traffic spikes
- Failures in one component don't necessarily affect others
Flexibility and Extensibility
Adding new functionality often requires only adding new event consumers, without modifying existing components. This makes the system more adaptable to changing requirements.
Real-time Responsiveness
Events can trigger immediate reactions, enabling real-time updates, notifications, and responsive user experiences.
Audit Trail and Time Travel
When events are stored (event sourcing), they provide a complete history of all changes in the system, enabling powerful auditing, debugging, and even "time travel" capabilities.
Common Event-Driven Patterns
Event Notification
The simplest pattern: a system notifies other systems that something has happened, without including detailed information about what happened.
graph LR
A[Order Service] -->|"OrderCreated(orderId: 123)"| B[Event Channel]
B -->|notification| C[Email Service]
B -->|notification| D[Analytics Service]
style B fill:#f9f9f9,stroke:#333,stroke-width:2px
In this pattern, consumers usually need to request additional information from the source system or a database to take meaningful action.
Event-Carried State Transfer
Events contain sufficient information (state) for consumers to process them without needing to request additional data.
graph LR
A[Order Service] -->|"OrderCreated(orderId: 123, customer: {name, email}, items: [...])"| B[Event Channel]
B -->|complete data| C[Email Service]
B -->|complete data| D[Analytics Service]
style B fill:#f9f9f9,stroke:#333,stroke-width:2px
This pattern reduces runtime coupling between services by eliminating the need for additional API calls, at the cost of larger messages and consumers holding their own copies of the data.
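The difference between the two patterns shows up in what the consumer has to do. The sketch below is illustrative only: `orderDatabase`, `fetchOrder`, and `recipientFor` are hypothetical names, and the lookup stands in for a call back to the Order Service.

```javascript
// Hypothetical order lookup standing in for a call back to the Order Service.
const orderDatabase = {
  123: { customer: { name: 'Ada', email: 'ada@example.com' }, items: ['PROD-101'] }
};
let lookups = 0;
function fetchOrder(orderId) {
  lookups += 1;
  return orderDatabase[orderId];
}

// Event notification: the event carries only an identifier.
const thinEvent = { eventType: 'OrderCreated', data: { orderId: 123 } };

// Event-carried state transfer: the event carries what consumers need.
const richEvent = {
  eventType: 'OrderCreated',
  data: { orderId: 123, ...orderDatabase[123] }
};

// The same consumer, written to cope with both shapes.
function recipientFor(event) {
  const order = event.data.customer ? event.data : fetchOrder(event.data.orderId);
  return order.customer.email;
}

const fromThin = recipientFor(thinEvent); // needs one lookup
const fromRich = recipientFor(richEvent); // needs none
console.log(fromThin, fromRich, `lookups: ${lookups}`);
```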
Event Sourcing
Instead of storing the current state of entities, the system stores all events that led to that state. The current state can be reconstructed by replaying these events.
graph TD
A[User Actions] -->|generate| B[Events]
B -->|stored in| C[Event Store]
C -->|replay events| D[Current State]
C -->|replay events| E[Audit Trail]
C -->|replay events| F[Analytics]
style C fill:#f9f9f9,stroke:#333,stroke-width:2px
Event sourcing provides a complete audit trail and enables powerful time-based queries and analytics.
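A small sketch of replaying events to rebuild state, assuming a hypothetical bank account with three made-up event types. The event store here is just an array; a real one would be a durable, append-only log.

```javascript
// Append-only event log for a single hypothetical account.
const eventStore = [
  { type: 'AccountOpened', amount: 0 },
  { type: 'MoneyDeposited', amount: 100 },
  { type: 'MoneyWithdrawn', amount: 30 },
  { type: 'MoneyDeposited', amount: 5 }
];

// Pure function: previous state + one event -> next state.
function applyEvent(state, event) {
  switch (event.type) {
    case 'AccountOpened':
      return { balance: 0, open: true };
    case 'MoneyDeposited':
      return { ...state, balance: state.balance + event.amount };
    case 'MoneyWithdrawn':
      return { ...state, balance: state.balance - event.amount };
    default:
      return state; // ignore unknown event types
  }
}

// Replaying a prefix of the log gives the state at that point in history.
function replay(events, upTo = events.length) {
  return events.slice(0, upTo).reduce(applyEvent, {});
}

const current = replay(eventStore);
const afterTwoEvents = replay(eventStore, 2);
console.log(current.balance, afterTwoEvents.balance);
```

Because `applyEvent` never mutates its input, the same log can be replayed any number of times, which is what makes time-based queries possible.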
Command Query Responsibility Segregation (CQRS)
Separates read and write operations, potentially using different models, databases, and even programming languages for each.
graph TD
A[User] -->|Commands| B[Command Handler]
A -->|Queries| C[Query Handler]
B -->|writes events| D[Event Store]
D -->|updates| E[Read Model]
C -->|reads from| E
style D fill:#f9f9f9,stroke:#333,stroke-width:2px
CQRS is often combined with event sourcing, where commands generate events that update the read model.
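The split can be sketched in a few lines. Everything here is an assumption for illustration: the command and event names are invented, both sides live in memory, and the projection runs synchronously, whereas a real system would usually update the read model asynchronously.

```javascript
const eventLog = [];         // write side: append-only events
const readModel = new Map(); // read side: orderId -> summary

// Projection: keeps the read model in step with each new event.
function project(event) {
  if (event.type === 'OrderPlaced') {
    readModel.set(event.orderId, { status: 'PLACED', total: event.total });
  } else if (event.type === 'OrderCancelled') {
    const summary = readModel.get(event.orderId);
    if (summary) summary.status = 'CANCELLED';
  }
}

// Command handler: validates the request, then records what happened.
function handleCommand(command) {
  let event;
  if (command.type === 'PlaceOrder') {
    if (command.total <= 0) throw new Error('Order total must be positive');
    event = { type: 'OrderPlaced', orderId: command.orderId, total: command.total };
  } else if (command.type === 'CancelOrder') {
    event = { type: 'OrderCancelled', orderId: command.orderId };
  } else {
    throw new Error(`Unknown command: ${command.type}`);
  }
  eventLog.push(event);
  project(event);
}

// Query handler: reads only from the read model, never from the event log.
function getOrderSummary(orderId) {
  return readModel.get(orderId) || null;
}

handleCommand({ type: 'PlaceOrder', orderId: 'ORD-1', total: 42 });
handleCommand({ type: 'CancelOrder', orderId: 'ORD-1' });
console.log(getOrderSummary('ORD-1'), eventLog.length);
```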
Saga Pattern
Manages distributed transactions across multiple services using a sequence of events and compensating actions.
graph LR
A[Order Service] -->|OrderCreated| B[Payment Service]
B -->|PaymentSucceeded| C[Inventory Service]
C -->|ItemsReserved| D[Shipping Service]
D -->|OrderShipped| E[Notification Service]
B -.->|PaymentFailed| F[Order Cancellation]
C -.->|InsufficientStock| F
F -.->|compensating action| B
style F fill:#ffdddd,stroke:#ff0000,stroke-width:1px
The saga pattern helps maintain data consistency across services without requiring distributed transactions.
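The core idea, running steps in order and undoing the completed ones when a later step fails, can be sketched as follows. This is a simplified, synchronous, in-process illustration; `runSaga` and the step definitions are hypothetical, and in a real system each step would be a service reacting to events.

```javascript
// Runs steps in order; on failure, undoes completed steps in reverse order.
function runSaga(steps, context) {
  const completed = [];
  for (const step of steps) {
    try {
      step.action(context);
      completed.push(step);
    } catch (error) {
      // Compensate the most recently completed step first.
      for (const done of completed.reverse()) {
        done.compensate(context);
      }
      return { status: 'ROLLED_BACK', failedStep: step.name, reason: error.message };
    }
  }
  return { status: 'COMPLETED' };
}

const log = [];
const steps = [
  { name: 'payment',
    action: () => { log.push('charge card'); },
    compensate: () => { log.push('refund card'); } },
  { name: 'inventory',
    action: () => { throw new Error('InsufficientStock'); },
    compensate: () => { log.push('release stock'); } },
  { name: 'shipping',
    action: () => { log.push('schedule shipment'); },
    compensate: () => { log.push('cancel shipment'); } }
];

const sagaResult = runSaga(steps, { orderId: 'ORD-1' });
console.log(sagaResult, log);
```

Only the payment step is compensated here: the inventory step never completed, and the shipping step never started.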
Designing Good Events
Event Naming Conventions
Events should be named in past tense, as they represent something that has already happened:
- Good: UserRegistered, OrderPlaced, PaymentProcessed
- Avoid: RegisterUser, PlaceOrder, ProcessPayment (these sound like commands)
Event Structure
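Well-designed events typically include a unique identifier, the event type, a timestamp, the producer, a schema version, a correlation ID for tracing, and the event payload:
Example Event Structure (the // comments are annotations; strict JSON does not allow comments)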
{
"eventId": "e7f45ce3-2c3b-4af4-8ce0-843a3e58a3f9", // Unique identifier
"eventType": "OrderPlaced", // Type of event
"timestamp": "2025-05-05T14:30:00Z", // When it happened
"producer": "order-service", // Who produced it
"version": "1.0", // Schema version
"correlationId": "c9b5f789-3a5b-46f3-8dd4-40f12d41f202", // For tracing
"data": { // Event payload
"orderId": "ORD-12345",
"customerId": "CUST-6789",
"items": [
{ "productId": "PROD-101", "quantity": 2, "price": 25.99 },
{ "productId": "PROD-205", "quantity": 1, "price": 59.99 }
],
"totalAmount": 111.97,
"shippingAddress": {
"street": "123 Main St",
"city": "Boston",
"state": "MA",
"zipCode": "02108"
}
}
}
Event Schema Evolution
As your system evolves, event schemas will need to change. Consider these strategies:
- Backward Compatibility: New consumers can understand old events
- Forward Compatibility: Old consumers can understand new events
- Versioning: Include a version field in your events
- Schema Registry: Use a schema registry (like Confluent Schema Registry) to manage schema evolution
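One way to use the version field is "upcasting": a consumer converts older events to the latest schema before handling them. The sketch below assumes a hypothetical schema change (a single `name` field split into `firstName` and `lastName`); the `upcasters` table and `greeting` handler are illustrative names, not part of any library.

```javascript
// Hypothetical change: version 1.0 had a single "name" field,
// version 2.0 splits it into "firstName" and "lastName".
const upcasters = {
  '1.0': (event) => {
    const [firstName, ...rest] = event.data.name.split(' ');
    return {
      ...event,
      version: '2.0',
      data: { firstName, lastName: rest.join(' ') }
    };
  }
};

// Upgrade an event step by step until no upcaster applies to its version.
function upcast(event) {
  let current = event;
  while (upcasters[current.version]) {
    current = upcasters[current.version](current);
  }
  return current;
}

// The handler only ever sees the latest schema.
function greeting(event) {
  const { firstName, lastName } = upcast(event).data;
  return `Welcome, ${firstName} ${lastName}`;
}

const oldEvent = { eventType: 'UserRegistered', version: '1.0', data: { name: 'Grace Hopper' } };
const newEvent = { eventType: 'UserRegistered', version: '2.0', data: { firstName: 'Alan', lastName: 'Turing' } };
console.log(greeting(oldEvent), '|', greeting(newEvent));
```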
Event Size Considerations
Consider what data to include in events:
- Minimal Events: Include only identifiers, requiring consumers to look up additional data
- Rich Events: Include all relevant data, reducing the need for lookups but increasing message size
- Hybrid Approach: Include essential data and references to larger objects
Implementing Event-Driven Architecture with Node.js and RabbitMQ
Let's implement a simple e-commerce system using event-driven architecture with Node.js and RabbitMQ.
Project Structure
event-driven-demo/
├── common/
│ ├── events.js # Event definitions
│ └── rabbit-client.js # Shared RabbitMQ client
├── services/
│ ├── order-service/ # Handles orders, publishes OrderPlaced events
│ ├── payment-service/ # Processes payments, publishes PaymentProcessed events
│ ├── inventory-service/ # Updates inventory, publishes InventoryUpdated events
│ └── notification-service/ # Sends emails, SMS based on various events
└── package.json
Shared RabbitMQ Client
common/rabbit-client.js
const amqp = require('amqplib');
class RabbitClient {
constructor() {
this.connection = null;
this.channel = null;
this.EXCHANGE_NAME = 'events';
this.EXCHANGE_TYPE = 'topic';
}
async connect() {
try {
this.connection = await amqp.connect('amqp://localhost');
this.channel = await this.connection.createChannel();
// Declare the events exchange
await this.channel.assertExchange(this.EXCHANGE_NAME, this.EXCHANGE_TYPE, {
durable: true
});
console.log('Connected to RabbitMQ');
return this.channel;
} catch (error) {
console.error('Error connecting to RabbitMQ:', error);
throw error;
}
}
async publishEvent(eventType, data) {
if (!this.channel) {
await this.connect();
}
const event = {
eventId: generateUUID(),
eventType,
timestamp: new Date().toISOString(),
producer: process.env.SERVICE_NAME || 'unknown-service',
version: '1.0',
correlationId: data.correlationId || generateUUID(),
data
};
// Convert PascalCase to dot.notation (OrderPlaced -> order.placed); must match subscribeToEvent
const routingKey = eventType.replace(/([A-Z])/g, '.$1').toLowerCase().substring(1);
const success = this.channel.publish(
this.EXCHANGE_NAME,
routingKey,
Buffer.from(JSON.stringify(event)),
{ persistent: true }
);
console.log(`Published event ${eventType} with routing key ${routingKey}`);
return success;
}
async subscribeToEvent(eventType, handler) {
if (!this.channel) {
await this.connect();
}
// Create a queue for this service
const serviceName = process.env.SERVICE_NAME || 'unknown-service';
const queueName = `${serviceName}.${eventType.toLowerCase()}`;
await this.channel.assertQueue(queueName, {
durable: true
});
// Convert PascalCase to dot.notation for routing key (OrderPlaced -> order.placed)
const routingKey = eventType.replace(/([A-Z])/g, '.$1').toLowerCase().substring(1);
// Bind queue to the exchange with the routing key
await this.channel.bindQueue(queueName, this.EXCHANGE_NAME, routingKey);
// Consume messages
this.channel.consume(queueName, async (msg) => {
if (msg !== null) {
try {
const eventData = JSON.parse(msg.content.toString());
console.log(`Received ${eventType} event`);
// Call the handler with the event data
await handler(eventData);
// Acknowledge the message
this.channel.ack(msg);
} catch (error) {
console.error(`Error processing ${eventType} event:`, error);
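// Reject the message without requeueing (requeue = false): it is discarded,
// or dead-lettered if the queue has a dead-letter exchange configured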
this.channel.nack(msg, false, false);
}
}
});
console.log(`Subscribed to ${eventType} events`);
}
async close() {
if (this.channel) {
await this.channel.close();
}
if (this.connection) {
await this.connection.close();
}
}
}
// Helper function to generate UUIDs
function generateUUID() {
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
const r = Math.random() * 16 | 0, v = c == 'x' ? r : (r & 0x3 | 0x8);
return v.toString(16);
});
}
module.exports = new RabbitClient();
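The publisher and the subscriber must derive the same routing key from an event type, otherwise the queue binding never matches and events are silently dropped by the exchange. A quick way to check the conversion in isolation is to pull it into a small function; `toRoutingKey` is a hypothetical helper name used only in this sketch.

```javascript
// Convert a PascalCase event type to a dot-separated routing key.
function toRoutingKey(eventType) {
  return eventType
    .replace(/([A-Z])/g, '.$1') // "OrderPlaced" -> ".Order.Placed"
    .toLowerCase()              // -> ".order.placed"
    .substring(1);              // drop the leading dot -> "order.placed"
}

console.log(toRoutingKey('OrderPlaced'));
console.log(toRoutingKey('ProductOutOfStock'));
```

With a topic exchange, dot-separated keys also allow wildcard bindings: a binding key of `order.*` would match both `order.placed` and `order.updated`.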
Event Definitions
common/events.js
// Define event types as constants
const EventTypes = {
// Order events
ORDER_PLACED: 'OrderPlaced',
ORDER_UPDATED: 'OrderUpdated',
ORDER_CANCELLED: 'OrderCancelled',
// Payment events
PAYMENT_PROCESSED: 'PaymentProcessed',
PAYMENT_FAILED: 'PaymentFailed',
// Inventory events
INVENTORY_UPDATED: 'InventoryUpdated',
INVENTORY_RESERVED: 'InventoryReserved',
INVENTORY_RELEASED: 'InventoryReleased',
PRODUCT_OUT_OF_STOCK: 'ProductOutOfStock',
// Notification events
NOTIFICATION_SENT: 'NotificationSent',
NOTIFICATION_FAILED: 'NotificationFailed'
};
module.exports = {
EventTypes
};
Order Service Implementation
services/order-service/index.js
const express = require('express');
const bodyParser = require('body-parser');
const rabbitClient = require('../../common/rabbit-client');
const { EventTypes } = require('../../common/events');
// Set service name
process.env.SERVICE_NAME = 'order-service';
const app = express();
app.use(bodyParser.json());
// In-memory store for orders (would be a database in a real app)
const orders = {};
// Create a new order
app.post('/orders', async (req, res) => {
try {
const { customerId, items } = req.body;
if (!customerId || !items || !Array.isArray(items) || items.length === 0) {
return res.status(400).json({ error: 'Invalid order data' });
}
// Create order
const orderId = `ORD-${Date.now()}`;
const totalAmount = items.reduce((sum, item) => sum + (item.price * item.quantity), 0);
const order = {
orderId,
customerId,
items,
totalAmount,
status: 'CREATED',
createdAt: new Date().toISOString()
};
// Save order
orders[orderId] = order;
// Publish OrderPlaced event
await rabbitClient.publishEvent(EventTypes.ORDER_PLACED, order);
res.status(201).json({
message: 'Order created successfully',
orderId,
status: order.status
});
} catch (error) {
console.error('Error creating order:', error);
res.status(500).json({ error: 'Failed to create order' });
}
});
// Get order by ID
app.get('/orders/:orderId', (req, res) => {
const { orderId } = req.params;
const order = orders[orderId];
if (!order) {
return res.status(404).json({ error: 'Order not found' });
}
res.json(order);
});
// Listen for PaymentProcessed events
rabbitClient.connect().then(() => {
rabbitClient.subscribeToEvent(EventTypes.PAYMENT_PROCESSED, async (event) => {
const { orderId, status } = event.data;
if (orders[orderId]) {
// Update order status
orders[orderId].status = status === 'SUCCESS' ? 'PAID' : 'PAYMENT_FAILED';
console.log(`Updated order ${orderId} status to ${orders[orderId].status}`);
// Publish OrderUpdated event
await rabbitClient.publishEvent(EventTypes.ORDER_UPDATED, {
orderId,
status: orders[orderId].status,
updatedAt: new Date().toISOString()
});
}
});
// Also listen for inventory events to update order status
rabbitClient.subscribeToEvent(EventTypes.INVENTORY_RESERVED, async (event) => {
const { orderId } = event.data;
if (orders[orderId] && orders[orderId].status === 'PAID') {
orders[orderId].status = 'READY_FOR_SHIPMENT';
console.log(`Updated order ${orderId} status to READY_FOR_SHIPMENT`);
await rabbitClient.publishEvent(EventTypes.ORDER_UPDATED, {
orderId,
status: orders[orderId].status,
updatedAt: new Date().toISOString()
});
}
});
rabbitClient.subscribeToEvent(EventTypes.PRODUCT_OUT_OF_STOCK, async (event) => {
const { orderId, productId } = event.data;
if (orders[orderId]) {
orders[orderId].status = 'INVENTORY_ISSUE';
console.log(`Updated order ${orderId} status due to inventory issue with product ${productId}`);
await rabbitClient.publishEvent(EventTypes.ORDER_UPDATED, {
orderId,
status: orders[orderId].status,
productId,
updatedAt: new Date().toISOString()
});
}
});
});
// Start server
const PORT = process.env.PORT || 3001;
app.listen(PORT, () => {
console.log(`Order service listening on port ${PORT}`);
});
Payment Service Implementation
services/payment-service/index.js
const rabbitClient = require('../../common/rabbit-client');
const { EventTypes } = require('../../common/events');
// Set service name
process.env.SERVICE_NAME = 'payment-service';
// Connect to RabbitMQ
rabbitClient.connect().then(() => {
// Subscribe to OrderPlaced events
rabbitClient.subscribeToEvent(EventTypes.ORDER_PLACED, async (event) => {
const order = event.data;
console.log(`Processing payment for order ${order.orderId}`);
try {
// Simulate payment processing
const paymentSuccess = Math.random() > 0.2; // 80% chance of success
const processingTime = Math.floor(Math.random() * 2000) + 1000; // 1-3 seconds
await new Promise(resolve => setTimeout(resolve, processingTime));
// Payment result
const paymentResult = {
orderId: order.orderId,
customerId: order.customerId,
amount: order.totalAmount,
status: paymentSuccess ? 'SUCCESS' : 'FAILED',
transactionId: `TXN-${Date.now()}`,
processedAt: new Date().toISOString()
};
// Publish appropriate event based on payment result
if (paymentSuccess) {
await rabbitClient.publishEvent(EventTypes.PAYMENT_PROCESSED, paymentResult);
console.log(`Payment successful for order ${order.orderId}`);
} else {
await rabbitClient.publishEvent(EventTypes.PAYMENT_FAILED, paymentResult);
console.log(`Payment failed for order ${order.orderId}`);
}
} catch (error) {
console.error(`Error processing payment for order ${order.orderId}:`, error);
// Publish failure event
await rabbitClient.publishEvent(EventTypes.PAYMENT_FAILED, {
orderId: order.orderId,
customerId: order.customerId,
amount: order.totalAmount,
status: 'ERROR',
error: error.message,
processedAt: new Date().toISOString()
});
}
});
console.log('Payment service started');
});
Inventory Service Implementation
services/inventory-service/index.js
const rabbitClient = require('../../common/rabbit-client');
const { EventTypes } = require('../../common/events');
// Set service name
process.env.SERVICE_NAME = 'inventory-service';
// Simulate inventory database
const inventory = {
'PROD-101': { name: 'Wireless Headphones', quantity: 50 },
'PROD-102': { name: 'Smartphone Case', quantity: 100 },
'PROD-103': { name: 'USB-C Cable', quantity: 150 },
'PROD-104': { name: 'Bluetooth Speaker', quantity: 30 },
'PROD-105': { name: 'Smart Watch', quantity: 25 }
};
// Connect to RabbitMQ
rabbitClient.connect().then(() => {
// Subscribe to PaymentProcessed events
rabbitClient.subscribeToEvent(EventTypes.PAYMENT_PROCESSED, async (event) => {
const { orderId, status } = event.data;
if (status !== 'SUCCESS') {
console.log(`Skipping inventory check for order ${orderId} due to payment status ${status}`);
return;
}
console.log(`Checking inventory for order ${orderId}`);
// Get order details (in a real app, this would be a database query)
// Here we're simulating by making a request to the order service API
// (relies on the global fetch built into Node.js 18 and later)
try {
const orderResponse = await fetch(`http://localhost:3001/orders/${orderId}`);
if (!orderResponse.ok) {
throw new Error(`Failed to get order details: ${orderResponse.statusText}`);
}
const order = await orderResponse.json();
let allItemsAvailable = true;
let outOfStockItem = null;
// Check inventory for each item
for (const item of order.items) {
const { productId, quantity } = item;
if (!inventory[productId] || inventory[productId].quantity < quantity) {
allItemsAvailable = false;
outOfStockItem = productId;
break;
}
}
if (allItemsAvailable) {
// Update inventory
for (const item of order.items) {
const { productId, quantity } = item;
inventory[productId].quantity -= quantity;
// Publish inventory update event
await rabbitClient.publishEvent(EventTypes.INVENTORY_UPDATED, {
productId,
newQuantity: inventory[productId].quantity,
updatedAt: new Date().toISOString()
});
}
// Publish inventory reserved event
await rabbitClient.publishEvent(EventTypes.INVENTORY_RESERVED, {
orderId,
items: order.items,
reservedAt: new Date().toISOString()
});
console.log(`Inventory reserved for order ${orderId}`);
} else {
// Publish out of stock event
await rabbitClient.publishEvent(EventTypes.PRODUCT_OUT_OF_STOCK, {
orderId,
productId: outOfStockItem,
notifiedAt: new Date().toISOString()
});
console.log(`Product ${outOfStockItem} is out of stock for order ${orderId}`);
}
} catch (error) {
console.error(`Error processing inventory for order ${orderId}:`, error);
}
});
// Subscribe to OrderCancelled events to release inventory
rabbitClient.subscribeToEvent(EventTypes.ORDER_CANCELLED, async (event) => {
const { orderId, items } = event.data;
if (!items || !Array.isArray(items)) {
console.log(`No items to release for cancelled order ${orderId}`);
return;
}
// Release inventory
for (const item of items) {
const { productId, quantity } = item;
if (inventory[productId]) {
inventory[productId].quantity += quantity;
// Publish inventory update event
await rabbitClient.publishEvent(EventTypes.INVENTORY_UPDATED, {
productId,
newQuantity: inventory[productId].quantity,
updatedAt: new Date().toISOString()
});
}
}
// Publish inventory released event
await rabbitClient.publishEvent(EventTypes.INVENTORY_RELEASED, {
orderId,
items,
releasedAt: new Date().toISOString()
});
console.log(`Inventory released for cancelled order ${orderId}`);
});
console.log('Inventory service started');
});
Notification Service Implementation
services/notification-service/index.js
const rabbitClient = require('../../common/rabbit-client');
const { EventTypes } = require('../../common/events');
// Set service name
process.env.SERVICE_NAME = 'notification-service';
// Simulated notification functions
const notificationMethods = {
sendEmail: async (recipient, subject, content) => {
console.log(`EMAIL to ${recipient}: ${subject}`);
console.log(`Content: ${content}`);
return { success: true, method: 'email' };
},
sendSMS: async (phoneNumber, message) => {
console.log(`SMS to ${phoneNumber}: ${message}`);
return { success: true, method: 'sms' };
},
sendPushNotification: async (userId, title, body) => {
console.log(`PUSH NOTIFICATION to ${userId}: ${title}`);
console.log(`Body: ${body}`);
return { success: true, method: 'push' };
}
};
// Connect to RabbitMQ
rabbitClient.connect().then(() => {
// Subscribe to OrderPlaced events
rabbitClient.subscribeToEvent(EventTypes.ORDER_PLACED, async (event) => {
const order = event.data;
try {
// Simulate customer data lookup
const customer = {
id: order.customerId,
email: `customer-${order.customerId}@example.com`,
phone: '+1234567890'
};
// Send order confirmation email
await notificationMethods.sendEmail(
customer.email,
`Order Confirmation #${order.orderId}`,
`Thank you for your order! Your order #${order.orderId} has been received and is being processed.`
);
// Publish notification sent event
await rabbitClient.publishEvent(EventTypes.NOTIFICATION_SENT, {
notificationType: 'ORDER_CONFIRMATION',
recipient: customer.email,
orderId: order.orderId,
method: 'email',
sentAt: new Date().toISOString()
});
} catch (error) {
console.error(`Error sending order confirmation for ${order.orderId}:`, error);
// Publish notification failed event
await rabbitClient.publishEvent(EventTypes.NOTIFICATION_FAILED, {
notificationType: 'ORDER_CONFIRMATION',
orderId: order.orderId,
error: error.message,
failedAt: new Date().toISOString()
});
}
});
// Subscribe to PaymentProcessed events
rabbitClient.subscribeToEvent(EventTypes.PAYMENT_PROCESSED, async (event) => {
const payment = event.data;
try {
// Simulate customer data lookup
const customer = {
id: payment.customerId,
email: `customer-${payment.customerId}@example.com`,
phone: '+1234567890'
};
// Send payment confirmation email
await notificationMethods.sendEmail(
customer.email,
`Payment Confirmation for Order #${payment.orderId}`,
`Your payment of $${payment.amount.toFixed(2)} for order #${payment.orderId} has been processed successfully.`
);
// Publish notification sent event
await rabbitClient.publishEvent(EventTypes.NOTIFICATION_SENT, {
notificationType: 'PAYMENT_CONFIRMATION',
recipient: customer.email,
orderId: payment.orderId,
method: 'email',
sentAt: new Date().toISOString()
});
} catch (error) {
console.error(`Error sending payment confirmation for ${payment.orderId}:`, error);
// Publish notification failed event
await rabbitClient.publishEvent(EventTypes.NOTIFICATION_FAILED, {
notificationType: 'PAYMENT_CONFIRMATION',
orderId: payment.orderId,
error: error.message,
failedAt: new Date().toISOString()
});
}
});
// Subscribe to PaymentFailed events
rabbitClient.subscribeToEvent(EventTypes.PAYMENT_FAILED, async (event) => {
const payment = event.data;
try {
// Simulate customer data lookup
const customer = {
id: payment.customerId,
email: `customer-${payment.customerId}@example.com`,
phone: '+1234567890'
};
// Send payment failure email
await notificationMethods.sendEmail(
customer.email,
`Payment Failed for Order #${payment.orderId}`,
`We were unable to process your payment of $${payment.amount.toFixed(2)} for order #${payment.orderId}. Please update your payment information.`
);
// Also send SMS for urgent notification
await notificationMethods.sendSMS(
customer.phone,
`Payment failed for your recent order #${payment.orderId}. Please check your email for details.`
);
// Publish notification sent event
await rabbitClient.publishEvent(EventTypes.NOTIFICATION_SENT, {
notificationType: 'PAYMENT_FAILURE',
recipient: customer.email,
orderId: payment.orderId,
methods: ['email', 'sms'],
sentAt: new Date().toISOString()
});
} catch (error) {
console.error(`Error sending payment failure notification for ${payment.orderId}:`, error);
// Publish notification failed event
await rabbitClient.publishEvent(EventTypes.NOTIFICATION_FAILED, {
notificationType: 'PAYMENT_FAILURE',
orderId: payment.orderId,
error: error.message,
failedAt: new Date().toISOString()
});
}
});
// Subscribe to ProductOutOfStock events
rabbitClient.subscribeToEvent(EventTypes.PRODUCT_OUT_OF_STOCK, async (event) => {
const { orderId, productId } = event.data;
try {
// Notify store administrator
await notificationMethods.sendEmail(
'admin@store.com',
`Inventory Alert: Product Out of Stock`,
`Product ${productId} is out of stock and was requested in order #${orderId}. Please restock this item.`
);
// Publish notification sent event
await rabbitClient.publishEvent(EventTypes.NOTIFICATION_SENT, {
notificationType: 'INVENTORY_ALERT',
recipient: 'admin@store.com',
productId,
orderId,
method: 'email',
sentAt: new Date().toISOString()
});
} catch (error) {
console.error(`Error sending inventory alert for ${productId}:`, error);
// Publish notification failed event
await rabbitClient.publishEvent(EventTypes.NOTIFICATION_FAILED, {
notificationType: 'INVENTORY_ALERT',
productId,
orderId,
error: error.message,
failedAt: new Date().toISOString()
});
}
});
console.log('Notification service started');
});
Running the Services
To run this event-driven architecture example:
- Make sure RabbitMQ is running
- Install dependencies:
npm install express body-parser amqplib node-fetch
- Start each service in a separate terminal:
node services/order-service/index.js
node services/payment-service/index.js
node services/inventory-service/index.js
node services/notification-service/index.js
- Create a new order by sending a POST request to http://localhost:3001/orders with order data
- Observe as events flow through the system, triggering various actions in each service
Visualizing Event Flow in the System
Let's visualize how events flow through our e-commerce system when a customer places an order:
sequenceDiagram
participant C as Client
participant OS as Order Service
participant PS as Payment Service
participant IS as Inventory Service
participant NS as Notification Service
C->>OS: POST /orders (Create Order)
OS->>OS: Save Order
OS->>+PS: Publish OrderPlaced
OS-->>C: Return OrderId
PS->>PS: Process Payment
PS->>+IS: Publish PaymentProcessed
PS->>+NS: Publish PaymentProcessed
NS->>NS: Send Payment Confirmation
NS->>NS: Publish NotificationSent
IS->>IS: Reserve Inventory
IS->>+OS: Publish InventoryReserved
OS->>OS: Update Order Status
OS->>+NS: Publish OrderUpdated
NS->>NS: Send Order Status Update
NS->>NS: Publish NotificationSent
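This sequence diagram shows how a single user action (placing an order) triggers a cascade of events that flow through the system, with each service reacting to events and potentially publishing new events. The diagram is simplified relative to the code above: the notification service also receives OrderPlaced directly to send the order confirmation, and it would need an additional subscription to OrderUpdated to send the status update shown here.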
Challenges and Considerations
Event Ordering
In distributed systems, events might not always arrive in the order they were generated. Strategies to handle this include:
- Timestamps: Include timestamps in events and handle based on event time
- Sequence Numbers: Include sequence numbers for related events
- Idempotent Handlers: Design handlers to be idempotent so repeated processing is safe
- State-Based Approaches: Base decisions on current state, not just the received event
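Two of these strategies, idempotent handlers and sequence numbers, can be combined in one handler. The sketch below keeps its bookkeeping in memory and assumes each event carries a hypothetical `sequence` field per order; a real service would persist both structures. Dropping stale events is only one possible policy, and some systems buffer them instead.

```javascript
const processedEventIds = new Set(); // events already handled
const lastSequence = new Map();      // orderId -> highest sequence applied
const orderStatus = new Map();       // orderId -> current status

function handleOrderEvent(event) {
  // Idempotency: a redelivered event is ignored.
  if (processedEventIds.has(event.eventId)) return 'duplicate';
  processedEventIds.add(event.eventId);

  // Ordering: an event older than what was already applied is ignored.
  const seen = lastSequence.get(event.orderId) ?? 0;
  if (event.sequence <= seen) return 'stale';

  lastSequence.set(event.orderId, event.sequence);
  orderStatus.set(event.orderId, event.status);
  return 'applied';
}

const outcomes = [
  { eventId: 'a', orderId: 'ORD-1', sequence: 1, status: 'CREATED' },
  { eventId: 'c', orderId: 'ORD-1', sequence: 3, status: 'SHIPPED' },
  { eventId: 'b', orderId: 'ORD-1', sequence: 2, status: 'PAID' },   // arrives late
  { eventId: 'c', orderId: 'ORD-1', sequence: 3, status: 'SHIPPED' } // redelivery
].map((event) => handleOrderEvent(event));

console.log(outcomes, orderStatus.get('ORD-1'));
```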
Event Versioning
As your system evolves, event schemas will change. Options for handling this include:
- Consumer-Driven Contracts: Design events based on consumer needs
- Schema Versioning: Include version in event metadata
- Schema Registry: Use a central registry for schema validation
- Backward/Forward Compatibility: Design schemas with compatibility in mind
Event Consistency
Ensuring consistency across services can be challenging:
- Eventual Consistency: Design for eventual consistency rather than immediate
- Saga Pattern: Use compensating transactions for failures
- Outbox Pattern: Store events in a local "outbox" table before publishing
- Event Sourcing: Derive state from an immutable event log
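The outbox pattern is the least self-explanatory item in this list, so here is a rough in-memory sketch. The `db` object, `placeOrder`, and `relayOutbox` are hypothetical; in a real service the two inserts would share one database transaction, and the relay would run on a schedule or be driven by change data capture.

```javascript
// In-memory stand-ins for a database with two tables.
const db = { orders: [], outbox: [] };

// Write the order and its event together. In a real database both
// inserts would share one transaction, so neither can succeed alone.
function placeOrder(order) {
  db.orders.push(order);
  db.outbox.push({
    id: db.outbox.length + 1,
    eventType: 'OrderPlaced',
    data: order,
    published: false
  });
}

// A relay publishes pending rows and marks them as sent.
// "publish" is whatever sends to the broker; it is injected here.
function relayOutbox(publish) {
  let sent = 0;
  for (const row of db.outbox) {
    if (row.published) continue;
    publish(row.eventType, row.data);
    row.published = true; // if publish throws, the row stays pending
    sent += 1;
  }
  return sent;
}

const published = [];
const fakePublish = (type, data) => published.push(`${type}:${data.orderId}`);

placeOrder({ orderId: 'ORD-1', totalAmount: 25.99 });
placeOrder({ orderId: 'ORD-2', totalAmount: 59.99 });
const firstRun = relayOutbox(fakePublish);
const secondRun = relayOutbox(fakePublish);
console.log(firstRun, secondRun, published);
```

Since a crash between publishing and marking a row can cause the same event to be sent twice, the outbox pattern is usually paired with idempotent consumers.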
Debugging and Tracing
Debugging distributed event flows can be complex:
- Correlation IDs: Include IDs to trace requests across services
- Distributed Tracing: Use tools like Jaeger or Zipkin
- Centralized Logging: Aggregate logs from all services
- Event Visualization: Tools to visualize event flows
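Correlation IDs only help if every event caused by another event inherits the same ID. A minimal sketch, assuming a hypothetical `createEvent` helper that takes the triggering event as an optional third argument:

```javascript
const { randomUUID } = require('node:crypto');

// Build a new event, inheriting the correlation ID of the event that caused it.
// With no cause, the event starts a new correlation chain.
function createEvent(eventType, data, causedBy = null) {
  return {
    eventId: randomUUID(),
    eventType,
    correlationId: causedBy ? causedBy.correlationId : randomUUID(),
    data
  };
}

const orderPlaced = createEvent('OrderPlaced', { orderId: 'ORD-1' });
const paymentProcessed = createEvent('PaymentProcessed', { orderId: 'ORD-1' }, orderPlaced);
const inventoryReserved = createEvent('InventoryReserved', { orderId: 'ORD-1' }, paymentProcessed);

// Every log line for this order can now be filtered by one ID.
for (const event of [orderPlaced, paymentProcessed, inventoryReserved]) {
  console.log(`[${event.correlationId}] ${event.eventType}`);
}
```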
Monitoring Event-Driven Systems
Effective monitoring is crucial for event-driven systems. Key aspects to monitor include:
Key Metrics
- Event Rate: Events produced/consumed per second
- Queue Depth: Number of messages waiting to be processed
- Processing Time: Time taken to handle events
- Error Rate: Failed events and retries
- End-to-End Latency: Time from event creation to completion of all downstream processing
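Processing time and error rate can be collected by wrapping event handlers. The sketch below is a simplified, synchronous illustration with hypothetical names (`metrics`, `instrument`); a production setup would more likely export counters and histograms to a system such as Prometheus.

```javascript
const metrics = { processed: 0, failed: 0, totalMs: 0 };

// Wrap a handler so that each call updates the counters.
function instrument(handler) {
  return (event) => {
    const start = process.hrtime.bigint();
    try {
      return handler(event);
    } catch (error) {
      metrics.failed += 1;
      throw error;
    } finally {
      // Runs for both successful and failed calls.
      metrics.processed += 1;
      metrics.totalMs += Number(process.hrtime.bigint() - start) / 1e6;
    }
  };
}

const handle = instrument((event) => {
  if (!event.data) throw new Error('Malformed event');
  return event.data.orderId;
});

for (const event of [{ data: { orderId: 'ORD-1' } }, {}, { data: { orderId: 'ORD-2' } }]) {
  try {
    handle(event);
  } catch (error) {
    // A real consumer would nack or dead-letter the message here.
  }
}

const errorRate = metrics.failed / metrics.processed;
const averageMs = metrics.totalMs / metrics.processed;
console.log({ ...metrics, errorRate, averageMs });
```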
Monitoring Tools
- RabbitMQ Management UI: Built-in monitoring for RabbitMQ
- Prometheus + Grafana: For metrics collection and visualization
- ELK Stack: For log aggregation and analysis
- Distributed Tracing: Jaeger, Zipkin, or AWS X-Ray
Alerting Strategies
Set up alerts for conditions like:
- Queue depth exceeding thresholds
- Dead letter queue receiving messages
- Processing time spikes
- High error rates
- Service health issues
Real-World Applications
Event-driven architecture is used in many domains and industries:
Financial Services
- Trading Platforms: Process market events and execute trades
- Payment Systems: Handle transaction events across multiple systems
- Fraud Detection: React to suspicious activity events in real-time
E-commerce
- Order Processing: Coordinate order fulfillment across services
- Inventory Management: Keep inventory updated across channels
- Recommendation Systems: React to user behavior events
IoT (Internet of Things)
- Sensor Networks: Process events from distributed sensors
- Smart Homes: Coordinate devices based on events
- Industrial Automation: React to machine events
Social Media
- News Feeds: Update feeds based on activity events
- Notifications: Trigger notifications from various events
- Analytics: Process user interaction events
Practice Activities
Activity 1: Event Storming
Event storming is a workshop format for exploring complex business domains. In small groups:
- Choose a domain (e.g., food delivery, car rental, etc.)
- Identify all significant events that can happen in that domain
- Arrange them in chronological order
- Identify the commands (actions) that trigger each event
- Identify the actors who issue those commands
- Discuss potential services and their responsibilities
Activity 2: Implement a Simple Chat System
Build a simple chat system using Node.js and RabbitMQ:
- Create a chat service that publishes messages to a topic exchange
- Create a user service that manages user presence
- Create a notification service that sends emails for mentions
- Create a history service that stores all messages
- Define the events that flow between these services
Activity 3: Add Event Sourcing
Extend one of the services from our e-commerce example to use event sourcing:
- Create an event store (can be a simple array or database)
- Modify the service to store events rather than current state
- Implement a projection that rebuilds state from events
- Add the ability to replay events from any point in time
Summary
Event-driven architecture provides a powerful approach to building distributed systems:
- Events represent significant changes or occurrences in a system
- EDA decouples components, enhancing maintainability and scalability
- Common patterns include event notification, event-carried state transfer, event sourcing, CQRS, and sagas
- Message queues like RabbitMQ provide the infrastructure for reliable event distribution
- Implementing EDA involves designing good events, handling event flow, and addressing challenges like ordering and consistency
- Event-driven systems are widespread in domains like finance, e-commerce, IoT, and social media
By embracing event-driven architecture, you can build systems that are more resilient, scalable, and adaptable to change—essential qualities for modern applications.