Metadata

base

Abstract base class for metadata providers and in-memory metadata cache.

BaseMetadataProvider uses a Template Method pattern: public get_* methods call protected _fetch_* abstract methods, wrapping them with an optional in-memory cache layer (MetadataCache).

Concrete providers — FileProvider, DatabaseProvider, APIClient — implement only the _fetch_* and watermark methods.

BaseMetadataProvider

BaseMetadataProvider(*, enable_cache: bool = True, auto_prefetch: bool = True, eager_prefetch: bool = False)

Bases: ABC

Abstract metadata provider with optional caching.

Subclasses implement the _fetch_* methods (I/O layer) and the watermark methods. This base class wraps them with cache-aside logic: check cache → miss → fetch → store → return.

Context manager: `with provider: ...` calls :meth:`close` on exit.
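
A minimal sketch of that flow, assuming pared-down stand-ins for the real MetadataCache and Connection classes (SketchProvider and its attribute names are purely illustrative):

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Connection:            # pared-down stand-in for the real model
    connection_id: str
    name: str
    is_active: bool = True


class MetadataCache:         # pared-down stand-in for the real cache
    def __init__(self) -> None:
        self._connections: Optional[List[Connection]] = None

    def is_loaded(self) -> bool:
        return self._connections is not None

    def set_connections(self, conns: List[Connection]) -> None:
        self._connections = list(conns)   # replaces on every full fetch

    def get_connections(self) -> List[Connection]:
        return list(self._connections or [])

    def clear(self) -> None:
        self._connections = None


class SketchProvider(ABC):
    """Template Method: the public getter wraps the abstract _fetch_* hook."""

    def __init__(self, *, enable_cache: bool = True) -> None:
        self._cache = MetadataCache() if enable_cache else None

    def get_connections(self, *, active_only: bool = True) -> List[Connection]:
        # cache-aside: check cache -> miss -> fetch -> store -> return
        if self._cache is not None and self._cache.is_loaded():
            conns = self._cache.get_connections()
        else:
            conns = self._fetch_connections()
            if self._cache is not None:
                self._cache.set_connections(conns)   # seed per-id/name lookups
        return [c for c in conns if c.is_active] if active_only else conns

    @abstractmethod
    def _fetch_connections(self) -> List[Connection]:
        """I/O layer implemented by FileProvider / DatabaseProvider / APIClient."""

    def clear_cache(self) -> None:
        if self._cache is not None:       # no-op when caching is disabled
            self._cache.clear()

    def close(self) -> None:
        self.clear_cache()

    def __enter__(self) -> "SketchProvider":
        return self

    def __exit__(self, *exc) -> None:
        self.close()                      # `with provider: ...` closes on exit
```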

clear_cache

clear_cache() -> None

Clear the in-memory cache (no-op when caching is disabled).

close

close() -> None

Release resources. Base implementation clears the cache.

get_connection_by_id

get_connection_by_id(connection_id: str) -> Optional[Connection]

Return a single connection by connection_id.

When the cache has been bulk-loaded and the id is not in it, returns None without falling through to a remote fetch (negative cache — bulk-load owns the full workspace).

get_connection_by_name

get_connection_by_name(name: str) -> Optional[Connection]

Return a single connection by name.

Negative-cache behaviour mirrors :meth:`get_connection_by_id`.

get_connections

get_connections(*, active_only: bool = True) -> List[Connection]

Return all connections, optionally filtered to active ones.

When the cache has been bulk-loaded via :meth:`prefetch_all`, this is served entirely from memory. Otherwise the fetched list is seeded into the cache so that subsequent :meth:`get_connection_by_id` / `_by_name` lookups hit the cache. Seeding a filtered subset is safe because `is_loaded()` gates bulk reads and `set_connections` replaces the cached list on every full fetch.
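
For example (the connection name is illustrative, and `provider` is any concrete provider):

```python
with provider:
    active = provider.get_connections()         # first call fetches and seeds the cache
    conn = provider.get_connection_by_name("bronze_adls")  # now served from memory
```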

get_dataflow_by_id

get_dataflow_by_id(dataflow_id: str, *, attach_schema_hints: bool = True) -> Optional[DataFlow]

Return a single dataflow by dataflow_id.

When the cache has been bulk-loaded and the id is not in it, returns None without falling through to a remote fetch (negative cache).

get_dataflows

get_dataflows(*, stage: Optional[Union[str, List[str]]] = None, active_only: bool = True, attach_schema_hints: bool = True) -> List[DataFlow]

Return dataflows, optionally filtered by stage.

stage may be a single name, a comma-separated string ("bronze2silver,silver2gold"), or a list of names. When attach_schema_hints is True (default), schema hints are fetched and attached to each dataflow's transform.schema_hints.

On the first call (when caching is enabled and auto_prefetch is on), the entire workspace metadata is bulk-loaded into the cache via :meth:`prefetch_all`; subsequent calls are served entirely from memory.
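
The stage spellings below are equivalent filters (`provider` is any concrete provider):

```python
flows = provider.get_dataflows(stage="bronze2silver")
flows = provider.get_dataflows(stage="bronze2silver,silver2gold")        # comma-separated
flows = provider.get_dataflows(stage=["bronze2silver", "silver2gold"])   # list

# Skip the schema-hint lookup when hints are not needed:
flows = provider.get_dataflows(stage="silver2gold", attach_schema_hints=False)
```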

get_maintenance_dataflows

get_maintenance_dataflows(*, connection: Optional[Union[str, List[str]]] = None, active_only: bool = True, attach_schema_hints: bool = False) -> List[DataFlow]

Return lakehouse-only dataflows for maintenance.

Only destinations whose format is in CONNECTION_TYPE_FORMATS[ConnectionType.LAKEHOUSE.value] (Delta Lake and Iceberg) are included.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| connection | Optional[Union[str, List[str]]] | Filter by destination connection id or name. Accepts a single value, a comma-separated string, or a list. | None |
| active_only | bool | Skip inactive dataflows. | True |
| attach_schema_hints | bool | Attach schema hints from metadata. | False |
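
For example (connection names are illustrative):

```python
# Only dataflows whose destination format is Delta Lake or Iceberg are returned.
flows = provider.get_maintenance_dataflows(connection="silver_lakehouse")
flows = provider.get_maintenance_dataflows(connection=["silver_lakehouse", "gold_lakehouse"])
```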

get_schema_hints

get_schema_hints(connection_id: str, table_name: str, schema_name: Optional[str] = None) -> List[SchemaHint]

Return schema hints for a given connection + table.

get_watermark abstractmethod

get_watermark(dataflow_id: str) -> Optional[str]

Return the raw serialised watermark string for dataflow_id, or None.

prefetch_all

prefetch_all() -> None

Bulk-load connections, dataflows and schema hints into the cache.

After this call returns, get_connections, get_dataflows, get_schema_hints and the per-id getters are served entirely from the in-memory cache — no further I/O is performed by the provider for those reads.

Workspace scoping (database / API providers) and the single-file nature of the file provider make it safe to load the full set of metadata up-front.

The bulk-load always loads the full set (active and inactive) so that subsequent calls, whatever their active_only flag, can be filtered correctly from the cache.

Idempotent and thread-safe: concurrent callers race on a lock and only one bulk-load is performed. No-op when caching is disabled or when the cache is already loaded.

Subclasses implement :meth:`_bulk_load` to provide the actual I/O.
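
A minimal sketch of the guarded bulk-load, assuming a lock attribute named `_prefetch_lock` (the real attribute names may differ):

```python
import threading


class PrefetchSketch:
    def __init__(self, cache) -> None:
        self._cache = cache
        self._prefetch_lock = threading.Lock()

    def prefetch_all(self) -> None:
        if self._cache is None or self._cache.is_loaded():
            return                        # no-op: caching disabled or already loaded
        with self._prefetch_lock:         # concurrent callers race on the lock;
            if self._cache.is_loaded():
                return                    # losers find the cache already populated
            self._bulk_load()             # subclass I/O: connections, dataflows, hints

    def _bulk_load(self) -> None:         # full set, active and inactive
        raise NotImplementedError
```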

update_watermark abstractmethod

update_watermark(dataflow_id: str, watermark_value: str, *, job_id: Optional[str] = None, dataflow_run_id: Optional[str] = None) -> None

Persist a serialised watermark for dataflow_id.

file_provider

File-based metadata provider — reads connections, dataflows, and schema hints from YAML, JSON, or Excel (.xlsx / .xls) configuration files.

FileProvider is the primary standalone / development metadata backend. Watermark state is stored as JSON files on the platform's file system.

Supported formats

YAML / JSON — hierarchical, nested structure:

```yaml
connections:
  - name: bronze_adls
    connection_type: file
    format: delta
    configure:
      base_path: abfss://bronze@storage/
      use_schema_hint: true

  - name: source_erp
    connection_type: database
    format: jdbc
    database: ERP
    configure:
      host: erp-server.example.com
      port: 1433
      username: etl_reader

dataflows:
  - name: orders_bronze_to_silver
    stage: bronze2silver
    source:
      connection_name: bronze_adls
      table: orders
      watermark_columns: [modified_at]
    destination:
      connection_name: silver_lakehouse
      table: dim_orders
      load_type: merge_upsert
      merge_keys: [order_id]
    transform:
      schema_hints:
        - column_name: amount
          data_type: DECIMAL
          precision: 18
          scale: 2

schema_hints:
  - connection_name: bronze_adls
    table_name: orders
    hints:
      - column_name: order_date
        data_type: DATE
        format: yyyy-MM-dd
```

Excel (.xlsx / .xls) — flat workbook with three sheets:

connections sheet — one row per connection:

  • Required: name, connection_type.
  • Optional: connection_id, format, catalog, database, is_active.
  • Nested configure values come from configure_* columns (e.g. configure_base_path, configure_host, configure_use_schema_hint); catalog and database are top-level columns (not configure_catalog or configure_database).
  • secrets_ref: JSON column mapping config field names to vault key names (e.g. {"password": "vault/db-pass", "api_key": "vault/api-key"}).
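
A sketch of how the configure_* columns could fold into the nested shape used by YAML / JSON (the helper name is hypothetical):

```python
def row_to_connection_dict(row: dict) -> dict:
    """Fold flat Excel columns into a nested connection dict (illustrative)."""
    out: dict = {}
    configure: dict = {}
    for column, value in row.items():
        if value is None:
            continue
        if column.startswith("configure_"):
            configure[column[len("configure_"):]] = value  # configure_base_path -> base_path
        else:
            out[column] = value      # name, connection_type, catalog, database, ...
    if configure:
        out["configure"] = configure
    return out


row_to_connection_dict({
    "name": "bronze_adls", "connection_type": "file", "format": "delta",
    "configure_base_path": "abfss://bronze@storage/", "configure_use_schema_hint": True,
})
# -> {"name": "bronze_adls", "connection_type": "file", "format": "delta",
#     "configure": {"base_path": "abfss://bronze@storage/", "use_schema_hint": True}}
```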

dataflows sheet — one row per dataflow:

  • Required: source_connection_name, source_table, destination_connection_name, destination_table.
  • Optional: dataflow_id, name, stage, description, group_number, execution_order, processing_mode, is_active.
  • configure: JSON column for the dataflow's own configure dict.
  • transform: JSON column for the full transform dict (individual transform_* columns take precedence when both are present).
  • Source fields are prefixed source_; destination fields are prefixed destination_.
  • List columns (e.g. source_watermark_columns, destination_merge_keys, destination_partition_columns) accept comma-separated values.
  • transform_schema_hints, transform_additional_columns, and transform_partition_columns accept JSON strings.
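
As a rough illustration of the two cell conventions (the helper names are hypothetical):

```python
import json


def parse_list_column(value: str) -> list:
    # "modified_at, order_id" -> ["modified_at", "order_id"]
    return [item.strip() for item in value.split(",") if item.strip()]


def parse_json_column(value: str):
    # e.g. a transform_schema_hints cell holding a JSON array string
    return json.loads(value)


parse_list_column("order_id, customer_id")
parse_json_column('[{"column_name": "amount", "data_type": "DECIMAL"}]')
```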

schema_hints sheet — one row per hint (grouped internally by connection_name + table_name + optional schema_name):

Columns: connection_name, table_name, schema_name (optional), column_name, data_type, precision, scale, format
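
A sketch of the internal grouping (the helper is hypothetical; schema_name falls back to an empty string so the sort key stays comparable):

```python
from itertools import groupby


def group_hint_rows(rows: list) -> dict:
    """Group flat sheet rows by (connection_name, table_name, schema_name)."""
    def key(row: dict) -> tuple:
        return (row["connection_name"], row["table_name"], row.get("schema_name") or "")
    return {k: list(g) for k, g in groupby(sorted(rows, key=key), key=key)}
```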

Separate files — each section may live in its own file instead of (or in addition to) the primary config_path. Pass any combination of:

  • connections_path — file that contains only a connections list.
  • schema_hints_path — file that contains only a schema_hints list.

Each override file can be YAML, JSON, or Excel in the same format as the corresponding section in the primary file. When a separate path is specified, its section replaces the same section from the primary file.
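
For example (the paths and the platform object are illustrative):

```python
provider = FileProvider(
    config_path="metadata/config.yaml",              # dataflows and any non-overridden sections
    connections_path="metadata/connections.yaml",    # replaces the connections section
    schema_hints_path="metadata/schema_hints.xlsx",  # replaces the schema_hints section
    platform=platform,
)
```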

FileProvider

FileProvider(config_path: str, platform: BasePlatform, *, connections_path: Optional[str] = None, schema_hints_path: Optional[str] = None, watermark_base_path: Optional[str] = None, enable_cache: bool = True, eager_prefetch: bool = False)

Bases: BaseMetadataProvider

Metadata provider backed by YAML, JSON, or Excel configuration files.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| config_path | str | Path to the primary YAML, JSON, or Excel metadata file. May contain connections, dataflows, and/or schema_hints sections. | required |
| platform | BasePlatform | Platform used for watermark file I/O. | required |
| connections_path | Optional[str] | Optional separate file that provides the connections list. Overrides any connections section in config_path when supplied. | None |
| schema_hints_path | Optional[str] | Optional separate file that provides the schema_hints list. Overrides any schema_hints section in config_path when supplied. | None |
| watermark_base_path | Optional[str] | Root directory for watermark files. Defaults to <config_path_parent>/watermarks. | None |
| enable_cache | bool | Enable the in-memory cache. | True |
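
A typical standalone setup (the paths and the platform object are illustrative):

```python
with FileProvider("metadata/config.yaml", platform) as provider:
    flows = provider.get_dataflows(stage="bronze2silver")
    raw = provider.get_watermark(flows[0].dataflow_id)   # JSON string or None
# watermark files default to <config_path_parent>/watermarks
```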

close

close() -> None

Clear cache, memoised builders, and internal data.

get_watermark

get_watermark(dataflow_id: str) -> Optional[str]

Return the raw watermark JSON string for dataflow_id, or None.

update_watermark

update_watermark(dataflow_id: str, watermark_value: str, *, job_id: Optional[str] = None, dataflow_run_id: Optional[str] = None) -> None

Write the serialised watermark JSON to a file.

database_provider

Database-backed metadata provider — SQLAlchemy 2.0.

DatabaseProvider reads connections, dataflows, schema hints and watermarks from a relational database using SQLAlchemy 2.0 ORM sessions.

The database schema follows the dc_framework_* table convention described in the architecture document (03-metadata-schema-design).

All queries are workspace-scoped and honour soft-delete (deleted_at IS NULL).

DatabaseProvider

DatabaseProvider(*, connection_string: Optional[str] = None, workspace_id: str, engine: Optional[Engine] = None, enable_cache: bool = True, pool_size: int = 10, max_overflow: int = 20, pool_pre_ping: bool = True, pool_timeout: int = 30, pool_recycle: int = 1800, warm_up_retries: int = 1, warm_up_delay: float = 15.0, eager_prefetch: bool = False)

Bases: BaseMetadataProvider

Metadata provider backed by a relational database (SQLAlchemy 2.0).

Supports any SQLAlchemy-compatible backend (PostgreSQL, SQLite, SQL Server, MySQL, etc.).

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| connection_string | Optional[str] | SQLAlchemy database URL. | None |
| workspace_id | str | Workspace scope for all queries. | required |
| engine | Optional[Engine] | Optional pre-built Engine (overrides connection_string and all pool parameters). | None |
| enable_cache | bool | Enable the in-memory metadata cache. | True |
| pool_size | int | Number of persistent connections kept in the pool. | 10 |
| max_overflow | int | Extra connections allowed beyond pool_size under load; caps total concurrent connections at pool_size + max_overflow. | 20 |
| pool_pre_ping | bool | When True, validate each connection before checkout with a lightweight ping. Prevents stale-connection errors common with serverless / auto-paused databases. | True |
| warm_up_retries | int | Number of extra connection attempts on the very first session open. Defaults to 1 so that serverless / auto-paused databases (e.g. Azure SQL Serverless, Aurora Serverless) get one automatic retry while they resume from cold start. Set to 0 to disable. | 1 |
| warm_up_delay | float | Seconds to wait between warm-up retry attempts. | 15.0 |
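
A typical setup against a serverless database (the URL and workspace id are illustrative):

```python
provider = DatabaseProvider(
    connection_string="postgresql+psycopg2://etl:secret@db-host/metadata",
    workspace_id="ws-prod-01",
    pool_pre_ping=True,     # validate connections before checkout
    warm_up_retries=1,      # one extra attempt while a paused database resumes
)
with provider:
    provider.prefetch_all()                                # single bulk load
    flows = provider.get_dataflows(stage="bronze2silver")  # served from cache
```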

close

close() -> None

Dispose the engine and clear cache.

create_tables

create_tables() -> None

Create all dc_framework_* tables (useful for testing).

drop_tables

drop_tables() -> None

Drop all dc_framework_* tables (useful for testing).

get_watermark

get_watermark(dataflow_id: str) -> Optional[str]

Return the raw serialised watermark string for dataflow_id, or None.

update_watermark

update_watermark(dataflow_id: str, watermark_value: str, *, job_id: Optional[str] = None, dataflow_run_id: Optional[str] = None) -> None

Upsert a watermark with atomic previous_value rotation.

Strategy (avoids SELECT … FOR UPDATE gap locks which cause deadlocks on MySQL InnoDB under parallel load):

  1. Issue a single UPDATE that rotates previous_value from the current value in one statement (SQL evaluates all RHS expressions from pre-update values).
  2. If rowcount == 0, the row does not yet exist → INSERT.
  3. If the insert races with another worker and raises :class:`IntegrityError`, roll back and re-run the UPDATE.

Transient MySQL deadlocks / lock-wait timeouts (OperationalError) are retried up to _WATERMARK_MAX_RETRIES times.
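
A sketch of that strategy in SQLAlchemy 2.0 Core terms (the table object and column names are illustrative, not the exact dc_framework_* schema):

```python
from sqlalchemy import insert, update
from sqlalchemy.exc import IntegrityError


def upsert_watermark(session, wm_table, dataflow_id: str, new_value: str) -> None:
    # 1. One UPDATE rotates previous_value from the pre-update current_value;
    #    SQL evaluates the RHS against the old row, so no SELECT ... FOR UPDATE.
    rotate = (
        update(wm_table)
        .where(wm_table.c.dataflow_id == dataflow_id)
        .values(previous_value=wm_table.c.current_value, current_value=new_value)
    )
    if session.execute(rotate).rowcount == 0:
        try:
            # 2. No row yet -> INSERT.
            session.execute(
                insert(wm_table).values(dataflow_id=dataflow_id, current_value=new_value)
            )
        except IntegrityError:
            # 3. Lost the insert race -> roll back and re-run the rotating UPDATE.
            session.rollback()
            session.execute(rotate)
    session.commit()
```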

api_client

REST API metadata provider — httpx.

APIClient fetches connections, dataflows, schema hints and watermarks from a remote metadata API via HTTP.

Authentication uses an X-API-Key header. The client handles pagination automatically, retries transient errors (429 / 5xx), and maps all HTTP failures to MetadataError or WatermarkError.
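
A rough sketch of the paginate-and-retry loop, assuming a page/page_size query convention and an items array in responses (the real endpoint shapes may differ):

```python
import time

import httpx


def fetch_all_pages(client: httpx.Client, path: str, *, page_size: int = 200,
                    max_retries: int = 3, retry_backoff: float = 1.0) -> list:
    items, page = [], 1
    while True:
        for attempt in range(max_retries + 1):
            response = client.get(path, params={"page": page, "page_size": page_size})
            if response.status_code == 429 or response.status_code >= 500:
                time.sleep(retry_backoff * 2 ** attempt)   # exponential backoff
                continue
            response.raise_for_status()   # other failures surface as exceptions
            break
        else:
            raise RuntimeError(f"{path}: transient-error retries exhausted")
        batch = response.json().get("items", [])
        items.extend(batch)
        if len(batch) < page_size:        # short page -> last page
            return items
        page += 1
```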

APIClient

APIClient(*, base_url: str, api_key: str, workspace_id: str, enable_cache: bool = True, timeout: float = 30.0, max_retries: int = 3, retry_backoff: float = 1.0, page_size: int = 200, max_workers: int = 8, eager_prefetch: bool = False)

Bases: BaseMetadataProvider

Metadata provider backed by a remote REST API.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| base_url | str | Root URL of the metadata API, including any path prefix (e.g. https://api.datacoolie.io/api/v1). | required |
| api_key | str | Value for the X-API-Key header. | required |
| workspace_id | str | Workspace scope for all requests. | required |
| enable_cache | bool | Enable the in-memory metadata cache. | True |
| timeout | float | Request timeout in seconds. | 30.0 |
| max_retries | int | Number of retries on transient failures. | 3 |
| retry_backoff | float | Base backoff in seconds for exponential retry. | 1.0 |
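
For example (the workspace id is illustrative; the API key is read from the environment):

```python
import os

client = APIClient(
    base_url="https://api.datacoolie.io/api/v1",
    api_key=os.environ["METADATA_API_KEY"],   # sent as the X-API-Key header
    workspace_id="ws-prod-01",
)
with client:
    flows = client.get_dataflows(stage="silver2gold")
```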

close

close() -> None

Close the HTTP client and clear cache.

get_watermark

get_watermark(dataflow_id: str) -> Optional[str]

Return the raw serialised watermark string for dataflow_id, or None.

The API may return current_value as a JSON string or as an already-parsed dict. Either way, this method normalises the result to a JSON string so that WatermarkManager can deserialise it uniformly.
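
Roughly (the helper name is illustrative):

```python
import json


def normalise_current_value(value):
    # dict from the API -> serialise; string -> pass through unchanged
    return json.dumps(value) if isinstance(value, dict) else value
```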

update_watermark

update_watermark(dataflow_id: str, watermark_value: str, *, job_id: Optional[str] = None, dataflow_run_id: Optional[str] = None) -> None

Update the watermark via the API.