Skip to content

Backends

backends

Task execution backends for DagLite.

Backend

Bases: ABC

Abstract base class for task execution backends.

A backend defines how task calls are executed — sequentially, across threads, across processes, or on a remote cluster. Custom backends can be registered with the BackendManager to extend daglite.

Subclasses must implement _submit.

Lifecycle:

  1. start(session) — called once when the backend is first requested inside a session.
  2. submit(func, args, kwargs) — called to dispatch work.
  3. stop() — called when the session exits.
Source code in src/daglite/backends/base.py
class Backend(abc.ABC):
    """
    Base contract shared by every task execution backend.

    A backend decides *how* a task call runs — sequentially, across threads, across processes,
    or on a remote cluster. Additional backends can be plugged into daglite by registering them
    with the `BackendManager`.

    Concrete subclasses **must** provide `_submit`; `_start` and `_stop` are optional hooks that
    do nothing unless overridden.

    Lifecycle:

    1. `start(session)` — invoked once, when the backend is first requested inside a session.
    2. `submit(func, args, kwargs)` — invoked to dispatch work.
    3. `stop()` — invoked when the session exits.
    """

    # Backend identifier; handed to `BackendContext.from_session` when the backend starts.
    name: ClassVar[str]

    # Set to True at the end of `start()` and reset to False by `stop()`.
    _started: bool = False

    # region Lifecycle

    @final
    def start(self, session: SessionContext | None) -> None:
        """
        Prepare the backend for use (thread pools, process pools, queues, etc.).

        Stores the session and its settings (the global settings are used when `session` is
        falsy, e.g. `None`), builds the backend context, and then runs the `_start` hook.

        Args:
            session: Active session context carrying event/plugin infrastructure and settings.

        Raises:
            RuntimeError: If the backend has already been started.
        """
        if self._started:  # pragma: no cover
            raise RuntimeError("Backend is already started.")

        self._session = session
        if session:
            self._settings = session.settings
        else:
            self._settings = get_global_settings()

        backend_context = BackendContext.from_session(self.name)
        self._backend_context = backend_context
        self._start(context=backend_context)

        # Only flagged as started once the subclass hook has returned without raising.
        self._started = True

    def _start(self, *, context: BackendContext) -> None:
        """Optional subclass hook: create per-backend resources. Does nothing by default."""

    @final
    def stop(self) -> None:
        """
        Release backend resources.

        Runs the `_stop` hook and clears the started flag; does nothing when the backend is not
        currently started.
        """
        if self._started:  # pragma: no branch
            self._stop()
            self._started = False

    def _stop(self) -> None:
        """Optional subclass hook: clean up per-backend resources. Does nothing by default."""

    # region Execution

    @final
    def submit(
        self,
        func: Callable[..., Any],
        args: tuple[Any, ...],
        kwargs: dict[str, Any] | None = None,
        *,
        map_index: int | None = None,
    ) -> Future[Any]:
        """
        Submit a single task call and return a `Future` for its result.

        This is the fundamental execution primitive. It wraps `map_index` in a `SubmitContext`,
        substitutes an empty dict for missing or empty `kwargs`, and delegates to the `_submit`
        hook, which is where concrete backends (thread pools, process pools, remote clusters)
        dispatch the unit of work.

        Args:
            func: A callable to be executed on the backend.
            args: Positional arguments for the callable.
            kwargs: Optional keyword arguments for the callable.
            map_index: Optional index of the item in a fan-out map operation.

        Returns:
            A `concurrent.futures.Future` whose `.result()` yields the task return value.
        """
        ctx = SubmitContext(map_index=map_index)
        call_kwargs = kwargs if kwargs else {}
        return self._submit(func, args, call_kwargs, context=ctx)

    @abc.abstractmethod
    def _submit(
        self,
        func: Callable[..., Any],
        args: tuple[Any, ...],
        kwargs: dict[str, Any],
        *,
        context: SubmitContext,
    ) -> Future[Any]:
        """
        Required subclass hook: dispatch one task call.

        Args:
            func: A callable to be executed on the backend.
            args: Positional arguments for the callable.
            kwargs: Keyword arguments for the callable (always a dict, never `None`).
            context: Per-submission dynamic context (e.g. map index).

        Returns:
            A `concurrent.futures.Future` whose `.result()` yields the task return value.
        """
        ...

start

start(session: SessionContext | None) -> None

Initialise backend resources (thread pools, process pools, queues, etc.).

Parameters:

Name Type Description Default
session SessionContext | None

Active session context carrying event/plugin infrastructure and settings.

required
Source code in src/daglite/backends/base.py
@final
def start(self, session: SessionContext | None) -> None:
    """
    Prepare the backend for use (thread pools, process pools, queues, etc.).

    Stores the session and its settings (the global settings are used when `session` is
    falsy, e.g. `None`), builds the backend context, and then runs the `_start` hook.

    Args:
        session: Active session context carrying event/plugin infrastructure and settings.

    Raises:
        RuntimeError: If the backend has already been started.
    """
    if self._started:  # pragma: no cover
        raise RuntimeError("Backend is already started.")

    self._session = session
    if session:
        self._settings = session.settings
    else:
        self._settings = get_global_settings()

    backend_context = BackendContext.from_session(self.name)
    self._backend_context = backend_context
    self._start(context=backend_context)

    # Only flagged as started once the subclass hook has returned without raising.
    self._started = True

stop

stop() -> None

Releases backend resources.

Source code in src/daglite/backends/base.py
@final
def stop(self) -> None:
    """
    Release backend resources.

    Runs the `_stop` hook and clears the started flag; does nothing when the backend is not
    currently started.
    """
    if self._started:  # pragma: no branch
        self._stop()
        self._started = False

submit

submit(
    func: Callable[..., Any],
    args: tuple[Any, ...],
    kwargs: dict[str, Any] | None = None,
    *,
    map_index: int | None = None,
) -> Future[Any]

Submit a single task call and return a Future for its result.

This is the fundamental execution primitive. Backends that wrap thread pools, process pools, or remote clusters implement the _submit hook, which this method calls to dispatch one unit of work.

Parameters:

Name Type Description Default
func Callable[..., Any]

A callable to be executed on the backend.

required
args tuple[Any, ...]

Positional arguments for the callable.

required
kwargs dict[str, Any] | None

Optional keyword arguments for the callable.

None
map_index int | None

Optional index of the item in a fan-out map operation.

None

Returns:

Type Description
Future[Any]

A concurrent.futures.Future whose .result() yields the task return value.

Source code in src/daglite/backends/base.py
@final
def submit(
    self,
    func: Callable[..., Any],
    args: tuple[Any, ...],
    kwargs: dict[str, Any] | None = None,
    *,
    map_index: int | None = None,
) -> Future[Any]:
    """
    Submit a single task call and return a `Future` for its result.

    This is the fundamental execution primitive. It wraps `map_index` in a `SubmitContext`,
    substitutes an empty dict for missing or empty `kwargs`, and delegates to the `_submit`
    hook, which is where concrete backends (thread pools, process pools, remote clusters)
    dispatch the unit of work.

    Args:
        func: A callable to be executed on the backend.
        args: Positional arguments for the callable.
        kwargs: Optional keyword arguments for the callable.
        map_index: Optional index of the item in a fan-out map operation.

    Returns:
        A `concurrent.futures.Future` whose `.result()` yields the task return value.
    """
    ctx = SubmitContext(map_index=map_index)
    call_kwargs = kwargs if kwargs else {}
    return self._submit(func, args, call_kwargs, context=ctx)