Metadata
base
Abstract base class for metadata providers and in-memory metadata cache.
BaseMetadataProvider uses a Template Method pattern: public get_*
methods call protected _fetch_* abstract methods, wrapping them with an
optional in-memory cache layer (MetadataCache).
Concrete providers — FileProvider, DatabaseProvider, APIClient —
implement only the _fetch_* and watermark methods.
BaseMetadataProvider
BaseMetadataProvider(*, enable_cache: bool = True, auto_prefetch: bool = True, eager_prefetch: bool = False)
Bases: ABC
Abstract metadata provider with optional caching.
Subclasses implement the _fetch_* methods (I/O layer) and the
watermark methods. This base class wraps them with cache-aside logic:
check cache → miss → fetch → store → return.
Context manager — with provider: ... calls :meth:close on exit.
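The cache-aside template can be sketched as follows (a simplified, hypothetical illustration: CachingProviderSketch and its plain dict stand in for BaseMetadataProvider and MetadataCache):

```python
from abc import ABC, abstractmethod
from typing import Optional


class CachingProviderSketch(ABC):
    """Illustrative cache-aside template: check cache -> miss -> fetch -> store."""

    def __init__(self, *, enable_cache: bool = True) -> None:
        self._enable_cache = enable_cache
        self._cache: dict = {}

    def get_connection_by_id(self, connection_id: str) -> Optional[object]:
        # Public template method: cache layer first, then the I/O layer.
        if self._enable_cache and connection_id in self._cache:
            return self._cache[connection_id]
        value = self._fetch_connection_by_id(connection_id)
        if self._enable_cache and value is not None:
            self._cache[connection_id] = value
        return value

    @abstractmethod
    def _fetch_connection_by_id(self, connection_id: str) -> Optional[object]:
        """Subclasses implement only the I/O."""
```

A second call with the same id is served from the dict without touching the abstract fetch method.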
get_connection_by_id
get_connection_by_id(connection_id: str) -> Optional[Connection]
Return a single connection by connection_id.
When the cache has been bulk-loaded and the id is not in it,
returns None without falling through to a remote fetch
(negative cache — bulk-load owns the full workspace).
get_connection_by_name
get_connection_by_name(name: str) -> Optional[Connection]
Return a single connection by name.
Negative-cache behaviour mirrors :meth:get_connection_by_id.
get_connections
get_connections(*, active_only: bool = True) -> List[Connection]
Return all connections, optionally filtered to active ones.
When the cache has been bulk-loaded via :meth:prefetch_all,
this is served entirely from memory. Otherwise the fetched
list is seeded into the cache so that subsequent
:meth:get_connection_by_id / _by_name lookups hit the
cache. Seeding a filtered subset is safe because
is_loaded() gates bulk reads and set_connections
replaces on every full fetch.
get_dataflow_by_id
get_dataflow_by_id(dataflow_id: str, *, attach_schema_hints: bool = True) -> Optional[DataFlow]
Return a single dataflow by dataflow_id.
When the cache has been bulk-loaded and the id is not in it,
returns None without falling through to a remote fetch
(negative cache).
get_dataflows
get_dataflows(*, stage: Optional[Union[str, List[str]]] = None, active_only: bool = True, attach_schema_hints: bool = True) -> List[DataFlow]
Return dataflows, optionally filtered by stage.
stage may be a single name, a comma-separated string
("bronze2silver,silver2gold"), or a list of names.
When attach_schema_hints is True (default), schema hints
are fetched and attached to each dataflow's transform.schema_hints.
On the first call (when caching is enabled and auto_prefetch
is on), the entire workspace metadata is bulk-loaded into the
cache via :meth:prefetch_all; subsequent calls are served
entirely from memory.
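The three accepted shapes of the stage argument could be normalised with a helper along these lines (illustrative only; the provider's actual parsing code is not shown in this reference):

```python
from typing import List, Optional, Union


def normalize_stages(stage: Optional[Union[str, List[str]]]) -> Optional[List[str]]:
    """Accept a single name, a comma-separated string, or a list of names."""
    if stage is None:
        return None  # no stage filter requested
    if isinstance(stage, str):
        stage = stage.split(",")
    # Trim whitespace and drop empty entries from e.g. trailing commas.
    return [s.strip() for s in stage if s.strip()]
```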
get_maintenance_dataflows
get_maintenance_dataflows(*, connection: Optional[Union[str, List[str]]] = None, active_only: bool = True, attach_schema_hints: bool = False) -> List[DataFlow]
Return lakehouse-only dataflows for maintenance.
Only destinations whose format is in
CONNECTION_TYPE_FORMATS[ConnectionType.LAKEHOUSE.value]
(Delta Lake and Iceberg) are included.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| connection | Optional[Union[str, List[str]]] | Filter by destination connection id or name. Accepts a single value, a comma-separated string, or a list. | None |
| active_only | bool | Skip inactive dataflows. | True |
| attach_schema_hints | bool | Attach schema hints from metadata. | False |
get_schema_hints
get_schema_hints(connection_id: str, table_name: str, schema_name: Optional[str] = None) -> List[SchemaHint]
Return schema hints for a given connection + table.
get_watermark
abstractmethod
Return the raw serialised watermark string for dataflow_id, or None.
prefetch_all
Bulk-load connections, dataflows and schema hints into the cache.
After this call returns, get_connections, get_dataflows,
get_schema_hints and the per-id getters are served entirely
from the in-memory cache — no further I/O is performed by the
provider for those reads.
Workspace scoping (database / API providers) and the single-file nature of the file provider make it safe to load the full set of metadata up-front.
The bulk-load always loads the full set (active and inactive)
so that subsequent calls with any active_only flag can be
filtered correctly from cache.
Idempotent and thread-safe: concurrent callers race on a lock and only one bulk-load is performed. No-op when caching is disabled or when the cache is already loaded.
Subclasses implement :meth:_bulk_load to provide the actual
I/O.
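The idempotent, lock-guarded bulk-load described above is commonly implemented as a double-checked lock; a minimal sketch under that assumption (hypothetical names, with _bulk_load reduced to a counter):

```python
import threading


class PrefetchSketch:
    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._loaded = False
        self.load_count = 0  # instrumentation for the example only

    def prefetch_all(self) -> None:
        if self._loaded:           # fast path: already loaded, no lock taken
            return
        with self._lock:           # concurrent callers race on this lock
            if self._loaded:       # re-check: another caller may have won
                return
            self._bulk_load()
            self._loaded = True

    def _bulk_load(self) -> None:
        self.load_count += 1       # stands in for the real I/O
```

However many threads call prefetch_all concurrently, _bulk_load runs exactly once.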
update_watermark
abstractmethod
update_watermark(dataflow_id: str, watermark_value: str, *, job_id: Optional[str] = None, dataflow_run_id: Optional[str] = None) -> None
Persist a serialised watermark for dataflow_id.
file_provider
File-based metadata provider — reads connections, dataflows, and schema hints from YAML, JSON, or Excel (.xlsx / .xls) configuration files.
FileProvider is the primary standalone / development metadata backend.
Watermark state is stored as JSON files on the platform's file system.
Supported formats
YAML / JSON — hierarchical, nested structure:
```yaml
connections:
  - name: bronze_adls
    connection_type: file
    format: delta
    configure:
      base_path: abfss://bronze@storage/
      use_schema_hint: true
  - name: source_erp
    connection_type: database
    format: jdbc
    database: ERP
    configure:
      host: erp-server.example.com
      port: 1433
      username: etl_reader

dataflows:
  - name: orders_bronze_to_silver
    stage: bronze2silver
    source:
      connection_name: bronze_adls
      table: orders
      watermark_columns: [modified_at]
    destination:
      connection_name: silver_lakehouse
      table: dim_orders
      load_type: merge_upsert
      merge_keys: [order_id]
    transform:
      schema_hints:
        - column_name: amount
          data_type: DECIMAL
          precision: 18
          scale: 2

schema_hints:
  - connection_name: bronze_adls
    table_name: orders
    hints:
      - column_name: order_date
        data_type: DATE
        format: yyyy-MM-dd
```
Excel (.xlsx / .xls) — flat workbook with three sheets:
connections sheet — one row per connection:
Required: name, connection_type
Optional: connection_id, format, catalog, database, is_active
Nested configure via configure_* columns (e.g. configure_base_path, configure_host,
configure_use_schema_hint)
catalog and database are top-level columns (not configure_catalog or configure_database)
secrets_ref: JSON column mapping config field names to vault key names
(e.g. {"password": "vault/db-pass", "api_key": "vault/api-key"})
dataflows sheet — one row per dataflow:
Required: source_connection_name, source_table,
destination_connection_name, destination_table
Optional: dataflow_id, name, stage, description, group_number,
execution_order, processing_mode, is_active
configure: JSON column for the dataflow's own configure dict.
transform: JSON column for the full transform dict (individual transform_*
columns take precedence when both are present).
Source fields are prefixed source_; destination fields are prefixed destination_.
List columns (e.g. source_watermark_columns, destination_merge_keys,
destination_partition_columns) accept comma-separated values.
transform_schema_hints, transform_additional_columns,
transform_partition_columns accept JSON strings.
schema_hints sheet — one row per hint (grouped internally by
connection_name + table_name + optional schema_name):
Columns: connection_name, table_name, schema_name (optional),
column_name, data_type, precision, scale, format
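The configure_* column convention above amounts to collecting prefixed columns into a nested dict; a rough sketch of that transformation (illustrative, not the provider's actual code):

```python
def row_to_connection(row: dict) -> dict:
    """Split configure_* columns out of a flat sheet row into a nested dict."""
    configure = {}
    conn = {}
    for key, value in row.items():
        if key.startswith("configure_"):
            configure[key[len("configure_"):]] = value
        else:
            conn[key] = value  # top-level: name, connection_type, catalog, database, ...
    if configure:
        conn["configure"] = configure
    return conn
```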
Separate files — each section may live in its own file instead of (or
in addition to) the primary config_path. Pass any combination of:
connections_path — file that contains only a connections list.
schema_hints_path — file that contains only a schema_hints list.
Each override file can be YAML, JSON, or Excel in the same format as the corresponding section in the primary file. When a separate path is specified, its section replaces the same section from the primary file.
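The replace semantics of the override files can be expressed as a simple section-level merge (a sketch; the real loader also handles format detection and parsing, and merge_config is a hypothetical helper):

```python
from typing import Optional


def merge_config(primary: dict,
                 connections: Optional[dict] = None,
                 schema_hints: Optional[dict] = None) -> dict:
    """Each override file's section replaces the same section from the primary file."""
    merged = dict(primary)
    if connections is not None:
        merged["connections"] = connections["connections"]
    if schema_hints is not None:
        merged["schema_hints"] = schema_hints["schema_hints"]
    return merged
```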
FileProvider
FileProvider(config_path: str, platform: BasePlatform, *, connections_path: Optional[str] = None, schema_hints_path: Optional[str] = None, watermark_base_path: Optional[str] = None, enable_cache: bool = True, eager_prefetch: bool = False)
Bases: BaseMetadataProvider
Metadata provider backed by YAML, JSON, or Excel configuration files.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| config_path | str | Path to the primary YAML, JSON, or Excel metadata file. May contain any of the connections, dataflows, and schema_hints sections. | required |
| platform | BasePlatform | Platform used for watermark file I/O. | required |
| connections_path | Optional[str] | Optional separate file that provides the connections section. | None |
| schema_hints_path | Optional[str] | Optional separate file that provides the schema_hints section. | None |
| watermark_base_path | Optional[str] | Root directory for watermark files. | None |
| enable_cache | bool | Enable the in-memory cache. | True |
database_provider
Database-backed metadata provider — SQLAlchemy 2.0.
DatabaseProvider reads connections, dataflows, schema hints and
watermarks from a relational database using SQLAlchemy 2.0 ORM sessions.
The database schema follows the dc_framework_* table convention
described in the architecture document (03-metadata-schema-design).
All queries are workspace-scoped and honour soft-delete
(deleted_at IS NULL).
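The workspace scoping and soft-delete rule reduce to two predicates on every read; illustrated here with plain SQL against an in-memory SQLite table (the provider itself uses SQLAlchemy 2.0 ORM sessions, and the exact column set below is an assumption based on the dc_framework_* convention):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dc_framework_connections ("
    " connection_id TEXT, workspace_id TEXT, name TEXT, deleted_at TEXT)"
)
conn.executemany(
    "INSERT INTO dc_framework_connections VALUES (?, ?, ?, ?)",
    [
        ("c1", "ws-1", "bronze_adls", None),
        ("c2", "ws-1", "old_conn", "2024-01-01"),  # soft-deleted: excluded
        ("c3", "ws-2", "other_ws", None),          # different workspace: excluded
    ],
)

# Every read is scoped to one workspace and honours soft-delete.
rows = conn.execute(
    "SELECT name FROM dc_framework_connections"
    " WHERE workspace_id = ? AND deleted_at IS NULL",
    ("ws-1",),
).fetchall()
```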
DatabaseProvider
DatabaseProvider(*, connection_string: Optional[str] = None, workspace_id: str, engine: Optional[Engine] = None, enable_cache: bool = True, pool_size: int = 10, max_overflow: int = 20, pool_pre_ping: bool = True, pool_timeout: int = 30, pool_recycle: int = 1800, warm_up_retries: int = 1, warm_up_delay: float = 15.0, eager_prefetch: bool = False)
Bases: BaseMetadataProvider
Metadata provider backed by a relational database (SQLAlchemy 2.0).
Supports any SQLAlchemy-compatible backend (PostgreSQL, SQLite, SQL Server, MySQL, etc.).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| connection_string | Optional[str] | SQLAlchemy database URL. | None |
| workspace_id | str | Workspace scope for all queries. | required |
| engine | Optional[Engine] | Optional pre-built SQLAlchemy Engine. | None |
| enable_cache | bool | Enable the in-memory metadata cache. | True |
| pool_size | int | Number of persistent connections kept in the pool. | 10 |
| max_overflow | int | Extra connections allowed beyond pool_size under load. Combined with pool_size this caps total concurrent connections at pool_size + max_overflow. | 20 |
| pool_pre_ping | bool | When True, test pooled connections for liveness before use. | True |
| warm_up_retries | int | Number of extra connection attempts on the very first session open. | 1 |
| warm_up_delay | float | Seconds to wait between warm-up retry attempts. | 15.0 |
get_watermark
Return the raw serialised watermark string for dataflow_id, or None.
update_watermark
update_watermark(dataflow_id: str, watermark_value: str, *, job_id: Optional[str] = None, dataflow_run_id: Optional[str] = None) -> None
Upsert a watermark with atomic previous_value rotation.
Strategy (avoids SELECT … FOR UPDATE gap locks, which cause
deadlocks on MySQL InnoDB under parallel load):
- Issue a single UPDATE that rotates previous_value from the current value in one statement (SQL evaluates all right-hand-side expressions from pre-update values).
- If rowcount == 0, the row does not yet exist → INSERT.
- If the INSERT races with another worker and raises IntegrityError, roll back and re-run the UPDATE.
Transient MySQL deadlocks / lock-wait timeouts (OperationalError)
are retried up to _WATERMARK_MAX_RETRIES times.
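The update-first upsert can be sketched with sqlite3 (the real implementation uses SQLAlchemy and MySQL-specific retry handling; the watermarks table and its columns here are assumptions for illustration):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE watermarks ("
    " dataflow_id TEXT PRIMARY KEY, current_value TEXT, previous_value TEXT)"
)


def upsert_watermark(dataflow_id: str, value: str) -> None:
    # Single UPDATE: right-hand sides are evaluated against pre-update values,
    # so previous_value receives the old current_value in one atomic statement.
    cur = conn.execute(
        "UPDATE watermarks SET previous_value = current_value, current_value = ?"
        " WHERE dataflow_id = ?",
        (value, dataflow_id),
    )
    if cur.rowcount == 0:  # row does not exist yet -> INSERT
        try:
            conn.execute(
                "INSERT INTO watermarks (dataflow_id, current_value) VALUES (?, ?)",
                (dataflow_id, value),
            )
        except sqlite3.IntegrityError:
            # Lost the insert race to another worker: re-run the UPDATE.
            conn.execute(
                "UPDATE watermarks SET previous_value = current_value,"
                " current_value = ? WHERE dataflow_id = ?",
                (value, dataflow_id),
            )
    conn.commit()
```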
api_client
REST API metadata provider — httpx.
APIClient fetches connections, dataflows, schema hints and watermarks
from a remote metadata API via HTTP.
Authentication is done via an X-API-Key header. The client handles
automatic pagination, retry on transient errors (429 / 5xx), and maps
all HTTP failures to MetadataError or WatermarkError.
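The retry behaviour on transient statuses (429 / 5xx) follows a standard exponential-backoff pattern; a stdlib-only sketch of the control flow, independent of httpx (the parameter names mirror the constructor, but request_with_retry itself is a hypothetical helper):

```python
import time
from typing import Callable

TRANSIENT_STATUSES = {429, 500, 502, 503, 504}


def request_with_retry(send: Callable[[], int], *,
                       max_retries: int = 3,
                       retry_backoff: float = 1.0) -> int:
    """Call send() until it returns a non-transient status or retries run out."""
    status = 0
    for attempt in range(max_retries + 1):
        status = send()
        if status not in TRANSIENT_STATUSES:
            return status
        if attempt < max_retries:
            # Exponential backoff: retry_backoff, 2x, 4x, ...
            time.sleep(retry_backoff * (2 ** attempt))
    return status  # still transient after the last retry
```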
APIClient
APIClient(*, base_url: str, api_key: str, workspace_id: str, enable_cache: bool = True, timeout: float = 30.0, max_retries: int = 3, retry_backoff: float = 1.0, page_size: int = 200, max_workers: int = 8, eager_prefetch: bool = False)
Bases: BaseMetadataProvider
Metadata provider backed by a remote REST API.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| base_url | str | Root URL of the metadata API, including any path prefix. | required |
| api_key | str | Value for the X-API-Key header. | required |
| workspace_id | str | Workspace scope for all requests. | required |
| enable_cache | bool | Enable the in-memory metadata cache. | True |
| timeout | float | Request timeout in seconds. | 30.0 |
| max_retries | int | Number of retries on transient failures. | 3 |
| retry_backoff | float | Base backoff in seconds for exponential retry. | 1.0 |
get_watermark
Return the raw serialised watermark string for dataflow_id, or None.
The API may return current_value as a JSON string or as an already-parsed
dict. Either way, this method normalises the result to a JSON string so that
WatermarkManager can deserialise it uniformly.
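That normalisation amounts to serialising dicts and passing strings through; a minimal sketch (normalize_watermark is a hypothetical helper name):

```python
import json
from typing import Optional, Union


def normalize_watermark(current_value: Optional[Union[str, dict]]) -> Optional[str]:
    """Always hand downstream code a JSON string (or None)."""
    if current_value is None:
        return None
    if isinstance(current_value, str):
        return current_value  # already serialised by the API
    return json.dumps(current_value)  # parsed dict -> JSON string
```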
update_watermark
update_watermark(dataflow_id: str, watermark_value: str, *, job_id: Optional[str] = None, dataflow_run_id: Optional[str] = None) -> None
Update the watermark via the API.