Mage provides native integrations with major cloud data warehouses, enabling you to build scalable data pipelines with optimized batch loading and schema management.

Supported Data Warehouses

Google BigQuery

Serverless data warehouse with ML and BI capabilities

Snowflake

Cloud data platform with elastic compute and storage

Amazon Redshift

AWS data warehouse with columnar storage

Apache Doris

Real-time analytical database

Google BigQuery

Configuration

project_id: my-gcp-project
dataset: analytics
table: events
path_to_credentials_json_file: /path/to/service-account.json
location: US  # or EU, asia-northeast1, etc.

Features

  • Batch load API - High-performance data loading using native BigQuery Storage API
  • Automatic partitioning - Date-based table partitioning
  • Schema evolution - Automatic column additions and type changes
  • Nested structures - ARRAY and STRUCT (JSON) support
  • Query parameters - Parameterized inserts for large datasets
  • Transaction support - ACID compliance with sessions

Batch Loading

Enable batch loading for significantly faster data ingestion:
use_batch_load: true
Batch loading uses:
  • Pandas DataFrames → Parquet format
  • Arrays in data → JSON format
  • Storage Write API for parallel uploads
Standard insert: ~1,000 rows/second
Batch load: ~50,000+ rows/second
Batch loading is recommended for:
  • Large datasets (>10,000 rows)
  • Frequent data loads
  • Time-sensitive pipelines
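A rough back-of-the-envelope check using the throughput figures above (illustrative numbers, not a benchmark of any particular project):

```python
# Estimated wall-clock time to load one million rows at the quoted throughputs
rows = 1_000_000
standard_rps = 1_000   # standard insert: ~1,000 rows/second
batch_rps = 50_000     # batch load: ~50,000+ rows/second

standard_seconds = rows / standard_rps
batch_seconds = rows / batch_rps

print(f"standard insert: {standard_seconds / 60:.1f} min")  # standard insert: 16.7 min
print(f"batch load:      {batch_seconds:.0f} s")            # batch load:      20 s
```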

Partitioning

Automatically partition tables by date:
partition_keys:
  - created_date
Mage creates partitioned tables:
CREATE TABLE `project.dataset.table`
(
  id INT64,
  name STRING,
  created_date DATE,
  _mage_created_at DATETIME
)
PARTITION BY
  DATE(created_date)
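The effect of PARTITION BY DATE(created_date) can be mimicked in plain Python: each row is routed to the partition named by the calendar date of its partition column. This is a sketch of the concept, not Mage or BigQuery internals:

```python
from datetime import date, datetime

def partition_for(row: dict, partition_key: str) -> date:
    """Return the date partition a row lands in, mirroring DATE(created_date)."""
    value = row[partition_key]
    # A DATETIME value is truncated to its calendar date; a DATE passes through
    return value.date() if isinstance(value, datetime) else value

row = {'id': 1, 'name': 'a', 'created_date': datetime(2024, 1, 15, 10, 30)}
print(partition_for(row, 'created_date'))  # 2024-01-15
```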

Data Type Mapping

Python Type    BigQuery Type
str            STRING
int            INT64
float          FLOAT64
bool           BOOL
datetime       DATETIME
dict           JSON
list           ARRAY
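The mapping above can be expressed as a small lookup table. This is a hypothetical helper for reasoning about schemas, not Mage's internal type-mapping code:

```python
from datetime import datetime

# Python type → BigQuery type, per the table above
BIGQUERY_TYPES = {
    str: 'STRING',
    int: 'INT64',
    float: 'FLOAT64',
    bool: 'BOOL',
    datetime: 'DATETIME',
    dict: 'JSON',
    list: 'ARRAY',
}

def bigquery_type(value) -> str:
    # bool is a subclass of int in Python, so look up the exact type
    # rather than using isinstance checks
    return BIGQUERY_TYPES[type(value)]

print(bigquery_type(3.14))  # FLOAT64
print(bigquery_type(True))  # BOOL
```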

Unique Constraints

Handle duplicates with MERGE operations:
unique_constraints:
  - user_id
  - event_date
unique_conflict_method: UPDATE
Generates a MERGE statement:
MERGE INTO `project.dataset.events` AS a
USING (SELECT * FROM `temp_table`) AS b
ON ((a.user_id IS NULL AND b.user_id IS NULL) OR a.user_id = b.user_id)
   AND ((a.event_date IS NULL AND b.event_date IS NULL) OR a.event_date = b.event_date)
WHEN MATCHED THEN
  UPDATE SET a.event_name = b.event_name, a._mage_updated_at = b._mage_updated_at
WHEN NOT MATCHED THEN
  INSERT (user_id, event_date, event_name) VALUES (b.user_id, b.event_date, b.event_name)
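The upsert semantics of that MERGE (match on the unique-constraint columns, update on match, insert otherwise) can be sketched in plain Python. This models the logic only, not how BigQuery executes it:

```python
def merge_rows(target: list, incoming: list, keys: list) -> list:
    """Upsert incoming rows into target, matching on the unique-constraint columns."""
    index = {tuple(r[k] for k in keys): i for i, r in enumerate(target)}
    for row in incoming:
        key = tuple(row[k] for k in keys)
        if key in index:
            target[index[key]].update(row)   # WHEN MATCHED THEN UPDATE
        else:
            index[key] = len(target)
            target.append(dict(row))         # WHEN NOT MATCHED THEN INSERT
    return target

events = [{'user_id': 1, 'event_date': '2024-01-01', 'event_name': 'signup'}]
merge_rows(events, [
    {'user_id': 1, 'event_date': '2024-01-01', 'event_name': 'login'},   # updates
    {'user_id': 2, 'event_date': '2024-01-01', 'event_name': 'signup'},  # inserts
], ['user_id', 'event_date'])
print(events)
```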

Snowflake

Configuration

account: xy12345.us-east-1
warehouse: COMPUTE_WH
database: ANALYTICS
schema: PUBLIC
table: EVENTS
username: MAGE_USER
password: your_password
role: MAGE_ROLE  # Optional

Features

  • Batch loading - Fast data loading with write_pandas
  • Key pair authentication - Secure authentication without passwords
  • Role-based access - Fine-grained permissions
  • MERGE operations - Efficient upserts
  • VARIANT columns - Store semi-structured data (JSON, Arrays)
  • Temporary tables - Session-scoped staging tables

Batch Loading

Snowflake’s batch load uses the native write_pandas function:
use_batch_load: true
Benefits:
  • Automatic Parquet conversion
  • Parallel file uploads
  • Native compression
  • Up to 10x faster than standard SQL inserts

Unique Constraints with Temp Tables

Upserts use temporary tables for staging:
unique_constraints:
  - customer_id
unique_conflict_method: UPDATE
Process:
  1. Create temporary table matching target schema
  2. Load data into temp table using write_pandas
  3. Execute MERGE from temp to target
  4. Drop temporary table
CREATE TEMP TABLE "ANALYTICS"."PUBLIC"."temp_EVENTS" LIKE "ANALYTICS"."PUBLIC"."EVENTS";

-- Load data using write_pandas

MERGE INTO "ANALYTICS"."PUBLIC"."EVENTS" AS a
USING (SELECT * FROM "ANALYTICS"."PUBLIC"."temp_EVENTS") AS b
ON a."customer_id" = b."customer_id"
WHEN MATCHED THEN UPDATE SET a."name" = b."name", a."email" = b."email"
WHEN NOT MATCHED THEN INSERT ("customer_id", "name", "email") VALUES (b."customer_id", b."name", b."email");

DROP TABLE IF EXISTS "ANALYTICS"."PUBLIC"."temp_EVENTS";

Data Type Mapping

Python Type    Snowflake Type
str            VARCHAR
int            NUMBER
float          FLOAT
bool           BOOLEAN
datetime       TIMESTAMP
dict           VARIANT
list           ARRAY

Disable Double Quotes

Snowflake folds unquoted identifiers to upper case by default. Disable double quotes so Mage emits unquoted identifiers that match this behavior:
disable_double_quotes: true
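Why this matters: quoted identifiers in Snowflake are case-sensitive, while unquoted identifiers resolve to upper case. A simplified sketch of the resolution rule (real Snowflake parsing has more cases):

```python
def resolve_identifier(name: str, quoted: bool) -> str:
    """Unquoted identifiers fold to upper case; quoted ones keep their exact casing."""
    return name if quoted else name.upper()

print(resolve_identifier('events', quoted=False))  # EVENTS  (matches table EVENTS)
print(resolve_identifier('events', quoted=True))   # events  (only matches "events")
```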

Amazon Redshift

Configuration

host: my-cluster.abc123.us-east-1.redshift.amazonaws.com
port: 5439
user: mage_user
password: your_password
database: analytics
schema: public
table: events

Features

  • IAM authentication - AWS credential-based access
  • Redshift Serverless - Automatic scaling without cluster management
  • MERGE operations - Efficient deduplication
  • Columnar storage - Optimized for analytical queries
  • Distribution keys - Optimize data distribution across nodes
  • Sort keys - Improve query performance

Merge Load Method

Use staging tables for better performance:
use_merge_load: true
Process:
  1. Create staging table with same schema
  2. Load data into staging table
  3. Execute MERGE to deduplicate and update
  4. Drop staging table
DROP TABLE IF EXISTS public.stage_events;
CREATE TABLE public.stage_events (LIKE public.events INCLUDING DEFAULTS);

INSERT INTO public.stage_events (id, name, created_at)
VALUES (1, 'Event A', '2024-01-01'), (2, 'Event B', '2024-01-02');

MERGE INTO public.events USING public.stage_events AS _source
ON (events.id = _source.id) REMOVE DUPLICATES;

DROP TABLE IF EXISTS public.stage_events;
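REMOVE DUPLICATES replaces matched target rows with their staged counterparts and leaves one row per key. A plain-Python model of the outcome (not Redshift's execution plan):

```python
def merge_remove_duplicates(target: list, source: list, key: str) -> list:
    """Overwrite target rows whose key appears in source, keeping one row per key."""
    merged = {r[key]: r for r in target}  # later duplicates win within target
    for row in source:
        merged[row[key]] = row            # staged rows overwrite matches
    return list(merged.values())

events = [{'id': 1, 'name': 'Old A'}, {'id': 3, 'name': 'Event C'}]
stage = [{'id': 1, 'name': 'Event A'}, {'id': 2, 'name': 'Event B'}]
result = merge_remove_duplicates(events, stage, 'id')
print(result)
```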

Data Type Mapping

Python Type    Redshift Type
str            VARCHAR(65535)
int            BIGINT
float          DOUBLE PRECISION
bool           BOOLEAN
datetime       TIMESTAMP
dict           VARCHAR(65535)
list           VARCHAR(65535)
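Because dict and list values map to VARCHAR rather than a native nested type here, they end up stored as serialized strings. A sketch of the idea (not Mage's internal serializer):

```python
import json

row = {'id': 1, 'tags': ['a', 'b'], 'meta': {'source': 'web'}}

# Serialize nested values to JSON strings so they fit in VARCHAR columns
serialized = {
    k: json.dumps(v) if isinstance(v, (dict, list)) else v
    for k, v in row.items()
}
print(serialized)  # {'id': 1, 'tags': '["a", "b"]', 'meta': '{"source": "web"}'}
```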

Redshift Serverless

Connecting to Redshift Serverless:
host: workgroup-name.123456789012.us-east-1.redshift-serverless.amazonaws.com
port: 5439
region: us-east-1
db_user: IAMR:admin
# Use IAM authentication

Apache Doris

Configuration

sqlalchemy_url: doris://user:password@host:9030/database
table: events
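If the password contains characters such as @ or /, it must be percent-encoded before being embedded in sqlalchemy_url. A standard-library sketch (host and credential values are placeholders):

```python
from urllib.parse import quote_plus

user, password = 'user', 'p@ss/word'   # placeholder credentials
host, port, database = 'doris-host', 9030, 'analytics'

# Percent-encode the password so '@' and '/' do not break URL parsing
sqlalchemy_url = f'doris://{user}:{quote_plus(password)}@{host}:{port}/{database}'
print(sqlalchemy_url)  # doris://user:p%40ss%2Fword@doris-host:9030/analytics
```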

Features

  • Real-time analytics - Low-latency queries
  • MySQL protocol - Compatible with MySQL clients
  • Vectorized execution - High performance queries
  • MPP architecture - Distributed processing

Common Configuration Options

Lowercase Column Names

Force lowercase column names:
use_lowercase: true

Disable Column Type Updates

Prevent automatic schema changes:
disable_update_column_types: true

Custom Table Names

Override table names per stream:
table: custom_table_name

Performance Optimization

BigQuery

Use batch load:
use_batch_load: true
max_subquery_count: 500  # Adjust based on data size
Partition large tables:
partition_keys:
  - event_date
Query optimization:
  • BigQuery automatically manages query parameters
  • Supports up to 100MB query payload
  • Automatic retry on concurrent update conflicts

Snowflake

Enable batch load:
use_batch_load: true
Warehouse sizing:
  • Start with X-Small for testing
  • Scale up for production workloads
  • Use multi-cluster warehouses for concurrency
Key pair authentication:
  • Faster than password authentication
  • More secure for production
  • Supports key rotation

Redshift

Use merge load:
use_merge_load: true
Distribution strategy:
-- Define distribution key when creating table
CREATE TABLE events (
  event_id INT,
  user_id INT,
  event_name VARCHAR(256)
)
DISTKEY(user_id)  -- Distribute by frequently joined column
SORTKEY(event_id);  -- Sort by frequently filtered column
VACUUM and ANALYZE:
  • Run VACUUM regularly to reclaim space
  • Run ANALYZE after large loads to update statistics

Example: BigQuery Export with Batch Load

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.bigquery import BigQuery
from mage_ai.io.config import ConfigFileLoader
from pandas import DataFrame
import os

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter

@data_exporter
def export_to_bigquery(df: DataFrame, **kwargs) -> None:
    """
    Export data to BigQuery using batch load for optimal performance.
    """
    config_path = os.path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    config = ConfigFileLoader(config_path, config_profile)
    
    # BigQuery configuration
    table_id = 'my-project.analytics.events'
    
    BigQuery.with_config(config).export(
        df,
        table_id,
        if_exists='append',  # 'replace' or 'fail'
        use_batch_load=True,  # Enable batch loading
    )

Error Handling

BigQuery

Concurrent update conflicts:
  • Mage automatically retries up to 2 times
  • Recreates the table if needed
Query size limits:
  • Automatically splits large queries
  • Uses query parameters to reduce size
Schema conflicts:
  • Check the disable_update_column_types setting
  • Verify column type compatibility

Snowflake

Authentication failures:
  • Verify the account identifier format: account.region
  • Check role permissions
  • Validate the key pair if using key pair authentication
Warehouse suspended:
  • Snowflake auto-resumes suspended warehouses
  • This may cause an initial query delay

Redshift

Connection timeouts:
  • Check VPC security groups
  • Verify the cluster is publicly accessible (if needed)
  • Use a VPN or SSH tunnel for private clusters
Permission denied:
  • Grant INSERT, UPDATE, DELETE on tables
  • Grant USAGE on the schema

Next Steps

Cloud Storage

Export to S3, GCS, and Delta Lake

Databases

Configure traditional database destinations
