From f98d672e8196e721f1a8e83c6916d879610c50c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Deveaux?= Date: Wed, 17 Dec 2025 14:48:20 +0100 Subject: [PATCH] Make ConcurrentMultiSpanProcessor fork safe --- .../src/opentelemetry/sdk/trace/__init__.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py index 0e7e1f6db3..94436cae2b 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py @@ -18,6 +18,7 @@ import concurrent.futures import json import logging +import os import threading import traceback import typing @@ -238,8 +239,16 @@ def __init__(self, num_threads: int = 2): # iterating through it on "on_start" and "on_end". self._span_processors = () # type: Tuple[SpanProcessor, ...] self._lock = threading.Lock() - self._executor = concurrent.futures.ThreadPoolExecutor( - max_workers=num_threads + self._num_threads = num_threads + self._executor = self._build_executor() + if hasattr(os, "register_at_fork"): + # Only the main thread is kept in forked processed, the executor + # needs to be re-instantiated to get a fresh pool of threads: + os.register_at_fork(after_in_child=self._build_executor) + + def _build_executor(self) -> concurrent.futures.ThreadPoolExecutor: + return concurrent.futures.ThreadPoolExecutor( + max_workers=self._num_threads ) def add_span_processor(self, span_processor: SpanProcessor) -> None: