Logging
base
Base logging infrastructure for the DataCoolie framework.
Provides:

- :class:`LogManager` — singleton that configures Python logging with a capture handler for later persistence to datalake.
- :class:`CaptureHandler` — a :class:`logging.Handler` that buffers :class:`LogRecord` objects in memory or a temp file.
- :class:`BaseLogger` — ABC for persistent loggers (system, ETL).
- :class:`LogConfig` — configuration dataclass.
- :func:`get_logger` — module-level convenience to create child loggers.
Usage::

    from datacoolie.logging.base import get_logger

    logger = get_logger(__name__)
    logger.info("Processing started")
BaseLogger
BaseLogger(config: LogConfig, platform: Optional[BasePlatform] = None)
Bases: ABC
Abstract base for persistent loggers (system, ETL).
Provides configuration, lifecycle management, and context-manager support.
Subclasses implement :meth:`flush`.
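A minimal sketch of a subclass, assuming only the documented surface (the ``(config, platform)`` constructor and the abstract ``flush``); the buffer attribute and the ``log`` method are illustrative, not framework API::

    from datacoolie.logging.base import BaseLogger, LogConfig


    class ListLogger(BaseLogger):
        """Hypothetical logger that buffers strings and writes them on flush."""

        def __init__(self, config: LogConfig, platform=None) -> None:
            super().__init__(config, platform)
            self._entries: list[str] = []  # illustrative in-memory buffer

        def log(self, message: str) -> None:
            self._entries.append(message)

        def flush(self) -> None:
            # A real subclass would persist via the platform; print for the sketch.
            print("\n".join(self._entries))
            self._entries.clear()

Since :class:`BaseLogger` provides context-manager support, ``with ListLogger(config): ...`` should flush on exit.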
CaptureHandler
Bases: Handler
Captures Python log records for later persistence.
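The handler is installed by :meth:`LogManager.configure`; the in-memory buffering pattern it implements is roughly the following (an illustrative standalone sketch, not the actual class)::

    import logging


    class MemoryCapture(logging.Handler):
        """Illustrative buffering handler: keep records for later persistence."""

        def __init__(self) -> None:
            super().__init__()
            self.records: list[logging.LogRecord] = []

        def emit(self, record: logging.LogRecord) -> None:
            self.records.append(record)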
DataflowContextFilter
Bases: Filter
Inject the current dataflow_id from :mod:`contextvars` into every log record.
Attach to handlers (not loggers) so it applies to all propagated messages.
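For example (a sketch; the handler setup is illustrative)::

    import logging

    from datacoolie.logging.base import DataflowContextFilter

    handler = logging.StreamHandler()
    handler.addFilter(DataflowContextFilter())  # on the handler, so propagated records get it too
    logging.getLogger("datacoolie").addHandler(handler)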
LogConfig (dataclass)
LogConfig(log_level: str = INFO.value, storage_mode: str = MEMORY.value, output_path: Optional[str] = None, partition_by_date: bool = True, partition_pattern: str = DEFAULT_PARTITION_PATTERN, flush_interval_seconds: int = 60)
Configuration dataclass for loggers.
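For instance, per the signature above (the path is illustrative)::

    from datacoolie.logging.base import LogConfig, StorageMode

    config = LogConfig(
        log_level="INFO",
        storage_mode=StorageMode.MEMORY.value,  # buffer in memory (the default)
        output_path="logs/system/",
        flush_interval_seconds=30,
    )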
LogLevel
Bases: str, Enum
Standard logging levels.
LogManager
Singleton that configures Python logging with capture support.
configure
configure(level: str = INFO.value, capture_logs: bool = True, storage_mode: str = MEMORY.value, console_output: bool = True, format_string: Optional[str] = None, force: bool = False) -> None
Configure the global logging system.
If already configured, this is a no-op unless force is True.
Pass force=True (as SystemLogger does) to apply new settings
and replace existing handlers.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `level` | `str` | Minimum log level. | `INFO.value` |
| `capture_logs` | `bool` | Enable :class:`CaptureHandler` buffering. | `True` |
| `storage_mode` | `str` | Temporary storage mode for the capture buffer. | `MEMORY.value` |
| `console_output` | `bool` | Emit to stderr. | `True` |
| `format_string` | `Optional[str]` | Custom log format string. | `None` |
| `force` | `bool` | Re-configure even if already configured. | `False` |
get_captured_jsonl_logs
Return captured logs as newline-delimited JSON.
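A configure-then-retrieve sketch (how the singleton is obtained is an assumption; adapt to the actual accessor)::

    from datacoolie.logging.base import LogManager, get_logger

    manager = LogManager()  # assumed singleton accessor
    manager.configure(level="DEBUG", capture_logs=True)

    get_logger(__name__).info("captured")

    jsonl = manager.get_captured_jsonl_logs()  # newline-delimited JSON of buffered records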
get_logger
Create (or reuse) a child logger under the framework root.
LogRecord (dataclass)
SensitiveValueFilter
Bases: Filter
Scrub resolved secret values from log messages.
Works with the global set maintained by
:func:`~datacoolie.core.secret_provider.register_secret_values`.
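A wiring sketch (the argument shape of ``register_secret_values`` is an assumption)::

    import logging

    from datacoolie.core.secret_provider import register_secret_values
    from datacoolie.logging.base import SensitiveValueFilter

    register_secret_values(["s3cr3t-token"])   # assumed: registers values to scrub
    handler = logging.StreamHandler()
    handler.addFilter(SensitiveValueFilter())  # messages containing the value get masked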
StorageMode
Bases: str, Enum
Temporary storage mode for log buffering.
format_partition_path
format_partition_path(base_path: str, run_date: Optional[datetime] = None, pattern: str = DEFAULT_PARTITION_PATTERN) -> str
Append a partition folder to ``base_path`` using ``pattern``.
Supported placeholders: {year}, {month}, {day}, {hour}.
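For example (the pattern string here is illustrative, not necessarily ``DEFAULT_PARTITION_PATTERN``)::

    from datetime import datetime, timezone

    from datacoolie.logging.base import format_partition_path

    path = format_partition_path(
        "logs/system",
        run_date=datetime(2024, 5, 17, tzinfo=timezone.utc),
        pattern="run_date={year}-{month}-{day}",
    )
    # e.g. "logs/system/run_date=2024-05-17", assuming zero-padded month/day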
get_logger
Get a framework logger (convenience wrapper).
All loggers are children of the DataCoolie root logger and inherit
its handlers (console + capture).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Typically `__name__` of the calling module. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| | `Logger` | Configured :class:`logging.Logger` instance. |
etl_logger
Structured ETL execution logger.
ETLLogger accumulates dataflow / maintenance runtime entries across
one or more driver.run() invocations within a single job session
and writes them once when :meth:`close` is called.
Usage pattern::

    lgr = create_etl_logger(output_path="logs/", job_id="job-1", platform=plat)
    driver.run(stage="bronze")
    driver.run(stage="silver")
    driver.run_maintenance()
    lgr.close()  # single write of all accumulated data
Outputs

Debug JSONL
A single JSONL file per session (appended to on each periodic flush). Per-dataflow entries + job summary as the last line. Datetime values are serialised as ISO-8601 strings.
Analyst Parquet (requires pyarrow)
Two Parquet files per session, each with an explicit PyArrow schema:
* ``dataflow_<stem>.parquet`` — one row per dataflow/maintenance execution.
* ``job_summary_<stem>.parquet`` — a single-row job aggregate.
Datetime columns use ``timestamp[us, tz=UTC]`` for native query support.
Partition layout::

    <output_path>/<purpose>/<log_type>/run_date=yyyy-mm-dd/<filename>
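Reading an analyst file back (a sketch; every path segment below is illustrative)::

    import pyarrow.parquet as pq

    table = pq.read_table("logs/etl/dataflow/run_date=2024-05-17/dataflow_job-1.parquet")
    print(table.schema)  # datetime columns appear as timestamp[us, tz=UTC]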
ETLLogger
ETLLogger(config: LogConfig, platform: Optional[BasePlatform] = None)
Bases: BaseLogger
Structured ETL execution logger — accumulate-then-flush.
All :meth:`log` calls accumulate entries in memory. A single
:class:`~datacoolie.core.models.JobRuntimeInfo` tracks session-level
aggregates. On :meth:`flush` (called automatically by :meth:`close`)
the logger writes debug JSONL and analyst Parquet in one shot.
create_etl_logger
create_etl_logger(output_path: Optional[str] = None, platform: Optional[BasePlatform] = None) -> ETLLogger
Create an :class:`ETLLogger` with common configuration.
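Per the signature (a sketch; ``platform`` is omitted, so paths resolve locally)::

    from datacoolie.logging.etl_logger import create_etl_logger

    lgr = create_etl_logger(output_path="logs/etl/")
    try:
        ...  # driver.run(...) invocations accumulate entries
    finally:
        lgr.close()  # one-shot write of debug JSONL and analyst Parquet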
system_logger
System logger — captures and persists Python log output.
SystemLogger is not a logger itself; it configures the global
:class:`LogManager` to capture logs, then on :meth:`flush` / :meth:`close`
uploads the captured content to datalake storage via the platform.
Usage::

    from datacoolie.logging.base import get_logger
    from datacoolie.logging.system_logger import SystemLogger

    logger = get_logger(__name__)

    with SystemLogger(config, platform) as log_mgr:
        logger.info("Processing started")
    # logs uploaded on close
SystemLogger
SystemLogger(config: LogConfig, platform: Optional[BasePlatform] = None)
Bases: BaseLogger
Captures all framework Python logs and persists them to storage.
On initialization it re-configures the global :class:`LogManager` to
capture logs. On :meth:`flush` the captured content is written as a
single text file to the platform.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `config` | `LogConfig` | Logging configuration (output_path, level, etc.). | required |
| `platform` | `Optional[BasePlatform]` | Platform for file operations. | `None` |
create_system_logger
create_system_logger(output_path: Optional[str] = None, log_level: str = INFO.value, platform: Optional[BasePlatform] = None, storage_mode: str = MEMORY.value) -> SystemLogger
Factory for :class:`SystemLogger`.
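Combining the factory with the context-manager pattern shown above (``platform`` is omitted in this sketch; pass a :class:`BasePlatform` for datalake upload)::

    from datacoolie.logging.base import get_logger
    from datacoolie.logging.system_logger import create_system_logger

    logger = get_logger(__name__)

    with create_system_logger(output_path="logs/system/", log_level="DEBUG") as syslog:
        logger.debug("Captured for upload")
    # captured content is written as a single text file on close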
context
Thread-safe dataflow context propagation via :mod:`contextvars`.

Stores the current dataflow_id in a :class:`contextvars.ContextVar` so that
every log record emitted on the same thread automatically includes it — without
any changes to the 28+ modules that call logger.info(…).
Usage in driver code::

    token = set_dataflow_id(dataflow.dataflow_id)
    try:
        ...  # all logging here will carry the dataflow_id
    finally:
        clear_dataflow_id(token)
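The try/finally pair wraps naturally into a small context manager (a sketch assuming only the two documented functions)::

    from contextlib import contextmanager

    from datacoolie.logging.context import set_dataflow_id, clear_dataflow_id


    @contextmanager
    def dataflow_context(dataflow_id: str):
        """Scope a dataflow_id so all logging inside the block carries it."""
        token = set_dataflow_id(dataflow_id)
        try:
            yield
        finally:
            clear_dataflow_id(token)  # restore the previous ContextVar value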