Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions temporalio/_log_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""Internal utilities for Temporal logging.

This module is internal and may change at any time.
"""

from __future__ import annotations

from collections.abc import Mapping, MutableMapping
from typing import Any, Literal

TemporalLogExtraMode = Literal["dict", "flatten"]
"""Mode controlling how Temporal context is added to log record extra.

Values:
dict: (default) Add context as a nested dictionary under a single key.
This is the original behavior. Suitable for logging handlers that
support nested structures.
flatten: Add each context field as a separate top-level key with a
namespaced prefix. Values that are not primitives (str/int/float/bool)
are converted to strings. This mode is recommended for OpenTelemetry
and other logging pipelines that require flat, scalar attributes.
"""


def _apply_temporal_context_to_extra(
extra: MutableMapping[str, Any],
*,
key: str,
ctx: Mapping[str, Any],
mode: TemporalLogExtraMode,
) -> None:
"""Apply temporal context to log record extra based on the configured mode.

Args:
extra: The mutable extra dict to update.
key: The base key (e.g., "temporal_workflow"). In dict mode this is
used directly. In flatten mode the prefix is derived by replacing
underscores with dots (e.g., "temporal.workflow").
ctx: The context mapping containing temporal fields.
mode: The mode controlling how context is added.
"""
if mode == "flatten":
prefix = key.replace("_", ".")
for k, v in ctx.items():
# Ensure value is a primitive type safe for OTel attributes
if not isinstance(v, (str, int, float, bool, type(None))):
v = str(v)
extra[f"{prefix}.{k}"] = v
else:
extra[key] = dict(ctx)


def _update_temporal_context_in_extra(
extra: MutableMapping[str, Any],
*,
key: str,
update_ctx: Mapping[str, Any],
mode: TemporalLogExtraMode,
) -> None:
"""Update existing temporal context in extra with additional fields.

This is used when adding update info to existing workflow context.

Args:
extra: The mutable extra dict to update.
key: The base key (e.g., "temporal_workflow"). In dict mode this is
used directly. In flatten mode the prefix is derived by replacing
underscores with dots (e.g., "temporal.workflow").
update_ctx: Additional context fields to add/update.
mode: The mode controlling how context is added.
"""
if mode == "flatten":
prefix = key.replace("_", ".")
for k, v in update_ctx.items():
# Ensure value is a primitive type safe for OTel attributes
if not isinstance(v, (str, int, float, bool, type(None))):
v = str(v)
extra[f"{prefix}.{k}"] = v
else:
extra.setdefault(key, {}).update(update_ctx)
13 changes: 12 additions & 1 deletion temporalio/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import temporalio.common
import temporalio.converter

from ._log_utils import TemporalLogExtraMode, _apply_temporal_context_to_extra
from .types import CallableType

if TYPE_CHECKING:
Expand Down Expand Up @@ -500,6 +501,10 @@ class LoggerAdapter(logging.LoggerAdapter):
value will be added to the ``extra`` dictionary with the entire
activity info, making it present on the ``LogRecord.__dict__`` for
use by others. Default is False.
temporal_extra_mode: Controls how activity context is added to log
``extra``. Default is ``"dict"`` (current behavior). Set to
``"flatten"`` for OpenTelemetry compatibility (scalar attributes
with ``temporal.activity.`` prefix).
"""

def __init__(self, logger: logging.Logger, extra: Mapping[str, Any] | None) -> None:
Expand All @@ -508,6 +513,7 @@ def __init__(self, logger: logging.Logger, extra: Mapping[str, Any] | None) -> N
self.activity_info_on_message = True
self.activity_info_on_extra = True
self.full_activity_info_on_extra = False
self.temporal_extra_mode: TemporalLogExtraMode = "dict"

def process(
self, msg: Any, kwargs: MutableMapping[str, Any]
Expand All @@ -525,7 +531,12 @@ def process(
if self.activity_info_on_extra:
# Extra can be absent or None, this handles both
extra = kwargs.get("extra", None) or {}
extra["temporal_activity"] = context.logger_details
_apply_temporal_context_to_extra(
extra,
key="temporal_activity",
ctx=context.logger_details,
mode=self.temporal_extra_mode,
)
kwargs["extra"] = extra
if self.full_activity_info_on_extra:
# Extra can be absent or None, this handles both
Expand Down
24 changes: 22 additions & 2 deletions temporalio/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@
import temporalio.workflow
from temporalio.nexus._util import ServiceHandlerT

from ._log_utils import (
TemporalLogExtraMode,
_apply_temporal_context_to_extra,
_update_temporal_context_in_extra,
)
from .types import (
AnyType,
CallableAsyncNoParam,
Expand Down Expand Up @@ -1569,6 +1574,10 @@ class LoggerAdapter(logging.LoggerAdapter):
use by others. Default is False.
log_during_replay: Boolean for whether logs should occur during replay.
Default is False.
temporal_extra_mode: Controls how workflow context is added to log
``extra``. Default is ``"dict"`` (current behavior). Set to
``"flatten"`` for OpenTelemetry compatibility (scalar attributes
with ``temporal.workflow.`` prefix).

Values added to ``extra`` are merged with the ``extra`` dictionary from a
logging call, with values from the logging call taking precedence. I.e. the
Expand All @@ -1582,6 +1591,7 @@ def __init__(self, logger: logging.Logger, extra: Mapping[str, Any] | None) -> N
self.workflow_info_on_extra = True
self.full_workflow_info_on_extra = False
self.log_during_replay = False
self.temporal_extra_mode: TemporalLogExtraMode = "dict"
self.disable_sandbox = False

def process(
Expand All @@ -1602,7 +1612,12 @@ def process(
if self.workflow_info_on_message:
msg_extra.update(workflow_details)
if self.workflow_info_on_extra:
extra["temporal_workflow"] = workflow_details
_apply_temporal_context_to_extra(
extra,
key="temporal_workflow",
ctx=workflow_details,
mode=self.temporal_extra_mode,
)
if self.full_workflow_info_on_extra:
extra["workflow_info"] = runtime.workflow_info()
update_info = current_update_info()
Expand All @@ -1611,7 +1626,12 @@ def process(
if self.workflow_info_on_message:
msg_extra.update(update_details)
if self.workflow_info_on_extra:
extra.setdefault("temporal_workflow", {}).update(update_details)
_update_temporal_context_in_extra(
extra,
key="temporal_workflow",
update_ctx=update_details,
mode=self.temporal_extra_mode,
)

kwargs["extra"] = {**extra, **(kwargs.get("extra") or {})}
if msg_extra:
Expand Down
Loading
Loading