Engines

base

Abstract base class for DataFrame engines.

BaseEngine[DF] is a generic ABC parameterised by DataFrame type. Concrete implementations (Spark, Polars) bind DF to their native DataFrame class.

Section layout

  • Construction — __init__, platform, set_platform
  • Read — read_parquet, read_delta, read_iceberg, read_csv, read_json, read_jsonl, read_avro, read_excel, read_path, read_database, create_dataframe, execute_sql, read_table
  • Write — write_to_path, write_to_table
  • Merge — merge_to_path, merge_overwrite_to_path, merge_to_table, merge_overwrite_to_table
  • Transform — add_column, drop_columns, select_columns, rename_column, filter_rows, apply_watermark_filter, deduplicate, deduplicate_by_rank, cast_column
  • System columns — add_system_columns, add_file_info_columns, remove_system_columns, convert_timestamp_ntz_to_timestamp
  • Metrics — count_rows, is_empty, get_columns, get_schema, get_max_values, get_count_and_max_values
  • Maintenance — table_exists_by_path, table_exists_by_name, get_history_by_path, get_history_by_name, compact_by_path, compact_by_name, cleanup_by_path, cleanup_by_name
  • Navigation (concrete dispatchers) — read, write, merge, merge_overwrite, exists, get_history, compact, cleanup
  • SCD Type 2 — scd2_to_path, scd2_to_table, scd2

DF module-attribute

DF = TypeVar('DF')

BaseEngine

BaseEngine(platform: Optional[BasePlatform] = None)

Bases: ABC, Generic[DF]

Engine abstraction for DataFrame-based read / write / transform.

Type parameter DF is the concrete DataFrame class (e.g. pyspark.sql.DataFrame or polars.DataFrame).

platform property

platform: Optional[BasePlatform]

The platform attached to this engine, if any.

add_column abstractmethod

add_column(df: DF, column_name: str, expression: str) -> DF

Add (or replace) a column using a SQL expression.

add_file_info_columns abstractmethod

add_file_info_columns(df: DF, file_infos: Optional[List[FileInfo]] = None) -> DF

Add __file_name, __file_path, __file_modification_time.

Engines that embed the source path at scan time (e.g. Polars via include_file_paths) map file_infos onto the already-present __file_path column to inject __file_name and __file_modification_time. Engines with native metadata support (e.g. Spark _metadata) ignore file_infos entirely.

Parameters:

Name Type Description Default
df DF

Input DataFrame — for Polars this must already contain the __file_path column added by the scan reader.

required
file_infos Optional[List[FileInfo]]

Ordered list of :class:~datacoolie.platforms.base.FileInfo objects used to resolve name / modification-time per path, or None when only the path itself is available.

None

add_system_columns abstractmethod

add_system_columns(df: DF, author: Optional[str] = None) -> DF

Add __created_at, __updated_at, __updated_by.

apply_watermark_filter abstractmethod

apply_watermark_filter(df: DF, watermark_columns: List[str], watermark: Dict[str, Any]) -> DF

Filter rows where any watermark column exceeds its stored value.

Builds an OR condition: col1 > val1 OR col2 > val2 ... using native DataFrame API.

Parameters:

Name Type Description Default
df DF

Input DataFrame.

required
watermark_columns List[str]

Column names to compare.

required
watermark Dict[str, Any]

{column: threshold_value} mapping.

required

Returns:

Type Description
DF

Filtered DataFrame.
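
As an illustration (the engine, df, and threshold values below are placeholders, not part of the API surface), a call that keeps rows newer than a stored watermark:

    # Keeps rows where updated_at > '2024-01-01T00:00:00' OR id > 1000.
    filtered = engine.apply_watermark_filter(
        df,
        watermark_columns=["updated_at", "id"],
        watermark={"updated_at": "2024-01-01T00:00:00", "id": 1000},
    )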

cast_column abstractmethod

cast_column(df: DF, column_name: str, target_type: str, fmt: Optional[str] = None) -> DF

Cast a column to target_type, optionally using fmt.

cleanup

cleanup(*, table_name: Optional[str] = None, path: Optional[str] = None, retention_hours: int = 168, fmt: str = 'delta', options: Optional[Dict[str, Any]] = None) -> None

Run cleanup on a table name (preferred) or a path.

Parameters:

Name Type Description Default
table_name Optional[str]

Fully qualified table name. Takes precedence over path.

None
path Optional[str]

Table file path (used when table_name is not given).

None
retention_hours int

Minimum age in hours of files to retain.

168
fmt str

Table format ("delta" or "iceberg").

'delta'
options Optional[Dict[str, Any]]

Format-specific options for cleanup sub-operations.

None

Raises:

Type Description
EngineError

If neither table_name nor path is provided.

cleanup_by_name abstractmethod

cleanup_by_name(table_name: str, retention_hours: int = 168, *, fmt: str = 'delta', options: Optional[Dict[str, Any]] = None) -> None

Run cleanup (VACUUM or equivalent) on a named table.

cleanup_by_path abstractmethod

cleanup_by_path(path: str, retention_hours: int = 168, *, fmt: str = 'delta', options: Optional[Dict[str, Any]] = None) -> None

Run cleanup (VACUUM or equivalent) on the table at path.

compact

compact(*, table_name: Optional[str] = None, path: Optional[str] = None, fmt: str = 'delta', options: Optional[Dict[str, Any]] = None) -> None

Run compaction on a table name (preferred) or a path.

Parameters:

Name Type Description Default
table_name Optional[str]

Fully qualified table name. Takes precedence over path.

None
path Optional[str]

Table file path (used when table_name is not given).

None
fmt str

Table format ("delta" or "iceberg").

'delta'
options Optional[Dict[str, Any]]

Format-specific options for compaction sub-operations.

None

Raises:

Type Description
EngineError

If neither table_name nor path is provided.

compact_by_name abstractmethod

compact_by_name(table_name: str, *, fmt: str = 'delta', options: Optional[Dict[str, Any]] = None) -> None

Run compaction (OPTIMIZE or equivalent) on a named table.

compact_by_path abstractmethod

compact_by_path(path: str, *, fmt: str = 'delta', options: Optional[Dict[str, Any]] = None) -> None

Run compaction (OPTIMIZE or equivalent) on the table at path.
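
For illustration, a typical maintenance pass using the compact and cleanup dispatchers documented above; the table name is hypothetical:

    # Compact small files, then remove unreferenced files older than 7 days.
    engine.compact(table_name="catalog.db.sales", fmt="delta")
    engine.cleanup(table_name="catalog.db.sales", retention_hours=168, fmt="delta")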

convert_timestamp_ntz_to_timestamp abstractmethod

convert_timestamp_ntz_to_timestamp(df: DF) -> DF

Convert all timestamp_ntz (no-TZ) columns to TZ-aware UTC timestamps.

count_rows abstractmethod

count_rows(df: DF) -> int

Return the number of rows in df.

create_dataframe abstractmethod

create_dataframe(records: List[Dict[str, Any]]) -> DF

Create a DataFrame from a list of dicts.

Records may have different keys (heterogeneous schema). Missing fields are filled with null so that the resulting DataFrame has the union of all keys as its columns.

Parameters:

Name Type Description Default
records List[Dict[str, Any]]

List of flat dicts. Values may be None.

required

Returns:

Type Description
DF

A DataFrame with one row per record and one column per distinct key across all records.
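
A small sketch with heterogeneous records (values are illustrative):

    # Result has columns id, name, city; missing fields become null.
    df = engine.create_dataframe([
        {"id": 1, "name": "alice"},
        {"id": 2, "city": "Berlin"},
    ])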

deduplicate abstractmethod

deduplicate(df: DF, partition_columns: List[str], order_columns: Optional[List[str]] = None, order: str = 'desc') -> DF

Remove duplicate rows.

Uses ROW_NUMBER() window partitioned by partition_columns and ordered by order_columns.
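
For example (column names are hypothetical), keeping the most recent row per business key:

    # One row per customer_id, ranked by updated_at descending.
    deduped = engine.deduplicate(
        df,
        partition_columns=["customer_id"],
        order_columns=["updated_at"],
        order="desc",
    )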

deduplicate_by_rank abstractmethod

deduplicate_by_rank(df: DF, partition_columns: List[str], order_columns: List[str], order: str = 'desc') -> DF

Deduplicate using RANK(), keeping ties.

drop_columns abstractmethod

drop_columns(df: DF, columns: List[str]) -> DF

Drop one or more columns from a DataFrame.

execute_sql abstractmethod

execute_sql(sql: str, parameters: Optional[Dict[Any, Any]] = None) -> DF

Execute a SQL query and return a DataFrame.

exists

exists(*, table_name: Optional[str] = None, path: Optional[str] = None, fmt: str = 'delta') -> bool

Return True if the target table or path exists.

Checks by table name first (via :meth:table_exists_by_name), then by path (via :meth:table_exists_by_path). Returns False if neither is provided.

Parameters:

Name Type Description Default
table_name Optional[str]

Fully qualified table name.

None
path Optional[str]

Table file path.

None

filter_rows abstractmethod

filter_rows(df: DF, condition: str) -> DF

Filter rows using a SQL expression.

generate_symlink_manifest

generate_symlink_manifest(path: str) -> None

Generate a symlink-format manifest for the Delta table at path.

Creates the _symlink_format_manifest/ directory alongside the Delta log, enabling query engines that don't natively support Delta (e.g. Redshift Spectrum) to read the table via SymlinkTextInputFormat.

Parameters:

Name Type Description Default
path str

Root path of the Delta table.

required

get_columns abstractmethod

get_columns(df: DF) -> List[str]

Return the column names of df.

get_count_and_max_values abstractmethod

get_count_and_max_values(df: DF, columns: List[str]) -> Tuple[int, Dict[str, Any]]

Return (row_count, {column: max_value}) in one pass.

get_history

get_history(*, table_name: Optional[str] = None, path: Optional[str] = None, limit: int = 1, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, fmt: str = 'delta') -> List[Dict[str, Any]]

Return audit history for a table name (preferred) or a path.

Parameters:

Name Type Description Default
table_name Optional[str]

Fully qualified table name. Takes precedence over path.

None
path Optional[str]

Table file path (used when table_name is not given).

None
limit int

Maximum number of history entries to return.

1
start_time Optional[datetime]

If provided, only return entries whose timestamp is >= start_time.

None
end_time Optional[datetime]

If provided, only return entries whose timestamp is <= end_time.

None
fmt str

Table format ("delta" or "iceberg").

'delta'

Raises:

Type Description
EngineError

If neither table_name nor path is provided.
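
A usage sketch with hypothetical values:

    from datetime import datetime, timezone

    # Up to 10 history entries for January 2024.
    entries = engine.get_history(
        table_name="catalog.db.sales",
        limit=10,
        start_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
        end_time=datetime(2024, 2, 1, tzinfo=timezone.utc),
        fmt="delta",
    )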

get_history_by_name abstractmethod

get_history_by_name(table_name: str, limit: int = 1, start_time: Optional[datetime] = None, *, end_time: Optional[datetime] = None, fmt: str = 'delta') -> List[Dict[str, Any]]

Return audit history entries for a named table.

Parameters:

Name Type Description Default
table_name str

Fully qualified table name.

required
limit int

Maximum number of history entries to return.

1
start_time Optional[datetime]

If provided, only return entries whose timestamp / made_current_at is >= start_time.

None
end_time Optional[datetime]

If provided, only return entries whose timestamp / made_current_at is <= end_time.

None
fmt str

Table format ("delta" or "iceberg").

'delta'

get_history_by_path abstractmethod

get_history_by_path(path: str, limit: int = 1, start_time: Optional[datetime] = None, *, end_time: Optional[datetime] = None, fmt: str = 'delta') -> List[Dict[str, Any]]

Return audit history entries for the table at path.

Parameters:

Name Type Description Default
path str

Table location.

required
limit int

Maximum number of history entries to return.

1
start_time Optional[datetime]

If provided, only return entries whose timestamp is >= start_time.

None
end_time Optional[datetime]

If provided, only return entries whose timestamp is <= end_time.

None
fmt str

Table format ("delta" or "iceberg").

'delta'

get_hive_schema abstractmethod

get_hive_schema(df: DF) -> Dict[str, str]

Return {column_name: hive_type_string} using native type objects.

Each engine inspects its native DataFrame schema (e.g. pyspark.sql.types, polars.datatypes) to produce Hive/Athena-compatible DDL type strings directly — no string parsing required.

get_max_values abstractmethod

get_max_values(df: DF, columns: List[str]) -> Dict[str, Any]

Return the maximum value for each of columns.

get_schema abstractmethod

get_schema(df: DF) -> Dict[str, str]

Return {column_name: data_type_string}.

is_empty abstractmethod

is_empty(df: DF) -> bool

Return True if the DataFrame has zero rows.

merge

merge(df: DF, *, table_name: Optional[str] = None, path: Optional[str] = None, merge_keys: List[str], fmt: str = 'delta', partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None

Merge (upsert) df into a table name (preferred) or a path.

Parameters:

Name Type Description Default
df DF

Source DataFrame.

required
table_name Optional[str]

Target table name. Takes precedence over path.

None
path Optional[str]

Target file path (used when table_name is not given).

None
merge_keys List[str]

Join columns for the merge condition.

required
fmt str

Table / file format.

'delta'
partition_columns Optional[List[str]]

Optional partition column names.

None
options Optional[Dict[str, str]]

Additional options.

None

Raises:

Type Description
EngineError

If neither table_name nor path is provided.
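
A minimal upsert sketch (table name and columns are hypothetical):

    # Rows matching on customer_id are updated; new keys are inserted.
    engine.merge(
        df,
        table_name="catalog.db.customers",
        merge_keys=["customer_id"],
        fmt="delta",
        partition_columns=["country"],
    )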

merge_overwrite

merge_overwrite(df: DF, *, table_name: Optional[str] = None, path: Optional[str] = None, merge_keys: List[str], fmt: str = 'delta', partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None

Merge-overwrite (delete + append) into a table name (preferred) or a path.

Parameters:

Name Type Description Default
df DF

Source DataFrame.

required
table_name Optional[str]

Target table name. Takes precedence over path.

None
path Optional[str]

Target file path (used when table_name is not given).

None
merge_keys List[str]

Join columns for the delete condition.

required
fmt str

Table / file format.

'delta'
partition_columns Optional[List[str]]

Optional partition column names.

None
options Optional[Dict[str, str]]

Additional options.

None

Raises:

Type Description
EngineError

If neither table_name nor path is provided.

merge_overwrite_to_path abstractmethod

merge_overwrite_to_path(df: DF, path: str, merge_keys: List[str], fmt: str = 'delta', partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None

Merge with full-row overwrite (rolling overwrite / SCD1-style).

Existing rows matching merge keys are deleted and re-inserted from the source DataFrame.

merge_overwrite_to_table abstractmethod

merge_overwrite_to_table(df: DF, table_name: str, merge_keys: List[str], fmt: str = 'delta', partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None

Merge-overwrite (delete + append) a named table via SQL MERGE + WriterV2.

Parameters:

Name Type Description Default
df DF

Source DataFrame.

required
table_name str

Fully qualified target table name.

required
merge_keys List[str]

Join columns for the delete condition.

required
fmt str

Table format.

'delta'
partition_columns Optional[List[str]]

Optional partition column names.

None
options Optional[Dict[str, str]]

Additional options.

None

merge_to_path abstractmethod

merge_to_path(df: DF, path: str, merge_keys: List[str], fmt: str = 'delta', partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None

Merge (upsert) a DataFrame into a table at path.

merge_to_table abstractmethod

merge_to_table(df: DF, table_name: str, merge_keys: List[str], fmt: str, partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None

Merge (upsert) a DataFrame into a named table via SQL MERGE.

Parameters:

Name Type Description Default
df DF

Source DataFrame.

required
table_name str

Fully qualified target table name.

required
merge_keys List[str]

Join columns for the merge condition.

required
fmt str

Table format.

required
partition_columns Optional[List[str]]

Optional partition column names.

None
options Optional[Dict[str, str]]

Additional options.

None

read

read(fmt: str, *, table_name: Optional[str] = None, path: Optional[str | list[str]] = None, options: Optional[Dict[str, str]] = None) -> DF

Read data by table name (preferred) or by path.

Parameters:

Name Type Description Default
fmt str

Format string — "delta", "parquet", "csv", or "json".

required
table_name Optional[str]

Fully qualified table name. Takes precedence over path.

None
path Optional[str | list[str]]

File or directory path (used when table_name is not given).

None
options Optional[Dict[str, str]]

Additional reader options.

None

Raises:

Type Description
EngineError

If neither table_name nor path is provided, or if the format has no path reader.
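
Two illustrative calls (names and paths are placeholders):

    # Preferred: read a named table.
    sales = engine.read("delta", table_name="catalog.db.sales")

    # Path-based read; path may be a single string or a list of paths.
    raw = engine.read(
        "parquet",
        path=["s3://bucket/raw/part-0.parquet", "s3://bucket/raw/part-1.parquet"],
    )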

read_avro abstractmethod

read_avro(path: str | list[str], options: Optional[Dict[str, str]] = None) -> DF

Read Avro file(s) into a DataFrame.

read_csv abstractmethod

read_csv(path: str | list[str], options: Optional[Dict[str, str]] = None) -> DF

Read CSV file(s) into a DataFrame.

read_database abstractmethod

read_database(*, table: Optional[str] = None, query: Optional[str] = None, options: Optional[Dict[str, Any]] = None) -> DF

Read from an external database.

Exactly one of table or query must be provided.

Parameters:

Name Type Description Default
table Optional[str]

Plain table or view name (e.g. "schema.orders").

None
query Optional[str]

Full SQL query (e.g. "SELECT * FROM orders WHERE id > 100").

None
options Optional[Dict[str, Any]]

All connection and reader settings in one flat dict. Must include at minimum url and driver. May also carry JDBC tuning keys such as fetchsize, numPartitions, partitionColumn, etc.

None

In Spark this maps to spark.read.format("jdbc").option(...).load(). Use this for reading from external databases. For running queries in the engine's own SQL context, use :meth:execute_sql.
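
An illustrative call; the connection settings are placeholders, and option keys beyond url and driver follow the usual JDBC reader options:

    # Read a whole table from PostgreSQL over JDBC.
    orders = engine.read_database(
        table="schema.orders",
        options={
            "url": "jdbc:postgresql://db-host:5432/shop",
            "driver": "org.postgresql.Driver",
            "fetchsize": "10000",
        },
    )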

read_delta abstractmethod

read_delta(path: str, options: Optional[Dict[str, str]] = None) -> DF

Read a Delta Lake table into a DataFrame.

read_excel abstractmethod

read_excel(path: str | list[str], options: Optional[Dict[str, str]] = None) -> DF

Read Excel file(s) into a DataFrame.

read_iceberg abstractmethod

read_iceberg(path: str, options: Optional[Dict[str, str]] = None) -> DF

Read an Iceberg table into a DataFrame.

read_json abstractmethod

read_json(path: str | list[str], options: Optional[Dict[str, str]] = None) -> DF

Read JSON file(s) into a DataFrame.

read_jsonl abstractmethod

read_jsonl(path: str | list[str], options: Optional[Dict[str, str]] = None) -> DF

Read JSONL (newline-delimited JSON) file(s) into a DataFrame.

read_parquet abstractmethod

read_parquet(path: str | list[str], options: Optional[Dict[str, str]] = None) -> DF

Read Parquet file(s) into a DataFrame.

read_path abstractmethod

read_path(path: str | list[str], fmt: str, options: Optional[Dict[str, str]] = None) -> DF

Read a file or directory at path using the given format.

This is the unified path-based reader. Implementations should dispatch to the appropriate format reader (read_delta, read_parquet, etc.) based on fmt.

Parameters:

Name Type Description Default
path str | list[str]

File or directory path (scalar or list for multi-path reads).

required
fmt str

Format string — "delta", "iceberg", "parquet", "csv", or "json".

required
options Optional[Dict[str, str]]

Additional reader options.

None

Raises:

Type Description
EngineError

If the format is not supported.

read_table abstractmethod

read_table(table_name: str, fmt: str = 'delta', options: Optional[Dict[str, str]] = None) -> DF

Read a named table and return a DataFrame.

Parameters:

Name Type Description Default
table_name str

Fully qualified table name (e.g. "`catalog`.`db`.`schema`.`table`").

required
fmt str

Table format ("delta", "iceberg", etc.).

'delta'
options Optional[Dict[str, str]]

Additional read options.

None

remove_system_columns

remove_system_columns(df: DF) -> DF

Drop system columns (__created_at, etc.).

This is a concrete convenience method that delegates to :meth:drop_columns.

rename_column abstractmethod

rename_column(df: DF, old_name: str, new_name: str) -> DF

Rename a column.

scd2

scd2(df: DF, *, table_name: Optional[str] = None, path: Optional[str] = None, merge_keys: List[str], fmt: str = 'delta', partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None

SCD Type 2 write to a table name (preferred) or a path.

scd2_to_path

scd2_to_path(df: DF, path: str, merge_keys: List[str], fmt: str = 'delta', partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None

SCD Type 2 write to a path-based table.

Tracks historical changes by closing existing current records (setting scd2_end_date, scd2_is_current = false) and inserting new versions with scd2_start_date, scd2_is_current = true.
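
A minimal sketch (path and key column are hypothetical):

    # Existing current rows for each customer_id are closed; the incoming
    # rows are inserted as the new current versions.
    engine.scd2_to_path(
        df,
        path="s3://bucket/silver/customers",
        merge_keys=["customer_id"],
        fmt="delta",
    )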

scd2_to_table

scd2_to_table(df: DF, table_name: str, merge_keys: List[str], fmt: str = 'delta', partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None

SCD Type 2 write to a named table.

select_columns abstractmethod

select_columns(df: DF, columns: List[str]) -> DF

Return df with only the given columns, in the specified order.

set_platform

set_platform(platform: BasePlatform) -> None

Attach a platform to this engine.

table_exists_by_name abstractmethod

table_exists_by_name(table_name: str, *, fmt: str = 'delta') -> bool

Return True if a named table exists in the catalog.

table_exists_by_path abstractmethod

table_exists_by_path(path: str, *, fmt: str = 'delta') -> bool

Return True if a table exists at path.

write

write(df: DF, *, table_name: Optional[str] = None, path: Optional[str] = None, mode: str, fmt: str, partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None

Write df to a table name (preferred) or to a path.

Parameters:

Name Type Description Default
df DF

DataFrame to write.

required
table_name Optional[str]

Target table name. Takes precedence over path.

None
path Optional[str]

Target file path (used when table_name is not given).

None
mode str

Write mode ("overwrite", "append", …).

required
fmt str

Table / file format.

required
partition_columns Optional[List[str]]

Optional partition column names.

None
options Optional[Dict[str, str]]

Additional writer options.

None

Raises:

Type Description
EngineError

If neither table_name nor path is provided.
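
An illustrative overwrite of a named table (names are placeholders):

    # Replace the table contents, partitioned by ingest_date.
    engine.write(
        df,
        table_name="catalog.db.sales",
        mode="overwrite",
        fmt="delta",
        partition_columns=["ingest_date"],
    )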

write_to_path abstractmethod

write_to_path(df: DF, path: str, mode: str, fmt: str, partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None

Write a DataFrame to a path.

write_to_table abstractmethod

write_to_table(df: DF, table_name: str, mode: str, fmt: str, partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None

Write a DataFrame to a named table using DataFrameWriterV2.

Parameters:

Name Type Description Default
df DF

DataFrame to write.

required
table_name str

Fully qualified table name (e.g. catalog.db.schema.table).

required
mode str

Write mode — LoadType.OVERWRITE ("overwrite"), LoadType.APPEND ("append"), or LoadType.FULL_LOAD ("full_load").

required
fmt str

Table format ("delta", "iceberg").

required
partition_columns Optional[List[str]]

Optional partition column names.

None
options Optional[Dict[str, str]]

Additional writer options.

None

spark_engine

PySpark + Delta Lake engine implementation.

:class:SparkEngine binds :class:BaseEngine to pyspark.sql.DataFrame and implements every abstract method using the Spark DataFrame API and the delta-spark library.

SparkEngine

SparkEngine(spark_session: Optional[SparkSession] = None, config: Optional[Dict[str, str]] = None, platform: Optional[BasePlatform] = None)

Bases: BaseEngine[DataFrame]

PySpark implementation of :class:BaseEngine.

Parameters:

Name Type Description Default
spark_session Optional[SparkSession]

Existing SparkSession (notebook environments). If None, a fresh session with Delta extensions is created.

None
config Optional[Dict[str, str]]

Extra Spark configuration overrides applied on top of :data:DEFAULT_SPARK_CONFIGS.

None
platform Optional[BasePlatform]

Optional platform to attach to this engine immediately.

None
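
Two construction sketches, assuming the module path datacoolie.engines.spark_engine (adjust the import to your installation):

    from datacoolie.engines.spark_engine import SparkEngine

    # Notebook (Fabric / Databricks): reuse the session provided by the runtime.
    engine = SparkEngine(spark_session=spark)

    # Standalone: a fresh session is created; config overrides the defaults.
    engine = SparkEngine(config={"spark.sql.shuffle.partitions": "64"})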

spark property

spark: SparkSession

Return the underlying SparkSession.

spark_major_version property

spark_major_version: int

Return the major version of the Spark runtime (e.g. 3 or 4).

create_dataframe

create_dataframe(records: List[Dict[str, Any]]) -> DataFrame

Create a DataFrame from a list of heterogeneous dicts.

Spark's createDataFrame infers a merged schema automatically when samplingRatio=1.0 is used, filling missing fields with null. An explicit union of all keys is pre-computed so that every row dict contains all columns before passing to Spark, ensuring consistent schema inference even for small datasets.

generate_symlink_manifest

generate_symlink_manifest(path: str) -> None

Generate a symlink manifest using Spark's DeltaTable API.

get_hive_schema

get_hive_schema(df: DataFrame) -> Dict[str, str]

Return {column_name: hive_type} using native PySpark type objects.

merge_overwrite_to_path

merge_overwrite_to_path(df: DataFrame, path: str, merge_keys: List[str], fmt: str = 'delta', partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None

Rolling overwrite via DeltaTable MERGE DELETE + APPEND.

merge_overwrite_to_table

merge_overwrite_to_table(df: DataFrame, table_name: str, merge_keys: List[str], fmt: str = 'delta', partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None

Merge-overwrite (delete + append) for a named table.

Uses the DataFrame mergeInto API on Spark >= 4.0, otherwise falls back to SQL MERGE DELETE + WriterV2 append. Iceberg schema evolution runs once up-front so both steps observe the updated schema.

merge_to_table

merge_to_table(df: DataFrame, table_name: str, merge_keys: List[str], fmt: str, partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None

Merge (upsert) into a named table.

Uses the DataFrame mergeInto API on Spark >= 4.0, otherwise falls back to SQL MERGE. For Iceberg, schema + partition spec are evolved up-front (metadata-only) so new columns are available before the MERGE runs. MERGE uses explicit column names, so no DataFrame reorder is required.

scd2_to_path

scd2_to_path(df: DataFrame, path: str, merge_keys: List[str], fmt: str = 'delta', partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None

SCD2 via two-step MERGE + APPEND on a path-based Delta table.

Step 1: MERGE to close current rows where source is strictly newer. Step 2: APPEND all source rows as new versions.

scd2_to_table

scd2_to_table(df: DataFrame, table_name: str, merge_keys: List[str], fmt: str = 'delta', partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None

SCD2 via two-step MERGE + APPEND on a named table.

write_to_table

write_to_table(df: DataFrame, table_name: str, mode: str, fmt: str, partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None, _skip_iceberg_evolution: bool = False) -> None

Write via DataFrameWriterV2 (df.writeTo).

Supported mode values:

  • LoadType.OVERWRITE / LoadType.FULL_LOAD — create or replace.
  • LoadType.APPEND — append rows. When the target table does not yet exist, behaves as create-or-replace.

Column matching between DataFrame and existing table is case-insensitive, consistent with Spark's default catalog resolver.

polars_engine

Polars + Delta Lake + Iceberg engine implementation.

:class:PolarsEngine binds :class:BaseEngine to polars.LazyFrame and implements every abstract method using the Polars LazyFrame API, the deltalake library, and optional pyiceberg integration.

Reads return lazy frames via scan_* APIs; materialisation (.collect()) happens only at write / merge / metrics boundaries.

Catalog support
  • Iceberg Catalog via iceberg_catalog (a pyiceberg.catalog.Catalog instance).
  • SQLContext for executing SQL queries against registered tables.

PolarsEngine

PolarsEngine(platform: Optional[BasePlatform] = None, storage_options: Optional[Dict[str, str]] = None, *, iceberg_catalog: Optional[Any] = None, sql_context: Optional[SQLContext] = None, **kwargs: Any)

Bases: BaseEngine['pl.LazyFrame']

Polars implementation of :class:BaseEngine bound to pl.LazyFrame.

All reads return lazy frames via scan_* APIs. Materialisation (.collect()) happens only at write boundaries, merge operations, and metric computations.

Parameters:

Name Type Description Default
storage_options Optional[Dict[str, str]]

Cloud storage credentials forwarded to Polars readers and deltalake operations.

None
iceberg_catalog Optional[Any]

A pyiceberg.catalog.Catalog instance for Iceberg table discovery and management.

None
sql_context Optional[SQLContext]

An existing polars.SQLContext to reuse. A new context is created when None (the default).

None
platform Optional[BasePlatform]

Optional platform to attach to this engine immediately.

None
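
A construction sketch, assuming the module path datacoolie.engines.polars_engine; the storage option keys shown are illustrative:

    from datacoolie.engines.polars_engine import PolarsEngine

    engine = PolarsEngine(
        storage_options={"AWS_REGION": "eu-west-1"},
    )

    # Optionally attach a pyiceberg catalog for named Iceberg tables:
    # from pyiceberg.catalog import load_catalog
    # engine = PolarsEngine(iceberg_catalog=load_catalog("my_catalog"))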

delta property

delta: type

Lazily imported DeltaTable class from the deltalake package.

sql_context property

sql_context: SQLContext

Return the engine's :class:polars.SQLContext.

add_file_info_columns

add_file_info_columns(df: LazyFrame, file_infos: Optional[List[FileInfo]] = None) -> LazyFrame

Map file metadata onto rows using the embedded __file_path column.

The scan readers (read_parquet, read_csv, read_json) embed the source path via include_file_paths / manual injection, so __file_path is already present in df.

When file_infos is provided a small mapping LazyFrame is joined on __file_path to resolve __file_name and __file_modification_time. When file_infos is None the name is derived from the path and modification-time is set to null.

create_dataframe

create_dataframe(records: List[Dict[str, Any]]) -> LazyFrame

Create a Polars LazyFrame from a list of heterogeneous dicts.

pl.from_dicts with infer_schema_length=None scans all records to compute the union schema and fills missing fields with null.

execute_sql

execute_sql(sql: str, parameters: Optional[Dict[Any, Any]] = None) -> LazyFrame

Execute a SQL query against the engine's :class:polars.SQLContext.

Register tables first via :meth:register_table, :meth:register_delta_tables, or :meth:register_iceberg_tables.
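
A minimal sketch of that workflow (table name and data are illustrative):

    import polars as pl

    engine.register_table("events", pl.LazyFrame({"id": [1, 2, 3], "kind": ["a", "b", "a"]}))
    result = engine.execute_sql("SELECT kind, COUNT(*) AS n FROM events GROUP BY kind")
    print(result.collect())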

generate_symlink_manifest

generate_symlink_manifest(path: str) -> None

Generate a symlink manifest using delta-rs DeltaTable.generate().

get_hive_schema

get_hive_schema(df: LazyFrame) -> Dict[str, str]

Return {column_name: hive_type} using native Polars dtype objects.

merge_overwrite_to_path

merge_overwrite_to_path(df: LazyFrame, path: str, merge_keys: List[str], fmt: str = 'delta', partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None

Rolling overwrite via MERGE DELETE + APPEND (mirrors SparkEngine).

read_iceberg

read_iceberg(path: str, options: Optional[Dict[str, str]] = None) -> LazyFrame

Read an Iceberg table directly from path.

For catalog-based lookup by table name use :meth:read_table with fmt="iceberg" and an iceberg_catalog configured.

register_delta_tables

register_delta_tables(base_path: str, *, prefix: str = '') -> List[str]

Discover Delta tables under base_path and register them.

Each immediate sub-directory that is a valid Delta table is registered with its folder name as the table name.

Requires :attr:platform to be set.

Parameters:

Name Type Description Default
base_path str

Root directory to scan for Delta tables.

required
prefix str

Optional prefix prepended to every registered table name (e.g. prefix "db1__" registers orders as "db1__orders"). Useful when registering tables from multiple databases into the same SQLContext to avoid name collisions.

''

Returns:

Type Description
List[str]

List of registered table names (including any prefix).

Raises:

Type Description
EngineError

If no platform is attached.

register_iceberg_tables

register_iceberg_tables(namespace: Optional[str] = None, *, base_path: Optional[str] = None, prefix: str = '') -> List[str]

Discover Iceberg tables and register them in the SQLContext.

Discovery strategy (evaluated in order):

  1. Catalog + namespace — if namespace is given and iceberg_catalog is configured, list tables from the catalog (preferred).
  2. Path scan — if base_path is given, scan immediate sub-directories for an Iceberg metadata/ directory and register each valid table.

At least one of namespace or base_path must be provided.

Parameters:

Name Type Description Default
namespace Optional[str]

Catalog namespace to list tables from.

None
base_path Optional[str]

Filesystem path whose sub-directories are scanned for Iceberg tables (fallback when no catalog is configured).

None
prefix str

Optional prefix prepended to every registered table name (e.g. prefix "cat1__" registers orders as "cat1__orders"). Useful when registering tables from multiple catalogs/namespaces into the same SQLContext to avoid name collisions.

''

Returns:

Type Description
List[str]

List of registered table names (including any prefix).

register_table

register_table(name: str, data: Union['pl.LazyFrame', 'pl.DataFrame']) -> None

Register a frame as a named table in the SQLContext.

scd2_to_path

scd2_to_path(df: LazyFrame, path: str, merge_keys: List[str], fmt: str = 'delta', partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None

SCD2 via two-step MERGE + APPEND on a Delta table.

Step 1: MERGE to close current rows (set __valid_to, __is_current = false) where source is strictly newer.

Step 2: APPEND all source rows as new versions.

This avoids the staged-UNION pattern (2× data duplication) by splitting close and insert into separate operations — same approach as :meth:merge_overwrite_to_path.

scd2_to_table

scd2_to_table(df: LazyFrame, table_name: str, merge_keys: List[str], fmt: str = 'delta', partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None

SCD2 for named tables (Iceberg two-step, non-atomic).

  1. Upsert to close current matched rows (__valid_to, __is_current = false).
  2. Append all source rows as new versions.

set_iceberg_catalog

set_iceberg_catalog(catalog: Any) -> None

Replace the Iceberg catalog used for Iceberg table operations.

write_to_path

write_to_path(df: LazyFrame, path: str, mode: str, fmt: str, partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None

Write a LazyFrame to path (always treated as a folder).

Behaviour by format:

  • Delta / Iceberg — uses sink_delta (streaming); partition_columns forwarded via delta_write_options.
  • Parquet / CSV / JSONL — uses streaming sink_*. When partition_columns are provided the target is a :class:polars.PartitionBy so data is split by those columns.
  • JSON / Avro — no sink_ available; falls back to eager write_* to a single file (partition_columns not supported).

Write modes:

  • overwrite / full_load — existing files in the folder are removed before writing. File name: {folder_name}.{ext}.
  • append — a new file is added. File name includes a timestamp: {folder_name}_{yyyyMMdd_HHmmss}.{ext}.

spark_session_builder

Spark session creation and default configuration.

Provides :func:get_or_create_spark_session and the DEFAULT_SPARK_CONFIGS dictionary used by :class:SparkEngine.

get_or_create_spark_session

get_or_create_spark_session(app_name: str = 'DataCoolie', config: Optional[Dict[str, str]] = None, existing_session: Optional[SparkSession] = None) -> SparkSession

Get or create a :class:SparkSession with optimised defaults.

In notebook environments (Fabric, Databricks) pass the existing session so only the default configs are applied on top. For standalone usage a fresh session is created.

Callers are responsible for providing Delta/Iceberg catalog, extensions, and JARs via the config dict.

Parameters:

Name Type Description Default
app_name str

Spark application name.

'DataCoolie'
config Optional[Dict[str, str]]

Extra Spark configs that override the defaults.

None
existing_session Optional[SparkSession]

Existing session to re-use.

None

Returns:

Type Description
SparkSession

Configured SparkSession.
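
A usage sketch, assuming the module path datacoolie.engines.spark_session_builder; the Delta settings shown are the standard delta-spark extension and catalog configs, not defaults of this function:

    from datacoolie.engines.spark_session_builder import get_or_create_spark_session

    spark = get_or_create_spark_session(
        app_name="my-pipeline",
        config={
            "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
            "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        },
    )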