Overview

The pipeline generates detailed profiling artifacts that break down performance at the operator level, enabling fine-grained optimization and bottleneck identification.

Generated Profiling Artifacts

After running run_all(), the pipeline creates three main profiling outputs:

operator_profile.csv

Per-chunk operator-level timing breakdown located in output_dir/profiles/:
chunk_id,preprocess_s,feature_engineering_s,feature_selection_s,encode_scale_s,estimated_input_bandwidth_mb_s,input_bytes
1,0.012,0.018,0.008,0.007,45.3,2097152
2,0.011,0.019,0.007,0.008,47.1,2097152
3,0.013,0.017,0.008,0.007,43.8,2097152
Column Descriptions:
  • preprocess_s: Time spent in data cleaning and preprocessing
  • feature_engineering_s: Time spent building derived features
  • feature_selection_s: Time spent in multicollinearity detection and feature filtering
  • encode_scale_s: Time spent in one-hot encoding and scaling
  • estimated_input_bandwidth_mb_s: Calculated input data bandwidth (MB/s)
  • input_bytes: Raw input data size for the chunk
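
As a quick sanity check, the columns above can be combined into per-chunk totals. A minimal sketch, embedding the sample rows shown above so it runs standalone (in practice, read the file from output_dir/profiles/):

```python
import io
import pandas as pd

# Sample rows from operator_profile.csv above, embedded for a self-contained demo
csv_text = """chunk_id,preprocess_s,feature_engineering_s,feature_selection_s,encode_scale_s,estimated_input_bandwidth_mb_s,input_bytes
1,0.012,0.018,0.008,0.007,45.3,2097152
2,0.011,0.019,0.007,0.008,47.1,2097152
3,0.013,0.017,0.008,0.007,43.8,2097152"""

profile = pd.read_csv(io.StringIO(csv_text))

# Total operator time per chunk, and input size converted to MB
stage_cols = ['preprocess_s', 'feature_engineering_s',
              'feature_selection_s', 'encode_scale_s']
profile['total_operator_s'] = profile[stage_cols].sum(axis=1)
profile['input_mb'] = profile['input_bytes'] / (1024 * 1024)

print(profile[['chunk_id', 'total_operator_s', 'input_mb']])
```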

streaming_chunks.csv

Chunk-level latency, throughput, and memory observations in output_dir/benchmarks/:
chunk_id,rows,latency_s,throughput_rows_s,memory_before_mb,memory_after_mb,memory_exceeded,retries
1,128,0.045,2844.4,245.3,268.7,false,0
2,128,0.042,3047.6,268.7,289.1,false,0
See Benchmarking for complete field descriptions.
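
The columns are related by simple identities (throughput_rows_s is rows / latency_s, and the memory delta is after minus before). A quick check using the sample rows above, embedded so the snippet is self-contained:

```python
import io
import pandas as pd

# Sample rows from streaming_chunks.csv above
csv_text = """chunk_id,rows,latency_s,throughput_rows_s,memory_before_mb,memory_after_mb,memory_exceeded,retries
1,128,0.045,2844.4,245.3,268.7,false,0
2,128,0.042,3047.6,268.7,289.1,false,0"""

chunks = pd.read_csv(io.StringIO(csv_text))

# Recompute throughput from rows and latency; it should match to rounding
recomputed = chunks['rows'] / chunks['latency_s']

# Per-chunk memory growth
chunks['memory_delta_mb'] = chunks['memory_after_mb'] - chunks['memory_before_mb']
print(chunks[['chunk_id', 'memory_delta_mb']])
```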

pipeline_report.json

Aggregate telemetry and operator profile summary in output_dir/reports/:
{
  "streaming": {
    "operator_profile_summary_s": {
      "preprocess_s": 0.012,
      "feature_engineering_s": 0.018,
      "feature_selection_s": 0.008,
      "encode_scale_s": 0.007
    },
    "telemetry": {
      "cpu_percent_start": 12.3,
      "cpu_percent_end": 45.7,
      "process_memory_start_mb": 156.2,
      "process_memory_end_mb": 342.8,
      "rapl_energy_j": 23.45
    }
  }
}
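
Reading the report back requires only the standard json module. A sketch using the sample report above (in practice, load output_dir/reports/pipeline_report.json from disk):

```python
import json

# The sample report from above, embedded as a string for a runnable demo
report_text = """{
  "streaming": {
    "operator_profile_summary_s": {
      "preprocess_s": 0.012,
      "feature_engineering_s": 0.018,
      "feature_selection_s": 0.008,
      "encode_scale_s": 0.007
    },
    "telemetry": {
      "cpu_percent_start": 12.3,
      "cpu_percent_end": 45.7,
      "process_memory_start_mb": 156.2,
      "process_memory_end_mb": 342.8,
      "rapl_energy_j": 23.45
    }
  }
}"""
report = json.loads(report_text)

# Find the slowest stage in the operator summary
summary = report['streaming']['operator_profile_summary_s']
slowest = max(summary, key=summary.get)
print(f"Slowest stage: {slowest} ({summary[slowest]:.3f}s)")

# Process memory growth over the run
telemetry = report['streaming']['telemetry']
mem_growth = telemetry['process_memory_end_mb'] - telemetry['process_memory_start_mb']
print(f"Process memory growth: {mem_growth:.1f} MB")
```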

Operator-Level Profiling

Implementation

The _profile_stream_chunk() method in engine.py:84-106 measures wall-clock time for each pipeline stage:
def _profile_stream_chunk(self, chunk: pd.DataFrame, rolling_state: Any):
    stage_start = time.perf_counter()
    cleaned = self.preprocessor.clean(chunk)
    preprocess_s = time.perf_counter() - stage_start

    stage_start = time.perf_counter()
    featured = self.engineer.build_features_streaming(cleaned, rolling_state)
    feature_s = time.perf_counter() - stage_start

    stage_start = time.perf_counter()
    filtered = self.engineer.drop_multicollinearity(featured)
    select_s = time.perf_counter() - stage_start

    stage_start = time.perf_counter()
    x_chunk, y_chunk = self.engineer.encode_and_scale(filtered)
    encode_s = time.perf_counter() - stage_start

    return x_chunk, y_chunk, {
        'preprocess_s': float(preprocess_s),
        'feature_engineering_s': float(feature_s),
        'feature_selection_s': float(select_s),
        'encode_scale_s': float(encode_s),
    }
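
The per-chunk timing dicts returned above feed operator_profile.csv. A hypothetical sketch of that aggregation step using the standard csv module (the real writer lives in the pipeline engine and may differ):

```python
import csv
import io

# Timing dicts as returned by _profile_stream_chunk() for two chunks
timings = [
    {'preprocess_s': 0.012, 'feature_engineering_s': 0.018,
     'feature_selection_s': 0.008, 'encode_scale_s': 0.007},
    {'preprocess_s': 0.011, 'feature_engineering_s': 0.019,
     'feature_selection_s': 0.007, 'encode_scale_s': 0.008},
]

# Write one CSV row per chunk, prefixed with a 1-based chunk_id
buf = io.StringIO()
fields = ['chunk_id'] + list(timings[0])
writer = csv.DictWriter(buf, fieldnames=fields)
writer.writeheader()
for i, row in enumerate(timings, start=1):
    writer.writerow({'chunk_id': i, **row})

print(buf.getvalue())
```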

Identifying Bottlenecks

Use the operator profile to identify dominant stages:
import pandas as pd

# Load operator profile
profile = pd.read_csv('artifacts/profiles/operator_profile.csv')

# Calculate mean time per operator
operator_means = profile[[
    'preprocess_s',
    'feature_engineering_s',
    'feature_selection_s',
    'encode_scale_s'
]].mean()

print("Operator breakdown:")
print(operator_means.sort_values(ascending=False))
Example output:
feature_engineering_s    0.0182
preprocess_s            0.0121
feature_selection_s     0.0078
encode_scale_s          0.0071
In this case, feature engineering is the bottleneck and should be the focus of optimization efforts.
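
The absolute means can also be expressed as a share of total per-chunk time, which makes comparisons independent of chunk size. A small follow-up using the mean values from the example output above:

```python
import pandas as pd

# Mean stage times from the example output above
operator_means = pd.Series({
    'feature_engineering_s': 0.0182,
    'preprocess_s': 0.0121,
    'feature_selection_s': 0.0078,
    'encode_scale_s': 0.0071,
})

# Percentage of total operator time spent in each stage
shares = (operator_means / operator_means.sum() * 100).round(1)
print(shares.sort_values(ascending=False))
```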

Hardware Telemetry

HardwareMonitor Class

The HardwareMonitor class (monitor.py:16-76) provides fallback-safe hardware telemetry:
from pipeline.hardware import HardwareMonitor

monitor = HardwareMonitor()

# Capture snapshot before processing
start = monitor.snapshot()

# ... run pipeline ...

# Capture snapshot after processing
end = monitor.snapshot()

# Compare snapshots
telemetry = monitor.compare(start, end)

TelemetrySnapshot Structure

From monitor.py:8-13:
@dataclass
class TelemetrySnapshot:
    cpu_percent: float
    process_memory_mb: float
    system_memory_percent: float
    energy_uj: float | None  # RAPL energy in microjoules

RAPL Energy Measurement

On Linux systems with Intel RAPL support, the monitor reads energy counters from:
/sys/class/powercap/intel-rapl*/energy_uj
Implementation in monitor.py:30-45:
def _discover_rapl_path(self) -> Path | None:
    base = Path('/sys/class/powercap')
    if not base.exists():
        return None
    for cand in base.glob('intel-rapl*/energy_uj'):
        if cand.is_file():
            return cand
    return None

def _read_rapl_energy_uj(self) -> float | None:
    if self._rapl_path is None:
        return None
    try:
        return float(self._rapl_path.read_text(encoding='utf-8').strip())
    except Exception:
        return None
Energy calculation (engine.py:186, 299):
telemetry = self.hardware.compare(start_snapshot, end_snapshot)

# Fallback if RAPL is unavailable; the constant depends on the mode
telemetry['fallback_energy_estimate_j'] = elapsed * 45.0  # batch path (engine.py:186)
telemetry['fallback_energy_estimate_j'] = elapsed * 30.0  # streaming path (engine.py:299)

energy = telemetry['rapl_energy_j'] if telemetry['rapl_energy_j'] is not None else telemetry['fallback_energy_estimate_j']
Assumptions:
  • Batch mode: 45W average power
  • Streaming mode: 30W average power
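
Putting the two branches together, the effective energy figure can be sketched as a small helper (hypothetical, mirroring the constants above; the pipeline performs this selection inline rather than through a named function):

```python
def effective_energy_j(rapl_energy_j, elapsed_s, streaming=True):
    """Return measured RAPL energy if available, else a constant-power estimate."""
    if rapl_energy_j is not None:
        return rapl_energy_j
    watts = 30.0 if streaming else 45.0  # assumed average power per mode
    return elapsed_s * watts

print(effective_energy_j(None, 2.0, streaming=True))   # fallback: 60.0
print(effective_energy_j(23.45, 2.0, streaming=True))  # measured: 23.45
```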

Memory Hierarchy and Cache Effects

The project does not capture PMU (Performance Monitoring Unit) counters directly, but practical signals help identify memory bottlenecks:

Cache Pressure Indicators

  1. Increased encode_scale_s with larger chunks
    • One-hot encoding creates sparse matrices that stress cache
    • Try reducing chunk size if this stage dominates
  2. Rising end-to-end latency with stable throughput
    • May indicate memory copy overhead
    • Check for unnecessary DataFrame copies
  3. Divergence between bandwidth estimate and throughput
    • Could indicate storage or serialization bottlenecks
    • Monitor estimated_input_bandwidth_mb_s vs actual I/O speed

Bandwidth Estimation

From engine.py:282-283:
'input_bytes': int(chunk.memory_usage(index=True, deep=True).sum()),
'estimated_input_bandwidth_mb_s': float(
    (chunk.memory_usage(index=True, deep=True).sum() / (1024 * 1024)) / max(elapsed, 1e-9)
)
Formula: bandwidth = input_bytes_mb / latency_s

Interpretation:
  • High bandwidth (> 1000 MB/s): Good cache utilization
  • Low bandwidth (< 100 MB/s): Potential I/O or memory bottleneck
  • Decreasing bandwidth over time: Growing memory pressure
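
These thresholds can be applied directly to the operator profile. A sketch using the sample rows from above (the 100 MB/s cutoff is the heuristic stated here, not a pipeline constant):

```python
import io
import pandas as pd

# Bandwidth column from the sample operator_profile.csv rows above
csv_text = """chunk_id,estimated_input_bandwidth_mb_s
1,45.3
2,47.1
3,43.8"""
profile = pd.read_csv(io.StringIO(csv_text))

# Flag chunks below the low-bandwidth heuristic
low = profile[profile['estimated_input_bandwidth_mb_s'] < 100]
print(f"Chunks below 100 MB/s: {len(low)} of {len(profile)}")
```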

Reproducible Profiling Command

To generate profiling artifacts with controlled parameters:
cd "NBA Data Preprocessing/task"
python run_pipeline.py \
  --input ../data/nba2k-full.csv \
  --output-dir artifacts_profile \
  --chunk-size 128 \
  --batch-size 256 \
  --max-memory-mb 512 \
  --max-compute-units 0.5 \
  --benchmark-runs 3 \
  --random-seed 42
Key parameters:
  • --chunk-size: Size of streaming chunks (affects cache behavior)
  • --max-memory-mb: Memory limit for adaptive sizing
  • --max-compute-units: CPU constraint (0.0-1.0)
  • --benchmark-runs: Number of profiling iterations
  • --random-seed: Ensures reproducibility

Using Profiling Data for Optimization

Step 1: Identify Dominant Operator

python -c "import pandas as pd; \
df = pd.read_csv('artifacts/profiles/operator_profile.csv'); \
print(df[['preprocess_s', 'feature_engineering_s', 'feature_selection_s', 'encode_scale_s']].mean())"

Step 2: Analyze Chunk Size Impact

Compare operator times across different chunk sizes:
import pandas as pd
import matplotlib.pyplot as plt

profile = pd.read_csv('artifacts/profiles/operator_profile.csv')
chunks = pd.read_csv('artifacts/benchmarks/streaming_chunks.csv')

# Merge on chunk_id; streaming_chunks.csv stores the chunk size in the 'rows' column
merged = profile.merge(chunks[['chunk_id', 'rows']], on='chunk_id')

# Plot encode time vs chunk size
merged.plot.scatter(x='rows', y='encode_scale_s')
plt.xlabel('Chunk Size (rows)')
plt.ylabel('Encode & Scale Time (s)')
plt.title('Cache Pressure vs Chunk Size')
plt.savefig('cache_analysis.png')

Step 3: Investigate Memory Patterns

chunks = pd.read_csv('artifacts/benchmarks/streaming_chunks.csv')

# Find chunks with memory pressure
memory_issues = chunks[chunks['memory_exceeded'] == True]
print(f"Chunks exceeding memory limit: {len(memory_issues)}")

# Analyze retry patterns
if len(memory_issues) > 0:
    print(f"Average retries: {memory_issues['retries'].mean():.2f}")
    print(f"Max retries: {memory_issues['retries'].max()}")

Limitations

From the source documentation:
  • No GPU profiling: GPU kernels are not currently measured
  • User-space timing: All timing is wall-clock time in Python user space
  • No quantization path: Direct quantization is not implemented
  • Platform-dependent energy: RAPL counters only available on Intel Linux systems
  • No PMU counters: Cache misses, branch mispredictions not captured
