Skip to content

Write an engine

Prerequisites · You have a DataFrame library you want to run DataCoolie pipelines on · you're ready to implement ~40 abstract methods. End state · A new engine that passes the engine conformance tests and can be selected via create_engine("mylib").

Large surface area

BaseEngine has ~40 abstract methods. Expect a multi-week effort. Start by copying datacoolie.engines.polars_engine.PolarsEngine as a template — it's the smaller of the two built-ins.

Skeleton

from datacoolie.engines.base import BaseEngine
import mylib


class MyLibEngine(BaseEngine[mylib.DataFrame]):
    def __init__(self, platform=None):
        super().__init__(platform)

    # --- Read ---
    def read_parquet(self, path, options=None): ...
    def read_delta(self, path, options=None):  ...
    def read_iceberg(self, path, options=None): ...
    # ... etc. for csv, json, jsonl, avro, excel

    def read_path(self, path, fmt, options=None):
        # Dispatch on fmt → the right abstract reader
        ...

    def read_database(self, *, table=None, query=None, options=None): ...
    def read_table(self, table_name, fmt="delta", options=None): ...
    def create_dataframe(self, records): ...
    def execute_sql(self, sql, parameters=None): ...

    # --- Write ---
    def write_to_path(self, df, path, mode, fmt, partition_columns=None, options=None): ...
    def write_to_table(self, df, table_name, mode, fmt, partition_columns=None, options=None): ...

    # --- Merge ---
    def merge_to_path(self, df, path, merge_keys, fmt="delta", partition_columns=None, options=None): ...
    def merge_overwrite_to_path(self, df, path, merge_keys, fmt="delta", partition_columns=None, options=None): ...
    def merge_to_table(self, df, table_name, merge_keys, fmt="delta", options=None): ...
    def merge_overwrite_to_table(self, df, table_name, merge_keys, fmt="delta", options=None): ...

    # --- Transform, system columns, metrics, maintenance, SCD2 ---
    # (see BaseEngine for the full list)

fmt parameter contract

Every format-aware method must accept a fmt string. read_table, merge_to_table, and table_exists_by_name have contract-specific signatures:

def read_table(self, table_name: str, fmt: str = "delta", options=None): ...
def merge_to_table(self, df, table_name, merge_keys, fmt: str = "delta", options=None): ...
def table_exists_by_name(self, table_name: str, *, fmt: str = "delta") -> bool: ...

table_exists_by_name uses keyword-only fmt.

See ADR-0001.

Register

[project.entry-points."datacoolie.engines"]
mylib = "mypkg.engine:MyLibEngine"

Conformance

Run the framework's engine tests against your implementation. At minimum:

  • All tests under tests/unit/engines/ that don't bind to a specific DataFrame class.
  • The full tests/unit/transformers/ suite — transformers exercise many engine methods.
  • The full tests/unit/destinations/ suite.

For reference the Polars engine ships 73 dedicated tests; expect similar coverage for a new engine.

delete_by_window — Range-based delete

Engines must implement delete_by_window_path and delete_by_window_table to support the replace_by_watermark destination feature:

@abstractmethod
def delete_by_window_path(
    self,
    path: str,
    window: Dict[str, tuple],
    fmt: str = "delta",
) -> None:
    """Delete rows in a path-based table within the value window."""

@abstractmethod
def delete_by_window_table(
    self,
    table_name: str,
    window: Dict[str, tuple],
    fmt: str = "delta",
) -> None:
    """Delete rows in a named table within the value window."""

The window dict maps column names to (lower_bound, upper_bound) tuples. Build a predicate like col > lower AND col <= upper for each entry and delete all matching rows. The lower bound is exclusive to match the source watermark filter semantics.

The base class dispatches to these via delete_by_window() — you only need to implement the two abstract variants.