Mage provides native integrations with popular SQL and NoSQL databases, supporting full table replication, incremental sync, and change data capture (CDC) for real-time updates.

Supported Databases

PostgreSQL: Relational database with CDC support via logical replication
MySQL: Popular open-source relational database
Snowflake: Cloud data warehouse platform
BigQuery: Google Cloud serverless data warehouse
Redshift: AWS cloud data warehouse
MongoDB: NoSQL document database
MSSQL: Microsoft SQL Server
Oracle: Enterprise relational database
Teradata: Enterprise data warehouse

PostgreSQL

Extract data from PostgreSQL with support for full table, incremental, and log-based (CDC) replication.

Configuration

{
  "host": "postgres.example.com",
  "port": 5432,
  "database": "production",
  "schema": "public",
  "username": "mage_user",
  "password": "${env:POSTGRES_PASSWORD}",
  "start_date": "2024-01-01T00:00:00Z"
}
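Values written as ${env:POSTGRES_PASSWORD} are resolved from environment variables at runtime rather than stored in plain text. A minimal sketch of that substitution (the resolve_env helper is illustrative, not part of Mage's API):

```python
import os
import re

# For demonstration only: make sure the variable exists in this sketch.
os.environ.setdefault("POSTGRES_PASSWORD", "example-secret")

# Matches placeholders of the form ${env:VAR_NAME}
ENV_PATTERN = re.compile(r"\$\{env:([A-Za-z_][A-Za-z0-9_]*)\}")

def resolve_env(value):
    """Substitute ${env:VAR} placeholders with values from os.environ."""
    return ENV_PATTERN.sub(lambda m: os.environ[m.group(1)], value)

config = {
    "host": "postgres.example.com",
    "port": 5432,
    "password": "${env:POSTGRES_PASSWORD}",
}
resolved = {k: resolve_env(v) if isinstance(v, str) else v for k, v in config.items()}
```

Non-string values such as the port pass through untouched; only string fields are scanned for placeholders.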

Log-Based Replication (CDC)

Configure PostgreSQL for real-time change data capture:
{
  "host": "postgres.example.com",
  "database": "production",
  "schema": "public",
  "username": "replication_user",
  "password": "${env:POSTGRES_PASSWORD}",
  "replication_slot": "mage_slot",
  "publication_name": "mage_pub",
  "logical_poll_total_seconds": 60
}
  1. Enable logical replication in postgresql.conf:
wal_level = logical
max_replication_slots = 4
max_wal_senders = 4
  2. Create a publication:
CREATE PUBLICATION mage_pub FOR ALL TABLES;
  3. Create a replication slot:
SELECT pg_create_logical_replication_slot('mage_slot', 'pgoutput');
  4. Grant necessary permissions:
GRANT SELECT ON ALL TABLES IN SCHEMA public TO mage_user;
GRANT USAGE ON SCHEMA public TO mage_user;
ALTER USER mage_user WITH REPLICATION;
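A LOG_BASED sync that is missing its slot or publication name fails only once it reaches the database, so it is worth checking the config up front. A small pre-flight check (a hypothetical helper, not part of Mage's API):

```python
# Keys a LOG_BASED (CDC) sync needs beyond the basic connection settings.
REQUIRED_CDC_KEYS = ("replication_slot", "publication_name")

def missing_cdc_keys(config):
    """Return which CDC-specific keys are absent from a source config."""
    return [key for key in REQUIRED_CDC_KEYS if not config.get(key)]

incomplete = {"host": "postgres.example.com", "replication_slot": "mage_slot"}
complete = {"replication_slot": "mage_slot", "publication_name": "mage_pub"}
```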

Example Usage

from mage_integrations.sources.postgresql import PostgreSQL

config = {
    'host': 'localhost',
    'database': 'mydb',
    'schema': 'public',
    'username': 'user',
    'password': 'pass'
}

source = PostgreSQL(config=config)

# Discover tables
catalog = source.discover()
print(f"Found {len(catalog.streams)} tables")

# Test connection
source.test_connection()

MySQL

Connect to MySQL databases directly or through an SSH tunnel.

Configuration

{
  "host": "mysql.example.com",
  "port": 3306,
  "database": "ecommerce",
  "username": "readonly",
  "password": "${env:MYSQL_PASSWORD}",
  "connection_method": "direct"
}

SSH Tunnel Connection

{
  "host": "internal-mysql.local",
  "port": 3306,
  "database": "production",
  "username": "app_user",
  "password": "${env:MYSQL_PASSWORD}",
  "connection_method": "ssh_tunnel",
  "ssh_host": "bastion.example.com",
  "ssh_port": 22,
  "ssh_username": "ssh_user",
  "ssh_pkey": "${env:SSH_PRIVATE_KEY}"
}

Snowflake

Connect to Snowflake data warehouse using password or private key authentication.

Configuration

{
  "account": "xy12345.us-east-1",
  "warehouse": "COMPUTE_WH",
  "database": "ANALYTICS",
  "schema": "PUBLIC",
  "username": "MAGE_USER",
  "password": "${env:SNOWFLAKE_PASSWORD}",
  "role": "ANALYST"
}

Private Key Authentication

{
  "account": "xy12345.us-east-1",
  "warehouse": "COMPUTE_WH",
  "database": "ANALYTICS",
  "schema": "PUBLIC",
  "username": "MAGE_USER",
  "private_key_file": "/path/to/rsa_key.p8",
  "private_key_file_pwd": "${env:KEY_PASSWORD}",
  "role": "ANALYST"
}

Example Usage

from mage_integrations.sources.snowflake import Snowflake

config = {
    'account': 'xy12345.us-east-1',
    'warehouse': 'COMPUTE_WH',
    'database': 'ANALYTICS',
    'schema': 'PUBLIC',
    'username': 'MAGE_USER',
    'password': 'secure_password'
}

source = Snowflake(config=config)
catalog = source.discover(streams=['customers', 'transactions'])
source.sync(catalog)

BigQuery

Extract data from Google BigQuery using service account credentials.

Configuration

{
  "dataset": "analytics",
  "path_to_credentials_json_file": "/path/to/service-account.json"
}

Credentials Info (Alternative)

{
  "dataset": "analytics",
  "credentials_info": {
    "type": "service_account",
    "project_id": "my-project",
    "private_key_id": "key-id",
    "private_key": "${env:GCP_PRIVATE_KEY}",
    "client_email": "mage@my-project.iam.gserviceaccount.com",
    "client_id": "123456789"
  }
}

Example Usage

from mage_integrations.sources.bigquery import BigQuery

config = {
    'dataset': 'analytics',
    'path_to_credentials_json_file': 'service-account.json'
}

source = BigQuery(config=config)
catalog = source.discover()

for stream in catalog.streams:
    print(f"Table: {stream.tap_stream_id}")
    for column, props in stream.schema.properties.items():
        print(f"  - {column}: {props['type']}")
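The two credential styles above are alternative ways to supply the same service-account JSON: a path to a key file, or the decoded contents inline. A sketch of how a loader might normalize them (load_credentials is a hypothetical helper):

```python
import json

def load_credentials(config):
    """Return service-account info from either credential style."""
    if "credentials_info" in config:
        return config["credentials_info"]
    with open(config["path_to_credentials_json_file"]) as f:
        return json.load(f)

inline = {
    "dataset": "analytics",
    "credentials_info": {"type": "service_account", "project_id": "my-project"},
}
```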

Redshift

Connect to Amazon Redshift using standard authentication or IAM credentials.

Configuration

{
  "host": "redshift-cluster.us-west-2.redshift.amazonaws.com",
  "port": 5439,
  "database": "analytics",
  "schema": "public",
  "user": "mage_user",
  "password": "${env:REDSHIFT_PASSWORD}"
}

IAM Authentication

{
  "cluster_identifier": "my-cluster",
  "database": "analytics",
  "schema": "public",
  "db_user": "mage_user",
  "region": "us-west-2",
  "access_key_id": "${env:AWS_ACCESS_KEY_ID}",
  "secret_access_key": "${env:AWS_SECRET_ACCESS_KEY}"
}
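Which authentication path applies can be inferred from the keys present: cluster_identifier plus AWS access keys signal IAM, while host plus password signal standard login. A hypothetical dispatcher illustrating that rule:

```python
def redshift_auth_mode(config):
    """Classify a Redshift source config as 'iam' or 'password' auth."""
    if "cluster_identifier" in config and "access_key_id" in config:
        return "iam"
    if "host" in config and "password" in config:
        return "password"
    raise ValueError("incomplete Redshift configuration")

iam_config = {
    "cluster_identifier": "my-cluster",
    "db_user": "mage_user",
    "region": "us-west-2",
    "access_key_id": "AKIA...",
    "secret_access_key": "...",
}
```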

MongoDB

Extract data from MongoDB with automatic schema detection.

Configuration

{
  "host": "mongodb.example.com",
  "port": 27017,
  "database": "app_db",
  "username": "readonly",
  "password": "${env:MONGO_PASSWORD}",
  "auth_database": "admin"
}

Connection String

{
  "connection_string": "mongodb+srv://user:pass@cluster.mongodb.net/app_db?retryWrites=true",
  "database": "app_db"
}

Example Usage

from mage_integrations.sources.mongodb import MongoDB

config = {
    'host': 'localhost',
    'port': 27017,
    'database': 'mydb',
    'username': 'user',
    'password': 'pass'
}

source = MongoDB(config=config)

# Discover collections
catalog = source.discover(streams=['users', 'orders'])

# Load data
for stream in catalog.streams:
    for rows in source.load_data(stream):
        print(f"Loaded {len(rows)} records from {stream.tap_stream_id}")
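If a password contains characters such as @ or /, it must be percent-encoded before being embedded in a connection string, or the URI parser will misread the host. Using Python's standard library (cluster.mongodb.net stands in for your actual cluster host):

```python
from urllib.parse import quote_plus

# '@', '/', and ':' in a raw password would confuse the URI parser.
user = quote_plus("readonly")
password = quote_plus("p@ss/w0rd")
uri = f"mongodb+srv://{user}:{password}@cluster.mongodb.net/app_db?retryWrites=true"
```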

MSSQL

Connect to Microsoft SQL Server with Windows or SQL Server authentication.

Configuration

{
  "host": "sqlserver.example.com",
  "port": 1433,
  "database": "production",
  "schema": "dbo",
  "username": "mage_user",
  "password": "${env:MSSQL_PASSWORD}",
  "authentication": "sql"
}

Windows Authentication

{
  "host": "sqlserver.local",
  "database": "production",
  "schema": "dbo",
  "username": "DOMAIN\\user",
  "authentication": "windows"
}

Oracle Database

Extract data from Oracle databases.

Configuration

{
  "host": "oracle.example.com",
  "port": 1521,
  "service_name": "ORCL",
  "username": "mage_user",
  "password": "${env:ORACLE_PASSWORD}"
}

Other Databases

Teradata

{
  "host": "teradata.example.com",
  "database": "production",
  "username": "dbc",
  "password": "${env:TERADATA_PASSWORD}"
}

Couchbase

{
  "host": "couchbase.example.com",
  "bucket": "my_bucket",
  "username": "admin",
  "password": "${env:COUCHBASE_PASSWORD}"
}

Amazon DynamoDB

{
  "region": "us-east-1",
  "access_key_id": "${env:AWS_ACCESS_KEY_ID}",
  "secret_access_key": "${env:AWS_SECRET_ACCESS_KEY}",
  "table_names": ["users", "orders"]
}

Apache Doris

{
  "host": "doris.example.com",
  "port": 9030,
  "database": "analytics",
  "username": "root",
  "password": "${env:DORIS_PASSWORD}"
}

Dremio

{
  "host": "dremio.example.com",
  "port": 31010,
  "username": "dremio",
  "password": "${env:DREMIO_PASSWORD}"
}

Replication Methods

Database sources support the following replication strategies; log-based CDC is currently available for PostgreSQL only:

Full Table

Complete refresh of all data on each sync:
{
  "replication_method": "FULL_TABLE"
}

Incremental

Sync only new/updated records based on a timestamp or ID column:
{
  "replication_method": "INCREMENTAL",
  "replication_key": "updated_at",
  "start_date": "2024-01-01T00:00:00Z"
}
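Conceptually, an incremental sync keeps a bookmark (the highest replication_key value seen so far) and only fetches rows past it. A simplified illustration of that bookkeeping (not Mage's internal implementation; ISO-8601 timestamps in the same format compare correctly as strings):

```python
rows = [
    {"id": 1, "updated_at": "2024-01-05T00:00:00Z"},
    {"id": 2, "updated_at": "2024-02-10T00:00:00Z"},
    {"id": 3, "updated_at": "2024-03-01T00:00:00Z"},
]

def incremental_sync(rows, replication_key, bookmark):
    """Return rows newer than the bookmark and the advanced bookmark."""
    new_rows = [r for r in rows if r[replication_key] > bookmark]
    if new_rows:
        bookmark = max(r[replication_key] for r in new_rows)
    return new_rows, bookmark

# The first run starts from start_date; later runs resume from the saved bookmark.
synced, bookmark = incremental_sync(rows, "updated_at", "2024-01-01T00:00:00Z")
```

A second call with the updated bookmark returns nothing until new rows arrive, which is what keeps repeated syncs cheap.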

Log-Based (PostgreSQL Only)

Real-time CDC using PostgreSQL logical replication:
{
  "replication_method": "LOG_BASED",
  "replication_slot": "mage_slot",
  "publication_name": "mage_pub"
}

Installation

Install database-specific dependencies:
# PostgreSQL
pip install "mage-ai[postgres]"

# MySQL
pip install "mage-ai[mysql]"

# Snowflake
pip install "mage-ai[snowflake]"

# BigQuery
pip install "mage-ai[bigquery]"

# Redshift
pip install "mage-ai[redshift]"

# Oracle
pip install "mage-ai[oracle]"

# Multiple databases
pip install "mage-ai[postgres,mysql,snowflake,bigquery]"

Best Practices

  1. Use read-only credentials for source databases
  2. Enable incremental sync when possible to reduce load
  3. Configure connection pooling for high-volume extractions
  4. Test connections before scheduling pipelines
  5. Monitor replication lag for CDC sources
  6. Use environment variables for sensitive credentials
  7. Select specific streams to reduce data transfer

Next Steps

API Sources

Connect to REST APIs and SaaS platforms

Cloud Storage

Load data from S3, GCS, and Azure Blob Storage
