Supported Platforms
- Apache Kafka: Distributed event streaming platform
- Google Pub/Sub: GCP messaging service
- RabbitMQ: Message broker with AMQP
- NATS: Cloud-native messaging system
- ActiveMQ: Java message broker via STOMP
- AWS Kinesis: Real-time data streaming (via boto3)
Apache Kafka
Consume messages from Kafka topics using the kafka-python library.
Installation
kafka-python==2.3.0
confluent-avro~=1.8.0
Configuration Example
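As a minimal sketch, assuming the kafka-python client is used directly, consumer settings can be collected as keyword arguments for `KafkaConsumer`. The helper names (`build_consumer_kwargs`, `consume`) and the JSON payload format are illustrative assumptions, not part of any documented configuration schema.

```python
import json


def build_consumer_kwargs(brokers, group_id):
    """Return keyword arguments for kafka.KafkaConsumer (kafka-python)."""
    return {
        "bootstrap_servers": brokers,
        "group_id": group_id,
        # Start from the oldest retained message when no committed offset exists.
        "auto_offset_reset": "earliest",
        # Commit manually, only after a message has been handled.
        "enable_auto_commit": False,
        # Assumption: message values are UTF-8 encoded JSON documents.
        "value_deserializer": lambda raw: json.loads(raw.decode("utf-8")),
    }


def consume(topic, kwargs, handle):
    """Read messages from `topic` forever, committing after each one."""
    # Imported lazily so the kwargs helper works without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(topic, **kwargs)
    for message in consumer:
        handle(message.value)
        consumer.commit()  # commit the consumed offsets after successful handling
```

Committing after handling gives at-least-once delivery: a crash between `handle` and `commit` causes the message to be read again on restart.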
SASL/SSL Authentication
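One possible way to set up SASL over TLS with kafka-python is sketched below. The keyword names are `KafkaConsumer` parameters; the helper functions and the environment variable names (`KAFKA_USERNAME`, `KAFKA_PASSWORD`, `KAFKA_CA_FILE`) are hypothetical, and the PLAIN mechanism is only one of the mechanisms a broker may require.

```python
import os


def sasl_ssl_kwargs(username, password, ca_file=None):
    """Return SASL/SSL keyword arguments for kafka.KafkaConsumer."""
    kwargs = {
        "security_protocol": "SASL_SSL",  # TLS transport plus SASL authentication
        "sasl_mechanism": "PLAIN",
        "sasl_plain_username": username,
        "sasl_plain_password": password,
    }
    if ca_file:
        # CA bundle used to verify the broker certificate.
        kwargs["ssl_cafile"] = ca_file
    return kwargs


def kwargs_from_env(env=os.environ):
    """Read credentials from the environment instead of hard-coding them."""
    # The variable names below are hypothetical.
    return sasl_ssl_kwargs(
        env["KAFKA_USERNAME"], env["KAFKA_PASSWORD"], env.get("KAFKA_CA_FILE")
    )
```

The resulting dictionary can be merged with the other consumer settings and passed to `KafkaConsumer(topic, **kwargs)`.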
Avro Schema Support
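Assuming messages follow the Confluent Schema Registry wire format (an assumption about the producer side), each value starts with a magic byte `0` and a 4-byte big-endian schema ID, followed by the Avro-encoded body. The sketch below only splits that header from the payload; decoding the body against the registered schema is left to an Avro library. The function name is illustrative.

```python
import struct

MAGIC_BYTE = 0
HEADER_SIZE = 5  # 1 magic byte + 4-byte schema ID


def split_confluent_frame(payload: bytes):
    """Return (schema_id, avro_body) for a Confluent-framed message value."""
    if len(payload) < HEADER_SIZE:
        raise ValueError("payload shorter than the 5-byte header")
    # ">bI" = big-endian signed byte followed by an unsigned 32-bit integer.
    magic, schema_id = struct.unpack(">bI", payload[:HEADER_SIZE])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, payload[HEADER_SIZE:]
```

The schema ID can then be used to look up the writer schema before decoding the body.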
Google Pub/Sub
Consume messages from Google Cloud Pub/Sub.
Installation
google-cloud-pubsub~=2.19.0
Configuration
Pull Subscription
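A synchronous pull with google-cloud-pubsub might look like the sketch below. It assumes an existing subscription and acknowledges only the messages whose handler succeeded; the function names are illustrative.

```python
def pull_once(subscriber, subscription_path, handle, max_messages=10):
    """Pull one batch, handle each message, and ack the successful ones."""
    response = subscriber.pull(
        request={"subscription": subscription_path, "max_messages": max_messages}
    )
    ack_ids = []
    for received in response.received_messages:
        try:
            handle(received.message.data)
        except Exception:
            # Not acknowledged, so Pub/Sub redelivers after the ack deadline.
            continue
        ack_ids.append(received.ack_id)
    if ack_ids:
        subscriber.acknowledge(
            request={"subscription": subscription_path, "ack_ids": ack_ids}
        )
    return len(ack_ids)


def main(project_id, subscription_id):
    # Imported lazily so pull_once can be exercised without the client library.
    from google.cloud import pubsub_v1

    subscriber = pubsub_v1.SubscriberClient()
    path = subscriber.subscription_path(project_id, subscription_id)
    with subscriber:
        pull_once(subscriber, path, print)
```

Leaving failed messages unacknowledged relies on redelivery, so handlers should tolerate seeing the same message more than once.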
RabbitMQ
Consume messages from RabbitMQ queues.
Installation
pika==1.3.1
Configuration
SSL/TLS Connection
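A TLS connection with pika can be sketched as follows, assuming the broker listens on the conventional AMQPS port 5671 and presents a certificate signed by a CA the client trusts. The helper names and the TLS 1.2 floor are illustrative choices.

```python
import ssl


def build_tls_context(ca_file=None):
    """Create a client-side TLS context that verifies the broker certificate."""
    # With ca_file=None the system trust store is used.
    context = ssl.create_default_context(cafile=ca_file)
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    return context


def connect(host, user, password, ca_file=None):
    """Open a blocking AMQPS connection to RabbitMQ."""
    # Imported lazily so the TLS helper works without pika installed.
    import pika

    params = pika.ConnectionParameters(
        host=host,
        port=5671,  # conventional AMQPS port; adjust if the broker differs
        credentials=pika.PlainCredentials(user, password),
        ssl_options=pika.SSLOptions(build_tls_context(ca_file), server_hostname=host),
    )
    return pika.BlockingConnection(params)
```

Passing `server_hostname` lets the TLS layer check that the certificate matches the host being contacted.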
NATS
Consume messages from the NATS messaging system.
Installation
nats-py==2.6.0
nkeys~=0.2.0
Configuration
JetStream (Persistent Streams)
AWS Kinesis
Consume real-time data streams from AWS Kinesis.
Installation
boto3==1.26.60
Configuration
With Checkpointing
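Checkpointing can be sketched with boto3 by remembering the last processed sequence number per shard and resuming with an `AFTER_SEQUENCE_NUMBER` iterator. The JSON checkpoint file and the function names are assumptions made for illustration; production consumers often keep this state in a shared store.

```python
import json
import os


def load_checkpoint(path, shard_id):
    """Return the last saved sequence number for a shard, or None."""
    if not os.path.exists(path):
        return None
    with open(path) as fh:
        return json.load(fh).get(shard_id)


def save_checkpoint(path, shard_id, sequence_number):
    """Persist the sequence number atomically (write to temp file, then rename)."""
    data = {}
    if os.path.exists(path):
        with open(path) as fh:
            data = json.load(fh)
    data[shard_id] = sequence_number
    tmp = path + ".tmp"
    with open(tmp, "w") as fh:
        json.dump(data, fh)
    os.replace(tmp, path)


def read_shard_once(client, stream, shard_id, checkpoint_path, handle, limit=100):
    """Read one batch from a shard, resuming after the last checkpoint."""
    last_seq = load_checkpoint(checkpoint_path, shard_id)
    if last_seq:
        it_args = {
            "ShardIteratorType": "AFTER_SEQUENCE_NUMBER",
            "StartingSequenceNumber": last_seq,
        }
    else:
        it_args = {"ShardIteratorType": "TRIM_HORIZON"}  # oldest available record
    iterator = client.get_shard_iterator(
        StreamName=stream, ShardId=shard_id, **it_args
    )["ShardIterator"]
    response = client.get_records(ShardIterator=iterator, Limit=limit)
    for record in response["Records"]:
        handle(record["Data"])
        # Checkpoint after handling, so a crash replays at most one record.
        save_checkpoint(checkpoint_path, shard_id, record["SequenceNumber"])
    return response.get("NextShardIterator")
```

Here `client` is expected to be a `boto3.client("kinesis")` instance; the returned iterator can be passed to further `get_records` calls to keep reading.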
ActiveMQ (STOMP)
Connect to ActiveMQ using the STOMP protocol.
Installation
stomp.py==8.1.0
Configuration
Elasticsearch
Query Elasticsearch indices as a streaming source.
Installation
elasticsearch==8.15.1
Configuration
OpenSearch
Query OpenSearch clusters.
Installation
opensearch-py==2.0.0
Configuration
InfluxDB
Query time-series data from InfluxDB.
Installation
influxdb_client==1.36.1
Configuration
MongoDB Change Streams
Real-time change data capture (CDC) for MongoDB.
Installation
pymongo==4.3.3
Configuration
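A change stream consumer with pymongo can be sketched as below. It assumes a replica set or sharded cluster (change streams are not available on standalone servers) and keeps the resume token of the last handled event so that reading can continue after a restart. The function name and the `max_events` parameter are illustrative.

```python
def follow_changes(collection, handle, resume_token=None, max_events=None):
    """Handle change events and return the resume token of the last one."""
    seen = 0
    # full_document="updateLookup" includes the current document on updates.
    with collection.watch(
        full_document="updateLookup", resume_after=resume_token
    ) as stream:
        for change in stream:
            handle(change)
            # The event's _id is its resume token.
            resume_token = change["_id"]
            seen += 1
            if max_events is not None and seen >= max_events:
                break
    return resume_token
```

Here `collection` is a `pymongo` collection, for example `MongoClient(uri)[db_name][collection_name]`. Persisting the returned token and passing it back as `resume_token` resumes the stream after the last handled event.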
Best Practices
Consumer Groups
Use consumer groups for parallel processing.
Batch Processing
Process messages in batches for efficiency.
Error Handling
Implement robust error handling.
State Management
Track offsets and sequence numbers.
Monitoring & Observability
Message Metrics
Track message processing.
Installation
Comparison Table
| Platform | Best For | Throughput | Durability | Ordering |
|---|---|---|---|---|
| Kafka | High throughput, log aggregation | Very High | High | Partition-level |
| Kinesis | AWS-native streaming | High | High | Shard-level |
| Pub/Sub | GCP-native, fan-out | High | High | Per ordering key (opt-in) |
| RabbitMQ | Message routing, job queues | Medium | High | Queue-level |
| NATS | Low latency, IoT | Very High | Medium | No guarantee |
| ActiveMQ | Enterprise messaging | Medium | High | Queue-level |
Best Practices
- Use consumer groups to distribute load across multiple instances
- Implement batching to reduce processing overhead
- Handle message failures with dead letter queues
- Monitor lag to ensure timely processing
- Set appropriate timeouts based on message processing time
- Use schema validation for message formats
- Implement idempotency for duplicate message handling
- Track offsets/sequence numbers so consumers resume where they left off; pair this with idempotent writes to approach exactly-once processing
- Set up monitoring for consumer lag and errors
- Test failure scenarios (broker downtime, network issues)
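Several of the practices above (batching, retries with a dead letter queue, and offset tracking) can be combined in one loop. The following is a broker-agnostic sketch: messages are assumed to arrive as `(offset, payload)` pairs, the dead letter queue is a plain list, and all names are illustrative.

```python
from itertools import islice


def batched(iterable, size):
    """Yield lists of up to `size` items from `iterable`."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def run(messages, handle, batch_size=100, max_attempts=3):
    """Process (offset, payload) pairs; return (last_offset, dead_letters)."""
    dead_letters = []
    committed = None
    for batch in batched(messages, batch_size):
        for offset, payload in batch:
            for attempt in range(1, max_attempts + 1):
                try:
                    handle(payload)
                    break
                except Exception:
                    if attempt == max_attempts:
                        # Give up and park the message instead of blocking the stream.
                        dead_letters.append((offset, payload))
        # Record progress once per batch rather than once per message.
        committed = batch[-1][0]
    return committed, dead_letters
```

Committing once per batch reduces overhead, at the cost of reprocessing up to one batch after a crash, which is why idempotent handlers matter.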
Troubleshooting
Connection Timeout
- Verify network connectivity
- Check firewall rules
- Ensure correct ports are open
- Validate credentials
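The first three checks can be approximated with a plain TCP probe before involving any client library. This is a minimal sketch; the port numbers listed are common defaults and may differ in a given deployment, and a successful probe says nothing about credentials or TLS settings.

```python
import socket

# Common default ports (assumptions; verify against the actual deployment).
DEFAULT_PORTS = {
    "kafka": 9092,
    "rabbitmq": 5672,
    "nats": 4222,
    "activemq-stomp": 61613,
}


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS failures.
        return False
```

For example, `can_connect("broker.example.com", DEFAULT_PORTS["kafka"])` (hypothetical host) returning `False` points to a network, firewall, or port problem rather than an authentication one.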
Consumer Lag
- Increase batch size
- Add more consumer instances
- Optimize message processing
- Check for slow downstream systems
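Before tuning, it helps to measure lag per partition. The sketch below computes lag as the difference between the log end offset and the committed offset; the function name is illustrative, and treating a partition without a committed offset as fully lagging is a simplifying assumption.

```python
def partition_lag(end_offsets, committed_offsets):
    """Return {partition: lag} from end offsets and committed offsets."""
    lag = {}
    for partition, end in end_offsets.items():
        committed = committed_offsets.get(partition)
        if committed is None:
            # Nothing committed yet: count the whole partition as lag.
            lag[partition] = end
        else:
            lag[partition] = max(end - committed, 0)
    return lag
```

With kafka-python, the inputs could come from `consumer.end_offsets(partitions)` and `consumer.committed(partition)`. A lag that keeps growing suggests adding consumers or speeding up processing; a stable lag usually only needs monitoring.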
Message Deserialization Errors
- Verify message format
- Check schema compatibility
- Handle different message versions
- Log problematic messages
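A defensive decoder covering these points might look like the sketch below. It assumes JSON payloads carrying a hypothetical `version` field (defaulting to 1 when absent); the logger name and the supported versions are also illustrative.

```python
import json
import logging

logger = logging.getLogger("consumer")


def decode_message(raw, supported_versions=(1, 2)):
    """Return the decoded document, or None if it cannot be used."""
    try:
        doc = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        # Log a truncated copy so the bad payload can be inspected later.
        logger.warning("undecodable message (%s): %r", exc, raw[:200])
        return None
    if not isinstance(doc, dict):
        logger.warning("unexpected top-level type: %s", type(doc).__name__)
        return None
    version = doc.get("version", 1)
    if version not in supported_versions:
        logger.warning("unsupported message version: %r", version)
        return None
    return doc
```

Returning `None` instead of raising lets the consumer skip or dead-letter a bad message without stopping the stream.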
Duplicate Messages
- Implement idempotent processing
- Use unique message IDs
- Deduplicate at write time
- Track processed message IDs
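Tracking processed IDs can be sketched with a bounded in-memory set that evicts the least recently seen ID. The class name and capacity are illustrative, and an in-memory tracker only deduplicates within one process; deduplicating across restarts or instances needs a shared store.

```python
from collections import OrderedDict


class SeenIds:
    """Remember up to `capacity` message IDs, evicting the least recently seen."""

    def __init__(self, capacity=10000):
        self.capacity = capacity
        self._ids = OrderedDict()

    def first_time(self, message_id):
        """Return True the first time an ID is seen, False for duplicates."""
        if message_id in self._ids:
            self._ids.move_to_end(message_id)  # refresh recency
            return False
        self._ids[message_id] = True
        if len(self._ids) > self.capacity:
            self._ids.popitem(last=False)  # drop the oldest entry
        return True
```

A consumer would call `first_time(message_id)` before processing and skip the message when it returns `False`. Bounding the set keeps memory stable, at the cost of missing duplicates that arrive after their ID has been evicted.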
Next Steps
- Database Sources: Connect to PostgreSQL, MySQL, Snowflake, and more
- API Sources: Integrate with REST APIs and SaaS platforms