Orchestration

driver

Main ETL driver coordinating all framework components.

DataCoolieDriver ties together metadata, watermark, engine, platform, loggers, job distribution, parallel execution, and retry handling through constructor injection.

Typical usage::

driver = create_driver(
    engine=spark_engine,
    platform=local_platform,
    metadata_provider=file_provider,
    job_num=4, job_index=0, max_workers=4,
)
result = driver.run(stage="bronze2silver")

DEFAULT_TRANSFORMERS module-attribute

DEFAULT_TRANSFORMERS: list[str] = ['schema_converter', 'deduplicator', 'scd2_column_adder', 'column_adder', 'system_column_adder', 'partition_handler', 'column_name_sanitizer']

DataCoolieDriver

DataCoolieDriver(engine: BaseEngine, platform: Optional[BasePlatform] = None, metadata_provider: Optional[BaseMetadataProvider] = None, watermark_manager: Optional[BaseWatermarkManager] = None, config: Optional[DataCoolieRunConfig] = None, secret_provider: Optional[BaseSecretProvider] = None, system_logger: Optional[SystemLogger] = None, etl_logger: Optional[ETLLogger] = None, base_log_path: Optional[str] = None, log_config: Optional[LogConfig] = None)

Main orchestration class for DataCoolie ETL pipelines.

Uses constructor injection for all dependencies. Supports:

  • ETL mode: read → transform → write via the run / run_dataflow methods.
  • Maintenance mode: optimize + vacuum on destinations via run_maintenance.
  • Dry-run: logs planned work without side effects.
  • Context manager (with) for resource cleanup.

Parameters:

  • engine (BaseEngine, required): Data operation engine (e.g. PySpark).
  • platform (Optional[BasePlatform], default None): Platform abstraction for file I/O.
  • metadata_provider (Optional[BaseMetadataProvider], default None): Provides dataflow / connection metadata.
  • watermark_manager (Optional[BaseWatermarkManager], default None): Reads and writes watermarks. When None and metadata_provider is supplied, a :class:~datacoolie.watermark.watermark_manager.WatermarkManager is created automatically.
  • config (Optional[DataCoolieRunConfig], default None): Execution parameters (includes job_id).
  • secret_provider (Optional[BaseSecretProvider], default None): Resolves secrets in connection configs. If not provided, the resolved platform is used as the default provider.
  • system_logger (Optional[SystemLogger], default None): Optional system-level logger.
  • etl_logger (Optional[ETLLogger], default None): Optional structured ETL logger.
  • base_log_path (Optional[str], default None): Base directory for auto-created loggers. When provided, SystemLogger and ETLLogger are created under <base_log_path>/system_logs and <base_log_path>/etl_logs. Takes precedence over log_config.output_path.
  • log_config (Optional[LogConfig], default None): Optional :class:LogConfig used as the template for auto-created loggers. If base_log_path is also given it overrides output_path; otherwise log_config.output_path is used as the base directory. All other fields (log_level, storage_mode, partition_by_date, partition_pattern, flush_interval_seconds) are always preserved.
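A minimal context-manager sketch, reusing the create_driver factory from the module docstring above and assuming it forwards base_log_path to the constructor; the path is a hypothetical placeholder::

with create_driver(
    engine=spark_engine,
    platform=local_platform,
    metadata_provider=file_provider,
    base_log_path="/var/log/datacoolie",  # hypothetical log directory
) as driver:
    result = driver.run(stage="bronze2silver")

# Logs are flushed on exit; counters remain available afterwards.
print(f"{result.succeeded}/{result.total} dataflows succeeded")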

close

close() -> None

Close driver and flush logs. Safe to call multiple times.

load_dataflows

load_dataflows(stage: Optional[Union[str, List[str]]] = None, active_only: bool = True, attach_schema_hints: bool = True) -> List[DataFlow]

Load and filter dataflows for this job.

Parameters:

  • stage (Optional[Union[str, List[str]]], default None): Optional stage filter. Accepts a single name ("bronze2silver"), a comma-separated string ("bronze2silver,silver2gold"), or a list of names.
  • active_only (bool, default True): Skip inactive dataflows.
  • attach_schema_hints (bool, default True): Attach schema hints from metadata.

Returns:

  • List[DataFlow]: Filtered list for this job.
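A short sketch of pre-loading dataflows and handing them to :meth:run_dataflow, assuming the driver from the example above::

flows = driver.load_dataflows(stage=["bronze2silver", "silver2gold"])
result = driver.run_dataflow(dataflows=flows)  # skips a second metadata load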

load_maintenance_dataflows

load_maintenance_dataflows(connection: Optional[Union[str, List[str]]] = None, active_only: bool = True) -> List[DataFlow]

Load lakehouse dataflows eligible for maintenance.

Dataflows that share the same physical destination (same catalog-qualified table or storage path) are deduplicated before job distribution so OPTIMIZE / VACUUM runs at most once per destination, avoiding concurrent-write races in fan-in topologies. Only the winning dataflow produces a maintenance log row; covered dataflows are not individually logged.

Parameters:

  • connection (Optional[Union[str, List[str]]], default None): Optional filter by destination connection id or name. Accepts a single value, a comma-separated string, or a list.
  • active_only (bool, default True): Skip inactive dataflows.

Returns:

  • List[DataFlow]: Filtered list of unique-destination dataflows for this job.

run

run(stage: Optional[Union[str, List[str]]] = None, dataflows: Optional[List[DataFlow]] = None, column_name_mode: Union[ColumnCaseMode, str] = LOWER) -> ExecutionResult

Alias for :meth:run_dataflow.

run_dataflow

run_dataflow(stage: Optional[Union[str, List[str]]] = None, dataflows: Optional[List[DataFlow]] = None, column_name_mode: Union[ColumnCaseMode, str] = LOWER) -> ExecutionResult

Execute ETL (read → transform → write) for this job.

Loads dataflows from metadata when dataflows is not provided. Can be called multiple times within the same job session (logs accumulate until :meth:close).

Parameters:

  • stage (Optional[Union[str, List[str]]], default None): Optional stage filter. Accepts a single name ("bronze2silver"), a comma-separated string ("bronze2silver,silver2gold"), or a list of names.
  • dataflows (Optional[List[DataFlow]], default None): Pre-loaded dataflows (skips metadata loading).
  • column_name_mode (Union[ColumnCaseMode, str], default LOWER): Column name case-conversion mode. "lower" (default) lowercases without inserting underscores; "snake" converts to snake_case.

Returns:

  • ExecutionResult: Aggregated execution statistics.
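A brief sketch of the case-conversion option, assuming a source column named SomeColumn::

# "lower" (default): SomeColumn -> somecolumn
driver.run_dataflow(stage="bronze2silver")

# "snake": SomeColumn -> some_column
driver.run_dataflow(stage="bronze2silver", column_name_mode="snake")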

run_maintenance

run_maintenance(connection: Optional[Union[str, List[str]]] = None, dataflows: Optional[List[DataFlow]] = None, do_compact: bool = True, do_cleanup: bool = True) -> ExecutionResult

Run maintenance operations (optimize + vacuum).

Only lakehouse destinations (Delta Lake and Iceberg) are eligible.

Parameters:

  • connection (Optional[Union[str, List[str]]], default None): Optional filter by destination connection id or name. Accepts a single value, a comma-separated string, or a list.
  • dataflows (Optional[List[DataFlow]], default None): Pre-loaded dataflows to maintain. When None, dataflows are fetched from metadata filtered to lakehouse formats only.
  • do_compact (bool, default True): Run the compaction (optimize) step.
  • do_cleanup (bool, default True): Run the cleanup (vacuum) step.

Returns:

  • ExecutionResult: Aggregated execution statistics.
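A minimal sketch; the connection name is a hypothetical placeholder::

# Compact (optimize) only, skipping vacuum, for one destination connection.
result = driver.run_maintenance(
    connection="lakehouse_prod",  # hypothetical connection name
    do_cleanup=False,
)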

job_distributor

Job distribution logic for parallel execution of dataflows.

JobDistributor assigns dataflows to jobs using either hash-based or group-based distribution, and groups them for sequential execution within a group.

Distribution rules
  • group_number is not None → group_number % job_num == job_index
  • group_number is None → hash(dataflow_id) % job_num == job_index

Within a group, dataflows are sorted by execution_order.
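A rough Python sketch of the assignment rule; zlib.crc32 stands in for whatever stable hash the framework actually uses, so treat this as an illustration rather than the exact implementation::

import zlib

def should_process(dataflow, job_num: int, job_index: int) -> bool:
    # Grouped dataflows: every member of a group lands on the same job,
    # so the group can run sequentially on one worker.
    if dataflow.group_number is not None:
        return dataflow.group_number % job_num == job_index
    # Ungrouped dataflows: spread evenly by hashing the id.
    digest = zlib.crc32(str(dataflow.dataflow_id).encode("utf-8"))
    return digest % job_num == job_index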

JobDistributor

JobDistributor(job_num: int = 1, job_index: int = 0)

Distribute dataflows across parallel jobs.

Example::

distributor = JobDistributor(job_num=4, job_index=0)
my_dataflows = distributor.filter_dataflows(all_dataflows)
groups = distributor.group_dataflows(my_dataflows)

filter_dataflows

filter_dataflows(dataflows: List[DataFlow], active_only: bool = True) -> List[DataFlow]

Return the subset of dataflows assigned to this job.

Parameters:

  • dataflows (List[DataFlow], required): All available dataflows.
  • active_only (bool, default True): Skip inactive dataflows.

Returns:

  • List[DataFlow]: Filtered list of dataflows for this job.

get_grouped_dataflows

get_grouped_dataflows(dataflows: List[DataFlow]) -> Dict[int, List[DataFlow]]

Return only grouped dataflows (group_number is not None).

Returns:

  • Dict[int, List[DataFlow]]: Mapping of group_number → sorted list of dataflows.

get_independent_dataflows

get_independent_dataflows(dataflows: List[DataFlow]) -> List[DataFlow]

Return dataflows with no group (group_number is None).

group_dataflows

group_dataflows(dataflows: List[DataFlow]) -> Dict[Optional[int], List[DataFlow]]

Group dataflows by group_number, sorted by execution_order.

Dataflows with group_number = None are placed under the None key and can run independently in parallel.

Returns:

  • Dict[Optional[int], List[DataFlow]]: Mapping of group_number → sorted list of dataflows.

should_process

should_process(dataflow: DataFlow) -> bool

Check whether this job should process the given dataflow.

Uses group_number when set, otherwise hash-based distribution.

parallel_executor

Parallel executor for dataflow and maintenance processing.

ParallelExecutor uses :class:concurrent.futures.ThreadPoolExecutor to run dataflows in parallel, respecting group ordering and stop-on-error semantics.

ExecutionResult dataclass

ExecutionResult(total: int = 0, succeeded: int = 0, failed: int = 0, skipped: int = 0, errors: Dict[str, str] = dict(), duration_seconds: float = 0.0)

Lightweight statistics container for a parallel execution run.

Detailed per-dataflow logs are written by the ETLLogger; this class only tracks counters and errors.

success_rate property

success_rate: float

Percentage of runs that succeeded.

ParallelExecutor

ParallelExecutor(max_workers: Optional[int] = None, stop_on_error: bool = False)

Thread-based parallel executor for dataflows.

Features
  • Configurable max_workers.
  • stop_on_error cancels remaining work on first failure.
  • execute_with_groups respects sequential ordering within groups.

Example::

executor = ParallelExecutor(max_workers=4)
result = executor.execute(dataflows, process_fn)

execute

execute(dataflows: List[DataFlow], process_fn: Callable[[DataFlow], DataFlowRuntimeInfo], callback: Optional[Callable[[DataFlowRuntimeInfo], None]] = None) -> ExecutionResult

Execute dataflows in parallel.

Parameters:

  • dataflows (List[DataFlow], required): Items to process.
  • process_fn (Callable[[DataFlow], DataFlowRuntimeInfo], required): Callable receiving a :class:DataFlow and returning :class:DataFlowRuntimeInfo.
  • callback (Optional[Callable[[DataFlowRuntimeInfo], None]], default None): Optional per-item completion callback.

Returns:

  • ExecutionResult: Aggregated execution statistics.

execute_sequential

execute_sequential(dataflows: List[DataFlow], process_fn: Callable[[DataFlow], DataFlowRuntimeInfo], callback: Optional[Callable[[DataFlowRuntimeInfo], None]] = None) -> ExecutionResult

Execute dataflows one by one (for grouped / ordered execution).

execute_with_groups

execute_with_groups(groups: Dict[Optional[int], List[DataFlow]], process_fn: Callable[[DataFlow], DataFlowRuntimeInfo], callback: Optional[Callable[[DataFlowRuntimeInfo], None]] = None) -> ExecutionResult

Execute respecting group ordering.

  • None group → individual items run in parallel.
  • Numbered groups → items within a group run sequentially; different groups run in parallel.
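A short sketch combining the distributor and the executor, assuming a process_fn with the signature documented for execute above::

distributor = JobDistributor(job_num=4, job_index=0)
executor = ParallelExecutor(max_workers=4)

groups = distributor.group_dataflows(distributor.filter_dataflows(all_dataflows))
# Items in the None group and the numbered groups themselves run in
# parallel; items inside a numbered group run sequentially.
result = executor.execute_with_groups(groups, process_fn)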

retry_handler

Retry handler with exponential backoff.

RetryHandler wraps a callable and retries it on any exception, up to the configured number of attempts. If all attempts fail, the last exception is re-raised.

Example::

handler = RetryHandler(retry_count=3, retry_delay=2.0)
result, attempts = handler.execute(my_function, arg1, arg2)

RetryHandler

RetryHandler(retry_count: int = 0, retry_delay: float = 5.0, backoff_multiplier: float = 2.0, max_delay: float = 60.0)

Retry logic with configurable exponential backoff.

Parameters:

  • retry_count (int, default 0): Maximum number of retries (0 = no retry).
  • retry_delay (float, default 5.0): Base delay in seconds between retries.
  • backoff_multiplier (float, default 2.0): Multiplier applied each retry (exponential).
  • max_delay (float, default 60.0): Upper cap on the computed delay.

The actual delay for attempt n is::

min(retry_delay * backoff_multiplier ** n, max_delay)
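For example, with retry_count=3 and the defaults above, the waits before the three retries work out to 5, 10, and 20 seconds::

handler = RetryHandler(retry_count=3, retry_delay=5.0,
                       backoff_multiplier=2.0, max_delay=60.0)
for attempt in range(3):
    print(handler.compute_delay(attempt))  # 5.0, then 10.0, then 20.0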

max_attempts property

max_attempts: int

Total attempts = 1 (initial) + retry_count.

compute_delay

compute_delay(attempt: int) -> float

Compute the delay before the given retry attempt (0-based).

Parameters:

  • attempt (int, required): The retry attempt number (0 = first retry).

Returns:

  • float: Sleep duration in seconds.

execute

execute(func: Callable[..., T], *args: Any, **kwargs: Any) -> tuple[T, int]

Execute func with retry logic.

Parameters:

  • func (Callable[..., T], required): Callable to execute.
  • *args (Any, default ()): Positional arguments forwarded to func.
  • **kwargs (Any, default {}): Keyword arguments forwarded to func.

Returns:

  • tuple[T, int]: (result, attempts), the return value of func and the number of attempts used (1 = first-try success).

Raises:

  • Exception: The last exception once all attempts are exhausted.
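A hedged usage sketch; load_table is a hypothetical callable standing in for any retryable operation::

try:
    result, attempts = handler.execute(load_table, "orders")
except Exception:
    # Reached only after the initial attempt plus retry_count retries fail.
    raise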

utils

Shared helpers for orchestration-layer concerns.

Lives next to :mod:driver, :mod:job_distributor, :mod:parallel_executor, and :mod:retry_handler. Hosts small, reusable utilities that coordinate work across dataflows without pulling logic into the driver or model layer.

dedupe_by_destination

dedupe_by_destination(dataflows: List[DataFlow]) -> List[DataFlow]

Return one dataflow per physical destination.

Multiple dataflows may target the same table or storage path (fan-in topology). When the unit of work is the destination itself — e.g. running OPTIMIZE / VACUUM — executing the same work more than once would race or waste compute. Input is sorted by dataflow_id first so "first-meet wins" is deterministic across runs regardless of metadata ordering.

Dataflows whose :attr:~datacoolie.core.models.Destination.destination_key cannot be computed are skipped with a warning rather than aborting the whole run.
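A rough sketch of the dedupe logic, assuming each DataFlow exposes dataflow_id and a destination with a destination_key attribute; the real implementation warns (rather than silently skipping) when a key cannot be computed::

def dedupe_by_destination(dataflows):
    seen = set()
    unique = []
    # Sort by dataflow_id so "first-meet wins" is deterministic
    # regardless of metadata ordering.
    for df in sorted(dataflows, key=lambda d: d.dataflow_id):
        try:
            key = df.destination.destination_key
        except Exception:
            continue  # sketch only; real code logs a warning here
        if key not in seen:
            seen.add(key)
            unique.append(df)
    return unique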