Skip to content

Sources

base

Abstract base class for source readers.

BaseSourceReader[DF] uses the Template Method pattern:

  • Public :meth:read handles timing, watermark filtering, count/max calculation, file-info columns, and error wrapping.
  • Subclasses implement :meth:_read_internal (and optionally :meth:_read_data) with format-specific logic.

The reader delegates all DataFrame operations to a :class:BaseEngine.

BaseSourceReader

BaseSourceReader(engine: BaseEngine[DF])

Bases: ABC, Generic[DF]

Abstract source reader with Template Method lifecycle.

Subclasses must implement :meth:_read_internal. Optionally they may also implement :meth:_read_data for the raw read step.

Type parameter DF is the concrete DataFrame class.

get_new_watermark

get_new_watermark() -> Dict[str, Any]

Return the new watermark values computed during the last read.

get_runtime_info

get_runtime_info() -> SourceRuntimeInfo

Return runtime information for the most recent read.

read

read(source: Source, watermark: Optional[Dict[str, Any]] = None) -> Optional[DF]

Read data from a source (Template Method).

  1. Initialise runtime info.
  2. Delegate to :meth:_read_internal.
  3. Record timing and watermark data.

Parameters:

Name Type Description Default
source Source

Pipeline source configuration.

required
watermark Optional[Dict[str, Any]]

Previous watermark values (None = first run).

None

Returns:

Type Description
Optional[DF]

DataFrame with source data, or None when there is

Optional[DF]

no data to process.

Raises:

Type Description
SourceError

On any failure during reading.

file_reader

File-based source reader (Parquet, CSV, JSON, Avro, etc.).

Reads flat files from storage, supporting date-folder partitioning patterns and optional platform-based folder discovery.

FileReader

FileReader(engine: BaseEngine[DF])

Bases: BaseSourceReader[DF]

Source reader for flat files (Parquet, CSV, JSON, Avro, etc.).

Supports: * Single path and multi-path reads. * Date-folder partition patterns ({year}/{month}/{day}...). * Watermark-based incremental reads with folder pruning. * File-info column injection (__file_name, etc.).

delta_reader

Delta Lake source reader.

Reads Delta tables via :meth:engine.read_delta with support for watermark-based incremental reads and SQL queries.

DeltaReader

DeltaReader(engine: BaseEngine[DF])

Bases: BaseSourceReader[DF]

Source reader for Delta Lake tables.

Supports: * Full and incremental reads (watermark filtering). * SQL query mode (source.query).

iceberg_reader

Iceberg source reader.

Reads Iceberg tables via SQL (SELECT * FROM table_name) using the catalog-aware fully qualified table name. Falls back to path-based reads when no catalog is configured.

IcebergReader

IcebergReader(engine: BaseEngine[DF])

Bases: BaseSourceReader[DF]

Source reader for Apache Iceberg tables.

Prefers SQL-based reads via full_table_name (catalog-aware). Falls back to path-based reads when no catalog is configured.

database_reader

Database source reader.

Reads data from relational databases via :meth:engine.read_database, supporting both table and query modes.

DatabaseReader

DatabaseReader(engine: BaseEngine[DF])

Bases: BaseSourceReader[DF]

Source reader for relational databases.

Uses :meth:engine.read_database which delegates to the engine-specific external database reader (JDBC in Spark, connectorx/ADBC in Polars, etc.).

Supports: * Table mode — read entire table (or schema.table). * Query mode — arbitrary SQL query executed on the source database. * Watermark-based incremental reads pushed down to SQL.

api_reader

API source reader.

Reads data from HTTP/REST APIs via httpx with support for pagination, authentication, and rate limiting.

APIReader

APIReader(engine: BaseEngine[DF])

Bases: BaseSourceReader[DF]

Source reader for HTTP/REST APIs.

Fetches JSON data from paginated REST endpoints, converts the collected records to a DataFrame via the engine, and supports incremental reads through watermark filtering.

API configuration is supplied through source.connection.configure and source.configure:

Connection-level (source.connection.configure): - base_url (str): Base URL for the API (required). - auth_type (str): "bearer", "basic", "api_key", or "oauth2_client_credentials". - auth_token (str): Bearer token value (for auth_type="bearer"). - username / password (str): Basic auth credentials. - api_key_header (str): Header name for API key auth. - api_key_value (str): API key value. - token_url (str): OAuth2 token endpoint (for oauth2_client_credentials). - client_id (str): OAuth2 client ID. - client_secret (str): OAuth2 client secret — use secrets_ref so it is resolved from a secret store at runtime rather than stored in plain text. - scope (str): Optional space-separated OAuth2 scopes. - token_auth_method (str): How to send client credentials to the token endpoint. "client_secret_post" (default) puts client_id and client_secret in the POST body. "client_secret_basic" sends them as HTTP Basic Auth (required by Okta, Ping Identity, and some others). - token_request_body_format (str): "form" (default, application/x-www-form-urlencoded) or "json" (application/json) — some providers (e.g. GitHub Apps) expect JSON. - token_request_extras (dict): Extra fields forwarded to the token POST body. - watermark_to_param_timezone (str, optional): Default timezone for the "now" timestamp injected into watermark_to_param and for advancing watermarks of pushed-down columns. Can be overridden per source via the source-level key of the same name (source wins). Accepts IANA names or UTC offset strings; defaults to "UTC".

auth_type="aws_sigv4" (API Gateway IAM auth, direct AWS service endpoints): Signs every request with AWS Signature Version 4 via botocore. When aws_access_key_id is omitted, the standard boto3 credential chain is used (env vars, ~/.aws/credentials, EC2/ECS instance role, etc.). - aws_region (str): AWS region, e.g. "us-east-1". - aws_service (str): AWS service name (default "execute-api" for API Gateway). Use "s3", "lambda", etc. for other endpoints. - aws_access_key_id (str, optional): Override access key — use secrets_ref rather than hard-coding. - aws_secret_access_key (str, optional): Override secret — use secrets_ref. - aws_session_token (str, optional): Temporary session token for role-assumed or STS credentials. - default_headers (dict): Headers applied to every request. - timeout (int/float): Request timeout in seconds (default 30).

Source-level (source.configure): - endpoint (str): API endpoint path appended to base_url. - method (str): HTTP method (default "GET"). - params (dict): Query parameters. - body (dict): Request body (for POST/PUT). - pagination_type (str): "offset", "cursor", or "next_link". - page_size (int): Number of records per page (default 100). - max_pages (int): Safety limit on pages fetched (default 1000). - data_path (str): Dot-separated path to records array in response JSON (e.g. "data.items"). Defaults to root-level list. - next_link_path (str): Dot-separated path to the next-page URL. - cursor_path (str): Dot-separated path to the cursor token. - cursor_param (str): Query param name for cursor (default "cursor"). - offset_param (str): Query param for offset (default "offset"). - limit_param (str): Query param for page size (default "limit"). - rate_limit_delay (float): Seconds to wait between requests (default 0). - max_retries (int): Maximum number of retries on HTTP 429 rate-limit responses (default 10). When the Retry-After response header is present its value is used as the wait duration; otherwise exponential backoff applies: min(2 ** attempt, 30) seconds per retry (1 s, 2 s, 4 s, 8 s, 16 s, 30 s, 30 s, …), capped at 30 s. Raises :class:~datacoolie.core.exceptions.SourceError once all retries are exhausted. - total_path (str, optional): Dot-separated path to the total record count in the first-page response JSON (e.g. "meta.total" or "pagination.count"). Only applies when pagination_type="offset". When set, the reader fetches page 0 to discover the total, calculates the number of remaining pages, and dispatches them concurrently via :class:~concurrent.futures.ThreadPoolExecutor. Without this key, offset pagination falls back to sequential page-by-page fetching (stopping when a page returns fewer records than page_size). - offset_max_workers (int): Maximum number of parallel workers used when total_path is configured for concurrent offset fetching (default 4). Has no effect when total_path is absent.

Incremental / watermark push-down (source.configure): Instead of filtering rows in memory after fetching, these keys inject the stored watermark value directly into the outgoing request so the API server returns only new records — analogous to a SQL WHERE col > last_value.

- ``watermark_param_mapping`` (dict): Maps each ``watermark_columns`` entry
  to its API parameter name, e.g.
  ``{"updated_at": "updated_since", "id": "after_id"}``.
  Only columns that are present in the stored watermark are injected.
  Pushed-down columns may be absent from the API response; they skip
  in-memory filtering and their new watermark is set to ``"now"``
  (see ``watermark_to_param_timezone``) rather than the column's max
  value in the dataframe.
- ``watermark_to_param`` (str, optional): API parameter name to receive the
  current UTC timestamp as an upper bound ("to" side of the window).
  If omitted, no upper-bound parameter is sent.
- ``watermark_param_location`` (str): Where to inject — ``"params"`` (default,
  URL query string) or ``"body"`` (JSON/form POST body).
- ``watermark_param_format`` (str): How to serialise the watermark value:
  ``"iso"`` (default, e.g. ``"2024-01-15T12:00:00"``),
  ``"date"`` (``"2024-01-15"``),
  ``"timestamp"`` (Unix seconds as float string),
  ``"timestamp_ms"`` (Unix milliseconds as int string)
  ``"datetime"`` (``"2024-01-15 12:00:00"``),
  ``"datetime_ms"`` (``"2024-01-15 12:00:00.123"``).
- ``watermark_to_param_timezone`` (str, optional): Timezone for the
  ``"now"`` timestamp injected into ``watermark_to_param`` and used
  when advancing the watermark of pushed-down columns.  Accepts IANA
  names (``"Asia/Ho_Chi_Minh"``, ``"America/New_York"``) or UTC
  offset strings (``"+07:00"``, ``"-05:30"``).  Defaults to ``"UTC"``.
  Source-level value takes precedence over the connection-level default.

Range-split / parallel fetch (source.configure): Splits a large watermark window into equal-sized intervals and calls the API concurrently for each sub-range. Requires watermark_to_param to be set so each range's upper-bound can be passed to the API.

- ``watermark_range_interval_unit`` (str): Interval unit for each sub-range
  — ``"hour"``, ``"day"``, ``"month"``, or ``"year"``.  When set, range
  splitting is enabled; otherwise a single API call is made (default).
- ``watermark_range_interval_amount`` (int): Number of units per interval
  (default ``1``).  E.g. ``unit="hour", amount=3`` → 3-hour windows.
- ``watermark_range_start`` (str): ISO-8601 datetime used as the initial
  lower bound when no stored watermark exists yet (first run).  Required
  when ``watermark_range_interval_unit`` is set and no watermark has been
  saved.
- ``watermark_range_max_workers`` (int): Maximum parallel HTTP workers
  (default ``4``).
- ``watermark_range_to_exclusive_offset`` (str, optional): Epsilon
  subtracted from each range's upper-bound **before sending it to the
  API**, to prevent duplicate rows when the API uses inclusive
  ``BETWEEN from AND to`` semantics.  The internal boundary that
  starts the next range is unchanged.  Values: ``None`` (default
  — no adjustment, correct for APIs with half-open ``[from, to)``
  semantics), ``"1ms"`` (1 millisecond), ``"1s"`` (1 second),
  ``"1day"`` (1 day; for date-precision parameters like
  ``"2025-01-15"``).

python_function_reader

Python function source reader.

Reads data by importing and calling a user-defined Python function that returns a DataFrame. This is intended for complex source logic that cannot be expressed as a simple source.query.

The function path is specified on :attr:Source.python_function as a dotted module path, e.g. "mypackage.loaders.load_orders".

Two-phase watermark filtering:

  1. Before — the previous watermark dict is passed to the function so it can apply efficient source-side filtering (e.g. a WHERE clause).
  2. After — the framework applies :meth:_apply_watermark_filter on the returned DataFrame for precise, engine-level filtering — identical to what other readers do.

The function signature must be::

def my_loader(engine, source, watermark) -> DataFrame | None
  • engine — the active :class:BaseEngine instance.
  • source — the :class:Source model.
  • watermark — previous watermark values (None on first run).

PythonFunctionReader

PythonFunctionReader(engine: BaseEngine[DF], allowed_prefixes: Optional[List[str]] = None)

Bases: BaseSourceReader[DF]

Source reader that delegates to a user-defined Python function.

The function path is read from source.python_function and must be a fully-qualified dotted path (e.g. "my_module.load_data").

The function signature must be::

def my_loader(engine, source, watermark) -> DataFrame | None

Watermark handling follows a before / after pattern:

  • The watermark dict is passed to the function so it can pre-filter at the source (optional — the function may ignore it).
  • After the function returns, the framework applies :meth:_apply_watermark_filter for precise row-level filtering, consistent with other readers.