Mage supports real-time data ingestion from popular streaming platforms. While its integrations focus primarily on batch data extraction, streaming is supported through Python dependencies and custom integrations.

Supported Platforms

  • Apache Kafka: distributed event streaming platform
  • Google Pub/Sub: GCP messaging service
  • RabbitMQ: message broker with AMQP
  • NATS: cloud-native messaging system
  • ActiveMQ: Java message broker via STOMP
  • AWS Kinesis: real-time data streaming (via boto3)

Apache Kafka

Consume messages from Kafka topics using the kafka-python library.

Installation

pip install "mage-ai[streaming]"
This installs:
  • kafka-python==2.3.0
  • confluent-avro~=1.8.0

Configuration Example

from kafka import KafkaConsumer
import json
import os

@data_loader
def load_from_kafka(*args, **kwargs):
    consumer = KafkaConsumer(
        'my-topic',
        bootstrap_servers=['kafka1:9092', 'kafka2:9092'],
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        group_id='mage-consumer-group',
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        security_protocol='SASL_SSL',
        sasl_mechanism='PLAIN',
        sasl_plain_username=os.getenv('KAFKA_USERNAME'),
        sasl_plain_password=os.getenv('KAFKA_PASSWORD')
    )
    
    messages = []
    for message in consumer:
        messages.append(message.value)
        if len(messages) >= 1000:  # Batch size
            break
    
    return messages

SASL/SSL Authentication

from kafka import KafkaConsumer
from kafka.errors import KafkaError

consumer = KafkaConsumer(
    'events-topic',
    bootstrap_servers='kafka.example.com:9093',
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='kafka_user',
    sasl_plain_password='secure_password',
    ssl_cafile='/path/to/ca-cert',
    ssl_certfile='/path/to/client-cert',
    ssl_keyfile='/path/to/client-key',
    api_version=(2, 0, 2)
)

Avro Schema Support

from kafka import KafkaConsumer
from mage_integrations.utils.avro import AvroDeserializer

deserializer = AvroDeserializer(schema_registry_url='http://schema-registry:8081')

consumer = KafkaConsumer(
    'avro-topic',
    bootstrap_servers='kafka:9092',
    value_deserializer=lambda x: deserializer.deserialize(x)
)

for message in consumer:
    record = message.value
    # Process Avro record

Generator-Based Batching

Yield batches continuously instead of returning one list:

from kafka import KafkaConsumer
import json

@data_loader
def load_from_kafka(*args, **kwargs):
    consumer = KafkaConsumer(
        'user-events',
        bootstrap_servers='localhost:9092',
        auto_offset_reset='earliest',
        group_id='mage-group',
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )
    
    batch = []
    for message in consumer:
        batch.append(message.value)
        if len(batch) >= 100:
            yield batch
            batch = []
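
Manual Offset Commits

For at-least-once processing, disable auto-commit and commit offsets only after a batch has been handled successfully. A minimal sketch (topic and group names are placeholders, not Mage defaults):

from kafka import KafkaConsumer
import json

@data_loader
def load_with_manual_commit(*args, **kwargs):
    consumer = KafkaConsumer(
        'user-events',                      # placeholder topic
        bootstrap_servers='localhost:9092',
        group_id='mage-group',              # placeholder consumer group
        enable_auto_commit=False,           # commit manually below
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )
    
    batch = []
    for message in consumer:
        batch.append(message.value)
        if len(batch) >= 100:
            # Process the batch first, then commit, so a crash re-reads
            # the uncommitted batch instead of losing it
            consumer.commit()
            return batch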

Google Pub/Sub

Consume messages from Google Cloud Pub/Sub.

Installation

pip install "mage-ai[all]"
Includes: google-cloud-pubsub~=2.19.0

Configuration

from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError
import json

@data_loader
def load_from_pubsub(*args, **kwargs):
    project_id = "my-project"
    subscription_id = "my-subscription"
    
    subscriber = pubsub_v1.SubscriberClient.from_service_account_file(
        'service-account.json'
    )
    subscription_path = subscriber.subscription_path(project_id, subscription_id)
    
    messages = []
    
    def callback(message):
        data = json.loads(message.data.decode('utf-8'))
        messages.append(data)
        message.ack()
        
        if len(messages) >= 1000:
            # Returning here does not cancel the stream; the pull is
            # stopped below when the timeout expires
            return
    
    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
    
    try:
        # Block for 30 seconds
        streaming_pull_future.result(timeout=30)
    except TimeoutError:
        streaming_pull_future.cancel()
    
    return messages

Pull Subscription

@data_loader
def load_from_pubsub_pull(*args, **kwargs):
    from google.cloud import pubsub_v1
    import json
    
    project_id = "my-project"
    subscription_id = "my-subscription"
    
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_id)
    
    # Pull messages
    response = subscriber.pull(
        request={
            "subscription": subscription_path,
            "max_messages": 100,
        },
        timeout=30.0
    )
    
    messages = []
    ack_ids = []
    
    for received_message in response.received_messages:
        data = json.loads(received_message.message.data.decode('utf-8'))
        messages.append(data)
        ack_ids.append(received_message.ack_id)
    
    # Acknowledge messages
    if ack_ids:
        subscriber.acknowledge(
            request={
                "subscription": subscription_path,
                "ack_ids": ack_ids,
            }
        )
    
    return messages
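
Flow Control

With streaming pull, the client library can buffer far more messages than a loader wants to hold at once. Flow control caps the number of outstanding messages; a sketch (project and subscription IDs are placeholders):

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path('my-project', 'my-subscription')

# Hold at most 100 unacknowledged messages in memory at a time
flow_control = pubsub_v1.types.FlowControl(max_messages=100)

future = subscriber.subscribe(
    subscription_path,
    callback=lambda message: message.ack(),  # replace with real processing
    flow_control=flow_control
)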

RabbitMQ

Consume messages from RabbitMQ queues.

Installation

pip install "mage-ai[streaming]"
Includes: pika==1.3.1

Configuration

import pika
import json

@data_loader
def load_from_rabbitmq(*args, **kwargs):
    credentials = pika.PlainCredentials('guest', 'guest')
    parameters = pika.ConnectionParameters(
        host='rabbitmq.example.com',
        port=5672,
        virtual_host='/',
        credentials=credentials
    )
    
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    
    queue_name = 'events'
    channel.queue_declare(queue=queue_name, durable=True)
    
    messages = []
    
    def callback(ch, method, properties, body):
        data = json.loads(body.decode('utf-8'))
        messages.append(data)
        ch.basic_ack(delivery_tag=method.delivery_tag)
        
        if len(messages) >= 100:
            ch.stop_consuming()
    
    channel.basic_consume(queue=queue_name, on_message_callback=callback)
    channel.start_consuming()
    
    connection.close()
    return messages
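
Prefetch Limits

By default the broker may push many unacknowledged messages to a consumer at once. Calling basic_qos before basic_consume bounds that; this sketch continues the channel and callback from the example above:

# Deliver at most 100 unacknowledged messages at a time
channel.basic_qos(prefetch_count=100)
channel.basic_consume(queue=queue_name, on_message_callback=callback)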

SSL/TLS Connection

import pika
import ssl

ssl_context = ssl.create_default_context(cafile="/path/to/ca.pem")
# Disabling hostname verification weakens TLS; only do this when the
# broker certificate does not match its hostname (e.g. test setups)
ssl_context.check_hostname = False

parameters = pika.ConnectionParameters(
    host='rabbitmq.example.com',
    port=5671,
    credentials=pika.PlainCredentials('user', 'pass'),
    ssl_options=pika.SSLOptions(ssl_context)
)

connection = pika.BlockingConnection(parameters)

NATS

Consume messages from NATS messaging system.

Installation

pip install "mage-ai[streaming]"
Includes:
  • nats-py==2.6.0
  • nkeys~=0.2.0

Configuration

import asyncio
import nats
import json

@data_loader
def load_from_nats(*args, **kwargs):
    async def fetch_messages():
        nc = await nats.connect("nats://nats.example.com:4222")
        
        messages = []
        
        async def message_handler(msg):
            data = json.loads(msg.data.decode())
            messages.append(data)
            
            if len(messages) >= 100:
                await nc.drain()
        
        await nc.subscribe("events.>", cb=message_handler)
        
        # Wait for messages; the handler may already have drained
        await asyncio.sleep(30)
        if not nc.is_closed:
            await nc.drain()
        
        return messages
    
    return asyncio.run(fetch_messages())
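
Queue Groups

To spread one subject across several workers, NATS queue groups deliver each message to only one member of the group. A minimal sketch (subject and queue names are placeholders):

import asyncio
import nats

async def main():
    nc = await nats.connect("nats://localhost:4222")
    
    async def handler(msg):
        print(msg.subject, msg.data)
    
    # Subscribers sharing the same queue name split the messages between them
    await nc.subscribe("events.>", queue="workers", cb=handler)
    
    await asyncio.sleep(30)
    await nc.drain()

asyncio.run(main())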

JetStream (Persistent Streams)

import asyncio
import json
import nats

@data_loader
def load_from_jetstream(*args, **kwargs):
    async def fetch():
        nc = await nats.connect("nats://localhost:4222")
        js = nc.jetstream()
        
        messages = []
        sub = await js.pull_subscribe("events", "mage-consumer")
        
        msgs = await sub.fetch(100, timeout=30)
        for msg in msgs:
            data = json.loads(msg.data.decode())
            messages.append(data)
            await msg.ack()
        
        await nc.drain()
        return messages
    
    return asyncio.run(fetch())

AWS Kinesis

Consume real-time data streams from AWS Kinesis.

Installation

pip install "mage-ai[s3]"
Includes: boto3==1.26.60

Configuration

import boto3
import json
import os

@data_loader
def load_from_kinesis(*args, **kwargs):
    kinesis = boto3.client(
        'kinesis',
        region_name='us-west-2',
        aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
        aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY')
    )
    
    stream_name = 'my-stream'
    
    # Get shard iterator
    response = kinesis.describe_stream(StreamName=stream_name)
    shard_id = response['StreamDescription']['Shards'][0]['ShardId']
    
    shard_iterator = kinesis.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType='LATEST'
    )['ShardIterator']
    
    # Fetch records
    messages = []
    while shard_iterator and len(messages) < 1000:
        response = kinesis.get_records(
            ShardIterator=shard_iterator,
            Limit=100
        )
        
        for record in response['Records']:
            data = json.loads(record['Data'].decode('utf-8'))
            messages.append(data)
        
        shard_iterator = response.get('NextShardIterator')
        
        if not response['Records']:
            break
    
    return messages
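
Reading All Shards

The configuration example reads only the first shard. Streams usually have several, so a loader normally iterates over all of them; a sketch (describe_stream is fine for small streams, list_shards paginates for larger ones):

import boto3
import json

kinesis = boto3.client('kinesis', region_name='us-west-2')
stream_name = 'my-stream'  # placeholder

records = []
shards = kinesis.describe_stream(StreamName=stream_name)['StreamDescription']['Shards']

for shard in shards:
    iterator = kinesis.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard['ShardId'],
        ShardIteratorType='TRIM_HORIZON'
    )['ShardIterator']
    
    # One batch per shard; loop on NextShardIterator for a full read
    response = kinesis.get_records(ShardIterator=iterator, Limit=100)
    for record in response['Records']:
        records.append(json.loads(record['Data']))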

With Checkpointing

@data_loader
def load_from_kinesis_with_checkpoint(*args, **kwargs):
    import boto3
    import json
    
    kinesis = boto3.client('kinesis', region_name='us-west-2')
    
    stream_name = 'events-stream'
    
    # Get the last sequence number saved by the previous run
    state = kwargs.get('state', {})
    last_sequence = state.get('last_sequence_number')
    
    iterator_type = 'AFTER_SEQUENCE_NUMBER' if last_sequence else 'LATEST'
    iterator_kwargs = {
        'StreamName': stream_name,
        'ShardId': 'shardId-000000000000',
        'ShardIteratorType': iterator_type
    }
    if last_sequence:
        iterator_kwargs['StartingSequenceNumber'] = last_sequence
    
    shard_iterator = kinesis.get_shard_iterator(**iterator_kwargs)['ShardIterator']
    
    messages = []
    last_seq = None
    
    response = kinesis.get_records(ShardIterator=shard_iterator, Limit=1000)
    for record in response['Records']:
        data = json.loads(record['Data'])
        messages.append(data)
        last_seq = record['SequenceNumber']
    
    # Save checkpoint
    if last_seq:
        state['last_sequence_number'] = last_seq
    
    return messages

ActiveMQ (STOMP)

Connect to ActiveMQ using the STOMP protocol.

Installation

pip install "mage-ai[streaming]"
Includes: stomp.py==8.1.0

Configuration

import stomp
import json
import time

class MessageListener(stomp.ConnectionListener):
    def __init__(self):
        self.messages = []
    
    def on_message(self, frame):
        data = json.loads(frame.body)
        self.messages.append(data)

@data_loader
def load_from_activemq(*args, **kwargs):
    conn = stomp.Connection(
        [('activemq.example.com', 61613)],
        heartbeats=(10000, 10000)
    )
    
    listener = MessageListener()
    conn.set_listener('mage', listener)
    
    conn.connect('username', 'password', wait=True)
    conn.subscribe('/queue/events', id=1, ack='auto')
    
    # Wait for messages
    time.sleep(30)
    
    conn.disconnect()
    
    return listener.messages

Elasticsearch

Query Elasticsearch indices as a streaming source.

Installation

pip install "mage-ai[streaming]"
Includes: elasticsearch==8.15.1

Configuration

from elasticsearch import Elasticsearch
import json

@data_loader
def load_from_elasticsearch(*args, **kwargs):
    es = Elasticsearch(
        ['https://elasticsearch.example.com:9200'],
        basic_auth=('elastic', 'password'),
        verify_certs=True,
        ca_certs='/path/to/ca.pem'
    )
    
    # Search with scroll API for large result sets
    query = {
        "query": {
            "range": {
                "timestamp": {
                    "gte": "2024-03-01T00:00:00",
                    "lt": "2024-03-04T00:00:00"
                }
            }
        },
        "sort": [{"timestamp": {"order": "asc"}}]
    }
    
    results = []
    resp = es.search(
        index="events-*",
        body=query,
        scroll='2m',
        size=1000
    )
    
    scroll_id = resp['_scroll_id']
    results.extend([hit['_source'] for hit in resp['hits']['hits']])
    
    while len(resp['hits']['hits']):
        resp = es.scroll(scroll_id=scroll_id, scroll='2m')
        scroll_id = resp['_scroll_id']  # the scroll id can change between calls
        results.extend([hit['_source'] for hit in resp['hits']['hits']])
    
    es.clear_scroll(scroll_id=scroll_id)
    
    return results
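
search_after Pagination

Recent Elasticsearch versions favor search_after (ideally combined with a point-in-time and a tiebreak sort field) over the scroll API. A sketch of the pattern under that assumption, reusing the index and sort field from the example above:

from elasticsearch import Elasticsearch

es = Elasticsearch(
    'https://elasticsearch.example.com:9200',
    basic_auth=('elastic', 'password')
)

results = []
search_after = None

while True:
    body = {
        "size": 1000,
        "sort": [{"timestamp": {"order": "asc"}}],  # add a tiebreaker in production
        "query": {"match_all": {}}
    }
    if search_after:
        body["search_after"] = search_after
    
    resp = es.search(index="events-*", body=body)
    hits = resp['hits']['hits']
    if not hits:
        break
    
    results.extend(hit['_source'] for hit in hits)
    search_after = hits[-1]['sort']  # cursor for the next page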

OpenSearch

Query OpenSearch clusters.

Installation

pip install "mage-ai[streaming]"
Includes: opensearch-py==2.0.0

Configuration

from opensearchpy import OpenSearch
import json

@data_loader
def load_from_opensearch(*args, **kwargs):
    client = OpenSearch(
        hosts=[{'host': 'opensearch.example.com', 'port': 9200}],
        http_auth=('admin', 'password'),
        use_ssl=True,
        verify_certs=True,
        ssl_show_warn=False
    )
    
    query = {
        "query": {
            "match_all": {}
        }
    }
    
    response = client.search(
        index="logs-*",
        body=query,
        size=1000
    )
    
    return [hit['_source'] for hit in response['hits']['hits']]

InfluxDB

Query time-series data from InfluxDB.

Installation

pip install "mage-ai[streaming]"
Includes: influxdb_client==1.36.1

Configuration

from influxdb_client import InfluxDBClient
import os

@data_loader
def load_from_influxdb(*args, **kwargs):
    client = InfluxDBClient(
        url="https://influxdb.example.com:8086",
        token=os.getenv("INFLUXDB_TOKEN"),
        org="my-org"
    )
    
    query_api = client.query_api()
    
    query = '''
    from(bucket: "metrics")
      |> range(start: -1h)
      |> filter(fn: (r) => r._measurement == "cpu")
      |> filter(fn: (r) => r._field == "usage_idle")
    '''
    
    result = query_api.query(query=query)
    
    records = []
    for table in result:
        for record in table.records:
            records.append({
                'time': record.get_time(),
                'measurement': record.get_measurement(),
                'field': record.get_field(),
                'value': record.get_value(),
                'tags': record.values
            })
    
    client.close()
    return records
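
DataFrame Output

If the downstream block expects tabular data, the client can return a pandas DataFrame directly via query_data_frame (requires pandas; bucket and measurement names follow the example above):

from influxdb_client import InfluxDBClient
import os

client = InfluxDBClient(
    url="https://influxdb.example.com:8086",
    token=os.getenv("INFLUXDB_TOKEN"),
    org="my-org"
)

df = client.query_api().query_data_frame('''
from(bucket: "metrics")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu")
''')

client.close()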

MongoDB Change Streams

Real-time CDC for MongoDB.

Installation

pip install "mage-ai[streaming]"
Includes: pymongo==4.3.3

Configuration

from pymongo import MongoClient
import json

@data_loader
def load_from_mongodb_changestream(*args, **kwargs):
    client = MongoClient(
        'mongodb://localhost:27017/',
        username='user',
        password='pass'
    )
    
    db = client['mydb']
    collection = db['users']
    
    # Watch for changes
    changes = []
    
    with collection.watch() as stream:
        for change in stream:
            changes.append({
                'operation': change['operationType'],
                'document_key': change['documentKey'],
                'full_document': change.get('fullDocument'),
                'timestamp': change['clusterTime']
            })
            
            if len(changes) >= 100:
                break
    
    return changes
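
Resume Tokens

Change streams expose a resume token, so a run can pick up exactly where the previous one stopped instead of re-reading or missing events. A sketch that keeps the token in a state dict (as the Kinesis checkpoint example does):

from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
collection = client['mydb']['users']

state = {}  # persisted between runs in a real pipeline
resume_token = state.get('resume_token')

# resume_after=None starts a fresh stream on the first run
with collection.watch(resume_after=resume_token) as stream:
    for change in stream:
        # process(change) ...
        state['resume_token'] = stream.resume_token  # checkpoint
        break  # stop after one change in this sketch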

Best Practices

Consumer Groups

Use consumer groups for parallel processing:
# Kafka
consumer = KafkaConsumer(
    'topic',
    group_id='mage-pipeline-1',  # Unique per pipeline
    bootstrap_servers='kafka:9092'
)

Batch Processing

Process messages in batches for efficiency:
@data_loader
def load_streaming_batch(*args, **kwargs):
    from datetime import datetime
    
    batch = []
    batch_size = 1000
    batch_timeout = 30  # seconds
    
    start_time = datetime.now()
    
    for message in consumer:  # consumer created as in the Kafka examples above
        batch.append(message.value)
        
        # Flush batch if size or timeout reached
        if len(batch) >= batch_size or \
           (datetime.now() - start_time).seconds > batch_timeout:
            yield batch
            batch = []
            start_time = datetime.now()
    
    # Flush remaining messages
    if batch:
        yield batch

Error Handling

Implement robust error handling:
@data_loader
def load_with_error_handling(*args, **kwargs):
    import json
    from kafka import KafkaConsumer
    from kafka.errors import KafkaError
    
    consumer = KafkaConsumer(
        'events',
        bootstrap_servers='kafka:9092',
        max_poll_interval_ms=300000,
        session_timeout_ms=60000
    )
    
    messages = []
    errors = []
    
    try:
        for message in consumer:
            try:
                data = json.loads(message.value)
                messages.append(data)
            except json.JSONDecodeError as e:
                errors.append({
                    'offset': message.offset,
                    'error': str(e),
                    'raw': message.value
                })
            
            if len(messages) >= 1000:
                break
    except KafkaError as e:
        print(f"Kafka error: {e}")
        raise
    finally:
        consumer.close()
    
    if errors:
        print(f"Failed to parse {len(errors)} messages")
    
    return messages

State Management

Track offsets and sequence numbers:
@data_loader
def load_with_state(*args, **kwargs):
    from kafka import KafkaConsumer, TopicPartition
    
    state = kwargs.get('state', {})
    last_offset = state.get('kafka_offset', 0)
    
    consumer = KafkaConsumer(
        'events',
        bootstrap_servers='kafka:9092',
        group_id='mage-state-group',  # commit() requires a consumer group
        enable_auto_commit=False
    )
    
    # Seek to last offset
    partition = TopicPartition('events', 0)
    consumer.assign([partition])
    consumer.seek(partition, last_offset)
    
    messages = []
    for message in consumer:
        messages.append(message.value)
        
        if len(messages) >= 1000:
            # Save offset for next run
            state['kafka_offset'] = message.offset + 1
            consumer.commit()
            break
    
    return messages

Monitoring & Observability

Message Metrics

Track message processing:
@data_loader
def load_with_metrics(*args, **kwargs):
    import json
    import time
    from kafka import KafkaConsumer
    
    consumer = KafkaConsumer('events', bootstrap_servers='kafka:9092')
    
    start_time = time.time()
    message_count = 0
    error_count = 0
    
    messages = []
    
    for message in consumer:
        try:
            data = json.loads(message.value)
            messages.append(data)
            message_count += 1
        except Exception as e:
            error_count += 1
            print(f"Error processing message: {e}")
        
        if len(messages) >= 1000:
            break
    
    elapsed = time.time() - start_time
    print(f"Processed {message_count} messages in {elapsed:.2f}s")
    print(f"Throughput: {message_count/elapsed:.2f} msg/s")
    print(f"Errors: {error_count}")
    
    return messages

Installation

# Install streaming dependencies
pip install "mage-ai[streaming]"

# This includes:
# - kafka-python
# - pika (RabbitMQ)
# - nats-py
# - stomp.py (ActiveMQ)
# - elasticsearch
# - opensearch-py
# - influxdb_client
# - pymongo
# - confluent-avro

Comparison Table

Platform  | Best For                         | Throughput | Durability | Ordering
----------|----------------------------------|------------|------------|----------------
Kafka     | High throughput, log aggregation | Very High  | High       | Partition-level
Kinesis   | AWS-native streaming             | High       | High       | Shard-level
Pub/Sub   | GCP-native, fan-out              | High       | High       | No guarantee
RabbitMQ  | Message routing, job queues      | Medium     | High       | Queue-level
NATS      | Low latency, IoT                 | Very High  | Medium     | No guarantee
ActiveMQ  | Enterprise messaging             | Medium     | High       | Queue-level

Best Practices Summary

  1. Use consumer groups to distribute load across multiple instances
  2. Implement batching to reduce processing overhead
  3. Handle message failures with dead letter queues (see the sketch after this list)
  4. Monitor lag to ensure timely processing
  5. Set appropriate timeouts based on message processing time
  6. Use schema validation for message formats
  7. Implement idempotency for duplicate message handling
  8. Track offsets/sequence numbers for exactly-once processing
  9. Set up monitoring for consumer lag and errors
  10. Test failure scenarios (broker downtime, network issues)
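
Dead Letter Queue Sketch

A dead letter queue keeps unparseable or repeatedly failing messages out of the main flow without dropping them. A minimal Kafka sketch (the dead-letter topic name is a placeholder):

from kafka import KafkaConsumer, KafkaProducer
import json

consumer = KafkaConsumer('events', bootstrap_servers='kafka:9092')
producer = KafkaProducer(bootstrap_servers='kafka:9092')

for message in consumer:
    try:
        data = json.loads(message.value)
        # process(data) ...
    except json.JSONDecodeError:
        # Forward the raw bytes to a dead-letter topic for later inspection
        producer.send('events-dlq', message.value)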

Troubleshooting

Connection failures:
  • Verify network connectivity
  • Check firewall rules
  • Ensure correct ports are open
  • Validate credentials

Low throughput:
  • Increase batch size
  • Add more consumer instances
  • Optimize message processing
  • Check for slow downstream systems

Deserialization errors:
  • Verify message format
  • Check schema compatibility
  • Handle different message versions
  • Log problematic messages

Duplicate messages (see the sketch after this list):
  • Implement idempotent processing
  • Use unique message IDs
  • Deduplicate at write time
  • Track processed message IDs
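
Deduplication Sketch

Tracking processed message IDs is the simplest idempotency guard. A sketch with an in-memory set (a real pipeline would persist the IDs; the 'id' field is an assumption about the payload):

import json

seen_ids = set()  # persist between runs in a real pipeline

def process_once(raw_message):
    data = json.loads(raw_message)
    message_id = data['id']  # assumes each payload carries a unique id
    if message_id in seen_ids:
        return  # duplicate, already handled
    seen_ids.add(message_id)
    # process(data) ...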

Next Steps

Database Sources

Connect to PostgreSQL, MySQL, Snowflake, and more

API Sources

Integrate with REST APIs and SaaS platforms
