Skip to main content
Schema evolution enables dlt to automatically detect and handle changes in your data structure without breaking downstream systems. This includes new columns, type changes, and nested data modifications.

How It Works

dlt automatically infers schema on the first pipeline run and adapts to changes over time:
1

Initial Schema Inference

On first run, dlt scans your data and generates a schema:
import dlt

data = [{
    "organization": "Tech Innovations Inc.",
    "address": {
        "building": "r&d",
        "room": 7890,
    },
    "inventory": [
        {"name": "Plasma ray", "inventory_nr": 2411},
        {"name": "Self-aware Roomba", "inventory_nr": 268},
    ]
}]

pipeline = dlt.pipeline(
    pipeline_name="organizations",
    destination="duckdb",
    dataset_name="org_data"
)

# Creates tables: org, org__inventory
pipeline.run(data, table_name="org")
2

Automatic Evolution

On subsequent runs, dlt adapts to schema changes:
# Updated data with schema changes
data = [{
    "organization": "Tech Innovations Inc.",
    "ceo": "Alice Smith",  # NEW: Column added
    "address": {
        "main_block": "r&d",  # RENAMED: building -> main_block
        # REMOVED: room field
    },
    "inventory": [
        # TYPE CHANGE: inventory_nr from int to string
        {"name": "Plasma ray", "inventory_nr": "AR2411"},
    ]
}]

# dlt automatically handles all changes
pipeline.run(data, table_name="org")
3

Schema Changes Applied

What happened:
  • Column added: New ceo column created in org table
  • Column renamed: New address__main_block column created, old address__building stops receiving data
  • Column removed: address__room stops receiving data but remains in destination
  • Type changed: New variant column inventory_nr__v_text created for string type

Nested Data Normalization

dlt flattens nested structures into relational tables:
import dlt

data = [{
    "id": 1,
    "user": "alice",
    "profile": {
        "age": 30,
        "city": "NYC",
        "preferences": {
            "theme": "dark",
            "notifications": True,
        },
    },
    "orders": [
        {"order_id": 101, "amount": 50.0},
        {"order_id": 102, "amount": 75.5},
    ]
}]

pipeline = dlt.pipeline(
    pipeline_name="users",
    destination="duckdb",
    dataset_name="user_data"
)

pipeline.run(data, table_name="users")

# Creates tables:
# - users: id, user, profile__age, profile__city, profile__preferences__theme, profile__preferences__notifications
# - users__orders: order_id, amount, _dlt_parent_id, _dlt_list_idx

Column Type Changes

Variant Columns

When a column’s type changes, dlt creates variant columns:
# First load
data_v1 = [
    {"id": 1, "score": 42},        # score is integer
    {"id": 2, "score": 100},
]

pipeline.run(data_v1, table_name="results")
# Creates: results table with score (bigint)

# Second load - type changed
data_v2 = [
    {"id": 3, "score": "high"},     # score is now string
    {"id": 4, "score": "medium"},
]

pipeline.run(data_v2, table_name="results")
# Creates: variant column score__v_text (text)
# Original score column remains for integers
Variant columns are named using the pattern: {column_name}__v_{type}

Supported Type Coercions

dlt automatically coerces compatible types:
# These type changes are handled automatically:

# Integer -> Float (coerced)
{"value": 42}      # int
{"value": 42.5}    # float - coerced automatically

# String -> Text (coerced)
{"text": "short"}           # varchar(256)
{"text": "very long..." }   # text - upgraded automatically

# Incompatible types create variants:
{"field": 123}      # bigint
{"field": "text"}   # Creates field__v_text

Controlling Schema Changes

Use hints to control how schemas evolve:
import dlt
from dlt.common.schema.typing import TColumnSchema

@dlt.resource
def users():
    yield [
        {"id": 1, "name": "Alice", "email": "[email protected]"},
    ]

# Apply hints to control schema
users.apply_hints(
    columns={
        "id": {"data_type": "bigint", "nullable": False},
        "email": {"data_type": "text", "unique": True},
        "name": {"data_type": "text"},
    }
)

pipeline = dlt.pipeline(
    pipeline_name="users",
    destination="duckdb",
    dataset_name="user_data"
)

pipeline.run(users())

Schema Versioning

dlt tracks schema versions automatically:
import dlt

pipeline = dlt.pipeline(
    pipeline_name="versioned",
    destination="duckdb",
    dataset_name="data"
)

# Access schema
schema = pipeline.default_schema

# View schema version
print(f"Schema version: {schema.version}")
print(f"Schema hash: {schema.version_hash}")

# Export schema to file
schema.to_pretty_yaml("schema.yaml")
schema.to_pretty_json("schema.json")

Tracking Schema Changes

Monitor schema evolution with lineage tracking:
import dlt

pipeline = dlt.pipeline(
    pipeline_name="tracked",
    destination="duckdb",
    dataset_name="data"
)

data = [{"id": 1, "name": "Alice"}]
pipeline.run(data, table_name="users")

# View schema changes in load info
load_info = pipeline.run(data, table_name="users")

for package in load_info.load_packages:
    if package.schema_update:
        print("Schema was updated!")
        for table_name, columns in package.schema_update.items():
            print(f"Table {table_name} changes:")
            for column in columns:
                print(f"  - {column}")

Alerting on Schema Changes

Get notified when schema changes occur:
import dlt

def check_schema_changes(load_info):
    """Alert on schema changes"""
    for package in load_info.load_packages:
        if package.schema_update:
            # Send alert (email, Slack, etc.)
            print(f"⚠️  Schema changed in {package.schema_name}")
            for table_name, columns in package.schema_update.items():
                print(f"Table: {table_name}")
                for col in columns:
                    print(f"  New column: {col['name']} ({col['data_type']})")

pipeline = dlt.pipeline(
    pipeline_name="monitored",
    destination="duckdb",
    dataset_name="data"
)

load_info = pipeline.run(data, table_name="users")
check_schema_changes(load_info)

Custom Normalizers

Customize how data is normalized:
import dlt
from dlt.common.schema.typing import TColumnSchema

# Custom naming convention
def custom_naming(path: str) -> str:
    """Use custom column naming"""
    # Convert nested paths to custom format
    # e.g., "user.profile.name" -> "user_profile_name"
    return path.replace(".", "_")

pipeline = dlt.pipeline(
    pipeline_name="custom",
    destination="duckdb",
    dataset_name="data"
)

# Apply custom normalizer (implement via schema configuration)

Handling Removed Columns

Columns are never deleted from destination:
# First load
data_v1 = [{"id": 1, "name": "Alice", "age": 30}]
pipeline.run(data_v1, table_name="users")
# Creates: id, name, age columns

# Second load - 'age' removed from source
data_v2 = [{"id": 2, "name": "Bob"}]
pipeline.run(data_v2, table_name="users")
# 'age' column remains in destination with NULL for new rows
Removed columns remain in the destination with NULL values for new records. They are never dropped automatically.

Schema Export and Import

Export schemas for documentation or version control:
import dlt

pipeline = dlt.pipeline(
    pipeline_name="export_schema",
    destination="duckdb",
    dataset_name="data"
)

# Load data
pipeline.run(data, table_name="users")

# Export schema
schema = pipeline.default_schema
schema.to_pretty_yaml("schemas/users_schema.yaml")
schema.to_pretty_json("schemas/users_schema.json")

# Import schema
from dlt.common.schema import Schema

imported_schema = Schema.from_dict(
    dlt.common.json.load_json("schemas/users_schema.json")
)

Best Practices

  • Design data models expecting changes
  • Use semantic versioning for major changes
  • Document expected schema evolution
  • Test schema changes in development first
  • Set up alerts for schema modifications
  • Review schema changes regularly
  • Track variant column creation
  • Document type changes for consumers
  • Use schema contracts to restrict changes
  • Apply hints to enforce data types
  • Validate critical columns
  • Test with sample data first
  • Notify downstream consumers of changes
  • Maintain schema documentation
  • Version control schema files
  • Use column lineage for tracking

Next Steps

Data Contracts

Control and validate schema changes

Incremental Loading

Combine with incremental loading

Build docs developers (and LLMs) love