Write an engine¶
Prerequisites · You have a DataFrame library you want to run DataCoolie pipelines on · you're ready to implement ~40 abstract methods.
End state · A new engine that passes the engine conformance tests and can be selected via create_engine("mylib").
Large surface area
BaseEngine has ~40 abstract methods. Expect a multi-week effort. Start
by copying datacoolie.engines.polars_engine.PolarsEngine as a template —
it's the smaller of the two built-ins.
Skeleton¶
from datacoolie.engines.base import BaseEngine
import mylib


class MyLibEngine(BaseEngine[mylib.DataFrame]):
    def __init__(self, platform=None):
        super().__init__(platform)

    # --- Read ---
    def read_parquet(self, path, options=None): ...
    def read_delta(self, path, options=None): ...
    def read_iceberg(self, path, options=None): ...
    # ... etc. for csv, json, jsonl, avro, excel

    def read_path(self, path, fmt, options=None):
        # Dispatch on fmt → the right abstract reader
        ...

    def read_database(self, *, table=None, query=None, options=None): ...
    def read_table(self, table_name, fmt="delta", options=None): ...
    def create_dataframe(self, records): ...
    def execute_sql(self, sql, parameters=None): ...

    # --- Write ---
    def write_to_path(self, df, path, mode, fmt, partition_columns=None, options=None): ...
    def write_to_table(self, df, table_name, mode, fmt, partition_columns=None, options=None): ...

    # --- Merge ---
    def merge_to_path(self, df, path, merge_keys, fmt="delta", partition_columns=None, options=None): ...
    def merge_overwrite_to_path(self, df, path, merge_keys, fmt="delta", partition_columns=None, options=None): ...
    def merge_to_table(self, df, table_name, merge_keys, fmt="delta", options=None): ...
    def merge_overwrite_to_table(self, df, table_name, merge_keys, fmt="delta", options=None): ...

    # --- Transform, system columns, metrics, maintenance, SCD2 ---
    # (see BaseEngine for the full list)
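The read_path dispatch mentioned in the skeleton could be sketched roughly as follows. This is an illustration only: DispatchSketch is a hypothetical stand-in that does not subclass BaseEngine, the readers return placeholder tuples instead of DataFrames, the read_csv name is inferred from the "etc. for csv, ..." comment, and lower-casing fmt is an assumption rather than documented behaviour.

```python
from typing import Any, Callable, Dict, Optional


class DispatchSketch:
    """Hypothetical stand-in for an engine; not a real BaseEngine subclass."""

    # Placeholder readers: a real engine would return its DataFrame type here.
    def read_parquet(self, path: str, options: Optional[dict] = None) -> Any:
        return ("parquet", path, options or {})

    def read_delta(self, path: str, options: Optional[dict] = None) -> Any:
        return ("delta", path, options or {})

    def read_csv(self, path: str, options: Optional[dict] = None) -> Any:
        return ("csv", path, options or {})

    def read_path(self, path: str, fmt: str, options: Optional[dict] = None) -> Any:
        # Map each format string to the reader that handles it.
        readers: Dict[str, Callable[..., Any]] = {
            "parquet": self.read_parquet,
            "delta": self.read_delta,
            "csv": self.read_csv,
        }
        try:
            # Assumption: format names are matched case-insensitively.
            reader = readers[fmt.lower()]
        except KeyError:
            raise ValueError(f"Unsupported format: {fmt!r}") from None
        return reader(path, options)
```

A lookup table keeps the dispatch in one place, so adding a format means adding one reader method and one dictionary entry.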
fmt parameter contract¶
Every format-aware method must accept a fmt string. read_table,
merge_to_table, and table_exists_by_name have contract-specific
signatures:
def read_table(self, table_name: str, fmt: str = "delta", options=None): ...
def merge_to_table(self, df, table_name, merge_keys, fmt: str = "delta", options=None): ...
def table_exists_by_name(self, table_name: str, *, fmt: str = "delta") -> bool: ...
table_exists_by_name uses keyword-only fmt.
See ADR-0001.
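To illustrate what keyword-only fmt means for callers, here is a minimal sketch. KeywordOnlySketch and its in-memory table set are hypothetical; only the shape of the table_exists_by_name signature is taken from the contract above.

```python
class KeywordOnlySketch:
    """Hypothetical stand-in; only the signature shape mirrors the contract."""

    def __init__(self) -> None:
        # Pretend catalog of (table_name, fmt) pairs, for illustration only.
        self._tables = {("sales", "delta")}

    def table_exists_by_name(self, table_name: str, *, fmt: str = "delta") -> bool:
        # The bare * makes fmt keyword-only: it cannot be passed positionally.
        return (table_name, fmt) in self._tables


engine = KeywordOnlySketch()
default_hit = engine.table_exists_by_name("sales")                   # fmt defaults to "delta"
explicit_miss = engine.table_exists_by_name("sales", fmt="iceberg")  # fmt passed by keyword

try:
    engine.table_exists_by_name("sales", "delta")  # positional fmt is rejected
    positional_rejected = False
except TypeError:
    positional_rejected = True
```

If a custom engine declares fmt as an ordinary positional parameter, callers that pass it by keyword still work, but the signature no longer matches the contract.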
Register¶
Conformance¶
Run the framework's engine tests against your implementation. At minimum:
- All tests under tests/unit/engines/ that don't bind to a specific DataFrame class.
- The full tests/unit/transformers/ suite, since transformers exercise many engine methods.
- The full tests/unit/destinations/ suite.
For reference, the Polars engine ships 73 dedicated tests; expect similar coverage for a new engine.
delete_by_window — Range-based delete¶
Engines must implement delete_by_window_path and delete_by_window_table
to support the replace_by_watermark destination feature:
@abstractmethod
def delete_by_window_path(
    self,
    path: str,
    window: Dict[str, tuple],
    fmt: str = "delta",
) -> None:
    """Delete rows in a path-based table within the value window."""

@abstractmethod
def delete_by_window_table(
    self,
    table_name: str,
    window: Dict[str, tuple],
    fmt: str = "delta",
) -> None:
    """Delete rows in a named table within the value window."""
The window dict maps column names to (lower_bound, upper_bound) tuples.
Build a predicate like col > lower AND col <= upper for each entry and delete
all matching rows. The lower bound is exclusive to match the source
watermark filter semantics.
The base class dispatches to these via delete_by_window() — you only need to
implement the two abstract variants.
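The predicate construction described above might look like the sketch below. The helper names build_window_predicate and row_in_window are hypothetical, as is the ? placeholder style. Two behaviours are assumptions that the text above does not state: entries for several columns are combined with AND, and an empty window is rejected.

```python
from typing import Any, Dict, List, Tuple


def build_window_predicate(window: Dict[str, Tuple[Any, Any]]) -> Tuple[str, List[Any]]:
    """Return a parameterized predicate and its bound values (hypothetical helper)."""
    if not window:
        # Assumption: refuse an empty window instead of matching every row.
        raise ValueError("window must contain at least one column")
    clauses: List[str] = []
    params: List[Any] = []
    for column, (lower, upper) in window.items():
        # Lower bound exclusive, upper bound inclusive.
        clauses.append(f"({column} > ? AND {column} <= ?)")
        params.extend([lower, upper])
    # Assumption: multiple columns are combined with AND.
    return " AND ".join(clauses), params


def row_in_window(row: Dict[str, Any], window: Dict[str, Tuple[Any, Any]]) -> bool:
    """Same semantics in plain Python, useful for checking an implementation."""
    return all(lower < row[col] <= upper for col, (lower, upper) in window.items())


window = {"event_date": ("2024-01-01", "2024-01-31")}
predicate, params = build_window_predicate(window)

rows = [{"event_date": d} for d in ("2024-01-01", "2024-01-15", "2024-01-31", "2024-02-01")]
# Rows inside the window would be deleted; the rest survive.
kept = [r["event_date"] for r in rows if not row_in_window(r, window)]
```

In this example the row dated 2024-01-01 survives because the lower bound is exclusive, while the row dated 2024-01-31 is deleted because the upper bound is inclusive.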