Transformers¶
base
¶
Abstract base classes for transformers and the transformer pipeline.
BaseTransformer[DF] defines the contract for individual transformers.
TransformerPipeline orchestrates multiple transformers in priority
order, tracking runtime info.
BaseTransformer
¶
Bases: ABC, Generic[DF]
Abstract base class for a single transformation step.
Each transformer has an :attr:order that determines execution
priority within the pipeline (lower values execute first).
Tracking¶
Call :meth:_mark_applied or :meth:_mark_skipped inside
:meth:transform to control what the pipeline records in
transformers_applied.
- _mark_applied() → record ClassName
- _mark_applied("detail") → record ClassName(detail)
- _mark_skipped() → do not record anything
- (no call) → default: record ClassName
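The tracking rules above can be illustrated with a minimal sketch. `SketchTransformer`, its `_record` attribute, and the `DropEmpty` subclass are hypothetical names chosen for illustration, and `transform` is simplified to take only the data; the real base class may store and expose this state differently.

```python
from abc import ABC, abstractmethod
from typing import Generic, List, Optional, TypeVar

DF = TypeVar("DF")


class SketchTransformer(ABC, Generic[DF]):
    """Hypothetical stand-in for BaseTransformer, not the library's code."""

    order: int = 100  # lower values execute first

    def __init__(self) -> None:
        # Default when neither helper is called: record the class name.
        self._record: Optional[str] = type(self).__name__

    def _mark_applied(self, detail: Optional[str] = None) -> None:
        name = type(self).__name__
        self._record = f"{name}({detail})" if detail else name

    def _mark_skipped(self) -> None:
        self._record = None  # the pipeline would record nothing

    @abstractmethod
    def transform(self, df: DF) -> DF:
        """Return the transformed data."""


class DropEmpty(SketchTransformer[List[dict]]):
    """Toy transformer: removes empty rows from a list of dicts."""

    order = 10

    def transform(self, df: List[dict]) -> List[dict]:
        kept = [row for row in df if row]
        if len(kept) == len(df):
            self._mark_skipped()
        else:
            self._mark_applied(f"dropped={len(df) - len(kept)}")
        return kept
```

In this sketch a pipeline would read `_record` after each call and append it to `transformers_applied` only when it is not `None`.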
TransformerPipeline
¶
TransformerPipeline(engine: BaseEngine[DF])
Bases: Generic[DF]
Ordered pipeline of transformers with runtime tracking.
Transformers are sorted by :attr:BaseTransformer.order and executed
sequentially. Runtime info (timing, status, applied transformer names)
is tracked for observability.
transformers
property
¶
transformers: List[BaseTransformer[DF]]
Return transformers sorted by execution order.
add_transformer
¶
add_transformer(transformer: BaseTransformer[DF]) -> None
Add a transformer to the pipeline.
get_runtime_info
¶
Return runtime information from the most recent transform.
remove_transformer
¶
Remove all transformers of a given class.
Returns:
| Type | Description |
|---|---|
| bool | |
transform
¶
Run all transformers in order.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DF | Input DataFrame. | required |
| dataflow | DataFlow | Full pipeline configuration. | required |
Returns:
| Type | Description |
|---|---|
| DF | Transformed DataFrame. |
Raises:
| Type | Description |
|---|---|
| TransformError | If any transformer fails. |
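As a rough illustration of the pipeline behaviour described in this section (sorting by order, sequential execution, runtime tracking, and wrapping failures in TransformError), here is a simplified sketch. `SketchPipeline`, the runtime-info keys, and the toy `Double` / `AddOne` transformers are assumptions made for the example; the real class takes an engine and may record different fields.

```python
import time
from typing import Any, Dict, List


class TransformError(Exception):
    """Stand-in for the library's TransformError (assumed to be an Exception)."""


class SketchPipeline:
    """Hypothetical, simplified analogue of TransformerPipeline."""

    def __init__(self) -> None:
        self._transformers: List[Any] = []
        self._runtime_info: Dict[str, Any] = {}

    def add_transformer(self, transformer: Any) -> None:
        self._transformers.append(transformer)

    @property
    def transformers(self) -> List[Any]:
        # sorted() is stable, so transformers with equal order keep insertion order.
        return sorted(self._transformers, key=lambda t: t.order)

    def get_runtime_info(self) -> Dict[str, Any]:
        return dict(self._runtime_info)

    def transform(self, df: Any, dataflow: Any = None) -> Any:
        applied: List[str] = []
        status = "success"
        started = time.perf_counter()
        try:
            for transformer in self.transformers:
                df = transformer.transform(df, dataflow)
                applied.append(type(transformer).__name__)
            return df
        except Exception as exc:
            status = "failed"
            raise TransformError(str(exc)) from exc
        finally:
            # Recorded on success and on failure alike.
            self._runtime_info = {
                "status": status,
                "duration_seconds": time.perf_counter() - started,
                "transformers_applied": applied,
            }


class Double:
    order = 10

    def transform(self, df, dataflow):
        return [x * 2 for x in df]


class AddOne:
    order = 20

    def transform(self, df, dataflow):
        return [x + 1 for x in df]
```

Adding `AddOne()` before `Double()` still runs `Double` first, because execution follows `order` rather than insertion order.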
schema_converter
¶
Schema conversion transformer.
Casts DataFrame columns according to schema hints defined in the
:class:Transform configuration. Also handles timestamp_ntz →
timestamp conversion. Type resolution (SQL alias → engine-native
type) is delegated to each engine's :meth:~BaseEngine.cast_column.
SchemaConverter
¶
SchemaConverter(engine: BaseEngine[DF])
Bases: BaseTransformer[DF]
Cast columns per schema hints (order = 10).
Processing
- Convert timestamp_ntz columns to timestamp (engine hook).
- If source.connection.use_schema_hint is truthy and hints exist, cast each matching column using the raw type string from the hint; type resolution is handled by the engine.
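The hint-driven casting step might look roughly like the sketch below. It models a DataFrame as a list of dicts and assumes hints arrive as a `{column: type_string}` mapping; `ToyEngine`, its type table, and `apply_schema_hints` are illustrative names, and only the idea that the engine resolves the raw type string comes from the description above.

```python
from typing import Callable, Dict, List

Rows = List[dict]


class ToyEngine:
    """Hypothetical engine that resolves SQL type aliases to Python casts."""

    _TYPES: Dict[str, Callable] = {
        "int": int,
        "bigint": int,
        "double": float,
        "string": str,
    }

    def cast_column(self, rows: Rows, column: str, type_string: str) -> Rows:
        cast = self._TYPES[type_string.lower()]
        return [
            {**row, column: None if row[column] is None else cast(row[column])}
            for row in rows
        ]


def apply_schema_hints(
    engine: ToyEngine,
    rows: Rows,
    hints: Dict[str, str],
    use_schema_hint: bool = True,
) -> Rows:
    """Cast only the columns that have a hint and exist in the data."""
    if not use_schema_hint or not hints:
        return rows
    columns = set(rows[0]) if rows else set()
    for column, type_string in hints.items():
        if column in columns:
            # The converter passes the raw string; the engine resolves it.
            rows = engine.cast_column(rows, column, type_string)
    return rows
```

Keeping the alias table inside the engine means the converter never needs to know engine-native type names.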
deduplicator
¶
Deduplication transformer.
Removes duplicate rows based on partition (dedup) columns and ordering
columns. Uses RANK-based dedup for MERGE_OVERWRITE load type
and ROW_NUMBER-based dedup otherwise.
Deduplicator
¶
Deduplicator(engine: BaseEngine[DF])
Bases: BaseTransformer[DF]
Deduplicate DataFrame rows (order = 20).
Decision logic
- order_cols = dataflow.order_columns (latest_data_columns with fallback to source.watermark_columns).
- If deduplicate_columns or order_cols is empty, the transformer is a no-op (returns input unchanged).
- Uses :meth:engine.deduplicate_by_rank (keeps ties) when:
  - load_type == MERGE_OVERWRITE and merge_keys is non-empty and transform.deduplicate_columns is None (partition resolved implicitly from merge keys),
  - or transform.configure["deduplicate_by_rank"] is True.
- Otherwise → :meth:engine.deduplicate (strict ROW_NUMBER).
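The difference between the two strategies is easiest to see on rows that tie on the ordering columns. The following pure-Python sketch models rows as dicts and assumes the row with the greatest ordering value is the one to keep; the function names are illustrative and are not the engine's methods.

```python
from itertools import groupby
from typing import Iterator, List, Sequence

Rows = List[dict]


def _partitions(rows: Rows, partition_cols: Sequence[str]) -> Iterator[Rows]:
    """Yield groups of rows that share the same partition key."""

    def key(row: dict) -> tuple:
        return tuple(row[c] for c in partition_cols)

    for _, group in groupby(sorted(rows, key=key), key=key):
        yield list(group)


def dedup_row_number(
    rows: Rows, partition_cols: Sequence[str], order_cols: Sequence[str]
) -> Rows:
    """ROW_NUMBER-style: keep exactly one row per partition."""

    def order_key(row: dict) -> tuple:
        return tuple(row[c] for c in order_cols)

    return [max(group, key=order_key) for group in _partitions(rows, partition_cols)]


def dedup_rank(
    rows: Rows, partition_cols: Sequence[str], order_cols: Sequence[str]
) -> Rows:
    """RANK-style: keep every row tied for the top ordering value."""

    def order_key(row: dict) -> tuple:
        return tuple(row[c] for c in order_cols)

    result: Rows = []
    for group in _partitions(rows, partition_cols):
        best = max(order_key(row) for row in group)
        result.extend(row for row in group if order_key(row) == best)
    return result
```

With two rows for the same key that share the latest timestamp, `dedup_row_number` returns one of them while `dedup_rank` returns both.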
column_adder
¶
Column adder transformers.
:class:ColumnAdder removes stale system columns and adds computed
(user-defined) columns. :class:SystemColumnAdder appends the standard
audit columns late in the pipeline (order = 70).
ColumnAdder
¶
ColumnAdder(engine: BaseEngine[DF])
SCD2ColumnAdder
¶
SCD2ColumnAdder(engine: BaseEngine[DF])
Bases: BaseTransformer[DF]
Add SCD2 tracking columns for scd2 load type (order = 60).
Adds __valid_from (from the effective column), __valid_to (NULL),
and __is_current (true) so downstream MERGE can close old rows and
insert new versions.
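A minimal sketch of the three SCD2 columns, with rows modelled as dicts. The function name and the `effective_column` parameter are hypothetical; only the column names and their initial values come from the description above.

```python
from typing import List


def add_scd2_columns(rows: List[dict], effective_column: str) -> List[dict]:
    """Return copies of the rows with open-ended SCD2 tracking columns."""
    return [
        {
            **row,
            "__valid_from": row[effective_column],  # version start
            "__valid_to": None,                     # NULL: version still open
            "__is_current": True,
        }
        for row in rows
    ]
```

A later MERGE would typically set `__valid_to` and clear `__is_current` on the row being superseded.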
SystemColumnAdder
¶
SystemColumnAdder(engine: BaseEngine[DF], author: str = DEFAULT_AUTHOR)
Bases: BaseTransformer[DF]
Append system audit columns late in the pipeline (order = 70).
Adds __created_at, __updated_at, and __updated_by to every
DataFrame. Column name sanitization runs afterwards (order = 90) but
skips names that start with _, so system column names are never altered.
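For illustration, the audit columns could be appended as in the sketch below. The default author value and the choice to stamp both timestamps with the same UTC instant are assumptions; the source does not say how the real transformer computes them.

```python
from datetime import datetime, timezone
from typing import List


def add_system_columns(rows: List[dict], author: str = "etl_user") -> List[dict]:
    """Return copies of the rows with the three audit columns appended."""
    now = datetime.now(timezone.utc)  # one timestamp shared by the whole batch
    return [
        {**row, "__created_at": now, "__updated_at": now, "__updated_by": author}
        for row in rows
    ]
```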
partition_handler
¶
Partition handler transformer.
Generates partition columns from SQL expressions defined in the
destination partition_columns configuration.
PartitionHandler
¶
PartitionHandler(engine: BaseEngine[DF])
Bases: BaseTransformer[DF]
Generate partition columns from expressions (order = 80).
For each :class:PartitionColumn with an expression, adds
a new column (or overwrites an existing one) using
:meth:engine.add_column. Columns without expressions are
assumed to already exist in the DataFrame.
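The loop over partition columns can be sketched as follows. Because the real expressions are SQL strings evaluated by the engine, this example substitutes plain Python callables; `PartitionColumnSketch` and its field names are hypothetical and do not mirror the actual PartitionColumn class.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class PartitionColumnSketch:
    """Hypothetical stand-in for PartitionColumn."""

    column: str
    # A callable replaces the SQL expression purely for illustration.
    expression: Optional[Callable[[dict], object]] = None


def add_partition_columns(
    rows: List[dict], partition_columns: List[PartitionColumnSketch]
) -> List[dict]:
    for pc in partition_columns:
        if pc.expression is None:
            continue  # column is assumed to exist already
        # Adds the column, or overwrites it if it is already present.
        rows = [{**row, pc.column: pc.expression(row)} for row in rows]
    return rows
```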
column_name_sanitizer
¶
Column name sanitizer transformer.
Renames DataFrame columns using the chosen case-conversion mode:
lower (default)
Lowercases the name without inserting underscores at word boundaries.
"MyColumn" → "mycolumn", "HTTPStatus" → "httpstatus".
snake
Converts camelCase / PascalCase → snake_case.
"MyColumn" → "my_column", "HTTPStatus" → "http_status".
Both modes apply the same clean-up rules:
- Replaces special characters (spaces, hyphens, dots, etc.) with _.
- Collapses consecutive underscores and strips leading / trailing ones.
- Prefixes names that start with a digit with _.
- Columns whose names already start with _ are left unchanged.
ColumnNameSanitizer
¶
ColumnNameSanitizer(engine: BaseEngine[DF], *, mode: ColumnCaseMode = LOWER)
Bases: BaseTransformer[DF]
Rename columns using the chosen case-conversion mode (order = 90).
Runs after :class:SystemColumnAdder so the framework audit columns keep
their canonical names. All prior transformers (schema conversion, dedup,
additional columns, partition columns) still work with the original
business column names. Only columns whose names actually change are
renamed. Columns that already start with _ (e.g. system columns added
in the current or a previous run) are skipped.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| engine | BaseEngine[DF] | Data engine instance. | required |
| mode | ColumnCaseMode | Case-conversion mode. | LOWER |
to_lower_case
¶
Convert an arbitrary column name to lowercased form.
Column names that already start with an underscore (e.g. system
columns such as __created_at) are returned unchanged.
Examples::
"MyColumn" → "mycolumn"
"HTTPStatus" → "httpstatus"
"order-date" → "order_date"
"Column Name" → "column_name"
"123" → "_123"
"col@#name" → "col_name"
"__created_at" → "__created_at" # unchanged
to_snake_case
¶
Convert an arbitrary column name to snake_case.
Column names that already start with an underscore (e.g. system
columns such as __created_at) are returned unchanged.
Examples::
"MyColumn" → "my_column"
"order-date" → "order_date"
"Column Name" → "column_name"
"HTTPStatus" → "http_status"
"123" → "_123"
"col@#name" → "col_name"
"__created_at" → "__created_at" # unchanged