Logging

Default plugins shipped with daglite.

CentralizedLoggingPlugin

Bases: EventHandlerPlugin

Plugin that enables centralized logging via the reporter system.

This plugin centralizes logs from out-of-process or distributed workers to the coordinator. On the worker side, log records are sent to the coordinator using the reporter system. On the coordinator side, log records are reconstructed and emitted through the standard logging framework.

Parameters:

    level (int): Minimum log level to handle on the coordinator side. Default: logging.WARNING.
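
A quick construction sketch (attaching the plugin to a session depends on your runner configuration and is not shown here):

import logging

from daglite.logging.plugin import CentralizedLoggingPlugin

# Forward only ERROR-and-above worker records on the coordinator side.
plugin = CentralizedLoggingPlugin(level=logging.ERROR)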
Source code in src/daglite/logging/plugin.py
class CentralizedLoggingPlugin(EventHandlerPlugin):
    """
    Plugin that enables centralized logging via the reporter system.

    This plugin centralizes logs from out-of-process or distributed workers to the coordinator. On
    the worker side, log records are sent to the coordinator using the reporter system. On the
    coordinator side, log records are reconstructed and emitted through the standard logging
    framework.

    Args:
        level: Minimum log level to handle on coordinator side (default: WARNING).
    """

    def __init__(self, level: int = logging.WARNING):
        self._level = level

    def register_event_handlers(self, registry: EventRegistry) -> None:
        """
        Register coordinator-side handler for log events.

        Args:
            registry: PluginEvent registry for registering handlers
        """
        registry.register(LOGGER_EVENT, self._handle_log_event)

    def _handle_log_event(self, event_type: Hashable, event_data: dict[str, Any]) -> None:
        """Handles log event from worker."""

        logger_name = event_data.get("name", "daglite")
        level = event_data.get("level", "INFO")
        message = event_data.get("message", "")
        exc_info_str = event_data.get("exc_info")
        all_extra = event_data.get("extra", {})

        # Filter based on the plugin's configured minimum level
        log_level = getattr(logging, level, logging.INFO)
        if log_level < self._level:
            return

        # Format message with exception info if present
        if exc_info_str:
            message = f"{message}\n{exc_info_str}"

        # Separate standard LogRecord fields from custom extra fields
        # Standard fields must be passed as makeRecord parameters, not in extra dict
        standard_fields = {
            "filename",
            "pathname",
            "module",
            "funcName",
            "lineno",
            "created",
            "msecs",
            "relativeCreated",
            "process",
            "processName",
            "thread",
            "threadName",
            "taskName",
            "asctime",  # Generated by formatters, not allowed in extra
        }

        extra = {k: v for k, v in all_extra.items() if k not in standard_fields}

        # Emit record to coordinator-side logger (excluding ReporterHandler to avoid loops)
        base_logger = logging.getLogger(logger_name or DEFAULT_LOGGER_NAME_TASKS)
        record = base_logger.makeRecord(
            name=base_logger.name,
            level=log_level,
            fn=all_extra.get("filename", ""),
            lno=all_extra.get("lineno", 0),
            msg=message,
            args=(),
            exc_info=None,
            extra=extra,
        )

        # Restore standard fields that makeRecord doesn't set via parameters
        for field in [
            "pathname",
            "module",
            "funcName",
            "created",
            "msecs",
            "relativeCreated",
            "process",
            "processName",
            "thread",
            "threadName",
            "taskName",
        ]:
            if field in all_extra:
                setattr(record, field, all_extra[field])

        # Mark record to prevent re-emission by ReporterHandler (avoid infinite loops)
        setattr(record, "_daglite_already_forwarded", True)

        # Use normal logging flow with configured propagation settings
        base_logger.handle(record)
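
For reference, the payload consumed by _handle_log_event carries the record's logger name, level name, formatted message, optional traceback string, and the remaining LogRecord attributes under "extra". A minimal sketch that replays such a payload through the handler (calling the private method purely for illustration; the field names match what the handler reads above):

import logging

from daglite.logging.plugin import LOGGER_EVENT, CentralizedLoggingPlugin

plugin = CentralizedLoggingPlugin(level=logging.INFO)

# Simulate a record arriving from a worker. "filename" and "lineno" are
# standard LogRecord fields, so the handler strips them from the extra dict
# and passes them to makeRecord instead.
plugin._handle_log_event(
    LOGGER_EVENT,
    {
        "name": "daglite.tasks",
        "level": "ERROR",
        "message": "Payload validation failed",
        "exc_info": "Traceback (most recent call last): ...",
        "extra": {"filename": "worker.py", "lineno": 42, "daglite_task_name": "my_task"},
    },
)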

register_event_handlers

register_event_handlers(registry: EventRegistry) -> None

Register coordinator-side handler for log events.

Parameters:

    registry (EventRegistry): PluginEvent registry for registering handlers. Required.
Source code in src/daglite/logging/plugin.py
def register_event_handlers(self, registry: EventRegistry) -> None:
    """
    Register coordinator-side handler for log events.

    Args:
        registry: PluginEvent registry for registering handlers
    """
    registry.register(LOGGER_EVENT, self._handle_log_event)

LifecycleLoggingPlugin

Bases: CentralizedLoggingPlugin, SerializablePlugin

Plugin that logs session and task lifecycle events.

Parameters:

Name Type Description Default
name str | None

Optional logger name to use (default: "daglite").

None
level int

Optional minimum log level to handle on coordinator side (default: INFO).

INFO
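
A quick construction sketch (the logger name here is illustrative):

import logging

from daglite.logging.plugin import LifecycleLoggingPlugin

# Log lifecycle events through a custom logger at DEBUG verbosity.
plugin = LifecycleLoggingPlugin(name="myapp.pipeline", level=logging.DEBUG)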
Source code in src/daglite/logging/plugin.py
class LifecycleLoggingPlugin(CentralizedLoggingPlugin, SerializablePlugin):
    """
    Plugin that logs session and task lifecycle events.

    Args:
        name: Optional logger name to use (default: "daglite").
        level: Optional minimum log level to handle on coordinator side (default: INFO).
    """

    def __init__(
        self,
        name: str | None = None,
        level: int = logging.INFO,
        config: dict[str, Any] | None = None,
    ):
        super().__init__(level=level)
        self._logger = get_logger(name)

        # Set logger level to ensure debug messages aren't filtered out.
        self._logger.logger.setLevel(level)

        # Load logging config if not provided
        config = config if config is not None else self._load_default_config()
        self._apply_logging_config(config)

    @property
    def _config_path(self) -> Path:
        """Path to the default logging configuration file."""
        return Path(__file__).parent / "logging.json"

    def _load_default_config(self) -> dict[str, Any]:
        """
        Load default logging configuration from :attr:`_config_path`.

        If the `DAGLITE_DEBUG` environment variable is set (any truthy value), the base `daglite`
        logger is configured with the `file` handler so that internal debug logs (logs outside of
        `daglite.lifecycle` and `daglite.tasks`) are written to `run.log`. By default these are
        suppressed.
        """
        config_path = self._config_path
        if config_path.exists():
            with open(config_path) as f:
                config = json.load(f)
        else:  # pragma: no cover
            raise FileNotFoundError(f"Default logging configuration not found at {config_path}")

        if os.environ.get("DAGLITE_DEBUG"):
            loggers = config.get("loggers", {})
            if "daglite" in loggers and "daglite.lifecycle" in loggers:  # pragma: no branch
                loggers["daglite"]["handlers"] = loggers["daglite.lifecycle"]["handlers"]

        return config

    def _apply_logging_config(self, config: dict[str, Any]) -> None:
        """Apply logging configuration using dictConfig."""
        logging.config.dictConfig(config)

    @override
    def to_config(self) -> dict[str, Any]:
        return {}

    @classmethod
    @override
    def from_config(cls, config: dict[str, Any]) -> "LifecycleLoggingPlugin":
        return cls()  # Use defaults when deserializing on workers

    def register_event_handlers(self, registry: EventRegistry) -> None:
        """Register event handlers for task lifecycle events."""
        super().register_event_handlers(registry)
        registry.register("daglite-logging-node-start", self._handle_node_start)
        registry.register("daglite-logging-node-complete", self._handle_node_complete)
        registry.register("daglite-logging-node-fail", self._handle_node_fail)
        registry.register("daglite-logging-node-retry", self._handle_node_retry)
        registry.register("daglite-logging-node-retry-result", self._handle_node_retry_result)

    # region Session hooks

    @hook_impl
    def before_session_start(self, session_id: UUID) -> None:
        self._logger.info(f"Starting session {session_id}")

    @hook_impl
    def after_session_end(self, session_id: UUID, duration: float) -> None:
        self._logger.info(f"Session {session_id} completed in {format_duration(duration)}")

    # region Node hooks

    @hook_impl
    def before_node_execute(
        self,
        metadata: TaskMetadata,
        reporter: EventReporter | None,
    ) -> None:
        if reporter:
            data = {
                "node_id": metadata.id,
                "node_key": metadata.name,
                "backend_name": metadata.backend,
            }
            reporter.report("daglite-logging-node-start", data)
        else:
            backend = metadata.backend or "inline"
            self._logger.info(f"Task '{metadata.name}' - Starting task using {backend} backend")

    @hook_impl
    def after_node_execute(
        self,
        metadata: TaskMetadata,
        result: Any,
        duration: float,
        reporter: EventReporter | None,
    ) -> None:
        if reporter:
            data = {
                "node_id": metadata.id,
                "node_key": metadata.name,
                "duration": duration,
            }
            reporter.report("daglite-logging-node-complete", data)
        else:
            self._logger.info(
                f"Task '{metadata.name}' - Completed task successfully "
                f"in {format_duration(duration)}"
            )

    @hook_impl
    def on_node_error(
        self,
        metadata: TaskMetadata,
        error: Exception,
        duration: float,
        reporter: EventReporter | None,
    ) -> None:
        if reporter:
            data = {
                "node_id": metadata.id,
                "node_key": metadata.name,
                "error": str(error),
                "error_type": type(error).__name__,
                "duration": duration,
            }
            reporter.report("daglite-logging-node-fail", data)
        else:
            error_type = type(error).__name__
            self._logger.error(
                f"Task '{metadata.name}' - Failed after {format_duration(duration)}: "
                f"{error_type}: {error}"
            )

    @hook_impl
    def before_node_retry(
        self,
        metadata: TaskMetadata,
        attempt: int,
        error: Exception,
        reporter: EventReporter | None,
    ) -> None:
        if reporter:
            data = {
                "node_id": metadata.id,
                "node_key": metadata.name,
                "attempt": attempt,
                "error": str(error),
                "error_type": type(error).__name__,
            }
            reporter.report("daglite-logging-node-retry", data)
        else:
            error_type = type(error).__name__
            self._logger.warning(
                f"Task '{metadata.name}' - Retrying after failure (attempt {attempt}): "
                f"{error_type}: {error}"
            )

    @hook_impl
    def after_node_retry(
        self,
        metadata: TaskMetadata,
        attempt: int,
        succeeded: bool,
        reporter: EventReporter | None,
    ) -> None:
        if reporter:
            data = {
                "node_id": metadata.id,
                "node_key": metadata.name,
                "attempt": attempt,
                "succeeded": succeeded,
            }
            reporter.report("daglite-logging-node-retry-result", data)
        else:
            if succeeded:
                self._logger.info(f"Task '{metadata.name}' - Retry succeeded on attempt {attempt}")
            else:
                self._logger.debug(f"Task '{metadata.name}' - Retry attempt {attempt} failed")

    @hook_impl
    def on_cache_hit(
        self,
        metadata: TaskMetadata,
        key: str,
        value: Any,
        reporter: EventReporter | None,
    ) -> None:
        self._logger.info(
            f"Task '{metadata.name}' - Using cached result (key={key})",
            extra=build_task_extra(metadata.id, metadata.name),
        )

    @hook_impl
    def before_dataset_save(
        self,
        key: str,
        value: Any,
        format: str | None,
        options: dict[str, Any] | None,
        metadata: TaskMetadata | None = None,
    ) -> None:
        suffix = f" (format={format})" if format else ""
        extra = build_task_extra(metadata.id, metadata.name) if metadata else {}
        self._logger.debug(f"Saving dataset to '{key}'{suffix}", extra=extra)

    @hook_impl
    def after_dataset_save(
        self,
        key: str,
        value: Any,
        format: str | None,
        options: dict[str, Any] | None,
        metadata: TaskMetadata | None = None,
    ) -> None:
        suffix = f" (format={format})" if format else ""
        extra = build_task_extra(metadata.id, metadata.name) if metadata else {}
        self._logger.info(f"Saved dataset to '{key}'{suffix}", extra=extra)

    @hook_impl
    def before_dataset_load(
        self,
        key: str,
        return_type: type | None,
        format: str | None,
        options: dict[str, Any] | None,
        metadata: TaskMetadata | None = None,
    ) -> None:
        suffix = f" (format={format})" if format else ""
        extra = build_task_extra(metadata.id, metadata.name) if metadata else {}
        self._logger.debug(f"Loading dataset from '{key}'{suffix}", extra=extra)

    @hook_impl
    def after_dataset_load(
        self,
        key: str,
        return_type: type | None,
        format: str | None,
        options: dict[str, Any] | None,
        result: Any,
        duration: float,
        metadata: TaskMetadata | None = None,
    ) -> None:
        suffix = f" (format={format})" if format else ""
        extra = build_task_extra(metadata.id, metadata.name) if metadata else {}
        self._logger.info(
            f"Loaded dataset from '{key}'{suffix} in {format_duration(duration)}", extra=extra
        )

    # region Event handlers

    def _handle_node_start(self, event_type: Hashable, event_data: dict[str, Any]) -> None:
        node_id = event_data["node_id"]
        node_key = event_data["node_key"]
        backend_name = event_data.get("backend_name") or "inline"
        self._logger.info(
            f"Task '{node_key}' - Starting task using {backend_name} backend",
            extra=build_task_extra(node_id, node_key),
        )

    def _handle_node_complete(self, event_type: Hashable, event_data: dict[str, Any]) -> None:
        node_id = event_data["node_id"]
        node_key = event_data["node_key"]
        duration = event_data.get("duration", 0)
        self._logger.info(
            f"Task '{node_key}' - Completed task successfully in {format_duration(duration)}",
            extra=build_task_extra(node_id, node_key),
        )

    def _handle_node_fail(self, event_type: Hashable, event_data: dict[str, Any]) -> None:
        node_id = event_data["node_id"]
        node_key = event_data["node_key"]
        error = event_data.get("error", "unknown error")
        error_type = event_data.get("error_type", "Exception")
        duration = event_data.get("duration", 0)
        self._logger.error(
            f"Task '{node_key}' - Failed after {format_duration(duration)}: {error_type}: {error}",
            extra=build_task_extra(node_id, node_key),
        )

    def _handle_node_retry(self, event_type: Hashable, event_data: dict[str, Any]) -> None:
        node_id = event_data["node_id"]
        node_key = event_data["node_key"]
        attempt = event_data["attempt"]
        error_type = event_data.get("error_type", "Exception")
        error = event_data.get("error", "unknown error")
        self._logger.warning(
            f"Task '{node_key}' - Retrying after failure (attempt {attempt}): "
            f"{error_type}: {error}",
            extra=build_task_extra(node_id, node_key),
        )

    def _handle_node_retry_result(self, event_type: Hashable, event_data: dict[str, Any]) -> None:
        node_id = event_data["node_id"]
        node_key = event_data["node_key"]
        attempt = event_data["attempt"]
        succeeded = event_data["succeeded"]
        extra = build_task_extra(node_id, node_key)
        if succeeded:
            self._logger.info(
                f"Task '{node_key}' - Retry succeeded on attempt {attempt}", extra=extra
            )
        else:
            self._logger.debug(f"Task '{node_key}' - Retry attempt {attempt} failed", extra=extra)

register_event_handlers

register_event_handlers(registry: EventRegistry) -> None

Register event handlers for task lifecycle events.

Source code in src/daglite/logging/plugin.py
def register_event_handlers(self, registry: EventRegistry) -> None:
    """Register event handlers for task lifecycle events."""
    super().register_event_handlers(registry)
    registry.register("daglite-logging-node-start", self._handle_node_start)
    registry.register("daglite-logging-node-complete", self._handle_node_complete)
    registry.register("daglite-logging-node-fail", self._handle_node_fail)
    registry.register("daglite-logging-node-retry", self._handle_node_retry)
    registry.register("daglite-logging-node-retry-result", self._handle_node_retry_result)

get_logger

get_logger(name: str | None = None) -> logging.LoggerAdapter

Get a logger instance that works across process/thread/machine boundaries.

This is the main entry point into daglite logging for user code. It returns a standard Python logging.LoggerAdapter that automatically:

- Injects task context (daglite_task_name, daglite_task_id, and daglite_task_key) into all log records
- Uses the reporter system, when available, for centralized logging (requires CentralizedLoggingPlugin on the coordinator side)
- Falls back to standard Python logging when no reporter is available (inline execution)

Parameters:

    name (str | None): Logger name for code organization. If None, uses "daglite.tasks". Typically pass __name__ for module-based naming. Task context (daglite_task_name, daglite_task_id, daglite_task_key) is added to log records regardless of logger name and can be used in formatters. Default: None.

Returns:

    LoggerAdapter: A logging.LoggerAdapter configured with the current execution context and automatic task context injection.

Examples:

>>> from daglite import task
>>> from daglite.logging import get_logger

Simple usage - automatic task context in logs

>>> @task
... def my_task(x):
...     logger = get_logger()  # Uses "daglite.tasks" logger
...     logger.info(f"Processing {x}")  # Output: "Node: my_task - ..."
...     return x * 2

Module-based naming for code organization

>>> @task
... def custom_logging(x):
...     logger = get_logger(__name__)  # Uses module name
...     logger.info(f"Custom log for {x}")  # Still has task_name in output
...     return x

Configure logging with custom format

>>> import logging
>>> logging.basicConfig(
...     format="%(daglite_task_name)s [%(levelname)s] %(message)s", level=logging.INFO
... )
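
The injected context fields can also drive a handler-level Formatter. A small sketch (this assumes every record reaching the handler was emitted through get_logger(), since plain records would lack the daglite_* attributes):

import logging

formatter = logging.Formatter(
    "%(daglite_task_name)s (%(daglite_task_id)s) [%(levelname)s] %(message)s"
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)

# "daglite.tasks" is the documented default logger name for task code.
logging.getLogger("daglite.tasks").addHandler(handler)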
Source code in src/daglite/logging/core.py
def get_logger(name: str | None = None) -> logging.LoggerAdapter:
    """
    Get a logger instance that works across process/thread/machine boundaries.

    This is the main entry point into daglite logging for user code. It returns a standard
    Python `logging.LoggerAdapter` that automatically:
    - Injects task context (`daglite_task_name`, `daglite_task_id`, and `daglite_task_key`) into
      all log records
    - Uses the reporter system when available for centralized logging (requires
      CentralizedLoggingPlugin on coordinator side)
    - Works with standard Python logging when no reporter is available (Inline execution)

    Args:
        name: Logger name for code organization. If None, uses "daglite.tasks". Typically use
            `__name__` for module-based naming. Note: Task context (daglite_task_name,
            daglite_task_id, daglite_task_key) is automatically added to log records
            regardless of logger name and can be used in formatters.

    Returns:
        LoggerAdapter instance configured with current execution context and
        automatic task context injection

    Examples:
        >>> from daglite import task
        >>> from daglite.logging import get_logger

        Simple usage - automatic task context in logs
        >>> @task
        ... def my_task(x):
        ...     logger = get_logger()  # Uses "daglite.tasks" logger
        ...     logger.info(f"Processing {x}")  # Output: "Node: my_task - ..."
        ...     return x * 2

        Module-based naming for code organization
        >>> @task
        ... def custom_logging(x):
        ...     logger = get_logger(__name__)  # Uses module name
        ...     logger.info(f"Custom log for {x}")  # Still has task_name in output
        ...     return x

        Configure logging with custom format
        >>> import logging
        >>> logging.basicConfig(
        ...     format="%(daglite_task_name)s [%(levelname)s] %(message)s", level=logging.INFO
        ... )
    """
    if name is None and resolve_task_metadata() is not None:
        name = DEFAULT_LOGGER_NAME_TASKS

    if name is None:
        name = DEFAULT_LOGGER_NAME_COORD

    base_logger = logging.getLogger(name)

    # Attach a ReporterHandler for non-direct reporters so logs route to the coordinator.
    reporter = resolve_event_reporter()
    if reporter and not reporter.is_direct:
        with _logger_lock:
            if not any(isinstance(hlr, _ReporterHandler) for hlr in base_logger.handlers):
                # In worker processes, remove all existing handlers and only use ReporterHandler
                # The coordinator will receive logs via the reporter and re-emit them through
                # its own handlers (console, file, etc.)
                base_logger.handlers.clear()

                handler = _ReporterHandler(reporter)
                base_logger.addHandler(handler)

                # Set logger to DEBUG to prevent filtering before handler. Actual filtering happens
                # on coordinator side via CentralizedLoggingPlugin level.
                if base_logger.getEffectiveLevel() > logging.DEBUG:  # pragma: no branch
                    base_logger.setLevel(logging.DEBUG)

                # Disable propagation to prevent duplicate logging from inherited handlers.
                # Worker processes send logs ONLY via ReporterHandler; coordinator re-emits.
                base_logger.propagate = False

    return _TaskLoggerAdapter(base_logger, {})