Logging

base

Base logging infrastructure for the DataCoolie framework.

Provides:

  • :class:LogManager — singleton that configures Python logging with a capture handler for later persistence to datalake.
  • :class:CaptureHandler — a :class:logging.Handler that buffers :class:LogRecord objects in memory or a temp file.
  • :class:BaseLogger — ABC for persistent loggers (system, ETL).
  • :class:LogConfig — configuration dataclass.
  • :func:get_logger — module-level convenience to create child loggers.

Usage::

from datacoolie.logging.base import get_logger

logger = get_logger(__name__)
logger.info("Processing started")

BaseLogger

BaseLogger(config: LogConfig, platform: Optional[BasePlatform] = None)

Bases: ABC

Abstract base for persistent loggers (system, ETL).

Provides configuration, lifecycle management, periodic flush timer, and context-manager support. Subclasses implement :meth:flush and optionally override :meth:_on_periodic_flush for incremental behavior.

The periodic timer is started automatically when flush_interval_seconds is positive and both output_path and platform are configured.
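The lifecycle described above (abstract flush, optional periodic hook, context-manager support) can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not DataCoolie's implementation: `PeriodicFlusher` and `ListLogger` are hypothetical names, and the real class also checks output_path and platform before starting its timer.

```python
import threading
from abc import ABC, abstractmethod


class PeriodicFlusher(ABC):
    """Hypothetical stand-in for a BaseLogger-style class (illustration only)."""

    def __init__(self, flush_interval_seconds: float = 0.0) -> None:
        self._interval = flush_interval_seconds
        self._timer = None
        self._closed = False
        self._timer_lock = threading.Lock()
        if self._interval > 0:  # timer only runs for a positive interval
            self._schedule()

    def _schedule(self) -> None:
        # Re-arm a one-shot daemon timer so it never blocks interpreter exit.
        with self._timer_lock:
            if self._closed:
                return
            self._timer = threading.Timer(self._interval, self._tick)
            self._timer.daemon = True
            self._timer.start()

    def _tick(self) -> None:
        try:
            self._on_periodic_flush()
        finally:
            self._schedule()

    def _on_periodic_flush(self) -> None:
        # Default: a periodic tick is a full flush; subclasses may override.
        self.flush()

    @abstractmethod
    def flush(self) -> None:
        """Persist buffered entries."""

    def close(self) -> None:
        # Stop the timer first, then do the final flush.
        with self._timer_lock:
            self._closed = True
            if self._timer is not None:
                self._timer.cancel()
        self.flush()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()


class ListLogger(PeriodicFlusher):
    """Toy subclass: 'persists' by moving entries to another list."""

    def __init__(self, **kwargs) -> None:
        self.pending = []
        self.persisted = []
        super().__init__(**kwargs)

    def flush(self) -> None:
        self.persisted.extend(self.pending)
        self.pending.clear()


with ListLogger() as lg:       # interval defaults to 0, so no timer is started
    lg.pending.append("entry")
```

Leaving the `with` block calls `close()`, which performs the final flush. The toy `flush` is not synchronised, so a real subclass running with a timer would need its own locking around the buffer.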

close

close() -> None

Flush and release resources.

flush abstractmethod

flush() -> None

Flush buffered entries to persistent storage.

CaptureHandler

CaptureHandler(level: int = DEBUG, storage_mode: str = MEMORY.value)

Bases: Handler

Captures Python log records for later persistence.

Uses the handler's built-in self.lock (RLock) for thread safety — no separate lock needed since logging.Handler.handle() already acquires it before calling :meth:emit.

get_and_clear_formatted_logs

get_and_clear_formatted_logs(include_location: bool = False) -> str

Atomically drain captured logs and return as formatted text.

Minimises lock hold time: swaps/clears the buffer under the lock, then formats the text outside the lock.
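The swap-then-format pattern described above might look like the following. It is a sketch assuming a purely in-memory buffer; `BufferingHandler` and `drain_formatted` are hypothetical names, and the real handler additionally supports a temp-file storage mode.

```python
import logging


class BufferingHandler(logging.Handler):
    """Hypothetical in-memory capture handler (illustration only)."""

    def __init__(self, level: int = logging.DEBUG) -> None:
        super().__init__(level)
        self._records = []

    def emit(self, record: logging.LogRecord) -> None:
        # Handler.handle() already holds self.lock when emit() runs.
        self._records.append(record)

    def drain_formatted(self) -> str:
        # Swap the buffer while holding the handler lock...
        self.acquire()
        try:
            drained, self._records = self._records, []
        finally:
            self.release()
        # ...then format outside it, so emitting threads are not blocked.
        return "\n".join(self.format(r) for r in drained)


handler = BufferingHandler()
handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
demo = logging.getLogger("capture_demo")
demo.setLevel(logging.DEBUG)
demo.propagate = False
demo.addHandler(handler)

demo.info("first")
demo.debug("second")
text = handler.drain_formatted()   # buffer is empty again afterwards
```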

DataflowContextFilter

Bases: Filter

Inject the current dataflow_id from :mod:contextvars into every log record.

Attach to handlers (not loggers) so it applies to all propagated messages.
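Filters attached to a logger only run for records created on that logger, while handler filters run for every record the handler receives, including those propagated from child loggers. The sketch below illustrates this; `ContextIdFilter`, `ListHandler`, and the `_dataflow_id` variable are hypothetical stand-ins, not the framework's own objects.

```python
import contextvars
import logging

# Hypothetical context variable standing in for the framework's own.
_dataflow_id = contextvars.ContextVar("dataflow_id", default="")


class ContextIdFilter(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        record.dataflow_id = _dataflow_id.get()
        return True  # never drops a record, only annotates it


class ListHandler(logging.Handler):
    """Collects formatted lines so the result can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.lines = []

    def emit(self, record: logging.LogRecord) -> None:
        self.lines.append(self.format(record))


handler = ListHandler()
handler.addFilter(ContextIdFilter())   # on the handler, not the logger
handler.setFormatter(logging.Formatter("[%(dataflow_id)s] %(message)s"))

parent = logging.getLogger("filter_demo")
parent.setLevel(logging.INFO)
parent.propagate = False
parent.addHandler(handler)
child = logging.getLogger("filter_demo.child")  # propagates to parent's handler

token = _dataflow_id.set("df-42")
child.info("loaded")
_dataflow_id.reset(token)
child.info("idle")
```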

LogConfig dataclass

LogConfig(log_level: str = INFO.value, file_level: str = DEBUG.value, storage_mode: str = MEMORY.value, output_path: Optional[str] = None, partition_by_date: bool = True, partition_pattern: str = DEFAULT_PARTITION_PATTERN, flush_interval_seconds: int = 60)

Configuration dataclass for loggers.

LogLevel

Bases: str, Enum

Standard logging levels.

LogManager

LogManager()

Singleton that configures Python logging with capture support.

configure

configure(level: str = INFO.value, file_level: Optional[str] = None, capture_logs: bool = True, storage_mode: str = MEMORY.value, console_output: bool = True, format_string: Optional[str] = None, force: bool = False) -> None

Configure the global logging system.

If already configured, this is a no-op unless force is True. Pass force=True (as SystemLogger does) to apply new settings and replace existing handlers.

Parameters:

  • level (str, default INFO.value): Console log level (controls what is printed to stderr).
  • file_level (Optional[str], default None): Capture log level for file persistence. Defaults to level when not provided. Set to "DEBUG" to capture all framework messages regardless of the console level.
  • capture_logs (bool, default True): Enable :class:CaptureHandler.
  • storage_mode (str, default MEMORY.value): "memory" or "file".
  • console_output (bool, default True): Emit to stderr.
  • format_string (Optional[str], default None): Custom logging.Formatter pattern.
  • force (bool, default False): Re-configure even if already configured.
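The "no-op unless force" behaviour of configure can be illustrated with a small singleton. This is a sketch only: `MiniLogManager` and the `mini_framework` logger name are hypothetical, and the real LogManager accepts more options than shown here.

```python
import logging


class MiniLogManager:
    """Hypothetical singleton illustrating configure(force=...)."""

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._configured = False
            cls._instance._root = logging.getLogger("mini_framework")
        return cls._instance

    def configure(self, level: str = "INFO", force: bool = False) -> None:
        if self._configured and not force:
            return  # already configured: keep existing handlers and level
        for existing in list(self._root.handlers):
            self._root.removeHandler(existing)   # replace, do not stack
        handler = logging.StreamHandler()        # defaults to stderr
        handler.setLevel(level)
        self._root.addHandler(handler)
        self._root.setLevel(logging.DEBUG)
        self._root.propagate = False
        self._configured = True

    @classmethod
    def reset(cls) -> None:
        cls._instance = None


manager = MiniLogManager()
manager.configure(level="INFO")
manager.configure(level="DEBUG")               # ignored: already configured
level_after_noop = manager._root.handlers[0].level
manager.configure(level="DEBUG", force=True)   # applied: handlers replaced
level_after_force = manager._root.handlers[0].level
```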

get_and_clear_captured_logs

get_and_clear_captured_logs(include_location: bool = False) -> str

Atomically return captured logs as plain text and clear the buffer.

get_logger

get_logger(name: str) -> Logger

Create (or reuse) a child logger under the framework root.

reset classmethod

reset() -> None

Reset the singleton (primarily for testing).

LogRecord dataclass

LogRecord(timestamp: datetime, level: str, logger_name: str, message: str, module: Optional[str] = None, func_name: Optional[str] = None, line_no: Optional[int] = None, exc_info: Optional[str] = None, dataflow_id: Optional[str] = None)

Captured log entry.

from_dict classmethod

from_dict(d: Dict[str, Any]) -> 'LogRecord'

Reconstruct a LogRecord from a dict produced by :meth:to_dict.

to_dict

to_dict() -> Dict[str, Any]

Serialize to a JSON-compatible dictionary.
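A round trip through to_dict and from_dict could work as sketched below. `Entry` is a hypothetical, trimmed-down stand-in for the dataclass, and encoding the timestamp as an ISO-8601 string is an assumption made here to keep the dictionary JSON-compatible.

```python
import json
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Any, Dict, Optional


@dataclass
class Entry:
    """Hypothetical stand-in with a subset of the documented fields."""

    timestamp: datetime
    level: str
    message: str
    dataflow_id: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        data = asdict(self)
        # Assumption: datetimes are stored as ISO-8601 text.
        data["timestamp"] = self.timestamp.isoformat()
        return data

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "Entry":
        fields = dict(d)  # copy so the caller's dict is not mutated
        fields["timestamp"] = datetime.fromisoformat(fields["timestamp"])
        return cls(**fields)


entry = Entry(datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc), "INFO", "done")
payload = json.dumps(entry.to_dict())            # proves JSON compatibility
restored = Entry.from_dict(json.loads(payload))
```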

StorageMode

Bases: str, Enum

Temporary storage mode for log buffering.

format_partition_path

format_partition_path(base_path: str, run_date: Optional[datetime] = None, pattern: str = DEFAULT_PARTITION_PATTERN) -> str

Append a partition folder to base_path using pattern.

Supported placeholders: {year}, {month}, {day}, {hour}.
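As a rough illustration of how the placeholders might be expanded, the sketch below uses `str.format` with zero-padded values. The function name `partition_path` and the `EXAMPLE_PATTERN` value are assumptions; the framework's actual DEFAULT_PARTITION_PATTERN is not shown in this reference and may differ.

```python
from datetime import datetime, timezone
from typing import Optional

# Assumed pattern for illustration only.
EXAMPLE_PATTERN = "run_date={year}-{month}-{day}"


def partition_path(base_path: str,
                   run_date: Optional[datetime] = None,
                   pattern: str = EXAMPLE_PATTERN) -> str:
    when = run_date or datetime.now(timezone.utc)
    # Unused placeholders are simply ignored by str.format.
    folder = pattern.format(
        year=f"{when.year:04d}",
        month=f"{when.month:02d}",
        day=f"{when.day:02d}",
        hour=f"{when.hour:02d}",
    )
    return f"{base_path.rstrip('/')}/{folder}"


run = datetime(2024, 3, 7, 9, 30)
daily = partition_path("logs/system", run)
hourly = partition_path("logs/", run, "{year}/{month}/{day}/{hour}")
```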

get_logger

get_logger(name: str) -> Logger

Get a framework logger (convenience wrapper).

All loggers are children of the DataCoolie root logger and inherit its handlers (console + capture).

Parameters:

  • name (str, required): Typically __name__.

Returns:

  • Logger: a configured :class:logging.Logger.

etl_logger

Structured ETL execution logger.

ETLLogger accumulates dataflow / maintenance runtime entries across one or more driver.run() invocations within a single job session and writes them once when :meth:close is called.

Usage pattern::

lgr = create_etl_logger(output_path="logs/", platform=plat)
driver.run(stage="bronze")
driver.run(stage="silver")
driver.run_maintenance()
lgr.close()            # single write of all accumulated data

Outputs

Debug JSONL: a single JSONL file per session (appended to on each periodic flush). It holds the per-dataflow entries, with the job summary as the last line. Datetime values are serialised as ISO-8601 strings.

Analyst Parquet (requires pyarrow): two Parquet files per session, each with an explicit PyArrow schema:

  • dataflow_<stem>.parquet: one row per dataflow/maintenance execution.
  • job_summary_<stem>.parquet: a single-row job aggregate.

Datetime columns use timestamp[us, tz=UTC] for native query support.

Partition layout::

<output_path>/<purpose>/<log_type>/run_date=yyyy-mm-dd/<filename>
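The debug JSONL output described above (appended per flush, ISO-8601 datetimes, job summary as the last line) can be approximated with the standard library. The field names, the file name, and the `append_jsonl` helper are hypothetical; the real logger writes through the platform abstraction rather than the local filesystem.

```python
import json
import os
import tempfile
from datetime import datetime, timezone


def _default(value):
    # json.dumps calls this only for objects it cannot serialise itself.
    if isinstance(value, datetime):
        return value.isoformat()
    raise TypeError(f"Unsupported type: {type(value).__name__}")


def append_jsonl(path, rows):
    # Append mode: earlier flushes are preserved, one JSON object per line.
    with open(path, "a", encoding="utf-8") as fh:
        for row in rows:
            fh.write(json.dumps(row, default=_default) + "\n")


started = datetime(2024, 3, 7, 9, 30, tzinfo=timezone.utc)
entries = [
    {"dataflow_id": "df-1", "status": "succeeded", "start_time": started},
    {"dataflow_id": "df-2", "status": "failed", "start_time": started},
]
summary = {"job_id": "job-1", "total": 2, "failed": 1}

path = os.path.join(tempfile.mkdtemp(), "etl_debug.jsonl")
append_jsonl(path, entries)      # e.g. a periodic flush
append_jsonl(path, [summary])    # final flush: summary becomes the last line

with open(path, encoding="utf-8") as fh:
    lines = [json.loads(line) for line in fh]
```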

ETLLogger

ETLLogger(config: LogConfig, platform: Optional[BasePlatform] = None)

Bases: BaseLogger

Structured ETL execution logger — accumulate-then-flush.

All :meth:log calls accumulate entries in memory. A single :class:~datacoolie.core.models.JobRuntimeInfo tracks session-level aggregates. On :meth:flush (called automatically by :meth:close) the logger appends debug JSONL and writes analyst Parquet.

flush

flush() -> None

Write all accumulated logs to storage (no-op if nothing logged).

log

log(dataflow: DataFlow, runtime_info: DataFlowRuntimeInfo) -> None

Record one dataflow or maintenance execution result.

Stream-writes a JSONL line to a local temp file and updates the session-level :attr:_job_info counters.

create_etl_logger

create_etl_logger(output_path: Optional[str] = None, platform: Optional[BasePlatform] = None) -> ETLLogger

Create an :class:ETLLogger with common configuration.

system_logger

System logger — captures and persists Python log output.

SystemLogger is not a logger itself; it configures the global :class:LogManager to capture logs and periodically appends them to a plain-text .log file in storage via the platform.

Two independent log levels are supported:

  • log_level — controls the console stream (what operators see live).
  • file_level — controls what is captured to the file (defaults to DEBUG, capturing everything regardless of console level).
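The two independent levels map naturally onto per-handler levels in the standard logging module. The sketch below uses in-memory streams as stand-ins for stderr and the capture buffer; the logger name `levels_demo` is hypothetical.

```python
import io
import logging

console_stream = io.StringIO()   # stands in for stderr
capture_stream = io.StringIO()   # stands in for the capture buffer

console = logging.StreamHandler(console_stream)
console.setLevel(logging.INFO)      # plays the role of log_level
capture = logging.StreamHandler(capture_stream)
capture.setLevel(logging.DEBUG)     # plays the role of file_level

root = logging.getLogger("levels_demo")
root.setLevel(logging.DEBUG)        # logger passes everything; handlers filter
root.propagate = False
root.addHandler(console)
root.addHandler(capture)

root.debug("row counts verified")   # captured only
root.info("Processing started")     # printed and captured

console_lines = console_stream.getvalue().splitlines()
capture_lines = capture_stream.getvalue().splitlines()
```

The logger itself must be at least as permissive as the most verbose handler, otherwise DEBUG records are discarded before any handler sees them.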

Flushing is periodic (background timer) plus a final flush on :meth:close. Each flush appends only new captured records to the remote file via :meth:~datacoolie.platforms.base.BasePlatform.append_file, which allows crash-safe incremental persistence without re-uploading the whole file.

Usage::

from datacoolie.logging.base import get_logger
from datacoolie.logging.system_logger import SystemLogger

logger = get_logger(__name__)

with SystemLogger(config, platform) as sys_log:
    logger.info("Processing started")   # printed at INFO, captured at DEBUG+
# all remaining logs appended on close

SystemLogger

SystemLogger(config: LogConfig, platform: Optional[BasePlatform] = None)

Bases: BaseLogger

Captures all framework Python logs and appends them to a .log file.

The file is a plain-text log (one line per record) so operators can read it directly without a JSON parser. The capture level defaults to DEBUG so every framework message is recorded regardless of the console level set by the Driver.

Periodic flushing is driven by the :class:BaseLogger daemon timer. Each tick atomically drains the in-memory capture buffer and appends the text to the remote file via :meth:~BasePlatform.append_file. A final flush is performed on :meth:close.

Parameters:

  • config (LogConfig, required): Logging configuration (output_path, log_level, file_level, flush_interval_seconds, etc.).
  • platform (Optional[BasePlatform], default None): Platform for file operations.

flush

flush() -> None

Flush remaining captured logs to storage.

create_system_logger

create_system_logger(output_path: Optional[str] = None, log_level: str = INFO.value, file_level: str = DEBUG.value, platform: Optional[BasePlatform] = None, storage_mode: str = MEMORY.value) -> SystemLogger

Factory for :class:SystemLogger.

Parameters:

  • output_path (Optional[str], default None): Base directory for the log file.
  • log_level (str, default INFO.value): Console stream level.
  • file_level (str, default DEBUG.value): Capture / file level (DEBUG records everything regardless of console level).
  • platform (Optional[BasePlatform], default None): Platform for file I/O.
  • storage_mode (str, default MEMORY.value): In-memory or file-backed capture buffer.

context

Thread-safe dataflow context propagation via :mod:contextvars.

Stores the current dataflow_id in a :class:contextvars.ContextVar so that every log record emitted in the same context (the same thread or asyncio task) automatically includes it, without any changes to the 28+ modules that call logger.info(…).

Usage in driver code::

token = set_dataflow_id(dataflow.dataflow_id)
try:
    ...  # all logging here will carry the dataflow_id
finally:
    clear_dataflow_id(token)

clear_dataflow_id

clear_dataflow_id(token: Token[str]) -> None

Restore the previous dataflow ID value.

get_dataflow_id

get_dataflow_id() -> str

Return the current dataflow ID (empty string when unset).

set_dataflow_id

set_dataflow_id(dataflow_id: str) -> Token[str]

Set the current dataflow ID and return a reset token.
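The three functions above follow the standard ContextVar set/reset token pattern. A minimal sketch, with hypothetical names (`set_id`, `get_id`, `clear_id`, `_current`) in place of the framework's own:

```python
import contextvars
from contextvars import Token

# Default of "" matches the documented "empty string when unset" behaviour.
_current = contextvars.ContextVar("dataflow_id", default="")


def set_id(dataflow_id: str) -> Token:
    return _current.set(dataflow_id)


def get_id() -> str:
    return _current.get()


def clear_id(token: Token) -> None:
    # reset() restores whatever value was active before the matching set().
    _current.reset(token)


outer = set_id("df-outer")
inner = set_id("df-inner")       # nested dataflow
seen_inner = get_id()
clear_id(inner)
seen_after_inner = get_id()      # back to the outer value
clear_id(outer)
seen_after_outer = get_id()      # back to the default
```

Because reset restores the previous value instead of clearing it, nested set/clear pairs unwind correctly, which is why the driver usage wraps the call in try/finally.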