Mage supports real-time data streaming to messaging platforms, enabling event-driven architectures and low-latency data pipelines.


Apache Kafka

Overview

Apache Kafka is a distributed event streaming platform used for:
  • Real-time data pipelines - Stream data between systems
  • Event sourcing - Capture state changes as events
  • Log aggregation - Centralize logs from multiple services
  • Stream processing - Process data in real-time with Kafka Streams or Flink

Configuration

bootstrap_server: localhost:9092
topic: user_events
api_version: (0, 10, 2)  # Kafka API version as tuple

Features

  • High throughput - Millions of messages per second
  • Low latency - Millisecond message delivery
  • Message keys - Partition data by key for ordering
  • JSON serialization - Automatic value serialization
  • Automatic retries - Built-in error handling
  • Multiple brokers - High availability clustering

Message Format

Mage sends messages as JSON with automatic serialization:
{
  "user_id": 123,
  "event_name": "page_view",
  "timestamp": "2024-03-04T15:30:45.123456+00:00",
  "properties": {
    "page": "/products",
    "referrer": "google.com"
  },
  "_mage_created_at": "2024-03-04T15:30:45.123456+00:00",
  "_mage_updated_at": "2024-03-04T15:30:45.123456+00:00"
}
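The envelope above can be reproduced with plain `json` — a minimal sketch, assuming Mage attaches the `_mage_created_at`/`_mage_updated_at` timestamps at send time (the helper just mimics the shape shown):

```python
import json
from datetime import datetime, timezone

def build_message(record: dict) -> bytes:
    """Attach Mage-style metadata timestamps and serialize to JSON bytes.

    Illustrative only -- it mirrors the example message format above.
    """
    now = datetime.now(timezone.utc).isoformat()
    message = {**record, '_mage_created_at': now, '_mage_updated_at': now}
    return json.dumps(message).encode('utf-8')

payload = build_message({'user_id': 123, 'event_name': 'page_view'})
```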

Message Keys

Use message keys to ensure ordering within partitions:
# Specify key property in pipeline metadata
key_properties:
  user_events:
    - user_id  # Messages with same user_id go to same partition
Benefits:
  • Ordered processing - Messages with same key are ordered
  • Partition affinity - Related messages go to same partition
  • Consumer scaling - Distribute load across consumers

API Versions

Specify Kafka API version as a tuple:
api_version: (0, 10, 2)  # Kafka 0.10.2
api_version: (1, 1, 0)   # Kafka 1.1.0
api_version: (2, 5, 0)   # Kafka 2.5.0
api_version: (3, 0, 0)   # Kafka 3.0.0
Use the version matching your Kafka cluster for best compatibility.
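Note that a YAML loader typically reads `(2, 5, 0)` as a plain string, while Kafka client libraries expect a tuple. One common way to bridge the gap is `ast.literal_eval` — a sketch, assuming your config hands you the raw value:

```python
import ast

def parse_api_version(raw) -> tuple:
    """Convert an api_version config value into the tuple Kafka clients expect.

    Strings like '(2, 5, 0)' are literal_eval'd; tuples pass through.
    """
    if isinstance(raw, tuple):
        return raw
    return tuple(ast.literal_eval(raw))
```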

Kafka Authentication

SASL/PLAIN with SSL

Common for cloud providers like Confluent Cloud:
bootstrap_server: pkc-xxxxx.region.provider.confluent.cloud:9092
topic: events
security_protocol: SASL_SSL
sasl_mechanism: PLAIN
sasl_plain_username: YOUR_API_KEY
sasl_plain_password: YOUR_API_SECRET

SASL/SCRAM with SSL

More secure than PLAIN:
bootstrap_server: kafka.example.com:9093
topic: events
security_protocol: SASL_SSL
sasl_mechanism: SCRAM-SHA-256
sasl_plain_username: kafka_user
sasl_plain_password: kafka_password
ssl_ca_location: /path/to/ca-cert.pem

SSL Only

Client certificate authentication:
bootstrap_server: kafka.example.com:9093
topic: events
security_protocol: SSL
ssl_ca_location: /path/to/ca-cert.pem
ssl_cert_location: /path/to/client-cert.pem
ssl_key_location: /path/to/client-key.pem
ssl_key_password: key_password

No Authentication (Development)

For local development:
bootstrap_server: localhost:9092
topic: events
api_version: (2, 5, 0)
# No security_protocol needed

Topic Configuration

Single Topic

Export all data to one topic:
topic: user_events

Dynamic Topics

Route to different topics based on data (requires custom implementation):
@data_exporter
def export_to_kafka(data, *args, **kwargs):
    for record in data:
        topic = f"events.{record['event_type']}"
        # Send to dynamic topic

Data Serialization

JSON (Default)

Mage automatically serializes data as JSON:
# Your data
{
    'user_id': 123,
    'event': 'click',
    'metadata': {'page': '/home'}
}

# Serialized to Kafka
b'{"user_id":123,"event":"click","metadata":{"page":"/home"}}'

Key Serialization

Keys are UTF-8 encoded strings:
# Key from key_properties
key = record['user_id']  # e.g., 123

# Serialized
key_bytes = b'123'

Performance Tuning

Kafka producer batches messages for efficiency:
# These settings would go in custom producer config if supported
batch_size: 16384      # 16 KB batches
linger_ms: 10          # Wait 10ms to batch messages
compression_type: snappy  # Compress batches
Benefits:
  • Reduced network overhead
  • Higher throughput
  • Lower latency with compression
Control durability vs performance:
acks: 1  # Leader acknowledged (default)
# acks: 0  # No acknowledgment (fastest, least durable)
# acks: all  # All replicas acknowledged (slowest, most durable)
  • acks=0 - Maximum performance, risk of data loss
  • acks=1 - Good balance; leader must write before acknowledging
  • acks=all - Maximum durability; all in-sync replicas must write
Message distribution across partitions:

With Key:
  • Hash of key determines partition
  • Same key always goes to same partition
  • Maintains ordering per key
Without Key:
  • Round-robin across partitions
  • No ordering guarantee
  • Better load distribution
# Specify key for ordered processing
key_properties:
  events:
    - user_id
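The keyed behavior comes from the partitioner: the producer hashes the key (murmur2 in the Java client) and takes it modulo the partition count. This simplified sketch uses CRC32 as a stand-in hash purely to show why the same key always lands on the same partition:

```python
import zlib

def pick_partition(key: str, num_partitions: int) -> int:
    """Simplified key->partition mapping (real clients use murmur2, not CRC32)."""
    return zlib.crc32(key.encode('utf-8')) % num_partitions

# Same key -> same partition, so per-key ordering is preserved.
p1 = pick_partition('user-123', 3)
p2 = pick_partition('user-123', 3)
```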

Example: Kafka Export Pipeline

from pandas import DataFrame
import os

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter

@data_exporter
def export_to_kafka(df: DataFrame, **kwargs) -> None:
    """
    Export events to Kafka topic in real-time.
    """
    from mage_integrations.destinations.kafka import Kafka
    
    config = {
        'bootstrap_server': os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092'),
        'topic': 'user_events',
        'api_version': (2, 5, 0),
    }
    
    # For Confluent Cloud
    if os.environ.get('KAFKA_API_KEY'):
        config.update({
            'security_protocol': 'SASL_SSL',
            'sasl_mechanism': 'PLAIN',
            'sasl_plain_username': os.environ.get('KAFKA_API_KEY'),
            'sasl_plain_password': os.environ.get('KAFKA_API_SECRET'),
        })
    
    kafka = Kafka(
        config=config,
        batch_processing=True,
    )
    
    # Convert DataFrame to records
    records = df.to_dict('records')
    
    # Kafka exporter handles sending
    # Each record becomes a separate message

Testing Connection

from mage_integrations.destinations.kafka import Kafka

config = {
    'bootstrap_server': 'localhost:9092',
    'api_version': (2, 5, 0),
}

kafka = Kafka(config=config, batch_processing=True)

try:
    kafka.test_connection()
    print('Kafka connection successful')
    print('Available topics:', kafka.build_client().topics())
except Exception as e:
    print(f'Connection failed: {e}')

Common Use Cases

Event Streaming

Stream user events to Kafka for real-time analytics:
@data_exporter
def stream_events(df, **kwargs):
    """
    Stream clickstream events to Kafka.
    Each row becomes an event message.
    """
    # Mage handles message-by-message sending
    return df

Change Data Capture (CDC)

Capture database changes and stream to Kafka:
@data_exporter
def stream_changes(df, **kwargs):
    """
    Stream database changes to Kafka.
    Each change becomes a message.
    """
    from datetime import datetime, timezone

    # Add change type metadata
    df['_change_type'] = 'UPDATE'
    df['_source_timestamp'] = datetime.now(timezone.utc).isoformat()
    return df

Log Aggregation

Centralize logs from multiple sources:
@data_exporter
def aggregate_logs(df, **kwargs):
    """
    Send application logs to Kafka.
    """
    # Structure logs as events
    df['log_level'] = df['level']
    df['message'] = df['msg']
    df['service'] = 'api-service'
    return df

Error Handling

Error: “NoBrokersAvailable”

Causes:
  • Incorrect bootstrap server address
  • Network connectivity issues
  • Firewall blocking connection
  • Wrong API version
Solutions:
# Verify bootstrap server
bootstrap_server: correct-host:9092

# Test connectivity
telnet kafka-host 9092

# Check API version compatibility
api_version: (2, 5, 0)  # Match your Kafka version
Error: “AuthenticationFailure”

Solutions:
  • Verify username/password
  • Check security_protocol matches cluster
  • Ensure SASL mechanism is correct
  • Verify SSL certificates are valid
# Double-check credentials
sasl_plain_username: correct_username
sasl_plain_password: correct_password
security_protocol: SASL_SSL  # Must match cluster config
Error: “UnknownTopicOrPartition”

Causes:
  • Topic doesn’t exist
  • No permission to create topic
  • Auto-creation disabled
Solutions:
# Create topic manually
kafka-topics --create \
  --bootstrap-server localhost:9092 \
  --topic user_events \
  --partitions 3 \
  --replication-factor 2

# Or grant auto-create permission
# (if cluster allows)
Error: “MessageSizeTooLarge”

Solution: Increase the broker’s message.max.bytes or reduce message size:
# Split large messages
@data_exporter
def export_to_kafka(df, **kwargs):
    for record in df.to_dict('records'):
        # Remove large fields or split into multiple messages
        if 'large_field' in record:
            del record['large_field']
        yield record
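Before sending, you can also check each record's serialized size against the broker limit (roughly 1 MB by default) and drop or offload oversized fields — a sketch; the limit constant and field handling here are illustrative:

```python
import json

MAX_MESSAGE_BYTES = 1_000_000  # approximate broker default (message.max.bytes)

def fits(record: dict) -> bool:
    """Return True if the JSON-serialized record is under the broker limit."""
    return len(json.dumps(record).encode('utf-8')) <= MAX_MESSAGE_BYTES

small = {'user_id': 123, 'event': 'click'}
big = {'user_id': 123, 'blob': 'x' * 2_000_000}
```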

Monitoring

Producer Metrics

Monitor Kafka producer performance:
# After sending messages
producer = kafka.build_client()
metrics = producer.metrics()

# Metric names and grouping depend on the client library;
# kafka-python, for example, nests them by group:
producer_metrics = metrics.get('producer-metrics', {})
print(f"Record send rate: {producer_metrics.get('record-send-rate')}")
print(f"Request latency avg: {producer_metrics.get('request-latency-avg')}")
print(f"Buffer available bytes: {producer_metrics.get('buffer-available-bytes')}")

Consumer Lag

Monitor if consumers are keeping up:
# Check consumer lag
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --describe --group my-consumer-group

Best Practices

Include Metadata
{
  "event_id": "uuid",
  "event_type": "page_view",
  "timestamp": "2024-03-04T15:30:45Z",
  "schema_version": "1.0",
  "data": {
    // Event-specific fields
  }
}
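A small helper that wraps event-specific fields in this envelope — a sketch; field names follow the example above, and `uuid4` stands in for whatever ID scheme you use:

```python
import uuid
from datetime import datetime, timezone

def make_envelope(event_type: str, data: dict, schema_version: str = '1.0') -> dict:
    """Wrap event-specific fields in a metadata envelope."""
    return {
        'event_id': str(uuid.uuid4()),
        'event_type': event_type,
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'schema_version': schema_version,
        'data': data,
    }

envelope = make_envelope('page_view', {'page': '/products'})
```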
Keep Messages Small
  • Target < 1 MB per message
  • Use references for large data
  • Store large payloads in object storage
Schema Evolution
  • Add optional fields for backward compatibility
  • Include schema version in messages
  • Consider using Avro or Protobuf
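On the consumer side, backward compatibility means treating new fields as optional and reading them with defaults — a sketch, with `session_id` as an illustrative field added in a later schema version:

```python
def read_event(message: dict) -> dict:
    """Read an event defensively: fields added later get defaults."""
    return {
        'event_type': message['event_type'],
        'schema_version': message.get('schema_version', '1.0'),
        'session_id': message.get('session_id'),  # absent in v1.0 messages
    }

old_msg = {'event_type': 'page_view'}  # produced before versioning
new_msg = {'event_type': 'page_view', 'schema_version': '1.1', 'session_id': 'abc'}
```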
Choose Good Keys
  • High cardinality (many unique values)
  • Evenly distributed
  • Relevant for consumers
# Good keys
key_properties:
  - user_id    # Even distribution
  - device_id  # Natural partitioning

# Avoid
key_properties:
  - country    # Low cardinality
  - timestamp  # Too granular
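You can sanity-check a candidate key's cardinality over a sample of records before committing to it — a ratio near 1.0 means the key will distribute well:

```python
def key_cardinality_ratio(records: list, key: str) -> float:
    """Fraction of distinct values for a candidate key (1.0 = all unique)."""
    values = [r[key] for r in records]
    return len(set(values)) / len(values)

sample = [
    {'user_id': 1, 'country': 'US'},
    {'user_id': 2, 'country': 'US'},
    {'user_id': 3, 'country': 'DE'},
    {'user_id': 4, 'country': 'US'},
]
```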
Right Number of Partitions
  • Start with 3-5 partitions
  • Scale up based on throughput needs
  • Consider: at least as many partitions as parallel consumers in a group
Idempotent Producers

Enable idempotence to prevent duplicates:
enable_idempotence: true
Retry Configuration
retries: 3
retry_backoff_ms: 100
Dead Letter Queue

Route failed messages to a separate topic for investigation.
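The dead-letter pattern catches per-record failures and publishes them elsewhere instead of aborting the batch — a sketch where `send` and `dlq` are stand-ins for real producer calls (e.g. `producer.send(topic, ...)` to an `events.dlq` topic):

```python
def route_with_dlq(records, send, dlq):
    """Try to send each record; failures are captured with their error."""
    for record in records:
        try:
            send(record)
        except Exception as exc:
            dlq.append({'record': record, 'error': str(exc)})

sent, dead = [], []

def fake_send(record):
    # Stand-in producer that fails on records flagged 'bad'.
    if record.get('bad'):
        raise ValueError('serialization failed')
    sent.append(record)

route_with_dlq([{'id': 1}, {'id': 2, 'bad': True}], fake_send, dead)
```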

Integration with Other Services

Kafka Connect

Use Kafka Connect to sync Kafka topics with databases:
# JDBC Sink Connector example
{
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "user_events",
    "connection.url": "jdbc:postgresql://localhost/mydb",
    "auto.create": "true"
  }
}

Stream Processing

Process Kafka messages with:
  • Kafka Streams - Java/Scala stream processing
  • Apache Flink - Distributed stream processing
  • ksqlDB - SQL interface for Kafka

AWS Integration

For AWS Kinesis instead of Kafka, consider:
  • Kinesis has a different API (not covered here)
  • Similar concepts (streams, shards, records)
  • Managed service with auto-scaling

Next Steps

Databases

Export to PostgreSQL, MySQL, and other databases

Data Warehouses

Configure BigQuery, Snowflake, and Redshift
