Core

registry

Generic plugin registry with lazy discovery via entry points.

PluginRegistry provides a thread-safe registry for plugin classes (engines, platforms, readers, writers, transformers) that supports:

  • Manual registration via register
  • Lazy auto-discovery via importlib.metadata entry points
  • Instantiation via get

PluginRegistry

PluginRegistry(entry_point_group: str, base_class: type[T])

Bases: Generic[T]

Generic plugin registry with lazy discovery via entry points.

Type parameter T is the base class that all registered plugins must be a subclass of (e.g. BaseEngine, BasePlatform).

Parameters:

Name Type Description Default
entry_point_group str

The entry-point group name used for auto-discovery (e.g. "datacoolie.engines").

required
base_class type[T]

The base class that registered plugins must subclass.

required

clear_singletons

clear_singletons() -> None

Evict all cached singleton instances.

Useful in tests to reset state between runs.

get

get(name: str, **kwargs: object) -> T

Get a plugin instance by name.

On the first call, triggers entry-point discovery so that externally installed plugins are found automatically.

Parameters:

Name Type Description Default
name str

Plugin name.

required
**kwargs object

Constructor arguments forwarded to the plugin class.

{}

Returns:

Type Description
T

An instance of the requested plugin.

Raises:

Type Description
DataCoolieError

If no plugin is registered under name.

get_or_create

get_or_create(name: str, **kwargs: object) -> T

Get a cached singleton instance, creating it on first call.

Unlike get, subsequent calls with the same name return the same instance; constructor arguments passed on later calls are not applied. Useful for stateless plugins (e.g. secret resolvers) where per-call instantiation is wasteful.

Parameters:

Name Type Description Default
name str

Plugin name.

required
**kwargs object

Constructor arguments forwarded to the plugin class on first creation only.

{}

Returns:

Type Description
T

A (possibly cached) instance of the requested plugin.

Raises:

Type Description
DataCoolieError

If no plugin is registered under name.
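The caching behaviour of get_or_create, and the reset offered by clear_singletons, can be illustrated with a small standalone cache. SingletonCache and EnvLookup are hypothetical names used only for this sketch; the real registry resolves the class from the plugin name rather than taking a factory argument.

```python
import threading


class SingletonCache:
    """Hypothetical helper: build one instance per name, then reuse it."""

    def __init__(self) -> None:
        self._instances = {}
        self._lock = threading.Lock()

    def get_or_create(self, name, factory, **kwargs):
        with self._lock:
            if name not in self._instances:
                # kwargs are consumed only here, on first creation.
                self._instances[name] = factory(**kwargs)
            return self._instances[name]

    def clear(self) -> None:
        # Counterpart of clear_singletons(): drop every cached instance.
        with self._lock:
            self._instances.clear()


class EnvLookup:
    def __init__(self, prefix: str = "") -> None:
        self.prefix = prefix


cache = SingletonCache()
first = cache.get_or_create("env", EnvLookup, prefix="APP_")
second = cache.get_or_create("env", EnvLookup, prefix="IGNORED_")
same_instance = first is second      # True: the cached object is returned
kept_prefix = second.prefix          # "APP_": later kwargs were not applied
```

Because later keyword arguments are ignored, this style of caching fits plugins whose configuration does not vary between callers.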

is_available

is_available(name: str) -> bool

Check if a plugin is registered (triggers discovery).

list_plugins

list_plugins() -> list[str]

List all registered plugin names (triggers discovery).

register

register(name: str, cls: type[T]) -> None

Manually register a plugin class.

Parameters:

Name Type Description Default
name str

Unique plugin name (e.g. "spark", "local").

required
cls type[T]

The plugin class (must be a subclass of base_class).

required

Raises:

Type Description
DataCoolieError

If cls is not a subclass of base_class.

unregister

unregister(name: str) -> None

Remove a previously registered plugin.

Useful for testing and dynamic plugin management.

Parameters:

Name Type Description Default
name str

Plugin name to remove.

required

Raises:

Type Description
DataCoolieError

If name is not registered.

exceptions

Exception hierarchy for the DataCoolie framework.

All framework exceptions inherit from DataCoolieError, which carries a human-readable message and an optional details dict for structured error context.

ConfigurationError

ConfigurationError(message: str, details: dict[str, Any] | None = None)

Bases: DataCoolieError

Raised for invalid or missing configuration.

DataCoolieError

DataCoolieError(message: str, details: dict[str, Any] | None = None)

Bases: Exception

Base exception for all DataCoolie errors.

Attributes:

Name Type Description
message

Human-readable error description.

details

Optional structured context for logging / debugging.
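A single base class lets callers catch every framework error in one except clause while still reading structured context from it. The classes below are stand-ins that mirror the documented signatures; they are not imported from DataCoolie, and read_table is a hypothetical function.

```python
from __future__ import annotations

from typing import Any


class DataCoolieError(Exception):
    """Stand-in mirroring the documented signature; not the library source."""

    def __init__(self, message: str, details: dict[str, Any] | None = None) -> None:
        super().__init__(message)
        self.message = message
        self.details = details


class SourceError(DataCoolieError):
    """Stand-in for the documented subclass."""


def read_table(path: str) -> None:
    # Hypothetical reader that always fails, to show the error shape.
    raise SourceError("cannot read source", details={"path": path})


try:
    read_table("/data/orders.parquet")
except DataCoolieError as exc:  # also catches SourceError and other subclasses
    caught = exc
    log_context = {"error": exc.message, **(exc.details or {})}
```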

DataFlowError

DataFlowError(message: str, dataflow_id: str | None = None, stage: str | None = None, details: dict[str, Any] | None = None)

Bases: DataCoolieError

Raised for dataflow-level execution errors.

Attributes:

Name Type Description
dataflow_id

Identifier of the failed dataflow.

stage

Pipeline stage where the error occurred.

DestinationError

DestinationError(message: str, details: dict[str, Any] | None = None)

Bases: DataCoolieError

Raised when writing to a destination fails.

EngineError

EngineError(message: str, details: dict[str, Any] | None = None)

Bases: DataCoolieError

Raised for engine-level failures (Spark, Polars).

MetadataError

MetadataError(message: str, details: dict[str, Any] | None = None)

Bases: DataCoolieError

Raised for metadata loading / parsing failures.

PipelineError

PipelineError(message: str, partial_result: Any = None, details: dict[str, Any] | None = None)

Bases: DataCoolieError

Raised when a pipeline step fails, carrying partial runtime results.

Attributes:

Name Type Description
partial_result

The incomplete result tuple collected before the failure.
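One way to use partial_result is to attach whatever finished before the failure so the caller can log or resume from it. The sketch below is an assumption about usage, not the framework's pipeline code: PipelineError is a stand-in with the documented constructor, and run_steps is a hypothetical runner.

```python
class PipelineError(Exception):
    """Stand-in with the documented constructor arguments."""

    def __init__(self, message, partial_result=None, details=None):
        super().__init__(message)
        self.message = message
        self.partial_result = partial_result
        self.details = details


def run_steps(steps):
    """Run (name, callable) pairs in order; hypothetical runner for illustration."""
    done = []
    for name, fn in steps:
        try:
            done.append((name, fn()))
        except Exception as exc:
            # Carry the results gathered so far alongside the failure.
            raise PipelineError(
                f"step {name!r} failed",
                partial_result=tuple(done),
                details={"cause": repr(exc)},
            ) from exc
    return tuple(done)


def failing_step():
    raise ValueError("bad row")


try:
    run_steps([("extract", lambda: 10), ("transform", failing_step), ("load", lambda: 0)])
except PipelineError as exc:
    completed = exc.partial_result
    failure = exc
```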

PlatformError

PlatformError(message: str, details: dict[str, Any] | None = None)

Bases: DataCoolieError

Raised for platform-specific failures (Fabric, Databricks, AWS).

SourceError

SourceError(message: str, details: dict[str, Any] | None = None)

Bases: DataCoolieError

Raised when reading from a data source fails.

TransformError

TransformError(message: str, details: dict[str, Any] | None = None)

Bases: DataCoolieError

Raised when a transformation step fails.

WatermarkError

WatermarkError(message: str, details: dict[str, Any] | None = None)

Bases: DataCoolieError

Raised for watermark read / write / parse failures.

secret_provider

Secret provider abstraction and resolution utilities.

BaseSecretProvider defines the contract for fetching secrets from platform-specific vaults. resolve_secrets wires a provider (or any datacoolie.core.secret_resolver.BaseSecretResolver) into a Connection by replacing vault key references with real values.

BaseSecretProvider

BaseSecretProvider(cache_ttl: int = 300, **kwargs: Any)

Bases: ABC

Abstract base class for secret providers.

Implementations fetch secrets from a platform-specific vault (Azure Key Vault, AWS Secrets Manager, Databricks Secrets, env vars).

Supports optional in-memory caching with a configurable TTL.

Parameters:

Name Type Description Default
cache_ttl int

Time-to-live for cached secrets in seconds. 0 disables caching. Default is 300 (5 minutes).

300

clear_cache

clear_cache() -> None

Evict all cached entries.

get_secret

get_secret(key: str, source: str = '') -> str

Fetch a single secret by key from the given source.

Uses the cached value when one exists and has not expired; otherwise delegates to _fetch_secret.

Parameters:

Name Type Description Default
key str

Vault key / secret name. For AWS this is the JSON field name within the secret; for other platforms it is the secret key within the scope / vault.

required
source str

Source identifier whose meaning depends on the platform: scope for Databricks, vault URL for Fabric, env prefix for Local, and secret name/ARN (SecretId) for AWS. Defaults to "".

''

Raises:

Type Description
DataCoolieError

If the secret cannot be retrieved.
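The cache-then-fetch flow of get_secret can be sketched as follows. CachingProvider and DictProvider are hypothetical stand-ins, and the _fetch_secret(key, source) signature is an assumption (the reference only names the method); the real provider may organise its cache differently.

```python
import time
from abc import ABC, abstractmethod


class CachingProvider(ABC):
    """Hypothetical stand-in showing a TTL cache in front of a vault call."""

    def __init__(self, cache_ttl: int = 300) -> None:
        self._cache_ttl = cache_ttl
        self._cache = {}  # (source, key) -> (expires_at, value)

    @abstractmethod
    def _fetch_secret(self, key: str, source: str) -> str:
        """Assumed hook: fetch the secret from the backing vault."""

    def get_secret(self, key: str, source: str = "") -> str:
        if self._cache_ttl <= 0:
            # A TTL of 0 disables caching entirely.
            return self._fetch_secret(key, source)
        now = time.monotonic()
        hit = self._cache.get((source, key))
        if hit is not None and hit[0] > now:
            return hit[1]
        value = self._fetch_secret(key, source)
        self._cache[(source, key)] = (now + self._cache_ttl, value)
        return value

    def clear_cache(self) -> None:
        self._cache.clear()


class DictProvider(CachingProvider):
    """Test double backed by a dict; counts how often the 'vault' is hit."""

    def __init__(self, data, cache_ttl: int = 300) -> None:
        super().__init__(cache_ttl)
        self._data = data
        self.fetch_count = 0

    def _fetch_secret(self, key: str, source: str) -> str:
        self.fetch_count += 1
        return self._data[(source, key)]


provider = DictProvider({("my-scope", "db-password"): "example-value"})
provider.get_secret("db-password", "my-scope")
provider.get_secret("db-password", "my-scope")  # served from the cache
```

The sketch uses time.monotonic() for expiry so that wall-clock adjustments do not extend or shorten a cached entry's lifetime.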

get_secrets

get_secrets(keys: List[str], source: str = '') -> Dict[str, str]

Fetch multiple secrets by keys from the given source.

Parameters:

Name Type Description Default
keys List[str]

Secret key names (JSON field names for AWS).

required
source str

Source identifier (scope, vault URL, env prefix, or secret name/ARN for AWS). Defaults to "".

''

Returns:

Type Description
Dict[str, str]

Mapping of key → secret value.

get_registered_secret_values

get_registered_secret_values() -> frozenset[str]

Return a snapshot of all registered secret values.

register_secret_values

register_secret_values(*values: str) -> None

Register resolved secret values so the log filter can scrub them.
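A log filter that scrubs registered values could look roughly like this. The two functions reuse the documented names but are re-implemented here as stand-ins; SecretMaskingFilter and the "***" placeholder are hypothetical, and the real filter may mask differently.

```python
import logging
import threading

_lock = threading.Lock()
_secret_values = set()


def register_secret_values(*values: str) -> None:
    with _lock:
        # Empty strings are skipped: replacing "" would corrupt every message.
        _secret_values.update(v for v in values if v)


def get_registered_secret_values() -> frozenset:
    with _lock:
        return frozenset(_secret_values)


class SecretMaskingFilter(logging.Filter):
    """Hypothetical filter that replaces known secret values in log messages."""

    def filter(self, record: logging.LogRecord) -> bool:
        message = record.getMessage()  # message with %-args already applied
        for value in get_registered_secret_values():
            message = message.replace(value, "***")
        record.msg, record.args = message, None
        return True  # never drop the record, only rewrite it


register_secret_values("hunter2")
record = logging.makeLogRecord({"msg": "password=%s", "args": ("hunter2",)})
SecretMaskingFilter().filter(record)
masked = record.getMessage()
```

Formatting the message before masking matters: a secret passed as a %-style argument would otherwise slip past a filter that only inspects record.msg.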

resolve_secrets

resolve_secrets(connection: Connection, provider: BaseSecretProvider, *, resolver_lookup: ResolverLookup | None = None) -> None

Resolve secret references in connection.

connection.secrets_ref maps each source (scope / vault URL / prefix / SecretId) to a list of configure field names. Each listed field must already be present in configure with the vault key name as its value; resolve_secrets replaces that value with the real secret.

Source keys may contain a <resolver>:<arg> prefix (e.g. "env:APP_"). When resolver_lookup is provided and returns a resolver for that prefix, the plugin resolver is used. Otherwise the platform-native provider handles the request via datacoolie.core.secret_resolver.NativeProviderResolver.

Also registers the resolved values with the global log-masking set.

Parameters:

Name Type Description Default
connection Connection

A datacoolie.core.models.Connection instance.

required
provider BaseSecretProvider

The platform-native secret provider.

required
resolver_lookup ResolverLookup | None

Optional callable that maps a prefix string to a datacoolie.core.secret_resolver.BaseSecretResolver, or returns None if the prefix is unrecognized.

None

Raises:

Type Description
DataCoolieError

If secrets_ref is invalid, a listed field is absent from configure, or a secret cannot be retrieved.
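The relationship between secrets_ref and configure is easiest to see with concrete data. The sketch below uses a hypothetical Connection dataclass and a hypothetical resolve_in_place helper rather than the real model and function; it omits prefix dispatch and log masking, and raises KeyError where the real function is documented to raise DataCoolieError.

```python
from dataclasses import dataclass, field


@dataclass
class Connection:
    """Hypothetical stand-in with only the two fields discussed above."""

    configure: dict = field(default_factory=dict)
    secrets_ref: dict = field(default_factory=dict)


def resolve_in_place(connection: Connection, fetch) -> None:
    """Replace vault key names in configure with fetched values.

    fetch(key, source) stands in for the provider's get_secret.
    """
    for source, field_names in connection.secrets_ref.items():
        for name in field_names:
            if name not in connection.configure:
                raise KeyError(f"field {name!r} listed in secrets_ref is missing from configure")
            vault_key = connection.configure[name]  # value before resolution
            connection.configure[name] = fetch(vault_key, source)


# Fake vault keyed by (source, key); values are examples only.
vault = {("my-scope", "db-password-key"): "example-secret"}

conn = Connection(
    configure={"host": "db.internal", "password": "db-password-key"},
    secrets_ref={"my-scope": ["password"]},
)
resolve_in_place(conn, lambda key, source: vault[(source, key)])
```

After the call, conn.configure["password"] holds the fetched value while fields not listed in secrets_ref, such as host, are left unchanged.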

secret_resolver

Pluggable secret resolver abstraction.

BaseSecretResolver defines the single plugin contract for fetching secrets, whether from environment variables, a cloud vault, HashiCorp Vault, 1Password, or any other backend.

Resolvers are registered via datacoolie.resolver_registry and dispatched by prefix in secrets_ref source keys (e.g. "env:APP_").

Sources without a prefix use the platform-native provider via NativeProviderResolver.

BaseSecretResolver

Bases: ABC

Abstract base class for pluggable secret resolvers.

This is the single interface that every secret backend implements. Subclasses must implement resolve, which fetches a single secret value given a key and a source argument.

resolve (abstract method)

resolve(key: str, source: str) -> str

Fetch a single secret.

Parameters:

Name Type Description Default
key str

The secret / field name (value from configure).

required
source str

Backend-specific source identifier. For prefixed sources ("env:APP_"), this is the portion after the :. For native providers, this is the full source key from secrets_ref.

required

Returns:

Type Description
str

The secret value as a string.

Raises:

Type Description
DataCoolieError

If the secret cannot be retrieved.
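A custom backend only needs to supply resolve. The sketch below defines a stand-in base class with the documented abstract method, since it does not import DataCoolie, and a hypothetical InMemoryResolver; it raises LookupError where a real resolver would raise DataCoolieError.

```python
from abc import ABC, abstractmethod


class BaseSecretResolver(ABC):
    """Stand-in with the documented abstract method; not imported from DataCoolie."""

    @abstractmethod
    def resolve(self, key: str, source: str) -> str:
        """Return the secret value for `key` within `source`."""


class InMemoryResolver(BaseSecretResolver):
    """Hypothetical backend that keeps secrets in a nested dict (tests only)."""

    def __init__(self, store) -> None:
        self._store = store  # {source: {key: value}}

    def resolve(self, key: str, source: str) -> str:
        try:
            return self._store[source][key]
        except KeyError:
            raise LookupError(f"no secret {key!r} in source {source!r}") from None


resolver = InMemoryResolver({"team-a": {"api-token": "example-token"}})
token = resolver.resolve("api-token", "team-a")
```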

EnvResolver

Bases: BaseSecretResolver

Resolve secrets from environment variables.

The environment variable name is {source}{key}. For example with source "APP_" and key "DB_PASSWORD" this looks up os.environ["APP_DB_PASSWORD"].
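The name construction amounts to simple string concatenation. The function below is a standalone sketch, not the EnvResolver class itself; resolve_env is a hypothetical name, and it raises LookupError where the real resolver would raise DataCoolieError.

```python
import os


def resolve_env(key: str, source: str) -> str:
    """Look up the environment variable named {source}{key}."""
    name = f"{source}{key}"
    try:
        return os.environ[name]
    except KeyError:
        raise LookupError(f"environment variable {name!r} is not set") from None


# Example value set only for this demonstration.
os.environ["APP_DB_PASSWORD"] = "example-only"
value = resolve_env("DB_PASSWORD", "APP_")
```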

NativeProviderResolver

NativeProviderResolver(provider: BaseSecretProvider, **_: object)

Bases: BaseSecretResolver

Adapter that wraps a BaseSecretProvider as a resolver.

This bridges the existing platform secret providers (Local, Fabric, Databricks, AWS) into the unified BaseSecretResolver interface so that resolve_secrets needs only one code path.
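The adapter idea can be sketched as a thin wrapper. FakeProvider and ProviderAdapter are hypothetical names, and the assumption that resolve delegates directly to get_secret(key, source) is inferred from the two documented signatures rather than taken from the source code.

```python
class FakeProvider:
    """Hypothetical provider exposing the documented get_secret(key, source)."""

    def __init__(self, secrets) -> None:
        self._secrets = secrets  # {(source, key): value}

    def get_secret(self, key: str, source: str = "") -> str:
        return self._secrets[(source, key)]


class ProviderAdapter:
    """Hypothetical adapter giving a provider the resolver-style interface."""

    def __init__(self, provider, **_: object) -> None:
        # Extra keyword arguments are accepted and ignored, as in the
        # documented constructor signature.
        self._provider = provider

    def resolve(self, key: str, source: str) -> str:
        # Assumption: the adapter simply forwards to the provider.
        return self._provider.get_secret(key, source)


adapter = ProviderAdapter(FakeProvider({("my-scope", "db-password"): "example-value"}))
resolved = adapter.resolve("db-password", "my-scope")
```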

parse_source

parse_source(source: str, fallback: BaseSecretResolver, resolver_lookup: ResolverLookup | None = None) -> tuple[BaseSecretResolver, str]

Parse an optional <prefix>:<arg> source key.

Parameters:

Name Type Description Default
source str

The raw source key from secrets_ref (e.g. "env:APP_" or "my-vault-scope").

required
fallback BaseSecretResolver

The resolver to use when source has no recognized prefix (the NativeProviderResolver in practice).

required
resolver_lookup ResolverLookup | None

Optional callable that maps a prefix string to a BaseSecretResolver instance, or None if unrecognized.

None

Returns:

Type Description
tuple[BaseSecretResolver, str]

A (resolver, source_arg) tuple ready for resolver.resolve(key, source_arg).
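The prefix dispatch can be sketched with str.partition. This is an illustration under stated assumptions, not the library's parser: parse_source_sketch is a hypothetical name, plain strings stand in for resolver objects, and splitting on the first colon is an assumed rule.

```python
def parse_source_sketch(source, fallback, resolver_lookup=None):
    """Return (resolver, source_arg) for an optional '<prefix>:<arg>' key."""
    prefix, sep, arg = source.partition(":")
    if sep and resolver_lookup is not None:
        resolver = resolver_lookup(prefix)
        if resolver is not None:
            # Recognised prefix: the resolver receives only the part after ':'.
            return resolver, arg
    # No prefix, no lookup, or unknown prefix: hand the full key to the fallback.
    return fallback, source


# Plain strings stand in for resolver objects in this sketch.
registry = {"env": "ENV_RESOLVER"}
lookup = registry.get
native = "NATIVE_RESOLVER"

prefixed = parse_source_sketch("env:APP_", native, lookup)
plain = parse_source_sketch("my-vault-scope", native, lookup)
url_like = parse_source_sketch("https://vault.example", native, lookup)
```

Falling back with the full source string matters for values such as vault URLs or ARNs, which contain a colon but do not start with a registered resolver prefix.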