API Reference
Overview of the daglite public API.
Daglite: Lightweight Python framework for building eager task pipelines.
task
task(
*,
name: str | None = None,
description: str | None = None,
backend: str | None = None,
retries: int = 0,
timeout: float | None = None,
cache: bool = False,
cache_store: CacheStore | str | None = None,
cache_ttl: int | None = None,
cache_hash: CacheHashFn | None = None,
dataset: str | None = None,
dataset_store: DatasetStore | str | None = None,
dataset_format: str | None = None,
) -> _TaskDecorator
task(
func: Any = None,
*,
name: str | None = None,
description: str | None = None,
backend: str | None = None,
retries: int = 0,
timeout: float | None = None,
cache: bool = False,
cache_store: CacheStore | str | None = None,
cache_ttl: int | None = None,
cache_hash: CacheHashFn | None = None,
dataset: str | None = None,
dataset_store: DatasetStore | str | None = None,
dataset_format: str | None = None,
) -> Any
Creates a daglite task from a sync or async function.
The decorated function executes immediately on call (no futures, no graph).
When called inside an active session or workflow it emits events, fires hooks, participates in caching, and can interact with dataset stores.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str \| None` | Custom name of the task. | `None` |
| `description` | `str \| None` | Description of the task. Defaults to the function's docstring if not provided. | `None` |
| `backend` | `str \| None` | Backend name for parallel operations. | `None` |
| `retries` | `int` | Number of automatic task retries on failure. | `0` |
| `timeout` | `float \| None` | Max execution time in seconds. | `None` |
| `cache` | `bool` | Whether the task participates in caching. | `False` |
| `cache_ttl` | `int \| None` | Cache time-to-live (TTL) in seconds. Ignored if caching is disabled. | `None` |
| `cache_hash` | `CacheHashFn \| None` | Custom cache hash function. | `None` |
Returns:
| Type | Description |
|---|---|
| `Any` | A sync or async eager task callable with the original signature. |
Source code in src/daglite/tasks.py
workflow
Decorator to convert a Python function into a daglite workflow.
Workflows are named entry points that wrap a function calling @task-decorated functions.
Tasks execute eagerly inside the workflow — they run immediately and return real values.
Calling a workflow sets up a managed session that provides backend, cache, plugin, and event
infrastructure. Sync workflows return the result directly; async workflows return a coroutine
that the caller must await.
Note that workflows cannot wrap generator functions. Use @task for generator functions or
collect results inside the workflow.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `func` | `Any` | The function to wrap when the decorator is used without parentheses. | `None` |
| `name` | `str \| None` | Custom name for the workflow. Defaults to the function's name. | `None` |
| `description` | `str \| None` | Workflow description. Defaults to the function's docstring. | `None` |
Returns:
| Type | Description |
|---|---|
| `Any` | A workflow callable with the original signature, or (when used as `@workflow(...)`) a decorator that produces one. |
Examples:
>>> from daglite import task, workflow
>>> @task
... def add(x: int, y: int) -> int:
... return x + y
>>> @task
... def mul(x: int, y: int) -> int:
... return x * y
Multi-step workflow:
>>> @workflow
... def chain_workflow(x: int, y: int):
... a = add(x=x, y=y)
... return mul(x=a, y=10)
>>> chain_workflow(2, 3)
50
Source code in src/daglite/workflows.py
async_session (async)
async_session(
*,
backend: str | None = None,
cache_store: str | CacheStore | None = None,
dataset_store: str | DatasetStore | None = None,
plugins: SerializablePlugin | list[SerializablePlugin] | None = None,
settings: DagliteSettings | None = None,
) -> AsyncIterator[SessionContext]
Async context manager that sets up an eager execution context.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `backend` | `str \| None` | Name of the default backend for the session. | `None` |
| `cache_store` | `str \| CacheStore \| None` | Cache store configuration. Can be a registered store name or a `CacheStore` instance. | `None` |
| `dataset_store` | `str \| DatasetStore \| None` | Dataset store configuration. Can be a registered store name or a `DatasetStore` instance. | `None` |
| `plugins` | `SerializablePlugin \| list[SerializablePlugin] \| None` | Extra plugin instances for this session only. | `None` |
| `settings` | `DagliteSettings \| None` | A `DagliteSettings` instance to use for this session. | `None` |
Yields:
| Type | Description |
|---|---|
| `AsyncIterator[SessionContext]` | The active `SessionContext`. |
Source code in src/daglite/session.py
gather_tasks (async)
gather_tasks(
task: Callable[..., AsyncIterator[R]], *iterables: Iterable[Any], backend: str | None = None
) -> list[list[R]]
gather_tasks(
task: Callable[..., Any], *iterables: Iterable[Any], backend: str | None = None
) -> list[Any]
Gather async tasks concurrently across iterables using asyncio.gather.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task` | `Callable[..., Any]` | An async `@task`-decorated function to gather. | *required* |
| `*iterables` | `Iterable[Any]` | Iterables whose elements are zipped and unpacked as arguments to `task`. | `()` |
| `backend` | `str \| None` | Backend override. | `None` |
Returns:
| Type | Description |
|---|---|
| `list[Any]` | Ordered list of results, one per zipped item tuple. |
Raises:
| Type | Description |
|---|---|
| `TypeError` | If `task` is not an async daglite task. |
Source code in src/daglite/composers.py
map_tasks
map_tasks(
task: Callable[..., AsyncIterator[R]], *iterables: Iterable[Any], backend: str | None = None
) -> list[list[R]]
map_tasks(
task: Callable[..., Any], *iterables: Iterable[Any], backend: str | None = None
) -> list[Any]
Map a task across iterables using the active backend.
Each item is executed as a separate task call with full event emission and hook dispatch (if
a session is active). Async tasks are transparently wrapped with asyncio.run so that they can
be dispatched to backends that execute callables in worker threads or processes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task` | `Callable[..., Any]` | Task to be mapped. Must be a daglite task. | *required* |
| `*iterables` | `Iterable[Any]` | Iterables whose elements are zipped and unpacked as arguments to `task`. | `()` |
| `backend` | `str \| None` | Backend override. | `None` |
Returns:
| Type | Description |
|---|---|
| `list[Any]` | Ordered list of results, one per zipped item tuple. |
Raises:
| Type | Description |
|---|---|
| `TypeError` | If `task` is not a daglite task. |
Source code in src/daglite/composers.py
load_dataset
load_dataset(
key: str,
return_type: type[R] | None = None,
*,
format: str | None = None,
store: DatasetStore | str | None = None,
options: dict[str, Any] | None = None,
) -> R | Any
Load a dataset from the active (or explicitly provided) dataset store.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `key` | `str` | Storage key/path. | *required* |
| `return_type` | `type[R] \| None` | Expected Python type for deserialization dispatch. | `None` |
| `format` | `str \| None` | Serialization format hint. | `None` |
| `store` | `DatasetStore \| str \| None` | Explicit store to use instead of the active one. | `None` |
| `options` | `dict[str, Any] \| None` | Additional options forwarded to the `Dataset` constructor. | `None` |
Returns:
| Type | Description |
|---|---|
| `R \| Any` | The deserialized Python object. |
Source code in src/daglite/composers.py
save_dataset
save_dataset(
key: str,
value: Any,
*,
format: str | None = None,
store: DatasetStore | str | None = None,
options: dict[str, Any] | None = None,
) -> str
Save a value to the active (or explicitly provided) dataset store.
When a DatasetReporter is available (inside a session), the save is routed through
the reporter so that process/remote backends push the write back to the coordinator.
The store is resolved from the context chain: explicit argument -> task context -> session context -> global settings.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `key` | `str` | Storage key/path. | *required* |
| `value` | `Any` | The Python object to serialize and persist. | *required* |
| `format` | `str \| None` | Serialization format hint. | `None` |
| `store` | `DatasetStore \| str \| None` | Explicit store to use instead of the active one. | `None` |
| `options` | `dict[str, Any] \| None` | Additional options forwarded to the `Dataset` constructor. | `None` |
|
Returns:
| Type | Description |
|---|---|
| `str` | The actual path where the data was stored. |