Understanding message queue patterns and Redis-based implementations for asynchronous communication in distributed systems
Message queues enable asynchronous communication between distributed components, improving system performance through decoupling, traffic smoothing, and fault tolerance.
A message queue is an inter-application communication mechanism where:
Producers send messages without waiting for processing
Message broker ensures reliable delivery
Consumers retrieve and process messages independently
Design Pattern Connection: Message queues implement the Observer pattern - publishers emit events without knowing who will consume them, and subscribers receive events without knowing who published them.
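The producer/broker/consumer split above can be sketched with a minimal in-memory broker. This is a pure-Python stand-in for Redis, not real Redis calls; the message names are made up:

```python
from collections import deque

# A deque stands in for the broker's queue
# (append left, pop right: FIFO, mirroring Redis LPUSH/RPOP)
broker = deque()

def produce(message):
    # Producer returns immediately; it never waits for processing
    broker.appendleft(message)

def consume():
    # Consumer pulls the oldest message, independently of the producer
    return broker.pop() if broker else None

produce("order-1")
produce("order-2")
```

The producer and consumer never reference each other; both only know the broker.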
Improve system performance by deferring non-critical operations
1. Synchronous problem
Issues: long response time; the user waits for all operations to complete
2. Asynchronous solution
Benefits: fast response; non-critical work runs in the background
Real Example: E-commerce Order
Critical path (synchronous):
Validate inventory
Process payment
Create order record
Background tasks (async via queue):
Send SMS notifications
Send email receipts
Update analytics
Log audit trail
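The critical-path/background split can be sketched as follows. The function and task names are illustrative, and a deque stands in for the real message queue:

```python
from collections import deque

background_queue = deque()

# Stubs for the synchronous critical-path steps (real implementations not shown)
def validate_inventory(order): pass
def process_payment(order): pass
def create_order_record(order): pass

def place_order(order):
    # Critical path: runs synchronously, the user waits only for this
    validate_inventory(order)
    process_payment(order)
    create_order_record(order)
    # Background tasks: merely enqueued here, processed later by workers
    for task in ('send_sms', 'send_email', 'update_analytics', 'log_audit'):
        background_queue.appendleft({'task': task, 'order': order})
    return order  # Fast response
```

The response time now depends only on the three critical steps; the four background tasks add just four queue writes.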
Reduce dependencies between system components.
Before (tightly coupled):
```java
public void placeOrder(Order order) {
    inventoryService.reserve(order);  // Direct dependency
    paymentService.charge(order);     // Direct dependency
    smsService.notify(order);         // Direct dependency
    emailService.send(order);         // Direct dependency
    logService.record(order);         // Direct dependency
}
```
Problems:
All services must be available
Changes cascade across services
Difficult to add new integrations
After (loosely coupled):
```java
public Order placeOrder(Order order) {
    inventoryService.reserve(order);
    paymentService.charge(order);
    // Publish event, don't call directly
    eventPublisher.publish("order.created", order);
    return order; // Fast response
}
```
Benefits:
Services work independently
Easy to add new consumers
Failures isolated
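The loose coupling can be sketched with a tiny in-process event publisher. This is a stand-in for the real broker, and the event and handler names are illustrative:

```python
subscribers = {}

def subscribe(event, handler):
    # New consumers register themselves; the producer never changes
    subscribers.setdefault(event, []).append(handler)

def publish(event, payload):
    # Producer fires and forgets; one failing handler doesn't block the rest
    for handler in subscribers.get(event, []):
        try:
            handler(payload)
        except Exception:
            pass  # Failure isolated to that one consumer

received = []
subscribe('order.created', lambda o: received.append(('email', o)))
subscribe('order.created', lambda o: received.append(('analytics', o)))
publish('order.created', {'orderId': 123})
```

Adding a notification service is one more `subscribe` call; `publish` and `place_order` stay untouched.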
Handle traffic spikes without overloading backend
Peak Shaving Example
Scenario: a flash sale generates 10,000 requests/second. Without a queue, all of them hit the backend at once; with a queue, they are buffered and consumers drain them at a sustainable rate using blocking reads:
```
# Consumer blocks until data is available or the timeout expires
BRPOP queue:orders 30   # Wait up to 30 seconds
# Returns: 1) "queue:orders" 2) "{message}"

# Wait indefinitely (timeout=0)
BRPOP queue:orders 0
```
✅ Benefits:
No busy-waiting: Client sleeps until message arrives
Reduced Redis load: No constant polling
Instant processing: Wakes immediately when message published
```python
while True:
    try:
        result = redis.brpop('queue:orders', timeout=30)
        if result:
            process_message(result[1])
    except redis.exceptions.ConnectionError:
        # Server closed an idle connection
        reconnect()
```
Redis may close idle connections to save resources. Always handle connection errors!
Strategy: Read first, then consume after processing
```
# Consumer peeks at the message (doesn't remove it)
LRANGE queue:orders -1 -1   # Get last element

# Process message...

# If successful:
RPOP queue:orders           # Now remove it
```
✅ Improves safety:
Message stays on server during processing
Client crash doesn’t lose message
❌ New problems:
Not blocking (back to busy-polling)
Duplicate processing risk: If consumer crashes after processing but before RPOP
```
# Atomically move from main queue to processing queue
BRPOPLPUSH queue:orders queue:processing 30

# Consumer processes message...

# On success, remove from processing queue:
LREM queue:processing 1 "{message}"
```
How It Works
✅ Advantages:
Atomic operation: Message never lost between queues
Server-side safety: All operations on Redis
Crash recovery: Unacknowledged messages stay in processing queue
Blocking: No busy-polling
❌ Remaining issue:
Stuck messages: if a consumer crashes, its messages remain in queue:processing
Solution: Monitoring daemon
```python
# Separate process monitors the processing queue
def rescue_stuck_messages():
    while True:
        # Find messages older than the timeout (helper implementation not shown)
        old_messages = find_old_messages('queue:processing', timeout=300)
        for msg in old_messages:
            # Move the stuck message back to the main queue
            redis.lrem('queue:processing', 1, msg)
            redis.rpush('queue:orders', msg)
        time.sleep(60)
```
This creates a circular queue for automatic retry.
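The whole pattern (atomic move, acknowledge, rescue) can be sketched in memory. Plain Python lists stand in for the two Redis queues here, so this shows the logic, not real Redis calls:

```python
# Tail (end of list) is the oldest message, consumed first (RPOP side)
main_queue = ['order-2', 'order-1']
processing = []

def brpoplpush_sim():
    # Atomically move one message from the main queue to the processing queue
    if not main_queue:
        return None
    msg = main_queue.pop()     # RPOP from main queue
    processing.insert(0, msg)  # LPUSH onto processing queue
    return msg

def ack(msg):
    # LREM: drop the acknowledged message from the processing queue
    processing.remove(msg)

def rescue_all():
    # Monitoring daemon: return unacknowledged messages to the main queue
    while processing:
        main_queue.insert(0, processing.pop())

msg = brpoplpush_sim()   # first consumer takes a message
ack(msg)                 # ...and acknowledges after processing
msg2 = brpoplpush_sim()  # second consumer takes a message, then "crashes"
rescue_all()             # daemon returns the unacknowledged message
```

After the rescue, the crashed consumer's message is back in the main queue and nothing is lost.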
```python
# Publisher
redis.publish('order.created', json.dumps({
    'orderId': 123,
    'userId': 456
}))

# Multiple consumers:
# - Inventory service (reduce stock)
# - Email service (send confirmation)
# - Analytics service (track metrics)
# - Notification service (push alert)
```
Real-Time Messaging
Example: Chat application
```python
# User joins chat room
pubsub = redis.pubsub()
pubsub.subscribe(f'chat:room:{room_id}')

# User sends message
redis.publish(f'chat:room:{room_id}', message)

# All room participants receive it instantly
```
Introduced: Redis 5.0 (2018)
Inspiration: Apache Kafka concepts
Streams combine the best of Lists and Pub/Sub while adding persistence, consumer groups, and acknowledgments.
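Before the maintenance commands, the basic produce/consume cycle with a consumer group looks like this. The stream, group, and consumer names are illustrative:

```
# Append a message; * asks Redis to generate the ID
XADD mystream * orderId 123

# Create a consumer group ($ = only new messages; MKSTREAM creates the stream if missing)
XGROUP CREATE mystream worker-group $ MKSTREAM

# A consumer reads new messages on behalf of the group (> = undelivered messages)
XREADGROUP GROUP worker-group consumer-1 COUNT 10 BLOCK 5000 STREAMS mystream >

# Acknowledge a message after successful processing
XACK mystream worker-group 1638360000000-0
```

Unacknowledged messages stay in the group's pending list, giving Streams built-in crash recovery without the manual processing-queue pattern used with Lists.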
```
# Limit stream size (automatic trimming)
XADD mystream MAXLEN ~ 10000 * field value
#             ^
#             ~ means approximate (more efficient)

# Delete specific messages
XDEL mystream 1638360000000-0

# Get stream info
XINFO STREAM mystream

# Get consumer group info
XINFO GROUPS mystream

# Get consumers in group
XINFO CONSUMERS mystream worker-group
```