Write a transformer¶
Prerequisites · You have a per-row or per-batch transformation to apply between read and write. End state · Transformer in the pipeline at a known order slot, registered via entry points.
Minimal transformer¶
from datacoolie.transformers.base import BaseTransformer
from datacoolie.core.models import DataFlow
class PiiMaskerTransformer(BaseTransformer):
# Slots 40-60 are reserved for user plugins. Pick one.
ORDER = 45
@property
def order(self) -> int:
return self.ORDER
def transform(self, df, dataflow: DataFlow):
cfg = (dataflow.transform and dataflow.transform.additional.get("pii_mask")) or {}
cols = cfg.get("columns", [])
if not cols:
self._mark_skipped()
return df
for c in cols:
df = self._engine.add_column(df, c, f"sha2({c}, 256)")
self._mark_applied(f"cols={len(cols)}")
return df
Register¶
[project.entry-points."datacoolie.transformers"]
pii_masker = "mypkg.transformers:PiiMaskerTransformer"
Opt in from metadata¶
The driver's DEFAULT_TRANSFORMERS list does not include your plugin. To
use it:
driver = DataCoolieDriver(engine=engine, metadata_provider=metadata)
driver._create_transformer_pipeline = lambda: build_pipeline(
engine,
names=["schema_converter", "deduplicator", "pii_masker", "system_column_adder", "column_name_sanitizer"],
)
Or subclass DataCoolieDriver and override _create_transformer_pipeline.
Order slot cheat-sheet¶
| Slots | Who owns them |
|---|---|
| 0–9 | Reserved for future framework pre-cast work |
| 10 | SchemaConverter |
| 20 | Deduplicator |
| 30 | ColumnAdder, SCD2ColumnAdder |
| 40–60 | Your plugins |
| 70 | SystemColumnAdder |
| 80 | PartitionHandler |
| 90 | ColumnNameSanitizer |
| 100+ | Reserved for future framework post-sanitize work |
See ADR-0003.
Tracking labels¶
Call _mark_applied(), _mark_applied("detail"), or _mark_skipped() inside
transform so the ETL log records exactly what your transformer did. Without
a call, the default is to record your class name.