ML Pipeline Example
A complete example of using TracePipe to audit an ML data pipeline.
The Scenario
You're building a customer churn prediction model. Your pipeline:
- Loads customer data
- Cleans missing values
- Filters valid customers
- Engineers features
- Merges with additional data
You need to know which rows are dropped and which values change at each stage before the data reaches training.
Full Example
import tracepipe as tp
import pandas as pd
import numpy as np
# Enable debug tracking
tp.enable(mode="debug", watch=["income", "age", "churn_score"])
# =============================================================================
# Stage 1: Load Data
# =============================================================================
tp.stage("load")
customers = pd.DataFrame({
    "customer_id": [f"C-{i}" for i in range(1000)],
    "age": np.random.randint(18, 80, 1000),
    "income": np.random.normal(50000, 20000, 1000),
    "tenure_months": np.random.randint(1, 120, 1000),
    "region": np.random.choice(["US", "EU", "APAC"], 1000),
    "churn_score": np.random.random(1000),
})
# Inject some realistic issues
customers.loc[50:100, "income"] = None
customers.loc[200:220, "age"] = -1  # Invalid ages
customers.loc[300:310, "churn_score"] = None
print(f"Loaded {len(customers)} customers")
# =============================================================================
# Stage 2: Clean Data
# =============================================================================
tp.stage("clean")
# Drop rows with missing income
customers = customers.dropna(subset=["income"])
print(f"After dropna: {len(customers)}")
# Fill missing churn scores with median
median_score = customers["churn_score"].median()
customers["churn_score"] = customers["churn_score"].fillna(median_score)
# Remove invalid ages
customers = customers[customers["age"] > 0]
print(f"After age filter: {len(customers)}")
# =============================================================================
# Stage 3: Feature Engineering
# =============================================================================
tp.stage("features")
# Normalize income
customers["income_normalized"] = (
    customers["income"] - customers["income"].mean()
) / customers["income"].std()
# Age buckets
# pd.cut bins are right-inclusive, so age 65 falls in "51-65"
customers["age_bucket"] = pd.cut(
    customers["age"],
    bins=[0, 25, 35, 50, 65, 100],
    labels=["18-25", "26-35", "36-50", "51-65", "66+"],
)
# Log transform
customers["log_tenure"] = np.log1p(customers["tenure_months"])
# =============================================================================
# Stage 4: Enrich with Region Data
# =============================================================================
tp.stage("enrich")
region_data = pd.DataFrame({
    "region": ["US", "EU", "APAC"],
    "market_size": [1.0, 0.8, 1.2],
    "growth_rate": [0.05, 0.03, 0.08],
})
customers = customers.merge(region_data, on="region", how="left")
# =============================================================================
# Audit the Pipeline
# =============================================================================
print("\n" + "="*60)
print("PIPELINE AUDIT")
print("="*60)
# Health check
result = tp.check(customers)
print(result)
# Contract validation
print("\n--- Contract Check ---")
contract_result = (
    tp.contract()
    .expect_no_nulls(["customer_id", "income", "churn_score"])
    .expect_unique("customer_id")
    .expect_retention(min_rate=0.8)
    .expect_range("age", min_val=0, max_val=150)
    .check(customers)
)
print(contract_result)
# Trace a specific customer
print("\n--- Customer Journey ---")
trace = tp.trace(customers, where={"customer_id": "C-55"})
print(trace)
# Why did churn_score change?
print("\n--- Cell Provenance ---")
why = tp.why(customers, col="churn_score", where={"customer_id": "C-305"})
print(why)
# Generate report
tp.report(customers, "ml_pipeline_audit.html", title="Churn Model Pipeline Audit")
print("\n✓ Report saved to ml_pipeline_audit.html")
# Debug inspection
print("\n--- Debug Stats ---")
dbg = tp.debug.inspect()
stats = dbg.stats()
print(f"Steps recorded: {stats['steps_recorded']}")
print(f"Rows dropped: {stats['rows_dropped']}")
# Export lineage
dbg.export("json", "ml_pipeline_lineage.json")
print("✓ Lineage exported to ml_pipeline_lineage.json")
tp.disable()
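Stage 4's left merge keeps the row count unchanged only because each `region` value appears once in `region_data`. The sketch below shows one way to guard that assumption with plain pandas, using the `validate` argument of `DataFrame.merge`. It is independent of TracePipe, and the small frames and the `bad_region_data` name are made up for illustration.

```python
import pandas as pd

# Tiny stand-ins for the frames in the example above (illustrative data).
customers = pd.DataFrame({
    "customer_id": ["C-0", "C-1", "C-2"],
    "region": ["US", "EU", "US"],
})
region_data = pd.DataFrame({
    "region": ["US", "EU", "APAC"],
    "market_size": [1.0, 0.8, 1.2],
})

rows_before = len(customers)

# validate="many_to_one" raises if `region` is duplicated on the right side,
# which is the case that would silently multiply customer rows.
enriched = customers.merge(
    region_data, on="region", how="left", validate="many_to_one"
)
assert len(enriched) == rows_before

# Hypothetical lookup table with a duplicated "US" row.
bad_region_data = pd.concat([region_data, region_data.iloc[[0]]], ignore_index=True)
try:
    customers.merge(bad_region_data, on="region", how="left", validate="many_to_one")
    duplicate_detected = False
except pd.errors.MergeError:
    duplicate_detected = True

print(len(enriched), duplicate_detected)
```

A left merge can still introduce nulls when a customer's region has no match in the lookup table, so a null check on the enriched columns is a reasonable companion to the row count check.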
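Expected Output
Row counts are deterministic because the injected issues cover fixed index ranges. The imputed churn_score value (0.4821 below) varies between runs because the random data is not seeded.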
Loaded 1000 customers
After dropna: 949
After age filter: 928
============================================================
PIPELINE AUDIT
============================================================
TracePipe Check: [OK] Pipeline healthy
Mode: debug
Retention: 928/1000 (92.8%)
Dropped: 72 rows
• DataFrame.dropna: 51
• DataFrame.__getitem__[mask]: 21
Value changes: 11 cells modified
• DataFrame.fillna: 11 (churn_score)
--- Contract Check ---
Contract: [PASSED] All 4 expectations met
✓ no_nulls(customer_id, income, churn_score)
✓ unique(customer_id)
✓ retention >= 80.0%
✓ range(age): 0 <= x <= 150
--- Customer Journey ---
Row 55 Journey:
Status: [DROPPED]
Dropped by: DataFrame.dropna (step 2)
Events: 1
[DROPPED] DataFrame.dropna
--- Cell Provenance ---
Cell History: row 305, column 'churn_score'
Current value: 0.4821
[i] Was null at step 1 (later recovered)
by: DataFrame.fillna
History (1 change):
None -> 0.4821
by: DataFrame.fillna
✓ Report saved to ml_pipeline_audit.html
--- Debug Stats ---
Steps recorded: 12
Rows dropped: 72
✓ Lineage exported to ml_pipeline_lineage.json
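The row counts in this output can be cross-checked by hand. Label slices with `.loc` include both endpoints, so `50:100` covers 51 rows, `200:220` covers 21, and `300:310` covers 11. The following sketch reproduces that arithmetic in plain pandas without TracePipe. The seeded generator and the variable names are choices made for this sketch, not part of the original example.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)  # seeded only so this sketch is repeatable
n = 1000
df = pd.DataFrame({
    "customer_id": [f"C-{i}" for i in range(n)],
    "age": rng.integers(18, 80, n),
    "income": rng.normal(50000, 20000, n),
    "churn_score": rng.random(n),
})

# .loc label slices are inclusive on both ends.
df.loc[50:100, "income"] = None        # 51 rows
df.loc[200:220, "age"] = -1            # 21 rows
df.loc[300:310, "churn_score"] = None  # 11 cells

after_dropna = df.dropna(subset=["income"])
after_age_filter = after_dropna[after_dropna["age"] > 0]

dropped_by_dropna = len(df) - len(after_dropna)
dropped_by_filter = len(after_dropna) - len(after_age_filter)
nulls_to_fill = int(after_age_filter["churn_score"].isna().sum())
retention = len(after_age_filter) / len(df)

print(dropped_by_dropna, dropped_by_filter, nulls_to_fill, f"{retention:.1%}")
# prints: 51 21 11 92.8%
```

The three injected ranges do not overlap, which is why the drops add up to exactly 72 and all 11 null churn scores survive to be filled.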
Key Takeaways
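- Visibility: The check output attributes all 72 dropped rows to the operation that removed them (51 by dropna, 21 by the age filter)
- Null Recovery: TracePipe flags cells that were null and later filled, and records the imputed value and the operation that wrote it (here, fillna on 11 churn_score cells)
- Contract Validation: Contracts assert that the training data meets explicit quality rules: no nulls, unique IDs, minimum retention, and value ranges
- Reproducibility: Export the lineage as JSON and the audit as an HTML report for documentation and debugging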
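To make the contract expectations concrete, here is a rough plain-pandas equivalent of the four checks used in the example. It is only an illustration of what each expectation means. `check_contract` is a hypothetical helper written for this sketch and says nothing about how TracePipe implements its contracts.

```python
import pandas as pd

def check_contract(df, original_rows, min_retention=0.8):
    """Return check name -> bool (hypothetical helper, not TracePipe API)."""
    required = ["customer_id", "income", "churn_score"]
    return {
        # No nulls in any of the required columns.
        "no_nulls": bool(df[required].notna().all().all()),
        # Every customer_id appears exactly once.
        "unique_id": bool(df["customer_id"].is_unique),
        # Share of the original rows that survived the pipeline.
        "retention": len(df) / original_rows >= min_retention,
        # Series.between is inclusive on both ends by default.
        "age_range": bool(df["age"].between(0, 150).all()),
    }

# Illustrative data: 4 of 5 original rows remain, all values valid.
sample = pd.DataFrame({
    "customer_id": ["C-0", "C-1", "C-2", "C-3"],
    "income": [42000.0, 51000.0, 38500.0, 60250.0],
    "churn_score": [0.12, 0.48, 0.91, 0.33],
    "age": [34, 27, 61, 45],
})
results = check_contract(sample, original_rows=5)

# Appending a copy of the first row breaks only the uniqueness check.
dup = pd.concat([sample, sample.iloc[[0]]], ignore_index=True)
dup_results = check_contract(dup, original_rows=5)

print(results)
print(dup_results)
```

Returning one boolean per check, rather than raising on the first failure, mirrors the per-expectation report in the contract output and makes it easy to see every rule that failed in a single run.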