Metadata schema

DataCoolie's metadata contract lives in datacoolie.core.models as CompatModel-backed dataclasses. This page is generated at docs-build time from those models, so field descriptions, defaults, and validation rules come straight from source. Treat the models as the source of truth and this page as the rendered view.

Top-level run configuration

DataCoolieRunConfig dataclass

DataCoolieRunConfig()

Validated execution parameters for a DataCoolie run.

Connection

Connection dataclass

Connection()

Endpoint configuration for a data source or destination.

The configure JSON field stores type-specific settings (host, port, read_options, write_options, etc.). Frequently-used values are surfaced as computed properties.

base_path property

base_path: Optional[str]

Base storage path (e.g. abfss://container@storage/).

database_type property

database_type: Optional[str]

Database type (mysql, mssql, postgresql, oracle, sqlite).

auth_type property

auth_type: Optional[str]

Database authentication type (password, service_principal, managed_identity, access_token).

tenant_id property

tenant_id: Optional[str]

Azure AD tenant ID for service_principal auth.

token property

token: Optional[str]

Pre-fetched access token for access_token auth.

url property

url: Optional[str]

Explicit URL / connection string from configure.

driver property

driver: Optional[str]

JDBC driver class name.

athena_output_location property

athena_output_location: Optional[str]

S3 path for Athena DDL query results.

When set, the writer always registers a native Delta table via Athena DDL (DROP + CREATE EXTERNAL TABLE ... TBLPROPERTIES ('table_type'='DELTA')) after every write and maintenance.

generate_manifest property

generate_manifest: bool

Generate _symlink_format_manifest/ after writes and maintenance.

register_symlink_table property

register_symlink_table: bool

Register a SymlinkTextInputFormat table in Glue after writes.

Implies generate_manifest.

symlink_database_prefix property

symlink_database_prefix: str

Prefix for symlink Glue database name. Default "symlink_".

date_backward property

date_backward: Optional[Dict[str, Any]]

Backward look-back offset for date-folder partition discovery.

Reads backward_days, backward_months, backward_hours as top-level keys from config, or a nested backward dict.

Strategies:

Fixed offset (subtract days, months, or hours from the watermark):

config:
  backward_days: 7
  # or
  backward: {days: 7, months: 1}

Closing-day (monthly period boundary based on the current date):

config:
  backward: {closing_day: 10}
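
The key layout and the fixed-offset strategy above can be sketched in plain Python. This is an illustration only: `normalize_backward` and `apply_fixed_offset` are hypothetical helpers, not DataCoolie API, and letting the nested `backward` dict win over top-level keys is an assumption. Month arithmetic is done by hand with the standard library, and the closing-day strategy is not modeled.

```python
import calendar
from datetime import datetime, timedelta
from typing import Any, Dict, Optional


def normalize_backward(config: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Collect look-back settings from top-level keys and a nested dict.

    Assumption: when both forms are present, the nested ``backward`` dict wins.
    """
    result: Dict[str, Any] = {}
    for unit in ("days", "months", "hours"):
        value = config.get(f"backward_{unit}")
        if value is not None:
            result[unit] = value
    result.update(config.get("backward") or {})
    return result or None


def apply_fixed_offset(watermark: datetime, offset: Dict[str, int]) -> datetime:
    """Subtract days, hours, and months from a watermark."""
    shifted = watermark - timedelta(
        days=offset.get("days", 0), hours=offset.get("hours", 0)
    )
    months = offset.get("months", 0)
    if months:
        # Step back whole months, clamping the day to the target month's length.
        index = shifted.year * 12 + (shifted.month - 1) - months
        year, month = divmod(index, 12)
        month += 1
        last_day = calendar.monthrange(year, month)[1]
        shifted = shifted.replace(
            year=year, month=month, day=min(shifted.day, last_day)
        )
    return shifted
```

With these helpers, a watermark of 2025-01-10 and `backward_days: 7` would start the read window at 2025-01-03.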

refresh_from_configure

refresh_from_configure() -> None

Unconditionally sync database and catalog from configure.

Unlike the model validator (which only sets empty fields at construction time), this always overwrites — call after secret resolution when configure values have been resolved from vault keys to real values.
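
The difference between the construction-time validator and the unconditional refresh can be shown with a small stand-in class. `ConnectionSketch` is hypothetical and is not the real Connection model; the `database` / `catalog` keys inside `configure` and the `vault:` placeholder string are assumptions made for the example.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class ConnectionSketch:
    """Hypothetical stand-in for Connection; not the real model."""

    database: Optional[str] = None
    catalog: Optional[str] = None
    configure: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        # Construction-time behaviour: fill only fields that are still empty.
        if not self.database:
            self.database = self.configure.get("database")
        if not self.catalog:
            self.catalog = self.configure.get("catalog")

    def refresh_from_configure(self) -> None:
        # Post-resolution behaviour: always overwrite from configure.
        self.database = self.configure.get("database")
        self.catalog = self.configure.get("catalog")


conn = ConnectionSketch(configure={"database": "vault:db-name"})
# A secret resolver (not shown) replaces the placeholder with the real value.
conn.configure["database"] = "sales"
stale = conn.database  # still holds the unresolved placeholder
conn.refresh_from_configure()
```

After the refresh, `conn.database` holds the resolved value, whereas the value captured in `stale` shows what the field would contain if only the validator had run.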

Source

Source dataclass

Source()

Read-side pipeline configuration.

namespace property

namespace: Optional[str]

Namespace without the table: catalog.database.schema.

read_options property

read_options: Dict[str, Any]

Merged read options: connection defaults + source overrides.
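
A minimal sketch of that merge, assuming it is a shallow dictionary merge in which source-level keys replace connection-level keys of the same name. `merge_options` is a hypothetical helper and the option names are made up for the example.

```python
from typing import Any, Dict, Optional


def merge_options(
    connection_defaults: Optional[Dict[str, Any]],
    overrides: Optional[Dict[str, Any]],
) -> Dict[str, Any]:
    """Shallow merge: override keys replace connection-level keys."""
    return {**(connection_defaults or {}), **(overrides or {})}


merged = merge_options(
    {"header": True, "delimiter": ","},  # connection defaults
    {"delimiter": ";"},                  # source overrides
)
```

Here `merged` keeps `header` from the connection and takes `delimiter` from the source.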

date_backward property

date_backward: Optional[Dict[str, Any]]

Backward look-back offset, source-level overrides connection-level.

Reads from configure (same keys as Connection.date_backward). If no source-level config is present, falls back to the connection's value.

Example (YAML / source configure):

configure:
  backward_days: 7         # overrides connection setting
  # or
  backward: {months: 1}
  # or closing-day strategy
  backward: {closing_day: 10}

Destination

Destination dataclass

Destination()

Write-side pipeline configuration.

namespace property

namespace: Optional[str]

Namespace without the table: catalog.database.schema.

destination_key property

destination_key: str

Stable identity for this destination as a physical object.

Two destinations that resolve to the same physical object share the same key. Useful for orchestration concerns like deduplicating fan-in writes or scheduling maintenance at most once per object.

Identity priority:

  1. Fully-qualified table name when catalog or database is set on the connection — this matches how Databricks Unity Catalog, Fabric Lakehouse, and AWS Glue address tables.
  2. Storage path otherwise — covers unregistered Delta tables (local dev / tests).

Results are prefixed ("table:" / "path:") to prevent a path string from colliding with a qualified name, and lowercased for case-insensitive equivalence.

Raises:

  • ConfigurationError: When the destination has neither a catalog/database registration nor a storage path.
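
The identity priority can be sketched as follows. `destination_key_sketch` is a hypothetical function, not the property's actual implementation: the way the qualified name is assembled from its parts is an assumption, no path normalization is attempted, and `ValueError` stands in for DataCoolie's ConfigurationError.

```python
from typing import Optional


def destination_key_sketch(
    catalog: Optional[str] = None,
    database: Optional[str] = None,
    schema: Optional[str] = None,
    table: Optional[str] = None,
    path: Optional[str] = None,
) -> str:
    """Return a prefixed, lowercased identity for a destination."""
    if catalog or database:
        # Priority 1: qualified table name (assumed dot-joined from set parts).
        parts = [p for p in (catalog, database, schema, table) if p]
        return "table:" + ".".join(parts).lower()
    if path:
        # Priority 2: storage path for unregistered tables.
        return "path:" + path.lower()
    # Stand-in for ConfigurationError.
    raise ValueError("destination has neither a catalog/database nor a storage path")
```

Two destinations that differ only in letter case resolve to the same key, and the prefix keeps a path from ever matching a qualified name.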

write_options property

write_options: Dict[str, Any]

Merged write options: connection defaults + destination overrides.

merge_keys_extended property

merge_keys_extended: List[str]

Return merge keys extended with partition columns.
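
As an illustration, the extension might look like the sketch below. `extend_merge_keys` is a hypothetical helper, and preserving order while skipping columns that are already merge keys is an assumption about the behaviour.

```python
from typing import List


def extend_merge_keys(merge_keys: List[str], partition_columns: List[str]) -> List[str]:
    """Append partition columns to merge keys, keeping order and skipping repeats."""
    extended = list(merge_keys)
    for column in partition_columns:
        if column not in extended:
            extended.append(column)
    return extended
```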

scd2_effective_column property

scd2_effective_column: Optional[str]

SQL expression used as __valid_from for SCD2 loads.

Read from destination.configure["scd2_effective_column"]. Returns None when not set (non-SCD2 destinations).

replace_by_watermark property

replace_by_watermark: bool

Whether merge_overwrite should use range-based window replace.

When True, the strategy deletes all target rows within the watermark window (watermark_effective → new_watermark) instead of doing key-based delete. This handles source-side deletions.

Requires date_backward on the source to ensure the read window covers the delete scope.

Transform

Transform dataclass

Transform()

Transformation rules applied between source read and destination write.

convert_timestamp_ntz property

convert_timestamp_ntz: bool

Whether to convert timestamp_ntz columns to timestamp.

Reads convert_timestamp_ntz from configure. Defaults to True.

Example (YAML / metadata):

transform:
  configure:
    convert_timestamp_ntz: false

deduplicate_by_rank property

deduplicate_by_rank: bool

Whether to use RANK-based deduplication instead of ROW_NUMBER.

Reads deduplicate_by_rank from configure. Defaults to False.

Example (YAML / metadata):

transform:
  configure:
    deduplicate_by_rank: true
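
The practical difference between the two modes shows up when rows tie on the ordering column. The sketch below imitates both window functions over plain dictionaries; `deduplicate` is a hypothetical helper, not DataCoolie code. It assumes descending order on a single column, and it breaks ROW_NUMBER ties by input order, whereas a SQL engine may pick any of the tied rows.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def deduplicate(rows: List[Row], key: str, order_by: str, by_rank: bool = False) -> List[Row]:
    """Keep the latest row(s) per key, ordered by ``order_by`` descending.

    by_rank=False mimics ROW_NUMBER() = 1: exactly one row per key.
    by_rank=True mimics RANK() = 1: every row tied for the top value is kept.
    """
    groups: Dict[Any, List[Row]] = {}
    for row in rows:
        groups.setdefault(row[key], []).append(row)

    kept: List[Row] = []
    for group in groups.values():
        top = max(r[order_by] for r in group)
        winners = [r for r in group if r[order_by] == top]
        kept.extend(winners if by_rank else winners[:1])
    return kept


rows = [
    {"id": 1, "updated": 5, "v": "a"},
    {"id": 1, "updated": 5, "v": "b"},  # ties with the row above
    {"id": 1, "updated": 3, "v": "c"},
    {"id": 2, "updated": 9, "v": "d"},
]
```

With `by_rank=False` the sample yields one row per `id`; with `by_rank=True` both tied rows for `id` 1 survive.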

deduplicate_column_names

deduplicate_column_names(merge_keys: List[str] | None = None) -> List[str]

Return dedup columns, falling back to merge_keys.

DataFlow

DataFlow dataclass

DataFlow()

Complete ETL pipeline configuration.

Composes Source, Destination, and Transform.

watermark_window property

watermark_window: Optional[Dict[str, tuple]]

Watermark window for range-based replace.

Set by the driver after source read. Maps column names to (lower_bound, upper_bound) tuples.

order_columns property

order_columns: List[str]

Columns used to order rows during deduplication.

Returns transform.latest_data_columns when set, otherwise falls back to source.watermark_columns.
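
The fallback can be written as a short function. `resolve_order_columns` is a hypothetical helper; treating an empty list the same as "not set" is an assumption.

```python
from typing import List, Optional


def resolve_order_columns(
    latest_data_columns: Optional[List[str]],
    watermark_columns: Optional[List[str]],
) -> List[str]:
    """Prefer the transform's columns; otherwise use the source watermark columns."""
    if latest_data_columns:
        return list(latest_data_columns)
    return list(watermark_columns or [])
```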

validate

validate() -> None

Validate metadata configuration; raise on invalid combinations.

Called automatically in __post_init__ to fail fast at metadata load time.

apply_watermark_window

apply_watermark_window(source_runtime: 'SourceRuntimeInfo') -> None

Set watermark_window from source runtime info.

Computes the (watermark_effective, watermark_after) window per column when replace_by_watermark is active and both bounds are available. Otherwise leaves the window as None.
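
A sketch of that computation, under the assumption that the runtime info exposes both bounds as per-column dictionaries. `build_watermark_window` is a hypothetical function, and SourceRuntimeInfo's real attributes are not shown on this page.

```python
from typing import Any, Dict, Optional, Tuple


def build_watermark_window(
    replace_by_watermark: bool,
    watermark_effective: Optional[Dict[str, Any]],
    watermark_after: Optional[Dict[str, Any]],
) -> Optional[Dict[str, Tuple[Any, Any]]]:
    """Map each column to (lower_bound, upper_bound), or return None."""
    if not replace_by_watermark or not watermark_effective or not watermark_after:
        return None
    window = {
        column: (lower, watermark_after[column])
        for column, lower in watermark_effective.items()
        if column in watermark_after  # both bounds must be available
    }
    return window or None
```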

Supporting models

ReplayConfig dataclass

ReplayConfig(start: Any, end: Any, chunk_interval: Optional[Dict[str, int]] = None, save_watermark: bool = False, chunk_column: Optional[str] = None)

Configuration for replaying a bounded time range in chunks.

Used by DataCoolieDriver.run_replay to reprocess historical data without corrupting the production watermark.

The range uses the left-closed, right-open [start, end) convention: start is inclusive, end is exclusive. Consecutive chunks therefore share a boundary without overlapping, which keeps them aligned to whole calendar units (days, weeks, months, etc.). The same half-open convention is used by Python’s range() and is the canonical form of PostgreSQL's discrete range types.

Example:

# Replay all of Q1 2025 in monthly chunks:
ReplayConfig(
    start="2025-01-01",  # inclusive
    end="2025-04-01",    # exclusive (Apr 1 itself is not replayed)
    chunk_interval={"months": 1},
)
# Produces chunks: [Jan 1, Feb 1), [Feb 1, Mar 1), [Mar 1, Apr 1)

The chunk column is auto-resolved from dataflow.source.watermark_columns[0] at runtime. Override with chunk_column for multi-column watermarks where the first column is not the one to chunk on.

Type detection is automatic:

  • str parseable to date/datetime → time-based chunking
  • datetime / date objects → time-based chunking
  • int → integer-based chunking
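
The type detection and half-open chunking described above can be sketched with the standard library alone. All names here (`detect_kind`, `add_months`, `monthly_chunks`, `integer_chunks`) are hypothetical; the real implementation uses relativedelta for time-based intervals, and parsing strings with `datetime.fromisoformat` is an assumption about which formats count as parseable.

```python
import calendar
from datetime import date, datetime
from typing import Any, Iterator, Tuple


def detect_kind(value: Any) -> str:
    """Classify a bound as 'time' or 'int', following the rules listed above."""
    if isinstance(value, bool):
        raise TypeError("bool is not a valid bound")
    if isinstance(value, int):
        return "int"
    if isinstance(value, (date, datetime)):
        return "time"
    if isinstance(value, str):
        datetime.fromisoformat(value)  # raises ValueError if not parseable
        return "time"
    raise TypeError(f"unsupported bound type: {type(value).__name__}")


def add_months(day: date, months: int) -> date:
    """Shift a date by whole months, clamping to the target month's last day."""
    index = day.year * 12 + (day.month - 1) + months
    year, month = divmod(index, 12)
    month += 1
    last_day = calendar.monthrange(year, month)[1]
    return day.replace(year=year, month=month, day=min(day.day, last_day))


def monthly_chunks(start: date, end: date, months: int = 1) -> Iterator[Tuple[date, date]]:
    """Yield half-open [lower, upper) chunks covering [start, end)."""
    if months < 1:
        raise ValueError("months must be >= 1")
    step = 0
    lower = start
    while lower < end:
        step += months
        # Offsets are computed from start so day clamping does not accumulate.
        upper = min(add_months(start, step), end)
        yield lower, upper
        lower = upper


def integer_chunks(start: int, end: int, step: int) -> Iterator[Tuple[int, int]]:
    """Yield half-open integer chunks; the last chunk is truncated at end."""
    for lower in range(start, end, step):
        yield lower, min(lower + step, end)
```

Calling `monthly_chunks(date(2025, 1, 1), date(2025, 4, 1))` reproduces the three Q1 chunks from the example above, and each chunk's upper bound is the next chunk's lower bound.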

Parameters:

  • start (Any, required): Inclusive lower bound of the replay range.
  • end (Any, required): Exclusive upper bound of the replay range.
  • chunk_interval (Optional[Dict[str, int]], default None): Chunking interval. Time-based keys (months, days, hours, minutes, weeks, years) use relativedelta; the step key is for integer watermarks. None disables chunking (single-shot replay).
  • save_watermark (bool, default False): When True (init mode), save the watermark after each successful chunk, which enables crash-resume. When False (backfill mode), the stored watermark is never touched.
  • chunk_column (Optional[str], default None): Override the auto-resolved chunk column. Only needed for multi-column watermarks where the first column is not the desired chunking dimension.

SchemaHint dataclass

SchemaHint()

Column-level type hint for schema conversion.

PartitionColumn dataclass

PartitionColumn()

Partition column definition.

expression is an optional SQL expression used to derive the partition value (e.g. "year(event_date)").

AdditionalColumn dataclass

AdditionalColumn()

Computed column added during the transform phase.

Enums

LoadType

Supported load (write) strategies.

Format

Supported data formats.

ConnectionType

Connection endpoint categories.

ProcessingMode

ETL processing modes.

DataFlowStatus

Dataflow execution statuses.