Skip to content

Sources & destinations

TL;DR Sources and destinations are plugins keyed by format name. A FileReader serves parquet, csv, json, jsonl, avro, excel — the plugin registry maps a format string to the reader/writer class at runtime.

Registry mapping

From pyproject.toml (see Plugin entry points for the generated table):

Format Source Destination
delta DeltaReader DeltaWriter
iceberg IcebergReader IcebergWriter
parquet / csv / json / jsonl / avro / excel FileReader FileWriter*
sql DatabaseReader
api APIReader
function PythonFunctionReader

* Excel is read-only; FileWriter handles the writable formats.

Source contract

BaseSourceReader[DF] declares:

  • __init__(engine) — the reader is constructed with an engine
  • read(source, watermark_start, *, watermark_start_operator=">", watermark_end=None, watermark_end_operator="<") -> DF — public entry point (Template Method)
  • Subclasses implement _read_internal(source, watermark_start, *, watermark_end=None) and optionally _read_data(source, configure)
  • Must honour source.read_options and source.connection.read_options

watermark_start is the lower bound (previously named watermark). watermark_start_operator controls the comparison (">" for normal ETL, ">=" for replay's inclusive lower bound). watermark_end provides an optional upper ceiling for replay chunks; watermark_end_operator defaults to "<" (exclusive).

The source reader is also responsible for watermark push-down — applying the watermark predicate during the read when the backend supports it (e.g. parquet predicate pushdown, JDBC WHERE clause). If pushdown is not possible the reader returns the full DataFrame and the framework filters afterwards.

filter_expression (post-read filter)

After watermark filtering, every reader calls _apply_filter_expression(df, source). When source.filter_expression is non-empty, the engine evaluates it as a SQL WHERE clause against the in-memory DataFrame:

watermark filter  →  source.filter_expression  →  count / new watermark

This lets you exclude rows at read time using raw source columns before the transformer pipeline runs. For database readers the expression is pushed into the generated WHERE clause alongside the watermark condition.

Secret resolution

Before the reader is created, the driver calls _resolve_connection_secrets() which processes connection.secrets_ref and replaces placeholder values in configure with actual credentials from the active secret provider. Readers never handle secret fetching directly.

Destination contract

BaseDestinationWriter[DF] declares:

  • __init__(engine)
  • write(df, dataflow) -> DestinationRuntimeInfo — public entry point (Template Method)
  • Subclasses implement _write_internal(df, dataflow) -> None
  • run_maintenance(dataflow, *, do_compact=True, do_cleanup=True, retention_hours=None) -> DestinationRuntimeInfo

write dispatches on load_type:

  • appendengine.write_to_*(mode="append")
  • overwrite / full_loadengine.write_to_*(mode="overwrite")
  • merge_upsertengine.merge_to_*
  • merge_overwriteengine.merge_overwrite_to_*
  • scd2engine.scd2_*

See Load strategies for semantics.

Non-obvious behaviours

  • _attach_schema_hints uses the source connection/table, not the destination. The goal is to cast the incoming DataFrame into the declared column types before writing.
  • Column-case mode defaults to ColumnCaseMode.LOWER on the driver — the ColumnNameSanitizer downcases and strips at write time.
  • Decimal precision / scale are honoured when writing to Delta; some Polars readers upcast to Float64 by default — SchemaHint with data_type="decimal" fixes that.
  • Two filter points existsource.filter_expression (read-time, raw columns) and transform.filter_expression (order 35, after ColumnAdder creates computed columns). See Transformers & pipeline for the pipeline ordering.