
Overview

This guide helps you migrate from Dagster to Mage. Both tools emphasize data assets and testability, but Mage provides a more visual, notebook-style development experience with simpler configuration.

Key Concept Mapping

Core Components

| Dagster | Mage | Notes |
| --- | --- | --- |
| Asset | Block | Self-contained data transformation units |
| Op | Block | Can be combined in a pipeline |
| Job | Pipeline | Workflow definition with blocks |
| Graph | Pipeline | Visual DAG representation |
| Resource | IO Config | Database connections, API clients |
| IOManager | Data Exporter | Handles output persistence |
| Sensor | Sensor Block | Monitors external conditions |
| Schedule | Trigger | Time-based execution |
| Partition | Dynamic Block | Process multiple data chunks |
| Repository | Project | Collection of pipelines |

Data & Execution

| Dagster | Mage | Notes |
| --- | --- | --- |
| AssetMaterialization | Block Output | Automatic data persistence |
| DagsterType | Type Hints | Use Python type hints for validation |
| Config Schema | Block Configuration | YAML-based configuration |
| Run | Pipeline Run | Execution instance with metadata |
| Executor | Executor Type | Local, K8s, ECS, Cloud Run |

Migration Steps

1. Converting Assets to Blocks

from dagster import asset, AssetIn
import pandas as pd

@asset
def raw_users() -> pd.DataFrame:
    """
    Load raw user data from database.
    """
    return pd.read_sql(
        "SELECT * FROM users",
        con="postgresql://localhost/db"
    )

@asset
def clean_users(raw_users: pd.DataFrame) -> pd.DataFrame:
    """
    Clean and transform user data.
    """
    df = raw_users.copy()
    df['full_name'] = df['first_name'] + ' ' + df['last_name']
    return df[df['is_active']]

@asset(
    ins={"users": AssetIn("clean_users")}
)
def user_analytics(users: pd.DataFrame) -> pd.DataFrame:
    """
    Calculate user analytics.
    """
    return users.groupby('signup_date').agg({
        'user_id': 'count',
        'revenue': 'sum'
    })

2. Configuration and Resources

from dagster import op, job, ConfigurableResource
from pydantic import Field
import psycopg2

class DatabaseResource(ConfigurableResource):
    host: str
    port: int = Field(default=5432)
    database: str

    def get_connection(self):
        # psycopg2 shown as an example; any DB-API driver works here.
        return psycopg2.connect(host=self.host, port=self.port, dbname=self.database)

@op
def load_data(context, database: DatabaseResource):
    conn = database.get_connection()
    return conn.execute("SELECT * FROM users")

@job(
    resource_defs={
        "database": DatabaseResource(
            host="localhost",
            database="prod"
        )
    }
)
def my_job():
    load_data()
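Mage replaces resource classes with a per-project `io_config.yaml`; blocks reference a named profile instead of constructing connections in code. A sketch of a roughly equivalent Postgres profile (key names follow Mage's Postgres loader convention; the values here are placeholders):

```yaml
default:
  POSTGRES_HOST: localhost
  POSTGRES_PORT: 5432
  POSTGRES_DBNAME: prod
  POSTGRES_USER: "{{ env_var('POSTGRES_USER') }}"
  POSTGRES_PASSWORD: "{{ env_var('POSTGRES_PASSWORD') }}"
```

A data loader block can then open a connection from this profile with Mage's `Postgres.with_config(ConfigFileLoader(config_path, 'default'))` helper instead of a hand-built resource class.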

3. Partitions and Dynamic Execution

from dagster import asset, DailyPartitionsDefinition
import pandas as pd

daily_partition = DailyPartitionsDefinition(start_date="2024-01-01")

@asset(partitions_def=daily_partition)
def daily_sales(context) -> pd.DataFrame:
    """
    Process sales for a specific partition.
    """
    partition_date = context.partition_key
    return pd.read_sql(
        "SELECT * FROM sales WHERE date = %(date)s",
        con=get_connection(),  # your own connection helper
        params={"date": partition_date},
    )

@asset(partitions_def=daily_partition)
def aggregated_sales(context, daily_sales: pd.DataFrame) -> pd.DataFrame:
    """
    Aggregate sales data.
    """
    return daily_sales.groupby('product_id').agg({'amount': 'sum'})
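Mage has no direct `PartitionsDefinition` equivalent; the usual translation is to read the trigger's `execution_date` runtime variable inside the block, or to fan out with a dynamic block (see Pattern 2). A sketch of `daily_sales` keyed off `execution_date`; the placeholder frame stands in for a real warehouse query, and the fallback decorator is only so the snippet runs outside Mage:

```python
import pandas as pd

try:
    from mage_ai.data_preparation.decorators import data_loader
except ImportError:
    def data_loader(f):  # fallback so this sketch runs standalone
        return f

@data_loader
def daily_sales(*args, **kwargs) -> pd.DataFrame:
    """kwargs['execution_date'] plays the role of Dagster's partition_key."""
    partition_date = kwargs.get('execution_date')
    # In a real block, run a parameterized query filtered on partition_date.
    return pd.DataFrame({
        'product_id': [1, 1, 2],
        'amount': [10, 5, 7],
        'date': [partition_date] * 3,
    })
```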

4. Scheduling and Sensors

from dagster import schedule, sensor, RunRequest

@schedule(
    cron_schedule="0 2 * * *",
    job=my_job,
    execution_timezone="America/New_York"
)
def daily_schedule(context):
    return RunRequest(
        run_config={
            "ops": {
                "load_data": {
                    "config": {"date": context.scheduled_execution_time}
                }
            }
        }
    )
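In Mage the cron schedule above becomes a trigger, configured in the UI or checked in under the pipeline's `triggers.yaml`. A sketch of a roughly equivalent trigger (field names follow Mage's trigger config; treat the values as illustrative):

```yaml
triggers:
- name: daily_2am_run
  schedule_type: time
  schedule_interval: "0 2 * * *"
  status: active
```

Runtime variables, such as an execution date, are set on the trigger and surface inside blocks as keyword arguments.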

@sensor(job=my_job)
def s3_sensor(context):
    files = check_for_new_files()  # your own helper, e.g. list new S3 keys
    for file in files:
        yield RunRequest(
            run_key=file,
            run_config={"file_path": file}
        )
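A Dagster sensor maps to a Mage sensor block: a function that is re-evaluated on an interval and returns `True` when the downstream pipeline should run. A standalone sketch, where the `files` keyword argument is a stand-in for a real S3 listing and the fallback decorator is only for running outside Mage:

```python
try:
    from mage_ai.data_preparation.decorators import sensor
except ImportError:
    def sensor(f):  # fallback so this sketch runs standalone
        return f

@sensor
def new_files_arrived(*args, **kwargs) -> bool:
    """Mage keeps evaluating this block until it returns True."""
    # Replace this stand-in with a real check, e.g. listing an S3 prefix.
    files = kwargs.get('files', [])
    return len(files) > 0
```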

5. Testing

from dagster import materialize, asset
import pytest

@asset
def my_asset():
    return [1, 2, 3]

def test_my_asset():
    result = materialize([my_asset])
    assert result.success
    
    materialization = result.asset_materializations_for_node("my_asset")[0]
    assert materialization is not None

def test_asset_output():
    from my_assets import my_asset
    output = my_asset()
    assert len(output) == 3

Mage tests run automatically after each block execution, both during development and in production; a failed test prevents downstream blocks from executing.

6. Data Quality and Validation

from dagster import asset, AssetCheckResult, asset_check

@asset
def users_table():
    return load_users()

@asset_check(asset=users_table)
def check_users_not_empty(users_table):
    return AssetCheckResult(
        passed=len(users_table) > 0,
        description="Users table should not be empty"
    )

@asset_check(asset=users_table)
def check_no_null_emails(users_table):
    null_count = int(users_table['email'].isnull().sum())
    return AssetCheckResult(
        passed=null_count == 0,
        metadata={"null_count": null_count}
    )

Advanced Features

Multi-Asset Pipelines

from dagster import multi_asset, AssetOut

@multi_asset(
    outs={
        "users": AssetOut(),
        "orders": AssetOut(),
    }
)
def load_tables(context):
    users_df = load_users()
    orders_df = load_orders()
    return users_df, orders_df
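In Mage, a multi-output op becomes a block that returns a tuple or list; downstream blocks receive the whole value and unpack it by position (the same idea appears under Pattern 3). A standalone sketch with placeholder rows standing in for real table loads:

```python
try:
    from mage_ai.data_preparation.decorators import data_loader, transformer
except ImportError:
    # Fallback so this sketch runs standalone.
    def data_loader(f):
        return f
    def transformer(f):
        return f

@data_loader
def load_tables(*args, **kwargs):
    # Placeholder rows standing in for real table loads.
    users = [{'user_id': 1}]
    orders = [{'order_id': 10, 'user_id': 1}]
    return [users, orders]

@transformer
def summarize(data, *args, **kwargs):
    users, orders = data  # unpack the upstream block's output by position
    return {'users': len(users), 'orders': len(orders)}
```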

Retries and Error Handling

from dagster import op, Backoff, RetryPolicy

@op(
    retry_policy=RetryPolicy(
        max_retries=3,
        delay=30,
        backoff=Backoff.EXPONENTIAL,
    )
)
def flaky_operation():
    # May fail and retry
    pass
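Mage expresses retries declaratively, as a `retry_config` in the project's or pipeline's `metadata.yaml` rather than a decorator argument. A sketch mirroring the policy above (key names follow Mage's retry config; the values are illustrative):

```yaml
retry_config:
  retries: 3
  delay: 30
  exponential_backoff: true
  max_delay: 300
```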

Migration Checklist

  • Map Assets: Identify all Dagster assets and ops
  • Review Dependencies: Document asset dependencies and data flow
  • Extract Configurations: Export resource configs and run configs
  • Create Pipelines: Build equivalent Mage pipelines
  • Convert Logic: Transform each asset into a Mage block
  • Migrate Tests: Convert asset checks to Mage test functions
  • Set Up IO Configs: Configure database and API connections
  • Configure Schedules: Set up triggers for automated runs
  • Handle Partitions: Implement dynamic blocks for partitioned assets
  • Test Execution: Run full pipeline tests
  • Set Up Monitoring: Configure alerts and notifications
  • Document Pipelines: Add descriptions and documentation

Common Patterns

Pattern 1: Software-Defined Assets

Dagster: Assets with dependencies via function parameters
Mage: Blocks with upstream/downstream configuration in YAML

Pattern 2: Dynamic Partitioning

Dagster: PartitionsDefinition with partitioned assets
Mage: Dynamic blocks that generate partition metadata
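The pattern above can be sketched as a Mage dynamic block: it returns two lists, the child items and one metadata dict per item, and Mage spawns one downstream block run per item (fallback decorator only so the snippet runs standalone):

```python
try:
    from mage_ai.data_preparation.decorators import data_loader
except ImportError:
    def data_loader(f):  # fallback so this sketch runs standalone
        return f

@data_loader
def partition_dates(*args, **kwargs):
    """Dynamic block: return [items, metadata]; each metadata dict
    labels the downstream run spawned for the matching item."""
    dates = ['2024-01-01', '2024-01-02', '2024-01-03']
    metadata = [{'block_uuid': f'date_{d}'} for d in dates]
    return [dates, metadata]
```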

Pattern 3: Multi-Output Operations

Dagster: Multi-assets with multiple AssetOuts
Mage: Blocks returning tuples or lists, accessed by position in downstream blocks
