Transformers & pipeline¶
TL;DR TransformerPipeline runs a fixed ordered list of transformers
between read and write. Each transformer has an order integer (lower first);
DataCoolie's eight built-ins claim the slots 10 / 20 / 30 / 35 / 60 / 70 / 80 / 90,
leaving 40–50 free for user plugins.
The eight built-ins (order → responsibility)¶
| Order | Name | Responsibility |
|---|---|---|
| 10 | SchemaConverter |
Cast columns to schema_hints types first — everything downstream sees the target schema. |
| 20 | Deduplicator |
Drop duplicates by transform.deduplicate_columns (partition keys) and dataflow.order_columns (latest-row selector, from transform.latest_data_columns or source.watermark_columns) before any compute work is wasted on them. |
| 30 | ColumnAdder |
User-configured calculated columns from transform.additional_columns. |
| 35 | RowFilter |
Discard rows by transform.filter_expression after computed columns exist but before SCD2 logic. |
| 60 | SCD2ColumnAdder |
For load_type="scd2" only: copy scd2_effective_column into __valid_from, seed __valid_to = NULL, __is_current = true. No-op otherwise. |
| 70 | SystemColumnAdder |
Framework audit columns: __created_at, __updated_at, __updated_by. |
| 80 | PartitionHandler |
Derive partition values from SQL expressions. |
| 90 | ColumnNameSanitizer |
Last — sanitise casing and special chars for the destination engine. |
The default pipeline assembled by DataCoolieDriver:
DEFAULT_TRANSFORMERS = [
"schema_converter", # 10
"deduplicator", # 20
"column_adder", # 30
"row_filter", # 35 — post-column_adder, pre-scd2
"scd2_column_adder", # 60
"system_column_adder", # 70
"partition_handler", # 80
"column_name_sanitizer", # 90
]
(List order is informational; TransformerPipeline sorts by the transformer's
order attribute, so ColumnAdder (30) still runs before RowFilter (35)
and SCD2ColumnAdder (60).)
Why slots jump from 35 to 60?¶
Slots 40–50 are reserved for your plugins. A common third-party addition is a PII masker at 40 or a validator at 50. See ADR-0003.
Tracking applied transformers¶
BaseTransformer exposes:
_mark_applied()— recordClassNameintransformers_applied_mark_applied("detail")— recordClassName(detail)_mark_skipped()— record nothing (no-op transformer)- no call → default: record
ClassName
The tracking label ends up in TransformRuntimeInfo.transformers_applied and
is surfaced by ETLLogger as a column on every dataflow_entry.
Failure semantics¶
Transformers that raise TransformError are not wrapped — the driver sees
the original exception. Any other exception is wrapped by the pipeline into a
TransformError with details={"applied": [...so_far]} so the log records
which transformers ran before the failure.