Mage is built around a simple, modular architecture that makes building data pipelines intuitive and maintainable. This guide covers the core concepts you need to know.

Pipelines

A pipeline is a directed acyclic graph (DAG) of blocks that process data. Pipelines define the workflow from data ingestion to export.

Pipeline Types

Mage supports several pipeline types:

Standard (Batch)

Traditional ETL pipelines that run on a schedule or manually. Process data in batches.

Streaming

Real-time data processing pipelines for continuous data streams.

Integration

Pre-built connectors for syncing data between sources and destinations.

DBT

Run dbt models directly inside Mage with full observability.

Creating a Pipeline

Pipelines can be created programmatically with the Pipeline.create() method or through the UI:
from mage_ai.data_preparation.models.pipeline import Pipeline, PipelineType

pipeline = Pipeline.create(
    'my_pipeline',
    repo_path='/path/to/project',
    pipeline_type=PipelineType.PYTHON,
    description='My data pipeline',
    tags=['etl', 'production']
)
Or initialize a project with the CLI, then create pipelines through the UI or programmatically:
mage init my_project
cd my_project
# Pipelines are created through the UI or programmatically

Pipeline Structure

Pipelines are stored in your project directory:
my_project/
├── pipelines/
│   └── my_pipeline/
│       ├── metadata.yaml      # Pipeline configuration
│       ├── __init__.py
│       └── requirements.txt   # Pipeline-specific dependencies
└── io_config.yaml             # Data source configurations
The metadata.yaml file contains:
blocks:
- uuid: load_data
  type: data_loader
  upstream_blocks: []
  downstream_blocks: [transform_data]
- uuid: transform_data
  type: transformer
  upstream_blocks: [load_data]
  downstream_blocks: [export_data]
- uuid: export_data
  type: data_exporter
  upstream_blocks: [transform_data]
  downstream_blocks: []
created_at: '2026-03-04 12:00:00.000000+00:00'
name: my_pipeline
type: python
uuid: my_pipeline

Blocks

Blocks are the building units of pipelines. Each block performs a specific function in your data workflow. Blocks are Python, SQL, or R files that execute code and pass data to downstream blocks.

Block Types

Mage has several block types, each serving a different purpose:

Data Loader

Data loaders import data from external sources into your pipeline.
import pandas as pd
import requests

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader

@data_loader
def load_data(*args, **kwargs):
    """
    Load data from an API endpoint
    """
    url = 'https://api.example.com/data'
    response = requests.get(url)
    return pd.DataFrame(response.json())
Use cases:
  • Load data from APIs
  • Read from databases (PostgreSQL, MySQL, BigQuery, etc.)
  • Import from files (CSV, Parquet, JSON)
  • Fetch from cloud storage (S3, GCS, Azure Blob)
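
For example, a loader that reads from PostgreSQL through Mage's built-in IO clients might look like the sketch below. The query, config profile, and connection details are placeholders; io_config.yaml is covered under Block Configuration later in this guide:
from os import path

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader

@data_loader
def load_data_from_postgres(*args, **kwargs):
    """
    Load rows from PostgreSQL using the 'default' profile in io_config.yaml.
    """
    query = 'SELECT * FROM orders LIMIT 1000'  # placeholder query
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        return loader.load(query)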

Block Structure

Blocks are stored as Python files in your project:
my_project/
├── data_loaders/
│   └── load_api_data.py
├── transformers/
│   └── clean_data.py
└── data_exporters/
    └── export_to_postgres.py
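As a rough illustration, transformers/clean_data.py from the tree above might hold a small transformer like this (the cleanup steps are placeholders for your own logic):
from pandas import DataFrame

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer

@transformer
def clean_data(df: DataFrame, *args, **kwargs) -> DataFrame:
    """
    Drop duplicate rows and normalize column names from the upstream loader.
    """
    df = df.drop_duplicates()
    df.columns = [col.strip().lower() for col in df.columns]
    return df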

Block Decorators

Mage uses Python decorators to define block functionality:
Decorator        | Purpose                               | Return Type
@data_loader     | Load data from external sources       | DataFrame, dict, list, or any serializable object
@transformer     | Transform data from upstream blocks   | DataFrame, dict, list, or any serializable object
@data_exporter   | Export data to external destinations  | None (or optionally return data)
@test            | Test block output                     | None (raises AssertionError on failure)
@sensor          | Check external conditions             | bool (True to proceed)
@custom          | Execute custom logic                  | Any
@callback        | Run after block completion            | None
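
Most of these decorators follow the same pattern shown above for @data_loader. For instance, a sensor block returns a boolean that gates downstream execution; the sketch below assumes a hypothetical health-check endpoint:
import requests

if 'sensor' not in globals():
    from mage_ai.data_preparation.decorators import sensor

@sensor
def check_api_is_ready(*args, **kwargs) -> bool:
    """
    Hold downstream blocks until the upstream API reports healthy.
    """
    url = 'https://api.example.com/health'  # placeholder endpoint
    response = requests.get(url, timeout=10)
    return response.status_code == 200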

Block Dependencies

Blocks are connected through upstream and downstream relationships:
# Transformer with multiple upstream blocks
from pandas import DataFrame

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer

@transformer
def join_data(
    customers: DataFrame,  # From first upstream block
    orders: DataFrame,     # From second upstream block
    *args, **kwargs
) -> DataFrame:
    """
    Join customer and order data
    """
    return customers.merge(
        orders,
        left_on='customer_id',
        right_on='id',
        how='left'
    )
The pipeline executes blocks in topological order based on dependencies.

Data Flow

Data flows through your pipeline automatically:
1. Block execution

When a block runs, it:
  1. Receives output from upstream blocks as function arguments
  2. Executes your code
  3. Returns data to downstream blocks
2. Data persistence

Block outputs are automatically saved to:
my_project/.mage_data/
└── my_pipeline/
    └── load_data/
        └── output_0.parquet
3. Data retrieval

Downstream blocks automatically load the saved data:
  • No manual file handling required
  • Data is cached between runs
  • Supports multiple output variables
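
Outside of a pipeline run (for example, in a scratchpad or notebook), a block's persisted output can also be read back directly. A minimal sketch, assuming get_variable is importable from mage_ai.data_preparation.variable_manager and the block writes a single output variable named output_0:
from mage_ai.data_preparation.variable_manager import get_variable

# Load the saved output of the load_data block from .mage_data
df = get_variable('my_pipeline', 'load_data', 'output_0')
print(df.shape)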

Multiple Outputs

Blocks can return multiple outputs:
@data_loader
def load_data(*args, **kwargs):
    """
    Return multiple dataframes
    """
    customers = pd.read_csv('customers.csv')
    orders = pd.read_csv('orders.csv')
    
    # Return tuple for multiple outputs
    return customers, orders
Downstream blocks receive them as separate arguments:
@transformer
def transform(customers: DataFrame, orders: DataFrame, *args) -> DataFrame:
    return customers.merge(orders, on='customer_id')

Variables and Configuration

Global Variables

Pass variables to your pipeline:
@data_loader
def load_data(*args, **kwargs):
    # Access global variables
    env = kwargs.get('env', 'dev')
    api_key = kwargs.get('api_key')
    
    url = f'https://api.{env}.example.com/data'
    return fetch_data(url, api_key)
Set variables in the UI or via the CLI:
mage run my_project my_pipeline --runtime-vars '{"env": "prod", "api_key": "xxx"}'

Block Configuration

Store block-specific config in io_config.yaml:
default:
  POSTGRES_HOST: localhost
  POSTGRES_PORT: 5432
  POSTGRES_USER: mage
  POSTGRES_PASSWORD: password
  POSTGRES_DB: analytics
Access in blocks:
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from os import path

config_path = path.join(get_repo_path(), 'io_config.yaml')
config = ConfigFileLoader(config_path, 'default')

host = config['POSTGRES_HOST']
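
The same pattern works for exporters. Below is a sketch of a Postgres data exporter that writes the upstream DataFrame to a table; the schema and table names are placeholders:
from os import path

from pandas import DataFrame
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter

@data_exporter
def export_to_postgres(df: DataFrame, *args, **kwargs) -> None:
    """
    Write the upstream DataFrame to a Postgres table defined by placeholder names.
    """
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config = ConfigFileLoader(config_path, 'default')

    with Postgres.with_config(config) as conn:
        conn.export(
            df,
            'public',           # schema name (placeholder)
            'orders_summary',   # table name (placeholder)
            index=False,
            if_exists='replace',
        )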

Testing

Every block should include tests:
from pandas import DataFrame

if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

@test
def test_output(output: DataFrame, *args) -> None:
    """
    Validate block output
    """
    assert output is not None, 'Output is undefined'
    assert len(output) > 0, 'Output is empty'
    assert output['amount'].min() >= 0, 'Negative amounts found'
    assert not output['id'].duplicated().any(), 'Duplicate IDs found'
Run tests:
mage run my_project my_pipeline --test

Execution Modes

Sequential Execution

Blocks run one at a time in dependency order:
pipeline.execute_sync(
    global_vars={'env': 'prod'},
    run_tests=True
)

Parallel Execution

Independent blocks run in parallel:
await pipeline.execute(
    global_vars={'env': 'prod'},
    parallel=True,
    run_tests=True
)

Block-Level Execution

Run a single block:
mage run my_project my_pipeline --block-uuid transform_data
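Programmatically, a rough equivalent is to look up the block on the pipeline object and execute it on its own. This is a sketch only, assuming Pipeline.get, Pipeline.get_block, and a block-level execute_sync are available:
from mage_ai.data_preparation.models.pipeline import Pipeline

# Load an existing pipeline, fetch one block, and run just that block
pipeline = Pipeline.get('my_pipeline', repo_path='/path/to/project')
block = pipeline.get_block('transform_data')
block.execute_sync(global_vars={'env': 'prod'})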

Advanced Features

Dynamic Blocks

Generate blocks dynamically at runtime:
@data_loader
def load_data(*args, **kwargs):
    # Return list of items to process
    return [
        {'date': '2024-01-01', 'region': 'US'},
        {'date': '2024-01-02', 'region': 'EU'},
        {'date': '2024-01-03', 'region': 'ASIA'},
    ]

@transformer
def process_partition(data: dict, *args) -> DataFrame:
    # This block runs once per item
    date = data['date']
    region = data['region']
    return fetch_and_process(date, region)

Conditional Blocks

Execute blocks conditionally:
@transformer
def should_process(df: DataFrame, *args) -> bool:
    # Return True to execute downstream blocks
    return len(df) > 1000

SQL Blocks

Write SQL queries directly:
-- data_loader
SELECT 
    customer_id,
    SUM(amount) as total_spent,
    COUNT(*) as order_count
FROM orders
WHERE created_at >= '2024-01-01'
GROUP BY customer_id

Best Practices

Keep blocks focused

Each block should do one thing well. Small, reusable blocks are easier to test and maintain.

Write tests

Always include @test functions to validate your data quality and business logic.

Use descriptive names

Name blocks clearly: load_customer_data, not load_data_1.

Document your code

Add docstrings to explain what each block does and any important assumptions.

Next Steps

  • Quick Start: Build your first pipeline
  • API Reference: Explore the complete API
  • Examples: Browse real-world examples
  • Deployment: Deploy to production
