Mage provides native integration with Apache Spark through PySpark, enabling large-scale data processing on AWS EMR (Elastic MapReduce) clusters. The integration handles cluster provisioning, script deployment, and job submission automatically.

Overview

The Spark integration uses the pyspark executor type to run pipeline blocks on AWS EMR clusters. Mage automatically:
  • Uploads execution scripts to S3
  • Provisions or connects to EMR clusters
  • Submits Spark jobs with custom configurations
  • Manages cluster lifecycle and resource allocation
The PySpark executor is implemented in mage_ai/data_preparation/executors/pyspark_block_executor.py and requires AWS credentials and an S3 bucket.

Prerequisites

1. AWS Account and Credentials

Ensure you have AWS credentials configured with permissions for:
  • S3 bucket access (read/write)
  • EMR cluster management
  • EC2 instance creation
2. S3 Bucket

Create an S3 bucket for storing:
  • Execution scripts
  • Bootstrap scripts
  • Spark logs and output
  • Remote variables
3. Install Dependencies

Install Spark extras (from setup.py):
pip install "mage-ai[spark]"
This installs:
  • boto3==1.26.60
  • botocore==1.29.60

Configuration

Project-Level Configuration

Configure Spark in your project’s metadata.yaml file:
metadata.yaml
# S3 configuration for remote storage
remote_variables_dir: s3://your-bucket-name

# EMR cluster configuration
emr_config:
  # EC2 key for SSH access (optional)
  ec2_key_name: "your-key-name"
  
  # Instance types
  master_instance_type: "r5.4xlarge"
  slave_instance_type: "r5.4xlarge"
  slave_instance_count: 2
  
  # Security groups (optional)
  master_security_group: "sg-xxxxxxxxxxxx"
  slave_security_group: "sg-yyyyyyyyyyyy"
  
  # Custom bootstrap script (optional)
  bootstrap_script_path: "/path/to/emr_bootstrap.sh"
  
  # Spark properties for master node
  master_spark_properties:
    spark.driver.memory: "32000M"
    spark.driver.maxResultSize: "0"
    spark.executor.memory: "32000M"
    spark.executor.cores: "8"
  
  # Spark properties for slave nodes
  slave_spark_properties:
    spark.driver.memory: "16000M"
    spark.executor.memory: "16000M"
    spark.executor.cores: "4"
  
  # Auto-scaling configuration (optional)
  scaling_policy:
    unit_type: "Instances"  # 'InstanceFleetUnits'|'Instances'|'VCPU'
    minimum_capacity_units: 1
    maximum_capacity_units: 4
    maximum_on_demand_capacity_units: 4
    maximum_core_capacity_units: 3
  
  # External JAR libraries
  spark_jars:
    - s3://bucket/mysql-connector-java-8.0.28.jar
    - s3://bucket/other-library.jar
You may need to request quota increases for specific EC2 instance types. See AWS EC2 resource limits.
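Before provisioning a cluster, it can help to sanity-check the emr_config block. The following is an illustrative check, not part of Mage itself, that treats the three instance fields shown above (the ones not marked optional) as required:

```python
# Illustrative validation of an emr_config dict as loaded from metadata.yaml.
# The choice of required keys follows the example above and is an assumption.
REQUIRED_EMR_KEYS = {
    'master_instance_type',
    'slave_instance_type',
    'slave_instance_count',
}

def missing_emr_keys(emr_config: dict) -> set:
    """Return the set of required emr_config keys that are absent."""
    return REQUIRED_EMR_KEYS - set(emr_config)
```

Running this against a partial config returns the keys still to be filled in, so a pipeline can fail fast with a clear message instead of failing during cluster creation.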

Default Instance Memory Mapping

Mage automatically configures driver memory based on instance type (from mage_ai/services/aws/emr/config.py):
INSTANCE_DRIVER_MEMORY_MAPPING = {
    'r5.2xlarge': '16000M',
    'r5.xlarge': '8000M',
    # Default for other instance types
    'default': '32000M',
}
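The lookup against this mapping amounts to a dictionary get with a fallback to the 'default' entry; a minimal sketch:

```python
# Mirrors the mapping above; the helper function itself is illustrative,
# not Mage's actual code.
INSTANCE_DRIVER_MEMORY_MAPPING = {
    'r5.2xlarge': '16000M',
    'r5.xlarge': '8000M',
    'default': '32000M',
}

def driver_memory_for(instance_type: str) -> str:
    """Return the Spark driver memory setting for an EC2 instance type."""
    return INSTANCE_DRIVER_MEMORY_MAPPING.get(
        instance_type, INSTANCE_DRIVER_MEMORY_MAPPING['default']
    )
```

Any instance type not listed explicitly (e.g., m5.large) falls through to the 32000M default, which you can override with master_spark_properties.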

Pipeline-Level Configuration

Create a PySpark pipeline or configure an existing pipeline:
# Pipeline metadata.yaml
type: pyspark

# Override EMR configuration for this pipeline
executor_config:
  master_instance_type: "r5.2xlarge"
  slave_instance_type: "r5.2xlarge"
  slave_instance_count: 1

Execution Flow

When a PySpark block executes, Mage follows this workflow (from pyspark_block_executor.py:38-55):
1. Upload Execution Script

Generate and upload block execution script to S3:
# Script location
s3://your-bucket/scripts/{pipeline_uuid}/{block_uuid}.py
The script includes:
  • Block code
  • Global variables
  • Pipeline configuration
  • Execution partition information
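The script path convention shown above can be reproduced with a small helper; this is a hypothetical sketch of the URI construction, with the bucket taken from remote_variables_dir in metadata.yaml:

```python
# Hypothetical helper reproducing the S3 script path convention
# s3://your-bucket/scripts/{pipeline_uuid}/{block_uuid}.py shown above.
def execution_script_uri(remote_variables_dir: str, pipeline_uuid: str,
                         block_uuid: str) -> str:
    """Build the S3 URI where the generated block script is uploaded."""
    base = remote_variables_dir.rstrip('/')
    return f'{base}/scripts/{pipeline_uuid}/{block_uuid}.py'
```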
2. Upload Bootstrap Script

If configured, upload the bootstrap script used to initialize the EMR cluster.
3. Submit Spark Job

Create and submit a Spark job to EMR:
step = {
    'name': f'run_mage_block_{pipeline_uuid}_{block_uuid}',
    'jars': executor_config.spark_jars,
    'script_uri': 's3://bucket/scripts/pipeline/block.py',
    'script_args': [],
}
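A step like the one above maps onto the EMR API's add_job_flow_steps call. The following sketch (not Mage's actual implementation; the bucket, UUIDs, and cluster ID are placeholders) builds the EMR step payload, with spark-submit invoked via command-runner.jar:

```python
# Sketch: translate a Mage-style step into an EMR HadoopJarStep payload.
# All names and paths here are illustrative placeholders.
def build_emr_step(pipeline_uuid: str, block_uuid: str, script_uri: str,
                   jars=None) -> dict:
    """Build the Steps entry for emr.add_job_flow_steps."""
    spark_submit_args = ['spark-submit', '--deploy-mode', 'cluster']
    if jars:
        # spark-submit takes a single comma-separated --jars value.
        spark_submit_args += ['--jars', ','.join(jars)]
    spark_submit_args.append(script_uri)
    return {
        'Name': f'run_mage_block_{pipeline_uuid}_{block_uuid}',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': spark_submit_args,
        },
    }

# Submission would then look like (requires boto3 and a running cluster):
# import boto3
# emr = boto3.client('emr')
# emr.add_job_flow_steps(
#     JobFlowId='j-XXXXXXXXXXXX',
#     Steps=[build_emr_step('etl', 'load', 's3://bucket/scripts/etl/load.py')],
# )
```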
4. Monitor and Log

Track job execution and stream logs to the configured log URI.
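The monitoring step amounts to polling the step's state until it reaches a terminal value. A sketch, assuming the standard EMR step states and a boto3 client (the polling interval is arbitrary):

```python
# EMR steps report their status via describe_step; these are the
# terminal states after which polling can stop.
TERMINAL_STEP_STATES = {'COMPLETED', 'CANCELLED', 'FAILED', 'INTERRUPTED'}

def is_step_finished(state: str) -> bool:
    """True once an EMR step has reached a terminal state."""
    return state in TERMINAL_STEP_STATES

# Polling loop sketch (requires boto3 and a live cluster/step):
# import time, boto3
# emr = boto3.client('emr')
# while True:
#     status = emr.describe_step(ClusterId=cluster_id, StepId=step_id)
#     if is_step_finished(status['Step']['Status']['State']):
#         break
#     time.sleep(30)
```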

Docker Integration

Mage provides a Docker image with Spark pre-installed:

Using the Official Spark Dockerfile

integrations/spark/Dockerfile
FROM mageai/mageai:latest
ARG PIP=pip3

# Add Debian Bullseye repository
RUN echo 'deb http://deb.debian.org/debian bullseye main' > /etc/apt/sources.list.d/bullseye.list

# Install OpenJDK 11
RUN apt-get update -y && \
    apt-get install -y openjdk-11-jdk

# Remove Debian Bullseye repository
RUN rm /etc/apt/sources.list.d/bullseye.list

RUN ${PIP} install pyspark

ENV MAGE_DATA_DIR=

Build and Run

docker build -t mage_spark -f integrations/spark/Dockerfile .

Custom Spark Cluster with Helm

Mage includes a custom Spark cluster configuration for Kubernetes deployments:

Setup Instructions

From integrations/custom_spark/README.md:
1. Build Custom Image

cd integrations/custom_spark
docker build -t <docker_name/docker_repo>:<tag> .
2. Push to Registry

docker push <docker_name/docker_repo>:<tag>
3. Update Helm Values

Edit spark.yaml and update the image configuration:
spark.yaml
image:
  registry: docker.io
  repository: <docker_name/docker_repo>
  tag: <tag>
4. Deploy with Helm

helm upgrade --install <spark-name> bitnami/spark -f spark.yaml

Connect Mage to Spark Cluster

Set the Spark master URL in your environment:
export SPARK_MASTER_HOST="spark://spark-master-url:7077"
Or configure in metadata.yaml:
spark_config:
  spark_master: "spark://spark-master-url:7077"
  app_name: "my spark app"
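In code, connecting to the cluster means building a SparkSession against that master URL. A sketch, assuming the SPARK_MASTER_HOST environment variable shown above (the local[*] fallback is illustrative):

```python
import os

# Resolve the master URL the same way it is set above; the fallback
# to local mode is an illustrative default, not Mage's behavior.
def resolve_spark_master(default: str = 'local[*]') -> str:
    return os.environ.get('SPARK_MASTER_HOST', default)

# With pyspark installed, a session against the cluster is then created as:
# from pyspark.sql import SparkSession
# spark = (SparkSession.builder
#          .master(resolve_spark_master())
#          .appName('my spark app')
#          .getOrCreate())
```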

Writing PySpark Code

Access the Spark session in your blocks:
from pyspark.sql import DataFrame

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader

@data_loader
def load_data(*args, **kwargs) -> DataFrame:
    # Access Spark session from kwargs
    spark = kwargs['spark']
    
    # Read from S3
    df = spark.read \
        .format('csv') \
        .option('header', 'true') \
        .option('inferSchema', 'true') \
        .load('s3://bucket/data/*.csv')
    
    return df

Security Configuration

EMR Security Groups

To access EMR clusters from Mage, configure security group inbound rules:
| Type       | Protocol | Port Range | Source                           |
| ---------- | -------- | ---------- | -------------------------------- |
| Custom TCP | TCP      | 8998       | Your IP / security group         |
| SSH        | TCP      | 22         | Your IP (if using SSH tunneling) |

IAM Permissions

Your Mage instance needs the following IAM permissions:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "elasticmapreduce:*",
        "s3:GetObject",
        "s3:PutObject",
        "s3:ListBucket",
        "ec2:DescribeSubnets",
        "ec2:DescribeSecurityGroups"
      ],
      "Resource": "*"
    }
  ]
}

Troubleshooting

Cluster creation issues:
  • Verify AWS credentials are configured correctly
  • Check EC2 instance quota limits for the requested instance types
  • Ensure the S3 bucket exists and is accessible
  • Verify security group IDs are correct
Job submission issues:
  • Check that security group rules allow access from Mage to EMR (port 8998)
  • Verify S3 bucket permissions for script upload
  • Review EMR cluster logs in S3
  • Ensure JAR files listed in spark_jars are accessible
Out-of-memory issues:
  • Use larger instance types (e.g., r5.4xlarge → r5.8xlarge)
  • Adjust the spark.driver.memory and spark.executor.memory settings
  • Increase slave_instance_count for more distributed memory
  • Configure auto-scaling with scaling_policy

Best Practices

  1. Resource Sizing: Start with smaller instances and scale up based on actual usage
  2. Auto-Scaling: Use scaling_policy to optimize costs and handle variable workloads
  3. S3 Optimization: Store intermediate results in S3 with partitioning for better performance
  4. Custom JARs: Pre-upload frequently used JAR files to S3 and reference them in spark_jars
  5. Bootstrap Scripts: Use bootstrap scripts for cluster-wide dependencies and configurations
  6. Monitoring: Enable CloudWatch metrics and configure log aggregation in S3

Related Resources

  • PySpark Executor: legacy Spark executor documentation
  • Compute Overview: overview of all compute integrations
  • AWS EMR: AWS EMR official documentation
  • PySpark API: Apache Spark PySpark API reference
