Skip to content

Workflows

workflows

@workflow decorator and Workflow class for eager task entry points.

SyncWorkflow dataclass

Bases: BaseWorkflow[P, R]

Workflow wrapping a synchronous function.

Calling an instance sets up a session, executes the function, and returns its result.

Source code in src/daglite/workflows.py
@dataclass(frozen=True)
class SyncWorkflow(BaseWorkflow[P, R]):
    """
    Workflow built around a regular (non-async) function.

    Invoking an instance opens a `session`, runs the wrapped function inside it, and hands back
    whatever that function returned.
    """

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R:  # noqa: D102
        # The session factory is imported at call time, not at module import time.
        from daglite.session import session as open_session

        with open_session():
            outcome = self.func(*args, **kwargs)
        return outcome

AsyncWorkflow dataclass

Bases: BaseWorkflow[P, R]

Workflow wrapping an async coroutine function.

Calling an instance returns a coroutine that sets up an async_session, executes the function, and returns its result.

Source code in src/daglite/workflows.py
@dataclass(frozen=True)
class AsyncWorkflow(BaseWorkflow[P, R]):
    """
    Workflow built around an async coroutine function.

    Invoking an instance does not run anything by itself: it returns a coroutine. Awaiting that
    coroutine opens an `async_session`, runs the wrapped function inside it, and yields the
    function's result.
    """

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Coroutine[Any, Any, R]:  # noqa: D102
        pending = self._run(*args, **kwargs)
        return pending

    async def _run(self, *args: Any, **kwargs: Any) -> R:
        """Run the wrapped function inside an `async_session` and return its result."""
        # The session factory is imported at call time, not at module import time.
        from daglite.session import async_session as open_async_session

        async with open_async_session():
            outcome = self.func(*args, **kwargs)
            # A non-awaitable value is returned as-is; anything awaitable is awaited while the
            # session is still open.
            if not inspect.isawaitable(outcome):
                return outcome
            return await outcome

workflow

workflow(func: Callable[P, Coroutine[Any, Any, R]]) -> AsyncWorkflow[P, R]
workflow(func: Callable[P, R]) -> SyncWorkflow[P, R]
workflow(*, name: str | None = None, description: str | None = None) -> _WorkflowDecorator
workflow(func: Any = None, *, name: str | None = None, description: str | None = None) -> Any

Decorator to convert a Python function into a daglite workflow.

Workflows are named entry points that wrap a function calling @task-decorated functions. Tasks execute eagerly inside the workflow — they run immediately and return real values.

Calling a workflow sets up a managed session that provides backend, cache, plugin, and event infrastructure. Sync workflows return the result directly; async workflows return a coroutine that the caller must await.

Note that workflows cannot wrap generator functions. Use @task for generator functions or collect results inside the workflow.

Parameters:

Name Type Description Default
func Any

The function to wrap. When used without parentheses (@workflow), this is automatically passed. When used with parentheses (@workflow()), this is None.

None
name str | None

Custom name for the workflow. Defaults to the function's __name__.

None
description str | None

Workflow description. Defaults to the function's docstring.

None

Returns:

Type Description
Any

A SyncWorkflow or AsyncWorkflow (when used as @workflow) or a decorator function (when used as @workflow()).

Examples:

>>> from daglite import task, workflow
>>> @task
... def add(x: int, y: int) -> int:
...     return x + y
>>> @task
... def mul(x: int, y: int) -> int:
...     return x * y

Single-value workflow

>>> @workflow
... def my_workflow(x: int, y: int):
...     return add(x=x, y=y)
>>> my_workflow(2, 3)
5

Multi-step workflow

>>> @workflow
... def chain_workflow(x: int, y: int):
...     a = add(x=x, y=y)
...     return mul(x=a, y=10)
>>> chain_workflow(2, 3)
50
Source code in src/daglite/workflows.py
def workflow(
    func: Any = None,
    *,
    name: str | None = None,
    description: str | None = None,
) -> Any:
    """
    Turn a Python function into a daglite workflow.

    A workflow is a **named entry point**: a function whose body calls `@task`-decorated
    functions. Inside a workflow, tasks execute eagerly — each call runs immediately and returns
    a real value.

    Calling a workflow sets up a managed session that provides backend, cache, plugin, and event
    infrastructure. A sync workflow returns its result directly; an async workflow returns a
    coroutine that the caller must `await`.

    Generator functions (sync or async) are rejected. Use `@task` for generator functions or
    collect results inside the workflow.

    Args:
        func: The function to wrap. With the bare form (`@workflow`) it is passed automatically;
            with the call form (`@workflow()`) it is `None`.
        name: Custom name for the workflow.  Defaults to the function's `__name__`.
        description: Workflow description.  Defaults to the function's docstring.

    Returns:
        A `SyncWorkflow` or `AsyncWorkflow` (when used as `@workflow`) or a decorator function
            (when used as `@workflow()`).

    Raises:
        TypeError: If the decorated object is a class, is not callable, or is a generator
            function.

    Examples:
        >>> from daglite import task, workflow
        >>> @task
        ... def add(x: int, y: int) -> int:
        ...     return x + y
        >>> @task
        ... def mul(x: int, y: int) -> int:
        ...     return x * y

        Single-value workflow
        >>> @workflow
        ... def my_workflow(x: int, y: int):
        ...     return add(x=x, y=y)
        >>> my_workflow(2, 3)
        5

        Multi-step workflow
        >>> @workflow
        ... def chain_workflow(x: int, y: int):
        ...     a = add(x=x, y=y)
        ...     return mul(x=a, y=10)
        >>> chain_workflow(2, 3)
        50
    """

    def decorator(fn: Callable[..., Any]) -> SyncWorkflow[Any, Any] | AsyncWorkflow[Any, Any]:
        # Only plain callables qualify; classes are callable too, so exclude them explicitly.
        is_plain_callable = callable(fn) and not inspect.isclass(fn)
        if not is_plain_callable:
            raise TypeError("`@workflow` can only be applied to callable functions.")

        generator_checks = (inspect.isgeneratorfunction, inspect.isasyncgenfunction)
        if any(check(fn) for check in generator_checks):
            raise TypeError(
                "`@workflow` cannot wrap generator functions. Use `@task` for generator functions "
                "or collect results inside the workflow."
            )

        # Explicit keyword values win; otherwise fall back to the function's own metadata.
        if name is None:
            resolved_name = getattr(fn, "__name__", "unnamed_workflow")
        else:
            resolved_name = name

        if description is None:
            # `__doc__` may be present but `None`; normalise that to an empty string.
            resolved_description = getattr(fn, "__doc__", "") or ""
        else:
            resolved_description = description

        # Coroutine functions get the async wrapper, everything else the sync one.
        workflow_cls = AsyncWorkflow if inspect.iscoroutinefunction(fn) else SyncWorkflow
        return workflow_cls(func=fn, name=resolved_name, description=resolved_description)

    # Call form (`@workflow(...)`): no function yet, so return the decorator itself.
    # Bare form (`@workflow`): the function was passed directly, so wrap it right away.
    return decorator if func is None else decorator(func)