Sources¶
base
¶
Abstract base class for source readers.
BaseSourceReader[DF] uses the Template Method pattern:
- Public :meth:`read` handles timing, watermark filtering, count/max calculation, file-info columns, and error wrapping.
- Subclasses implement :meth:`_read_internal` (and optionally :meth:`_read_data`) with format-specific logic.
The reader delegates all DataFrame operations to a :class:`BaseEngine`.
BaseSourceReader
¶
BaseSourceReader(engine: BaseEngine[DF])
Bases: ABC, Generic[DF]
Abstract source reader with Template Method lifecycle.
Subclasses must implement :meth:`_read_internal`. Optionally they
may also implement :meth:`_read_data` for the raw read step.
Type parameter DF is the concrete DataFrame class.
get_new_watermark
¶
Return the new watermark values computed during the last read.
get_runtime_info
¶
Return runtime information for the most recent read.
read
¶
Read data from a source (Template Method).
- Initialise runtime info.
- Delegate to :meth:`_read_internal`.
- Record timing and watermark data.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `source` | `Source` | Pipeline source configuration. | *required* |
| `watermark` | `Optional[Dict[str, Any]]` | Previous watermark values (`None` on first run). | `None` |
Returns:

| Type | Description |
|---|---|
| `Optional[DF]` | DataFrame with source data, or `None` when there is no data to process. |
Raises:

| Type | Description |
|---|---|
| `SourceError` | On any failure during reading. |
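The lifecycle above can be pictured with a small, self-contained sketch. The classes here are simplified stand-ins, not the library's actual code, but they show the Template Method shape: the public `read` owns timing and error wrapping, while the subclass supplies only `_read_internal`.

```python
import time
from abc import ABC, abstractmethod


class SourceError(Exception):
    """Stand-in for the library's SourceError."""


class MiniSourceReader(ABC):
    """Illustrative Template Method lifecycle, not the real BaseSourceReader."""

    def read(self, source, watermark=None):
        self.runtime_info = {"started_at": time.time()}
        try:
            df = self._read_internal(source, watermark)  # subclass hook
        except Exception as exc:
            # every failure surfaces as a SourceError
            raise SourceError(f"read failed: {exc}") from exc
        self.runtime_info["duration_s"] = time.time() - self.runtime_info["started_at"]
        return df

    @abstractmethod
    def _read_internal(self, source, watermark):
        ...


class ListReader(MiniSourceReader):
    def _read_internal(self, source, watermark):
        # "format-specific" logic: just return the configured rows
        return source["rows"]
```

Concrete readers (file, Delta, database, API, ...) slot into the `_read_internal` hook in the same way.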
file_reader
¶
File-based source reader (Parquet, CSV, JSON, Avro, etc.).
Reads flat files from storage, supporting date-folder partitioning patterns and optional platform-based folder discovery.
FileReader
¶
FileReader(engine: BaseEngine[DF])
Bases: BaseSourceReader[DF]
Source reader for flat files (Parquet, CSV, JSON, Avro, etc.).
Supports:
* Single path and multi-path reads.
* Date-folder partition patterns ({year}/{month}/{day}...).
* Watermark-based incremental reads with folder pruning.
* File-info column injection (__file_name, etc.).
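As a rough illustration of the date-folder pattern (daily granularity assumed; the helper name is made up and the real pruning logic lives inside `FileReader`), expanding a `{year}/{month}/{day}` pattern over an incremental window looks like:

```python
from datetime import date, timedelta


def date_folders(base, pattern, start, end):
    """Illustrative: expand a {year}/{month}/{day} pattern into the
    candidate folders for an incremental read window [start, end]."""
    paths, d = [], start
    while d <= end:
        paths.append(base + "/" + pattern.format(
            year=f"{d.year:04d}", month=f"{d.month:02d}", day=f"{d.day:02d}"))
        d += timedelta(days=1)
    return paths
```

Watermark-based pruning then amounts to choosing `start` from the stored watermark so that already-read folders are skipped.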
delta_reader
¶
Delta Lake source reader.
Reads Delta tables via :meth:`engine.read_delta` with support for
watermark-based incremental reads and SQL queries.
DeltaReader
¶
DeltaReader(engine: BaseEngine[DF])
Bases: BaseSourceReader[DF]
Source reader for Delta Lake tables.
Supports:
* Full and incremental reads (watermark filtering).
* SQL query mode (source.query).
iceberg_reader
¶
Iceberg source reader.
Reads Iceberg tables via SQL (SELECT * FROM table_name) using
the catalog-aware fully qualified table name. Falls back to path-based
reads when no catalog is configured.
IcebergReader
¶
IcebergReader(engine: BaseEngine[DF])
Bases: BaseSourceReader[DF]
Source reader for Apache Iceberg tables.
Prefers SQL-based reads via full_table_name (catalog-aware).
Falls back to path-based reads when no catalog is configured.
database_reader
¶
Database source reader.
Reads data from relational databases via :meth:`engine.read_database`,
supporting both table and query modes.
DatabaseReader
¶
DatabaseReader(engine: BaseEngine[DF])
Bases: BaseSourceReader[DF]
Source reader for relational databases.
Uses :meth:`engine.read_database` which delegates to the
engine-specific external database reader (JDBC in Spark,
connectorx/ADBC in Polars, etc.).
Supports:
* Table mode — read an entire table (or schema.table).
* Query mode — arbitrary SQL query executed on the source database.
* Watermark-based incremental reads pushed down to SQL.
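The watermark push-down can be pictured with a tiny sketch. The function name and the query shape are illustrative only; the real construction happens inside `engine.read_database` and is engine-specific.

```python
def pushdown_query(table, watermark):
    """Illustrative: how a stored watermark could translate to a SQL
    WHERE clause for an incremental database read."""
    if not watermark:
        return f"SELECT * FROM {table}"
    conds = " AND ".join(f"{col} > '{val}'" for col, val in watermark.items())
    return f"SELECT * FROM {table} WHERE {conds}"
```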
api_reader
¶
API source reader.
Reads data from HTTP/REST APIs via httpx with support for
pagination, authentication, and rate limiting.
APIReader
¶
APIReader(engine: BaseEngine[DF])
Bases: BaseSourceReader[DF]
Source reader for HTTP/REST APIs.
Fetches JSON data from paginated REST endpoints, converts the collected records to a DataFrame via the engine, and supports incremental reads through watermark filtering.
API configuration is supplied through source.connection.configure
and source.configure:
Connection-level (source.connection.configure):
- base_url (str): Base URL for the API (required).
- auth_type (str): "bearer", "basic", "api_key", or
"oauth2_client_credentials".
- auth_token (str): Bearer token value (for auth_type="bearer").
- username / password (str): Basic auth credentials.
- api_key_header (str): Header name for API key auth.
- api_key_value (str): API key value.
- token_url (str): OAuth2 token endpoint (for oauth2_client_credentials).
- client_id (str): OAuth2 client ID.
- client_secret (str): OAuth2 client secret — use secrets_ref so
it is resolved from a secret store at runtime rather than stored in plain text.
- scope (str): Optional space-separated OAuth2 scopes.
- token_auth_method (str): How to send client credentials to the token
endpoint. "client_secret_post" (default) puts client_id and
client_secret in the POST body. "client_secret_basic" sends them
as HTTP Basic Auth (required by Okta, Ping Identity, and some others).
- token_request_body_format (str): "form" (default,
application/x-www-form-urlencoded) or "json"
(application/json) — some providers (e.g. GitHub Apps) expect JSON.
- token_request_extras (dict): Extra fields forwarded to the token POST body.
- watermark_to_param_timezone (str, optional): Default timezone for
the "now" timestamp injected into watermark_to_param and for
advancing watermarks of pushed-down columns. Can be overridden per
source via the source-level key of the same name (source wins).
Accepts IANA names or UTC offset strings; defaults to "UTC".
auth_type="aws_sigv4" (API Gateway IAM auth, direct AWS service endpoints):
Signs every request with AWS Signature Version 4 via botocore.
When aws_access_key_id is omitted, the standard boto3 credential chain
is used (env vars, ~/.aws/credentials, EC2/ECS instance role, etc.).
- aws_region (str): AWS region, e.g. "us-east-1".
- aws_service (str): AWS service name (default "execute-api" for
API Gateway). Use "s3", "lambda", etc. for other endpoints.
- aws_access_key_id (str, optional): Override access key — use
secrets_ref rather than hard-coding.
- aws_secret_access_key (str, optional): Override secret — use
secrets_ref.
- aws_session_token (str, optional): Temporary session token for
role-assumed or STS credentials.
- default_headers (dict): Headers applied to every request.
- timeout (int/float): Request timeout in seconds (default 30).
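A minimal connection-level configuration using bearer auth might look like the dict below. The key names follow the documented options; the URL and token values are made up, and a real config should supply the token via secrets_ref rather than inline.

```python
# Illustrative connection-level configuration (values are placeholders).
connection_configure = {
    "base_url": "https://api.example.com/v1",   # required
    "auth_type": "bearer",
    "auth_token": "<token>",                    # prefer secrets_ref in real configs
    "default_headers": {"Accept": "application/json"},
    "timeout": 30,                              # seconds
}
```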
Source-level (source.configure):
- endpoint (str): API endpoint path appended to base_url.
- method (str): HTTP method (default "GET").
- params (dict): Query parameters.
- body (dict): Request body (for POST/PUT).
- pagination_type (str): "offset", "cursor",
or "next_link".
- page_size (int): Number of records per page (default 100).
- max_pages (int): Safety limit on pages fetched (default 1000).
- data_path (str): Dot-separated path to records array in response
JSON (e.g. "data.items"). Defaults to root-level list.
- next_link_path (str): Dot-separated path to the next-page URL.
- cursor_path (str): Dot-separated path to the cursor token.
- cursor_param (str): Query param name for cursor (default "cursor").
- offset_param (str): Query param for offset (default "offset").
- limit_param (str): Query param for page size (default "limit").
- rate_limit_delay (float): Seconds to wait between requests (default 0).
- max_retries (int): Maximum number of retries on HTTP 429 rate-limit
responses (default 10). When the Retry-After response header is
present its value is used as the wait duration; otherwise exponential backoff
applies: min(2 ** attempt, 30) seconds per retry
(1 s, 2 s, 4 s, 8 s, 16 s, 30 s, 30 s, …), capped at 30 s.
Raises :class:`~datacoolie.core.exceptions.SourceError` once all
retries are exhausted.
- total_path (str, optional): Dot-separated path to the total record count
in the first-page response JSON (e.g. "meta.total" or
"pagination.count"). Only applies when pagination_type="offset".
When set, the reader fetches page 0 to discover the total, calculates the
number of remaining pages, and dispatches them concurrently via
:class:`~concurrent.futures.ThreadPoolExecutor`. Without this key,
offset pagination falls back to sequential page-by-page fetching (stopping
when a page returns fewer records than page_size).
- offset_max_workers (int): Maximum number of parallel workers used when
total_path is configured for concurrent offset fetching (default 4).
Has no effect when total_path is absent.
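When `total_path` is set, concurrent offset pagination boils down to planning the remaining page offsets up front. A rough sketch of that planning step (the function name is made up; page 0 is assumed already fetched to discover the total):

```python
import math


def plan_offset_pages(total, page_size):
    """Illustrative: offsets for the remaining pages once total_path has
    revealed `total` from the first page. These offsets can then be
    dispatched concurrently, e.g. via a ThreadPoolExecutor."""
    n_pages = math.ceil(total / page_size)
    return [page * page_size for page in range(1, n_pages)]
```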
Incremental / watermark push-down (source.configure):
Instead of filtering rows in memory after fetching, these keys inject the
stored watermark value directly into the outgoing request so the API server
returns only new records — analogous to a SQL WHERE col > last_value.
- ``watermark_param_mapping`` (dict): Maps each ``watermark_columns`` entry
to its API parameter name, e.g.
``{"updated_at": "updated_since", "id": "after_id"}``.
Only columns that are present in the stored watermark are injected.
Pushed-down columns may be absent from the API response; they skip
in-memory filtering and their new watermark is set to ``"now"``
(see ``watermark_to_param_timezone``) rather than the column's max
value in the dataframe.
- ``watermark_to_param`` (str, optional): API parameter name to receive the
current UTC timestamp as an upper bound ("to" side of the window).
If omitted, no upper-bound parameter is sent.
- ``watermark_param_location`` (str): Where to inject — ``"params"`` (default,
URL query string) or ``"body"`` (JSON/form POST body).
- ``watermark_param_format`` (str): How to serialise the watermark value:
``"iso"`` (default, e.g. ``"2024-01-15T12:00:00"``),
``"date"`` (``"2024-01-15"``),
``"timestamp"`` (Unix seconds as float string),
``"timestamp_ms"`` (Unix milliseconds as int string),
``"datetime"`` (``"2024-01-15 12:00:00"``),
``"datetime_ms"`` (``"2024-01-15 12:00:00.123"``).
- ``watermark_to_param_timezone`` (str, optional): Timezone for the
``"now"`` timestamp injected into ``watermark_to_param`` and used
when advancing the watermark of pushed-down columns. Accepts IANA
names (``"Asia/Ho_Chi_Minh"``, ``"America/New_York"``) or UTC
offset strings (``"+07:00"``, ``"-05:30"``). Defaults to ``"UTC"``.
Source-level value takes precedence over the connection-level default.
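As an illustration of how these keys combine (the helper is a made-up sketch covering only `fmt="iso"` and query-string injection; the real reader also handles body injection and the other formats):

```python
from datetime import datetime, timezone


def inject_watermark_params(watermark, mapping, to_param=None):
    """Illustrative watermark push-down: map stored watermark columns to
    API query params per watermark_param_mapping / watermark_to_param."""
    params = {}
    for col, param in mapping.items():
        if watermark and col in watermark:
            params[param] = watermark[col]  # value assumed ISO-serialised
    if to_param:
        # upper bound: "now" (UTC here; see watermark_to_param_timezone)
        params[to_param] = datetime.now(timezone.utc).isoformat()
    return params
```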
Range-split / parallel fetch (source.configure):
Splits a large watermark window into equal-sized intervals and calls
the API concurrently for each sub-range. Requires watermark_to_param
to be set so each range's upper-bound can be passed to the API.
- ``watermark_range_interval_unit`` (str): Interval unit for each sub-range
— ``"hour"``, ``"day"``, ``"month"``, or ``"year"``. When set, range
splitting is enabled; otherwise a single API call is made (default).
- ``watermark_range_interval_amount`` (int): Number of units per interval
(default ``1``). E.g. ``unit="hour", amount=3`` → 3-hour windows.
- ``watermark_range_start`` (str): ISO-8601 datetime used as the initial
lower bound when no stored watermark exists yet (first run). Required
when ``watermark_range_interval_unit`` is set and no watermark has been
saved.
- ``watermark_range_max_workers`` (int): Maximum parallel HTTP workers
(default ``4``).
- ``watermark_range_to_exclusive_offset`` (str, optional): Epsilon
subtracted from each range's upper-bound **before sending it to the
API**, to prevent duplicate rows when the API uses inclusive
``BETWEEN from AND to`` semantics. The internal boundary that
starts the next range is unchanged. Values: ``None`` (default
— no adjustment, correct for APIs with half-open ``[from, to)``
semantics), ``"1ms"`` (1 millisecond), ``"1s"`` (1 second),
``"1day"`` (1 day; for date-precision parameters like
``"2025-01-15"``).
python_function_reader
¶
Python function source reader.
Reads data by importing and calling a user-defined Python function that
returns a DataFrame. This is intended for complex source logic that
cannot be expressed as a simple source.query.
The function path is specified on :attr:`Source.python_function` as a
dotted module path, e.g. "mypackage.loaders.load_orders".
Two-phase watermark filtering:
- Before — the previous watermark dict is passed to the function so it can apply efficient source-side filtering (e.g. a WHERE clause).
- After — the framework applies :meth:`_apply_watermark_filter` on the returned DataFrame for precise, engine-level filtering — identical to what other readers do.
The function signature must be::
def my_loader(engine, source, watermark) -> DataFrame | None
- `engine` — the active :class:`BaseEngine` instance.
- `source` — the :class:`Source` model.
- `watermark` — previous watermark values (`None` on first run).
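A loader matching this signature could look like the sketch below. The table and column names are made up, and `engine.read_database(source, query=...)` is an assumed call shape, so a stand-in engine is used here to keep the example self-contained.

```python
# Illustrative loader for a python_function source; names and the
# engine call shape are assumptions, not the library's API.
class FakeEngine:
    def read_database(self, source, query):
        return query  # stand-in: echo the SQL instead of fetching rows


def load_orders(engine, source, watermark):
    query = "SELECT * FROM orders"
    if watermark and "updated_at" in watermark:
        # "before" phase: efficient source-side filtering
        query += f" WHERE updated_at > '{watermark['updated_at']}'"
    return engine.read_database(source, query=query)
```

After the function returns, the framework still applies its own watermark filter on the resulting DataFrame, so the pre-filter above is an optimisation, not a correctness requirement.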
PythonFunctionReader
¶
PythonFunctionReader(engine: BaseEngine[DF], allowed_prefixes: Optional[List[str]] = None)
Bases: BaseSourceReader[DF]
Source reader that delegates to a user-defined Python function.
The function path is read from source.python_function and must
be a fully-qualified dotted path (e.g. "my_module.load_data").
The function signature must be::
def my_loader(engine, source, watermark) -> DataFrame | None
Watermark handling follows a before / after pattern:
- The `watermark` dict is passed to the function so it can pre-filter at the source (optional — the function may ignore it).
- After the function returns, the framework applies :meth:`_apply_watermark_filter` for precise row-level filtering, consistent with other readers.