Overview
This guide helps you migrate from Dagster to Mage. Both tools emphasize data assets and testability, but Mage provides a more visual, notebook-style development experience with simpler configuration.
Key Concept Mapping
Core Components
| Dagster | Mage | Notes |
| --- | --- | --- |
| Asset | Block | Self-contained data transformation units |
| Op | Block | Can be combined in a pipeline |
| Job | Pipeline | Workflow definition with blocks |
| Graph | Pipeline | Visual DAG representation |
| Resource | IO Config | Database connections, API clients |
| IOManager | Data Exporter | Handles output persistence |
| Sensor | Sensor Block | Monitors external conditions |
| Schedule | Trigger | Time-based execution |
| Partition | Dynamic Block | Process multiple data chunks |
| Repository | Project | Collection of pipelines |
Data & Execution
| Dagster | Mage | Notes |
| --- | --- | --- |
| AssetMaterialization | Block Output | Automatic data persistence |
| DagsterType | Type Hints | Use Python type hints for validation |
| Config Schema | Block Configuration | YAML-based configuration |
| Run | Pipeline Run | Execution instance with metadata |
| Executor | Executor Type | Local, K8s, ECS, Cloud Run |
Migration Steps
1. Converting Assets to Blocks
Dagster - Software-Defined Assets
Mage Block: raw_users.py
Mage Block: clean_users.py
Mage Block: user_analytics.py
Mage Pipeline: metadata.yaml
from dagster import asset, AssetIn
import pandas as pd


@asset
def raw_users() -> pd.DataFrame:
    """Load raw user data from the database.

    Returns:
        DataFrame with one row per record in the ``users`` table.
    """
    # NOTE(review): connection string is hard-coded for the example;
    # real code would take it from configuration/resources.
    return pd.read_sql(
        "SELECT * FROM users",
        con="postgresql://localhost/db",
    )
@asset
def clean_users(raw_users: pd.DataFrame) -> pd.DataFrame:
    """Clean and transform user data.

    Builds a ``full_name`` column from first/last name and keeps
    only rows where ``is_active`` is truthy.
    """
    df = raw_users.copy()
    df['full_name'] = df['first_name'] + ' ' + df['last_name']
    return df[df['is_active']]
@asset(
    ins={"users": AssetIn("clean_users")}
)
def user_analytics(users: pd.DataFrame) -> pd.DataFrame:
    """Calculate user analytics.

    Aggregates per signup date: number of users and total revenue.
    """
    return users.groupby('signup_date').agg({
        'user_id': 'count',
        'revenue': 'sum',
    })
2. Configuration and Resources
Dagster - Resources & Config
Mage - IO Config
Mage Block - Using Config
Block Configuration
from dagster import resource, op, job, ConfigurableResource
from pydantic import Field


class DatabaseResource(ConfigurableResource):
    """Pydantic-configured database connection resource."""

    host: str
    port: int = Field(default=5432)  # default PostgreSQL port
    database: str

    def get_connection(self):
        # `connect` is assumed to be provided elsewhere in the example.
        return connect(self.host, self.port, self.database)
@op
def load_data(context, database: DatabaseResource):
    """Load all users through the injected database resource."""
    conn = database.get_connection()
    return conn.execute("SELECT * FROM users")
@job(
    resource_defs={
        "database": DatabaseResource(
            host="localhost",
            database="prod",
        )
    }
)
def my_job():
    """Job wiring `load_data` to a concrete database resource."""
    load_data()
3. Partitions and Dynamic Execution
Dagster - Partitioned Assets
Mage - Dynamic Blocks
Mage Block: daily_sales.py
Dynamic Configuration
from dagster import asset, DailyPartitionsDefinition
import pandas as pd

daily_partition = DailyPartitionsDefinition(start_date="2024-01-01")


@asset(partitions_def=daily_partition)
def daily_sales(context) -> pd.DataFrame:
    """Process sales for a specific partition.

    The partition key is the daily partition's date string.
    """
    partition_date = context.partition_key
    # NOTE(review): interpolating the partition key into SQL is fine for a
    # framework-supplied value; use bind parameters for untrusted input.
    return pd.read_sql(
        f"SELECT * FROM sales WHERE date = '{partition_date}'",
        con=get_connection(),
    )
@asset(partitions_def=daily_partition)
def aggregated_sales(context, daily_sales: pd.DataFrame) -> pd.DataFrame:
    """Aggregate the partition's sales per product."""
    return daily_sales.groupby('product_id').agg({'amount': 'sum'})
4. Scheduling and Sensors
from dagster import schedule, sensor, RunRequest


@schedule(
    cron_schedule="0 2 * * *",  # every day at 02:00
    job=my_job,
    execution_timezone="America/New_York",
)
def daily_schedule(context):
    """Daily 2 AM run that passes the scheduled time to `load_data`."""
    return RunRequest(
        run_config={
            "ops": {
                "load_data": {
                    "config": {"date": context.scheduled_execution_time}
                }
            }
        }
    )
@sensor(job=my_job)
def s3_sensor(context):
    """Emit one run request per newly discovered file.

    `run_key` deduplicates: each file triggers at most one run.
    """
    files = check_for_new_files()
    for file in files:
        yield RunRequest(
            run_key=file,
            run_config={"file_path": file},
        )
# Create a schedule via the UI or the API.
from mage_ai.orchestration.db.models.schedules import PipelineSchedule

PipelineSchedule.create(
    name='daily_at_2am',
    pipeline_uuid='user_analytics_pipeline',
    schedule_type='time',
    schedule_interval='0 2 * * *',  # cron: daily at 02:00
    status='active',
    variables={
        'env': 'production',
        # Template variable resolved by Mage at run time.
        'date': '{{ execution_date }}',
    },
)
# For sensors, create a sensor block:
from mage_ai.io.s3 import S3

if 'sensor' not in globals():
    from mage_ai.data_preparation.decorators import sensor


@sensor
def check_s3_files(*args, **kwargs) -> bool:
    """Check for new files in S3.

    Returns True when any objects exist under the prefix (letting
    downstream blocks run) and stashes the listing in the context.
    """
    s3 = S3()
    files = s3.list_objects('my-bucket', prefix='data/')
    # Store file list for downstream processing.
    if files:
        kwargs['context']['files'] = files
        return True
    return False
5. Testing
Dagster - Tests
Mage - Inline Tests
from dagster import materialize, asset
import pytest


@asset
def my_asset():
    """Tiny example asset returning a static list."""
    return [1, 2, 3]
def test_my_asset():
    """Materialize the asset and verify a materialization was recorded."""
    result = materialize([my_asset])
    assert result.success
    materialization = result.asset_materializations_for_node("my_asset")[0]
    assert materialization is not None
def test_asset_output():
    """Call the asset function directly as plain Python."""
    from my_assets import my_asset
    output = my_asset()
    assert len(output) == 3
Mage tests run automatically after each block execution during development and in production. Failed tests prevent downstream blocks from executing.
6. Data Quality and Validation
Dagster - Asset Checks
Mage - Test Functions
from dagster import asset, AssetCheckResult, asset_check


@asset
def users_table():
    """Asset wrapping the project-level `load_users` helper."""
    return load_users()
@asset_check(asset=users_table)
def check_users_not_empty(asset_value):
    """Fail the check when the users table has no rows."""
    return AssetCheckResult(
        passed=len(asset_value) > 0,
        description="Users table should not be empty",
    )
@asset_check(asset=users_table)
def check_no_null_emails(asset_value):
    """Fail when any email is null; report the null count as metadata."""
    null_count = asset_value['email'].isnull().sum()
    return AssetCheckResult(
        passed=null_count == 0,
        # NOTE(review): `null_count` is a numpy integer here; wrap in
        # int() if Dagster's metadata normalization rejects it.
        metadata={"null_count": null_count},
    )
Advanced Features
Multi-Asset Pipelines
from dagster import multi_asset, AssetOut


@multi_asset(
    outs={
        "users": AssetOut(),
        "orders": AssetOut(),
    }
)
def load_tables(context):
    """Load two tables in one op, producing two separate assets."""
    users_df = load_users()
    orders_df = load_orders()
    return users_df, orders_df
# Mage blocks output multiple variables automatically.
from pandas import DataFrame
from typing import Tuple

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


@data_loader
def load_data(*args, **kwargs) -> Tuple[DataFrame, DataFrame]:
    """Load multiple tables.

    Returns:
        Tuple of (users, orders) dataframes; downstream blocks receive
        them positionally.
    """
    users_df = load_users()
    orders_df = load_orders()
    return users_df, orders_df
# Access in downstream blocks:
@transformer
def transform(data_1: DataFrame, data_2: DataFrame, *args, **kwargs):
    """Join the two upstream outputs on user_id."""
    # data_1 = users_df (first upstream output)
    # data_2 = orders_df (second upstream output)
    return data_1.merge(data_2, on='user_id')
Retries and Error Handling
Dagster - Retry Policy
Mage - Retry Config
Mage - Block-Level Error Handling
from dagster import op, Backoff, RetryPolicy


@op(
    retry_policy=RetryPolicy(
        max_retries=3,
        delay=30,  # seconds before the first retry
        backoff=Backoff.EXPONENTIAL,
    )
)
def flaky_operation():
    """Op that may fail; Dagster retries up to 3 times with exponential backoff."""
    pass
Migration Checklist
Common Patterns
Pattern 1: Software-Defined Assets
Dagster : Assets with dependencies via function parameters
Mage : Blocks with upstream/downstream configuration in YAML
Pattern 2: Dynamic Partitioning
Dagster : PartitionsDefinition with partitioned assets
Mage : Dynamic blocks that generate partition metadata
Pattern 3: Multi-Output Operations
Dagster : Multi-assets with multiple AssetOuts
Mage : Blocks returning tuples or lists, accessed by position in downstream blocks
Additional Resources
Getting Help