Skip to content

API Reference

Overview of the daglite public API.

Daglite: Lightweight Python framework for building eager task pipelines.

task

task(func: Callable[P, AsyncIterator[R]]) -> AsyncTaskStream[P, R]
task(func: Callable[P, Iterator[R]]) -> SyncTaskStream[P, R]
task(func: Callable[P, Coroutine[Any, Any, R]]) -> AsyncTask[P, R]
task(func: Callable[P, R]) -> SyncTask[P, R]
task(
    *,
    name: str | None = None,
    description: str | None = None,
    backend: str | None = None,
    retries: int = 0,
    timeout: float | None = None,
    cache: bool = False,
    cache_store: CacheStore | str | None = None,
    cache_ttl: int | None = None,
    cache_hash: CacheHashFn | None = None,
    dataset: str | None = None,
    dataset_store: DatasetStore | str | None = None,
    dataset_format: str | None = None,
) -> _TaskDecorator
task(
    func: Any = None,
    *,
    name: str | None = None,
    description: str | None = None,
    backend: str | None = None,
    retries: int = 0,
    timeout: float | None = None,
    cache: bool = False,
    cache_store: CacheStore | str | None = None,
    cache_ttl: int | None = None,
    cache_hash: CacheHashFn | None = None,
    dataset: str | None = None,
    dataset_store: DatasetStore | str | None = None,
    dataset_format: str | None = None,
) -> Any

Creates a daglite task from a sync or async function.

The decorated function executes immediately on call (no futures, no graph).

When called inside an active session or workflow it emits events, fires hooks, participates in caching, and can interact with dataset stores.

Parameters:

Name Type Description Default
name str | None

Custom name of the task. Can include {param} placeholders for argument values and {map_index} for the current map iteration index (if applicable). Defaults to the function's __name__ if not provided. Cannot contain period (.) characters.

None
description str | None

Description of the task. Defaults to the function's docstring if not provided.

None
backend str | None

Backend name for parallel operations. If None, inherits from either the active session or the default settings backend.

None
retries int

Number of automatic task retries on failure.

0
timeout float | None

Max execution time in seconds.

None
cache bool

Indicates whether the task participates in caching. When False (default), no caching occurs. When True, results are cached using cache_store — the session's cache store if none is provided, a file based store when a string path is given, or the provided CacheStore instance.

False
cache_ttl int | None

Cache time-to-live (TTL) in seconds. This parameter is ignored if cache is False or a store with its own TTL is provided.

None
cache_hash CacheHashFn | None

Custom (func, inputs) -> str hash function. The default hash function uses the function's qualified name and the bound arguments to produce a stable hash. A custom hash function can be provided to override this behavior, for example to ignore certain arguments or to use a different hashing algorithm. This parameter is ignored if caching is disabled.

None

Returns:

Type Description
Any

A sync or async eager task callable with the original signature.

Source code in src/daglite/tasks.py
def task(  # noqa: D417
    func: Any = None,
    *,
    name: str | None = None,
    description: str | None = None,
    backend: str | None = None,
    retries: int = 0,
    timeout: float | None = None,
    cache: bool = False,
    cache_store: CacheStore | str | None = None,
    cache_ttl: int | None = None,
    cache_hash: CacheHashFn | None = None,
    dataset: str | None = None,
    dataset_store: DatasetStore | str | None = None,
    dataset_format: str | None = None,
) -> Any:
    """
    Creates a daglite task from a sync or async function.

    The decorated function executes immediately on call (no futures, no graph).

    When called inside an active session or workflow it emits events, fires hooks, participates
    in caching, and can interact with dataset stores.

    Args:
        name: Custom name of the task. Can include `{param}` placeholders for argument values and
            `{map_index}` for the current map iteration index (if applicable). Defaults to the
            function's `__name__` if not provided. Cannot contain period (`.`) characters.

        description: Description of the task. Defaults to the function's docstring if not provided.

        backend: Backend name for parallel operations. If `None`, inherits from either the active
            session or the default settings backend.

        retries: Number of automatic task retries on failure.

        timeout: Max execution time in seconds.

        cache: Whether the task participates in caching. When `True`, results are cached in the
            store selected by `cache_store` (or the session's cache store when none is given).

        cache_store: Store used when caching is enabled. Possible values include
            * `None` (default) — use the session's cache store (if any).
            * `str` — path to a file based cache store for this task.
            * `CacheStore` instance — use the provided store for this task.

        cache_ttl: Cache time-to-live (TTL) in seconds. This parameter is ignored if cache is
            `False` or a store with its own TTL is provided.

        cache_hash: Custom `(func, inputs) -> str` hash function. The default hash function uses
            the function's qualified name and the bound arguments to produce a stable hash. A
            custom hash function can be provided to override this behavior, for example to ignore
            certain arguments or to use a different hashing algorithm. This parameter is ignored if
            caching is disabled.

        dataset: Dataset key/path template used to persist task results. May contain `{param}`
            placeholders resolved from the task's bound arguments.

        dataset_store: Explicit dataset store (`DatasetStore` instance or string path) used with
            `dataset`. When `None`, the store is resolved from the context chain.

        dataset_format: Serialization format hint for the persisted dataset (e.g. `"pickle"`).

    Returns:
        A sync or async eager task callable with the original signature.
    """

    def decorator(
        fn: Callable[..., Any],
    ) -> (
        SyncTask[Any, Any]
        | AsyncTask[Any, Any]
        | SyncTaskStream[Any, Any]
        | AsyncTaskStream[Any, Any]
    ):
        if inspect.isclass(fn) or not callable(fn):
            raise TypeError("`@task` can only be applied to callable functions.")

        _name = name if name is not None else getattr(fn, "__name__", "unnamed_task")
        _description = description if description is not None else getattr(fn, "__doc__", "") or ""

        def _validate_template(label: str, template: str) -> None:
            # Fail fast at decoration time if a template references a placeholder that
            # won't be available at runtime (i.e. not a function parameter or a
            # built-in template variable).
            placeholders = parse_template(template)
            if not placeholders:
                return
            param_names = set(inspect.signature(fn).parameters) | _BUILTIN_TEMPLATE_VARS
            unknown = placeholders - param_names
            if unknown:
                raise ValueError(
                    f"{label} template '{template}' references {unknown} which won't be "
                    f"available at runtime. Available placeholders: {sorted(param_names)}."
                )

        # Validate name and dataset key templates at decoration time.
        _validate_template("Name", _name)
        if dataset:
            _validate_template("Dataset", dataset)

        # Store original function in module namespace for pickling (process backend)
        if hasattr(fn, "__module__") and hasattr(fn, "__name__"):  # pragma: no branch
            module = sys.modules.get(fn.__module__)
            if module is not None:  # pragma: no branch
                private_name = f"__{fn.__name__}_func__"
                setattr(module, private_name, fn)
                fn.__qualname__ = private_name

        # Resolve the task class from the wrapped function's kind. Generator tasks
        # stream values, so retry/timeout/caching semantics (which assume a single
        # result) are rejected up front.
        is_sync_gen = inspect.isgeneratorfunction(fn)
        is_async_gen = inspect.isasyncgenfunction(fn)
        if is_sync_gen or is_async_gen:
            if cache:
                raise ValueError("Caching is not supported for generator tasks.")
            if retries > 0:
                raise ValueError("Retries are not supported for generator tasks.")
            if timeout is not None:
                raise ValueError("Timeouts are not supported for generator tasks.")
            cls = SyncTaskStream if is_sync_gen else AsyncTaskStream
        elif inspect.iscoroutinefunction(fn):
            cls = AsyncTask
        else:
            cls = SyncTask

        return cls(
            func=fn,
            name=_name,
            description=_description,
            backend=backend,
            retries=retries,
            timeout=timeout,
            cache=cache,
            cache_store=cache_store,
            cache_ttl=cache_ttl,
            cache_hash_fn=cache_hash,
            dataset=dataset,
            dataset_store=dataset_store,
            dataset_format=dataset_format,
        )

    if func is not None:
        return decorator(func)
    return decorator

workflow

workflow(func: Callable[P, Coroutine[Any, Any, R]]) -> AsyncWorkflow[P, R]
workflow(func: Callable[P, R]) -> SyncWorkflow[P, R]
workflow(*, name: str | None = None, description: str | None = None) -> _WorkflowDecorator
workflow(func: Any = None, *, name: str | None = None, description: str | None = None) -> Any

Decorator to convert a Python function into a daglite workflow.

Workflows are named entry points that wrap a function calling @task-decorated functions. Tasks execute eagerly inside the workflow — they run immediately and return real values.

Calling a workflow sets up a managed session that provides backend, cache, plugin, and event infrastructure. Sync workflows return the result directly; async workflows return a coroutine that the caller must await.

Note that workflows cannot wrap generator functions. Use @task for generator functions or collect results inside the workflow.

Parameters:

Name Type Description Default
func Any

The function to wrap. When used without parentheses (@workflow), this is automatically passed. When used with parentheses (@workflow()), this is None.

None
name str | None

Custom name for the workflow. Defaults to the function's __name__.

None
description str | None

Workflow description. Defaults to the function's docstring.

None

Returns:

Type Description
Any

A SyncWorkflow or AsyncWorkflow (when used as @workflow) or a decorator function

Any

(when used as @workflow()).

Examples:

>>> from daglite import task, workflow
>>> @task
... def add(x: int, y: int) -> int:
...     return x + y
>>> @task
... def mul(x: int, y: int) -> int:
...     return x * y

Single-value workflow

>>> @workflow
... def my_workflow(x: int, y: int):
...     return add(x=x, y=y)
>>> my_workflow(2, 3)
5

Multi-step workflow

>>> @workflow
... def chain_workflow(x: int, y: int):
...     a = add(x=x, y=y)
...     return mul(x=a, y=10)
>>> chain_workflow(2, 3)
50
Source code in src/daglite/workflows.py
def workflow(
    func: Any = None,
    *,
    name: str | None = None,
    description: str | None = None,
) -> Any:
    """
    Turn a plain Python function into a daglite workflow.

    A workflow is a **named entry point** wrapping a function that calls `@task`-decorated
    functions. Tasks run eagerly inside it — each call executes immediately and yields a
    real value.

    Invoking a workflow establishes a managed session supplying backend, cache, plugin,
    and event infrastructure. A sync workflow returns its result directly; an async one
    returns a coroutine the caller must `await`.

    Generator functions cannot be wrapped; decorate them with `@task` instead, or collect
    their results inside the workflow body.

    Args:
        func: The function being wrapped. Filled in automatically for the bare
            `@workflow` form; `None` for the parenthesized `@workflow()` form.
        name: Custom workflow name. Falls back to the function's `__name__`.
        description: Workflow description. Falls back to the function's docstring.

    Returns:
        A `SyncWorkflow` or `AsyncWorkflow` (bare form) or a decorator function
        (parenthesized form).

    Examples:
        >>> from daglite import task, workflow
        >>> @task
        ... def add(x: int, y: int) -> int:
        ...     return x + y
        >>> @task
        ... def mul(x: int, y: int) -> int:
        ...     return x * y

        Single-value workflow
        >>> @workflow
        ... def my_workflow(x: int, y: int):
        ...     return add(x=x, y=y)
        >>> my_workflow(2, 3)
        5

        Multi-step workflow
        >>> @workflow
        ... def chain_workflow(x: int, y: int):
        ...     a = add(x=x, y=y)
        ...     return mul(x=a, y=10)
        >>> chain_workflow(2, 3)
        50
    """

    def _wrap(target: Callable[..., Any]) -> SyncWorkflow[Any, Any] | AsyncWorkflow[Any, Any]:
        # Reject anything that is not a plain callable function.
        if inspect.isclass(target) or not callable(target):
            raise TypeError("`@workflow` can only be applied to callable functions.")

        # Generators stream values and cannot serve as a workflow entry point.
        if inspect.isgeneratorfunction(target) or inspect.isasyncgenfunction(target):
            raise TypeError(
                "`@workflow` cannot wrap generator functions. Use `@task` for generator functions "
                "or collect results inside the workflow."
            )

        wf_name = getattr(target, "__name__", "unnamed_workflow") if name is None else name
        wf_description = (
            (getattr(target, "__doc__", "") or "") if description is None else description
        )

        wf_cls = AsyncWorkflow if inspect.iscoroutinefunction(target) else SyncWorkflow
        return wf_cls(func=target, name=wf_name, description=wf_description)

    # Bare `@workflow` passes the function directly; `@workflow(...)` returns the decorator.
    return _wrap if func is None else _wrap(func)

async_session async

async_session(
    *,
    backend: str | None = None,
    cache_store: str | CacheStore | None = None,
    dataset_store: str | DatasetStore | None = None,
    plugins: SerializablePlugin | list[SerializablePlugin] | None = None,
    settings: DagliteSettings | None = None,
) -> AsyncIterator[SessionContext]

Async context manager that sets up an eager execution context.

Parameters:

Name Type Description Default
backend str | None

Name of the default backend to use. If None, uses settings.backend.

None
cache_store str | CacheStore | None

Cache store configuration. Can be a CacheStore or a string path.

None
dataset_store str | DatasetStore | None

Dataset store configuration. Can be a DatasetStore or a string path.

None
plugins SerializablePlugin | list[SerializablePlugin] | None

Extra plugin instances for this session only.

None
settings DagliteSettings | None

A DagliteSettings override. Falls back to the global settings singleton.

None

Yields:

Type Description
AsyncIterator[SessionContext]

The active SessionContext for the duration of the block.

Source code in src/daglite/session.py
@asynccontextmanager
async def async_session(
    *,
    backend: str | None = None,
    cache_store: str | CacheStore | None = None,
    dataset_store: str | DatasetStore | None = None,
    plugins: SerializablePlugin | list[SerializablePlugin] | None = None,
    settings: DagliteSettings | None = None,
) -> AsyncIterator[SessionContext]:
    """
    Async context manager that establishes an eager execution context.

    Args:
        backend: Default backend name. When `None`, `settings.backend` is used.
        cache_store: Cache store configuration — a `CacheStore` instance or a string path.
        dataset_store: Dataset store configuration — a `DatasetStore` instance or a string path.
        plugins: Additional plugin instance(s) scoped to this session only.
        settings: `DagliteSettings` override; falls back to the global settings singleton.

    Yields:
        The active `SessionContext` for the duration of the block.
    """
    # Normalize the plugins argument: None -> [], single instance -> [instance].
    if plugins is None:
        plugin_list: list[SerializablePlugin] = []
    elif isinstance(plugins, list):
        plugin_list = plugins
    else:
        plugin_list = [plugins]

    ctx = _build_context(
        backend=backend,
        cache_store=cache_store,
        dataset_store=dataset_store,
        plugins=plugin_list,
        settings=settings,
    )
    # Enter the session context first, then the processors tied to it.
    with ctx, _processors_context(ctx):
        yield ctx

gather_tasks async

gather_tasks(
    task: Callable[..., AsyncIterator[R]], *iterables: Iterable[Any], backend: str | None = None
) -> list[list[R]]
gather_tasks(
    task: Callable[..., Coroutine[Any, Any, R]],
    *iterables: Iterable[Any],
    backend: str | None = None,
) -> list[R]
gather_tasks(
    task: Callable[..., R], *iterables: Iterable[Any], backend: str | None = None
) -> list[R]
gather_tasks(
    task: Callable[..., Any], *iterables: Iterable[Any], backend: str | None = None
) -> list[Any]

Gather async tasks concurrently across iterables using asyncio.gather.

Parameters:

Name Type Description Default
task Callable[..., Any]

An async @task-decorated callable.

required
*iterables Iterable[Any]

Iterables whose elements are zipped and unpacked as arguments to task.

()
backend str | None

Backend override. None inherits from the active session or global settings.

None

Returns:

Type Description
list[Any]

Ordered list of results, one per zipped item tuple.

Raises:

Type Description
TypeError

If task is not a @task-decorated callable or is not async.

Source code in src/daglite/composers.py
async def gather_tasks(
    task: Callable[..., Any], *iterables: Iterable[Any], backend: str | None = None
) -> list[Any]:
    """
    Run an async task concurrently over zipped iterables via `asyncio.gather`.

    Args:
        task: An async `@task`-decorated callable.
        *iterables: Iterables zipped together; each tuple is unpacked as arguments to `task`.
        backend: Backend override. `None` inherits from the active session or global settings.

    Returns:
        Results in input order, one per zipped item tuple.

    Raises:
        TypeError: If *task* is not a `@task`-decorated callable or is not async.
    """
    # NOTE(review): `backend` is accepted but not consulted in this body — gathering
    # appears to always run on the current event loop; confirm this is intended.
    task = _ensure_task(task, "gather_tasks")

    # Only async tasks can be awaited on the event loop.
    if not getattr(task, "is_async", False):
        raise TypeError(
            "`gather_tasks` requires an async `@task`. Use `map_tasks` for sync tasks, or make "
            "your task async."
        )

    argument_tuples = list(zip(*iterables))
    if not argument_tuples:
        return []

    coros = (_async_indexed_call(task, index, args) for index, args in enumerate(argument_tuples))
    return list(await asyncio.gather(*coros))

map_tasks

map_tasks(
    task: Callable[..., AsyncIterator[R]], *iterables: Iterable[Any], backend: str | None = None
) -> list[list[R]]
map_tasks(
    task: Callable[..., Iterator[R]], *iterables: Iterable[Any], backend: str | None = None
) -> list[list[R]]
map_tasks(task: Callable[..., R], *iterables: Iterable[Any], backend: str | None = None) -> list[R]
map_tasks(
    task: Callable[..., Any], *iterables: Iterable[Any], backend: str | None = None
) -> list[Any]

Map a task across iterables using the active backend.

Each item is executed as a separate task call with full event emission and hook dispatch (if a session is active). Async tasks are transparently wrapped with asyncio.run so that they can be dispatched to backends that execute callables in worker threads or processes.

Parameters:

Name Type Description Default
task Callable[..., Any]

Task to be mapped. Must be a @task-decorated callable, either sync or async.

required
*iterables Iterable[Any]

Iterables whose elements are zipped and unpacked as arguments to task.

()
backend str | None

Backend override. None inherits from the active session or global settings.

None

Returns:

Type Description
list[Any]

Ordered list of results, one per zipped item tuple.

Raises:

Type Description
TypeError

If task is not a @task-decorated callable.

Source code in src/daglite/composers.py
def map_tasks(
    task: Callable[..., Any], *iterables: Iterable[Any], backend: str | None = None
) -> list[Any]:
    """
    Execute a task once per zipped item tuple using the active backend.

    Every item becomes its own task call with full event emission and hook dispatch
    (when a session is active). Async tasks are transparently wrapped with
    `asyncio.run` so backends running callables in worker threads or processes can
    dispatch them.

    Args:
        task: Task to be mapped. Must be a `@task`-decorated callable, sync or async.
        *iterables: Iterables zipped together; each tuple is unpacked as arguments to `task`.
        backend: Backend override. `None` inherits from the active session or global settings.

    Returns:
        Results in input order, one per zipped item tuple.

    Raises:
        TypeError: If *task* is not a `@task`-decorated callable.
    """
    from daglite.backends.manager import BackendManager

    runner = _make_backend_callable(task, "map_tasks")

    # Nested map with no explicit backend runs inline to avoid over-saturating
    # the backend's worker pool.
    if backend is None and resolve_task_metadata():
        backend = "inline"

    call_args = list(zip(*iterables))
    if not call_args:
        return []

    active_backend = BackendManager.get_active().get(backend)
    futures = [
        active_backend.submit(runner, args, map_index=index)
        for index, args in enumerate(call_args)
    ]
    return [future.result() for future in futures]

load_dataset

load_dataset(
    key: str,
    return_type: type[R],
    *,
    format: str | None = None,
    store: DatasetStore | str | None = None,
    options: dict[str, Any] | None = None,
) -> R
load_dataset(
    key: str,
    *,
    format: str | None = None,
    store: DatasetStore | str | None = None,
    options: dict[str, Any] | None = None,
) -> Any
load_dataset(
    key: str,
    return_type: type[R] | None = None,
    *,
    format: str | None = None,
    store: DatasetStore | str | None = None,
    options: dict[str, Any] | None = None,
) -> R | Any

Load a dataset from the active (or explicitly provided) dataset store.

Parameters:

Name Type Description Default
key str

Storage key/path. May contain {param} placeholders that are resolved from the current task's bound arguments.

required
return_type type[R] | None

Expected Python type for deserialization dispatch.

None
format str | None

Serialization format hint (e.g. "pickle").

None
store DatasetStore | str | None

Explicit DatasetStore or string path. When None, the store is resolved from the context chain.

None
options dict[str, Any] | None

Additional options forwarded to the Dataset constructor.

None

Returns:

Type Description
R | Any

The deserialized Python object.

Source code in src/daglite/composers.py
def load_dataset(
    key: str,
    return_type: type[R] | None = None,
    *,
    format: str | None = None,
    store: DatasetStore | str | None = None,
    options: dict[str, Any] | None = None,
) -> R | Any:
    """
    Load a dataset from the active (or explicitly provided) dataset store.

    Args:
        key: Storage key/path. May contain ``{param}`` placeholders resolved from the
            current task's bound arguments.
        return_type: Expected Python type used for deserialization dispatch.
        format: Serialization format hint (e.g. ``"pickle"``).
        store: Explicit ``DatasetStore`` or string path; when ``None`` the store is
            resolved from the context chain.
        options: Additional options forwarded to the Dataset constructor.

    Returns:
        The deserialized Python object.
    """
    target_store = resolve_dataset_store(store)
    hook = resolve_hook()

    # Shared keyword payload for the before/after hook pair.
    hook_kwargs = dict(
        key=key,
        return_type=return_type,
        format=format,
        options=options,
        metadata=resolve_task_metadata(),
    )
    hook.before_dataset_load(**hook_kwargs)

    started = time.perf_counter()
    value = target_store.load(key, return_type=return_type, format=format, options=options)
    elapsed = time.perf_counter() - started

    hook.after_dataset_load(**hook_kwargs, result=value, duration=elapsed)
    return value

save_dataset

save_dataset(
    key: str,
    value: Any,
    *,
    format: str | None = None,
    store: DatasetStore | str | None = None,
    options: dict[str, Any] | None = None,
) -> str

Save a value to the active (or explicitly provided) dataset store.

When a DatasetReporter is available (inside a session), the save is routed through the reporter so that process/remote backends push the write back to the coordinator.

The store is resolved from the context chain: explicit argument -> task context -> session context -> global settings.

Parameters:

Name Type Description Default
key str

Storage key/path. May contain {param} placeholders resolved from the current task's bound arguments.

required
value Any

The Python object to serialize and persist.

required
format str | None

Serialization format hint (e.g. "pickle").

None
store DatasetStore | str | None

Explicit DatasetStore or string path. When None, the store is resolved from the context chain.

None
options dict[str, Any] | None

Additional options forwarded to the Dataset constructor.

None

Returns:

Type Description
str

The actual path where data was stored.

Source code in src/daglite/composers.py
def save_dataset(
    key: str,
    value: Any,
    *,
    format: str | None = None,
    store: DatasetStore | str | None = None,
    options: dict[str, Any] | None = None,
) -> str:
    """
    Save a value to the active (or explicitly provided) dataset store.

    When a ``DatasetReporter`` is available (inside a session), the save is routed through
    the reporter so that process/remote backends push the write back to the coordinator.

    The store is resolved from the context chain: explicit argument -> task context
    -> session context -> global settings.

    Args:
        key: Storage key/path.  May contain ``{param}`` placeholders resolved from the current
            task's bound arguments.
        value: The Python object to serialize and persist.
        format: Serialization format hint (e.g. ``"pickle"``).
        store: Explicit ``DatasetStore`` or string path.  When ``None``, the
            store is resolved from the context chain.
        options: Additional options forwarded to the Dataset constructor.

    Returns:
        The path where data was stored. When the save is routed through a dataset
        reporter, the original ``key`` is returned instead — the write is delegated
        to the reporter, so the resolved storage path is not known here.
    """
    resolved_store = resolve_dataset_store(store)

    metadata = resolve_task_metadata()
    reporter = resolve_dataset_reporter()

    # Route through the reporter when available (handles coordination + hooks).
    if reporter is not None:
        reporter.save(key, value, resolved_store, format=format, options=options, metadata=metadata)
        # The reporter owns the write; the actual stored path is not available, so the
        # key is returned as-is.
        return key

    # No reporter (bare call outside session) — save directly with hook dispatch.
    hook = resolve_hook()
    hook_kw: dict[str, Any] = {
        "key": key,
        "value": value,
        "format": format,
        "options": options,
        "metadata": metadata,
    }

    hook.before_dataset_save(**hook_kw)

    path = resolved_store.save(key, value, format=format, options=options)

    # NOTE(review): the after-save hook receives the same kwargs as before-save but not
    # the resolved `path` — confirm whether hooks are expected to see the stored path.
    hook.after_dataset_save(**hook_kw)

    return path