Logging¶
base
¶
Base logging infrastructure for the DataCoolie framework.
Provides:

- :class:LogManager, a singleton that configures Python logging with a capture handler for later persistence to the data lake.
- :class:CaptureHandler, a :class:logging.Handler that buffers :class:LogRecord objects in memory or a temp file.
- :class:BaseLogger, an ABC for persistent loggers (system, ETL).
- :class:LogConfig, a configuration dataclass.
- :func:get_logger, a module-level convenience function to create child loggers.
Usage::

    from datacoolie.logging.base import get_logger

    logger = get_logger(__name__)
    logger.info("Processing started")
BaseLogger
¶
BaseLogger(config: LogConfig, platform: Optional[BasePlatform] = None)
Bases: ABC
Abstract base for persistent loggers (system, ETL).
Provides configuration, lifecycle management, periodic flush timer,
and context-manager support. Subclasses implement :meth:flush and
optionally override :meth:_on_periodic_flush for incremental behavior.
The periodic timer is started automatically when flush_interval_seconds is positive and both output_path and platform are configured.
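The lifecycle described above (periodic flush, final flush on close, context-manager support) can be sketched with the standard library alone. This is a minimal illustration, not the DataCoolie implementation: `PeriodicFlusher` and `CountingFlusher` are hypothetical names, and the sketch only checks that the interval is positive, leaving out the output_path and platform conditions.

```python
import threading
import time
from abc import ABC, abstractmethod


class PeriodicFlusher(ABC):
    """Hypothetical stand-in for a BaseLogger-style lifecycle (not DataCoolie code)."""

    def __init__(self, flush_interval_seconds: float = 60) -> None:
        self._interval = flush_interval_seconds
        self._stop = threading.Event()
        self._thread = None
        if self._interval > 0:  # the timer only starts for a positive interval
            self._thread = threading.Thread(target=self._run, daemon=True)
            self._thread.start()

    def _run(self) -> None:
        # Event.wait() returns False on timeout (time to flush)
        # and True once close() has set the event.
        while not self._stop.wait(self._interval):
            self._on_periodic_flush()

    def _on_periodic_flush(self) -> None:
        self.flush()  # default: a periodic tick is a plain flush

    @abstractmethod
    def flush(self) -> None: ...

    def close(self) -> None:
        self._stop.set()
        if self._thread is not None:
            self._thread.join()
        self.flush()  # final flush after the timer has stopped

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()


class CountingFlusher(PeriodicFlusher):
    """Counts flushes so the behavior can be observed."""

    def __init__(self, **kwargs) -> None:
        self.flush_count = 0  # set before the timer thread can start
        super().__init__(**kwargs)

    def flush(self) -> None:
        self.flush_count += 1


with CountingFlusher(flush_interval_seconds=0.01) as flusher:
    time.sleep(0.1)  # let a few periodic ticks fire
# flush_count now holds the periodic ticks plus the final flush from close()
```

A subclass that wants incremental behavior would override `_on_periodic_flush` and keep `flush` for the final write.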
CaptureHandler
¶
Bases: Handler
Captures Python log records for later persistence.
Uses the handler's built-in self.lock (RLock) for thread safety —
no separate lock needed since logging.Handler.handle() already
acquires it before calling :meth:emit.
get_and_clear_formatted_logs
¶
Atomically drain captured logs and return as formatted text.
Minimises lock hold time: swaps/clears the buffer under the lock, then formats the text outside the lock.
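The swap-then-format pattern described above can be sketched as follows. `BufferingHandler` is a hypothetical in-memory handler written for illustration; it reuses the handler's own lock through `acquire()` and `release()`, which are standard `logging.Handler` methods, and does not reproduce the temp-file storage mode.

```python
import logging


class BufferingHandler(logging.Handler):
    """Hypothetical in-memory capture handler (not the DataCoolie class)."""

    def __init__(self, level: int = logging.NOTSET) -> None:
        super().__init__(level)
        self._records = []

    def emit(self, record: logging.LogRecord) -> None:
        # Handler.handle() already holds self.lock when it calls emit().
        self._records.append(record)

    def get_and_clear_formatted_logs(self) -> str:
        self.acquire()  # the same lock that handle() uses
        try:
            drained, self._records = self._records, []  # swap under the lock
        finally:
            self.release()
        # Formatting happens outside the lock so emitting threads are not blocked.
        return "\n".join(self.format(r) for r in drained)


handler = BufferingHandler()
handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))

demo_logger = logging.getLogger("capture_demo")
demo_logger.setLevel(logging.DEBUG)
demo_logger.propagate = False
demo_logger.addHandler(handler)

demo_logger.info("first")
demo_logger.debug("second")
text = handler.get_and_clear_formatted_logs()
```

Because the buffer is replaced rather than copied, records emitted while formatting is in progress land in the new list and are returned by the next drain.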
DataflowContextFilter
¶
Bases: Filter
Inject the current dataflow_id from :mod:contextvars into every log record.
Attach to handlers (not loggers) so it applies to all propagated messages.
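A minimal sketch of such a filter, assuming a context variable named `_dataflow_id` (the real variable name is not shown in this reference). It also illustrates why the filter goes on the handler: filters attached to a logger are not applied to records that propagate up from child loggers, while handler filters see every record the handler processes.

```python
import contextvars
import logging

# Hypothetical context variable, for illustration only.
_dataflow_id = contextvars.ContextVar("dataflow_id", default=None)


class ContextFilter(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        record.dataflow_id = _dataflow_id.get()  # stamp every record
        return True  # never drop records, only enrich them


class ListHandler(logging.Handler):
    """Collects formatted lines so the result can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.lines = []

    def emit(self, record: logging.LogRecord) -> None:
        self.lines.append(self.format(record))


handler = ListHandler()
handler.addFilter(ContextFilter())  # on the handler, not on the logger
handler.setFormatter(logging.Formatter("[%(dataflow_id)s] %(name)s: %(message)s"))

parent = logging.getLogger("filter_demo")
parent.setLevel(logging.INFO)
parent.propagate = False
parent.addHandler(handler)

child = logging.getLogger("filter_demo.reader")  # no handler of its own
token = _dataflow_id.set("df-42")
child.info("loading")
_dataflow_id.reset(token)
child.info("idle")
```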
LogConfig
dataclass
¶
LogConfig(log_level: str = INFO.value, file_level: str = DEBUG.value, storage_mode: str = MEMORY.value, output_path: Optional[str] = None, partition_by_date: bool = True, partition_pattern: str = DEFAULT_PARTITION_PATTERN, flush_interval_seconds: int = 60)
Configuration dataclass for loggers.
LogLevel
¶
Bases: str, Enum
Standard logging levels.
LogManager
¶
Singleton that configures Python logging with capture support.
configure
¶
configure(level: str = INFO.value, file_level: Optional[str] = None, capture_logs: bool = True, storage_mode: str = MEMORY.value, console_output: bool = True, format_string: Optional[str] = None, force: bool = False) -> None
Configure the global logging system.
If already configured, this is a no-op unless force is True.
Pass force=True (as SystemLogger does) to apply new settings
and replace existing handlers.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| level | str | Console log level (controls what is printed to stderr). | INFO.value |
| file_level | Optional[str] | Capture log level for file persistence. Defaults to | None |
| capture_logs | bool | Enable :class: | True |
| storage_mode | str | | MEMORY.value |
| console_output | bool | Emit to stderr. | True |
| format_string | Optional[str] | Custom | None |
| force | bool | Re-configure even if already configured. | False |
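The configure-once behavior (a no-op on repeat calls unless force is True) can be sketched as below. `DemoLogManager` is a hypothetical singleton that accepts only two of the parameters listed above; it shows the guard and the handler replacement, not the real class.

```python
import logging
import sys


class DemoLogManager:
    """Hypothetical singleton illustrating configure-once semantics."""

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._configured = False
        return cls._instance

    def configure(self, level: str = "INFO", force: bool = False) -> None:
        if self._configured and not force:
            return  # already configured: the new settings are ignored
        root = logging.getLogger("configure_demo")
        for old in list(root.handlers):  # force=True replaces existing handlers
            root.removeHandler(old)
        console = logging.StreamHandler(sys.stderr)
        console.setLevel(level)  # setLevel accepts level names as strings
        root.addHandler(console)
        root.setLevel(logging.DEBUG)
        self._configured = True


manager = DemoLogManager()
manager.configure(level="INFO")
manager.configure(level="DEBUG")              # ignored: already configured
level_after_repeat = logging.getLogger("configure_demo").handlers[0].level

manager.configure(level="DEBUG", force=True)  # applied: handlers are replaced
level_after_force = logging.getLogger("configure_demo").handlers[0].level
```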
get_and_clear_captured_logs
¶
Atomically return captured logs as plain text and clear the buffer.
get_logger
¶
Create (or reuse) a child logger under the framework root.
LogRecord
dataclass
¶
StorageMode
¶
Bases: str, Enum
Temporary storage mode for log buffering.
format_partition_path
¶
format_partition_path(base_path: str, run_date: Optional[datetime] = None, pattern: str = DEFAULT_PARTITION_PATTERN) -> str
Append a partition folder to base_path using pattern.
Supported placeholders: {year}, {month}, {day}, {hour}.
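As an illustration of how the placeholders might expand, here is a small sketch built on `str.format`. The function name, the zero-padding, and `EXAMPLE_PATTERN` are assumptions; the actual value of DEFAULT_PARTITION_PATTERN is not shown in this reference.

```python
from datetime import datetime, timezone
from typing import Optional

# Assumed pattern for illustration; not the library's DEFAULT_PARTITION_PATTERN.
EXAMPLE_PATTERN = "run_date={year}-{month}-{day}"


def format_partition_path_sketch(
    base_path: str,
    run_date: Optional[datetime] = None,
    pattern: str = EXAMPLE_PATTERN,
) -> str:
    """Append a partition folder to base_path (hypothetical reimplementation)."""
    run_date = run_date or datetime.now(timezone.utc)
    # Unused placeholders are simply ignored by str.format.
    folder = pattern.format(
        year=f"{run_date.year:04d}",
        month=f"{run_date.month:02d}",
        day=f"{run_date.day:02d}",
        hour=f"{run_date.hour:02d}",
    )
    return f"{base_path.rstrip('/')}/{folder}"


daily = format_partition_path_sketch("logs/", datetime(2024, 3, 5, 7))
hourly = format_partition_path_sketch(
    "logs", datetime(2024, 3, 5, 7), "{year}/{month}/{day}/{hour}"
)
```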
get_logger
¶
Get a framework logger (convenience wrapper).
All loggers are children of the DataCoolie root logger and inherit
its handlers (console + capture).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Typically | required |
Returns:

| Name | Type | Description |
|---|---|---|
| | Logger | Configured :class:Logger instance. |
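The child-logger relationship can be sketched with the standard `logging` hierarchy. The root name `datacoolie_demo` and the helper `get_logger_sketch` are assumptions for illustration; the real root logger name is not given here.

```python
import logging

ROOT_NAME = "datacoolie_demo"  # assumed root logger name, illustration only

root = logging.getLogger(ROOT_NAME)
root.addHandler(logging.NullHandler())  # stands in for console + capture handlers


def get_logger_sketch(name: str) -> logging.Logger:
    """Return a logger that lives under ROOT_NAME (hypothetical helper)."""
    if name == ROOT_NAME or name.startswith(ROOT_NAME + "."):
        return logging.getLogger(name)
    return logging.getLogger(f"{ROOT_NAME}.{name}")


child = get_logger_sketch("etl.reader")
# The child has no handlers of its own; its records propagate to the root's handlers.
```

Calling the helper twice with the same name returns the same object, because `logging.getLogger` caches loggers by name.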
etl_logger
¶
Structured ETL execution logger.
ETLLogger accumulates dataflow / maintenance runtime entries across
one or more driver.run() invocations within a single job session
and writes them once when :meth:close is called.
Usage pattern::

    lgr = create_etl_logger(output_path="logs/", job_id="job-1", platform=plat)
    driver.run(stage="bronze")
    driver.run(stage="silver")
    driver.run_maintenance()
    lgr.close()  # single write of all accumulated data
Outputs¶
Debug JSONL
    A single JSONL file per session (appended to on each periodic flush).
    Per-dataflow entries + job summary as the last line.
    Datetime values are serialised as ISO-8601 strings.
Analyst Parquet (requires pyarrow)
    Two Parquet files per session, each with an explicit PyArrow schema:

    * ``dataflow_<stem>.parquet``: one row per dataflow/maintenance execution.
    * ``job_summary_<stem>.parquet``: a single-row job aggregate.

    Datetime columns use ``timestamp[us, tz=UTC]`` for native query support.

Partition layout::

    <output_path>/<purpose>/<log_type>/run_date=yyyy-mm-dd/<filename>
ETLLogger
¶
ETLLogger(config: LogConfig, platform: Optional[BasePlatform] = None)
Bases: BaseLogger
Structured ETL execution logger — accumulate-then-flush.
All :meth:log calls accumulate entries in memory. A single
:class:~datacoolie.core.models.JobRuntimeInfo tracks session-level
aggregates. On :meth:flush (called automatically by :meth:close)
the logger appends debug JSONL and writes analyst Parquet.
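The accumulate-then-flush idea, restricted to the debug JSONL output, might look like the sketch below. `AccumulatingLogger`, the entry fields, and the summary layout are hypothetical; the Parquet output and the JobRuntimeInfo model are left out.

```python
import json
import os
import tempfile
from datetime import datetime, timezone


def _json_default(value):
    # Serialise datetime values as ISO-8601 strings; reject anything else.
    if isinstance(value, datetime):
        return value.isoformat()
    raise TypeError(f"not JSON serialisable: {type(value).__name__}")


class AccumulatingLogger:
    """Hypothetical accumulate-then-flush logger writing debug JSONL only."""

    def __init__(self, path: str) -> None:
        self._path = path
        self._entries = []

    def log(self, entry: dict) -> None:
        self._entries.append(entry)  # memory only, no I/O per call

    def flush(self) -> None:
        summary = {"type": "job_summary", "dataflow_count": len(self._entries)}
        with open(self._path, "a", encoding="utf-8") as fh:  # append mode
            for row in self._entries + [summary]:  # summary is the last line
                fh.write(json.dumps(row, default=_json_default) + "\n")
        self._entries.clear()

    def close(self) -> None:
        self.flush()  # close() triggers the single write


path = os.path.join(tempfile.mkdtemp(), "etl_demo.jsonl")
lgr = AccumulatingLogger(path)
started = datetime(2024, 3, 5, 7, 0, tzinfo=timezone.utc)
lgr.log({"dataflow_id": "df-1", "status": "ok", "start_time": started})
lgr.log({"dataflow_id": "df-2", "status": "failed", "start_time": started})
lgr.close()

with open(path, encoding="utf-8") as fh:
    lines = [json.loads(line) for line in fh]
```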
create_etl_logger
¶
create_etl_logger(output_path: Optional[str] = None, platform: Optional[BasePlatform] = None) -> ETLLogger
Create an :class:ETLLogger with common configuration.
system_logger
¶
System logger — captures and persists Python log output.
SystemLogger is not a logger itself; it configures the global
:class:LogManager to capture logs and periodically appends them to a
plain-text .log file in storage via the platform.
Two independent log levels are supported:
- log_level: controls the console stream (what operators see live).
- file_level: controls what is captured to the file (defaults to DEBUG, capturing everything regardless of console level).
Flushing is periodic (background timer) plus a final flush on :meth:close.
Each flush appends only new captured records to the remote file via
:meth:~datacoolie.platforms.base.BasePlatform.append_file, which allows
crash-safe incremental persistence without re-uploading the whole file.
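The two independent levels can be reproduced with plain `logging` handlers, as in this sketch. The `io.StringIO` streams stand in for stderr and for the capture buffer so the result can be inspected; nothing here is DataCoolie code.

```python
import io
import logging

console_stream = io.StringIO()  # stands in for stderr
capture_stream = io.StringIO()  # stands in for the capture buffer

console = logging.StreamHandler(console_stream)
console.setLevel(logging.INFO)   # what operators see live
capture = logging.StreamHandler(capture_stream)
capture.setLevel(logging.DEBUG)  # what ends up in the file

levels_logger = logging.getLogger("levels_demo")
# The logger itself must let everything through; the handlers do the filtering.
levels_logger.setLevel(logging.DEBUG)
levels_logger.propagate = False
levels_logger.addHandler(console)
levels_logger.addHandler(capture)

levels_logger.debug("row count = 10")
levels_logger.info("Processing started")

console_text = console_stream.getvalue()
capture_text = capture_stream.getvalue()
```

The DEBUG record reaches only the capture stream, while the INFO record reaches both.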
Usage::

    from datacoolie.logging.base import get_logger
    from datacoolie.logging.system_logger import SystemLogger

    logger = get_logger(__name__)
    with SystemLogger(config, platform) as sys_log:
        logger.info("Processing started")  # printed at INFO, captured at DEBUG+
    # all remaining logs appended on close
SystemLogger
¶
SystemLogger(config: LogConfig, platform: Optional[BasePlatform] = None)
Bases: BaseLogger
Captures all framework Python logs and appends them to a .log file.
The file is a plain-text log (one line per record) so operators can read
it directly without a JSON parser. The capture level defaults to
DEBUG so every framework message is recorded regardless of the
console level set by the Driver.
Periodic flushing is driven by the :class:BaseLogger daemon timer.
Each tick atomically drains the in-memory capture buffer and appends
the text to the remote file via :meth:~BasePlatform.append_file.
A final flush is performed on :meth:close.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| config | LogConfig | Logging configuration (output_path, log_level, file_level, flush_interval_seconds, etc.). | required |
| platform | Optional[BasePlatform] | Platform for file operations. | None |
create_system_logger
¶
create_system_logger(output_path: Optional[str] = None, log_level: str = INFO.value, file_level: str = DEBUG.value, platform: Optional[BasePlatform] = None, storage_mode: str = MEMORY.value) -> SystemLogger
Factory for :class:SystemLogger.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| output_path | Optional[str] | Base directory for the log file. | None |
| log_level | str | Console stream level (default | INFO.value |
| file_level | str | Capture / file level (default | DEBUG.value |
| platform | Optional[BasePlatform] | Platform for file I/O. | None |
| storage_mode | str | In-memory or file-backed capture buffer. | MEMORY.value |
context
¶
Thread-safe dataflow context propagation via :mod:contextvars.
Stores the current dataflow_id in a :class:contextvars.ContextVar so that
every log record emitted on the same thread automatically includes it — without
any changes to the 28+ modules that call logger.info(…).
Usage in driver code::

    token = set_dataflow_id(dataflow.dataflow_id)
    try:
        ...  # all logging here will carry the dataflow_id
    finally:
        clear_dataflow_id(token)