
Batch Prediction

Batch prediction allows you to send large numbers of multimodal requests to Gemini asynchronously. Instead of getting immediate responses, results are written to Cloud Storage or BigQuery when processing completes.

Why Batch Prediction?

Cost Effective

50% lower cost compared to online predictions

High Volume

Process thousands of requests in a single job

No Rate Limits

Bypass per-minute quota restrictions

When to Use Batch Prediction

Good Use Cases:
  • Processing large datasets (1000+ items)
  • Offline analysis and evaluation
  • Bulk content classification or summarization
  • Dataset labeling and annotation
  • Periodic batch jobs (nightly, weekly)
  • Cost-sensitive workloads
Not Suitable For:
  • Real-time applications
  • Interactive user experiences
  • Low-latency requirements
  • Small request volumes (fewer than 100 items)

Supported Models

Batch prediction is available for:
  • gemini-3.1-pro-preview
  • gemini-3-flash-preview
  • gemini-2.5-pro
  • gemini-2.5-flash
  • gemini-2.0-flash

Quick Start

Installation

pip install --upgrade google-genai google-cloud-storage google-cloud-bigquery

Setup

import os
from google import genai
from google.genai.types import CreateBatchJobConfig

PROJECT_ID = "your-project-id"
LOCATION = "global"  # or "us-central1"

client = genai.Client(
    vertexai=True,
    project=PROJECT_ID,
    location=LOCATION
)
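
The client picks up Application Default Credentials automatically. If you are running locally and have not authenticated yet:

# Set up Application Default Credentials
gcloud auth application-default login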

Cloud Storage Workflow

Step 1: Prepare Input Data

Create a JSONL file, batch_requests.jsonl, with one request per line:
{"request":{"contents":[{"role":"user","parts":[{"text":"Summarize this: AI is transforming industries."}]}],"generationConfig":{"temperature":0.4}}}
{"request":{"contents":[{"role":"user","parts":[{"text":"What is machine learning?"}]}],"generationConfig":{"temperature":0.2}}}
{"request":{"contents":[{"role":"user","parts":[{"text":"Explain neural networks."}]}],"generationConfig":{"temperature":0.3}}}

Step 2: Upload to Cloud Storage

# Create bucket
gsutil mb -l us-central1 gs://your-bucket-name

# Upload input file
gsutil cp batch_requests.jsonl gs://your-bucket-name/input/
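
If you prefer to stay in Python, the google-cloud-storage client installed above can perform the same upload; the bucket and object names mirror the gsutil commands:

from google.cloud import storage

storage_client = storage.Client(project=PROJECT_ID)

# Assumes the bucket already exists (created with gsutil mb above)
bucket = storage_client.bucket("your-bucket-name")
blob = bucket.blob("input/batch_requests.jsonl")
blob.upload_from_filename("batch_requests.jsonl")

print(f"Uploaded to gs://{bucket.name}/{blob.name}")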

Step 3: Submit Batch Job

batch_job = client.batches.create(
    model="gemini-2.5-flash",
    src="gs://your-bucket-name/input/batch_requests.jsonl",
    config=CreateBatchJobConfig(
        dest="gs://your-bucket-name/output/"
    )
)

print(f"Job created: {batch_job.name}")
print(f"State: {batch_job.state}")

Step 4: Monitor Job Status

import time

# Poll until complete
while batch_job.state in ["JOB_STATE_PENDING", "JOB_STATE_RUNNING", "JOB_STATE_QUEUED"]:
    time.sleep(10)
    batch_job = client.batches.get(name=batch_job.name)
    print(f"Status: {batch_job.state}")

if batch_job.state == "JOB_STATE_SUCCEEDED":
    print("\n✓ Job completed successfully!")
    print(f"Output: {batch_job.dest.gcs_uri}")
else:
    print(f"\n✗ Job failed: {batch_job.error}")

Step 5: Retrieve Results

import pandas as pd
import fsspec

# Read results from Cloud Storage
fs = fsspec.filesystem("gcs")
file_paths = fs.glob(f"{batch_job.dest.gcs_uri}/*/predictions.jsonl")

if file_paths:
    df = pd.read_json(f"gs://{file_paths[0]}", lines=True)
    
    # Extract responses
    df = df.join(pd.json_normalize(df["response"], "candidates"))
    
    # View results
    for idx, row in df.iterrows():
        request_text = row["request"]["contents"][0]["parts"][0]["text"]
        response_text = row["content"]["parts"][0]["text"]
        
        print(f"\nRequest: {request_text}")
        print(f"Response: {response_text[:200]}...")

Multimodal Batch Requests

Images

{"request":{"contents":[{"role":"user","parts":[{"text":"Describe this image."},{"file_data":{"file_uri":"gs://samples/image1.jpg","mime_type":"image/jpeg"}}]}],"generationConfig":{"temperature":0.4}}}
{"request":{"contents":[{"role":"user","parts":[{"text":"What objects are visible?"},{"file_data":{"file_uri":"gs://samples/image2.jpg","mime_type":"image/jpeg"}}]}],"generationConfig":{"temperature":0.4}}}

Videos

{"request":{"contents":[{"role":"user","parts":[{"text":"Summarize this video."},{"file_data":{"file_uri":"gs://samples/video.mp4","mime_type":"video/mp4"}}]}],"generationConfig":{"temperature":0.3}}}

PDFs

{"request":{"contents":[{"role":"user","parts":[{"text":"Extract key findings."},{"file_data":{"file_uri":"gs://samples/paper.pdf","mime_type":"application/pdf"}}]}],"generationConfig":{"temperature":0.2}}}

BigQuery Workflow

Step 1: Create Input Table

CREATE OR REPLACE TABLE `project.dataset.batch_input` AS
SELECT
  STRUCT(
    [STRUCT(
      'user' AS role,
      [STRUCT('What is AI?' AS text)] AS parts
    )] AS contents,
    STRUCT(0.4 AS temperature) AS generationConfig
  ) AS request
UNION ALL
SELECT
  STRUCT(
    [STRUCT(
      'user' AS role,
      [STRUCT('Explain machine learning.' AS text)] AS parts
    )] AS contents,
    STRUCT(0.3 AS temperature) AS generationConfig
  ) AS request;

Step 2: Submit Batch Job

batch_job = client.batches.create(
    model="gemini-2.5-flash",
    src=f"bq://{PROJECT_ID}.dataset.batch_input",
    config=CreateBatchJobConfig(
        dest=f"bq://{PROJECT_ID}.dataset.batch_output"
    )
)

print(f"Job ID: {batch_job.name}")

Step 3: Query Results

SELECT 
  request.contents[0].parts[0].text AS input_text,
  response.candidates[0].content.parts[0].text AS output_text,
  response.usageMetadata.totalTokenCount AS total_tokens
FROM `project.dataset.batch_output`
WHERE status = ''
LIMIT 10;
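
The same query can be run from Python with the google-cloud-bigquery client installed earlier; a sketch where the dataset and table names are placeholders:

from google.cloud import bigquery

bq_client = bigquery.Client(project=PROJECT_ID)

query = f"""
SELECT
  response.candidates[0].content.parts[0].text AS output_text,
  response.usageMetadata.totalTokenCount AS total_tokens
FROM `{PROJECT_ID}.dataset.batch_output`
WHERE status = ''
LIMIT 10
"""

# Run the query and iterate over result rows
for row in bq_client.query(query).result():
    print(row.output_text, row.total_tokens)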

Advanced Input Formatting

System Instructions

{
  "request": {
    "contents": [
      {
        "role": "user",
        "parts": [{"text": "Translate 'hello' to Spanish."}]
      }
    ],
    "systemInstruction": {
      "parts": [{"text": "You are a professional translator."}]
    },
    "generationConfig": {
      "temperature": 0.2
    }
  }
}

Safety Settings

{
  "request": {
    "contents": [{"role": "user", "parts": [{"text": "Your prompt"}]}],
    "safetySettings": [
      {
        "category": "HARM_CATEGORY_HATE_SPEECH",
        "threshold": "BLOCK_LOW_AND_ABOVE"
      }
    ],
    "generationConfig": {"temperature": 0.4}
  }
}

Per-Request Generation Config

Each line can carry its own generation config, so a single job can mix settings:
{"request":{"contents":[...],"generationConfig":{"temperature":0.2,"maxOutputTokens":100}}}
{"request":{"contents":[...],"generationConfig":{"temperature":0.8,"maxOutputTokens":500}}}
{"request":{"contents":[...],"generationConfig":{"temperature":0.1,"topP":0.9}}}

List and Manage Jobs

List All Jobs

for job in client.batches.list():
    print(f"Job: {job.name}")
    print(f"  Created: {job.create_time}")
    print(f"  State: {job.state}")
    print(f"  Model: {job.model}")
    print()

Get Job Details

job = client.batches.get(name="projects/.../locations/.../batchPredictionJobs/...")

print(f"State: {job.state}")
print(f"Progress: {job.completion_stats}")
print(f"Input: {job.src}")
print(f"Output: {job.dest}")
print(f"Error: {job.error}")

Cancel a Job

client.batches.cancel(name=batch_job.name)
print("Job cancelled")

Response Structure

Batch prediction output JSONL format:
{
  "status": "",
  "processed_time": "2024-03-09T10:30:00.000Z",
  "request": {
    "contents": [{"role": "user", "parts": [{"text": "What is AI?"}]}],
    "generationConfig": {"temperature": 0.4}
  },
  "response": {
    "candidates": [{
      "content": {
        "role": "model",
        "parts": [{"text": "AI stands for Artificial Intelligence..."}]
      },
      "finishReason": "STOP",
      "avgLogprobs": -0.123
    }],
    "usageMetadata": {
      "promptTokenCount": 12,
      "candidatesTokenCount": 150,
      "totalTokenCount": 162
    },
    "modelVersion": "gemini-2.5-flash@001"
  }
}
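
Each output line is plain JSON, so the standard library is enough to walk this structure; a minimal sketch over a downloaded predictions.jsonl:

import json

with open("predictions.jsonl") as f:
    for line in f:
        record = json.loads(line)
        if record.get("status"):  # non-empty status signals a failed request
            print("Error:", record["status"])
            continue
        candidate = record["response"]["candidates"][0]
        text = candidate["content"]["parts"][0]["text"]
        tokens = record["response"]["usageMetadata"]["totalTokenCount"]
        print(f"{tokens} tokens: {text[:80]}")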

Error Handling

Request-Level Errors

Check the status field in the output:
for idx, row in df.iterrows():
    if row["status"]:
        print(f"Error in request {idx}: {row['status']}")
    else:
        print(f"Request {idx}: Success")

Job-Level Errors

if batch_job.state == "JOB_STATE_FAILED":
    print(f"Job failed: {batch_job.error.message}")
    print(f"Error code: {batch_job.error.code}")
elif batch_job.state == "JOB_STATE_CANCELLED":
    print("Job was cancelled")
elif batch_job.state == "JOB_STATE_PAUSED":
    print("Job is paused")

Cost Optimization

Calculate Costs

def calculate_batch_cost(df):
    """Calculate approximate batch prediction cost."""
    total_input_tokens = 0
    total_output_tokens = 0
    
    for _, row in df.iterrows():
        # Failed rows may hold NaN instead of a response dict
        if isinstance(row["response"], dict):
            usage = row["response"].get("usageMetadata", {})
            total_input_tokens += usage.get("promptTokenCount", 0)
            total_output_tokens += usage.get("candidatesTokenCount", 0)
    
    # Batch pricing is 50% of online pricing; the rates below are
    # illustrative placeholders, so check current Vertex AI pricing
    INPUT_RATE = 0.0005  # Per 1K input tokens
    OUTPUT_RATE = 0.0015  # Per 1K output tokens
    
    input_cost = (total_input_tokens / 1000) * INPUT_RATE
    output_cost = (total_output_tokens / 1000) * OUTPUT_RATE
    
    return {
        "input_tokens": total_input_tokens,
        "output_tokens": total_output_tokens,
        "input_cost": input_cost,
        "output_cost": output_cost,
        "total_cost": input_cost + output_cost
    }

costs = calculate_batch_cost(df)
print(f"Total cost: ${costs['total_cost']:.4f}")

Best Practices

Batch Size

Optimal batch size: 100-10,000 requests per file

File Location

Keep input files in us-central1 for best performance

Monitoring

Monitor job progress via console or API polling

Retries

Implement retry logic for failed individual requests
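
For example, failed rows can be collected into a fresh input file and resubmitted as a new job; a sketch building on the results DataFrame from Step 5 (file names are placeholders):

import json

# Rows with a non-empty status failed; re-extract their original requests
failed = df[df["status"] != ""]

with open("retry_requests.jsonl", "w") as f:
    for _, row in failed.iterrows():
        f.write(json.dumps({"request": row["request"]}) + "\n")

# Upload retry_requests.jsonl and submit it with client.batches.create()
# exactly as in Step 3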

Input File Guidelines

  • Format: JSONL (JSON Lines) with one request per line
  • Size: Up to 10,000 requests per file
  • Location: Must be in us-central1 region
  • Naming: Use wildcard patterns like gs://bucket/*.jsonl for multiple files
  • Permissions: Service account needs storage.objects.get access

Output Considerations

  • Results maintain input order
  • Failed requests included with error status
  • Output files written to timestamped subdirectories
  • Use BigQuery for easier querying of large result sets

Processing Results at Scale

Parallel Processing

import concurrent.futures

def process_result(row):
    """Process a single result row."""
    if row["status"]:
        return {"error": row["status"]}
    
    response = row["response"]["candidates"][0]["content"]["parts"][0]["text"]
    return {"success": response}

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(process_result, df.to_dict('records')))

Export to Database

from google.cloud import bigquery

bq_client = bigquery.Client(project=PROJECT_ID)

# Write results to BigQuery (nested dict columns may need flattening first)
table_id = f"{PROJECT_ID}.dataset.results"
bq_client.load_table_from_dataframe(df, table_id).result()

Next Steps

Context Caching

Cache repeated content in batch jobs

Multimodal

Process images and videos in batch

Function Calling

Use function calling in batch requests

Grounding

Ground batch predictions in data sources
