Tasks are Python functions that run as part of the DAG. Use them for computation, side effects, and orchestration steps that neither load data into a source nor produce a tracked artifact. See Python Nodes for the shared model and the SQL boundary rules.

Defining a task

Place Python files under tasks/ and decorate functions with @task:
# tasks/orders.py
from sqlbuild.tasks import task, TaskContext


@task
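def export_orders(ctx: TaskContext):
    # fetch_orders() stands in for your own data-access helper; it is not defined here
    rows = fetch_orders()
    # payload is read by downstream nodes; metadata is structured JSON for catalogs
    return ctx.result(payload={"rows": len(rows)}, metadata={"rows": len(rows)})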
The task receives a TaskContext and returns through ctx.result(...). A plain return value or None is also accepted and normalized to a successful result.
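The normalization of return values can be pictured with a small standalone sketch. It does not use sqlbuild; `SketchResult` and `normalize_return` are hypothetical names, and mapping a plain value onto `payload` is an assumption rather than documented behavior.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class SketchResult:
    """Hypothetical stand-in for whatever ctx.result(...) produces."""
    payload: Any = None
    metadata: dict = field(default_factory=dict)
    status: str = "success"


def normalize_return(value: Any) -> SketchResult:
    """Wrap a task's return value in a successful result (assumed behavior)."""
    if isinstance(value, SketchResult):
        return value  # already an explicit result; pass it through unchanged
    # Assumption: a plain value becomes the payload; None yields an empty payload.
    return SketchResult(payload=value)
```

Under these assumptions, returning `{"rows": 3}` and returning `ctx.result(payload={"rows": 3})` would look the same to a downstream reader.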

Dependencies

Tasks declare dependencies with depends_on, accepting a single function, a tuple, or a list:
@task
def fetch_orders(ctx: TaskContext):
    return ctx.result(payload=download_orders())


@task(depends_on=fetch_orders)
def summarize_orders(ctx: TaskContext):
    result = ctx.result_of(fetch_orders)
    return ctx.result(metadata={"count": len(result.payload)})
ctx.result_of(node_fn) reads the latest persisted result of an upstream node and returns a NodeResultEnvelope with payload, metadata, status, and ts fields. Results persist across runs. Use ctx.results_of(node_fn, limit=N) to read result history. Reading a missing or unsuccessful upstream raises unless you pass default=.

Tasks may depend on other tasks, assets, and loaders. They may not declare SQL models or sources as graph dependencies, but they can read them at runtime with typed references; see SQL references.
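The read semantics described above (latest result, raise on missing or unsuccessful, fall back to default=) can be sketched without sqlbuild. Everything here is illustrative: `Envelope` is a hypothetical stand-in for NodeResultEnvelope, the in-memory `history` dict replaces the real persistence layer, and the `"success"` status string and `LookupError` are assumptions.

```python
from dataclasses import dataclass
from typing import Any

_MISSING = object()  # sentinel so that default=None is a usable default


@dataclass
class Envelope:
    """Hypothetical stand-in for NodeResultEnvelope (fields named in the text)."""
    payload: Any
    metadata: dict
    status: str
    ts: float


def result_of(history: dict, node_name: str, default: Any = _MISSING) -> Any:
    """Return the latest successful envelope for node_name, else default or raise."""
    runs = history.get(node_name, [])
    latest = runs[-1] if runs else None  # assumption: history is stored oldest first
    if latest is not None and latest.status == "success":
        return latest
    if default is not _MISSING:
        return default
    raise LookupError(f"no successful result for {node_name!r}")
```

Using a sentinel rather than `None` as the "no default given" marker lets a caller pass `default=None` deliberately and still avoid the exception.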

Returning results

@task
def build_export(ctx: TaskContext):
    return ctx.result(
        payload={"path": "/exports/orders.csv"},
        metadata={"rows": 1200},
    )
  • payload is the value downstream nodes read with ctx.result_of(...).
  • metadata is structured JSON for catalogs and downstream reads.
  • Tasks cannot set materialized; that option is reserved for assets.

Skipping

Return ctx.skip(...) to skip a task:
@task
def export_if_present(ctx: TaskContext):
    if not new_files_available():
        return ctx.skip("no new files")
    return ctx.result(payload=do_export())
The optional mode argument controls how the skip affects dependents:
  • "soft" (default) skips only this task; dependents may still run if another upstream succeeded.
  • "hard" skips this task and blocks its dependents.
mode accepts either a plain string or the SkipMode enum:
from sqlbuild.tasks import task, SkipMode


@task
def optional_step(ctx):
    return ctx.skip("nothing to do", mode=SkipMode.HARD)  # or mode="hard"
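How the two skip modes propagate to a dependent can be sketched as a small decision function. This is not sqlbuild's scheduler: `SketchSkipMode` and `dependent_may_run` are hypothetical, failed upstreams are left out, and the behavior when every upstream is soft-skipped is an assumption read from "if another upstream succeeded".

```python
from enum import Enum


class SketchSkipMode(str, Enum):
    """Hypothetical stand-in for SkipMode; the values mirror the documented strings."""
    SOFT = "soft"
    HARD = "hard"


def dependent_may_run(upstream: list) -> bool:
    """Decide whether a dependent runs, given each upstream's outcome.

    Each entry is "success" or a skip mode (plain string or enum member).
    """
    succeeded = False
    for outcome in upstream:
        if outcome == "success":
            succeeded = True
            continue
        # Enum lookup by value accepts both "hard" and SketchSkipMode.HARD.
        if SketchSkipMode(outcome) is SketchSkipMode.HARD:
            return False  # a hard skip blocks dependents outright
    # Assumption: with only soft skips upstream, the dependent does not run.
    return succeeded
```

Deriving the enum from `str` is one way a single parameter could accept either spelling, which matches the "plain string or enum" behavior described above.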

Retries

@task accepts a retry policy for transient failures:
from sqlbuild.retries import RetryPolicy
from sqlbuild.tasks import task


@task(retry=RetryPolicy(max_attempts=3, retry_on=(ConnectionError,)))
def call_api(ctx):
    return ctx.result(payload=fetch_from_flaky_api())
| Field | Default | Description |
| --- | --- | --- |
| max_attempts | 3 | Total attempts including the first |
| retry_on | Exception | Exception class, tuple, or list to retry on |
| initial_delay_seconds | 1.0 | Delay before the first retry |
| backoff_multiplier | 2.0 | Exponential backoff multiplier |
| max_delay_seconds | 30.0 | Cap on any single delay |
| max_elapsed_seconds | None | Overall time bound for retries |
| jitter | True | Randomize delays to avoid synchronized retries |
A task is not retried unless you pass retry=; the defaults above apply once a RetryPolicy is supplied. Where possible, set retry_on explicitly rather than relying on the broad Exception default. If all attempts fail, the original exception is preserved.
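To see how the delay fields interact, the following sketch computes a delay schedule with standard capped exponential backoff. It is an illustration of the arithmetic only: `retry_delays` is a hypothetical helper, jitter and max_elapsed_seconds are ignored, and the exact formula sqlbuild applies is an assumption.

```python
def retry_delays(
    max_attempts: int = 3,
    initial_delay_seconds: float = 1.0,
    backoff_multiplier: float = 2.0,
    max_delay_seconds: float = 30.0,
) -> list:
    """Return the wait before each retry, without jitter."""
    delays = []
    delay = initial_delay_seconds
    # max_attempts counts the first attempt, so there are max_attempts - 1 retries.
    for _ in range(max_attempts - 1):
        delays.append(min(delay, max_delay_seconds))  # cap any single delay
        delay *= backoff_multiplier
    return delays
```

With the default field values this gives two retries, after 1.0 and 2.0 seconds. Raising max_attempts to 7 gives 1, 2, 4, 8, 16, and then 30 seconds, because the sixth delay of 32 is capped by max_delay_seconds.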

Decorator parameters

| Parameter | Description |
| --- | --- |
| name | Override the node name (defaults to the function name) |
| depends_on | Upstream nodes (function, tuple, or list); model()/source() for read-only SQL refs |
| tags | Labels for selection and grouping |
| group | Display/catalog grouping |
| description | Docs (defaults to docstring) |
| meta | Freeform JSON metadata |
| retry | A RetryPolicy |
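The defaults for name and description, and the three accepted shapes of depends_on, can be sketched as follows. `resolve_node_options` is a hypothetical helper written for illustration; it does not reflect how the @task decorator is actually implemented and ignores model()/source() references.

```python
import inspect


def resolve_node_options(fn, name=None, description=None, depends_on=None):
    """Hypothetical helper: fill in defaults the way the table describes."""
    if depends_on is None:
        upstream = ()
    elif isinstance(depends_on, (tuple, list)):
        upstream = tuple(depends_on)  # tuple or list of upstream functions
    else:
        upstream = (depends_on,)  # a single upstream function
    return {
        "name": name or fn.__name__,  # defaults to the function name
        "description": description or inspect.getdoc(fn),  # defaults to docstring
        "depends_on": upstream,
    }
```

Normalizing depends_on to a tuple up front means later code only has to handle one shape.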

Running tasks

# Run a task and its required graph
sqb build --select export_orders --no-tests --no-audits

# Same selection, with tests and audits included
sqb build --select export_orders
Tasks run during sqb build. They are not validated by checks unless you also write a check that depends on them.