Core API
Tracking Control
enable
tp.enable(
    mode: str = "ci",
    watch: list[str] | None = None,
    backend: str | None = None,
    identity: str | None = None,
) -> None
Start TracePipe tracking.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| mode | str | "ci" | Tracking mode: "ci" or "debug" |
| watch | list[str] | None | Columns to track for cell changes (debug mode) |
| backend | str | None | Lineage storage backend |
| identity | str | None | Row identity strategy |
Example:
# CI mode - lightweight
tp.enable(mode="ci")
# Debug mode with watched columns
tp.enable(mode="debug", watch=["price", "quantity", "status"])
disable
Stop TracePipe tracking. Restores original pandas methods.
reset
Clear all lineage data. Does not disable tracking.
Example:
tp.enable()
df = process_data_v1()
tp.check(df)
tp.reset() # Clear lineage, keep tracking enabled
df = process_data_v2()
tp.check(df)
register
Manually register DataFrames for tracking.
Use this when DataFrames are created before tp.enable() is called.
Lineage Break
Calling register() assigns new row IDs, which breaks lineage from any prior transformations. Use it only for "entry point" DataFrames.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| *dfs | pd.DataFrame | One or more DataFrames to register |
Example:
# DataFrames created before enable()
customers = pd.read_csv("customers.csv")
orders = pd.read_csv("orders.csv")
tp.enable(mode="debug")
tp.register(customers, orders) # Now they're tracked
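The lineage-break warning above can be illustrated with a toy model in plain Python. This is not how TracePipe stores lineage: `ToyRegistry` and its methods are hypothetical names, and the sketch only assumes that history is keyed by row ID and that every registration hands out fresh IDs.

```python
import itertools
from typing import Dict, List


class ToyRegistry:
    """Hypothetical stand-in for a lineage store keyed by row ID."""

    def __init__(self) -> None:
        self._next_id = itertools.count()
        self.events: Dict[int, List[str]] = {}

    def register(self, n_rows: int) -> List[int]:
        # Every call hands out brand-new IDs, each with an empty history.
        ids = [next(self._next_id) for _ in range(n_rows)]
        for rid in ids:
            self.events[rid] = []
        return ids

    def record(self, ids: List[int], op: str) -> None:
        # Append an operation name to the history of each given row.
        for rid in ids:
            self.events[rid].append(op)


registry = ToyRegistry()
first_ids = registry.register(3)
registry.record(first_ids, "fillna")

# Registering the same three rows again yields new IDs with no history,
# so the earlier "fillna" event is no longer reachable from them.
second_ids = registry.register(3)
print(first_ids, [registry.events[r] for r in first_ids])
print(second_ids, [registry.events[r] for r in second_ids])
```

In this model the earlier events still exist, but nothing links them to the re-registered rows, which is why registration is best reserved for entry-point DataFrames.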
stage
Label the current pipeline stage.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| name | str | Stage name |
Example:
tp.stage("load")
df = pd.read_csv("data.csv")
tp.stage("clean")
df = df.dropna()
tp.stage("transform")
df["total"] = df["price"] * df["qty"]
Query Functions
check
tp.check(
    df: pd.DataFrame,
    *,
    retention_threshold: float | None = None,
    merge_expansion_threshold: float | None = None,
) -> CheckResult
Health check for a DataFrame's lineage.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| df | pd.DataFrame | required | DataFrame to check |
| retention_threshold | float | 0.5 | Warn if retention below this |
| merge_expansion_threshold | float | None | Warn if merge expands beyond this |
Returns: CheckResult
| Attribute | Type | Description |
|---|---|---|
| .ok | bool | True if no FACT-level warnings |
| .passed | bool | Alias for .ok |
| .mode | str | Current tracking mode |
| .retention | float \| None | Row retention rate (0.0-1.0) |
| .n_dropped | int | Total rows dropped |
| .n_steps | int | Total pipeline steps recorded |
| .drops_by_op | dict[str, int] | Drops by operation name |
| .warnings | list[CheckWarning] | Warning objects with details |
| .facts | dict | Raw measured facts (for power users) |
Example:
result = tp.check(df)
print(result)
if not result.passed:
    for warning in result.warnings:
        print(f"⚠ {warning}")
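To make the relationship between .retention, .n_dropped, and .drops_by_op concrete, here is a minimal sketch in plain Python. It is not TracePipe's implementation: `RetentionSummary` and `summarize_drops` are hypothetical names, and the sketch assumes that retention is the fraction of the initial rows still present and that a warning is raised when it falls below the threshold.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class RetentionSummary:
    """Hypothetical stand-in for a few CheckResult fields (not the real class)."""
    retention: Optional[float]
    n_dropped: int
    drops_by_op: Dict[str, int]
    warnings: List[str] = field(default_factory=list)

    @property
    def ok(self) -> bool:
        # Assumption: the result is healthy when no warnings were raised.
        return not self.warnings


def summarize_drops(
    initial_rows: int,
    drops_by_op: Dict[str, int],
    retention_threshold: float = 0.5,
) -> RetentionSummary:
    n_dropped = sum(drops_by_op.values())
    # Retention is undefined when there were no rows to begin with.
    retention = None
    if initial_rows > 0:
        retention = (initial_rows - n_dropped) / initial_rows
    warnings = []
    if retention is not None and retention < retention_threshold:
        warnings.append(
            f"retention {retention:.0%} is below threshold {retention_threshold:.0%}"
        )
    return RetentionSummary(retention, n_dropped, dict(drops_by_op), warnings)


summary = summarize_drops(1000, {"dropna": 300, "drop_duplicates": 250})
print(summary.retention, summary.n_dropped, summary.ok)
for message in summary.warnings:
    print(message)
```

Keeping the per-operation counts next to the total makes it easy to see which step is responsible when retention drops below the threshold.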
trace
tp.trace(
    df: pd.DataFrame,
    row: int | None = None,
    *,
    where: dict[str, Any] | None = None,
) -> TraceResult
Trace a row's journey through the pipeline.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| df | pd.DataFrame | required | DataFrame containing the row |
| row | int | None | Row index (0-based) |
| where | dict | None | Business key lookup |
Returns: TraceResult
| Attribute | Type | Description |
|---|---|---|
| .row_id | int | Internal row ID |
| .is_alive | bool | True if the row exists in the current DataFrame |
| .events | list | All events for this row |
| .dropped_at | dict | Operation that dropped the row (if it was dropped) |
| .origin | dict | Where the row came from: {"type": "concat", "source_df": 1} or {"type": "merge", "left_parent": 10, "right_parent": 20} |
| .representative | dict | If dropped by dedup: {"kept_rid": 42, "subset": [...], "keep": "first"} |
Example:
# By index
trace = tp.trace(df, row=0)
# By business key
trace = tp.trace(df, where={"customer_id": "C-123"})
print(trace)
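The two .origin shapes listed in the TraceResult table can be turned into readable text with a small helper. The sketch below is plain Python and does not call TracePipe. `describe_origin` is a hypothetical name, and handling of a missing origin or an unknown "type" value is an assumption, since only the "concat" and "merge" shapes are documented.

```python
from typing import Any, Dict, Optional


def describe_origin(origin: Optional[Dict[str, Any]]) -> str:
    """Render an origin-style dict (shapes taken from the table above) as text."""
    if not origin:
        # Assumption: rows that were loaded directly carry no origin record.
        return "no origin recorded"
    kind = origin.get("type")
    if kind == "concat":
        return f"row came from concat input {origin['source_df']}"
    if kind == "merge":
        return (
            f"row was produced by merging parent rows "
            f"{origin['left_parent']} (left) and {origin['right_parent']} (right)"
        )
    # Unknown types are reported rather than guessed at.
    return f"unrecognized origin: {origin!r}"


print(describe_origin({"type": "concat", "source_df": 1}))
print(describe_origin({"type": "merge", "left_parent": 10, "right_parent": 20}))
```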
why
tp.why(
    df: pd.DataFrame,
    col: str,
    row: int | None = None,
    *,
    where: dict[str, Any] | None = None,
) -> WhyResult
Explain why a cell has its current value.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| df | pd.DataFrame | required | DataFrame containing the cell |
| col | str | required | Column name |
| row | int | None | Row index (0-based) |
| where | dict | None | Business key lookup |
Returns: WhyResult
| Attribute | Type | Description |
|---|---|---|
| .column | str | Column name |
| .row_id | int | Internal row ID |
| .current_value | Any | Current cell value |
| .history | list | All changes to this cell |
| .was_null | bool | True if the cell was ever null |
| .null_recovered | bool | True if a null was later filled |
Example:
why = tp.why(df, col="income", row=0)
print(why)
for change in why.history:
    print(f"{change.old_value} → {change.new_value}")
Requires Debug Mode
tp.why() requires debug mode with the column being watched.
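One plausible reading of .was_null and .null_recovered is that both can be derived from the cell's change history. The sketch below is plain Python and does not call TracePipe. `Change`, `is_null`, and `null_flags` are hypothetical names; only the old_value and new_value attributes come from the example above, and the exact semantics of the two flags are an assumption.

```python
import math
from dataclasses import dataclass
from typing import Any, List, Tuple


@dataclass
class Change:
    """Hypothetical history entry; only old_value/new_value appear in the docs."""
    old_value: Any
    new_value: Any


def is_null(value: Any) -> bool:
    # Treat both None and float NaN as null, as pandas commonly does.
    return value is None or (isinstance(value, float) and math.isnan(value))


def null_flags(history: List[Change], current_value: Any) -> Tuple[bool, bool]:
    """Return (was_null, null_recovered) for one cell."""
    seen = [current_value]
    for change in history:
        seen.extend([change.old_value, change.new_value])
    was_null = any(is_null(v) for v in seen)
    # Assumption: "recovered" means the cell was null at some point
    # and holds a non-null value now.
    null_recovered = was_null and not is_null(current_value)
    return was_null, null_recovered


history = [Change(old_value=None, new_value=52000.0)]
print(null_flags(history, current_value=52000.0))
```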
Output Functions
report
tp.report(
    df: pd.DataFrame,
    path: str,
    *,
    title: str | None = None,
    include_data: bool = False,
) -> None
Generate an HTML report.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| df | pd.DataFrame | required | DataFrame to report on |
| path | str | required | Output file path |
| title | str | None | Report title |
| include_data | bool | False | Include data preview |
snapshot
tp.snapshot(
    df: pd.DataFrame,
    *,
    include_data: bool = False,
    path: str | None = None,
) -> Snapshot
Capture DataFrame state for later comparison.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| df | pd.DataFrame | required | DataFrame to snapshot |
| include_data | bool | False | Store full data copy |
| path | str | None | Save to disk |
Returns: Snapshot
diff
Compare two snapshots.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| before | Snapshot | Earlier snapshot |
| after | Snapshot | Later snapshot |
Returns: DiffResult
| Attribute | Type | Description |
|---|---|---|
| .rows_added | int | New rows |
| .rows_removed | int | Removed rows |
| .cells_changed | int | Modified cells |
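What the three DiffResult counters measure can be sketched in plain Python. This is an illustration under stated assumptions, not TracePipe's algorithm: `diff_rows` is a hypothetical name, and each snapshot is modelled as a dict that maps a row ID to that row's column values, which is not necessarily how Snapshot stores its data.

```python
from typing import Any, Dict

Row = Dict[str, Any]


def diff_rows(before: Dict[int, Row], after: Dict[int, Row]) -> Dict[str, int]:
    """Count added rows, removed rows, and changed cells between two states."""
    added = after.keys() - before.keys()
    removed = before.keys() - after.keys()
    cells_changed = 0
    # Only rows present in both states can have modified cells.
    for row_id in before.keys() & after.keys():
        shared_columns = before[row_id].keys() & after[row_id].keys()
        cells_changed += sum(
            1 for col in shared_columns if before[row_id][col] != after[row_id][col]
        )
    return {
        "rows_added": len(added),
        "rows_removed": len(removed),
        "cells_changed": cells_changed,
    }


before = {
    0: {"price": 10.0, "status": "new"},
    1: {"price": 20.0, "status": "new"},
    2: {"price": 30.0, "status": "new"},
}
after = {
    0: {"price": 10.0, "status": "paid"},  # one cell changed
    1: {"price": 25.0, "status": "paid"},  # two cells changed
    3: {"price": 40.0, "status": "new"},   # new row; row 2 was removed
}
print(diff_rows(before, after))
```

Matching rows by ID rather than by position means that a reordered DataFrame would not be reported as changed in this model.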