Queue Systems in Web Development

Week 10: Full Stack Application Development - Friday Lecture

Introduction to Queue Systems

Queue systems are a core building block of modern web applications: they let an application hand off work to be processed asynchronously, smooth out spikes in workload, and retry failed tasks so that work is not lost in a distributed system.

Think of a queue system like a busy restaurant. When customers (tasks) arrive, they don't immediately get served (processed). Instead, they join a queue, and waiters (workers) take orders (process tasks) one by one. This ensures the kitchen (server) isn't overwhelmed and every customer gets served eventually.

graph LR
    A[Web Application] -->|Tasks| B[Queue]
    B -->|Processing| C[Workers]
    C -->|Results| D[Database/Storage]
    style A fill:#f9d5e5,stroke:#333,stroke-width:2px
    style B fill:#eeeeee,stroke:#333,stroke-width:2px
    style C fill:#c3e5e7,stroke:#333,stroke-width:2px
    style D fill:#d5f5e3,stroke:#333,stroke-width:2px

Why We Need Queue Systems

In web development, several scenarios demand queue systems:

Real-World Example: E-commerce Order Processing

When a customer places an order on an e-commerce site, several things need to happen:

  1. Payment processing
  2. Inventory updates
  3. Order confirmation emails
  4. Shipping label generation
  5. Analytics updates

Without a queue system, the customer would have to wait for all these operations to complete before getting a response. With queues, the application can quickly acknowledge the order and process these tasks in the background.

sequenceDiagram
    participant C as Customer
    participant A as Web App
    participant Q as Queue
    participant W as Workers
    C->>A: Place Order
    A->>Q: Enqueue Payment Processing
    A->>Q: Enqueue Inventory Update
    A->>Q: Enqueue Email Sending
    A->>C: Order Confirmation
    Q->>W: Process Payment
    Q->>W: Update Inventory
    Q->>W: Send Email
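
The flow in the diagram can be sketched without any queue library. The following is a minimal in-memory illustration only: placeOrder, runWorker, and the task names are hypothetical, and a real system would use a broker such as Redis so that queued work survives a restart.

```javascript
// In-memory stand-in for a message queue (hypothetical; a real broker persists jobs).
const queue = [];

// Producer: record the work to be done, then acknowledge immediately.
function placeOrder(orderId) {
  for (const type of ['process-payment', 'update-inventory', 'send-email']) {
    queue.push({ type, orderId });
  }
  return { orderId, status: 'accepted' }; // sent to the customer right away
}

// Worker: drains the queue later, independently of the request.
function runWorker(handlers) {
  const done = [];
  while (queue.length > 0) {
    const task = queue.shift(); // FIFO: oldest task first
    handlers[task.type](task);
    done.push(task.type);
  }
  return done;
}

const ack = placeOrder('A-100');
const pendingAfterAck = queue.length; // work is still waiting when the customer hears back

const log = [];
const handlers = {
  'process-payment': (t) => log.push(`charged ${t.orderId}`),
  'update-inventory': (t) => log.push(`reserved stock for ${t.orderId}`),
  'send-email': (t) => log.push(`emailed receipt for ${t.orderId}`),
};
const processed = runWorker(handlers);
console.log(ack, pendingAfterAck, processed, log);
```

The point to notice is that the acknowledgement is produced while all three tasks are still pending; the worker catches up afterwards.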

Core Concepts of Queue Systems

Basic Terminology

Queue Properties

flowchart TB
    subgraph "Queue System"
        Q[Queue]
        DLQ[Dead Letter Queue]
    end
    P[Producer] -->|Send Message| Q
    Q -->|Process| W[Worker]
    W -->|Success| C[Completed]
    W -->|Failure| R{Retry?}
    R -->|Yes| Q
    R -->|No, Max Retries Exceeded| DLQ
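
To make the retry and dead letter path concrete, here is a small in-memory sketch of the same flow. It is illustrative rather than how any particular broker works: the names mainQueue, deadLetterQueue, drain, and the limit of 3 attempts are assumptions made for this example.

```javascript
// Minimal sketch of the retry / dead-letter flow (in-memory, hypothetical names).
const mainQueue = [];
const deadLetterQueue = [];
const completed = [];
const MAX_ATTEMPTS = 3; // assumed limit for this example

function drain(handler) {
  while (mainQueue.length > 0) {
    const message = mainQueue.shift();
    message.attempts += 1;
    try {
      handler(message);
      completed.push(message.id);
    } catch (err) {
      if (message.attempts < MAX_ATTEMPTS) {
        mainQueue.push(message); // retry: back onto the main queue
      } else {
        // max retries exceeded: park the message for inspection instead of losing it
        deadLetterQueue.push({ ...message, lastError: err.message });
      }
    }
  }
}

mainQueue.push({ id: 'ok', attempts: 0 });
mainQueue.push({ id: 'flaky', attempts: 0 });  // fails once, then succeeds
mainQueue.push({ id: 'broken', attempts: 0 }); // always fails

drain((message) => {
  if (message.id === 'broken') throw new Error('cannot be processed');
  if (message.id === 'flaky' && message.attempts === 1) throw new Error('temporary glitch');
});

console.log(completed);       // the messages that eventually succeeded
console.log(deadLetterQueue); // the message that exhausted its attempts
```

A production system would normally wait between attempts (backoff) rather than retrying immediately as this sketch does.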

Popular Queue Systems

RabbitMQ

One of the most widely used message brokers, implementing the Advanced Message Queuing Protocol (AMQP).

Redis-based Solutions

Redis can be used as a lightweight queue with libraries like Bull, Bee-Queue, or Kue.

Amazon SQS

Simple Queue Service, a fully managed message queuing service from AWS.

Other Options

Implementing a Queue System with Bull in Node.js

Setting Up Bull

Bull is a Redis-based queue system for Node.js with excellent features and reliability.

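// Install Bull. It ships with its own Redis client (ioredis), so no separate
// Redis package is needed; you do need a running Redis server.
// npm install bull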

Basic queue setup:

// Create a queue
const Queue = require('bull');
const imageProcessingQueue = new Queue('image-processing', {
  redis: {
    host: 'localhost',
    port: 6379
  }
});

Adding Jobs to the Queue

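// Inside an Express route handler. `upload` is assumed to be a multer instance
// configured elsewhere; it saves the file and populates req.file.
app.post('/api/images', upload.single('image'), async (req, res) => {
  try {
    // Path of the image that multer saved
    const imagePath = '/uploads/' + req.file.filename;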
    
    // Add job to the queue
    await imageProcessingQueue.add({
      imagePath,
      userId: req.user.id,
      effects: req.body.effects
    }, {
      attempts: 3,          // Retry up to 3 times
      backoff: 10000,       // Wait 10 seconds between retries
      removeOnComplete: true // Remove job from queue when complete
    });
    
    res.status(202).json({ message: 'Image processing started' });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

Processing Jobs with Workers

Workers should be in a separate file or process:

// worker.js
const Queue = require('bull');
const imageProcessingQueue = new Queue('image-processing', {
  redis: {
    host: 'localhost',
    port: 6379
  }
});

// Process jobs
imageProcessingQueue.process(async (job) => {
  const { imagePath, userId, effects } = job.data;
  
  // Update progress (job.progress returns a promise, so await it)
  await job.progress(10);
  
  // Process image
  await applyEffects(imagePath, effects);
  await job.progress(50);
  
  // Resize for different formats
  await createThumbnails(imagePath);
  await job.progress(75);
  
  // Update user's gallery
  await updateUserGallery(userId, imagePath);
  await job.progress(100);
  
  return { processedImage: imagePath.replace('/uploads/', '/processed/') };
});

// Handle events
imageProcessingQueue.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed! Result:`, result);
});

imageProcessingQueue.on('failed', (job, err) => {
  console.error(`Job ${job.id} failed with error:`, err);
});

Advanced Bull Features

Scheduled Jobs

// Run job after 5 minutes
await emailQueue.add(
  { user: 'user@example.com', template: 'reminder' },
  { delay: 5 * 60 * 1000 }
);

// Recurring jobs (cron-like)
await reportQueue.add(
  { report: 'weekly-sales' },
  { 
    repeat: { 
      cron: '0 0 * * 1' // Every Monday at midnight
    } 
  }
);

Rate Limiting

const apiCallQueue = new Queue('api-calls', {
  redis: { host: 'localhost', port: 6379 },
  limiter: {
    max: 10,         // Maximum 10 jobs
    duration: 60000  // per minute
  }
});
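
To see what a limit of "10 jobs per minute" means in practice, the following sketch implements a simple fixed-window limiter. It is an illustration of the idea only and is not how Bull implements its limiter option; createLimiter and tryAcquire are hypothetical names, and timestamps are passed in explicitly so the behaviour is easy to follow.

```javascript
// Fixed-window rate limiter sketch (illustrative; not Bull's internal implementation).
function createLimiter({ max, duration }) {
  let windowStart = null;
  let count = 0;
  // Returns true if a job may start at time `now` (in milliseconds).
  return function tryAcquire(now) {
    if (windowStart === null || now - windowStart >= duration) {
      windowStart = now; // start a new window
      count = 0;
    }
    if (count < max) {
      count += 1;
      return true;
    }
    return false; // over the limit: the job has to wait
  };
}

const tryAcquire = createLimiter({ max: 10, duration: 60000 });

// Twelve jobs arrive one second apart, all within the first minute.
const firstMinute = [];
for (let i = 0; i < 12; i++) {
  firstMinute.push(tryAcquire(i * 1000));
}
const allowedInFirstMinute = firstMinute.filter(Boolean).length;

// A job arriving once the window has elapsed is allowed again.
const allowedAfterReset = tryAcquire(60000);
console.log(allowedInFirstMinute, allowedAfterReset);
```

In a queue, a job that is refused here would not be dropped; it would simply stay queued until the next window opens.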

Prioritization

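// Higher priority (lower number). 'critical-task' is a job name, so a worker
// must register a matching handler: taskQueue.process('critical-task', handler)
await taskQueue.add('critical-task', data, { priority: 1 });

// Lower priority (higher number)
await taskQueue.add('low-priority-task', data, { priority: 10 });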
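
The ordering rule (lower number first, first-in-first-out among equal priorities) can be sketched in a few lines. This is a toy in-memory version with hypothetical names (createPriorityQueue, add, next); a real implementation would typically use a heap or a sorted set instead of re-sorting an array.

```javascript
// Priority ordering sketch: lower number is served first, FIFO among equal priorities.
function createPriorityQueue() {
  const items = [];
  let sequence = 0; // insertion counter used as a tie-breaker
  return {
    add(name, priority) {
      items.push({ name, priority, order: sequence++ });
      items.sort((a, b) => a.priority - b.priority || a.order - b.order);
    },
    next() {
      return items.shift();
    },
    get size() {
      return items.length;
    },
  };
}

const pq = createPriorityQueue();
pq.add('low-priority-task', 10);
pq.add('critical-task', 1);
pq.add('another-critical-task', 1);
pq.add('normal-task', 5);

const served = [];
while (pq.size > 0) {
  served.push(pq.next().name);
}
console.log(served);
```

Although the low-priority task was added first, it is served last; the two priority-1 tasks keep their relative insertion order.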

Concurrency Control

// Process up to 5 jobs simultaneously
taskQueue.process(5, async (job) => {
  // Job processing logic
});
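
What "up to 5 jobs simultaneously" means can be shown with a small concurrency-limited runner. This is a hedged sketch and not part of Bull: runWithConcurrency, sleep, and the doubling handler are hypothetical, and the peak counter exists only to demonstrate that the limit holds.

```javascript
// Concurrency-limited runner sketch (hypothetical helper, not part of Bull).
async function runWithConcurrency(jobs, limit, handler) {
  const results = new Array(jobs.length);
  let nextIndex = 0;
  let active = 0;
  let peak = 0; // highest number of jobs in flight at once

  async function workerLoop() {
    while (nextIndex < jobs.length) {
      const index = nextIndex++; // claim the next job
      active += 1;
      peak = Math.max(peak, active);
      try {
        results[index] = await handler(jobs[index]);
      } finally {
        active -= 1;
      }
    }
  }

  // Start at most `limit` loops; each one pulls jobs until none are left.
  const workers = Array.from({ length: Math.min(limit, jobs.length) }, workerLoop);
  await Promise.all(workers);
  return { results, peak };
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
const jobs = Array.from({ length: 12 }, (_, i) => i + 1);

const demo = runWithConcurrency(jobs, 5, async (n) => {
  await sleep(5); // stand-in for real asynchronous work
  return n * 2;
});

demo.then(({ results, peak }) => {
  console.log('peak concurrency:', peak, 'results:', results);
});
```

Raising the limit helps most when jobs spend their time waiting on I/O; CPU-bound jobs generally benefit more from additional worker processes.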

Queue Dashboard

Bull has a visual dashboard available through the bull-board packages (@bull-board/api plus a server adapter such as @bull-board/express):

// npm install @bull-board/api @bull-board/express

const { createBullBoard } = require('@bull-board/api');
const { BullAdapter } = require('@bull-board/api/bullAdapter');
const { ExpressAdapter } = require('@bull-board/express');

// Setup in Express app
const serverAdapter = new ExpressAdapter();
const bullBoard = createBullBoard({
  queues: [new BullAdapter(imageProcessingQueue)],
  serverAdapter
});

serverAdapter.setBasePath('/admin/queues');
app.use('/admin/queues', serverAdapter.getRouter());

Best Practices for Queue Systems

Designing Job Payloads

Error Handling Strategies

// Proper error handling
myQueue.process(async (job) => {
  try {
    // Process job (doWork is a placeholder for your own logic)
    const result = await doWork(job.data);
    return result;
  } catch (error) {
    console.error(`Error processing job ${job.id}:`, error);
    
    // Decide whether to retry
    if (isTransientError(error)) {
      // Rethrowing fails this attempt; Bull retries it if attempts remain
      throw error;
    } else {
      // Add to a cleanup queue instead of retrying
      await cleanupQueue.add({ jobId: job.id, error: error.message });
      // Returning normally marks the job as completed, so it is not retried
      return { status: 'failed-permanently' };
    }
  }
});
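
The isTransientError helper used above is not defined in these notes. One possible version is sketched below. It assumes that errors carry either a Node-style code (such as ETIMEDOUT) or an HTTP status; the specific lists are assumptions for illustration and should be adjusted to whatever your clients actually throw.

```javascript
// One possible isTransientError (assumption: errors expose `code` or `status`).
const TRANSIENT_CODES = new Set(['ECONNRESET', 'ETIMEDOUT', 'ECONNREFUSED', 'EAI_AGAIN']);
const TRANSIENT_STATUSES = new Set([429, 502, 503, 504]);

function isTransientError(error) {
  if (TRANSIENT_CODES.has(error.code)) return true;     // network-level hiccup
  if (TRANSIENT_STATUSES.has(error.status)) return true; // upstream overloaded or down
  return false; // everything else is treated as permanent
}

// Example errors (constructed by hand for the demonstration).
const timeout = Object.assign(new Error('socket timed out'), { code: 'ETIMEDOUT' });
const overloaded = Object.assign(new Error('service unavailable'), { status: 503 });
const badInput = Object.assign(new Error('invalid email address'), { status: 400 });
const plain = new Error('unexpected null');

const verdicts = [timeout, overloaded, badInput, plain].map(isTransientError);
console.log(verdicts);
```

Treating unknown errors as permanent is the conservative choice here: it avoids retrying a job that can never succeed, at the cost of occasionally giving up too early.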

Worker Process Management

// ecosystem.config.js for PM2
module.exports = {
  apps: [{
    name: 'web-server',
    script: 'server.js',
    instances: 2,
    exec_mode: 'cluster'
  }, {
    name: 'queue-worker',
    script: 'worker.js',
    instances: 4,
    exec_mode: 'fork'
  }]
};

Real-World Use Cases

Email Delivery System

Managing email delivery to avoid overwhelming mail servers and handle temporary failures:

// Send transactional emails through a queue
const emailQueue = new Queue('transactional-emails');

// Add an email to the queue
await emailQueue.add({
  recipient: 'user@example.com',
  template: 'order-confirmation',
  data: { orderId: '12345', items: [...] }
}, {
  attempts: 5,
  backoff: {
    type: 'exponential',
    delay: 60000 // Base delay of 1 minute; each retry waits longer than the last
  }
});
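
To get a feel for how the waits grow, here is a generic exponential backoff calculation. The exact formula depends on the library and version, so treat the numbers as an illustration of the general idea rather than as Bull's exact schedule; backoffDelay and the optional cap are hypothetical.

```javascript
// Generic exponential backoff: base * 2^(retry - 1), optionally capped.
function backoffDelay(retry, baseMs, capMs = Infinity) {
  return Math.min(baseMs * 2 ** (retry - 1), capMs);
}

// attempts: 5 means one initial try plus up to four retries.
const retries = [1, 2, 3, 4];
const schedule = retries.map((retry) => backoffDelay(retry, 60000));
const cappedSchedule = retries.map((retry) => backoffDelay(retry, 60000, 300000));

console.log(schedule.map((ms) => ms / 60000));       // waits in minutes
console.log(cappedSchedule.map((ms) => ms / 60000)); // same, capped at 5 minutes
```

A cap is worth considering for jobs with many attempts, since an uncapped doubling delay quickly grows to hours.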

Report Generation

Handling resource-intensive report generation without blocking the main application:

// Queue for generating PDF reports
app.post('/api/reports/generate', async (req, res) => {
  const reportJob = await reportQueue.add({
    type: req.body.reportType,
    parameters: req.body.parameters,
    userId: req.user.id
  });
  
  res.json({ 
    message: 'Report generation started',
    jobId: reportJob.id,
    statusUrl: `/api/reports/status/${reportJob.id}`
  });
});

User Notification System

Coordinating multi-channel notifications without delaying user interactions:

// Function to notify user across multiple channels
async function notifyUser(userId, event, data) {
  const user = await User.findById(userId);
  
  // Queue multiple notification types
  await Promise.all([
    // Push notification
    user.pushEnabled && notificationQueue.add('push', { 
      userId, title: getTitle(event), body: getMessage(event, data),
      deviceTokens: user.deviceTokens
    }),
    
    // Email notification
    user.emailNotifications && emailQueue.add({
      recipient: user.email,
      template: `${event}-notification`,
      data
    }),
    
    // In-app notification
    inAppQueue.add({ userId, type: event, data, read: false })
  ]);
}

Monitoring and Debugging Queue Systems

Key Metrics to Monitor

Tools for Monitoring

Common Issues and Solutions

Issue                    | Possible Causes                                      | Solutions
-------------------------|------------------------------------------------------|----------------------------------------------------------------------
Growing queue size       | Not enough workers, slow processing                  | Add more workers, optimize job processing, add more resources
High failure rate        | Bugs in processing logic, external service failures  | Fix bugs, improve error handling, implement circuit breakers
Redis memory issues      | Too many jobs in memory, not cleaning completed jobs | Set proper removeOnComplete/removeOnFail options, use Redis persistence
Duplicate job processing | Network issues, worker crashes                       | Implement idempotent job processing, use job IDs for deduplication
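
The last row deserves a concrete illustration. The sketch below shows idempotent processing keyed on the job ID: delivering the same job twice changes the state only once. All names (processedJobIds, balances, handleCredit) are hypothetical, and the in-memory Set stands in for a shared store such as a database table with a unique constraint.

```javascript
// Idempotent handler sketch: a repeated delivery of the same job ID has no extra effect.
const processedJobIds = new Set(); // assumption: in production this lives in a shared store
const balances = { alice: 0 };

function handleCredit(job) {
  if (processedJobIds.has(job.id)) {
    return 'skipped-duplicate'; // already applied; do nothing
  }
  balances[job.user] += job.amount;
  processedJobIds.add(job.id);
  return 'applied';
}

const job = { id: 'credit-42', user: 'alice', amount: 50 };

// The same job is delivered twice, e.g. after a worker crash and redelivery.
const outcomes = [handleCredit(job), handleCredit(job)];
console.log(outcomes, balances.alice);
```

In a real system the duplicate check and the state change need to happen atomically (for example inside one database transaction), otherwise two workers could both pass the check at the same time.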

Scaling Queue Systems

Vertical Scaling

Increase resources (CPU, memory) for Redis and worker processes.

Horizontal Scaling

Architecture Patterns for Scale

graph TB
    A[Web Servers] -->|Produce Jobs| B[Redis Cluster]
    B --> C[Worker Pool 1]
    B --> D[Worker Pool 2]
    B --> E[Worker Pool 3]
    subgraph "Worker Pool 1"
        W1[Worker]
        W2[Worker]
        W3[Worker]
    end
    subgraph "Worker Pool 2"
        W4[Worker]
        W5[Worker]
    end
    subgraph "Worker Pool 3"
        W6[Worker]
        W7[Worker]
    end

Practical Exercise: Building a Thumbnail Generator

Let's create a simple application that uses Bull to process image uploads and generate thumbnails.

Project Structure

thumbnail-generator/
├── server.js          # Express server
├── worker.js          # Bull worker
├── queues/
│   └── imageQueue.js  # Queue configuration
├── public/
│   ├── index.html     # Upload form
│   └── css/
│       └── style.css
├── uploads/           # Original images
└── processed/         # Generated thumbnails

Queue Configuration

// queues/imageQueue.js
const Queue = require('bull');

const imageQueue = new Queue('image-processing', {
  redis: {
    host: process.env.REDIS_HOST || 'localhost',
    port: process.env.REDIS_PORT || 6379,
  }
});

module.exports = imageQueue;

Express Server

// server.js
const express = require('express');
const multer = require('multer');
const path = require('path');
const imageQueue = require('./queues/imageQueue');
const { createBullBoard } = require('@bull-board/api');
const { BullAdapter } = require('@bull-board/api/bullAdapter');
const { ExpressAdapter } = require('@bull-board/express');

const app = express();
const port = process.env.PORT || 3000;

// Set up multer for file uploads
const storage = multer.diskStorage({
  destination: (req, file, cb) => {
    cb(null, 'uploads/');
  },
  filename: (req, file, cb) => {
    cb(null, Date.now() + path.extname(file.originalname));
  }
});
const upload = multer({ storage });

// Serve static files
app.use(express.static('public'));
app.use('/uploads', express.static('uploads'));
app.use('/processed', express.static('processed'));

// Set up Bull Board
const serverAdapter = new ExpressAdapter();
createBullBoard({
  queues: [new BullAdapter(imageQueue)],
  serverAdapter
});
serverAdapter.setBasePath('/admin/queues');
app.use('/admin/queues', serverAdapter.getRouter());

// Upload endpoint
app.post('/upload', upload.single('image'), async (req, res) => {
  if (!req.file) {
    return res.status(400).json({ error: 'No file uploaded' });
  }

  const job = await imageQueue.add({
    filepath: req.file.path,
    filename: req.file.filename,
    sizes: [
      { width: 100, height: 100, suffix: 'thumb' },
      { width: 300, height: 300, suffix: 'medium' },
      { width: 600, height: 600, suffix: 'large' }
    ]
  });

  res.json({
    message: 'Image uploaded and processing started',
    job: {
      id: job.id,
      status: 'processing'
    }
  });
});

// Job status endpoint
app.get('/status/:jobId', async (req, res) => {
  const job = await imageQueue.getJob(req.params.jobId);
  
  if (!job) {
    return res.status(404).json({ error: 'Job not found' });
  }
  
  const state = await job.getState();
  const progress = job.progress(); // called without arguments, returns the current progress
  const result = job.returnvalue;
  
  res.json({ id: job.id, state, progress, result });
});

app.listen(port, () => {
  console.log(`Server running on port ${port}`);
  console.log(`Bull Board available at http://localhost:${port}/admin/queues`);
});

Worker Process

// worker.js
const imageQueue = require('./queues/imageQueue');
const sharp = require('sharp');
const path = require('path');
const fs = require('fs');

// Make sure directories exist
['uploads', 'processed'].forEach(dir => {
  if (!fs.existsSync(dir)) {
    fs.mkdirSync(dir);
  }
});

// Process images
imageQueue.process(async (job) => {
  const { filepath, filename, sizes } = job.data;
  const fileBasename = path.basename(filename, path.extname(filename));
  const processedImages = [];
  
  // Update progress as we go
  let progress = 0;
  const progressIncrement = 100 / sizes.length;
  
  for (const size of sizes) {
    const outputFilename = `${fileBasename}_${size.suffix}${path.extname(filename)}`;
    const outputPath = path.join('processed', outputFilename);
    
    await sharp(filepath)
      .resize(size.width, size.height)
      .toFile(outputPath);
    
    processedImages.push({
      size: `${size.width}x${size.height}`,
      path: outputPath,
      url: `/processed/${outputFilename}`
    });
    
    progress += progressIncrement;
    await job.progress(Math.round(progress));
  }
  
  return {
    original: `/uploads/${filename}`,
    processed: processedImages
  };
});

// Log completed jobs
imageQueue.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed. Generated ${result.processed.length} thumbnails.`);
});

// Log failed jobs
imageQueue.on('failed', (job, error) => {
  console.error(`Job ${job.id} failed:`, error);
});

console.log('Worker started, waiting for jobs...');

HTML Upload Form

<!-- public/index.html -->
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Thumbnail Generator</title>
    <link rel="stylesheet" href="/css/style.css">
</head>
<body>
    <div class="container">
        <h1>Image Thumbnail Generator</h1>
        <form id="uploadForm" enctype="multipart/form-data">
            <div class="form-group">
                <label for="image">Select an image:</label>
                <input type="file" id="image" name="image" accept="image/*" required>
            </div>
            <button type="submit">Upload & Generate Thumbnails</button>
        </form>
        
        <div id="status" class="hidden">
            <h2>Processing...</h2>
            <div class="progress-bar">
                <div class="progress" id="progressBar"></div>
            </div>
            <p id="progressText">0%</p>
        </div>
        
        <div id="results" class="hidden">
            <h2>Generated Thumbnails</h2>
            <div class="thumbnails" id="thumbnailContainer"></div>
        </div>
    </div>

    <script>
        document.getElementById('uploadForm').addEventListener('submit', async (e) => {
            e.preventDefault();
            
            const formData = new FormData(e.target);
            const statusDiv = document.getElementById('status');
            const resultsDiv = document.getElementById('results');
            const progressBar = document.getElementById('progressBar');
            const progressText = document.getElementById('progressText');
            const thumbnailContainer = document.getElementById('thumbnailContainer');
            
            statusDiv.classList.remove('hidden');
            resultsDiv.classList.add('hidden');
            
            try {
                // Upload the image
                const response = await fetch('/upload', {
                    method: 'POST',
                    body: formData
                });
                
                const data = await response.json();
                if (!data.job) {
                    throw new Error('Upload failed');
                }
                
                // Poll for job status
                const jobId = data.job.id;
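                const checkInterval = setInterval(async () => {
                    try {
                        const statusResponse = await fetch(`/status/${jobId}`);
                        if (!statusResponse.ok) {
                            throw new Error(`Status check failed (${statusResponse.status})`);
                        }
                        const statusData = await statusResponse.json();
                        
                        progressBar.style.width = `${statusData.progress}%`;
                        progressText.textContent = `${Math.round(statusData.progress)}%`;
                        
                        if (statusData.state === 'completed') {
                            clearInterval(checkInterval);
                            displayResults(statusData.result);
                        } else if (statusData.state === 'failed') {
                            clearInterval(checkInterval);
                            alert('Processing failed, please try again');
                        }
                    } catch (pollError) {
                        // Stop polling so a failed request is not repeated every second
                        clearInterval(checkInterval);
                        console.error('Error:', pollError);
                        statusDiv.classList.add('hidden');
                        alert('An error occurred: ' + pollError.message);
                    }
                }, 1000);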
                
            } catch (error) {
                console.error('Error:', error);
                statusDiv.classList.add('hidden');
                alert('An error occurred: ' + error.message);
            }
        });
        
        function displayResults(result) {
            const statusDiv = document.getElementById('status');
            const resultsDiv = document.getElementById('results');
            const thumbnailContainer = document.getElementById('thumbnailContainer');
            
            statusDiv.classList.add('hidden');
            resultsDiv.classList.remove('hidden');
            
            thumbnailContainer.innerHTML = '';
            
            // Add original image
            const originalDiv = document.createElement('div');
            originalDiv.className = 'thumbnail-item';
            originalDiv.innerHTML = `
                <h3>Original</h3>
                <img src="${result.original}" alt="Original image">
            `;
            thumbnailContainer.appendChild(originalDiv);
            
            // Add thumbnails
            result.processed.forEach(image => {
                const thumbnailDiv = document.createElement('div');
                thumbnailDiv.className = 'thumbnail-item';
                thumbnailDiv.innerHTML = `
                    <h3>${image.size}</h3>
                    <img src="${image.url}" alt="${image.size} thumbnail">
                `;
                thumbnailContainer.appendChild(thumbnailDiv);
            });
        }
    </script>
</body>
</html>

Running the Application

To run this application, you need to:

  1. Install Redis on your system or use a Docker container
  2. Install Node.js dependencies: bull, express, multer, sharp, @bull-board/api, @bull-board/express
  3. Start Redis server
  4. Run the worker process: node worker.js
  5. Run the server: node server.js

This example demonstrates how to:

Conclusion and Additional Resources

Queue systems are essential components in modern web applications, enabling asynchronous processing, reliability, and scalability. By properly implementing queues, you can:

Key Takeaways

Further Learning Resources

Practice Exercises

Exercise 1: Simple Email Queue

Create a simple application that uses Bull to queue and send emails. Use nodemailer for sending emails and implement rate limiting to avoid sending too many emails at once.

Exercise 2: Extend the Thumbnail Generator

Add the following features to the thumbnail generator application:

Exercise 3: Multi-stage Processing Pipeline

Create a processing pipeline with multiple queues where the output of one job becomes the input for another. For example:

  1. Upload queue: Receives uploaded files and stores them
  2. Processing queue: Processes the files (e.g., image resizing)
  3. Notification queue: Sends notifications when processing is complete
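
As a starting point, the chaining idea can be sketched with plain arrays before bringing in Bull. This is only an in-memory outline of the data flow, with hypothetical names and made-up job fields; your solution should replace each array with a real queue and each handle function with a worker.

```javascript
// Three in-memory stages; each stage's output is enqueued as the next stage's input.
const uploadQueue = [];
const processingQueue = [];
const notificationQueue = [];
const sent = [];

const stages = [
  {
    queue: uploadQueue,
    next: processingQueue,
    handle: (job) => ({ ...job, storedAt: `/uploads/${job.filename}` }),
  },
  {
    queue: processingQueue,
    next: notificationQueue,
    handle: (job) => ({ ...job, thumbnail: job.storedAt.replace('/uploads/', '/processed/') }),
  },
  {
    queue: notificationQueue,
    next: null, // last stage: nothing to hand off
    handle: (job) => {
      sent.push(`${job.user}: ${job.thumbnail} is ready`);
      return job;
    },
  },
];

function runPipeline() {
  for (const stage of stages) {
    while (stage.queue.length > 0) {
      const output = stage.handle(stage.queue.shift());
      if (stage.next) stage.next.push(output);
    }
  }
}

uploadQueue.push({ user: 'ada', filename: 'cat.png' });
uploadQueue.push({ user: 'lin', filename: 'dog.png' });
runPipeline();
console.log(sent);
```

With real queues the stages run independently and concurrently, so think about what should happen when a later stage fails after an earlier one has already succeeded.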

Exercise 4: Dashboard for Queue Monitoring

Build a custom dashboard to monitor multiple queues in your application. The dashboard should show: