Core

Model classes are documented on the Metadata schema page

Connection, Source, Destination, Transform, DataFlow, ReplayConfig, DataCoolieRunConfig, and supporting types are rendered from source on the Metadata schema reference page to avoid duplicate anchors.

registry

Generic plugin registry with lazy discovery via entry points.

PluginRegistry provides a thread-safe registry for classes (engines, platforms, readers, writers, transformers) that supports:

  • Manual registration via register()
  • Lazy auto-discovery via importlib.metadata entry points
  • Instantiation via get()

PluginRegistry

PluginRegistry(entry_point_group: str, base_class: type[T])

Bases: Generic[T]

Generic plugin registry with lazy discovery via entry points.

Type parameter T is the base class that every registered plugin must subclass (e.g. BaseEngine, BasePlatform).

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| entry_point_group | str | The entry-point group name used for auto-discovery (e.g. "datacoolie.engines"). | required |
| base_class | type[T] | The base class that registered plugins must subclass. | required |

clear_singletons

clear_singletons() -> None

Evict all cached singleton instances.

Useful in tests to reset state between runs.

get

get(name: str, **kwargs: object) -> T

Get a plugin instance by name.

On the first call, triggers entry-point discovery so that externally installed plugins are found automatically.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Plugin name. | required |
| **kwargs | object | Constructor arguments forwarded to the plugin class. | {} |

Returns:

| Type | Description |
|---|---|
| T | An instance of the requested plugin. |

Raises:

| Type | Description |
|---|---|
| DataCoolieError | If no plugin is registered under name. |

get_or_create

get_or_create(name: str, **kwargs: object) -> T

Get a cached singleton instance, creating it on first call.

Unlike get(), subsequent calls with the same name return the same instance. Useful for stateless plugins (e.g. secret resolvers) where per-call instantiation is wasteful.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Plugin name. | required |
| **kwargs | object | Constructor arguments forwarded to the plugin class on first creation only. | {} |

Returns:

| Type | Description |
|---|---|
| T | A (possibly cached) instance of the requested plugin. |

Raises:

| Type | Description |
|---|---|
| DataCoolieError | If no plugin is registered under name. |
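The difference between get, get_or_create and clear_singletons can be sketched in isolation. SingletonCache and PrefixResolver are hypothetical; the sketch only mirrors the documented semantics (kwargs honoured on first creation only) and raises KeyError, not DataCoolieError, for unknown names.

```python
import threading
from typing import Any, Dict


class SingletonCache:
    """Hypothetical sketch of get() vs get_or_create() vs clear_singletons()."""

    def __init__(self, classes: Dict[str, type]) -> None:
        self._classes = dict(classes)
        self._singletons: Dict[str, Any] = {}
        self._lock = threading.Lock()

    def get(self, name: str, **kwargs: object) -> Any:
        # New instance per call; unknown names raise KeyError in this sketch.
        return self._classes[name](**kwargs)

    def get_or_create(self, name: str, **kwargs: object) -> Any:
        # kwargs are used only when the instance is first created.
        with self._lock:
            if name not in self._singletons:
                self._singletons[name] = self._classes[name](**kwargs)
            return self._singletons[name]

    def clear_singletons(self) -> None:
        # Drop every cached instance, e.g. between test cases.
        with self._lock:
            self._singletons.clear()


class PrefixResolver:
    """Hypothetical stateless plugin."""

    def __init__(self, prefix: str = "") -> None:
        self.prefix = prefix


cache = SingletonCache({"env": PrefixResolver})
first = cache.get_or_create("env", prefix="APP_")
second = cache.get_or_create("env", prefix="IGNORED_")
```

Holding the lock while constructing means two threads racing on the first get_or_create call cannot both create an instance.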

is_available

is_available(name: str) -> bool

Check if a plugin is registered (triggers discovery).

list_plugins

list_plugins() -> list[str]

List all registered plugin names (triggers discovery).

register

register(name: str, cls: type[T]) -> None

Manually register a plugin class.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Unique plugin name (e.g. "spark", "local"). | required |
| cls | type[T] | The plugin class (must be a subclass of base_class). | required |

Raises:

| Type | Description |
|---|---|
| DataCoolieError | If cls is not a subclass of base_class. |

unregister

unregister(name: str) -> None

Remove a previously registered plugin.

Useful for testing and dynamic plugin management.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Plugin name to remove. | required |

Raises:

| Type | Description |
|---|---|
| DataCoolieError | If name is not registered. |

exceptions

Exception hierarchy for the DataCoolie framework.

All framework exceptions inherit from DataCoolieError, which carries a human-readable message and an optional details dict for structured error context.

ConfigurationError

ConfigurationError(message: str, details: dict[str, Any] | None = None)

Bases: DataCoolieError

Raised for invalid or missing configuration.

DataCoolieError

DataCoolieError(message: str, details: dict[str, Any] | None = None)

Bases: Exception

Base exception for all DataCoolie errors.

Attributes:

| Name | Description |
|---|---|
| message | Human-readable error description. |
| details | Optional structured context for logging / debugging. |
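Because every error derives from one base carrying message and details, callers can catch the base class and still log structured context. The sketch below re-creates that shape with hypothetical names (DataCoolieErrorSketch, ConfigurationErrorSketch, load_setting); defaulting details to an empty dict is this sketch's own choice.

```python
from typing import Any, Optional


class DataCoolieErrorSketch(Exception):
    """Hypothetical stand-in mirroring DataCoolieError(message, details=None)."""

    def __init__(self, message: str, details: Optional[dict[str, Any]] = None) -> None:
        super().__init__(message)
        self.message = message
        # In this sketch, details defaults to {} so callers can always read it.
        self.details = details or {}


class ConfigurationErrorSketch(DataCoolieErrorSketch):
    """Hypothetical stand-in for ConfigurationError."""


def load_setting(config: dict[str, str], key: str) -> str:
    if key not in config:
        raise ConfigurationErrorSketch(
            f"Missing required setting {key!r}",
            details={"key": key, "available": sorted(config)},
        )
    return config[key]


try:
    load_setting({"engine": "polars"}, "platform")
except DataCoolieErrorSketch as exc:  # catching the base also catches subclasses
    caught = exc
```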

DataFlowError

DataFlowError(message: str, dataflow_id: str | None = None, stage: str | None = None, details: dict[str, Any] | None = None)

Bases: DataCoolieError

Raised for dataflow-level execution errors.

Attributes:

| Name | Description |
|---|---|
| dataflow_id | Identifier of the failed dataflow. |
| stage | Pipeline stage where the error occurred. |

DestinationError

DestinationError(message: str, details: dict[str, Any] | None = None)

Bases: DataCoolieError

Raised when writing to a destination fails.

EngineError

EngineError(message: str, details: dict[str, Any] | None = None)

Bases: DataCoolieError

Raised for engine-level failures (Spark, Polars).

MetadataError

MetadataError(message: str, details: dict[str, Any] | None = None)

Bases: DataCoolieError

Raised for metadata loading / parsing failures.

PipelineError

PipelineError(message: str, partial_result: Any = None, details: dict[str, Any] | None = None)

Bases: DataCoolieError

Raised when a pipeline step fails, carrying partial runtime results.

Attributes:

| Name | Description |
|---|---|
| partial_result | The incomplete result tuple collected before the failure. |
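One way a caller might use partial_result is to run steps in order and, on failure, raise with whatever was collected so far. PipelineErrorSketch, run_steps and the two step functions are hypothetical; only the constructor shape (message, partial_result, details) comes from the signature above.

```python
from typing import Any, Callable, Optional


class PipelineErrorSketch(Exception):
    """Hypothetical stand-in mirroring PipelineError(message, partial_result=None, details=None)."""

    def __init__(self, message: str, partial_result: Any = None,
                 details: Optional[dict[str, Any]] = None) -> None:
        super().__init__(message)
        self.message = message
        self.partial_result = partial_result
        self.details = details or {}


def run_steps(steps: list[Callable[[], Any]]) -> tuple:
    results: list[Any] = []
    for index, step in enumerate(steps):
        try:
            results.append(step())
        except Exception as exc:
            # Keep the work done so far as an immutable tuple.
            raise PipelineErrorSketch(
                f"Step {index} failed: {exc}",
                partial_result=tuple(results),
                details={"failed_step": index},
            ) from exc
    return tuple(results)


def read() -> str:
    return "rows read"


def transform() -> str:
    raise ValueError("bad column")


try:
    run_steps([read, transform])
except PipelineErrorSketch as err:
    failure = err
```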

PlatformError

PlatformError(message: str, details: dict[str, Any] | None = None)

Bases: DataCoolieError

Raised for platform-specific failures (Fabric, Databricks, AWS).

SourceError

SourceError(message: str, details: dict[str, Any] | None = None)

Bases: DataCoolieError

Raised when reading from a data source fails.

TransformError

TransformError(message: str, details: dict[str, Any] | None = None)

Bases: DataCoolieError

Raised when a transformation step fails.

WatermarkError

WatermarkError(message: str, details: dict[str, Any] | None = None)

Bases: DataCoolieError

Raised for watermark read / write / parse failures.

secret_provider

Secret provider abstraction and resolution utilities.

BaseSecretProvider defines the contract for fetching secrets from platform-specific vaults. resolve_secrets wires a provider (or any BaseSecretResolver from datacoolie.core.secret_resolver) into a Connection by replacing vault key references with real values.

SecretStr is an opaque wrapper that prevents accidental exposure of secret values through str(), repr(), print(), f-strings, and tracebacks. The raw value is never exposed via a public method. Framework internals use unwrap_secret / unwrap_configure to obtain plain strings at I/O boundaries (HTTP auth, JDBC, etc.).

BaseSecretProvider

BaseSecretProvider(cache_ttl: int = 300, **kwargs: Any)

Bases: ABC

Abstract base class for secret providers.

Implementations fetch secrets from a platform-specific vault (Azure Key Vault, AWS Secrets Manager, Databricks Secrets, env vars).

Supports optional in-memory caching with a configurable TTL.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| cache_ttl | int | Time-to-live for cached secrets in seconds. 0 disables caching. Default is 300 (5 minutes). | 300 |

clear_cache

clear_cache() -> None

Evict all cached entries.

get_secret

get_secret(key: str, source: str = '') -> str

Fetch a single secret by key from the given source.

Uses the cache when available and not expired; otherwise delegates to _fetch_secret.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Vault key / secret name. For AWS this is the JSON field name within the secret; for other platforms it is the secret key within the scope / vault. | required |
| source | str | Source identifier whose meaning depends on the platform: scope for Databricks, vault URL for Fabric, env prefix for Local, and secret name/ARN (SecretId) for AWS. Defaults to "". | '' |

Raises:

| Type | Description |
|---|---|
| DataCoolieError | If the secret cannot be retrieved. |

get_secrets

get_secrets(keys: List[str], source: str = '') -> Dict[str, str]

Fetch multiple secrets by keys from the given source.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| keys | List[str] | Secret key names (JSON field names for AWS). | required |
| source | str | Source identifier (scope, vault URL, env prefix, or secret name/ARN for AWS). Defaults to "". | '' |

Returns:

| Type | Description |
|---|---|
| Dict[str, str] | Mapping of key → secret value. |
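The caching contract described for BaseSecretProvider (cache_ttl, get_secret serving from cache or delegating to _fetch_secret, get_secrets, clear_cache) can be sketched as follows. SketchSecretProvider and DictProvider are hypothetical, and the (source, key) cache key and time.monotonic clock are assumptions of this sketch. The toy provider raises KeyError where the documented one raises DataCoolieError.

```python
import time
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Tuple


class SketchSecretProvider(ABC):
    """Hypothetical provider with an in-memory TTL cache."""

    def __init__(self, cache_ttl: int = 300, **kwargs: Any) -> None:
        self.cache_ttl = cache_ttl
        self._cache: Dict[Tuple[str, str], Tuple[str, float]] = {}

    @abstractmethod
    def _fetch_secret(self, key: str, source: str) -> str:
        """Talk to the real vault (implemented by subclasses)."""

    def get_secret(self, key: str, source: str = "") -> str:
        cache_key = (source, key)
        now = time.monotonic()
        if self.cache_ttl > 0 and cache_key in self._cache:
            value, expires_at = self._cache[cache_key]
            if now < expires_at:
                return value  # cache hit, not yet expired
        value = self._fetch_secret(key, source)
        if self.cache_ttl > 0:  # cache_ttl=0 disables caching
            self._cache[cache_key] = (value, now + self.cache_ttl)
        return value

    def get_secrets(self, keys: List[str], source: str = "") -> Dict[str, str]:
        return {key: self.get_secret(key, source) for key in keys}

    def clear_cache(self) -> None:
        self._cache.clear()


class DictProvider(SketchSecretProvider):
    """Toy provider backed by a dict; counts vault round-trips."""

    def __init__(self, data: Dict[str, str], **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self.data = data
        self.fetches = 0

    def _fetch_secret(self, key: str, source: str) -> str:
        self.fetches += 1
        return self.data[f"{source}{key}"]


provider = DictProvider({"APP_DB_PASSWORD": "s3cr3t"}, cache_ttl=60)
uncached = DictProvider({"X": "1"}, cache_ttl=0)
```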

SecretStr

SecretStr(value: str)

Opaque wrapper that hides secret values from all public access.

str(), repr(), print(), format(), and logging all render *** instead of the real value.

There is no public method to extract the underlying string. Framework internals access the value via unwrap_secret or unwrap_configure.

The wrapper is immutable, unhashable-by-value, and unpicklable to prevent accidental persistence or cache-key leaks.

resolve_secrets

resolve_secrets(connection: Connection, provider: BaseSecretProvider, *, resolver_lookup: ResolverLookup | None = None) -> None

Resolve secret references in connection.

connection.secrets_ref maps each source (scope / vault URL / prefix / SecretId) to a list of configure field names. Each listed field must already be present in configure with the vault key name as its value; resolve_secrets replaces that value with the real secret.

Source keys may contain a <resolver>:<arg> prefix (e.g. "env:APP_"). When resolver_lookup is provided and returns a resolver for that prefix, the plugin resolver is used. Otherwise the platform-native provider handles the request via NativeProviderResolver (datacoolie.core.secret_resolver).

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| connection | Connection | A Connection instance (datacoolie.core.models.Connection). | required |
| provider | BaseSecretProvider | The platform-native secret provider. | required |
| resolver_lookup | ResolverLookup \| None | Optional callable that maps a prefix string to a BaseSecretResolver (datacoolie.core.secret_resolver), or returns None if the prefix is unrecognized. | None |

Raises:

| Type | Description |
|---|---|
| DataCoolieError | If secrets_ref is invalid, a listed field is absent from configure, or a secret cannot be retrieved. |
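The resolution flow described above, in a simplified, self-contained form. ConnectionStub, DictResolver and resolve_secrets_sketch are hypothetical. For brevity the sketch takes a resolver for the native path directly rather than wrapping a provider, stores plain strings, and raises KeyError where the documented function raises DataCoolieError.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Tuple


class DictResolver:
    """Toy resolver backed by a dict keyed by (source, key)."""

    def __init__(self, data: Dict[Tuple[str, str], str]) -> None:
        self.data = data

    def resolve(self, key: str, source: str) -> str:
        return self.data[(source, key)]


@dataclass
class ConnectionStub:
    """Hypothetical stand-in for Connection with only the two fields used here."""

    configure: Dict[str, Any]
    secrets_ref: Dict[str, List[str]] = field(default_factory=dict)


def resolve_secrets_sketch(
    connection: ConnectionStub,
    native: DictResolver,
    resolver_lookup: Optional[Callable[[str], Optional[DictResolver]]] = None,
) -> None:
    for source_key, fields in connection.secrets_ref.items():
        resolver, source_arg = native, source_key
        prefix, sep, rest = source_key.partition(":")
        if sep and resolver_lookup is not None:
            plugin = resolver_lookup(prefix)
            if plugin is not None:  # recognized prefix such as "env:APP_"
                resolver, source_arg = plugin, rest
        for name in fields:
            if name not in connection.configure:
                raise KeyError(f"{name!r} is listed in secrets_ref but missing from configure")
            vault_key = connection.configure[name]  # configure holds the vault key name
            connection.configure[name] = resolver.resolve(vault_key, source_arg)


native = DictResolver({("kv-scope", "api-key-name"): "native-secret"})
env_like = DictResolver({("APP_", "DB_PASSWORD"): "env-secret"})
conn = ConnectionStub(
    configure={"password": "DB_PASSWORD", "api_key": "api-key-name", "host": "db.local"},
    secrets_ref={"env:APP_": ["password"], "kv-scope": ["api_key"]},
)
resolve_secrets_sketch(conn, native, resolver_lookup={"env": env_like}.get)
```

Fields not listed in secrets_ref (host here) are left untouched.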

unwrap_configure

unwrap_configure(configure: Dict[str, Any]) -> Dict[str, Any]

Return a shallow copy with all SecretStr values unwrapped.

Call this once at the I/O boundary (HTTP auth, JDBC connect, etc.) to obtain a plain-string dict for external libraries. The original configure dict retains its SecretStr wrappers.

unwrap_secret

unwrap_secret(value: Any) -> Any

Return the raw string if value is a SecretStr; otherwise return value unchanged.
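Both helpers are small. Here is a hypothetical sketch using a minimal Secret stand-in for SecretStr so the block runs on its own. Because the copy is shallow, only top-level values are unwrapped and nested containers are shared with the original dict.

```python
from typing import Any, Dict


class Secret:
    """Minimal hypothetical stand-in for SecretStr."""

    __slots__ = ("_value",)

    def __init__(self, value: str) -> None:
        self._value = value

    def __repr__(self) -> str:
        return "***"


def unwrap_secret_sketch(value: Any) -> Any:
    # Raw string for wrapped secrets; everything else passes through.
    return value._value if isinstance(value, Secret) else value


def unwrap_configure_sketch(configure: Dict[str, Any]) -> Dict[str, Any]:
    # New top-level dict; the caller's dict keeps its Secret wrappers.
    return {key: unwrap_secret_sketch(val) for key, val in configure.items()}


configure = {"user": "etl", "password": Secret("s3cr3t"), "port": 5432}
plain = unwrap_configure_sketch(configure)
```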

secret_resolver

Pluggable secret resolver abstraction.

BaseSecretResolver defines the single plugin contract for fetching secrets, whether from environment variables, a cloud vault, HashiCorp Vault, 1Password, or any other backend.

Resolvers are registered via datacoolie.resolver_registry and dispatched by prefix in secrets_ref source keys (e.g. "env:APP_").

Sources without a prefix use the platform-native provider via NativeProviderResolver.

BaseSecretResolver

Bases: ABC

Abstract base class for pluggable secret resolvers.

This is the single interface that every secret backend implements. Subclasses must implement resolve(), which fetches a single secret value given a key and a source argument.

resolve (abstract method)

resolve(key: str, source: str) -> str

Fetch a single secret.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| key | str | The secret / field name (value from configure). | required |
| source | str | Backend-specific source identifier. For prefixed sources ("env:APP_"), this is the portion after the ":". For native providers, this is the full source key from secrets_ref. | required |

Returns:

| Type | Description |
|---|---|
| str | The secret value as a string. |

Raises:

| Type | Description |
|---|---|
| DataCoolieError | If the secret cannot be retrieved. |

EnvResolver

Bases: BaseSecretResolver

Resolve secrets from environment variables.

The environment variable name is {source}{key}. For example with source "APP_" and key "DB_PASSWORD" this looks up os.environ["APP_DB_PASSWORD"].
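A stand-alone sketch of that lookup rule. EnvResolverSketch is hypothetical, and raising KeyError for a missing variable is this sketch's choice; the documented contract raises DataCoolieError.

```python
import os


class EnvResolverSketch:
    """Hypothetical resolver that reads os.environ[f'{source}{key}']."""

    def resolve(self, key: str, source: str) -> str:
        name = f"{source}{key}"
        try:
            return os.environ[name]
        except KeyError:
            raise KeyError(f"Environment variable {name!r} is not set") from None


os.environ["APP_DB_PASSWORD"] = "s3cr3t"  # demo value only
resolver = EnvResolverSketch()
```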

NativeProviderResolver

NativeProviderResolver(provider: BaseSecretProvider, **_: object)

Bases: BaseSecretResolver

Adapter that wraps a BaseSecretProvider as a resolver.

This bridges the existing platform secret providers (Local, Fabric, Databricks, AWS) into the unified BaseSecretResolver interface so that resolve_secrets needs only one code path.
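The adapter idea amounts to forwarding resolve() to the provider's get_secret(). A minimal sketch follows. ProviderAdapterSketch, SupportsGetSecret and StaticProvider are hypothetical, and accepting and discarding extra keyword arguments mirrors the **_ in the signature above.

```python
from typing import Dict, Protocol, Tuple


class SupportsGetSecret(Protocol):
    def get_secret(self, key: str, source: str = "") -> str: ...


class ProviderAdapterSketch:
    """Hypothetical resolver that delegates to a BaseSecretProvider-like object."""

    def __init__(self, provider: SupportsGetSecret, **_: object) -> None:
        self._provider = provider

    def resolve(self, key: str, source: str) -> str:
        # The full, unprefixed secrets_ref source key is passed through as-is.
        return self._provider.get_secret(key, source)


class StaticProvider:
    """Toy provider keyed by (source, key)."""

    def __init__(self, data: Dict[Tuple[str, str], str]) -> None:
        self.data = data

    def get_secret(self, key: str, source: str = "") -> str:
        return self.data[(source, key)]


adapter = ProviderAdapterSketch(StaticProvider({("my-scope", "api-key"): "abc123"}), unused=True)
```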

parse_source

parse_source(source: str, fallback: BaseSecretResolver, resolver_lookup: ResolverLookup | None = None) -> tuple[BaseSecretResolver, str]

Parse an optional <prefix>:<arg> source key.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| source | str | The raw source key from secrets_ref (e.g. "env:APP_" or "my-vault-scope"). | required |
| fallback | BaseSecretResolver | The resolver to use when source has no recognized prefix (in practice, the NativeProviderResolver). | required |
| resolver_lookup | ResolverLookup \| None | Optional callable that maps a prefix string to a BaseSecretResolver instance, or None if unrecognized. | None |

Returns:

| Type | Description |
|---|---|
| tuple[BaseSecretResolver, str] | A (resolver, source_arg) tuple ready for resolver.resolve(key, source_arg). |
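A hypothetical sketch of the parsing rule: split on the first ":" and use the plugin resolver only when the lookup recognizes the prefix; otherwise return the fallback with the untouched source. Splitting on the first colon and the placeholder Resolver class are assumptions of this sketch.

```python
from typing import Callable, Optional, Tuple


class Resolver:
    """Placeholder for a BaseSecretResolver subclass."""

    def __init__(self, label: str) -> None:
        self.label = label


def parse_source_sketch(
    source: str,
    fallback: Resolver,
    resolver_lookup: Optional[Callable[[str], Optional[Resolver]]] = None,
) -> Tuple[Resolver, str]:
    prefix, sep, arg = source.partition(":")
    if sep and resolver_lookup is not None:
        resolver = resolver_lookup(prefix)
        if resolver is not None:
            return resolver, arg  # e.g. "env:APP_" -> (env resolver, "APP_")
    # No recognized prefix: the fallback receives the full source key.
    return fallback, source


native = Resolver("native")
lookup = {"env": Resolver("env")}.get
```

In this sketch, falling back on an unknown prefix keeps sources that naturally contain a colon, such as vault URLs starting with "https:", routed to the native provider.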