Mage provides native integrations with major cloud storage platforms, supporting CSV, Parquet, and Excel file formats. Load data incrementally based on file modification times and organize pipelines around bucket prefixes.

Supported Platforms

Amazon S3

AWS object storage with global availability

Google Cloud Storage

GCP object storage and data lakes

Azure Blob Storage

Azure cloud storage platform

Amazon S3

Extract data from S3 buckets with support for CSV, Parquet, and multiple file patterns.

Configuration

{
  "bucket": "my-data-bucket",
  "prefix": "raw_data/",
  "search_pattern": ".*\\.csv$",
  "aws_access_key_id": "${env:AWS_ACCESS_KEY_ID}",
  "aws_secret_access_key": "${env:AWS_SECRET_ACCESS_KEY}",
  "aws_region": "us-west-2",
  "file_type": "csv"
}
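The `${env:VAR}` references are resolved from environment variables at runtime, keeping secrets out of the config file. A minimal sketch of that substitution (the helper name and regex are illustrative, not Mage's actual implementation):

```python
import os
import re

def resolve_env_refs(config):
    """Replace ${env:VAR} placeholders with values from the environment."""
    pattern = re.compile(r"\$\{env:([A-Za-z_][A-Za-z0-9_]*)\}")
    return {
        key: pattern.sub(lambda m: os.environ.get(m.group(1), ""), value)
        if isinstance(value, str) else value
        for key, value in config.items()
    }

os.environ["AWS_REGION_DEMO"] = "us-west-2"
config = {"bucket": "my-data-bucket", "aws_region": "${env:AWS_REGION_DEMO}"}
print(resolve_env_refs(config)["aws_region"])  # us-west-2
```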

File Types

Supported file formats:
  • csv - Comma-separated values
  • parquet - Columnar storage format

IAM Role Authentication

Use IAM roles instead of access keys:
{
  "bucket": "my-data-bucket",
  "prefix": "raw_data/",
  "search_pattern": ".*\\.parquet$",
  "role_arn": "arn:aws:iam::123456789012:role/MageDataAccess",
  "role_session_name": "mage-data-integration",
  "aws_region": "us-west-2",
  "file_type": "parquet"
}

Multiple Tables Configuration

Extract different tables from different prefixes:
{
  "bucket": "my-data-bucket",
  "aws_access_key_id": "${env:AWS_ACCESS_KEY_ID}",
  "aws_secret_access_key": "${env:AWS_SECRET_ACCESS_KEY}",
  "aws_region": "us-west-2",
  "table_configs": [
    {
      "table_name": "users",
      "prefix": "raw_data/users/",
      "search_pattern": ".*\\.csv$"
    },
    {
      "table_name": "orders",
      "prefix": "raw_data/orders/",
      "search_pattern": ".*\\.parquet$"
    },
    {
      "table_name": "events",
      "prefix": "raw_data/events/",
      "search_pattern": ".*\\.csv$"
    }
  ]
}
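Each `table_configs` entry becomes its own stream: the loader lists objects under the entry's prefix and keeps only keys matching its search pattern. A sketch of that expansion, using a hypothetical object listing in place of a real S3 list call:

```python
import re

table_configs = [
    {"table_name": "users", "prefix": "raw_data/users/", "search_pattern": r".*\.csv$"},
    {"table_name": "orders", "prefix": "raw_data/orders/", "search_pattern": r".*\.parquet$"},
]

# Hypothetical listing, as an S3 ListObjectsV2 call might return it
object_keys = [
    "raw_data/users/2024-03-01.csv",
    "raw_data/users/readme.txt",
    "raw_data/orders/2024-03-01.parquet",
]

def files_for_table(cfg, keys):
    """Return keys under the table's prefix that match its search pattern."""
    pat = re.compile(cfg["search_pattern"])
    return [k for k in keys if k.startswith(cfg["prefix"]) and pat.match(k)]

for cfg in table_configs:
    print(cfg["table_name"], files_for_table(cfg, object_keys))
```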

Custom S3-Compatible Endpoints

Connect to MinIO, DigitalOcean Spaces, or other S3-compatible services:
{
  "bucket": "my-bucket",
  "prefix": "data/",
  "aws_access_key_id": "${env:S3_ACCESS_KEY}",
  "aws_secret_access_key": "${env:S3_SECRET_KEY}",
  "aws_endpoint": "https://nyc3.digitaloceanspaces.com",
  "aws_region": "us-east-1",
  "file_type": "csv"
}
Python usage:
import os

from mage_integrations.sources.amazon_s3 import AmazonS3

config = {
    'bucket': 'my-data-bucket',
    'prefix': 'raw_data/users/',
    'search_pattern': r'.*\.csv$',
    'aws_access_key_id': os.environ['AWS_ACCESS_KEY_ID'],
    'aws_secret_access_key': os.environ['AWS_SECRET_ACCESS_KEY'],
    'aws_region': 'us-west-2',
    'file_type': 'csv'
}

source = AmazonS3(config=config)

# Test connection
source.test_connection()

# Discover schema from first file
catalog = source.discover()
for stream in catalog.streams:
    print(f"Stream: {stream.tap_stream_id}")
    print(f"Columns: {list(stream.schema.properties.keys())}")

# Load data
for rows in source.load_data(catalog.streams[0]):
    print(f"Loaded {len(rows)} rows")

Google Cloud Storage

Load data from GCS buckets with service account authentication.

Configuration

{
  "bucket": "my-gcs-bucket",
  "prefix": "raw_data/",
  "file_type": "parquet",
  "path_to_credentials_json_file": "/path/to/service-account.json"
}

Credentials Info (Alternative)

Pass credentials directly instead of file path:
{
  "bucket": "my-gcs-bucket",
  "prefix": "raw_data/",
  "file_type": "csv",
  "credentials_info": {
    "type": "service_account",
    "project_id": "my-project",
    "private_key_id": "key-id",
    "private_key": "${env:GCP_PRIVATE_KEY}",
    "client_email": "[email protected]",
    "client_id": "123456789"
  }
}

Creating a Service Account

  1. Go to Google Cloud Console
  2. Navigate to IAM & Admin > Service Accounts
  3. Create a new service account
  4. Grant “Storage Object Viewer” role
  5. Create and download JSON key
  6. Grant service account access to the bucket
Python usage:
from mage_integrations.sources.google_cloud_storage import GoogleCloudStorage

config = {
    'bucket': 'my-gcs-bucket',
    'prefix': 'raw_data/',
    'file_type': 'parquet',
    'path_to_credentials_json_file': 'service-account.json'
}

source = GoogleCloudStorage(config=config)

# Test connection
source.test_connection()

# Discover files
catalog = source.discover()
print(f"Found {len(catalog.streams)} streams")

# Load all files
for stream in catalog.streams:
    for rows in source.load_data(stream):
        print(f"Loaded {len(rows)} rows from {stream.tap_stream_id}")

Azure Blob Storage

Extract data from Azure Blob Storage containers.

Configuration

{
  "container_name": "my-container",
  "prefix": "raw_data/",
  "connection_string": "${env:AZURE_STORAGE_CONNECTION_STRING}"
}

Connection String Format

DefaultEndpointsProtocol=https;
AccountName=myaccount;
AccountKey=mykey;
EndpointSuffix=core.windows.net
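The connection string is a semicolon-delimited list of key=value pairs. A small sketch of how it can be parsed (splitting on the first `=` only, since account keys are base64 and may themselves end in `=`):

```python
def parse_connection_string(conn_str):
    """Split an Azure connection string into a dict of its key=value parts."""
    parts = {}
    for segment in conn_str.strip().split(";"):
        if not segment:
            continue
        key, _, value = segment.partition("=")
        parts[key] = value
    return parts

conn = ("DefaultEndpointsProtocol=https;AccountName=myaccount;"
        "AccountKey=mykey;EndpointSuffix=core.windows.net")
parsed = parse_connection_string(conn)
print(parsed["AccountName"])  # myaccount
```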

Getting the Connection String

  1. Go to Azure Portal
  2. Navigate to Storage Accounts
  3. Select your storage account
  4. Go to Access keys
  5. Copy the connection string
Python usage:
from mage_integrations.sources.azure_blob_storage import AzureBlobStorage

config = {
    'container_name': 'my-container',
    'prefix': 'raw_data/',
    'connection_string': 'DefaultEndpointsProtocol=https;...'
}

source = AzureBlobStorage(config=config)

# Test connection
source.test_connection()

# Discover blobs
catalog = source.discover()

# Load data
for stream in catalog.streams:
    for rows in source.load_data(stream):
        print(f"Loaded {len(rows)} rows")

File Format Support

CSV Files

Automatically detect encoding and delimiters:
{
  "file_type": "csv",
  "bucket": "my-bucket",
  "prefix": "data/"
}
Mage automatically handles:
  • Various encodings (UTF-8, Latin-1, etc.)
  • Different delimiters (comma, tab, pipe)
  • Headers and data types
  • Null values

Parquet Files

Native Parquet support for efficient columnar storage:
{
  "file_type": "parquet",
  "bucket": "my-bucket",
  "prefix": "data/"
}
Benefits:
  • Compressed storage
  • Column pruning
  • Predicate pushdown
  • Schema preservation

Excel Files (API Source)

For Excel files, use the generic API source:
{
  "url": "https://example.com/data.xlsx",
  "has_header": true
}

Search Patterns

Use regex patterns to filter files:
{
  "search_pattern": "users_.*\\.csv$"
}
Matches: users_2024.csv, users_01.csv
{
  "search_pattern": "data_2024-03-.*\\.parquet$"
}
Matches: data_2024-03-01.parquet, data_2024-03-15.parquet
{
  "search_pattern": "^(?!.*test).*\\.csv$"
}
Matches CSV files not containing “test”
{
  "search_pattern": ".*\\.(csv|parquet)$"
}
Matches both CSV and Parquet files
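These are standard Python regular expressions applied to object keys, so you can verify a pattern locally before pointing it at a bucket. For example, the negative-lookahead pattern above:

```python
import re

keys = [
    "raw_data/users_2024.csv",
    "raw_data/users_test.csv",
    "raw_data/data_2024-03-01.parquet",
]

# Negative lookahead excludes any key containing "test"
pattern = re.compile(r"^(?!.*test).*\.csv$")
print([k for k in keys if pattern.match(k)])  # ['raw_data/users_2024.csv']
```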

Incremental Loading

Load only new or modified files based on LastModified timestamp:
from mage_integrations.sources.amazon_s3 import AmazonS3

# Initial load - loads all files
source = AmazonS3(config=config)
catalog = source.discover()
for stream in catalog.streams:
    for rows in source.load_data(stream):
        # Process rows
        pass

# Subsequent loads - only new files
state = {
    "bookmarks": {
        "raw_data_users": {
            "_s3_last_modified": "2024-03-01 12:00:00.000000"
        }
    }
}

source = AmazonS3(config=config, state=state)
for stream in catalog.streams:
    for rows in source.load_data(stream, bookmarks=state['bookmarks'][stream.tap_stream_id]):
        # Only new files since last sync
        pass
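Under the hood, the bookmark comparison amounts to keeping only objects whose `LastModified` timestamp is newer than the stored value. A sketch of that filter, using the bookmark's timestamp format and a hypothetical object listing:

```python
from datetime import datetime

BOOKMARK_FORMAT = "%Y-%m-%d %H:%M:%S.%f"

# Hypothetical listing: (key, LastModified) pairs from a bucket list call
objects = [
    ("raw_data/users/a.csv", "2024-02-28 09:00:00.000000"),
    ("raw_data/users/b.csv", "2024-03-02 10:30:00.000000"),
]

def new_objects(objects, bookmark):
    """Keep only objects modified strictly after the bookmarked timestamp."""
    cutoff = datetime.strptime(bookmark, BOOKMARK_FORMAT)
    return [
        key for key, modified in objects
        if datetime.strptime(modified, BOOKMARK_FORMAT) > cutoff
    ]

print(new_objects(objects, "2024-03-01 12:00:00.000000"))  # ['raw_data/users/b.csv']
```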

Schema Discovery

Mage automatically infers schema from the first file:
source = AmazonS3(config=config)
catalog = source.discover()

for stream in catalog.streams:
    print(f"\nTable: {stream.tap_stream_id}")
    print("Columns:")
    for column, props in stream.schema.properties.items():
        col_type = props['type']
        col_format = props.get('format', '')
        print(f"  - {column}: {col_type} {col_format}")
Output:
Table: raw_data_users
Columns:
  - id: ['null', 'integer']
  - name: ['null', 'string']
  - email: ['null', 'string']
  - created_at: ['null', 'string'] date-time
  - _s3_last_modified: ['string'] date-time
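The `['null', 'integer']` style above is JSON-schema notation: columns are nullable by default, with the concrete type inferred from sample values. A simplified sketch of that inference (not Mage's actual implementation):

```python
def infer_column_type(values):
    """Infer a JSON-schema-style type list from sample values; None => nullable."""
    types = set()
    for v in values:
        if v is None:
            continue
        if isinstance(v, bool):  # check bool before int (bool subclasses int)
            types.add("boolean")
        elif isinstance(v, int):
            types.add("integer")
        elif isinstance(v, float):
            types.add("number")
        else:
            types.add("string")
    return ["null"] + sorted(types)

print(infer_column_type([1, 2, None]))      # ['null', 'integer']
print(infer_column_type(["a", None, "b"]))  # ['null', 'string']
```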

Partitioned Data

Load data organized in partitioned folders:
{
  "bucket": "my-data-bucket",
  "table_configs": [
    {
      "table_name": "events",
      "prefix": "events/year=2024/month=03/",
      "search_pattern": ".*\\.parquet$"
    },
    {
      "table_name": "events",
      "prefix": "events/year=2024/month=02/",
      "search_pattern": ".*\\.parquet$"
    }
  ]
}
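If you generate the config programmatically, Hive-style `year=/month=` prefixes can be built for a range of partitions instead of listed by hand. A sketch (the helper name is illustrative):

```python
def month_prefixes(base, year, months):
    """Build Hive-style year=/month= prefixes for the given months."""
    return [f"{base}/year={year}/month={m:02d}/" for m in months]

configs = [
    {"table_name": "events", "prefix": p, "search_pattern": r".*\.parquet$"}
    for p in month_prefixes("events", 2024, [2, 3])
]
print(configs[0]["prefix"])  # events/year=2024/month=02/
```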

SFTP Support

For SFTP file transfers, use the SFTP source:
{
  "host": "sftp.example.com",
  "port": 22,
  "username": "user",
  "password": "${env:SFTP_PASSWORD}",
  "remote_path": "/data/exports/",
  "file_pattern": "*.csv"
}
Alternatively, use SSH key authentication:
{
  "host": "sftp.example.com",
  "port": 22,
  "username": "user",
  "private_key_path": "/path/to/id_rsa",
  "remote_path": "/data/exports/",
  "file_pattern": "*.csv"
}

Installation

# Amazon S3
pip install "mage-ai[s3]"

# Google Cloud Storage
pip install "mage-ai[google-cloud-storage]"

# Azure Blob Storage
pip install "mage-ai[azure]"

# All cloud storage providers
pip install "mage-ai[s3,google-cloud-storage,azure]"

Best Practices

  1. Use IAM roles instead of access keys when running in cloud environments
  2. Organize files by prefix for easier management and partitioning
  3. Use Parquet format for large datasets to reduce storage and transfer costs
  4. Implement file naming conventions that include timestamps
  5. Enable versioning on buckets for data recovery
  6. Set lifecycle policies to archive old files
  7. Monitor storage costs and set up alerts
  8. Use incremental loading to avoid reprocessing unchanged files
  9. Test with small file sets before full production runs
  10. Compress CSV files (gzip) to reduce transfer time
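On the last point, Python's standard library can write and read gzip-compressed CSV transparently, so compression costs one extra call site:

```python
import csv
import gzip
import os
import tempfile

# Write a gzip-compressed CSV, then read it back transparently
path = os.path.join(tempfile.gettempdir(), "users_demo.csv.gz")
with gzip.open(path, "wt", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["id", "name"])
    writer.writerow([1, "Alice"])

with gzip.open(path, "rt", newline="") as f:
    rows = list(csv.reader(f))
print(rows)  # [['id', 'name'], ['1', 'Alice']]
```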

Performance Tips

Large Files

For very large files, consider:
  • Using Parquet instead of CSV
  • Splitting files into smaller chunks
  • Using columnar formats for analytics

Many Small Files

For many small files:
  • Combine files before loading
  • Use batching in your pipeline
  • Consider using streaming sources instead

Cross-Region Transfers

For cross-region data:
  • Use same region for source and Mage deployment
  • Enable transfer acceleration (S3)
  • Consider data replication

Troubleshooting

Permission errors. Check IAM permissions:
  • S3: s3:GetObject, s3:ListBucket
  • GCS: storage.objects.get, storage.objects.list
  • Azure: Storage Blob Data Reader role

Schema mismatches. Ensure all files have a consistent schema:
  • Same column names
  • Same data types
  • Same delimiter (for CSV)

No files discovered. Verify:
  • Prefix path is correct
  • Search pattern matches files
  • Files are not empty
  • Credentials have access

Encoding problems. For CSV files with special characters:
  • Ensure UTF-8 encoding
  • Check for BOM characters
  • Verify delimiter characters

Next Steps

Streaming Sources

Real-time data from Kafka, Kinesis, and Pub/Sub

Database Sources

Connect to PostgreSQL, MySQL, Snowflake, and more
