Logging

base

Base logging infrastructure for the DataCoolie framework.

Provides:

  • :class:`LogManager` — singleton that configures Python logging with a capture handler for later persistence to the datalake.
  • :class:`CaptureHandler` — a :class:`logging.Handler` that buffers :class:`LogRecord` objects in memory or a temp file.
  • :class:`BaseLogger` — ABC for persistent loggers (system, ETL).
  • :class:`LogConfig` — configuration dataclass.
  • :func:`get_logger` — module-level convenience to create child loggers.

Usage::

    from datacoolie.logging.base import get_logger

    logger = get_logger(__name__)
    logger.info("Processing started")

BaseLogger

BaseLogger(config: LogConfig, platform: Optional[BasePlatform] = None)

Bases: ABC

Abstract base for persistent loggers (system, ETL).

Provides configuration, lifecycle management, and context-manager support. Subclasses implement :meth:`flush`.

close

close() -> None

Flush and release resources.

flush abstractmethod

flush() -> None

Flush buffered entries to persistent storage.
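
A minimal sketch of a concrete subclass, assuming only the constructor and lifecycle described above; the InMemoryLogger name and its buffering are illustrative, not part of the framework::

    from datacoolie.logging.base import BaseLogger, LogConfig

    class InMemoryLogger(BaseLogger):
        """Hypothetical subclass: buffers entries, emits them on flush."""

        def __init__(self, config: LogConfig, platform=None):
            super().__init__(config, platform)
            self._buffer: list[str] = []

        def log(self, message: str) -> None:
            self._buffer.append(message)

        def flush(self) -> None:
            # Illustrative only: a real subclass would persist via the platform.
            for entry in self._buffer:
                print(entry)
            self._buffer.clear()

    # Context-manager support means close() -- and therefore flush() -- runs on exit.
    with InMemoryLogger(LogConfig()) as lgr:
        lgr.log("one buffered entry")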

CaptureHandler

CaptureHandler(level: int = DEBUG, storage_mode: str = MEMORY.value)

Bases: Handler

Captures Python log records for later persistence.

get_jsonl_logs

get_jsonl_logs() -> str

Return all captured records as newline-delimited JSON.
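
A sketch of using the handler directly, assuming only the constructor and method shown above; in normal use :class:`LogManager` attaches it for you::

    import logging

    from datacoolie.logging.base import CaptureHandler

    handler = CaptureHandler(level=logging.DEBUG)
    demo = logging.getLogger("demo")
    demo.addHandler(handler)
    demo.warning("captured")

    print(handler.get_jsonl_logs())  # one JSON object per line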

DataflowContextFilter

Bases: Filter

Inject the current dataflow_id from :mod:`contextvars` into every log record.

Attach to handlers (not loggers) so it applies to all propagated messages.
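
Following that note, a minimal sketch of handler-level attachment; handler-level filters see records propagated from child loggers, while logger-level filters only see records logged on that logger directly::

    import logging

    from datacoolie.logging.base import DataflowContextFilter

    handler = logging.StreamHandler()
    handler.addFilter(DataflowContextFilter())  # applies to all propagated records
    logging.getLogger("datacoolie").addHandler(handler)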

LogConfig dataclass

LogConfig(log_level: str = INFO.value, storage_mode: str = MEMORY.value, output_path: Optional[str] = None, partition_by_date: bool = True, partition_pattern: str = DEFAULT_PARTITION_PATTERN, flush_interval_seconds: int = 60)

Configuration dataclass for loggers.
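
A sketch of a non-default configuration, using only fields from the signature above::

    from datacoolie.logging.base import LogConfig

    config = LogConfig(
        log_level="DEBUG",
        storage_mode="file",          # buffer to a temp file instead of memory
        output_path="logs/system/",
        flush_interval_seconds=30,
    )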

LogLevel

Bases: str, Enum

Standard logging levels.

LogManager

LogManager()

Singleton that configures Python logging with capture support.

configure

configure(level: str = INFO.value, capture_logs: bool = True, storage_mode: str = MEMORY.value, console_output: bool = True, format_string: Optional[str] = None, force: bool = False) -> None

Configure the global logging system.

If already configured, this is a no-op unless force is True. Pass force=True (as SystemLogger does) to apply new settings and replace existing handlers.

Parameters:

    level (str, default INFO.value): Minimum log level.
    capture_logs (bool, default True): Enable :class:`CaptureHandler`.
    storage_mode (str, default MEMORY.value): "memory" or "file".
    console_output (bool, default True): Emit to stderr.
    format_string (Optional[str], default None): Custom logging.Formatter pattern.
    force (bool, default False): Re-configure even if already configured.
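
A sketch of configuring once at process start, assuming the defaults above; note the second call is ignored without force::

    from datacoolie.logging.base import LogManager, get_logger

    LogManager().configure(level="DEBUG", storage_mode="file")
    get_logger(__name__).debug("captured to a temp file")

    LogManager().configure(level="INFO")              # no-op: already configured
    LogManager().configure(level="INFO", force=True)  # replaces existing handlers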

get_captured_jsonl_logs

get_captured_jsonl_logs() -> str

Return captured logs as newline-delimited JSON.

get_logger

get_logger(name: str) -> Logger

Create (or reuse) a child logger under the framework root.

reset classmethod

reset() -> None

Reset the singleton (primarily for testing).
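
Because the singleton persists across tests, suites that exercise configuration typically reset it between cases; a pytest-style sketch (fixture name illustrative)::

    import pytest

    from datacoolie.logging.base import LogManager

    @pytest.fixture(autouse=True)
    def clean_log_manager():
        yield
        LogManager.reset()  # next test starts from an unconfigured singleton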

LogRecord dataclass

LogRecord(timestamp: datetime, level: str, logger_name: str, message: str, module: Optional[str] = None, func_name: Optional[str] = None, line_no: Optional[int] = None, exc_info: Optional[str] = None, dataflow_id: Optional[str] = None)

Captured log entry.

from_dict classmethod

from_dict(d: Dict[str, Any]) -> 'LogRecord'

Reconstruct a LogRecord from a dict produced by :meth:`to_dict`.

to_dict

to_dict() -> Dict[str, Any]

Serialize to a JSON-compatible dictionary.
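
A roundtrip sketch, assuming default dataclass equality and that :meth:`to_dict` / :meth:`from_dict` preserve all fields exactly::

    from datetime import datetime, timezone

    from datacoolie.logging.base import LogRecord

    rec = LogRecord(
        timestamp=datetime.now(timezone.utc),
        level="INFO",
        logger_name="datacoolie.demo",
        message="hello",
    )
    assert LogRecord.from_dict(rec.to_dict()) == rec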

SensitiveValueFilter

Bases: Filter

Scrub resolved secret values from log messages.

Works with the global set maintained by :func:`~datacoolie.core.secret_provider.register_secret_values`.
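
A sketch wiring the filter next to secret registration; the exact call shape of register_secret_values is an assumption here::

    import logging

    from datacoolie.core.secret_provider import register_secret_values
    from datacoolie.logging.base import SensitiveValueFilter

    register_secret_values(["s3cr3t-token"])  # assumed signature
    handler = logging.StreamHandler()
    handler.addFilter(SensitiveValueFilter())

    log = logging.getLogger("datacoolie")
    log.addHandler(handler)
    log.info("token=s3cr3t-token")  # secret value scrubbed before emission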

StorageMode

Bases: str, Enum

Temporary storage mode for log buffering.

format_partition_path

format_partition_path(base_path: str, run_date: Optional[datetime] = None, pattern: str = DEFAULT_PARTITION_PATTERN) -> str

Append a partition folder to base_path using pattern.

Supported placeholders: {year}, {month}, {day}, {hour}.
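
A sketch with an explicit pattern built from those placeholders; whether the values come back zero-padded is an assumption::

    from datetime import datetime

    from datacoolie.logging.base import format_partition_path

    path = format_partition_path(
        "logs/etl",
        run_date=datetime(2024, 1, 15),
        pattern="run_date={year}-{month}-{day}",
    )
    # expected: "logs/etl/run_date=2024-01-15" (assuming zero-padded month/day)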

get_logger

get_logger(name: str) -> Logger

Get a framework logger (convenience wrapper).

All loggers are children of the DataCoolie root logger and inherit its handlers (console + capture).

Parameters:

    name (str, required): Typically __name__.

Returns:

    Logger: Configured :class:`logging.Logger`.

etl_logger

Structured ETL execution logger.

ETLLogger accumulates dataflow / maintenance runtime entries across one or more driver.run() invocations within a single job session and writes them once when :meth:`close` is called.

Usage pattern::

    lgr = create_etl_logger(output_path="logs/", job_id="job-1", platform=plat)
    driver.run(stage="bronze")
    driver.run(stage="silver")
    driver.run_maintenance()
    lgr.close()  # single write of all accumulated data

Outputs

Debug JSONL
    A single JSONL file per session (appended to on each periodic flush). Per-dataflow entries, plus the job summary as the last line. Datetime values are serialised as ISO-8601 strings.

Analyst Parquet (requires pyarrow)
    Two Parquet files per session, each with an explicit PyArrow schema:

* ``dataflow_<stem>.parquet`` — one row per dataflow/maintenance execution.
* ``job_summary_<stem>.parquet`` — a single-row job aggregate.

Datetime columns use ``timestamp[us, tz=UTC]`` for native query support.
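
A sketch of what such an explicit schema looks like in PyArrow; the column names below are illustrative assumptions, only the timestamp type is documented::

    import pyarrow as pa

    # Illustrative columns; only timestamp[us, tz=UTC] is specified above.
    job_summary_schema = pa.schema([
        ("job_id", pa.string()),
        ("started_at", pa.timestamp("us", tz="UTC")),
        ("finished_at", pa.timestamp("us", tz="UTC")),
        ("dataflow_count", pa.int64()),
    ])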

Partition layout::

    <output_path>/<purpose>/<log_type>/run_date=yyyy-mm-dd/<filename>
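
For example, with output_path="logs/" a job-summary Parquet file could land at (purpose, log type, and stem values purely illustrative)::

    logs/etl/job_summary/run_date=2024-01-15/job_summary_job-1.parquet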

ETLLogger

ETLLogger(config: LogConfig, platform: Optional[BasePlatform] = None)

Bases: BaseLogger

Structured ETL execution logger — accumulate-then-flush.

All :meth:`log` calls accumulate entries in memory. A single :class:`~datacoolie.core.models.JobRuntimeInfo` tracks session-level aggregates. On :meth:`flush` (called automatically by :meth:`close`) the logger writes debug JSONL and analyst Parquet in one shot.

flush

flush() -> None

Write all accumulated logs to storage (no-op if nothing logged).

log

log(dataflow: DataFlow, runtime_info: DataFlowRuntimeInfo) -> None

Record one dataflow or maintenance execution result.

Stream-writes a JSONL line to a local temp file and updates the session-level :attr:`_job_info` counters.

create_etl_logger

create_etl_logger(output_path: Optional[str] = None, platform: Optional[BasePlatform] = None) -> ETLLogger

Create an :class:`ETLLogger` with common configuration.

system_logger

System logger — captures and persists Python log output.

SystemLogger is not a logger itself; it configures the global :class:`LogManager` to capture logs, then on :meth:`flush` / :meth:`close` uploads the captured content to datalake storage via the platform.

Usage::

    from datacoolie.logging.base import get_logger
    from datacoolie.logging.system_logger import SystemLogger

    logger = get_logger(__name__)

    with SystemLogger(config, platform) as log_mgr:
        logger.info("Processing started")
    # logs uploaded on close

SystemLogger

SystemLogger(config: LogConfig, platform: Optional[BasePlatform] = None)

Bases: BaseLogger

Captures all framework Python logs and persists them to storage.

On initialization it re-configures the global :class:`LogManager` to capture logs. On :meth:`flush` the captured content is written as a single text file to the platform.

Parameters:

    config (LogConfig, required): Logging configuration (output_path, level, etc.).
    platform (Optional[BasePlatform], default None): Platform for file operations.

flush

flush() -> None

Write captured logs to storage as JSONL.

create_system_logger

create_system_logger(output_path: Optional[str] = None, log_level: str = INFO.value, platform: Optional[BasePlatform] = None, storage_mode: str = MEMORY.value) -> SystemLogger

Factory for :class:`SystemLogger`.
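
A sketch combining the factory with context-manager use, assuming only the signature above::

    from datacoolie.logging.base import get_logger
    from datacoolie.logging.system_logger import create_system_logger

    logger = get_logger(__name__)

    with create_system_logger(output_path="logs/system/", log_level="DEBUG") as syslog:
        logger.debug("captured and uploaded on close")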

context

Thread-safe dataflow context propagation via :mod:`contextvars`.

Stores the current dataflow_id in a :class:`contextvars.ContextVar` so that every log record emitted on the same thread automatically includes it — without any changes to the 28+ modules that call logger.info(…).

Usage in driver code::

    token = set_dataflow_id(dataflow.dataflow_id)
    try:
        ...  # all logging here will carry the dataflow_id
    finally:
        clear_dataflow_id(token)

clear_dataflow_id

clear_dataflow_id(token: Token[str]) -> None

Restore the previous dataflow ID value.

get_dataflow_id

get_dataflow_id() -> str

Return the current dataflow ID (empty string when unset).

set_dataflow_id

set_dataflow_id(dataflow_id: str) -> Token[str]

Set the current dataflow ID and return a reset token.