Loaders are Python functions that load data into source tables. They replace expression sources and manual ETL scripts with code that lives inside your project, runs as part of the build, and supports incremental write strategies. Loaders are one of the four Python node kinds, and the only one that writes into a SQL source.

How it works

  1. Write a Python function under loaders/ decorated with @loader
  2. Declare a managed source in sources/*.yml with managed: true and the same name as the loader function
  3. SQLBuild calls the function, writes returned rows to a staging table, then applies the configured write strategy to the target
Loaders participate in the build lifecycle. When sqb build runs, managed sources are loaded before any dependent model is materialized.
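The three steps above can be pictured with a toy model. This is a sketch only, not SQLBuild's implementation: it uses an in-memory SQLite database, and the run_loader helper, the __staging suffix, and the hard-coded full-replace behavior are all assumptions made for illustration.

```python
import sqlite3


def raw_customers():
    # Stand-in for an @loader function: returns rows as a list of dicts.
    return [
        {"id": 1, "name": "Leslie Knope"},
        {"id": 2, "name": "Ron Swanson"},
    ]


def run_loader(conn, loader_fn, columns):
    """Hypothetical runner: call the loader, stage the rows, replace the target."""
    target = loader_fn.__name__      # binding by name: function name == source name
    staging = f"{target}__staging"   # assumed staging-table naming
    col_defs = ", ".join(f"{name} {type_}" for name, type_ in columns.items())
    col_names = ", ".join(columns)
    placeholders = ", ".join(f":{name}" for name in columns)

    rows = list(loader_fn())         # step 1: call the function

    # Step 2: write the returned rows to a staging table.
    conn.execute(f"DROP TABLE IF EXISTS {staging}")
    conn.execute(f"CREATE TABLE {staging} ({col_defs})")
    conn.executemany(
        f"INSERT INTO {staging} ({col_names}) VALUES ({placeholders})", rows
    )

    # Step 3: apply the write strategy (here: full replace of the target).
    conn.execute(f"DROP TABLE IF EXISTS {target}")
    conn.execute(f"CREATE TABLE {target} AS SELECT * FROM {staging}")
    conn.execute(f"DROP TABLE {staging}")
    return len(rows)


conn = sqlite3.connect(":memory:")
loaded = run_loader(conn, raw_customers, {"id": "INTEGER", "name": "VARCHAR"})
```

After the call, the raw_customers table holds both rows and the staging table is gone, which is the state a dependent model would see when it is materialized.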

Defining a loader

Place Python files under loaders/ in your project directory. Each file can contain one or more loader functions:
# loaders/raw_sources.py
from sqlbuild.loaders import loader
from sqlbuild.executor.load.models import LoaderContext

@loader
def raw_customers(ctx: LoaderContext) -> list[dict[str, object]]:
    return [
        {"id": 1, "name": "Leslie Knope", "email": "leslie@pawnee.gov"},
        {"id": 2, "name": "Ron Swanson", "email": "ron@pawnee.gov"},
    ]
The function receives a LoaderContext and returns rows as a list of dicts, an iterator of dicts, or None for self-managed loaders.
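Returning an iterator is useful when the data arrives in pages and building one large list would be wasteful. The sketch below shows only the generator shape; PAGES and iter_customers are hypothetical names, and in a project the body would sit inside an @loader function that takes ctx.

```python
from typing import Iterator

# Hypothetical paged payload standing in for an API response.
PAGES = [
    [{"id": 1, "name": "Leslie Knope"}, {"id": 2, "name": "Ron Swanson"}],
    [{"id": 3, "name": "April Ludgate"}],
]


def iter_customers(pages=PAGES) -> Iterator[dict[str, object]]:
    """Yield one row at a time instead of building the full list in memory."""
    for page in pages:
        for row in page:
            yield row


ids = [row["id"] for row in iter_customers()]
```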

Binding to a source

Declare a managed source in sources/*.yml. A managed source is bound to the loader function with the same name - there is no separate loader field:
sources:
  - name: raw_customers
    managed: true
    write_strategy: table
    columns:
      - name: id
        type: INTEGER
      - name: name
        type: VARCHAR
      - name: email
        type: VARCHAR
Setting managed: true makes this a managed source - SQLBuild owns both the loading and the schema. The binding is by name: the source raw_customers is populated by the @loader function named raw_customers. SQLBuild raises an error if a managed source has no loader function of the same name. Models reference managed sources the same way as any other source:
SELECT id, name FROM __source("raw_customers")

Write strategies

The write_strategy field controls how returned rows are written to the target table.

table

Full replace. The target is dropped and recreated from the loader output on every run.
sources:
  - name: raw_countries
    managed: true
    write_strategy: table
    columns:
      - name: country_id
        type: INTEGER
      - name: country_code
        type: VARCHAR

append

Insert all returned rows into the target. No deduplication.
sources:
  - name: raw_webhook_events
    managed: true
    write_strategy: append
    columns:
      - name: event_id
        type: INTEGER
      - name: event_name
        type: VARCHAR
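The difference between table and append can be seen with plain Python lists standing in for the target table. This is an illustrative model only; write_table and write_append are hypothetical helpers, not SQLBuild functions.

```python
def write_table(target, rows):
    """Full replace: the previous contents of the target are discarded."""
    return list(rows)


def write_append(target, rows):
    """Append: keep existing rows and add the new ones, duplicates included."""
    return list(target) + list(rows)


existing = [{"event_id": 1, "event_name": "created"}]
batch = [
    {"event_id": 1, "event_name": "created"},
    {"event_id": 2, "event_name": "paid"},
]

replaced = write_table(existing, batch)
appended = write_append(existing, batch)
```

With append, event_id 1 ends up in the target twice, so a loader using this strategy should return only rows it has not returned before.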

delete_insert

Delete rows in the cursor range, then insert replacements. Requires cursor_column.
sources:
  - name: raw_order_events
    managed: true
    write_strategy: delete_insert
    cursor_column: event_at
    columns:
      - name: event_id
        type: INTEGER
      - name: event_at
        type: TIMESTAMP
      - name: amount_cents
        type: INTEGER
The loader receives ctx.current_cursor_value with the current MAX(cursor_column) from the target, so it can fetch only new or updated data. Its function name matches the source name (raw_order_events):
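from sqlbuild.loaders import loader
from sqlbuild.executor.load.models import LoaderContext

@loader
def raw_order_events(ctx: LoaderContext) -> list[dict[str, object]]:
    # fetch_all_events / fetch_events_since stand in for your own extraction code.
    if ctx.current_cursor_value is None:
        return fetch_all_events()  # no cursor yet: load the full history
    return fetch_events_since(ctx.current_cursor_value)  # incremental run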
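The delete-then-insert step can be modeled on plain lists. The sketch assumes that "the cursor range" means the span between the smallest and largest cursor values in the returned batch; that interpretation, and the delete_insert helper itself, are assumptions rather than documented SQLBuild behavior.

```python
from datetime import datetime


def delete_insert(target, batch, cursor_column):
    """Drop target rows whose cursor falls inside the batch's range, then add the batch."""
    if not batch:
        return list(target)
    low = min(row[cursor_column] for row in batch)
    high = max(row[cursor_column] for row in batch)
    kept = [row for row in target if not (low <= row[cursor_column] <= high)]
    return kept + list(batch)


target = [
    {"event_id": 1, "event_at": datetime(2024, 1, 1), "amount_cents": 500},
    {"event_id": 2, "event_at": datetime(2024, 1, 2), "amount_cents": 700},
]
batch = [
    # Re-delivered event with a corrected amount.
    {"event_id": 2, "event_at": datetime(2024, 1, 2), "amount_cents": 750},
    {"event_id": 3, "event_at": datetime(2024, 1, 3), "amount_cents": 900},
]

result = delete_insert(target, batch, "event_at")
```

Under this model, rows older than the batch are left alone, and anything inside the range is replaced wholesale by what the loader returned.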

merge

Upsert based on unique_key. Requires both unique_key and cursor_column.
sources:
  - name: raw_customers
    managed: true
    write_strategy: merge
    unique_key: customer_id
    cursor_column: updated_at
    columns:
      - name: customer_id
        type: INTEGER
      - name: plan_name
        type: VARCHAR
      - name: updated_at
        type: TIMESTAMP
Existing rows matching the unique key are updated; new rows are inserted.
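The upsert rule can be sketched with a dict keyed by the unique key. merge_rows is a hypothetical helper for illustration; it models only the update-or-insert behavior described above and does not attempt to show how SQLBuild uses cursor_column during a merge.

```python
def merge_rows(target, batch, unique_key):
    """Update rows whose key already exists; insert rows whose key is new."""
    merged = {row[unique_key]: dict(row) for row in target}
    for row in batch:
        key = row[unique_key]
        merged[key] = {**merged.get(key, {}), **row}
    return list(merged.values())


target = [
    {"customer_id": 1, "plan_name": "free"},
    {"customer_id": 2, "plan_name": "pro"},
]
batch = [
    {"customer_id": 2, "plan_name": "enterprise"},  # existing key: update
    {"customer_id": 3, "plan_name": "free"},        # new key: insert
]

result = merge_rows(target, batch, "customer_id")
```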

Self-managed loaders

If a loader returns None, SQLBuild skips its row-writing pipeline. The loader is responsible for writing data to the target itself, using whatever approach makes sense - ctx.execute_sql(), an external library, a subprocess, or anything else:
@loader
def raw_status(ctx: LoaderContext) -> None:
    ctx.execute_sql(f"DROP TABLE IF EXISTS {ctx.destination}")
    ctx.execute_sql(
        f"CREATE TABLE {ctx.destination} AS "
        "SELECT 1 AS status_id, 'loaded' AS status_name"
    )
The source is still declared as managed, just without a write_strategy:
sources:
  - name: raw_status
    managed: true
    columns:
      - name: status_id
        type: INTEGER
      - name: status_name
        type: VARCHAR
Self-managed loaders must not declare a write_strategy. They are useful when you want to use adapter-specific SQL (e.g. COPY INTO, external tables), call an external ingestion tool like dlt, or handle writes in a way that doesn’t fit the dict-return pattern.

Loader context

Every loader function receives a LoaderContext as its first argument. It provides access to the destination relation, cursor state, active target, and helper methods.

Properties

Property              Type             Description
destination           str              Fully-qualified destination relation name (where rows are written)
destination_database  str | None       Destination database
destination_schema    str | None       Destination schema
destination_name      str              Unqualified destination table name
current_cursor_value  object | None    Current MAX(cursor_column) from the destination, or None if the table does not exist or has no cursor column
run_id                str              Unique identifier for this execution run
target                str | None       Active target name (e.g. dev, prod)
vars                  dict             Project variables (merged from project, target, and local config)
is_reload             bool             True when --reload was passed
start_cursor_ts       datetime | None  Timestamp cursor start override from --start-cursor-ts
end_cursor_ts         datetime | None  Timestamp cursor end override from --end-cursor-ts
start_cursor_int      int | None       Integer cursor start override from --start-cursor-int
end_cursor_int        int | None       Integer cursor end override from --end-cursor-int
adapter               BaseAdapter      The database adapter instance
connection            object           The active database connection
logger                Logger           Python logger scoped to the loader

Methods

Method                                     Description
execute_sql(sql)                           Execute a SQL statement against the connection
query(sql)                                 Execute a SQL query and return the cursor
log(message)                               Log a message to the execution lifecycle output
qualify_name(name)                         Return a fully-qualified relation name in the destination database/schema
skip(reason, mode=...)                     Skip this loader. mode is "soft" (default, skip only this loader) or "hard" (also block dependents)
result(payload=, metadata=, materialized=) Return a structured result for a self-managed loader
result_of(node_fn)                         Read the latest persisted result of an upstream node (current or previous run)
results_of(node_fn, limit=N)               Read the last N successful results of an upstream node, newest first
loader(loader_fn)                          Return a LoaderRelationRef for an upstream loader dependency
source(source_name)                        Return a LoaderRelationRef for a project source by YAML name

LoaderRelationRef

Returned by ctx.loader() and ctx.source(). Provides access to an upstream relation:
Property / Method     Description
destination           Fully-qualified relation name
current_cursor_value  Current MAX(cursor_column) from the relation
max(column)           Return the MAX of any column from the relation

Loader dependencies

Loaders can depend on other loaders using depends_on. Dependencies are executed first, and their destination relations are available via ctx.loader():
from sqlbuild.loaders import loader
from sqlbuild.executor.load.models import LoaderContext

@loader
def raw_accounts(ctx: LoaderContext) -> list[dict[str, object]]:
    return [
        {"account_id": 1, "account_name": "Pawnee Parks"},
        {"account_id": 2, "account_name": "Eagleton"},
    ]

@loader(depends_on=[raw_accounts])
def raw_account_metrics(ctx: LoaderContext) -> list[dict[str, object]]:
    accounts = ctx.loader(raw_accounts)
    # ctx.query returns a database cursor, not the rows themselves.
    db_cursor = ctx.query(f"SELECT account_id FROM {accounts.destination}")
    return [
        {"account_id": row[0], "metric": "active"}
        for row in db_cursor.fetchall()
    ]
Dependencies form a DAG. SQLBuild schedules loaders in topological order and executes independent loaders concurrently when --concurrency is set.
Intermediate loaders (those referenced only via depends_on, with no managed source of the same name) are given synthetic source entries and write to __loader__<name> tables by default. Only the terminal loader, the one whose name matches a managed source, populates that source; intermediate loaders feed it. Use the destination parameter on the decorator to override the intermediate relation:
@loader(destination="staging.shared_accounts")
def raw_accounts(ctx: LoaderContext):
    ...
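Topological scheduling of a loader DAG can be sketched with the standard library's graphlib module. The dependency map below is hypothetical, and the batching loop only illustrates which loaders become runnable together; it is not SQLBuild's scheduler.

```python
from graphlib import TopologicalSorter

# Hypothetical dependency map: loader name -> names of the loaders it depends on.
depends_on = {
    "raw_accounts": set(),
    "raw_regions": set(),
    "raw_account_metrics": {"raw_accounts"},
    "raw_account_report": {"raw_account_metrics", "raw_regions"},
}

sorter = TopologicalSorter(depends_on)
sorter.prepare()  # raises graphlib.CycleError if the graph is not a DAG

batches = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # loaders whose dependencies have all finished
    batches.append(ready)               # members of one batch are independent of each other
    sorter.done(*ready)
```

Here batches ends up as three groups: the two independent loaders first, then raw_account_metrics, then raw_account_report. Loaders within a group are the ones that could run concurrently.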

Decorator parameters

The @loader decorator accepts optional parameters that can also be set in the source YAML. When both are specified, the YAML takes precedence.
Parameter       Description
depends_on      List of loader functions this loader depends on
destination     Override the destination relation name (can include schema or database)
write_strategy  table, append, delete_insert, or merge
cursor_column   Column used for incremental cursor tracking
unique_key      Column(s) used as the merge key (string or list of strings)
columns         Column specifications with name, type, nullable, and description
contract        enforced or none
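The precedence rule amounts to a dictionary merge in which YAML values are applied last. The helper below is a hypothetical illustration of that rule, not the way SQLBuild resolves configuration internally.

```python
def resolve_loader_config(decorator_params, yaml_fields):
    """Merge settings, letting YAML values override decorator values."""
    resolved = dict(decorator_params)
    resolved.update(yaml_fields)
    return resolved


decorator_params = {"write_strategy": "append", "cursor_column": "event_at"}
yaml_fields = {"write_strategy": "delete_insert"}

resolved = resolve_loader_config(decorator_params, yaml_fields)
```

In this example the YAML write_strategy wins, while cursor_column, which only the decorator sets, is kept.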

Auto-load during builds

By default, sqb build automatically loads managed sources before building dependent models. This is controlled by the auto_load_sources setting:
[settings]
auto_load_sources = true   # default
You can also control this per-run with CLI flags:
# Explicitly load sources before building
sqb build --load

# Skip source loading
sqb build --no-load

# Reload sources (passes is_reload=True to loaders)
sqb build --reload
When --reload is passed, ctx.is_reload is True in the loader function. This lets loaders implement different behavior for full reloads versus normal incremental loads.
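A loader might branch on the reload flag roughly as follows. FakeContext is a stand-in that exposes only the two LoaderContext properties used here, and EVENTS is hypothetical upstream data; a real loader would receive the context from SQLBuild.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeContext:
    """Stand-in exposing only the LoaderContext properties this sketch reads."""
    is_reload: bool
    current_cursor_value: Optional[int]


EVENTS = [{"event_id": i} for i in range(1, 6)]  # hypothetical upstream data


def load_events(ctx):
    # Full reload, or nothing loaded yet: return everything.
    if ctx.is_reload or ctx.current_cursor_value is None:
        return list(EVENTS)
    # Normal incremental run: return only rows past the stored cursor.
    return [e for e in EVENTS if e["event_id"] > ctx.current_cursor_value]


incremental = load_events(FakeContext(is_reload=False, current_cursor_value=3))
reloaded = load_events(FakeContext(is_reload=True, current_cursor_value=3))
```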

Source deferral

With multiple targets, loaders write data into the active target, but models may need to read source data from a different one (e.g. reading production data while developing in dev). The defer_sources_to field controls this:
[targets.dev]
schema = "dev"
defer_sources_to = "prod"

[targets.prod]
schema = "prod"
With this config, models in the dev target read managed source data from the prod schema, even though sqb load writes to dev. This prevents accidentally reading empty or partial source tables during development. If a target uses managed sources but does not declare defer_sources_to, SQLBuild raises an error rather than guessing.

Schema evolution

When a loader returns rows with columns not present in the existing target table, SQLBuild detects the schema change and adds the new columns automatically. Type mismatches between the staging table and the existing target raise an error.
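The two outcomes, adding new columns or failing on a type mismatch, can be sketched as a comparison of column-to-type mappings. plan_schema_change is a hypothetical helper, and the choice of TypeError is an assumption; the error SQLBuild actually raises is not specified here.

```python
def plan_schema_change(target_columns, staging_columns):
    """Return the columns to add to the target, or raise if a shared column changed type."""
    mismatched = {
        name: (target_columns[name], new_type)
        for name, new_type in staging_columns.items()
        if name in target_columns and target_columns[name] != new_type
    }
    if mismatched:
        raise TypeError(f"type mismatch between staging and target: {mismatched}")
    return [
        (name, new_type)
        for name, new_type in staging_columns.items()
        if name not in target_columns
    ]


target_columns = {"customer_id": "INTEGER", "plan_name": "VARCHAR"}
staging_columns = {"customer_id": "INTEGER", "plan_name": "VARCHAR", "region": "VARCHAR"}

to_add = plan_schema_change(target_columns, staging_columns)
```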

Project structure

my-project/
  loaders/
    raw_sources.py          # loader functions
    api_sources.py           # more loader functions
  sources/
    raw.yml                  # managed source declarations (managed: true)
  models/
    staging/
      stg_customers.sql      # __source("raw_customers")
SQLBuild discovers all .py files under loaders/ recursively (excluding __init__.py and files starting with _). Each file is scanned for functions decorated with @loader.
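The file-discovery rule can be approximated with pathlib. The function below is a sketch of the rule as stated, with discover_loader_files as a hypothetical name; because __init__.py also starts with an underscore, a single prefix check covers both exclusions.

```python
from pathlib import Path


def discover_loader_files(loaders_dir):
    """Return .py files under loaders_dir, recursively, skipping names that start with '_'."""
    return sorted(
        path
        for path in Path(loaders_dir).rglob("*.py")
        if not path.name.startswith("_")
    )
```

Given a tree containing raw_sources.py, _helpers.py, __init__.py, and api/orders.py, only raw_sources.py and api/orders.py would be returned.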

Config reference

Source YAML fields for managed sources

Field           Description
managed         Set to true to bind the source to the @loader function of the same name
write_strategy  table, append, delete_insert, or merge (requires managed: true)
cursor_column   Column for incremental cursor tracking (required for delete_insert and merge)
unique_key      Merge key column(s) (required for merge)
columns         Column declarations with types
contract        enforced or none

Validation rules

  • append cannot have unique_key
  • merge requires unique_key
  • table cannot have cursor_column or unique_key
  • delete_insert requires cursor_column and cannot have unique_key
  • cursor_column requires one of append, delete_insert, or merge
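The rules above, together with the merge requirement for cursor_column stated in the write strategies section, can be expressed as one check. validate_managed_source is a hypothetical helper written for illustration; SQLBuild's own validation and error messages may differ.

```python
def validate_managed_source(write_strategy=None, cursor_column=None, unique_key=None):
    """Return a list of rule violations; an empty list means the combination is valid."""
    errors = []
    if write_strategy == "table":
        if cursor_column or unique_key:
            errors.append("table cannot have cursor_column or unique_key")
    elif write_strategy == "append":
        if unique_key:
            errors.append("append cannot have unique_key")
    elif write_strategy == "delete_insert":
        if not cursor_column:
            errors.append("delete_insert requires cursor_column")
        if unique_key:
            errors.append("delete_insert cannot have unique_key")
    elif write_strategy == "merge":
        if not unique_key:
            errors.append("merge requires unique_key")
        if not cursor_column:
            errors.append("merge requires cursor_column")
    elif write_strategy is None:
        # Self-managed source: no strategy, so a cursor column has nothing to attach to.
        if cursor_column:
            errors.append("cursor_column requires one of append, delete_insert, or merge")
    else:
        errors.append(f"unknown write_strategy: {write_strategy}")
    return errors


problems = validate_managed_source("delete_insert", unique_key="event_id")
```

Returning a list rather than raising on the first problem lets a caller report every violation in a source declaration at once.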