Source patterns¶
Prerequisites · Completed Build your first metadata file.
End state · A working connections entry and source block for any DataCoolie source type.
A source connection describes where DataCoolie reads data from. Two things
work together: the Connection (shared, reusable endpoint definition) and the
Source block inside each dataflow (table/query/function selector and
incremental options).
connection.configure carries reusable defaults; source.configure carries per-dataflow overrides. For read behavior, DataCoolie merges the two, with source overrides winning over connection defaults.
Source at a glance¶
"source": {
"connection_name": "my_conn",
"schema_name": "sales",
"table": "orders",
"query": "SELECT * FROM sales.orders",
"python_function": "mypkg.loaders.load_orders",
"watermark_columns": ["updated_at"],
"configure": {
"read_options": { "separator": ";" },
"endpoint": "/orders"
}
}
| Field | When you use it |
|---|---|
| connection_name | Always — must match a connection name |
| schema_name | The source lives under a folder / schema namespace |
| table | File folder, lakehouse table, or database table |
| query | Database query mode instead of table mode |
| python_function | Function-source mode instead of a physical table |
| watermark_columns | Incremental loading |
| configure | Per-dataflow overrides such as read_options, API endpoint, API params |
In practice, you normally use one selector style per source:

- table for file/lakehouse/database table reads
- query for database query reads
- python_function for function reads
watermark_columns enables incremental loading — DataCoolie remembers the
max value from the previous run and only reads new rows on the next run.
Decide source type¶
| Source family | Connection shape | Typical source fields |
|---|---|---|
| File | connection_type: file, file format | schema_name, table, watermark_columns, configure.read_options |
| Lakehouse | connection_type: lakehouse, delta or iceberg | schema_name, table, watermark_columns |
| Database | connection_type: database, format: sql | table or query, schema_name, watermark_columns |
| REST API | connection_type: api, format: api | configure.endpoint, configure.params/body, pagination, watermark push-down |
| Python function | connection_type: function, format: function | python_function, watermark_columns, custom source.configure |
File source (CSV, Parquet, JSON, JSONL, Avro, Excel)¶
Use when your data is in flat files on local disk, cloud object storage (S3, ADLS, GCS), or a Fabric/Databricks lakehouse path.
{
  "name": "raw_files",
  "connection_type": "file",
  "format": "csv",
  "configure": {
    "base_path": "data/input",
    "read_options": { "separator": ";" },
    "use_hive_partitioning": true,
    "date_folder_partitions": "{year}/{month}/{day}"
  }
}
| Option | Required | Notes |
|---|---|---|
| base_path | yes | Root folder. For S3: s3://my-bucket/raw. For ADLS: abfss://container@account.dfs.core.windows.net/raw |
| read_options | no | Reader defaults for this connection. source.configure.read_options can override them per dataflow |
| use_hive_partitioning | no | Enables partition-folder discovery like country=VN/year=2026/ |
| date_folder_partitions | no | Date-folder pattern such as {year}/{month}/{day} for folder pruning |
| backward_days / backward | no | Re-read a historical window behind the last watermark |
| use_schema_hint | no | Defaults to true; disable to ignore schema_hints for this connection |
| format | yes | One of csv, parquet, json, jsonl, avro, excel |
Dataflow source block:
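"source": {
  "connection_name": "raw_files",
  "schema_name": "sales",
  "table": "orders"
}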
Reads the folder at data/input/sales/orders.
Per-dataflow file overrides¶
If one dataset needs special reader settings, override the connection-level
defaults in source.configure:
"source": {
"connection_name": "raw_files",
"schema_name": "sales",
"table": "orders",
"configure": {
"read_options": {
"separator": "|",
"encoding": "utf8-lossy"
}
}
}
File-source incremental cases¶
File readers support more than simple row-level watermarks:
| Pattern | How it works |
|---|---|
| Row-column watermark | CSV/JSON/Excel are fully scanned, then filtered in the engine |
| File modification watermark | Use __file_modification_time in watermark_columns |
| Date-folder pruning | date_folder_partitions lets the reader prune old folders before reading |
| Historical replay window | backward_days / backward re-opens a look-back window |
File reads also inject file lineage columns such as __file_name,
__file_path, and __file_modification_time.
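Because __file_modification_time is injected as a column, you can also watermark on it directly. A minimal sketch of that pattern, reusing the raw_files connection and the orders folder from above:

"source": {
  "connection_name": "raw_files",
  "schema_name": "sales",
  "table": "orders",
  "watermark_columns": ["__file_modification_time"]
}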
Internal folder watermark
When you use date_folder_partitions, DataCoolie stores the folder-level
watermark internally as __date_folder_partition__. You normally do not
need to author that field yourself; the reader maintains it.
Excel is read-only
Excel (format: "excel") can only be a source. You cannot write Excel
as a destination.
Lakehouse source (Delta or Iceberg)¶
Use when reading from a Delta table on a lakehouse (Databricks, Fabric, local Delta folder, or S3 Delta Lake), or an Iceberg table registered in a catalog.
{
  "name": "bronze_lake",
  "connection_type": "lakehouse",
  "format": "delta",
  "configure": {
    "base_path": "data/output/bronze"
  }
}
Dataflow source block:
"source": {
"connection_name": "bronze_lake",
"schema_name": "sales",
"table": "orders",
"watermark_columns": ["updated_at"]
}
For Databricks Unity Catalog or Fabric Lakehouse, set catalog and database
instead of base_path:
{
  "name": "unity_bronze",
  "connection_type": "lakehouse",
  "format": "delta",
  "catalog": "my_catalog",
  "database": "bronze",
  "configure": {}
}
Tables are then referenced by name: schema_name maps to the schema and table to the table name. Leave schema_name empty when the Unity Catalog layout uses only three parts (catalog.database.table).
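As an illustrative sketch (reusing the orders table name from the earlier examples), a dataflow source block against unity_bronze with three-part naming simply omits schema_name:

"source": {
  "connection_name": "unity_bronze",
  "table": "orders",
  "watermark_columns": ["updated_at"]
}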
Same shape for Iceberg, just change format:
{
  "name": "iceberg_lake",
  "connection_type": "lakehouse",
  "format": "iceberg",
  "catalog": "glue_catalog",
  "database": "raw",
  "configure": {}
}
| Addressing mode | What to configure |
|---|---|
| Path-based local/cloud table | configure.base_path + schema_name + table |
| Registered metastore table | catalog, database, optional schema_name, table |
Delta and Iceberg both support predicate push-down for watermarks when the engine supports it.
Database source (SQL via SQLAlchemy)¶
Use when reading from PostgreSQL, MySQL, MSSQL, Oracle, SQLite, or any SQLAlchemy-supported database.
{
  "name": "postgres_src",
  "connection_type": "database",
  "format": "sql",
  "configure": {
    "database_type": "postgresql",
    "host": "warehouse.internal",
    "port": 5432,
    "database": "analytics",
    "username": "DC_DB_USER",
    "password": "DC_DB_PASSWORD"
  },
  "secrets_ref": {
    "env": ["username", "password"]
  }
}
Never hardcode passwords
Use secrets_ref instead of putting credentials in configure. See
Concepts · Secrets and the credential section
below.
You can connect with either of these shapes:
| Pattern | When to use |
|---|---|
| configure.url | You already have one connection string |
| database_type + host + port + database (+ credentials) | You want DataCoolie to assemble database options more explicitly |
Resolving a full URL from environment variables:
{
  "name": "postgres_src",
  "connection_type": "database",
  "format": "sql",
  "configure": {
    "url": "DC_POSTGRES_URL"
  },
  "secrets_ref": {
    "env": ["url"]
  }
}
Set DC_POSTGRES_URL=postgresql+psycopg2://realuser:realpass@host:5432/mydb
in your environment. DataCoolie replaces configure.url at runtime.
Dataflow source block:
"source": {
"connection_name": "postgres_src",
"schema_name": "public",
"table": "orders",
"watermark_columns": ["updated_at"]
}
schema_name maps to the SQL schema, table to the SQL table name.
Query mode¶
When the source is a SQL query, use source.query instead of source.table:
"source": {
"connection_name": "postgres_src",
"query": "SELECT * FROM sales.orders WHERE status = 'OPEN'",
"watermark_columns": ["updated_at"]
}
If watermark_columns are set, DataCoolie wraps the query as a subquery and applies the watermark filter outside it (conceptually, SELECT * FROM (<your query>) WHERE updated_at > <saved watermark>).
REST API source¶
Use when reading from a paginated REST API.
{
  "name": "orders_api",
  "connection_type": "api",
  "format": "api",
  "configure": {
    "base_url": "https://api.example.com/v1",
    "auth_type": "bearer",
    "auth_token": "DC_ORDERS_API_TOKEN",
    "timeout": 30,
    "default_headers": { "Accept": "application/json" }
  },
  "secrets_ref": {
    "env": ["auth_token"]
  }
}
Put request shape and pagination on the source, not on the connection:
"source": {
"connection_name": "orders_api",
"table": "orders",
"watermark_columns": ["updated_at"],
"configure": {
"endpoint": "/orders",
"method": "GET",
"params": { "status": "open" },
"pagination_type": "offset",
"page_size": 200,
"total_path": "meta.total",
"data_path": "data.items"
}
}
| Connection-level key | Purpose |
|---|---|
| base_url | Required root URL |
| auth_type | bearer, basic, api_key, oauth2_client_credentials, aws_sigv4 |
| auth_token / username / password / api_key_* | Auth credentials |
| token_url, client_id, client_secret | OAuth2 client-credentials flow |
| default_headers, timeout | Shared request defaults |

| Source-level key | Purpose |
|---|---|
| endpoint | Path appended to base_url |
| method, params, body | Request shape |
| pagination_type | offset, cursor, or next_link |
| page_size, max_pages, data_path | Response traversal and size |
| total_path, offset_max_workers | Parallel offset pagination |
| rate_limit_delay, max_retries | Backoff behavior |
Watermark push-down for APIs¶
Instead of filtering only after the response arrives, DataCoolie can inject the last saved watermark into request params or body:
"source": {
"connection_name": "orders_api",
"watermark_columns": ["updated_at"],
"configure": {
"endpoint": "/orders",
"watermark_param_mapping": { "updated_at": "updated_since" },
"watermark_to_param": "updated_before",
"watermark_param_location": "params",
"watermark_param_format": "iso"
}
}
Range-split API fetch¶
For large historical windows, the API reader can split a time range into parallel calls:
"source": {
"connection_name": "orders_api",
"watermark_columns": ["updated_at"],
"configure": {
"endpoint": "/orders",
"watermark_to_param": "updated_before",
"watermark_range_interval_unit": "day",
"watermark_range_interval_amount": 1,
"watermark_range_start": "2026-01-01T00:00:00Z",
"watermark_range_max_workers": 4
}
}
table is optional for APIs
The API reader uses connection.configure.base_url plus
source.configure.endpoint. source.table is best treated as a logical
label for your metadata or logging, not as the actual HTTP path.
Python function source¶
Use when your data comes from a custom Python function (streaming generator, SDK call, in-memory dataset, etc.).
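The connection itself is minimal; it only declares the function source type. A sketch, using the custom_src name that the source block below refers to:

{
  "name": "custom_src",
  "connection_type": "function",
  "format": "function",
  "configure": {}
}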
The actual function path goes on source.python_function, not on the connection.
Dataflow source block:
"source": {
"connection_name": "custom_src",
"python_function": "mypkg.sources.load_orders",
"watermark_columns": ["updated_at"],
"configure": {
"api_base": "https://partner.example.com"
}
}
load_orders must accept (engine, source, watermark) and return a DataFrame
compatible with the active engine. Read custom arguments from source.configure
inside your function.
If you want to restrict which Python functions may be imported from metadata,
pass allowed_function_prefixes in DataCoolieRunConfig.
Incremental reads with watermark_columns¶
For any source type, add watermark_columns to the source block to enable
incremental loading:
"source": {
"connection_name": "postgres_src",
"schema_name": "public",
"table": "orders",
"watermark_columns": ["updated_at"]
}
On the first run DataCoolie reads everything. After a successful run it saves
the max updated_at value. On subsequent runs it adds WHERE updated_at > <saved value>.
Behavior differs by source family:
| Source family | Watermark behavior |
|---|---|
| Parquet / Delta / Iceberg | Predicate push-down during read |
| Database | WHERE clause pushed to SQL |
| API | Injected into request params/body when configured |
| CSV / JSON / Excel | Read then filter in the engine |
| Python function | Watermark is passed into the function first, then the framework filters again after the function returns |
See Concepts · Watermarks for the storage format and provider interaction.
Common mistakes¶
| Symptom | Likely cause | Fix |
|---|---|---|
| ValidationError: format not allowed | connection_type and format don't match | Use lakehouse + delta, not file + delta |
| Path not found | base_path / schema_name / table resolves wrong | Check base_path exists; schema_name is optional |
| APIReader requires 'base_url' | API connection used url instead of base_url | Put the root URL in connection.configure.base_url |
| PythonFunctionReader requires source.python_function | Function path was put on the connection or omitted | Put a dotted path like mypkg.sources.load_orders on source.python_function |
| Full load every run even with watermarks | watermark_columns missing from source block, or push-down config missing for APIs | Add "watermark_columns": [...] and API watermark mapping when needed |
| Credentials exposed in logs | URL/token/password is hardcoded | Move the field to configure, store the secret name there, and resolve with secrets_ref |