Engines¶
TL;DR BaseEngine[DF] is a generic ABC that abstracts read / write /
merge / transform / maintenance across DataFrame libraries. All format-aware
methods take a fmt= parameter so Delta and Iceberg share the same surface.
The DF type parameter¶
```python
import polars
import pyspark.sql
from datacoolie.engines.base import BaseEngine

class SparkEngine(BaseEngine[pyspark.sql.DataFrame]): ...
class PolarsEngine(BaseEngine[polars.DataFrame]): ...
```
Sources, destinations, and transformers are parameterised by the same DF, so
the type system prevents you from passing a Polars DataFrame to a Spark
destination.
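A minimal sketch of what that buys you. Destination and run below are illustrative stand-ins, not classes from the library:

```python
from typing import Protocol, TypeVar

from datacoolie.engines.base import BaseEngine

DF = TypeVar("DF")

class Destination(Protocol[DF]):
    # Hypothetical destination protocol, parameterised by the same DF
    # type variable as BaseEngine.
    def write(self, df: DF) -> None: ...

def run(engine: BaseEngine[DF], dest: Destination[DF], df: DF) -> None:
    # A type checker binds DF once per call, so handing a
    # polars.DataFrame to a Spark-typed destination is flagged
    # statically instead of failing at runtime.
    dest.write(df)
```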
Method sections¶
BaseEngine organises its API into sections that match the abstraction in
src/datacoolie/engines/base.py:
| Section | Methods |
|---|---|
| Construction | __init__, platform, set_platform |
| Read | read_parquet, read_delta, read_iceberg, read_csv, read_json, read_jsonl, read_avro, read_excel, read_path, read_database, read_table, create_dataframe, execute_sql |
| Write | write_to_path, write_to_table |
| Merge | merge_to_path, merge_overwrite_to_path, scd2_to_path, merge_to_table, merge_overwrite_to_table, scd2_to_table |
| Transform | add_column, drop_columns, select_columns, rename_column, filter_rows, apply_watermark_filter, deduplicate, deduplicate_by_rank, cast_column |
| System columns | add_system_columns, add_file_info_columns, remove_system_columns, convert_timestamp_ntz_to_timestamp |
| Metrics | count_rows, is_empty, get_columns, get_schema, get_max_values, get_count_and_max_values |
| Maintenance | table_exists_by_path, table_exists_by_name, get_history_*, compact_*, cleanup_* |
| Navigation (concrete dispatch) | read, write, merge, merge_overwrite, scd2, exists, get_history, compact, cleanup |
The navigation group (read, write, ...) is concrete on BaseEngine — it
dispatches on connection type and format to the right abstract method. You
rarely override it.
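A sketch of the shape of that dispatch, written as a free function for clarity; the branch order and the table-name check are assumptions, not the actual BaseEngine code:

```python
def read(engine, source: str, fmt: str = "delta", **options):
    # Dispatch on connection type first (table name vs. path), then on
    # format; the crude dot-based check stands in for the real one.
    if "." in source and "/" not in source:
        return engine.read_table(source, fmt=fmt, **options)
    if fmt == "delta":
        return engine.read_delta(source, **options)
    if fmt == "iceberg":
        return engine.read_iceberg(source, **options)
    if fmt == "parquet":
        return engine.read_parquet(source, **options)
    raise ValueError(f"Unsupported fmt: {fmt!r}")  # EngineError in the library
```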
The fmt contract¶
Format-aware methods take a fmt string ("delta", "iceberg", "parquet",
"csv", ...). This single parameter unifies lakehouse formats across engines:
```python
engine.read_table("`cat`.`db`.`sales`.`orders`", fmt="iceberg")
engine.merge_to_table(df, table, keys=["id"], fmt="delta", options={"overwriteSchema": "true"})
engine.table_exists_by_name(table, fmt="iceberg")
```
Rules:

- fmt defaults to "delta" wherever it would otherwise be required, so legacy code that predates Iceberg keeps working.
- merge_to_table, read_table, and table_exists_by_name all accept fmt.
- table_exists_by_name takes fmt as keyword-only (*, fmt="delta").
- Engines raise EngineError for unsupported fmt values rather than silently falling back.
See ADR-0001 for history.
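The keyword-only rule and the fail-fast rule together look roughly like the signature sketch below, written as it would appear on an engine class; the supported set is an assumed example, and the library raises EngineError where this sketch uses ValueError:

```python
SUPPORTED_FORMATS = frozenset({"delta", "iceberg"})  # assumed per-engine set

def table_exists_by_name(self, table: str, *, fmt: str = "delta") -> bool:
    # fmt is keyword-only with the legacy "delta" default; unknown
    # formats fail fast rather than silently falling back.
    if fmt not in SUPPORTED_FORMATS:
        raise ValueError(f"Unsupported fmt: {fmt!r}")
    ...
```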
Driver connection keys¶
BaseEngine.DRIVER_CONNECTION_KEYS is a frozenset of JDBC-specific option keys
(encrypt, trustServerCertificate, hostNameInCertificate) that must not
leak into higher-level connection APIs (connectorx, SQLAlchemy). Spark folds
them into the JDBC URL; Polars strips them before handing options to
connectorx. Extend the set when adding new driver-specific keys.
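A minimal sketch of the Polars-side split, assuming connection options arrive as a plain dict; split_driver_options is an illustrative name, not the library's helper:

```python
DRIVER_CONNECTION_KEYS = frozenset(
    {"encrypt", "trustServerCertificate", "hostNameInCertificate"}
)

def split_driver_options(options: dict[str, str]) -> tuple[dict[str, str], dict[str, str]]:
    # Partition options into JDBC-driver-specific keys (folded into the
    # URL by Spark, stripped by Polars) and the rest, which are safe to
    # hand to connectorx or SQLAlchemy.
    driver = {k: v for k, v in options.items() if k in DRIVER_CONNECTION_KEYS}
    rest = {k: v for k, v in options.items() if k not in DRIVER_CONNECTION_KEYS}
    return driver, rest
```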
Platform attachment¶
An engine is useless without a platform. There are three valid states:
- Construct the engine with a platform: SparkEngine(spark, platform=p).
- Attach later: call engine.set_platform(p) before the driver runs.
- Let the driver attach it: pass platform= to DataCoolieDriver(...).
For Spark, the constructor parameter is currently named spark_session, so the explicit form is SparkEngine(spark_session=spark, platform=p); the positional call works too, but the keyword form is clearer in docs.
The driver guards against mismatched platform types to prevent silent "works on my laptop, fails in Fabric" bugs.
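The three states side by side; spark and p stand for an existing SparkSession and platform instance, and the DataCoolieDriver keyword names here are assumptions:

```python
# 1. Construct with a platform attached.
engine = SparkEngine(spark_session=spark, platform=p)

# 2. Attach later, before the driver runs.
engine = SparkEngine(spark_session=spark)
engine.set_platform(p)

# 3. Let the driver attach it.
engine = SparkEngine(spark_session=spark)
driver = DataCoolieDriver(engine=engine, platform=p)  # kwargs assumed
```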
Case-insensitive column resolution¶
BaseEngine._resolve_column_name and _resolve_column_names do a
case-insensitive lookup against an actual DataFrame schema. Most transformers
use these helpers so users can write amount in metadata even when Spark
inferred AMOUNT.
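The lookup itself is a one-liner; a sketch of the idea, with the error handling the real helpers add omitted:

```python
def resolve_column_name(requested: str, actual_columns: list[str]) -> str:
    # Index the schema's spellings by lowercase form, then return the
    # schema's own spelling for the user-supplied name.
    by_lower = {col.lower(): col for col in actual_columns}
    return by_lower[requested.lower()]  # KeyError if absent

resolve_column_name("amount", ["AMOUNT", "order_id"])  # -> "AMOUNT"
```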