Overview
Parquet is a columnar storage format optimized for analytics. PyArrow provides comprehensive support for reading and writing Parquet files through the pyarrow.parquet module.
import pyarrow as pa
import pyarrow.parquet as pq

# Build a small two-column table and persist it as Parquet.
example = pa.table({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
pq.write_table(example, 'data.parquet')

# Load the file back into memory and show its contents.
table = pq.read_table('data.parquet')
print(table)
Writing Parquet Files
Basic Writing
- From Table
- From Pandas
- Incremental Writing
import pyarrow as pa
import pyarrow.parquet as pq

# Assemble each column first, then build the table in one call.
ids = [1, 2, 3, 4, 5]
names = ['Alice', 'Bob', 'Charlie', 'David', 'Eve']
scores = [95.5, 87.3, 92.1, 88.9, 91.5]
table = pa.table({'id': ids, 'name': names, 'score': scores})

# Persist the table to disk in Parquet format.
pq.write_table(table, 'output.parquet')
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Build a small pandas DataFrame.
df = pd.DataFrame({'id': [1, 2, 3], 'name': ['Alice', 'Bob', 'Charlie']})

# Option 1: let pandas drive the write through the pyarrow engine.
df.to_parquet('output.parquet', engine='pyarrow')

# Option 2: convert to an Arrow table explicitly, then write it.
table = pa.Table.from_pandas(df)
pq.write_table(table, 'output.parquet')
import pyarrow as pa
import pyarrow.parquet as pq

# Stream multiple batches into a single Parquet file with ParquetWriter.
schema = pa.schema([
    ('id', pa.int32()),
    ('value', pa.float64())
])

with pq.ParquetWriter('output.parquet', schema) as writer:
    for i in range(10):
        # Build each chunk against the writer's schema; without it,
        # pa.table() would infer int64 for 'id' and ParquetWriter would
        # reject the mismatched table.
        batch = pa.table({
            'id': range(i * 100, (i + 1) * 100),
            'value': [x * 1.5 for x in range(100)]
        }, schema=schema)
        writer.write_table(batch)
Compression
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({'a': range(1000), 'b': range(1000)})

# One file per codec: snappy (default, fast), gzip (better ratio),
# zstd (good balance) and brotli (best compression).
for codec, path in [
    ('snappy', 'data_snappy.parquet'),
    ('gzip', 'data_gzip.parquet'),
    ('zstd', 'data_zstd.parquet'),
    ('brotli', 'data_brotli.parquet'),
]:
    pq.write_table(table, path, compression=codec)

# Codecs can also be chosen per column...
pq.write_table(
    table,
    'data.parquet',
    compression={'a': 'snappy', 'b': 'gzip'}
)

# ...or disabled entirely.
pq.write_table(table, 'data_uncompressed.parquet', compression='none')
Snappy offers the best balance of speed and compression for most use cases.
Write Options
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({
    'id': range(10000),
    'category': ['A', 'B', 'C'] * 3333 + ['A'],
    'value': range(10000)
})

# Collect the tuning knobs in one place, then pass them through.
write_options = {
    'row_group_size': 5000,                    # rows per row group
    'data_page_size': 1024 * 1024,             # 1MB data pages
    'use_dictionary': ['category'],            # dictionary-encode this column only
    'compression': 'gzip',
    'compression_level': 6,
    'write_statistics': True,                  # min/max/null-count statistics
    'version': '2.6',                          # Parquet format version
    'use_deprecated_int96_timestamps': False,  # modern timestamp encoding
}
pq.write_table(table, 'data.parquet', **write_options)
Reading Parquet Files
Basic Reading
import pyarrow.parquet as pq

# Read the entire file into an Arrow table.
table = pq.read_table('data.parquet')
print(f"Rows: {table.num_rows}, Columns: {table.num_columns}")

# Read into pandas.  Note that pq.read_pandas() returns an Arrow table
# (with pandas index metadata preserved), so .to_pandas() is needed to
# actually get a DataFrame.
df = pq.read_pandas('data.parquet').to_pandas()

# Read only a subset of columns.
table = pq.read_table('data.parquet', columns=['id', 'name'])

# Read with predicate pushdown; tuples in a flat list are ANDed.
import pyarrow.compute as pc
table = pq.read_table(
    'data.parquet',
    filters=[
        ('id', '>', 100),
        ('category', 'in', ['A', 'B'])
    ]
)
Column Selection
- Select Columns
- Column Indices
import pyarrow.parquet as pq

# Column projection: only the listed columns are read from disk.
# Read specific columns
table = pq.read_table(
    'data.parquet',
    columns=['id', 'name', 'score']
)

# Dotted paths select individual fields of a nested (struct) column.
# NOTE(review): dotted-path selection depends on the reader/pyarrow
# version in use — confirm against the read_table docs.
# Read nested column fields
table = pq.read_table(
    'data.parquet',
    columns=['user.name', 'user.email']
)
import pyarrow.parquet as pq

# Inspect the schema first to see the column order.
parquet_file = pq.ParquetFile('data.parquet')
print(parquet_file.schema)

# read_table() selects columns by NAME; the dataset-based reader does
# not accept integer indices, so translate positions into names via
# the Arrow schema.
names = parquet_file.schema_arrow.names
table = pq.read_table(
    'data.parquet',
    columns=[names[i] for i in (0, 2, 4)]  # first, third, fifth columns
)
Filtering
import pyarrow.parquet as pq

# DNF (Disjunctive Normal Form) filters
# [[('x', '=', 0), ...], ...] means AND within inner lists, OR between them
# A flat list of tuples (no nesting) is implicitly ANDed.

# Simple filter: age > 25
table = pq.read_table(
    'data.parquet',
    filters=[('age', '>', 25)]
)

# Multiple conditions (AND): age > 25 AND score >= 80
table = pq.read_table(
    'data.parquet',
    filters=[
        ('age', '>', 25),
        ('score', '>=', 80)
    ]
)

# OR conditions: age < 18 OR age > 65
# (each inner list is one OR branch)
table = pq.read_table(
    'data.parquet',
    filters=[
        [('age', '<', 18)],
        [('age', '>', 65)]
    ]
)

# Complex: (age >= 18 AND age <= 65) AND status = 'active'
table = pq.read_table(
    'data.parquet',
    filters=[
        ('age', '>=', 18),
        ('age', '<=', 65),
        ('status', '=', 'active')
    ]
)

# IN filter: membership test against a list of values
table = pq.read_table(
    'data.parquet',
    filters=[('category', 'in', ['A', 'B', 'C'])]
)
Filters are pushed down to the file reader, so only relevant row groups are read from disk.
Row Group Reading
import pyarrow.parquet as pq

# Open the file without loading any data pages yet.
pf = pq.ParquetFile('data.parquet')
print(f"Number of row groups: {pf.num_row_groups}")

# Load a single row group by index.
first = pf.read_row_group(0)
print(first)

# Load several row groups in one call.
selected = pf.read_row_groups([0, 2, 4])

# Walk every row group in order.
for idx in range(pf.num_row_groups):
    group = pf.read_row_group(idx)
    print(f"Row group {idx}: {group.num_rows} rows")
File Metadata
Schema and Statistics
import pyarrow.parquet as pq

# Inspect file/row-group/column metadata; only the footer is parsed,
# no data pages are read.
# Read file metadata
parquet_file = pq.ParquetFile('data.parquet')
# Schema
print("Schema:")
print(parquet_file.schema)
print(parquet_file.schema_arrow) # Arrow schema
# File-level metadata
metadata = parquet_file.metadata
print(f"Created by: {metadata.created_by}")
print(f"Number of rows: {metadata.num_rows}")
print(f"Number of row groups: {metadata.num_row_groups}")
print(f"Number of columns: {metadata.num_columns}")
print(f"Serialized size: {metadata.serialized_size} bytes")
# Row group metadata
for i in range(metadata.num_row_groups):
    rg = metadata.row_group(i)
    print(f"\nRow group {i}:")
    print(f" Rows: {rg.num_rows}")
    print(f" Total byte size: {rg.total_byte_size}")
    # Column metadata
    for j in range(rg.num_columns):
        col = rg.column(j)
        print(f" Column {j} ({col.physical_type}):")
        print(f" Compressed size: {col.total_compressed_size}")
        print(f" Uncompressed size: {col.total_uncompressed_size}")
        # Statistics (may be absent if the writer disabled them)
        if col.statistics:
            stats = col.statistics
            print(f" Min: {stats.min}")
            print(f" Max: {stats.max}")
            print(f" Null count: {stats.null_count}")
            print(f" Distinct count: {stats.distinct_count}")
Custom Metadata
import pyarrow as pa
import pyarrow.parquet as pq

# Attach application-level key/value metadata to a Parquet file.
table = pa.table({'a': [1, 2, 3]})
custom_metadata = {
    b'source': b'data_pipeline_v2',
    b'created': b'2024-01-15',
    b'version': b'1.0'
}

# Custom metadata travels in the schema: merge it into the table's
# schema metadata before writing.  (write_table's metadata_collector
# argument is an *output* list for FileMetaData objects, not an input
# for custom key/value pairs.)
existing = table.schema.metadata or {}
table = table.replace_schema_metadata({**existing, **custom_metadata})
pq.write_table(table, 'data.parquet')

# Read the custom metadata back from the Arrow schema.
parquet_file = pq.ParquetFile('data.parquet')
metadata = parquet_file.schema_arrow.metadata
print(metadata)
Partitioned Datasets
For partitioned datasets, use the Dataset API:
import pyarrow as pa
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds

# Write a dataset partitioned by year and month.
table = pa.table({
    'year': [2020, 2021, 2022] * 100,
    'month': list(range(1, 13)) * 25,
    'value': range(300)
})

# Use the hive flavor (year=2020/month=1/...) so directory names encode
# the partition keys; a bare field-name list alone produces plain
# directory partitioning, which partitioning='hive' below could not
# read back.
ds.write_dataset(
    table,
    'partitioned_data/',
    format='parquet',
    partitioning=['year', 'month'],
    partitioning_flavor='hive'
)

# Read the partitioned dataset, recovering the partition columns from
# the directory structure.
dataset = ds.dataset(
    'partitioned_data/',
    format='parquet',
    partitioning='hive'
)
table = dataset.to_table()
Advanced Features
Dictionary Encoding
import pyarrow as pa
import pyarrow.parquet as pq

# Low-cardinality string column: a good candidate for dictionary encoding.
table = pa.table({
    'id': range(10000),
    'category': ['A', 'B', 'C', 'D'] * 2500
})

# Dictionary-encode only the 'category' column.
pq.write_table(
    table,
    'data.parquet',
    use_dictionary=['category'],
    compression='snappy'
)

# Turn dictionary encoding off for every column.
pq.write_table(table, 'data.parquet', use_dictionary=False)
Nested Data
- Structs
- Lists
- Maps
import pyarrow as pa
import pyarrow.parquet as pq

# Struct columns: pa.table() infers a struct type from the list of
# dicts (here struct<name, age>).
# Create table with struct column
table = pa.table({
    'id': [1, 2, 3],
    'user': [
        {'name': 'Alice', 'age': 30},
        {'name': 'Bob', 'age': 25},
        {'name': 'Charlie', 'age': 35}
    ]
})
pq.write_table(table, 'nested.parquet')

# Read nested data
table = pq.read_table('nested.parquet')
print(table['user'])

# Read specific nested fields
# NOTE(review): dotted-path selection of struct fields depends on the
# pyarrow version/reader — confirm against the read_table docs.
table = pq.read_table(
    'nested.parquet',
    columns=['id', 'user.name']
)
import pyarrow as pa
import pyarrow.parquet as pq

# Each row of 'tags' holds a variable-length list of strings.
tag_rows = [
    ['python', 'arrow'],
    ['data', 'analytics'],
    ['parquet', 'columnar']
]
table = pa.table({'id': [1, 2, 3], 'tags': tag_rows})

# List columns round-trip through Parquet unchanged.
pq.write_table(table, 'lists.parquet')
table = pq.read_table('lists.parquet')
import pyarrow as pa
import pyarrow.parquet as pq

# Build a true map<string, string> column.  Passing plain Python dicts
# to pa.table() would infer a *struct* type (with a null 'size' field
# for the third row), not a map — so the type is given explicitly and
# each row is a list of (key, value) pairs.
attributes = pa.array(
    [
        [('color', 'red'), ('size', 'large')],
        [('color', 'blue'), ('size', 'medium')],
        [('color', 'green')]
    ],
    type=pa.map_(pa.string(), pa.string())
)
table = pa.table({'id': [1, 2, 3], 'attributes': attributes})

pq.write_table(table, 'maps.parquet')
table = pq.read_table('maps.parquet')
Encryption
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.parquet.encryption as pe
from datetime import timedelta

# Parquet modular encryption: declare which key protects the footer
# and which keys protect individual columns.
# Setup encryption
FOOTER_KEY = b"0123456789112345"  # 16-byte (128-bit) key material
FOOTER_KEY_NAME = "footer_key"
COL_KEY = b"1234567890123450"
COL_KEY_NAME = "col_key"
encryption_config = pe.EncryptionConfiguration(
    footer_key=FOOTER_KEY_NAME,  # key *name*; actual key resolved by the KMS
    plaintext_footer=False,  # encrypt the footer metadata as well
    column_keys={
        COL_KEY_NAME: ["sensitive_column"],  # columns protected by this key
    },
    encryption_algorithm="AES_GCM_V1",
    cache_lifetime=timedelta(minutes=5.0),  # how long key material is cached
    data_key_length_bits=256
)
# Write encrypted Parquet file
table = pa.table({
    'id': [1, 2, 3],
    'sensitive_column': ['secret1', 'secret2', 'secret3']
})
# Note: Full encryption setup requires KMS configuration (a KmsClient
# plus a CryptoFactory) before write_table can actually encrypt.
# See examples/dataset/write_dataset_encrypted.py for complete example
Performance Optimization
Memory-Efficient Reading
import pyarrow.parquet as pq

# Stream a large file in fixed-size record batches instead of loading
# it all at once.
pf = pq.ParquetFile('large_file.parquet')

for batch in pf.iter_batches(batch_size=10000):
    # Each batch is a RecordBatch of at most 10,000 rows.
    print(f"Processing {batch.num_rows} rows")
    # Your processing logic here

# Alternatively, process one row group at a time.
for idx in range(pf.num_row_groups):
    group = pf.read_row_group(idx)
    # Process row group
Pre-buffering
import pyarrow.parquet as pq

# Enable pre-buffering for better performance
# pre_buffer coalesces column-chunk reads, use_threads decodes columns
# in parallel, and buffer_size controls read buffering.
table = pq.read_table(
    'data.parquet',
    pre_buffer=True,
    use_threads=True,
    buffer_size=8*1024*1024 # 8MB buffer
)
Optimizing Row Groups
import pyarrow as pa
import pyarrow.parquet as pq

# One million integers to demonstrate row-group sizing.
table = pa.table({'a': range(1000000)})

# Row groups of ~64-128 MB uncompressed are the usual sweet spot;
# row_group_size caps the number of rows per group.
options = {'row_group_size': 100000, 'compression': 'snappy'}
pq.write_table(table, 'optimized.parquet', **options)
Best Practices:
- Row groups should be 64-128 MB uncompressed for optimal performance
- Use dictionary encoding for low-cardinality string columns
- Enable statistics for better filter pushdown
- Choose compression based on your CPU/IO balance
Working with Metadata Files
import pyarrow.parquet as pq

# Build a standalone _metadata file summarizing a multi-file dataset.
metadata_path = '_metadata'
files = ['part0.parquet', 'part1.parquet', 'part2.parquet']

# write_metadata() needs the common Arrow schema plus the FileMetaData
# of each part file — a bare list of filenames will not work — so read
# both from the parts.  NOTE(review): each FileMetaData may also need
# .set_file_path(...) relative to the dataset root; verify for your
# layout.
schema = pq.read_schema(files[0])
metadata_collector = [pq.read_metadata(f) for f in files]
pq.write_metadata(
    schema,
    metadata_path,
    metadata_collector=metadata_collector
)

# Open the dataset through the combined metadata file.
parquet_dataset = pq.ParquetDataset(
    metadata_path,
    filesystem=None
)
table = parquet_dataset.read()
Practical Examples
Example: Convert CSV to Parquet
import pyarrow as pa
import pyarrow.csv as csv
import pyarrow.parquet as pq

# One-step CSV -> Parquet conversion.
csv_table = csv.read_csv('input.csv')

# Write with snappy compression and dictionary encoding enabled for
# every column.
pq.write_table(
    csv_table,
    'output.parquet',
    compression='snappy',
    use_dictionary=True
)
Example: Split Large File
import pyarrow.parquet as pq

# Split a large file into one output file per row group.
source = pq.ParquetFile('large_file.parquet')

for idx in range(source.num_row_groups):
    chunk = source.read_row_group(idx)
    pq.write_table(chunk, f'split_{idx}.parquet', compression='snappy')
Example: Append to Parquet File
import pyarrow as pa
import pyarrow.parquet as pq

# Parquet files are immutable, so "appending" means read + concatenate
# + rewrite.  For genuinely incremental writes use ParquetWriter.
existing_table = pq.read_table('data.parquet')

# Create new data
new_data = pa.table({'a': [4, 5, 6], 'b': ['x', 'y', 'z']})

# Concatenate and rewrite the whole file.  (The original snippet
# re-imported pyarrow here redundantly; pa is already in scope.)
combined = pa.concat_tables([existing_table, new_data])
pq.write_table(combined, 'data.parquet')
Next Steps
- Dataset API - Work with multi-file Parquet datasets
- Compute Functions - Process Parquet data
- Tables and Arrays - Understand data structures