Transformers

base

Abstract base classes for transformers and the transformer pipeline.

BaseTransformer[DF] defines the contract for individual transformers. TransformerPipeline orchestrates multiple transformers in priority order, tracking runtime info.

BaseTransformer

Bases: ABC, Generic[DF]

Abstract base class for a single transformation step.

Each transformer has an :attr:order that determines execution priority within the pipeline (lower values execute first).

Tracking

Call :meth:_mark_applied or :meth:_mark_skipped inside :meth:transform to control what the pipeline records in transformers_applied.

  • _mark_applied() → record ClassName
  • _mark_applied("detail") → record ClassName(detail)
  • _mark_skipped() → do not record anything
  • (no call) → default: record ClassName
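The four rules above can be sketched as a minimal, self-contained base class. The internals here (the `_label` attribute and `_UNSET` sentinel) are illustrative assumptions; only the recorded labels follow the documented contract:

```python
from abc import ABC, abstractmethod

_UNSET = object()  # sentinel: transform() made no tracking call

class BaseTransformer(ABC):
    def __init__(self):
        self._label = _UNSET

    def _mark_applied(self, detail=None):
        name = type(self).__name__
        # _mark_applied() → "ClassName"; _mark_applied("detail") → "ClassName(detail)"
        self._label = f"{name}({detail})" if detail else name

    def _mark_skipped(self):
        self._label = None  # pipeline records nothing

    @property
    def applied_label(self):
        # No call inside transform() → default: record the class name.
        return type(self).__name__ if self._label is _UNSET else self._label

    @abstractmethod
    def transform(self, df, dataflow):
        ...

class CastColumns(BaseTransformer):
    def transform(self, df, dataflow):
        self._mark_applied("2 cols")  # recorded as "CastColumns(2 cols)"
        return df

class NoOp(BaseTransformer):
    def transform(self, df, dataflow):
        self._mark_skipped()  # not recorded
        return df
```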

applied_label property

applied_label: str | None

Label resolved after :meth:transform returns.

Returns:

Type Description
str | None

A string label to record, or None to skip recording.

name property

name: str

Human-readable transformer name (class name by default).

order abstractmethod property

order: int

Execution priority (lower = earlier).

transform abstractmethod

transform(df: DF, dataflow: DataFlow) -> DF

Apply the transformation to a DataFrame.

Parameters:

Name Type Description Default
df DF

Input DataFrame.

required
dataflow DataFlow

Full pipeline configuration for context.

required

Returns:

Type Description
DF

Transformed DataFrame.

TransformerPipeline

TransformerPipeline(engine: BaseEngine[DF])

Bases: Generic[DF]

Ordered pipeline of transformers with runtime tracking.

Transformers are sorted by :attr:BaseTransformer.order and executed sequentially. Runtime info (timing, status, applied transformer names) is tracked for observability.
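The ordering and tracking behaviour can be illustrated with a toy version. The real class also records timing and status; this sketch tracks only the applied labels, and the toy transformers operate on plain lists instead of DataFrames:

```python
class ToyTransformer:
    def __init__(self, order, fn):
        self.order = order
        self._fn = fn
        self.applied_label = type(self).__name__

    def transform(self, df, dataflow=None):
        return self._fn(df)

class ToyPipeline:
    def __init__(self):
        self._transformers = []
        self.transformers_applied = []

    def add_transformer(self, t):
        self._transformers.append(t)

    @property
    def transformers(self):
        # Sorted by execution priority (lower = earlier).
        return sorted(self._transformers, key=lambda t: t.order)

    def transform(self, df, dataflow=None):
        self.transformers_applied = []
        for t in self.transformers:
            df = t.transform(df, dataflow)
            if t.applied_label is not None:
                self.transformers_applied.append(t.applied_label)
        return df

pipe = ToyPipeline()
pipe.add_transformer(ToyTransformer(20, lambda xs: [x * 2 for x in xs]))
pipe.add_transformer(ToyTransformer(10, lambda xs: [x + 1 for x in xs]))
result = pipe.transform([1, 2])  # order 10 runs first (+1), then order 20 (*2)
```

Note that insertion order is irrelevant: the transformer added second runs first because its `order` is lower.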

transformers property

transformers: List[BaseTransformer[DF]]

Return transformers sorted by execution order.

add_transformer

add_transformer(transformer: BaseTransformer[DF]) -> None

Add a transformer to the pipeline.

clear

clear() -> None

Remove all transformers from the pipeline.

get_runtime_info

get_runtime_info() -> TransformRuntimeInfo

Return runtime information from the most recent transform.

remove_transformer

remove_transformer(transformer_class: type) -> bool

Remove all transformers of a given class.

Returns:

Type Description
bool

True if any were removed.

transform

transform(df: DF, dataflow: DataFlow) -> DF

Run all transformers in order.

Parameters:

Name Type Description Default
df DF

Input DataFrame.

required
dataflow DataFlow

Full pipeline configuration.

required

Returns:

Type Description
DF

Transformed DataFrame.

Raises:

Type Description
TransformError

If any transformer fails.

schema_converter

Schema conversion transformer.

Casts DataFrame columns according to schema hints defined in the :class:Transform configuration. Also handles timestamp_ntz → timestamp conversion. Type resolution (SQL alias → engine-native type) is delegated to each engine's :meth:~BaseEngine.cast_column.

SchemaConverter

SchemaConverter(engine: BaseEngine[DF])

Bases: BaseTransformer[DF]

Cast columns per schema hints (order = 10).

Processing
  1. Convert timestamp_ntz columns to timestamp (engine hook).
  2. If source.connection.use_schema_hint is truthy and hints exist, cast each matching column using the raw type string from the hint — type resolution is handled by the engine.
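Step 2 can be sketched as follows. The toy engine below is an assumption: its "DataFrame" is a dict of column → list, and `cast_column` resolves the raw type string itself, mimicking the engine delegation described above:

```python
class ToyEngine:
    # SQL alias → Python type; real engines resolve to native column types.
    _TYPES = {"int": int, "string": str, "float": float}

    def cast_column(self, df, column, raw_type):
        py_type = self._TYPES[raw_type.lower()]
        return {**df, column: [py_type(v) for v in df[column]]}

def apply_schema_hints(engine, df, hints, use_schema_hint=True):
    if not use_schema_hint or not hints:
        return df  # hint casting disabled, or nothing to do
    for column, raw_type in hints.items():
        if column in df:  # cast only columns present in the DataFrame
            df = engine.cast_column(df, column, raw_type)
    return df

df = {"id": ["1", "2"], "name": [3, 4]}
df = apply_schema_hints(ToyEngine(), df, {"id": "int", "name": "string"})
```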

transform

transform(df: DF, dataflow: DataFlow) -> DF

Apply schema conversions.

deduplicator

Deduplication transformer.

Removes duplicate rows based on partition (dedup) columns and ordering columns. Uses RANK-based dedup for MERGE_OVERWRITE load type and ROW_NUMBER-based dedup otherwise.

Deduplicator

Deduplicator(engine: BaseEngine[DF])

Bases: BaseTransformer[DF]

Deduplicate DataFrame rows (order = 20).

Decision logic
  • order_cols = dataflow.order_columns (latest_data_columns with fallback to source.watermark_columns).
  • If deduplicate_columns or order_cols is empty, the transformer is a no-op (returns input unchanged).
  • Uses :meth:engine.deduplicate_by_rank (keeps ties) when:
      • load_type == MERGE_OVERWRITE, merge_keys is non-empty, and transform.deduplicate_columns is None (partition resolved implicitly from merge keys), or
      • transform.configure["deduplicate_by_rank"] is True.
  • Otherwise → :meth:engine.deduplicate (strict ROW_NUMBER).
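The bullets above transcribe into a small helper. The function name and the returned strategy strings are illustrative, not the library's API:

```python
def choose_dedup_strategy(load_type, merge_keys, deduplicate_columns,
                          order_cols, configure):
    # Partition columns: explicit dedup columns, or (for the implicit
    # MERGE_OVERWRITE case) the merge keys.
    partition = deduplicate_columns or merge_keys
    if not partition or not order_cols:
        return "noop"  # nothing to partition / order by → input unchanged
    if (load_type == "MERGE_OVERWRITE" and merge_keys
            and deduplicate_columns is None):
        return "rank"  # RANK-based, keeps ties
    if configure.get("deduplicate_by_rank") is True:
        return "rank"
    return "row_number"  # strict ROW_NUMBER-based
```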

transform

transform(df: DF, dataflow: DataFlow) -> DF

Apply deduplication if configured.

column_adder

Column adder transformers.

:class:ColumnAdder removes stale system columns and adds computed (user-defined) columns. :class:SystemColumnAdder appends the standard audit columns as the final step in the pipeline.

ColumnAdder

ColumnAdder(engine: BaseEngine[DF])

Bases: BaseTransformer[DF]

Remove stale system columns and add computed columns (order = 30).

Processing steps
  1. Remove existing system columns (__created_at, etc.).
  2. Add user-defined additional columns.
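On a dict-based row, the two steps look roughly like this; the "__" prefix for system columns matches the examples in this document, and additional columns are simplified here to constant name → value pairs:

```python
def refresh_columns(row, additional_columns):
    # 1. Drop stale system columns left over from a previous run.
    kept = {k: v for k, v in row.items() if not k.startswith("__")}
    # 2. Add user-defined additional columns.
    kept.update(additional_columns)
    return kept

row = {"id": 1, "__created_at": "old"}
row = refresh_columns(row, {"source_file": "a.csv"})
```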

transform

transform(df: DF, dataflow: DataFlow) -> DF

Apply column additions.

SCD2ColumnAdder

SCD2ColumnAdder(engine: BaseEngine[DF])

Bases: BaseTransformer[DF]

Add SCD2 tracking columns for scd2 load type (order = 60).

Adds __valid_from (from the effective column), __valid_to (NULL), and __is_current (true) so downstream MERGE can close old rows and insert new versions.
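Sketched on plain dict rows (an assumption; the real transformer works on engine DataFrames), the effective column supplies __valid_from and the other two columns mark the row as the open, current version:

```python
def add_scd2_columns(rows, effective_column):
    return [
        {**row,
         "__valid_from": row[effective_column],
         "__valid_to": None,     # open-ended: no successor row yet
         "__is_current": True}   # downstream MERGE flips this on close
        for row in rows
    ]

rows = add_scd2_columns([{"id": 1, "updated": "2024-01-01"}], "updated")
```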

transform

transform(df: DF, dataflow: DataFlow) -> DF

Add SCD2 columns if load_type is scd2, otherwise no-op.

SystemColumnAdder

SystemColumnAdder(engine: BaseEngine[DF], author: str = DEFAULT_AUTHOR)

Bases: BaseTransformer[DF]

Append system audit columns as the final pipeline step (order = 70).

Adds __created_at, __updated_at, and __updated_by to every DataFrame. Running last ensures that column name sanitization never touches system column names.
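A dict-row sketch of the audit stamping; DEFAULT_AUTHOR's value is not shown in this document, so a placeholder default is used here:

```python
from datetime import datetime, timezone

def add_system_columns(rows, author="framework"):
    now = datetime.now(timezone.utc)  # one timestamp per pipeline run
    return [
        {**row,
         "__created_at": now,
         "__updated_at": now,
         "__updated_by": author}
        for row in rows
    ]

rows = add_system_columns([{"id": 1}], author="etl-job")
```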

transform

transform(df: DF, dataflow: DataFlow) -> DF

Add system audit columns.

partition_handler

Partition handler transformer.

Generates partition columns from SQL expressions defined in the destination partition_columns configuration.

PartitionHandler

PartitionHandler(engine: BaseEngine[DF])

Bases: BaseTransformer[DF]

Generate partition columns from expressions (order = 80).

For each :class:PartitionColumn with an expression, adds a new column (or overwrites an existing one) using :meth:engine.add_column. Columns without expressions are assumed to already exist in the DataFrame.
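A hypothetical sketch of the loop: PartitionColumn is stubbed as a small dataclass, and the expression is a plain callable on a row rather than a SQL string evaluated by the engine:

```python
from dataclasses import dataclass
from typing import Callable, Optional

@dataclass
class PartitionColumnStub:
    name: str
    expression: Optional[Callable] = None

def add_partition_columns(rows, partition_columns):
    for pc in partition_columns:
        if pc.expression is not None:
            # Add (or overwrite) the column computed from each row.
            rows = [{**r, pc.name: pc.expression(r)} for r in rows]
        # Columns without an expression must already exist in the rows.
    return rows

rows = add_partition_columns(
    [{"event_ts": "2024-05-01T10:00:00"}],
    [PartitionColumnStub("event_date", lambda r: r["event_ts"][:10])],
)
```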

transform

transform(df: DF, dataflow: DataFlow) -> DF

Add partition columns where expressions are defined.

column_name_sanitizer

Column name sanitizer transformer.

Renames DataFrame columns using the chosen case-conversion mode:

  • lower (default): lowercases the name without inserting underscores at word boundaries. "MyColumn" → "mycolumn", "HTTPStatus" → "httpstatus".

  • snake: converts camelCase / PascalCase → snake_case. "MyColumn" → "my_column", "HTTPStatus" → "http_status".

Both modes apply the same clean-up rules:

  • Replaces special characters (spaces, hyphens, dots, etc.) with _.
  • Collapses consecutive underscores and strips leading / trailing ones.
  • Prefixes names that start with a digit with _.
  • Columns whose names already start with _ are left unchanged.
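A reference sketch of these rules, checked against the examples listed under to_lower_case and to_snake_case later in this section (the shared `_clean` helper is an implementation assumption):

```python
import re

def _clean(name: str) -> str:
    name = re.sub(r"[^0-9A-Za-z_]", "_", name)   # special characters → _
    name = re.sub(r"_+", "_", name).strip("_")   # collapse, strip leading/trailing _
    if name and name[0].isdigit():               # leading digit → prefix with _
        name = "_" + name
    return name

def to_lower_case(name: str) -> str:
    if name.startswith("_"):                     # system columns: unchanged
        return name
    return _clean(name).lower()

def to_snake_case(name: str) -> str:
    if name.startswith("_"):                     # system columns: unchanged
        return name
    # Underscores at acronym / camelCase boundaries: HTTPStatus → HTTP_Status.
    name = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1_\2", name)
    name = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name)
    return _clean(name).lower()
```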

ColumnNameSanitizer

ColumnNameSanitizer(engine: BaseEngine[DF], *, mode: ColumnCaseMode = LOWER)

Bases: BaseTransformer[DF]

Rename columns using the chosen case-conversion mode (order = 90).

Runs after :class:SystemColumnAdder so the framework audit columns keep their canonical names. All prior transformers (schema conversion, dedup, additional columns, partition columns) still work with the original business column names. Only columns whose names actually change are renamed. Columns that already start with _ (e.g. system columns added in the current or a previous run) are skipped.

Parameters:

Name Type Description Default
engine BaseEngine[DF]

Data engine instance.

required
mode ColumnCaseMode

Case-conversion mode — ColumnCaseMode.LOWER (default) or ColumnCaseMode.SNAKE.

LOWER

transform

transform(df: DF, dataflow: DataFlow) -> DF

Rename columns that are not already clean, then reorder so that system / file-info columns appear last.
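The reorder step can be sketched as a stable partition: business columns first, "_"-prefixed system / file-info columns last, each group keeping its relative order (the function name is illustrative):

```python
def reorder_columns(columns):
    business = [c for c in columns if not c.startswith("_")]
    system = [c for c in columns if c.startswith("_")]
    return business + system

cols = reorder_columns(["__created_at", "id", "_file_name", "name"])
```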

to_lower_case

to_lower_case(name: str) -> str

Convert an arbitrary column name to lowercased form.

Column names that already start with an underscore (e.g. system columns such as __created_at) are returned unchanged.

Examples::

"MyColumn"     → "mycolumn"
"HTTPStatus"   → "httpstatus"
"order-date"   → "order_date"
"Column Name"  → "column_name"
"123"          → "_123"
"col@#name"    → "col_name"
"__created_at" → "__created_at"  # unchanged

to_snake_case

to_snake_case(name: str) -> str

Convert an arbitrary column name to snake_case.

Column names that already start with an underscore (e.g. system columns such as __created_at) are returned unchanged.

Examples::

"MyColumn"     → "my_column"
"order-date"   → "order_date"
"Column Name"  → "column_name"
"HTTPStatus"   → "http_status"
"123"          → "_123"
"col@#name"    → "col_name"
"__created_at" → "__created_at"  # unchanged