
Watermark

base

Watermark serialization and abstract manager.

WatermarkSerializer converts Dict[str, Any] watermarks to/from JSON using the __datetime__ sentinel pattern for datetime round-tripping.

BaseWatermarkManager is the ABC that WatermarkManager implements.

BaseWatermarkManager

BaseWatermarkManager()

Bases: ABC

Abstract watermark manager.

Concrete implementations decide where watermarks are stored (file, database, API). The serializer is an internal concern — callers always pass and receive Dict[str, Any].
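To illustrate the contract, here is a minimal sketch of a storage backend written against the two documented abstract methods. It does not import the real BaseWatermarkManager (its import path is not shown here); WatermarkManagerLike is a hypothetical stand-in with the same signatures, and InMemoryWatermarkManager is a hypothetical backend that keeps watermarks in a process-local dict.

```python
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from typing import Any, Dict, Optional


class WatermarkManagerLike(ABC):
    """Hypothetical stand-in mirroring BaseWatermarkManager's abstract methods."""

    @abstractmethod
    def get_watermark(self, dataflow_id: str) -> Optional[Dict[str, Any]]: ...

    @abstractmethod
    def save_watermark(
        self,
        dataflow_id: str,
        watermark: Dict[str, Any],
        *,
        job_id: Optional[str] = None,
        dataflow_run_id: Optional[str] = None,
    ) -> None: ...


class InMemoryWatermarkManager(WatermarkManagerLike):
    """Hypothetical backend: watermarks live in a dict for the process lifetime."""

    def __init__(self) -> None:
        self._store: Dict[str, Dict[str, Any]] = {}

    def get_watermark(self, dataflow_id: str) -> Optional[Dict[str, Any]]:
        wm = self._store.get(dataflow_id)
        # Return a copy so callers cannot mutate the stored watermark
        return dict(wm) if wm is not None else None

    def save_watermark(
        self,
        dataflow_id: str,
        watermark: Dict[str, Any],
        *,
        job_id: Optional[str] = None,
        dataflow_run_id: Optional[str] = None,
    ) -> None:
        # job_id / dataflow_run_id are accepted but not recorded in this sketch
        self._store[dataflow_id] = dict(watermark)


mgr = InMemoryWatermarkManager()
mgr.save_watermark(
    "orders",
    {"updated_at": datetime(2026, 2, 9, 10, 30, tzinfo=timezone.utc)},
    job_id="job-1",
)
print(mgr.get_watermark("orders"))
print(mgr.get_watermark("customers"))  # None
```

Note that callers only ever see Dict[str, Any]; a backend that stores text (file, database, API) would serialize inside save_watermark and deserialize inside get_watermark.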

deserialize

deserialize(json_str: str) -> Dict[str, Any]

Deserialize a JSON string to a watermark dict.

get_watermark abstractmethod

get_watermark(dataflow_id: str) -> Optional[Dict[str, Any]]

Return the current watermark for dataflow_id, or None.

save_watermark abstractmethod

save_watermark(dataflow_id: str, watermark: Dict[str, Any], *, job_id: Optional[str] = None, dataflow_run_id: Optional[str] = None) -> None

Persist a watermark for dataflow_id.

serialize

serialize(watermark: Dict[str, Any]) -> str

Serialize a watermark dict to a JSON string.

WatermarkSerializer

Serialize / deserialize watermark dictionaries to / from JSON.

datetime values are stored as:

{"__datetime__": "2026-02-09T10:30:00+00:00"}

and are restored to datetime objects on deserialization.
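A minimal sketch of the encoding side of this pattern, assuming the standard-library json module's default= hook is used to wrap datetimes (the actual WatermarkSerializer may be implemented differently). The names DATETIME_KEY, _encode_default, and serialize_sketch are hypothetical.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict

DATETIME_KEY = "__datetime__"  # sentinel key described above


def _encode_default(obj: Any) -> Any:
    """json.dumps fallback: wrap datetimes in the sentinel object."""
    if isinstance(obj, datetime):
        return {DATETIME_KEY: obj.isoformat()}
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


def serialize_sketch(watermark: Dict[str, Any]) -> str:
    # Non-datetime JSON-native values (str, int, float, bool, None) pass through unchanged
    return json.dumps(watermark, default=_encode_default)


ts = datetime(2026, 2, 9, 10, 30, tzinfo=timezone.utc)
print(serialize_sketch({"updated_at": ts, "batch": 7}))
# {"updated_at": {"__datetime__": "2026-02-09T10:30:00+00:00"}, "batch": 7}
```

Using isoformat() keeps the UTC offset in the stored string, so timezone-aware watermarks survive the round trip.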

deserialize staticmethod

deserialize(json_str: str) -> Dict[str, Any]

Parse a JSON string back into a watermark dict.

Restores __datetime__ sentinel objects to datetime objects. Empty or null inputs (None, "", or "null") return {}.

Parameters:

json_str (str, required): JSON string to parse; may also be None, empty, or "null".

Returns:

Dict[str, Any]: Watermark dictionary.
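The parsing rules above can be sketched with json.loads and an object_hook that turns each single-key {"__datetime__": ...} object back into a datetime. This is a minimal sketch, not the library's code; DATETIME_KEY, _decode_hook, and deserialize_sketch are hypothetical names.

```python
import json
from datetime import datetime
from typing import Any, Dict, Optional

DATETIME_KEY = "__datetime__"


def _decode_hook(obj: Dict[str, Any]) -> Any:
    # Replace {"__datetime__": "..."} with a datetime; leave other objects as-is
    if len(obj) == 1 and DATETIME_KEY in obj:
        return datetime.fromisoformat(obj[DATETIME_KEY])
    return obj


def deserialize_sketch(json_str: Optional[str]) -> Dict[str, Any]:
    # None and "" short-circuit to an empty watermark
    if not json_str:
        return {}
    result = json.loads(json_str, object_hook=_decode_hook)
    # The literal "null" parses to None, which also maps to {}
    return result if result is not None else {}


wm = deserialize_sketch(
    '{"updated_at": {"__datetime__": "2026-02-09T10:30:00+00:00"}, "batch": 7}'
)
print(repr(wm["updated_at"]), wm["batch"])
print(deserialize_sketch(None), deserialize_sketch(""), deserialize_sketch("null"))  # {} {} {}
```

Matching only single-key objects avoids misreading an ordinary dict that happens to contain a "__datetime__" key alongside other fields.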

serialize staticmethod

serialize(watermark: Dict[str, Any]) -> str

Convert a watermark dict to a JSON string.

datetime values are encoded with the __datetime__ pattern.

Parameters:

watermark (Dict[str, Any], required): Watermark key-value pairs.

Returns:

str: JSON string.

serialize_watermark

serialize_watermark(watermark: Dict[str, Any]) -> str

Convenience wrapper for WatermarkSerializer.serialize.

deserialize_watermark

deserialize_watermark(json_str: str) -> Dict[str, Any]

Convenience wrapper for WatermarkSerializer.deserialize.

is_watermark_empty

is_watermark_empty(watermark: Optional[Dict[str, Any]]) -> bool

Return True if the watermark is None, empty, or contains only None values.
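A minimal sketch of the documented rule; is_watermark_empty_sketch is a hypothetical name, and the real function may differ in details. Only None counts as "no value", so falsy values such as 0 or "" still make the watermark non-empty.

```python
from typing import Any, Dict, Optional


def is_watermark_empty_sketch(watermark: Optional[Dict[str, Any]]) -> bool:
    """True when there is no usable watermark value."""
    if not watermark:  # None or {}
        return True
    return all(value is None for value in watermark.values())


print(is_watermark_empty_sketch(None))                            # True
print(is_watermark_empty_sketch({}))                              # True
print(is_watermark_empty_sketch({"updated_at": None}))            # True
print(is_watermark_empty_sketch({"updated_at": None, "id": 0}))   # False
```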

watermark_manager

Concrete watermark manager that delegates storage to a metadata provider.

WatermarkManager is the primary implementation used by the ETL Driver. It serializes and deserializes watermarks and delegates persistence to the injected BaseMetadataProvider (datacoolie.metadata.base.BaseMetadataProvider).

For file-based standalone operation, FileProvider implements both metadata and watermark storage, so WatermarkManager simply delegates save/get through the same provider.
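The delegation pattern can be sketched as follows. This is a self-contained illustration under stated assumptions, not the library's code: DictProvider and its read_watermark / write_watermark methods are hypothetical (the real BaseMetadataProvider API is not documented here), WatermarkManagerSketch is a hypothetical name, and treating a stored "null" or {} as "no watermark" is an assumption.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def _default(obj: Any) -> Any:
    # __datetime__ sentinel encoding, as described for WatermarkSerializer
    if isinstance(obj, datetime):
        return {"__datetime__": obj.isoformat()}
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


def _hook(obj: Dict[str, Any]) -> Any:
    if len(obj) == 1 and "__datetime__" in obj:
        return datetime.fromisoformat(obj["__datetime__"])
    return obj


class DictProvider:
    """Hypothetical provider that stores watermark JSON strings per dataflow."""

    def __init__(self) -> None:
        self.rows: Dict[str, Dict[str, Optional[str]]] = {}

    def read_watermark(self, dataflow_id: str) -> Optional[str]:
        row = self.rows.get(dataflow_id)
        return row["value"] if row else None

    def write_watermark(self, dataflow_id: str, value: str, *,
                        job_id: Optional[str], dataflow_run_id: Optional[str]) -> None:
        self.rows[dataflow_id] = {"value": value, "job_id": job_id,
                                  "dataflow_run_id": dataflow_run_id}


class WatermarkManagerSketch:
    """Handles (de)serialization; the provider only ever sees JSON strings."""

    def __init__(self, metadata_provider: DictProvider) -> None:
        self._provider = metadata_provider

    def get_watermark(self, dataflow_id: str) -> Optional[Dict[str, Any]]:
        raw = self._provider.read_watermark(dataflow_id)
        if not raw:
            return None
        wm = json.loads(raw, object_hook=_hook)
        return wm if wm else None  # assumption: "null" / {} mean "no watermark"

    def save_watermark(self, dataflow_id: str, watermark: Dict[str, Any], *,
                       job_id: Optional[str] = None,
                       dataflow_run_id: Optional[str] = None) -> None:
        self._provider.write_watermark(
            dataflow_id, json.dumps(watermark, default=_default),
            job_id=job_id, dataflow_run_id=dataflow_run_id,
        )


ts = datetime(2026, 2, 9, 10, 30, tzinfo=timezone.utc)
provider = DictProvider()
manager = WatermarkManagerSketch(provider)
manager.save_watermark("orders", {"updated_at": ts}, job_id="job-1")
print(provider.rows["orders"]["value"])
# {"updated_at": {"__datetime__": "2026-02-09T10:30:00+00:00"}}
print(manager.get_watermark("orders") == {"updated_at": ts})  # True
```

Keeping serialization in the manager means a provider such as FileProvider only has to store and return opaque strings.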

WatermarkManager

WatermarkManager(metadata_provider: BaseMetadataProvider)

Bases: BaseWatermarkManager

Watermark manager backed by a metadata provider.

Parameters:

metadata_provider (BaseMetadataProvider, required): Provider responsible for watermark persistence.

get_watermark

get_watermark(dataflow_id: str) -> Optional[Dict[str, Any]]

Return the current watermark dict, or None.

save_watermark

save_watermark(dataflow_id: str, watermark: Dict[str, Any], *, job_id: Optional[str] = None, dataflow_run_id: Optional[str] = None) -> None

Serialize the watermark dict and persist via the provider.