Engines¶
base
¶
Abstract base class for DataFrame engines.
BaseEngine[DF] is a generic ABC parameterised by DataFrame type.
Concrete implementations (Spark, Polars) bind DF to their native
DataFrame class.
Section layout¶
- Construction — __init__, platform, set_platform
- Read — read_parquet, read_delta, read_iceberg, read_csv, read_json, read_jsonl, read_avro, read_excel, read_path, read_database, create_dataframe, execute_sql, read_table
- Write — write_to_path, write_to_table
- Merge — merge_to_path, merge_overwrite_to_path, merge_to_table, merge_overwrite_to_table
- Transform — add_column, drop_columns, select_columns, rename_column, filter_rows, apply_watermark_filter, deduplicate, deduplicate_by_rank, cast_column
- System columns — add_system_columns, add_file_info_columns, remove_system_columns, convert_timestamp_ntz_to_timestamp
- Metrics — count_rows, is_empty, get_columns, get_schema, get_max_values, get_count_and_max_values
- Maintenance — table_exists_by_path, table_exists_by_name, get_history_by_path, get_history_by_name, compact_by_path, compact_by_name, cleanup_by_path, cleanup_by_name
- Navigation (concrete dispatchers) — read, write, merge, merge_overwrite, exists, get_history, compact, cleanup
- SCD Type 2 — scd2_to_path, scd2_to_table, scd2
BaseEngine
¶
BaseEngine(platform: Optional[BasePlatform] = None)
Bases: ABC, Generic[DF]
Engine abstraction for DataFrame-based read / write / transform.
Type parameter DF is the concrete DataFrame class
(e.g. pyspark.sql.DataFrame or polars.DataFrame).
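A minimal usage sketch (the import path and function name below are assumptions, not part of this reference): code written against the ABC only calls BaseEngine methods, so it runs unchanged on either concrete engine.

```python
from typing import List

# Hypothetical import path — adjust to wherever BaseEngine lives in your package.
from mypackage.engines.base import BaseEngine


def dedupe_parquet(engine: BaseEngine, path: str, keys: List[str]):
    """Read a Parquet dataset and keep one row per key combination."""
    df = engine.read_parquet(path)                          # engine-native DataFrame
    return engine.deduplicate(df, partition_columns=keys)   # ROW_NUMBER()-based dedup
```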
add_column
abstractmethod
¶
Add (or replace) a column using a SQL expression.
add_file_info_columns
abstractmethod
¶
Add __file_name, __file_path, __file_modification_time.
Engines that embed the source path at scan time (e.g. Polars via
include_file_paths) map file_infos onto the already-present
__file_path column to inject __file_name and
__file_modification_time. Engines with native metadata support
(e.g. Spark _metadata) ignore file_infos entirely.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DF | Input DataFrame — for Polars this must already contain the __file_path column. | required |
| file_infos | Optional[List[FileInfo]] | Ordered list of :class:FileInfo entries describing the scanned source files. | None |
add_system_columns
abstractmethod
¶
Add __created_at, __updated_at, __updated_by.
apply_watermark_filter
abstractmethod
¶
Filter rows where any watermark column exceeds its stored value.
Builds an OR condition: col1 > val1 OR col2 > val2 ...
using native DataFrame API.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DF | Input DataFrame. | required |
| watermark_columns | List[str] | Column names to compare. | required |
| watermark | Dict[str, Any] | Mapping of column name to its stored watermark value. | required |
Returns:
| Type | Description |
|---|---|
| DF | Filtered DataFrame. |
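A hedged example (column names and values are placeholders):

```python
# With these inputs the engine builds the filter:
#   updated_at > '2024-01-01T00:00:00' OR sequence_id > 1000
watermark = {"updated_at": "2024-01-01T00:00:00", "sequence_id": 1000}
fresh = engine.apply_watermark_filter(df, ["updated_at", "sequence_id"], watermark)
```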
cast_column
abstractmethod
¶
Cast a column to target_type, optionally using fmt.
cleanup
¶
cleanup(*, table_name: Optional[str] = None, path: Optional[str] = None, retention_hours: int = 168, fmt: str = 'delta', options: Optional[Dict[str, Any]] = None) -> None
Run cleanup on a table name (preferred) or a path.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table_name | Optional[str] | Fully qualified table name. Takes precedence over path. | None |
| path | Optional[str] | Table file path (used when table_name is not given). | None |
| retention_hours | int | Minimum age in hours of files to retain. | 168 |
| fmt | str | Table format (e.g. delta). | 'delta' |
| options | Optional[Dict[str, Any]] | Format-specific options for cleanup sub-operations. | None |
Raises:
| Type | Description |
|---|---|
| EngineError | If neither table_name nor path is provided. |
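A hedged example (table name, path and retention values are placeholders):

```python
# table_name takes precedence when both are given.
engine.cleanup(table_name="analytics.orders", retention_hours=72)
engine.cleanup(path="s3://bucket/tables/orders", fmt="delta", retention_hours=168)
```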
cleanup_by_name
abstractmethod
¶
cleanup_by_name(table_name: str, retention_hours: int = 168, *, fmt: str = 'delta', options: Optional[Dict[str, Any]] = None) -> None
Run cleanup (VACUUM or equivalent) on a named table.
cleanup_by_path
abstractmethod
¶
cleanup_by_path(path: str, retention_hours: int = 168, *, fmt: str = 'delta', options: Optional[Dict[str, Any]] = None) -> None
Run cleanup (VACUUM or equivalent) on the table at path.
compact
¶
compact(*, table_name: Optional[str] = None, path: Optional[str] = None, fmt: str = 'delta', options: Optional[Dict[str, Any]] = None) -> None
Run compaction on a table name (preferred) or a path.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table_name | Optional[str] | Fully qualified table name. Takes precedence over path. | None |
| path | Optional[str] | Table file path (used when table_name is not given). | None |
| fmt | str | Table format (e.g. delta). | 'delta' |
| options | Optional[Dict[str, Any]] | Format-specific options for compaction sub-operations. | None |
Raises:
| Type | Description |
|---|---|
| EngineError | If neither table_name nor path is provided. |
compact_by_name
abstractmethod
¶
compact_by_name(table_name: str, *, fmt: str = 'delta', options: Optional[Dict[str, Any]] = None) -> None
Run compaction (OPTIMIZE or equivalent) on a named table.
compact_by_path
abstractmethod
¶
Run compaction (OPTIMIZE or equivalent) on the table at path.
convert_timestamp_ntz_to_timestamp
abstractmethod
¶
Convert all timestamp_ntz (no-TZ) columns to TZ-aware UTC timestamps.
create_dataframe
abstractmethod
¶
create_dataframe(records: List[Dict[str, Any]]) -> DF
Create a DataFrame from a list of dicts.
Records may have different keys (heterogeneous schema). Missing
fields are filled with null so that the resulting DataFrame
has the union of all keys as its columns.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| records | List[Dict[str, Any]] | List of flat dicts. Values may be missing or null for some keys. | required |
Returns:
| Type | Description |
|---|---|
| DF | A DataFrame with one row per record and one column per distinct key across all records. |
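A hedged example (record contents are placeholders):

```python
records = [
    {"id": 1, "name": "alpha"},
    {"id": 2, "email": "beta@example.com"},
]
df = engine.create_dataframe(records)
# Columns: id, name, email — row 1 gets a null email, row 2 a null name.
```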
deduplicate
abstractmethod
¶
deduplicate(df: DF, partition_columns: List[str], order_columns: Optional[List[str]] = None, order: str = 'desc') -> DF
Remove duplicate rows.
Uses ROW_NUMBER() window partitioned by partition_columns
and ordered by order_columns.
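A hedged example (column names are placeholders):

```python
# Keep the newest row per customer_id, ordered by updated_at descending.
latest = engine.deduplicate(
    df,
    partition_columns=["customer_id"],
    order_columns=["updated_at"],
    order="desc",
)
```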
deduplicate_by_rank
abstractmethod
¶
deduplicate_by_rank(df: DF, partition_columns: List[str], order_columns: List[str], order: str = 'desc') -> DF
Deduplicate using RANK(), keeping ties.
drop_columns
abstractmethod
¶
Drop one or more columns from a DataFrame.
execute_sql
abstractmethod
¶
execute_sql(sql: str, parameters: Optional[Dict[Any, Any]] = None) -> DF
Execute a SQL query and return a DataFrame.
exists
¶
Return True if the target table or path exists.
Checks by table name first (via :meth:table_exists_by_name),
then by path (via :meth:table_exists_by_path). Returns False if
neither is provided.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table_name | Optional[str] | Fully qualified table name. | None |
| path | Optional[str] | Table file path. | None |
filter_rows
abstractmethod
¶
Filter rows using a SQL expression.
generate_symlink_manifest
abstractmethod
¶
Generate a symlink-format manifest for the Delta table at path.
Creates the _symlink_format_manifest/ directory alongside the
Delta log, enabling query engines that don't natively support Delta
(e.g. Redshift Spectrum) to read the table via
SymlinkTextInputFormat.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str | Root path of the Delta table. | required |
get_count_and_max_values
abstractmethod
¶
get_count_and_max_values(df: DF, columns: List[str]) -> Tuple[int, Dict[str, Any]]
Return (row_count, {column: max_value}) in one pass.
get_history
¶
get_history(*, table_name: Optional[str] = None, path: Optional[str] = None, limit: int = 1, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, fmt: str = 'delta') -> List[Dict[str, Any]]
Return audit history for a table name (preferred) or a path.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table_name | Optional[str] | Fully qualified table name. Takes precedence over path. | None |
| path | Optional[str] | Table file path (used when table_name is not given). | None |
| limit | int | Maximum number of history entries to return. | 1 |
| start_time | Optional[datetime] | If provided, only return entries whose timestamp is >= start_time. | None |
| end_time | Optional[datetime] | If provided, only return entries whose timestamp is <= end_time. | None |
| fmt | str | Table format (e.g. delta). | 'delta' |
Raises:
| Type | Description |
|---|---|
| EngineError | If neither table_name nor path is provided. |
get_history_by_name
abstractmethod
¶
get_history_by_name(table_name: str, limit: int = 1, start_time: Optional[datetime] = None, *, end_time: Optional[datetime] = None, fmt: str = 'delta') -> List[Dict[str, Any]]
Return audit history entries for a named table.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table_name | str | Fully qualified table name. | required |
| limit | int | Maximum number of history entries to return. | 1 |
| start_time | Optional[datetime] | If provided, only return entries whose timestamp is >= start_time. | None |
| end_time | Optional[datetime] | If provided, only return entries whose timestamp is <= end_time. | None |
| fmt | str | Table format (e.g. delta). | 'delta' |
get_history_by_path
abstractmethod
¶
get_history_by_path(path: str, limit: int = 1, start_time: Optional[datetime] = None, *, end_time: Optional[datetime] = None, fmt: str = 'delta') -> List[Dict[str, Any]]
Return audit history entries for the table at path.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str | Table location. | required |
| limit | int | Maximum number of history entries to return. | 1 |
| start_time | Optional[datetime] | If provided, only return entries whose timestamp is >= start_time. | None |
| end_time | Optional[datetime] | If provided, only return entries whose timestamp is <= end_time. | None |
| fmt | str | Table format (e.g. delta). | 'delta' |
get_hive_schema
abstractmethod
¶
get_hive_schema(df: DF) -> Dict[str, str]
Return {column_name: hive_type_string} using native type objects.
Each engine inspects its native DataFrame schema (e.g.
pyspark.sql.types, polars.datatypes) to produce
Hive/Athena-compatible DDL type strings directly — no string
parsing required.
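Illustrative result shape only — the exact type strings depend on the engine's native dtypes:

```python
schema = engine.get_hive_schema(df)
# e.g. {"id": "bigint", "amount": "double", "created_at": "timestamp"}
```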
get_max_values
abstractmethod
¶
get_max_values(df: DF, columns: List[str]) -> Dict[str, Any]
Return the maximum value for each of columns.
get_schema
abstractmethod
¶
get_schema(df: DF) -> Dict[str, str]
Return {column_name: data_type_string}.
merge
¶
merge(df: DF, *, table_name: Optional[str] = None, path: Optional[str] = None, merge_keys: List[str], fmt: str = 'delta', partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None
Merge (upsert) df into a table name (preferred) or a path.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DF | Source DataFrame. | required |
| table_name | Optional[str] | Target table name. Takes precedence over path. | None |
| path | Optional[str] | Target file path (used when table_name is not given). | None |
| merge_keys | List[str] | Join columns for the merge condition. | required |
| fmt | str | Table / file format. | 'delta' |
| partition_columns | Optional[List[str]] | Optional partition column names. | None |
| options | Optional[Dict[str, str]] | Additional options. | None |
Raises:
| Type | Description |
|---|---|
| EngineError | If neither table_name nor path is provided. |
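A hedged example (table name, path and key are placeholders):

```python
# Upsert by customer_id into a catalog table (preferred)...
engine.merge(df, table_name="silver.customers", merge_keys=["customer_id"])
# ...or into a path-based table when no catalog entry exists.
engine.merge(df, path="s3://bucket/silver/customers", merge_keys=["customer_id"], fmt="delta")
```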
merge_overwrite
¶
merge_overwrite(df: DF, *, table_name: Optional[str] = None, path: Optional[str] = None, merge_keys: List[str], fmt: str = 'delta', partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None
Merge-overwrite (delete + append) into a table name (preferred) or a path.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DF | Source DataFrame. | required |
| table_name | Optional[str] | Target table name. Takes precedence over path. | None |
| path | Optional[str] | Target file path (used when table_name is not given). | None |
| merge_keys | List[str] | Join columns for the delete condition. | required |
| fmt | str | Table / file format. | 'delta' |
| partition_columns | Optional[List[str]] | Optional partition column names. | None |
| options | Optional[Dict[str, str]] | Additional options. | None |
Raises:
| Type | Description |
|---|---|
| EngineError | If neither table_name nor path is provided. |
merge_overwrite_to_path
abstractmethod
¶
merge_overwrite_to_path(df: DF, path: str, merge_keys: List[str], fmt: str = 'delta', partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None
Merge with full-row overwrite (rolling overwrite / SCD1-style).
Existing rows matching merge keys are deleted and re-inserted from the source DataFrame.
merge_overwrite_to_table
abstractmethod
¶
merge_overwrite_to_table(df: DF, table_name: str, merge_keys: List[str], fmt: str = 'delta', partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None
Merge-overwrite (delete + append) a named table via SQL MERGE + WriterV2.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DF | Source DataFrame. | required |
| table_name | str | Fully qualified target table name. | required |
| merge_keys | List[str] | Join columns for the delete condition. | required |
| fmt | str | Table format. | 'delta' |
| partition_columns | Optional[List[str]] | Optional partition column names. | None |
| options | Optional[Dict[str, str]] | Additional options. | None |
merge_to_path
abstractmethod
¶
merge_to_path(df: DF, path: str, merge_keys: List[str], fmt: str = 'delta', partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None
Merge (upsert) a DataFrame into a table at path.
merge_to_table
abstractmethod
¶
merge_to_table(df: DF, table_name: str, merge_keys: List[str], fmt: str, partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None
Merge (upsert) a DataFrame into a named table via SQL MERGE.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DF | Source DataFrame. | required |
| table_name | str | Fully qualified target table name. | required |
| merge_keys | List[str] | Join columns for the merge condition. | required |
| fmt | str | Table format. | required |
| partition_columns | Optional[List[str]] | Optional partition column names. | None |
| options | Optional[Dict[str, str]] | Additional options. | None |
read
¶
read(fmt: str, *, table_name: Optional[str] = None, path: Optional[str | list[str]] = None, options: Optional[Dict[str, str]] = None) -> DF
Read data by table name (preferred) or by path.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| fmt | str | Format string — e.g. delta, parquet, csv. | required |
| table_name | Optional[str] | Fully qualified table name. Takes precedence over path. | None |
| path | Optional[str \| list[str]] | File or directory path (used when table_name is not given). | None |
| options | Optional[Dict[str, str]] | Additional reader options. | None |
Raises:
| Type | Description |
|---|---|
| EngineError | If neither table_name nor path is provided, or if the format has no path reader. |
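A hedged example (table name and paths are placeholders):

```python
orders = engine.read("delta", table_name="bronze.orders")
events = engine.read(
    "parquet",
    path=["s3://bucket/events/2024/01/", "s3://bucket/events/2024/02/"],
)
```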
read_avro
abstractmethod
¶
read_avro(path: str | list[str], options: Optional[Dict[str, str]] = None) -> DF
Read Avro file(s) into a DataFrame.
read_csv
abstractmethod
¶
read_csv(path: str | list[str], options: Optional[Dict[str, str]] = None) -> DF
Read CSV file(s) into a DataFrame.
read_database
abstractmethod
¶
read_database(*, table: Optional[str] = None, query: Optional[str] = None, options: Optional[Dict[str, Any]] = None) -> DF
Read from an external database.
Exactly one of table or query must be provided.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table | Optional[str] | Plain table or view name. | None |
| query | Optional[str] | Full SQL query. | None |
| options | Optional[Dict[str, Any]] | All connection and reader settings in one flat dict, including at minimum the connection details. | None |
In Spark this maps to spark.read.format("jdbc").option(...).load().
Use this for reading from external databases. For running
queries in the engine's own SQL context, use :meth:execute_sql.
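A hedged example — the option keys shown are typical Spark JDBC settings and are assumptions here; pass whatever connection settings your engine expects:

```python
df = engine.read_database(
    query="SELECT id, amount FROM sales WHERE updated_at > '2024-01-01'",
    options={
        "url": "jdbc:postgresql://db-host:5432/sales",  # assumed key names
        "user": "reader",
        "password": "***",
    },
)
```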
read_delta
abstractmethod
¶
read_delta(path: str, options: Optional[Dict[str, str]] = None) -> DF
Read a Delta Lake table into a DataFrame.
read_excel
abstractmethod
¶
read_excel(path: str | list[str], options: Optional[Dict[str, str]] = None) -> DF
Read Excel file(s) into a DataFrame.
read_iceberg
abstractmethod
¶
read_iceberg(path: str, options: Optional[Dict[str, str]] = None) -> DF
Read an Iceberg table into a DataFrame.
read_json
abstractmethod
¶
read_json(path: str | list[str], options: Optional[Dict[str, str]] = None) -> DF
Read JSON file(s) into a DataFrame.
read_jsonl
abstractmethod
¶
read_jsonl(path: str | list[str], options: Optional[Dict[str, str]] = None) -> DF
Read JSONL (newline-delimited JSON) file(s) into a DataFrame.
read_parquet
abstractmethod
¶
read_parquet(path: str | list[str], options: Optional[Dict[str, str]] = None) -> DF
Read Parquet file(s) into a DataFrame.
read_path
abstractmethod
¶
read_path(path: str | list[str], fmt: str, options: Optional[Dict[str, str]] = None) -> DF
Read a file or directory at path using the given format.
This is the unified path-based reader. Implementations should
dispatch to the appropriate format reader (read_delta,
read_parquet, etc.) based on fmt.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str \| list[str] | File or directory path (scalar or list for multi-path reads). | required |
| fmt | str | Format string — e.g. delta, parquet, csv. | required |
| options | Optional[Dict[str, str]] | Additional reader options. | None |
Raises:
| Type | Description |
|---|---|
| EngineError | If the format is not supported. |
read_table
abstractmethod
¶
read_table(table_name: str, fmt: str = 'delta', options: Optional[Dict[str, str]] = None) -> DF
Read a named table and return a DataFrame.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table_name | str | Fully qualified table name. | required |
| fmt | str | Table format (e.g. delta). | 'delta' |
| options | Optional[Dict[str, str]] | Additional read options. | None |
remove_system_columns
¶
Drop system columns (__created_at, etc.).
This is a concrete convenience method that delegates to
:meth:drop_columns.
rename_column
abstractmethod
¶
Rename a column.
scd2
¶
scd2(df: DF, *, table_name: Optional[str] = None, path: Optional[str] = None, merge_keys: List[str], fmt: str = 'delta', partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None
SCD Type 2 write to a table name (preferred) or a path.
scd2_to_path
¶
scd2_to_path(df: DF, path: str, merge_keys: List[str], fmt: str = 'delta', partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None
SCD Type 2 write to a path-based table.
Tracks historical changes by closing existing current records
(setting scd2_end_date, scd2_is_current = false) and
inserting new versions with scd2_start_date,
scd2_is_current = true.
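A hedged example (path and key are placeholders):

```python
# Changed customer_id rows are closed and re-inserted as new current versions.
engine.scd2_to_path(df, "s3://bucket/dim/customers", merge_keys=["customer_id"])
```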
scd2_to_table
¶
scd2_to_table(df: DF, table_name: str, merge_keys: List[str], fmt: str = 'delta', partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None
SCD Type 2 write to a named table.
select_columns
abstractmethod
¶
Return df with only the given columns, in the specified order.
table_exists_by_name
abstractmethod
¶
Return True if a named table exists in the catalog.
table_exists_by_path
abstractmethod
¶
Return True if a table exists at path.
write
¶
write(df: DF, *, table_name: Optional[str] = None, path: Optional[str] = None, mode: str, fmt: str, partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None
Write df to a table name (preferred) or to a path.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DF | DataFrame to write. | required |
| table_name | Optional[str] | Target table name. Takes precedence over path. | None |
| path | Optional[str] | Target file path (used when table_name is not given). | None |
| mode | str | Write mode (e.g. append, overwrite). | required |
| fmt | str | Table / file format. | required |
| partition_columns | Optional[List[str]] | Optional partition column names. | None |
| options | Optional[Dict[str, str]] | Additional writer options. | None |
Raises:
| Type | Description |
|---|---|
| EngineError | If neither table_name nor path is provided. |
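A hedged example (the target name and mode string are placeholders; SparkEngine's write_to_table documents the accepted modes):

```python
engine.write(
    df,
    table_name="silver.orders",
    mode="append",
    fmt="delta",
    partition_columns=["load_date"],
)
```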
write_to_path
abstractmethod
¶
write_to_path(df: DF, path: str, mode: str, fmt: str, partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None
Write a DataFrame to a path.
write_to_table
abstractmethod
¶
write_to_table(df: DF, table_name: str, mode: str, fmt: str, partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None
Write a DataFrame to a named table using DataFrameWriterV2.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DF | DataFrame to write. | required |
| table_name | str | Fully qualified table name. | required |
| mode | str | Write mode (e.g. append, overwrite). | required |
| fmt | str | Table format (e.g. delta). | required |
| partition_columns | Optional[List[str]] | Optional partition column names. | None |
| options | Optional[Dict[str, str]] | Additional writer options. | None |
spark_engine
¶
PySpark + Delta Lake engine implementation.
:class:SparkEngine binds :class:BaseEngine to
pyspark.sql.DataFrame and implements every abstract method using
the Spark DataFrame API and the delta-spark library.
SparkEngine
¶
SparkEngine(spark_session: Optional[SparkSession] = None, config: Optional[Dict[str, str]] = None, platform: Optional[BasePlatform] = None)
Bases: BaseEngine[DataFrame]
PySpark implementation of :class:BaseEngine.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark_session | Optional[SparkSession] | Existing SparkSession to re-use; if not given, one is created. | None |
| config | Optional[Dict[str, str]] | Extra Spark configuration overrides applied on top of :data:DEFAULT_SPARK_CONFIGS. | None |
| platform | Optional[BasePlatform] | Optional platform to attach to this engine immediately. | None |
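A hedged construction sketch (the import path is an assumption, not part of this reference):

```python
from mypackage.engines.spark_engine import SparkEngine  # hypothetical path

# In notebooks, pass the existing SparkSession; standalone, the engine builds
# one from the default configs plus any overrides supplied here.
engine = SparkEngine(config={"spark.sql.shuffle.partitions": "64"})
orders = engine.read_table("bronze.orders", fmt="delta")
```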
spark_major_version
property
¶
Return the major version of the Spark runtime (e.g. 3 or 4).
create_dataframe
¶
Create a DataFrame from a list of heterogeneous dicts.
Spark's createDataFrame infers a merged schema automatically
when samplingRatio=1.0 is used, filling missing fields with
null. An explicit union of all keys is pre-computed so that
every row dict contains all columns before passing to Spark,
ensuring consistent schema inference even for small datasets.
generate_symlink_manifest
¶
Generate a symlink manifest using Spark's DeltaTable API.
get_hive_schema
¶
Return {column_name: hive_type} using native PySpark type objects.
merge_overwrite_to_path
¶
merge_overwrite_to_path(df: DataFrame, path: str, merge_keys: List[str], fmt: str = 'delta', partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None
Rolling overwrite via DeltaTable MERGE DELETE + APPEND.
merge_overwrite_to_table
¶
merge_overwrite_to_table(df: DataFrame, table_name: str, merge_keys: List[str], fmt: str = 'delta', partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None
Merge-overwrite (delete + append) for a named table.
Uses the DataFrame mergeInto API on Spark >= 4.0, otherwise
falls back to SQL MERGE DELETE + WriterV2 append. Iceberg
schema evolution runs once up-front so both steps observe the
updated schema.
merge_to_table
¶
merge_to_table(df: DataFrame, table_name: str, merge_keys: List[str], fmt: str, partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None
Merge (upsert) into a named table.
Uses the DataFrame mergeInto API on Spark >= 4.0, otherwise
falls back to SQL MERGE. For Iceberg, schema + partition spec
are evolved up-front (metadata-only) so new columns are available
before the MERGE runs. MERGE uses explicit column names, so no
DataFrame reorder is required.
scd2_to_path
¶
scd2_to_path(df: DataFrame, path: str, merge_keys: List[str], fmt: str = 'delta', partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None
SCD2 via two-step MERGE + APPEND on a path-based Delta table.
Step 1: MERGE to close current rows where source is strictly newer.
Step 2: APPEND all source rows as new versions.
scd2_to_table
¶
scd2_to_table(df: DataFrame, table_name: str, merge_keys: List[str], fmt: str = 'delta', partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None
SCD2 via two-step MERGE + APPEND on a named table.
write_to_table
¶
write_to_table(df: DataFrame, table_name: str, mode: str, fmt: str, partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None, _skip_iceberg_evolution: bool = False) -> None
Write via DataFrameWriterV2 (df.writeTo).
Supported mode values:
- LoadType.OVERWRITE / LoadType.FULL_LOAD — create or replace.
- LoadType.APPEND — append rows. When the target table does not yet exist, behaves as create-or-replace.
Column matching between DataFrame and existing table is case-insensitive, consistent with Spark's default catalog resolver.
polars_engine
¶
Polars + Delta Lake + Iceberg engine implementation.
:class:PolarsEngine binds :class:BaseEngine to
polars.LazyFrame and implements every abstract method using
the Polars LazyFrame API, the deltalake library, and optional
pyiceberg integration.
Reads return lazy frames via scan_* APIs; materialisation
(.collect()) happens only at write / merge / metrics boundaries.
Catalog support
- Iceberg Catalog via iceberg_catalog (a pyiceberg.catalog.Catalog instance).
- SQLContext for executing SQL queries against registered tables.
PolarsEngine
¶
PolarsEngine(platform: Optional[BasePlatform] = None, storage_options: Optional[Dict[str, str]] = None, *, iceberg_catalog: Optional[Any] = None, sql_context: Optional[SQLContext] = None, **kwargs: Any)
Bases: BaseEngine['pl.LazyFrame']
Polars implementation of :class:BaseEngine bound to pl.LazyFrame.
All reads return lazy frames via scan_* APIs. Materialisation
(.collect()) happens only at write boundaries, merge operations,
and metric computations.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| storage_options | Optional[Dict[str, str]] | Cloud storage credentials forwarded to Polars readers and the deltalake writers. | None |
| iceberg_catalog | Optional[Any] | A pyiceberg.catalog.Catalog instance used for catalog-based Iceberg operations. | None |
| sql_context | Optional[SQLContext] | An existing polars.SQLContext to re-use for SQL queries. | None |
| platform | Optional[BasePlatform] | Optional platform to attach to this engine immediately. | None |
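A hedged construction sketch (the import path and storage option keys are assumptions):

```python
from mypackage.engines.polars_engine import PolarsEngine  # hypothetical path

# storage_options are forwarded to the Polars scan_* readers and deltalake;
# the exact keys depend on your storage backend.
engine = PolarsEngine(storage_options={"aws_region": "eu-west-1"})
lf = engine.read_delta("s3://bucket/bronze/orders")  # returns a LazyFrame
```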
add_file_info_columns
¶
add_file_info_columns(df: LazyFrame, file_infos: Optional[List[FileInfo]] = None) -> LazyFrame
Map file metadata onto rows using the embedded __file_path column.
The scan readers (read_parquet, read_csv, read_json) embed
the source path via include_file_paths / manual injection, so
__file_path is already present in df.
When file_infos is provided a small mapping LazyFrame is joined on
__file_path to resolve __file_name and
__file_modification_time. When file_infos is None the name
is derived from the path and modification-time is set to null.
create_dataframe
¶
Create a Polars LazyFrame from a list of heterogeneous dicts.
pl.from_dicts with infer_schema_length=None scans all
records to compute the union schema and fills missing fields
with null.
execute_sql
¶
Execute a SQL query against the engine's :class:polars.SQLContext.
Register tables first via :meth:register_table,
:meth:register_delta_tables, or :meth:register_iceberg_tables.
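A hedged example (the folder layout, prefix and resulting table name are assumptions):

```python
names = engine.register_delta_tables("s3://bucket/silver/", prefix="silver_")
totals = engine.execute_sql(
    "SELECT customer_id, SUM(amount) AS total FROM silver_orders GROUP BY customer_id"
)
```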
generate_symlink_manifest
¶
Generate a symlink manifest using delta-rs DeltaTable.generate().
get_hive_schema
¶
Return {column_name: hive_type} using native Polars dtype objects.
merge_overwrite_to_path
¶
merge_overwrite_to_path(df: LazyFrame, path: str, merge_keys: List[str], fmt: str = 'delta', partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None
Rolling overwrite via MERGE DELETE + APPEND (mirrors SparkEngine).
read_iceberg
¶
Read an Iceberg table directly from path.
For catalog-based lookup by table name use :meth:read_table with
fmt="iceberg" and an iceberg_catalog configured.
register_delta_tables
¶
Discover Delta tables under base_path and register them.
Each immediate sub-directory that is a valid Delta table is registered with its folder name as the table name.
Requires :attr:platform to be set.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| base_path | str | Root directory to scan for Delta tables. | required |
| prefix | str | Optional prefix prepended to every registered table name. | '' |
Returns:
| Type | Description |
|---|---|
| List[str] | List of registered table names (including any prefix). |
Raises:
| Type | Description |
|---|---|
| EngineError | If no platform is attached. |
register_iceberg_tables
¶
register_iceberg_tables(namespace: Optional[str] = None, *, base_path: Optional[str] = None, prefix: str = '') -> List[str]
Discover Iceberg tables and register them in the SQLContext.
Discovery strategy (evaluated in order):
- Catalog + namespace — if namespace is given and iceberg_catalog is configured, list tables from the catalog (preferred).
- Path scan — if base_path is given, scan immediate sub-directories for an Iceberg metadata/ directory and register each valid table.
At least one of namespace or base_path must be provided.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| namespace | Optional[str] | Catalog namespace to list tables from. | None |
| base_path | Optional[str] | Filesystem path whose sub-directories are scanned for Iceberg tables (fallback when no catalog is configured). | None |
| prefix | str | Optional prefix prepended to every registered table name. | '' |
Returns:
| Type | Description |
|---|---|
| List[str] | List of registered table names (including any prefix). |
register_table
¶
Register a frame as a named table in the SQLContext.
scd2_to_path
¶
scd2_to_path(df: LazyFrame, path: str, merge_keys: List[str], fmt: str = 'delta', partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None
SCD2 via two-step MERGE + APPEND on a Delta table.
Step 1: MERGE to close current rows (set __valid_to, __is_current = false) where source is strictly newer.
Step 2: APPEND all source rows as new versions.
This avoids the staged-UNION pattern (2× data duplication) by
splitting close and insert into separate operations — same
approach as :meth:merge_overwrite_to_path.
scd2_to_table
¶
scd2_to_table(df: LazyFrame, table_name: str, merge_keys: List[str], fmt: str = 'delta', partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None
SCD2 for named tables (Iceberg two-step, non-atomic).
- Upsert to close current matched rows (__valid_to, __is_current = false).
- Append all source rows as new versions.
set_iceberg_catalog
¶
Replace the Iceberg catalog used for Iceberg table operations.
write_to_path
¶
write_to_path(df: LazyFrame, path: str, mode: str, fmt: str, partition_columns: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None) -> None
Write a LazyFrame to path (always treated as a folder).
Behaviour by format:
- Delta / Iceberg — uses sink_delta (streaming); partition_columns forwarded via delta_write_options.
- Parquet / CSV / JSONL — uses streaming sink_*. When partition_columns are provided the target is a :class:polars.PartitionBy so data is split by those columns.
- JSON / Avro — no sink_* available; falls back to eager write_* to a single file (partition_columns not supported).
Write modes:
- overwrite / full_load — existing files in the folder are removed before writing. File name: {folder_name}.{ext}.
- append — a new file is added. File name includes a timestamp: {folder_name}_{yyyyMMdd_HHmmss}.{ext}.
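A hedged example (path and partition column are placeholders):

```python
# Streaming Parquet sink; with partition_columns the output is split per
# polars.PartitionBy, and overwrite clears the target folder first.
engine.write_to_path(
    lf,
    "s3://bucket/exports/orders",
    mode="overwrite",
    fmt="parquet",
    partition_columns=["load_date"],
)
```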
spark_session_builder
¶
Spark session creation and default configuration.
Provides :func:get_or_create_spark_session and the
DEFAULT_SPARK_CONFIGS dictionary used by :class:SparkEngine.
get_or_create_spark_session
¶
get_or_create_spark_session(app_name: str = 'DataCoolie', config: Optional[Dict[str, str]] = None, existing_session: Optional[SparkSession] = None) -> SparkSession
Get or create a :class:SparkSession with optimised defaults.
In notebook environments (Fabric, Databricks) pass the existing session so only the default configs are applied on top. For standalone usage a fresh session is created.
Callers are responsible for providing Delta/Iceberg catalog,
extensions, and JARs via the config dict.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| app_name | str | Spark application name. | 'DataCoolie' |
| config | Optional[Dict[str, str]] | Extra Spark configs that override the defaults. | None |
| existing_session | Optional[SparkSession] | Existing session to re-use. | None |
Returns:
| Type | Description |
|---|---|
| SparkSession | Configured SparkSession. |
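A hedged usage sketch (the import path is an assumption; the extension class shown is the standard Delta value and, like any catalog/JAR settings, must be supplied by the caller):

```python
from mypackage.engines.spark_session_builder import get_or_create_spark_session  # hypothetical path

spark = get_or_create_spark_session(
    app_name="nightly-load",
    config={"spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension"},
)
```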