# Orchestration
TL;DR: `DataCoolieDriver` is a thin coordinator. The heavy lifting is split
across `JobDistributor` (multi-job sharding), `ParallelExecutor`
(thread-level parallelism), and `RetryHandler` (per-dataflow retry with a delay between attempts).
## Driver
```python
with DataCoolieDriver(
    engine=engine,
    platform=platform,            # or let the engine supply it
    metadata_provider=metadata,
    watermark_manager=None,       # auto-created from the metadata provider
    config=DataCoolieRunConfig(job_num=4, job_index=0, max_workers=4),
    secret_provider=None,         # defaults to the platform
    base_log_path="logs/",        # auto-creates ETLLogger + SystemLogger
) as driver:
    result = driver.run(stage=["bronze2silver"])
```
Key behaviours:

- Constructor injection for every dependency; no global state.
- Auto-creates a `WatermarkManager` when a metadata provider is supplied and no manager is passed.
- Auto-creates loggers under `base_log_path/{system,etl}_logs` when you don't bring your own.
- Resource cleanup via the context manager: loggers flush and connections close.
- Platform-type guard: refuses to run if `platform` and `engine.platform` are different concrete types.
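The behaviours above can be illustrated with a small self-contained toy. Every class here (`ToyDriver`, `ToyEngine`, `ToyLogger`, `LocalPlatform`, `CloudPlatform`) is hypothetical and stands in for the real DataCoolie types; the sketch only shows the pattern of injected dependencies, a default created when none is passed, an exact-type platform guard, and cleanup in `__exit__`.

```python
from __future__ import annotations

from dataclasses import dataclass


class LocalPlatform:
    """Hypothetical platform type."""


class CloudPlatform:
    """A second hypothetical platform type."""


@dataclass
class ToyEngine:
    platform: object


@dataclass
class ToyLogger:
    name: str
    flushed: bool = False

    def flush(self) -> None:
        self.flushed = True


class ToyDriver:
    def __init__(self, engine: ToyEngine, platform: object | None = None,
                 logger: ToyLogger | None = None) -> None:
        # Constructor injection: every collaborator arrives as an argument.
        self.engine = engine
        # No explicit platform: fall back to the one attached to the engine.
        self.platform = platform if platform is not None else engine.platform
        # Guard on exact concrete types (a subclass would also be rejected).
        if type(self.platform) is not type(engine.platform):
            raise TypeError(
                f"platform is {type(self.platform).__name__}, "
                f"but engine.platform is {type(engine.platform).__name__}"
            )
        # Auto-create a default only when the caller did not bring one.
        self.logger = logger if logger is not None else ToyLogger(name="system")

    def __enter__(self) -> ToyDriver:
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Runs even when the with-body raises, so cleanup is never skipped.
        self.logger.flush()


engine = ToyEngine(platform=LocalPlatform())
with ToyDriver(engine=engine) as driver:
    pass  # dataflows would run here
print(driver.logger.flushed)  # True: __exit__ flushed the auto-created logger
```

Checking exact types rather than `isinstance` mirrors the "different concrete types" wording; whether the real driver rejects at construction or at `run` time is not specified here.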
## Job distribution

`JobDistributor` is a hash-based sharder for horizontally scaling a single
metadata set across multiple worker processes or cluster tasks:

```python
config = DataCoolieRunConfig(job_num=4, job_index=2)
# → this worker picks up dataflows where hash(dataflow.id) % 4 == 2
```
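Use this when, for example, four Spark jobs run in parallel and should split 1,000 dataflows roughly evenly. Each dataflow runs exactly once across the fleet, provided every `job_index` from 0 to `job_num - 1` is launched with the same `job_num`.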
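A minimal sketch of hash-based sharding, assuming the shard key is the dataflow id as a string. One caveat if you build something similar yourself: Python's built-in `hash()` on `str` is salted per interpreter process (see `PYTHONHASHSEED`), so separate workers could disagree on buckets. The sketch therefore uses a stable `hashlib` digest. `shard_of` and `select_for_worker` are hypothetical names, not the `JobDistributor` API.

```python
from __future__ import annotations

import hashlib


def shard_of(dataflow_id: str, job_num: int) -> int:
    """Bucket in [0, job_num), identical in every process and on every host."""
    digest = hashlib.sha256(dataflow_id.encode("utf-8")).digest()
    # Read the first 8 bytes as an unsigned big-endian integer, then reduce.
    return int.from_bytes(digest[:8], "big") % job_num


def select_for_worker(dataflow_ids: list[str], job_num: int, job_index: int) -> list[str]:
    """Ids this worker owns: those whose bucket equals job_index."""
    if not 0 <= job_index < job_num:
        raise ValueError("job_index must satisfy 0 <= job_index < job_num")
    return [d for d in dataflow_ids if shard_of(d, job_num) == job_index]


ids = [f"dataflow-{i}" for i in range(1000)]
shards = [select_for_worker(ids, job_num=4, job_index=k) for k in range(4)]
# Sizes are roughly, not exactly, 250 each; together they cover all 1000 ids once.
print([len(s) for s in shards], sum(len(s) for s in shards))
```

Because buckets are disjoint and every id maps to exactly one of them, running all four indices covers each dataflow once with no coordination between workers.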
## Parallel execution

`ParallelExecutor` uses a `ThreadPoolExecutor` (threads, not processes: Spark and
Polars both release the GIL during I/O and compute). `ExecutionResult` fields:

- `total`: dataflows submitted
- `succeeded`: finished with `status == "succeeded"`
- `failed`: raised an exception
- `skipped`: not executed (e.g. after a `stop_on_error=True` short-circuit)
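The counting rules above can be sketched with `concurrent.futures.ThreadPoolExecutor`. `ExecutionResultSketch` and `run_all` are hypothetical, and the sketch assumes that a task returning normally has succeeded, a task that raises has failed, and, with `stop_on_error=True`, any task not yet started when the first failure is observed is skipped.

```python
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import Callable


@dataclass
class ExecutionResultSketch:
    total: int = 0
    succeeded: int = 0
    failed: int = 0
    skipped: int = 0


def run_all(tasks: list[Callable[[], object]], max_workers: int = 4,
            stop_on_error: bool = False) -> ExecutionResultSketch:
    result = ExecutionResultSketch(total=len(tasks))
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(task) for task in tasks]
        for fut in as_completed(futures):
            if fut.cancelled():
                continue  # counted as skipped below
            if fut.exception() is None:
                result.succeeded += 1
            else:
                result.failed += 1
                if stop_on_error:
                    # cancel() only succeeds for futures that have not started yet;
                    # tasks already running are allowed to finish.
                    for other in futures:
                        other.cancel()
    result.skipped = sum(1 for f in futures if f.cancelled())
    return result
```

Every future ends up either cancelled or finished, so `succeeded + failed + skipped == total` always holds. How many tasks get skipped after a failure depends on timing, since workers may start the next task before the failure is noticed.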
## Retry

`RetryHandler` wraps each dataflow with:

- `retry_count` attempts
- `retry_delay` seconds (a fixed delay between attempts)

If all attempts fail, the error is recorded and the executor moves on (or
stops, per `stop_on_error`).

Retry is dataflow-scoped, not pipeline-scoped. After a transient network blip, the retry reads the source again and re-applies the current watermark, which makes retries idempotent for incremental loads.
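A minimal sketch of per-dataflow retry with a fixed delay. `run_with_retry` is a hypothetical helper, not the `RetryHandler` API, and it assumes `retry_count` is the total number of attempts including the first; the real handler may count differently.

```python
from __future__ import annotations

import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retry(fn: Callable[[], T], retry_count: int = 3,
                   retry_delay: float = 1.0) -> T:
    """Call fn up to retry_count times, sleeping retry_delay seconds between attempts."""
    if retry_count < 1:
        raise ValueError("retry_count must be >= 1")
    for attempt in range(1, retry_count + 1):
        try:
            # fn stands for one complete dataflow run (read, transform, write).
            return fn()
        except Exception:
            if attempt == retry_count:
                raise  # attempts exhausted: surface the last error to the caller
            time.sleep(retry_delay)  # fixed delay, no exponential growth
    raise AssertionError("unreachable")


calls = {"n": 0}


def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient blip")
    return "loaded"


print(run_with_retry(flaky, retry_count=3, retry_delay=0.0))  # loaded (on attempt 3)
```

Because the whole dataflow is re-run, the source read (and the watermark filter applied to it) happens again on every attempt.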
## Maintenance path

`driver.run_maintenance(connection=..., do_compact=True, do_cleanup=True)` is a parallel
variant for OPTIMIZE / VACUUM. It:

- Loads the same dataflow metadata.
- Deduplicates by destination so fan-in topologies don't race on the same table.
- Dispatches to `BaseDestinationWriter.run_maintenance`.
See How-to · Maintenance.
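Deduplicating by destination can be sketched as keeping one maintenance target per table. `Dataflow`, its `destination` field, and `plan_maintenance` are hypothetical stand-ins; the real metadata model may identify destinations differently.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Dataflow:
    id: str
    destination: str  # hypothetical: fully qualified target table name


def plan_maintenance(dataflows: list[Dataflow]) -> list[str]:
    """One maintenance target per destination, in first-seen order."""
    # dict keys are unique and keep insertion order (Python 3.7+).
    return list(dict.fromkeys(df.destination for df in dataflows))


flows = [
    Dataflow("orders_eu", "silver.orders"),
    Dataflow("orders_us", "silver.orders"),  # fan-in: same target as orders_eu
    Dataflow("customers", "silver.customers"),
]
print(plan_maintenance(flows))  # ['silver.orders', 'silver.customers']
```

Each remaining target can then be compacted or cleaned up by its own worker without two workers touching the same table.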
## Dry run

`DataCoolieRunConfig(dry_run=True)` logs what would happen without performing any reads,
writes, or watermark updates. This is useful for validating new metadata before the
first real run.
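One common way to implement such a flag is to check it before every side-effecting step and log the planned action instead. The sketch below is hypothetical (`RunConfigSketch`, `process`, and the log messages are not DataCoolie's) and only illustrates the guard pattern.

```python
from __future__ import annotations

import logging
from dataclasses import dataclass

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("dry_run_sketch")


@dataclass
class RunConfigSketch:
    dry_run: bool = False


def process(dataflow_id: str, watermarks: dict[str, str], new_mark: str,
            config: RunConfigSketch) -> None:
    planned = [
        f"read source for {dataflow_id}",
        f"write destination for {dataflow_id}",
        f"advance watermark {watermarks.get(dataflow_id)!r} -> {new_mark!r}",
    ]
    if config.dry_run:
        for step in planned:
            log.info("DRY RUN: would %s", step)
        return  # no reads, no writes, watermark untouched
    # A real run would read the source and write the destination here.
    watermarks[dataflow_id] = new_mark  # advance the watermark last


marks = {"orders": "2025-01-01"}
process("orders", marks, "2025-02-01", RunConfigSketch(dry_run=True))
print(marks)  # {'orders': '2025-01-01'}: the dry run changed nothing
```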
## Replay / backfill

`driver.run_replay(dataflows, replay: ReplayConfig)` re-processes a bounded
historical range in sequential, calendar-aligned chunks without disturbing the
production watermark.

Dataflows are processed concurrently (bounded by `max_workers`), but chunks
within a single dataflow always run sequentially. A `ReplayConfig` specifies:

- `start` / `end`: inclusive / exclusive bounds (timestamps, dates, or integers).
- `chunk_interval`: chunking unit, such as `{"months": 1}` or `{"days": 7}`.
- `save_watermark`: when `True`, enables crash-resume by saving each chunk's upper bound as the watermark after that chunk succeeds.
- `chunk_column`: overrides the auto-resolved chunk column (defaults to `watermark_columns[0]`).
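```python
from datacoolie.core.models import ReplayConfig

# Three monthly chunks: January, February and March 2025 (end is exclusive).
replay = ReplayConfig(
    start="2025-01-01",
    end="2025-04-01",
    chunk_interval={"months": 1},
)
# `driver` and `dataflows` come from the surrounding setup (see the Driver section).
result = driver.run_replay(dataflows=dataflows, replay=replay)
```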
See How-to · Replay & backfill.
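Calendar-aligned chunking can be sketched with the standard library alone. `iter_chunks` is hypothetical and handles only `{"months": n}` and `{"days": n}` on `datetime.date` bounds. It treats `start` as inclusive and `end` as exclusive, snaps month chunks to the first of the month, steps day chunks from `start`, and clips the last chunk to `end`.

```python
from __future__ import annotations

from datetime import date, timedelta
from typing import Iterator


def _first_of_month_after(d: date, months: int) -> date:
    """First day of the month that is `months` months after d's month."""
    index = d.year * 12 + (d.month - 1) + months
    return date(index // 12, index % 12 + 1, 1)


def iter_chunks(start: date, end: date,
                interval: dict[str, int]) -> Iterator[tuple[date, date]]:
    """Yield consecutive [lower, upper) chunks covering [start, end)."""
    [(unit, n)] = interval.items()  # expects exactly one key, e.g. {"months": 1}
    if n < 1:
        raise ValueError("interval size must be >= 1")
    lower = start
    while lower < end:
        if unit == "months":
            upper = _first_of_month_after(lower, n)  # snap to calendar month starts
        elif unit == "days":
            upper = lower + timedelta(days=n)
        else:
            raise ValueError(f"unsupported unit: {unit}")
        upper = min(upper, end)  # never run past the exclusive end bound
        yield lower, upper
        lower = upper


for lower, upper in iter_chunks(date(2025, 1, 1), date(2025, 4, 1), {"months": 1}):
    print(lower, upper)
# 2025-01-01 2025-02-01
# 2025-02-01 2025-03-01
# 2025-03-01 2025-04-01
```

Processing the chunks in order and persisting each `upper` after success is what makes crash-resume possible: a restart can begin from the last saved bound instead of `start`.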