Skip to content

Datasets

datasets

Dataset storage and serialization infrastructure.

AbstractDataset

Bases: ABC

Abstract base class for type-based serialization.

Subclasses auto-register via `__init_subclass__`. Supports lazy plugin discovery via entry points when a type/format combination is not found.

Examples:

Create a custom dataset using class parameters:

>>> class JsonDataset(AbstractDataset, format="json", types=dict, extensions="json"):
...     def serialize(self, obj: dict) -> bytes:
...         import json
...
...         return json.dumps(obj).encode("utf-8")
...
...     def deserialize(self, data: bytes) -> dict:
...         import json
...
...         return json.loads(data.decode("utf-8"))

Or using class variables:

>>> class YamlDataset(AbstractDataset):
...     format = "yaml"
...     types = (dict,)
...     extensions = ("yaml", "yml")
...
...     def serialize(self, obj: dict) -> bytes: ...
...     def deserialize(self, data: bytes) -> dict: ...

The dataset is now registered and can be retrieved:

>>> dataset_class = AbstractDataset.get(dict, "json")
>>> dataset = dataset_class()
>>> dataset.serialize({"key": "value"})
b'{"key": "value"}'
Source code in src/daglite/datasets/base.py (lines 18–436):
class AbstractDataset(ABC):
    """
    Abstract base class for type-based serialization.

    Subclasses auto-register via __init_subclass__. Supports lazy plugin discovery
    via entry points when a type/format combination is not found.

    Examples:
        Create a custom dataset using class parameters:

        >>> class JsonDataset(AbstractDataset, format="json", types=dict, extensions="json"):
        ...     def serialize(self, obj: dict) -> bytes:
        ...         import json
        ...
        ...         return json.dumps(obj).encode("utf-8")
        ...
        ...     def deserialize(self, data: bytes) -> dict:
        ...         import json
        ...
        ...         return json.loads(data.decode("utf-8"))

        Or using class variables:

        >>> class YamlDataset(AbstractDataset):
        ...     format = "yaml"
        ...     types = (dict,)
        ...     extensions = ("yaml", "yml")
        ...
        ...     def serialize(self, obj: dict) -> bytes: ...
        ...     def deserialize(self, data: bytes) -> dict: ...

        The dataset is now registered and can be retrieved:

        >>> dataset_class = AbstractDataset.get(dict, "json")
        >>> dataset = dataset_class()
        >>> dataset.serialize({"key": "value"})
        b'{"key": "value"}'
    """

    # Class-level registry: (type, format) -> Dataset class
    # Shared by ALL subclasses; later registrations for the same key overwrite
    # earlier ones.
    _registry: ClassVar[dict[tuple[type, str], type[AbstractDataset]]] = {}

    # Reverse lookup: extension -> list of (type, format) for Driver hints
    _extension_hints: ClassVar[dict[str, list[tuple[type, str]]]] = {}

    # Track which entry point modules we've already tried to load
    _discovered: ClassVar[set[str]] = set()

    # Whether to auto-discover plugins on lookup failure
    _auto_discover: ClassVar[bool] = True

    # Defined by subclasses via __init_subclass__ or as class variables
    format: ClassVar[str]
    types: ClassVar[tuple[type, ...] | Any]
    extensions: ClassVar[tuple[str, ...] | str | None]

    def __init_subclass__(
        cls,
        *,
        format: str | None = None,
        types: type | tuple[type, ...] | None = None,
        extensions: str | tuple[str, ...] | None = None,
        **kwargs: Any,
    ) -> None:
        """
        Auto-register subclasses in the dataset registry.

        Parameters can be provided as class parameters or as class variables.
        Class parameters take precedence over class variables.

        Args:
            format: Format identifier (e.g., 'pickle', 'parquet'). If None, checks for class
                variable. If neither exists, this is an abstract intermediate class and won't be
                registered.
            types: Python type(s) this dataset can serialize. Can be a single
                type or a tuple of types.
            extensions: Optional file extension(s) for format inference (without dots). Can be a
                single string or a tuple of strings.
            kwargs: Additional keyword arguments.

        Examples:
            Using class parameters:

            >>> class JsonDataset(AbstractDataset, format="json", types=dict): ...

            Using class variables:

            >>> class CsvDataset(AbstractDataset):
            ...     format = "csv"
            ...     types = (dict,)
            ...     extensions = ("csv",)
            ...     ...
        """
        super().__init_subclass__(**kwargs)

        # Resolve format: parameter > class variable > None (abstract class)
        # NOTE(review): getattr also sees an INHERITED `format`, so a subclass of
        # a registered dataset re-registers (and may overwrite) its parent's
        # registry entries — confirm this shadowing is intended.
        resolved_format = format
        if resolved_format is None:
            resolved_format = getattr(cls, "format", None)

        if resolved_format is None:
            return

        # Resolve types: parameter > class variable > empty
        types = getattr(cls, "types", ()) if types is None else types
        if not types:
            raise DatasetError(
                f"Dataset class {cls.__name__} must specify 'types' either as a class parameter "
                f"or class variable"
            )
        resolved_types = (types,) if isinstance(types, type) else types

        # Resolve extensions: parameter > class variable > empty
        # Second normalization handles an explicit `extensions = None` class variable.
        extensions = getattr(cls, "extensions", ()) if extensions is None else extensions
        extensions = () if extensions is None else extensions
        resolved_extensions = (extensions,) if isinstance(extensions, str) else extensions

        cls.format = resolved_format
        cls.supported_types = resolved_types  # type: ignore
        cls.file_extensions = resolved_extensions  # type: ignore

        # Register for each supported type
        for t in resolved_types:
            AbstractDataset._registry[(t, resolved_format)] = cls

        # Register extension hints for Driver
        for ext in resolved_extensions:
            if ext not in AbstractDataset._extension_hints:
                AbstractDataset._extension_hints[ext] = []
            for t in resolved_types:
                AbstractDataset._extension_hints[ext].append((t, resolved_format))

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Reject any constructor arguments; base datasets are parameter-free."""
        # NOTE: Subclasses can define their own __init__ with specific parameters. The base
        # implementation allows arbitrary args/kwargs for inheritance, but raises an error if any
        # are provided to prevent silent bugs from unrecognized parameters.
        if args or kwargs:
            name = self.__class__.__name__
            raise ValueError(
                f"{name} does not accept parameters in its constructor, but got args={args} "
                f"kwargs={kwargs}"
            )

    @abstractmethod
    def serialize(self, obj: Any) -> bytes:
        """
        Convert a Python object to bytes.

        Args:
            obj: Python object to serialize.

        Returns:
            Serialized bytes representation.
        """
        ...

    @abstractmethod
    def deserialize(self, data: bytes) -> Any:
        """
        Convert bytes back to a value.

        Args:
            data: Serialized bytes to deserialize.

        Returns:
            The deserialized value.
        """
        ...

    @classmethod
    def get(cls, type_: type, format: str) -> type[AbstractDataset]:
        """
        Look up and return the Dataset type for a type/format combination.

        Supports lazy plugin discovery - if the combination isn't found,
        attempts to load relevant entry points before failing.

        Args:
            type_: The Python type to serialize/deserialize.
            format: The format identifier (e.g., 'pickle', 'parquet').

        Returns:
            An instantiated Dataset that can handle the type/format.

        Raises:
            ValueError: If no dataset is registered for the type/format.

        Examples:
            >>> dataset_cls = AbstractDataset.get(str, "text")
            >>> dataset = dataset_cls()
            >>> dataset.serialize("hello")
            b'hello'
        """
        key = (type_, format)

        # Fast path: exact match in registry
        if key in cls._registry:
            return cls._registry[key]

        # Check for subclass matches (e.g., custom dict subclass → dict handler)
        for (reg_type, reg_format), dataset_cls in cls._registry.items():
            if reg_format == format:
                try:
                    if issubclass(type_, reg_type):
                        return dataset_cls
                except TypeError:  # pragma: no cover - defensive
                    continue

        # Slow path: try discovering plugins for this type
        if cls._auto_discover:  # pragma: no branch - defensive
            cls._discover_for_type(type_)

            if key in cls._registry:
                return cls._registry[key]  # pragma: no cover - post-discovery fast path

            for (
                reg_type,
                reg_format,
            ), dataset_cls in cls._registry.items():  # pragma: no cover - post-discovery
                if reg_format == format:
                    try:
                        if issubclass(type_, reg_type):
                            return dataset_cls
                    except TypeError:
                        continue

        # All lookups (and optional discovery) failed; build an actionable error.
        module = type_.__module__.split(".")[0]
        available_formats = cls.get_formats_for_type(type_)

        if available_formats:
            raise ValueError(
                f"No dataset registered for {type_.__name__} with format '{format}'.\n"
                f"Available formats for this type: {', '.join(sorted(available_formats))}"
            )
        else:
            raise ValueError(
                f"No dataset registered for {type_.__name__} with format '{format}'.\n"
                f"Try: pip install daglite-datasets[{module}]"
            )

    @classmethod
    def infer_format(cls, type_: type, extension: str | None = None) -> str:
        """
        Infer the format for a type, optionally using extension as a hint.

        Args:
            type_: The Python type.
            extension: Optional file extension (without dot) to help infer format.

        Returns:
            The inferred format identifier.

        Raises:
            ValueError: If no format can be inferred for the type.

        Examples:
            >>> AbstractDataset.infer_format(str, "txt")
            'text'
            >>> AbstractDataset.infer_format(dict, "pkl")
            'pickle'
        """
        # Try extension hint first
        if extension:
            hints = cls._extension_hints.get(extension, [])
            for hint_type, hint_format in hints:
                if hint_type == type_:
                    return hint_format
            # Check subclasses
            for hint_type, hint_format in hints:
                try:
                    if issubclass(type_, hint_type):
                        return hint_format
                except TypeError:  # pragma: no cover - defensive
                    continue

        # Fall back to first registered format for this type
        # (dict insertion order, i.e. registration order, decides the winner)
        for (reg_type, reg_format), _ in cls._registry.items():
            if reg_type == type_:
                return reg_format

        # Check subclass matches
        for (reg_type, reg_format), _ in cls._registry.items():
            try:
                if issubclass(type_, reg_type):
                    return reg_format
            except TypeError:  # pragma: no cover - defensive
                continue

        # Try auto-discovery
        # NOTE(review): unlike get(), the post-discovery pass checks exact type
        # matches only (no subclass fallback) — confirm this asymmetry is intended.
        if cls._auto_discover:  # pragma: no branch - defensive
            cls._discover_for_type(type_)

            for (reg_type, reg_format), _ in cls._registry.items():
                if reg_type == type_:
                    return reg_format  # pragma: no cover - post-discovery format match

        raise ValueError(f"No default format registered for {type_.__name__}")

    @classmethod
    def get_formats_for_type(cls, type_: type) -> set[str]:
        """
        Get all registered formats for a type.

        Args:
            type_: The Python type.

        Returns:
            Set of format identifiers registered for this type.

        Examples:
            >>> "text" in AbstractDataset.get_formats_for_type(str)
            True
        """
        formats: set[str] = set()
        for (reg_type, reg_format), _ in cls._registry.items():
            if reg_type == type_:
                formats.add(reg_format)
            else:
                try:
                    if issubclass(type_, reg_type):
                        formats.add(reg_format)
                except TypeError:  # pragma: no cover - defensive
                    continue
        return formats

    @classmethod
    def get_extension(cls, type_: type, format: str) -> str | None:
        """
        Get the preferred file extension for a type/format combination.

        Args:
            type_: The Python type.
            format: The format identifier.

        Returns:
            The preferred file extension (without dot), or None if not found.

        Examples:
            >>> AbstractDataset.get_extension(str, "text")
            'txt'
        """
        # NOTE(review): exact-key lookup only — no subclass fallback as in get();
        # confirm that is intended.
        key = (type_, format)
        if key in cls._registry:
            dataset_cls = cls._registry[key]
            if dataset_cls.file_extensions:
                return dataset_cls.file_extensions[0]
        return None

    @classmethod
    def load_plugins(cls, *names: str, auto_discover: bool | None = None) -> None:
        """
        Explicitly load dataset plugins.

        Args:
            *names: Plugin names to load (e.g., "pandas", "numpy").
                If empty, loads all installed plugins.
            auto_discover: If provided, sets whether lazy discovery is enabled.
                Set to False to disable automatic plugin loading on lookup failure.

        Examples:
            Load specific plugins:

            >>> AbstractDataset.load_plugins("pandas", "numpy")

            Load all installed plugins:

            >>> AbstractDataset.load_plugins()

            Disable auto-discovery (strict mode):

            >>> AbstractDataset.load_plugins("pandas", auto_discover=False)
        """
        if auto_discover is not None:
            cls._auto_discover = auto_discover

        try:
            eps = importlib.metadata.entry_points(group="daglite.datasets")
        except TypeError:  # pragma: no cover
            # Python < 3.10 compatibility
            all_eps = importlib.metadata.entry_points()
            eps = all_eps.get("daglite.datasets", [])  # type: ignore[assignment]

        for ep in eps:
            if not names or ep.name in names:  # pragma: no cover - requires plugins
                try:
                    ep.load()  # Imports module, triggers __init_subclass__
                    cls._discovered.add(ep.name)
                except Exception:  # pragma: no cover
                    pass  # Plugin failed to load, continue with others

    @classmethod
    def _discover_for_type(cls, type_: type) -> None:
        """
        Lazily discover and load plugins that might handle this type.

        Uses the type's module name to find relevant entry points.
        For example, pandas.DataFrame → looks for "pandas" entry point.
        """
        module_name = type_.__module__.split(".")[0]

        if module_name in cls._discovered:
            return

        # Mark as tried BEFORE attempting, so a failing plugin import is not
        # retried on every subsequent lookup.
        cls._discovered.add(module_name)

        try:
            eps = importlib.metadata.entry_points(group="daglite.datasets")
        except TypeError:  # pragma: no cover
            # Python < 3.10 compatibility
            all_eps = importlib.metadata.entry_points()
            eps = all_eps.get("daglite.datasets", [])  # type: ignore[assignment]

        for ep in eps:
            if ep.name == module_name:  # pragma: no cover - requires plugins
                try:
                    ep.load()
                except Exception:  # pragma: no cover
                    pass  # Plugin failed to load
                break

__init_subclass__

__init_subclass__(
    *,
    format: str | None = None,
    types: type | tuple[type, ...] | None = None,
    extensions: str | tuple[str, ...] | None = None,
    **kwargs: Any,
) -> None

Auto-register subclasses in the dataset registry.

Parameters can be provided as class parameters or as class variables. Class parameters take precedence over class variables.

Parameters:

Name Type Description Default
format str | None

Format identifier (e.g., 'pickle', 'parquet'). If None, checks for class variable. If neither exists, this is an abstract intermediate class and won't be registered.

None
types type | tuple[type, ...] | None

Python type(s) this dataset can serialize. Can be a single type or a tuple of types.

None
extensions str | tuple[str, ...] | None

Optional file extension(s) for format inference (without dots). Can be a single string or a tuple of strings.

None
kwargs Any

Additional keyword arguments.

{}

Examples:

Using class parameters:

>>> class JsonDataset(AbstractDataset, format="json", types=dict): ...

Using class variables:

>>> class CsvDataset(AbstractDataset):
...     format = "csv"
...     types = (dict,)
...     extensions = ("csv",)
...     ...
Source code in src/daglite/datasets/base.py
def __init_subclass__(
    cls,
    *,
    format: str | None = None,
    types: type | tuple[type, ...] | None = None,
    extensions: str | tuple[str, ...] | None = None,
    **kwargs: Any,
) -> None:
    """
    Register each concrete subclass in the shared dataset registry.

    Configuration may arrive either as class parameters or as plain class
    variables; an explicit class parameter wins over a class variable.

    Args:
        format: Format identifier (e.g., 'pickle', 'parquet'). When absent from
            both sources, the subclass is treated as an abstract intermediate
            class and skipped.
        types: Type, or tuple of types, this dataset can serialize.
        extensions: File extension(s) for format inference, without dots.
        kwargs: Forwarded to ``super().__init_subclass__``.
    """
    super().__init_subclass__(**kwargs)

    # Explicit parameter beats class variable; no format at all means abstract.
    fmt = format if format is not None else getattr(cls, "format", None)
    if fmt is None:
        return

    declared_types = types if types is not None else getattr(cls, "types", ())
    if not declared_types:
        raise DatasetError(
            f"Dataset class {cls.__name__} must specify 'types' either as a class parameter "
            f"or class variable"
        )
    type_tuple = (declared_types,) if isinstance(declared_types, type) else declared_types

    declared_exts = extensions if extensions is not None else getattr(cls, "extensions", ())
    if declared_exts is None:
        # An explicit `extensions = None` class variable normalizes to empty.
        declared_exts = ()
    ext_tuple = (declared_exts,) if isinstance(declared_exts, str) else declared_exts

    cls.format = fmt
    cls.supported_types = type_tuple  # type: ignore
    cls.file_extensions = ext_tuple  # type: ignore

    # One registry entry per supported type.
    for typ in type_tuple:
        AbstractDataset._registry[(typ, fmt)] = cls

    # Extension hints let the Driver map file suffixes back to (type, format).
    for ext in ext_tuple:
        hint_list = AbstractDataset._extension_hints.setdefault(ext, [])
        for typ in type_tuple:
            hint_list.append((typ, fmt))

deserialize abstractmethod

deserialize(data: bytes) -> Any

Convert bytes back to a value.

Parameters:

Name Type Description Default
data bytes

Serialized bytes to deserialize.

required

Returns:

Type Description
Any

The deserialized value.

Source code in src/daglite/datasets/base.py
@abstractmethod
def deserialize(self, data: bytes) -> Any:
    """
    Reconstruct a Python value from its serialized byte form.

    Args:
        data: Serialized bytes to decode.

    Returns:
        The reconstructed value.
    """
    ...

get classmethod

get(type_: type, format: str) -> type[AbstractDataset]

Look up and return the Dataset type for a type/format combination.

Supports lazy plugin discovery - if the combination isn't found, attempts to load relevant entry points before failing.

Parameters:

Name Type Description Default
type_ type

The Python type to serialize/deserialize.

required
format str

The format identifier (e.g., 'pickle', 'parquet').

required

Returns:

Type Description
type[AbstractDataset]

An instantiated Dataset that can handle the type/format.

Raises:

Type Description
ValueError

If no dataset is registered for the type/format.

Examples:

>>> dataset_cls = AbstractDataset.get(str, "text")
>>> dataset = dataset_cls()
>>> dataset.serialize("hello")
b'hello'
Source code in src/daglite/datasets/base.py
@classmethod
def get(cls, type_: type, format: str) -> type[AbstractDataset]:
    """
    Look up and return the Dataset type for a type/format combination.

    Supports lazy plugin discovery - if the combination isn't found,
    attempts to load relevant entry points before failing.

    Args:
        type_: The Python type to serialize/deserialize.
        format: The format identifier (e.g., 'pickle', 'parquet').

    Returns:
        An instantiated Dataset that can handle the type/format.

    Raises:
        ValueError: If no dataset is registered for the type/format.

    Examples:
        >>> dataset_cls = AbstractDataset.get(str, "text")
        >>> dataset = dataset_cls()
        >>> dataset.serialize("hello")
        b'hello'
    """
    key = (type_, format)

    # Fast path: exact match in registry
    if key in cls._registry:
        return cls._registry[key]

    # Check for subclass matches (e.g., custom dict subclass → dict handler)
    # Registration (dict insertion) order decides which match wins.
    for (reg_type, reg_format), dataset_cls in cls._registry.items():
        if reg_format == format:
            try:
                if issubclass(type_, reg_type):
                    return dataset_cls
            except TypeError:  # pragma: no cover - defensive
                continue

    # Slow path: try discovering plugins for this type
    # (loading a plugin mutates the registry, so both lookups are retried)
    if cls._auto_discover:  # pragma: no branch - defensive
        cls._discover_for_type(type_)

        if key in cls._registry:
            return cls._registry[key]  # pragma: no cover - post-discovery fast path

        for (
            reg_type,
            reg_format,
        ), dataset_cls in cls._registry.items():  # pragma: no cover - post-discovery
            if reg_format == format:
                try:
                    if issubclass(type_, reg_type):
                        return dataset_cls
                except TypeError:
                    continue

    # Every lookup (and optional discovery) failed; build an actionable error.
    module = type_.__module__.split(".")[0]
    available_formats = cls.get_formats_for_type(type_)

    if available_formats:
        raise ValueError(
            f"No dataset registered for {type_.__name__} with format '{format}'.\n"
            f"Available formats for this type: {', '.join(sorted(available_formats))}"
        )
    else:
        raise ValueError(
            f"No dataset registered for {type_.__name__} with format '{format}'.\n"
            f"Try: pip install daglite-datasets[{module}]"
        )

get_extension classmethod

get_extension(type_: type, format: str) -> str | None

Get the preferred file extension for a type/format combination.

Parameters:

Name Type Description Default
type_ type

The Python type.

required
format str

The format identifier.

required

Returns:

Type Description
str | None

The preferred file extension (without dot), or None if not found.

Examples:

>>> AbstractDataset.get_extension(str, "text")
'txt'
Source code in src/daglite/datasets/base.py
@classmethod
def get_extension(cls, type_: type, format: str) -> str | None:
    """
    Get the preferred file extension for a type/format combination.

    Args:
        type_: The Python type.
        format: The format identifier.

    Returns:
        The preferred file extension (without dot), or None if not found.

    Examples:
        >>> AbstractDataset.get_extension(str, "text")
        'txt'
    """
    key = (type_, format)
    if key in cls._registry:
        dataset_cls = cls._registry[key]
        if dataset_cls.file_extensions:
            return dataset_cls.file_extensions[0]
    return None

get_formats_for_type classmethod

get_formats_for_type(type_: type) -> set[str]

Get all registered formats for a type.

Parameters:

Name Type Description Default
type_ type

The Python type.

required

Returns:

Type Description
set[str]

Set of format identifiers registered for this type.

Examples:

>>> "text" in AbstractDataset.get_formats_for_type(str)
True
Source code in src/daglite/datasets/base.py
@classmethod
def get_formats_for_type(cls, type_: type) -> set[str]:
    """
    Collect every registered format identifier applicable to a type.

    Both exact registrations and registrations for a base class of ``type_``
    are included.

    Args:
        type_: The Python type.

    Returns:
        Set of format identifiers registered for this type.

    Examples:
        >>> "text" in AbstractDataset.get_formats_for_type(str)
        True
    """
    matching: set[str] = set()
    for registered_type, registered_format in cls._registry:
        if registered_type == type_:
            matching.add(registered_format)
            continue
        try:
            if issubclass(type_, registered_type):
                matching.add(registered_format)
        except TypeError:  # non-class registry keys cannot be subclass-checked
            pass
    return matching

infer_format classmethod

infer_format(type_: type, extension: str | None = None) -> str

Infer the format for a type, optionally using extension as a hint.

Parameters:

Name Type Description Default
type_ type

The Python type.

required
extension str | None

Optional file extension (without dot) to help infer format.

None

Returns:

Type Description
str

The inferred format identifier.

Raises:

Type Description
ValueError

If no format can be inferred for the type.

Examples:

>>> AbstractDataset.infer_format(str, "txt")
'text'
>>> AbstractDataset.infer_format(dict, "pkl")
'pickle'
Source code in src/daglite/datasets/base.py
@classmethod
def infer_format(cls, type_: type, extension: str | None = None) -> str:
    """
    Infer the format for a type, optionally using extension as a hint.

    Args:
        type_: The Python type.
        extension: Optional file extension (without dot) to help infer format.

    Returns:
        The inferred format identifier.

    Raises:
        ValueError: If no format can be inferred for the type.

    Examples:
        >>> AbstractDataset.infer_format(str, "txt")
        'text'
        >>> AbstractDataset.infer_format(dict, "pkl")
        'pickle'
    """
    # Try extension hint first
    # (exact type matches take priority over subclass matches)
    if extension:
        hints = cls._extension_hints.get(extension, [])
        for hint_type, hint_format in hints:
            if hint_type == type_:
                return hint_format
        # Check subclasses
        for hint_type, hint_format in hints:
            try:
                if issubclass(type_, hint_type):
                    return hint_format
            except TypeError:  # pragma: no cover - defensive
                continue

    # Fall back to first registered format for this type
    # (dict insertion order, i.e. registration order, decides the winner)
    for (reg_type, reg_format), _ in cls._registry.items():
        if reg_type == type_:
            return reg_format

    # Check subclass matches
    for (reg_type, reg_format), _ in cls._registry.items():
        try:
            if issubclass(type_, reg_type):
                return reg_format
        except TypeError:  # pragma: no cover - defensive
            continue

    # Try auto-discovery
    # NOTE(review): the post-discovery pass checks exact type matches only,
    # with no subclass fallback (unlike get()) — confirm this is intended.
    if cls._auto_discover:  # pragma: no branch - defensive
        cls._discover_for_type(type_)

        for (reg_type, reg_format), _ in cls._registry.items():
            if reg_type == type_:
                return reg_format  # pragma: no cover - post-discovery format match

    raise ValueError(f"No default format registered for {type_.__name__}")

load_plugins classmethod

load_plugins(*names: str, auto_discover: bool | None = None) -> None

Explicitly load dataset plugins.

Parameters:

Name Type Description Default
*names str

Plugin names to load (e.g., "pandas", "numpy"). If empty, loads all installed plugins.

()
auto_discover bool | None

If provided, sets whether lazy discovery is enabled. Set to False to disable automatic plugin loading on lookup failure.

None

Examples:

Load specific plugins:

>>> AbstractDataset.load_plugins("pandas", "numpy")

Load all installed plugins:

>>> AbstractDataset.load_plugins()

Disable auto-discovery (strict mode):

>>> AbstractDataset.load_plugins("pandas", auto_discover=False)
Source code in src/daglite/datasets/base.py
@classmethod
def load_plugins(cls, *names: str, auto_discover: bool | None = None) -> None:
    """
    Explicitly load dataset plugins.

    Args:
        *names: Plugin names to load (e.g., "pandas", "numpy").
            If empty, loads all installed plugins.
        auto_discover: If provided, sets whether lazy discovery is enabled.
            Set to False to disable automatic plugin loading on lookup failure.

    Examples:
        Load specific plugins:

        >>> AbstractDataset.load_plugins("pandas", "numpy")

        Load all installed plugins:

        >>> AbstractDataset.load_plugins()

        Disable auto-discovery (strict mode):

        >>> AbstractDataset.load_plugins("pandas", auto_discover=False)
    """
    if auto_discover is not None:
        cls._auto_discover = auto_discover

    try:
        eps = importlib.metadata.entry_points(group="daglite.datasets")
    except TypeError:  # pragma: no cover
        # Python < 3.10 compatibility
        all_eps = importlib.metadata.entry_points()
        eps = all_eps.get("daglite.datasets", [])  # type: ignore[assignment]

    for ep in eps:
        if not names or ep.name in names:  # pragma: no cover - requires plugins
            try:
                ep.load()  # Imports module, triggers __init_subclass__
                cls._discovered.add(ep.name)
            except Exception:  # pragma: no cover
                pass  # Plugin failed to load, continue with others

serialize abstractmethod

serialize(obj: Any) -> bytes

Convert a Python object to bytes.

Parameters:

Name Type Description Default
obj Any

Python object to serialize.

required

Returns:

Type Description
bytes

Serialized bytes representation.

Source code in src/daglite/datasets/base.py
@abstractmethod
def serialize(self, obj: Any) -> bytes:
    """
    Turn a Python object into its byte representation.

    Args:
        obj: The object to encode.

    Returns:
        The serialized payload as ``bytes``.
    """
    ...

DatasetStore

High-level store that handles serialization via Datasets.

This wraps a Driver (like FileDriver) and adds automatic serialization/deserialization using the Dataset registry.

Format is inferred from the file extension in the key.

This is the user-facing API - it accepts Python objects and handles all serialization internally.

Examples:

>>> from daglite.datasets import DatasetStore
>>> store = DatasetStore("/tmp/outputs")
>>> store.save("data.pkl", {"data": [1, 2, 3]})
>>> store.load("data.pkl", dict)
{'data': [1, 2, 3]}
Source code in src/daglite/datasets/store.py
class DatasetStore:
    """
    High-level store that handles serialization via Datasets.

    This wraps a Driver (like FileDriver) and adds automatic
    serialization/deserialization using the Dataset registry.

    Format is inferred from the file extension in the key.

    This is the user-facing API - it accepts Python objects and handles
    all serialization internally.

    Examples:
        >>> from daglite.datasets import DatasetStore
        >>> store = DatasetStore("/tmp/outputs")  # doctest: +SKIP
        >>> store.save("data.pkl", {"data": [1, 2, 3]})  # doctest: +SKIP
        >>> store.load("data.pkl", dict)  # doctest: +SKIP
        {'data': [1, 2, 3]}
    """

    def __init__(self, driver: Driver | str) -> None:
        """
        Initialize with a driver.

        Args:
            driver: A Driver instance or string path (creates FileDriver).
        """
        self._driver: Driver
        if isinstance(driver, str):
            # Deferred import: presumably avoids a circular import at module load.
            from daglite.drivers import FileDriver

            self._driver = FileDriver(driver)
        else:
            self._driver = driver

    @property
    def base_path(self) -> str:
        """Get base path (for FileDriver compatibility)."""
        # Drivers without the attribute (non-file backends) report "".
        return getattr(self._driver, "base_path", "")

    @property
    def is_local(self) -> bool:
        """Whether the underlying driver accesses local storage."""
        # Drivers that do not declare locality default to local.
        return getattr(self._driver, "is_local", True)

    def save(
        self,
        key: str,
        value: Any,
        format: str | None = None,
        options: dict[str, Any] | None = None,
    ) -> str:
        """
        Save a value using Dataset serialization.

        Args:
            key: Storage key/path. Format hint from driver (e.g., extension).
            value: Value to serialize and save.
            format: Serialization format. If None, inferred from type and/or driver hint.
            options: Additional options passed to the Dataset's save method.

        Returns:
            The actual path where data was stored.
        """
        key = _resolve_key(key)
        value_type = type(value)

        if format is None:
            # Infer the format from the value's type plus any key-derived hint.
            hint = self._driver.get_format_hint(key)
            format = AbstractDataset.infer_format(value_type, hint)

        dataset_cls = AbstractDataset.get(value_type, format)
        options = options or {}
        dataset = dataset_cls(**options)
        data = dataset.serialize(value)
        return self._driver.save(key, data)

    def load(
        self,
        key: str,
        return_type: type[T] | None = None,
        format: str | None = None,
        options: dict[str, Any] | None = None,
    ) -> T:
        """
        Load a value using Dataset deserialization.

        Args:
            key: Storage key/path. Format hint from driver (e.g., extension).
            return_type: Expected return type. If None, uses pickle format.
            format: Serialization format. If None, inferred from type and/or driver hint.
            options: Additional options passed to the Dataset's load method.

        Returns:
            The deserialized value.

        Raises:
            KeyError: If key not found.
            ValueError: If ``format`` is given without ``return_type``.
        """
        key = _resolve_key(key)
        data = self._driver.load(key)

        if return_type is None and format is None:
            # NOTE: pickle can execute arbitrary code; only load trusted stores.
            return pickle.loads(data)  # No type hint or format - assume pickle

        if return_type is None:  # pragma: no cover
            raise ValueError(
                "return_type must be provided when 'format' is specified; "
                "omit 'format' and 'return_type' together to use pickle-based loading."
            )
        if not format:
            # Ask the driver for an extension hint only when the format must be
            # inferred; with an explicit format the hint was fetched but unused.
            hint = self._driver.get_format_hint(key)
            format = AbstractDataset.infer_format(return_type, hint)
        dataset_cls = AbstractDataset.get(return_type, format)
        options = options or {}
        dataset = dataset_cls(**options)
        return cast(T, dataset.deserialize(data))

    def exists(self, key: str) -> bool:
        """Check if a key exists."""
        return self._driver.exists(_resolve_key(key))

    def delete(self, key: str) -> None:
        """Delete stored data."""
        self._driver.delete(_resolve_key(key))

    def list_keys(self) -> list[str]:
        """List all stored keys."""
        return self._driver.list_keys()

    def __getstate__(self) -> dict[str, Any]:
        """Serialize for pickling."""
        return {"driver": self._driver}

    def __setstate__(self, state: dict[str, Any]) -> None:
        """Reconstruct from pickled state."""
        self._driver = state["driver"]

base_path property

base_path: str

Get base path (for FileDriver compatibility).

is_local property

is_local: bool

Whether the underlying driver accesses local storage.

__getstate__

__getstate__() -> dict[str, Any]

Serialize for pickling.

Source code in src/daglite/datasets/store.py
def __getstate__(self) -> dict[str, Any]:
    """Return the picklable state: just the wrapped driver."""
    state = {"driver": self._driver}
    return state

__init__

__init__(driver: Driver | str) -> None

Initialize with a driver.

Parameters:

Name Type Description Default
driver Driver | str

A Driver instance or string path (creates FileDriver).

required
Source code in src/daglite/datasets/store.py
def __init__(self, driver: Driver | str) -> None:
    """
    Initialize with a driver.

    Args:
        driver: A Driver instance or string path (creates FileDriver).
    """
    self._driver: Driver
    if not isinstance(driver, str):
        self._driver = driver
        return
    # A plain path string is wrapped in a FileDriver; the import is deferred,
    # presumably to avoid a circular dependency at module load time.
    from daglite.drivers import FileDriver

    self._driver = FileDriver(driver)

__setstate__

__setstate__(state: dict[str, Any]) -> None

Reconstruct from pickled state.

Source code in src/daglite/datasets/store.py
def __setstate__(self, state: dict[str, Any]) -> None:
    """Restore the wrapped driver from a pickled *state* mapping."""
    driver = state["driver"]
    self._driver = driver

delete

delete(key: str) -> None

Delete stored data.

Source code in src/daglite/datasets/store.py
def delete(self, key: str) -> None:
    """Remove the data stored under *key* from the underlying driver."""
    resolved = _resolve_key(key)
    self._driver.delete(resolved)

exists

exists(key: str) -> bool

Check if a key exists.

Source code in src/daglite/datasets/store.py
def exists(self, key: str) -> bool:
    """Return True when the underlying driver holds data for *key*."""
    resolved = _resolve_key(key)
    return self._driver.exists(resolved)

list_keys

list_keys() -> list[str]

List all stored keys.

Source code in src/daglite/datasets/store.py
def list_keys(self) -> list[str]:
    """Return every key known to the underlying driver."""
    keys = self._driver.list_keys()
    return keys

load

load(
    key: str,
    return_type: type[T] | None = None,
    format: str | None = None,
    options: dict[str, Any] | None = None,
) -> T

Load a value using Dataset deserialization.

Parameters:

Name Type Description Default
key str

Storage key/path. Format hint from driver (e.g., extension).

required
return_type type[T] | None

Expected return type. If None, uses pickle format.

None
format str | None

Serialization format. If None, inferred from type and/or driver hint.

None
options dict[str, Any] | None

Additional options passed to the Dataset's load method.

None

Returns:

Type Description
T

The deserialized value.

Raises:

Type Description
KeyError

If key not found.

Source code in src/daglite/datasets/store.py
def load(
    self,
    key: str,
    return_type: type[T] | None = None,
    format: str | None = None,
    options: dict[str, Any] | None = None,
) -> T:
    """
    Load a value using Dataset deserialization.

    Args:
        key: Storage key/path. Format hint from driver (e.g., extension).
        return_type: Expected return type. If None, uses pickle format.
        format: Serialization format. If None, inferred from type and/or driver hint.
        options: Additional options passed to the Dataset's load method.

    Returns:
        The deserialized value.

    Raises:
        KeyError: If key not found.
        ValueError: If ``format`` is given without ``return_type``.
    """
    key = _resolve_key(key)
    data = self._driver.load(key)

    if return_type is None and format is None:
        # NOTE: pickle can execute arbitrary code; only load trusted stores.
        return pickle.loads(data)  # No type hint or format - assume pickle

    if return_type is None:  # pragma: no cover
        raise ValueError(
            "return_type must be provided when 'format' is specified; "
            "omit 'format' and 'return_type' together to use pickle-based loading."
        )
    if not format:
        # Ask the driver for an extension hint only when the format must be
        # inferred; with an explicit format the hint was fetched but unused.
        hint = self._driver.get_format_hint(key)
        format = AbstractDataset.infer_format(return_type, hint)
    dataset_cls = AbstractDataset.get(return_type, format)
    options = options or {}
    dataset = dataset_cls(**options)
    return cast(T, dataset.deserialize(data))

save

save(key: str, value: Any, format: str | None = None, options: dict[str, Any] | None = None) -> str

Save a value using Dataset serialization.

Parameters:

Name Type Description Default
key str

Storage key/path. Format hint from driver (e.g., extension).

required
value Any

Value to serialize and save.

required
format str | None

Serialization format. If None, inferred from type and/or driver hint.

None
options dict[str, Any] | None

Additional options passed to the Dataset's save method.

None

Returns:

Type Description
str

The actual path where data was stored.

Source code in src/daglite/datasets/store.py
def save(
    self,
    key: str,
    value: Any,
    format: str | None = None,
    options: dict[str, Any] | None = None,
) -> str:
    """
    Save a value using Dataset serialization.

    Args:
        key: Storage key/path. Format hint from driver (e.g., extension).
        value: Value to serialize and save.
        format: Serialization format. If None, inferred from type and/or driver hint.
        options: Additional options passed to the Dataset's save method.

    Returns:
        The actual path where data was stored.
    """
    resolved_key = _resolve_key(key)
    value_type = type(value)

    if format is None:
        # No explicit format: infer one from the value's type plus any
        # extension-style hint the driver derives from the key.
        hint = self._driver.get_format_hint(resolved_key)
        format = AbstractDataset.infer_format(value_type, hint)

    dataset = AbstractDataset.get(value_type, format)(**(options or {}))
    payload = dataset.serialize(value)
    return self._driver.save(resolved_key, payload)

load_dataset

load_dataset(
    key: str,
    return_type: type[R],
    *,
    format: str | None = None,
    store: DatasetStore | str | None = None,
    options: dict[str, Any] | None = None,
) -> R
load_dataset(
    key: str,
    *,
    format: str | None = None,
    store: DatasetStore | str | None = None,
    options: dict[str, Any] | None = None,
) -> Any
load_dataset(
    key: str,
    return_type: type[R] | None = None,
    *,
    format: str | None = None,
    store: DatasetStore | str | None = None,
    options: dict[str, Any] | None = None,
) -> R | Any

Load a dataset from the active (or explicitly provided) dataset store.

Parameters:

Name Type Description Default
key str

Storage key/path. May contain {param} placeholders that are resolved from the current task's bound arguments.

required
return_type type[R] | None

Expected Python type for deserialization dispatch.

None
format str | None

Serialization format hint (e.g. "pickle").

None
store DatasetStore | str | None

Explicit DatasetStore or string path. When None, the store is resolved from the context chain.

None
options dict[str, Any] | None

Additional options forwarded to the Dataset constructor.

None

Returns:

Type Description
R | Any

The deserialized Python object.

Source code in src/daglite/composers.py
def load_dataset(
    key: str,
    return_type: type[R] | None = None,
    *,
    format: str | None = None,
    store: DatasetStore | str | None = None,
    options: dict[str, Any] | None = None,
) -> R | Any:
    """
    Load a dataset from the active (or explicitly provided) dataset store.

    Args:
        key: Storage key/path.  May contain ``{param}`` placeholders that are resolved from the
            current task's bound arguments.
        return_type: Expected Python type for deserialization dispatch.
        format: Serialization format hint (e.g. ``"pickle"``).
        store: Explicit ``DatasetStore`` or string path.  When ``None``, the
            store is resolved from the context chain.
        options: Additional options forwarded to the Dataset constructor.

    Returns:
        The deserialized Python object.
    """
    active_store = resolve_dataset_store(store)
    hook = resolve_hook()

    # One keyword payload shared by the before/after hook pair.
    hook_kwargs = {
        "key": key,
        "return_type": return_type,
        "format": format,
        "options": options,
        "metadata": resolve_task_metadata(),
    }

    hook.before_dataset_load(**hook_kwargs)

    started = time.perf_counter()
    value = active_store.load(key, return_type=return_type, format=format, options=options)
    elapsed = time.perf_counter() - started

    hook.after_dataset_load(**hook_kwargs, result=value, duration=elapsed)

    return value

save_dataset

save_dataset(
    key: str,
    value: Any,
    *,
    format: str | None = None,
    store: DatasetStore | str | None = None,
    options: dict[str, Any] | None = None,
) -> str

Save a value to the active (or explicitly provided) dataset store.

When a DatasetReporter is available (inside a session), the save is routed through the reporter so that process/remote backends push the write back to the coordinator.

The store is resolved from the context chain: explicit argument -> task context -> session context -> global settings.

Parameters:

Name Type Description Default
key str

Storage key/path. May contain {param} placeholders resolved from the current task's bound arguments.

required
value Any

The Python object to serialize and persist.

required
format str | None

Serialization format hint (e.g. "pickle").

None
store DatasetStore | str | None

Explicit DatasetStore or string path. When None, the store is resolved from the context chain.

None
options dict[str, Any] | None

Additional options forwarded to the Dataset constructor.

None

Returns:

Type Description
str

The actual path where data was stored.

Source code in src/daglite/composers.py
def save_dataset(
    key: str,
    value: Any,
    *,
    format: str | None = None,
    store: DatasetStore | str | None = None,
    options: dict[str, Any] | None = None,
) -> str:
    """
    Save a value to the active (or explicitly provided) dataset store.

    When a ``DatasetReporter`` is available (inside a session), the save is routed through
    the reporter so that process/remote backends push the write back to the coordinator.

    The store is resolved from the context chain: explicit argument -> task context
    -> session context -> global settings.

    Args:
        key: Storage key/path.  May contain ``{param}`` placeholders resolved from the current
            task's bound arguments.
        value: The Python object to serialize and persist.
        format: Serialization format hint (e.g. ``"pickle"``).
        store: Explicit ``DatasetStore`` or string path.  When ``None``, the
            store is resolved from the context chain.
        options: Additional options forwarded to the Dataset constructor.

    Returns:
        The actual path where data was stored.
    """
    active_store = resolve_dataset_store(store)
    metadata = resolve_task_metadata()
    reporter = resolve_dataset_reporter()

    if reporter is not None:
        # Inside a session the reporter coordinates the write (and its hooks);
        # in that path the key itself is returned rather than a driver path.
        reporter.save(key, value, active_store, format=format, options=options, metadata=metadata)
        return key

    # Bare call outside a session - save directly and dispatch hooks ourselves.
    hook = resolve_hook()
    hook_kwargs: dict[str, Any] = {
        "key": key,
        "value": value,
        "format": format,
        "options": options,
        "metadata": metadata,
    }

    hook.before_dataset_save(**hook_kwargs)
    stored_path = active_store.save(key, value, format=format, options=options)
    hook.after_dataset_save(**hook_kwargs)

    return stored_path