Orchestration¶
driver
¶
Main ETL driver coordinating all framework components.
DataCoolieDriver ties together metadata, watermark, engine, platform, loggers,
job distribution, parallel execution, and retry handling through constructor
injection.
Typical usage:

```python
driver = create_driver(
    engine=spark_engine,
    platform=local_platform,
    metadata_provider=file_provider,
    job_num=4, job_index=0, max_workers=4,
)
result = driver.run(stage="bronze2silver")
```
DEFAULT_TRANSFORMERS
module-attribute
¶
DEFAULT_TRANSFORMERS: list[str] = ['schema_converter', 'deduplicator', 'scd2_column_adder', 'column_adder', 'system_column_adder', 'partition_handler', 'column_name_sanitizer']
DataCoolieDriver
¶
DataCoolieDriver(engine: BaseEngine, platform: Optional[BasePlatform] = None, metadata_provider: Optional[BaseMetadataProvider] = None, watermark_manager: Optional[BaseWatermarkManager] = None, config: Optional[DataCoolieRunConfig] = None, secret_provider: Optional[BaseSecretProvider] = None, system_logger: Optional[SystemLogger] = None, etl_logger: Optional[ETLLogger] = None, base_log_path: Optional[str] = None, log_config: Optional[LogConfig] = None)
Main orchestration class for DataCoolie ETL pipelines.
Uses constructor injection for all dependencies. Supports:

- ETL mode — read → transform → write with `run` or `run_dataflow`.
- Maintenance mode — optimize + vacuum on destinations with `run_maintenance`.
- Dry-run — logs planned work without side effects.
- Context manager (`with`) for resource cleanup.
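A minimal sketch of the context-manager style, assuming `spark_engine` and `file_provider` are already-constructed engine and metadata-provider instances as in the usage example above:

```python
# Sketch: use the driver as a context manager so resources are released on exit.
# spark_engine and file_provider are placeholders for concrete instances.
with DataCoolieDriver(engine=spark_engine, metadata_provider=file_provider) as driver:
    result = driver.run(stage="bronze2silver")
    print(result.succeeded, result.failed)
```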
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `engine` | `BaseEngine` | Data operation engine (e.g. PySpark). | required |
| `platform` | `Optional[BasePlatform]` | Platform abstraction for file I/O. | `None` |
| `metadata_provider` | `Optional[BaseMetadataProvider]` | Provides dataflow / connection metadata. | `None` |
| `watermark_manager` | `Optional[BaseWatermarkManager]` | Reads and writes watermarks. | `None` |
| `config` | `Optional[DataCoolieRunConfig]` | Execution parameters. | `None` |
| `secret_provider` | `Optional[BaseSecretProvider]` | Resolves secrets in connection configs. If not provided, the resolved platform is used as the default provider. | `None` |
| `system_logger` | `Optional[SystemLogger]` | Optional system-level logger. | `None` |
| `etl_logger` | `Optional[ETLLogger]` | Optional structured ETL logger. | `None` |
| `base_log_path` | `Optional[str]` | Base directory for auto-created loggers. | `None` |
| `log_config` | `Optional[LogConfig]` | Optional logging configuration for auto-created loggers. | `None` |
load_dataflows
¶
load_dataflows(stage: Optional[Union[str, List[str]]] = None, active_only: bool = True, attach_schema_hints: bool = True) -> List[DataFlow]
Load and filter dataflows for this job.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `stage` | `Optional[Union[str, List[str]]]` | Optional stage filter. Accepts a single name, a comma-separated string, or a list. | `None` |
| `active_only` | `bool` | Skip inactive dataflows. | `True` |
| `attach_schema_hints` | `bool` | Attach schema hints from metadata. | `True` |
Returns:

| Type | Description |
|---|---|
| `List[DataFlow]` | Filtered list for this job. |
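For example, dataflows can be loaded once, inspected, and then handed to `run_dataflow`, which skips the metadata lookup (a sketch; the stage names are illustrative and `driver` is constructed as in the usage example above):

```python
# Sketch: pre-load the dataflows assigned to this job, then execute that subset.
flows = driver.load_dataflows(stage=["bronze2silver", "silver2gold"], active_only=True)
print(f"{len(flows)} dataflows assigned to this job")
result = driver.run_dataflow(dataflows=flows)
```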
load_maintenance_dataflows
¶
load_maintenance_dataflows(connection: Optional[Union[str, List[str]]] = None, active_only: bool = True) -> List[DataFlow]
Load lakehouse dataflows eligible for maintenance.
Dataflows that share the same physical destination (same
catalog-qualified table or storage path) are deduplicated
before job distribution so OPTIMIZE / VACUUM runs
at most once per destination, avoiding concurrent-write
races in fan-in topologies. Only the winning dataflow
produces a maintenance log row; covered dataflows are not
individually logged.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `connection` | `Optional[Union[str, List[str]]]` | Optional filter by destination connection id or name. Accepts a single value, a comma-separated string, or a list. | `None` |
| `active_only` | `bool` | Skip inactive dataflows. | `True` |
Returns:

| Type | Description |
|---|---|
| `List[DataFlow]` | Filtered list of unique-destination dataflows for this job. |
run
¶
run(stage: Optional[Union[str, List[str]]] = None, dataflows: Optional[List[DataFlow]] = None, column_name_mode: Union[ColumnCaseMode, str] = LOWER) -> ExecutionResult
Alias for `run_dataflow`.
run_dataflow
¶
run_dataflow(stage: Optional[Union[str, List[str]]] = None, dataflows: Optional[List[DataFlow]] = None, column_name_mode: Union[ColumnCaseMode, str] = LOWER) -> ExecutionResult
Execute ETL (read → transform → write) for this job.
Loads dataflows from metadata when dataflows is not provided.
Can be called multiple times within the same job session
(logs accumulate until `close`).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `stage` | `Optional[Union[str, List[str]]]` | Optional stage filter. Accepts a single name, a comma-separated string, or a list. | `None` |
| `dataflows` | `Optional[List[DataFlow]]` | Pre-loaded dataflows (skips metadata loading). | `None` |
| `column_name_mode` | `Union[ColumnCaseMode, str]` | Column name case-conversion mode. | `LOWER` |
Returns:

| Type | Description |
|---|---|
| `ExecutionResult` | Aggregated execution statistics. |
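A sketch of a direct call. Passing the mode as a string is assumed to be accepted because the parameter type also admits `str`, and the error-dictionary keys are assumed to identify the failing dataflows:

```python
# Sketch: run a single stage and report any per-dataflow errors afterwards.
result = driver.run_dataflow(stage="bronze2silver", column_name_mode="lower")
for dataflow_id, error in result.errors.items():  # keys assumed to be dataflow ids
    print(f"failed: {dataflow_id}: {error}")
```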
run_maintenance
¶
run_maintenance(connection: Optional[Union[str, List[str]]] = None, dataflows: Optional[List[DataFlow]] = None, do_compact: bool = True, do_cleanup: bool = True) -> ExecutionResult
Run maintenance operations (optimize + vacuum).
Only lakehouse destinations (Delta Lake and Iceberg) are eligible.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `connection` | `Optional[Union[str, List[str]]]` | Optional filter by destination connection id or name. Accepts a single value, a comma-separated string, or a list. | `None` |
| `dataflows` | `Optional[List[DataFlow]]` | Pre-loaded dataflows to maintain (skips metadata loading). | `None` |
| `do_compact` | `bool` | Run the compaction (optimize) step. | `True` |
| `do_cleanup` | `bool` | Run the cleanup (vacuum) step. | `True` |
Returns:

| Type | Description |
|---|---|
| `ExecutionResult` | Aggregated execution statistics. |
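For example, a sketch of a compaction-only maintenance run scoped to one connection (the connection name is illustrative):

```python
# Sketch: optimize (compact) all lakehouse destinations behind one connection,
# skipping vacuum. "silver_lakehouse" is an illustrative connection name.
result = driver.run_maintenance(connection="silver_lakehouse",
                                do_compact=True, do_cleanup=False)
print(f"{result.succeeded}/{result.total} destinations maintained")
```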
job_distributor
¶
Job distribution logic for parallel execution of dataflows.
JobDistributor assigns dataflows to jobs using either hash-based or
group-based distribution, and groups them for sequential execution within
a group.
Distribution rules:

- `group_number` is not None → `group_number % job_num == job_index`
- `group_number` is None → `hash(dataflow_id) % job_num == job_index`

Within a group, dataflows are sorted by `execution_order`.
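A sketch of these rules as a predicate, assuming the attribute names `group_number` and `dataflow_id` used above:

```python
def belongs_to_job(dataflow, job_num: int, job_index: int) -> bool:
    """Return True when the job (job_index of job_num) owns this dataflow."""
    if dataflow.group_number is not None:
        # Grouped dataflows: the whole group lands on one job.
        return dataflow.group_number % job_num == job_index
    # Independent dataflows: spread by hashing the dataflow id.
    return hash(dataflow.dataflow_id) % job_num == job_index
```

Note that Python's built-in `hash` on strings varies between processes; a stable hash would be needed for deterministic assignment across runs, and the docstring does not state which is used.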
JobDistributor
¶
Distribute dataflows across parallel jobs.
Example:

```python
distributor = JobDistributor(job_num=4, job_index=0)
my_dataflows = distributor.filter_dataflows(all_dataflows)
groups = distributor.group_dataflows(my_dataflows)
```
filter_dataflows
¶
Return the subset of dataflows assigned to this job.
get_grouped_dataflows
¶
Return only grouped dataflows (group_number is not None).
Returns:

| Type | Description |
|---|---|
| `Dict[int, List[DataFlow]]` | Mapping of `group_number` → sorted list of dataflows. |
get_independent_dataflows
¶
Return dataflows with no group (group_number is None).
parallel_executor
¶
Parallel executor for dataflow and maintenance processing.
ParallelExecutor uses `concurrent.futures.ThreadPoolExecutor`
to run dataflows in parallel, respecting group ordering and
stop-on-error semantics.
ExecutionResult
dataclass
¶
ExecutionResult(total: int = 0, succeeded: int = 0, failed: int = 0, skipped: int = 0, errors: Dict[str, str] = dict(), duration_seconds: float = 0.0)
Lightweight statistics container for a parallel execution run.
Detailed per-dataflow logs are written by the ETLLogger; this class only tracks counters and errors.
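A sketch of inspecting the counters (the error keys are assumed to identify dataflows):

```python
# Sketch: summarize a run from its ExecutionResult counters.
result = ExecutionResult(total=3, succeeded=2, failed=1,
                         errors={"orders_bronze2silver": "write timeout"},
                         duration_seconds=42.0)
print(f"{result.succeeded}/{result.total} succeeded in {result.duration_seconds:.0f}s")
for key, message in result.errors.items():
    print(f"  {key}: {message}")
```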
ParallelExecutor
¶
Thread-based parallel executor for dataflows.
Features:

- Configurable `max_workers`.
- `stop_on_error` cancels remaining work on first failure.
- `execute_with_groups` respects sequential ordering within groups.
Example:

```python
executor = ParallelExecutor(max_workers=4)
result = executor.execute(dataflows, process_fn)
```
execute
¶
execute(dataflows: List[DataFlow], process_fn: Callable[[DataFlow], DataFlowRuntimeInfo], callback: Optional[Callable[[DataFlowRuntimeInfo], None]] = None) -> ExecutionResult
Execute dataflows in parallel.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `dataflows` | `List[DataFlow]` | Items to process. | required |
| `process_fn` | `Callable[[DataFlow], DataFlowRuntimeInfo]` | Callable receiving a `DataFlow` and returning a `DataFlowRuntimeInfo`. | required |
| `callback` | `Optional[Callable[[DataFlowRuntimeInfo], None]]` | Optional per-item completion callback. | `None` |
Returns:

| Type | Description |
|---|---|
| `ExecutionResult` | Aggregated execution statistics. |
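For example, a completion callback can stream progress while the pool runs (a sketch; `dataflows` and `process_fn` are placeholders as in the class example above):

```python
# Sketch: report each finished item as soon as it completes.
def on_done(info):
    # info is the DataFlowRuntimeInfo returned by process_fn for one dataflow
    print("completed:", info)

executor = ParallelExecutor(max_workers=4)
result = executor.execute(dataflows, process_fn, callback=on_done)
```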
execute_sequential
¶
execute_sequential(dataflows: List[DataFlow], process_fn: Callable[[DataFlow], DataFlowRuntimeInfo], callback: Optional[Callable[[DataFlowRuntimeInfo], None]] = None) -> ExecutionResult
Execute dataflows one by one (for grouped / ordered execution).
execute_with_groups
¶
execute_with_groups(groups: Dict[Optional[int], List[DataFlow]], process_fn: Callable[[DataFlow], DataFlowRuntimeInfo], callback: Optional[Callable[[DataFlowRuntimeInfo], None]] = None) -> ExecutionResult
Execute respecting group ordering.
- `None` group → individual items run in parallel.
- Numbered groups → items within a group run sequentially; different groups run in parallel.
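A sketch combining `JobDistributor.group_dataflows` output with group-aware execution, mirroring the two-step usage shown in the `JobDistributor` example (`all_dataflows` and `process_fn` are placeholders):

```python
# Sketch: distribute, group, then execute with group ordering preserved.
distributor = JobDistributor(job_num=4, job_index=0)
my_dataflows = distributor.filter_dataflows(all_dataflows)
groups = distributor.group_dataflows(my_dataflows)

executor = ParallelExecutor(max_workers=4)
result = executor.execute_with_groups(groups, process_fn)
```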
retry_handler
¶
Retry handler with exponential backoff.
RetryHandler wraps a callable and retries it on any exception up to
the configured number of attempts. If all attempts fail the last exception
is re-raised.
Example:

```python
handler = RetryHandler(retry_count=3, retry_delay=2.0)
result = handler.execute(my_function, arg1, arg2)
```
RetryHandler
¶
RetryHandler(retry_count: int = 0, retry_delay: float = 5.0, backoff_multiplier: float = 2.0, max_delay: float = 60.0)
Retry logic with configurable exponential backoff.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `retry_count` | `int` | Maximum number of retries (0 = no retry). | `0` |
| `retry_delay` | `float` | Base delay in seconds between retries. | `5.0` |
| `backoff_multiplier` | `float` | Multiplier applied each retry (exponential). | `2.0` |
| `max_delay` | `float` | Upper cap on the computed delay. | `60.0` |
The actual delay for attempt `n` is `min(retry_delay * backoff_multiplier ** n, max_delay)`.
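For example, with `retry_delay=2.0`, `backoff_multiplier=2.0`, and `max_delay=10.0`, the successive delays are 2, 4, and 8 seconds, then stay capped at 10:

```python
handler = RetryHandler(retry_count=5, retry_delay=2.0,
                       backoff_multiplier=2.0, max_delay=10.0)
[handler.compute_delay(n) for n in range(5)]
# -> [2.0, 4.0, 8.0, 10.0, 10.0]
```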
compute_delay
¶
Compute the delay before the given retry attempt (0-based).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `attempt` | `int` | The retry attempt number (0 = first retry). | required |

Returns:

| Type | Description |
|---|---|
| `float` | Sleep duration in seconds. |
execute
¶
Execute `func` with retry logic.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `func` | `Callable[..., T]` | Callable to execute. | required |
| `*args` | `Any` | Positional arguments forwarded to `func`. | `()` |
| `**kwargs` | `Any` | Keyword arguments forwarded to `func`. | `{}` |
Returns:

| Type | Description |
|---|---|
| `T` | The value returned by `func`. |
| `int` | Number of attempts used (1 = first-try success). |
Raises:

| Type | Description |
|---|---|
| `Exception` | The last exception once all attempts are exhausted. |
utils
¶
Shared helpers for orchestration-layer concerns.
Lives next to `driver`, `job_distributor`, `parallel_executor`, and `retry_handler`. Hosts small, reusable utilities that coordinate work across dataflows without pulling logic into the driver or model layer.
dedupe_by_destination
¶
Return one dataflow per physical destination.
Multiple dataflows may target the same table or storage path
(fan-in topology). When the unit of work is the destination
itself — e.g. running OPTIMIZE / VACUUM — executing the
same work more than once would race or waste compute. Input is
sorted by dataflow_id first so "first-meet wins" is
deterministic across runs regardless of metadata ordering.
Dataflows whose `datacoolie.core.models.Destination.destination_key` cannot be computed are skipped with a warning rather than aborting the whole run.
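A minimal sketch of this behaviour, assuming each dataflow exposes `dataflow_id` and a destination object with a `destination_key` property (the real helper's signature and logging hooks may differ):

```python
import logging

logger = logging.getLogger(__name__)

def dedupe_by_destination(dataflows):
    """Keep one dataflow per destination_key; first by dataflow_id wins."""
    winners = {}
    for flow in sorted(dataflows, key=lambda f: f.dataflow_id):
        try:
            key = flow.destination.destination_key
        except Exception as exc:
            # Key cannot be computed: warn and continue instead of aborting the run.
            logger.warning("Skipping %s: destination_key unavailable (%s)",
                           flow.dataflow_id, exc)
            continue
        winners.setdefault(key, flow)  # first-meet wins
    return list(winners.values())
```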