From d3fdee6bc545a116ea076feeda97716bcb160b40 Mon Sep 17 00:00:00 2001 From: Alex Kondratev Date: Sat, 22 Nov 2025 23:58:43 +0300 Subject: [PATCH 1/4] fix opentelemetry tests --- taskiq/instrumentation.py | 2 +- .../test_auto_instrumentation.py | 19 ++++++---- tests/opentelemetry/test_tasks.py | 38 ++++++++++++------- 3 files changed, 38 insertions(+), 21 deletions(-) diff --git a/taskiq/instrumentation.py b/taskiq/instrumentation.py index c36a02ac..173faeb8 100644 --- a/taskiq/instrumentation.py +++ b/taskiq/instrumentation.py @@ -114,7 +114,7 @@ def uninstrument_broker(self, broker: AsyncBroker) -> None: def instrumentation_dependencies(self) -> Collection[str]: """This function tells which library this instrumentor instruments.""" - return ("taskiq >= 0.0.1",) + return ("taskiq >= 0.0.0",) @classmethod def _start_listen_with_initialize(cls, args: WorkerArgs) -> None: diff --git a/tests/opentelemetry/test_auto_instrumentation.py b/tests/opentelemetry/test_auto_instrumentation.py index 65d5660f..83d4b835 100644 --- a/tests/opentelemetry/test_auto_instrumentation.py +++ b/tests/opentelemetry/test_auto_instrumentation.py @@ -1,3 +1,5 @@ +import asyncio + from opentelemetry.test.test_base import TestBase from opentelemetry.trace import SpanKind, StatusCode @@ -6,7 +8,7 @@ class TestTaskiqAutoInstrumentation(TestBase): - async def test_auto_instrument(self) -> None: + def test_auto_instrument(self) -> None: TaskiqInstrumentor().instrument() broker = InMemoryBroker(await_inplace=True) @@ -15,8 +17,11 @@ async def test_auto_instrument(self) -> None: async def task_add(a: float, b: float) -> float: return a + b - await task_add.kiq(1, 2) - await broker.wait_all() + async def test() -> None: + await task_add.kiq(1, 2) + await broker.wait_all() + + asyncio.run(test()) spans = self.sorted_spans(self.memory_exporter.get_finished_spans()) self.assertEqual(len(spans), 2) @@ -25,7 +30,7 @@ async def task_add(a: float, b: float) -> float: self.assertEqual( consumer.name, - "execute/tests.test_auto_instrumentation:task_add", + "execute/tests.opentelemetry.test_auto_instrumentation:task_add", f"{consumer._end_time}:{producer._end_time}", ) self.assertEqual(consumer.kind, SpanKind.CONSUMER) @@ -33,7 +38,7 @@ async def task_add(a: float, b: float) -> float: consumer, { "taskiq.action": "execute", - "taskiq.task_name": "tests.test_auto_instrumentation:task_add", + "taskiq.task_name": "tests.opentelemetry.test_auto_instrumentation:task_add", # noqa: E501 }, ) @@ -43,14 +48,14 @@ async def task_add(a: float, b: float) -> float: self.assertEqual( producer.name, - "send/tests.test_auto_instrumentation:task_add", + "send/tests.opentelemetry.test_auto_instrumentation:task_add", ) self.assertEqual(producer.kind, SpanKind.PRODUCER) self.assertSpanHasAttributes( producer, { "taskiq.action": "send", - "taskiq.task_name": "tests.test_auto_instrumentation:task_add", + "taskiq.task_name": "tests.opentelemetry.test_auto_instrumentation:task_add", # noqa: E501 }, ) diff --git a/tests/opentelemetry/test_tasks.py b/tests/opentelemetry/test_tasks.py index d9524e77..c79096c9 100644 --- a/tests/opentelemetry/test_tasks.py +++ b/tests/opentelemetry/test_tasks.py @@ -9,6 +9,7 @@ from opentelemetry.trace import Span, SpanKind, StatusCode from wrapt import wrap_function_wrapper +from taskiq import TaskiqResult from taskiq.instrumentation import TaskiqInstrumentor from taskiq.middlewares import opentelemetry_middleware @@ -25,11 +26,14 @@ def tearDown(self) -> None: super().tearDown() TaskiqInstrumentor().uninstrument_broker(broker) - async def test_task(self) -> None: + def test_task(self) -> None: TaskiqInstrumentor().instrument_broker(broker) - await task_add.kiq(1, 2) - await broker.wait_all() + async def test() -> None: + await task_add.kiq(1, 2) + await broker.wait_all() + + asyncio.run(test()) spans = self.sorted_spans(self.memory_exporter.get_finished_spans()) self.assertEqual(len(spans), 2) @@ -71,11 +75,14 @@ async def test_task(self) -> None: self.assertEqual(consumer.parent.span_id, producer.context.span_id) self.assertEqual(consumer.context.trace_id, producer.context.trace_id) - async def test_task_raises(self) -> None: + def test_task_raises(self) -> None: TaskiqInstrumentor().instrument_broker(broker) - await task_raises.kiq() - await broker.wait_all() + async def test() -> None: + await task_raises.kiq() + await broker.wait_all() + + asyncio.run(test()) spans = self.sorted_spans(self.memory_exporter.get_finished_spans()) self.assertEqual(len(spans), 2) @@ -129,7 +136,7 @@ async def test_task_raises(self) -> None: self.assertEqual(consumer.parent.span_id, producer.context.span_id) self.assertEqual(consumer.context.trace_id, producer.context.trace_id) - async def test_uninstrument(self) -> None: + def test_uninstrument(self) -> None: TaskiqInstrumentor().instrument_broker(broker) TaskiqInstrumentor().uninstrument_broker(broker) @@ -142,18 +149,21 @@ async def test() -> None: spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 0) - async def test_baggage(self) -> None: + def test_baggage(self) -> None: TaskiqInstrumentor().instrument_broker(broker) + async def test() -> TaskiqResult[Any]: + task = await task_returns_baggage.kiq() + return await task.wait_result(timeout=2) + ctx = baggage.set_baggage("key", "value") context.attach(ctx) - task = await task_returns_baggage.kiq() - result = await task.wait_result(timeout=2) + result = asyncio.run(test()) self.assertEqual(result.return_value, {"key": "value"}) - async def test_task_not_instrumented_does_not_raise(self) -> None: + def test_task_not_instrumented_does_not_raise(self) -> None: def _retrieve_context_wrapper_none_token( wrapped: Callable[ [Any], @@ -183,9 +193,11 @@ def _retrieve_context_wrapper_none_token( TaskiqInstrumentor().instrument_broker(broker) - task = await task_add.kiq(1, 2) - result = await task.wait_result(timeout=2) + async def test() -> TaskiqResult[float]: + task = await task_add.kiq(1, 2) + return await task.wait_result(timeout=2) + result = asyncio.run(test()) spans = self.sorted_spans(self.memory_exporter.get_finished_spans()) self.assertEqual(len(spans), 2) From 326bd241b1543808c7816b4b69a1f5ce05d68858 Mon Sep 17 00:00:00 2001 From: soapun Date: Sun, 23 Nov 2025 10:07:37 +0300 Subject: [PATCH 2/4] fix pydantic v1 --- taskiq/middlewares/opentelemetry_middleware.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/taskiq/middlewares/opentelemetry_middleware.py b/taskiq/middlewares/opentelemetry_middleware.py index c6e68937..d5869e6a 100644 --- a/taskiq/middlewares/opentelemetry_middleware.py +++ b/taskiq/middlewares/opentelemetry_middleware.py @@ -1,7 +1,10 @@ import logging from contextlib import AbstractContextManager +from importlib.metadata import version from typing import Any, Dict, Optional, Tuple, TypeVar +from packaging.version import Version, parse + try: import opentelemetry # noqa: F401 except ImportError as exc: @@ -10,6 +13,7 @@ "Please install 'taskiq[opentelemetry]'.", ) from exc + from opentelemetry import context as context_api from opentelemetry import trace from opentelemetry.metrics import Meter, MeterProvider, get_meter @@ -27,6 +31,14 @@ # Taskiq Context key CTX_KEY = "__otel_task_span" +PYDANTIC_VER = parse(version("pydantic")) +IS_PYDANTIC1 = Version("2.0") > PYDANTIC_VER +if IS_PYDANTIC1: + if TaskiqMessage.__exclude_fields__: # type: ignore[attr-defined] + TaskiqMessage.__exclude_fields__.update(CTX_KEY) # type: ignore + else: + TaskiqMessage.__exclude_fields__ = {CTX_KEY} # type: ignore + # Taskiq Context attributes TASKIQ_CONTEXT_ATTRIBUTES = [ "_retries", @@ -95,7 +107,7 @@ def attach_context( if ctx_dict is None: ctx_dict = {} - setattr(message, CTX_KEY, ctx_dict) + object.__setattr__(message, CTX_KEY, ctx_dict) ctx_dict[(message.task_id, is_publish)] = (span, activation, token) From 5e46f18b0bbe1166ee0801bc54a6f8a1a156a7f0 Mon Sep 17 00:00:00 2001 From: soapun Date: Sun, 23 Nov 2025 13:18:53 +0300 Subject: [PATCH 3/4] fix 3.9 async test --- taskiq/middlewares/opentelemetry_middleware.py | 4 ++++ tests/opentelemetry/test_auto_instrumentation.py | 10 +++++----- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/taskiq/middlewares/opentelemetry_middleware.py b/taskiq/middlewares/opentelemetry_middleware.py index d5869e6a..c046e582 100644 --- a/taskiq/middlewares/opentelemetry_middleware.py +++ b/taskiq/middlewares/opentelemetry_middleware.py @@ -31,6 +31,8 @@ # Taskiq Context key CTX_KEY = "__otel_task_span" +# unlike pydantic v2, v1 includes CTX_KEY by default +# excluding it here PYDANTIC_VER = parse(version("pydantic")) IS_PYDANTIC1 = Version("2.0") > PYDANTIC_VER if IS_PYDANTIC1: @@ -107,6 +109,8 @@ def attach_context( if ctx_dict is None: ctx_dict = {} + # use object.__setattr__ directly + # to skip pydantic v1 setattr object.__setattr__(message, CTX_KEY, ctx_dict) ctx_dict[(message.task_id, is_publish)] = (span, activation, token) diff --git a/tests/opentelemetry/test_auto_instrumentation.py b/tests/opentelemetry/test_auto_instrumentation.py index 83d4b835..c7290869 100644 --- a/tests/opentelemetry/test_auto_instrumentation.py +++ b/tests/opentelemetry/test_auto_instrumentation.py @@ -11,13 +11,13 @@ class TestTaskiqAutoInstrumentation(TestBase): def test_auto_instrument(self) -> None: TaskiqInstrumentor().instrument() - broker = InMemoryBroker(await_inplace=True) + async def test() -> None: + broker = InMemoryBroker(await_inplace=True) - @broker.task - async def task_add(a: float, b: float) -> float: - return a + b + @broker.task + async def task_add(a: float, b: float) -> float: + return a + b - async def test() -> None: await task_add.kiq(1, 2) await broker.wait_all() From 19e37b2cd0d0c0dae593052c2afafdeb1ec3eff8 Mon Sep 17 00:00:00 2001 From: soapun Date: Sun, 23 Nov 2025 14:49:21 +0300 Subject: [PATCH 4/4] increase timeout --- tests/api/test_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/api/test_scheduler.py b/tests/api/test_scheduler.py index 46064205..b67eb481 100644 --- a/tests/api/test_scheduler.py +++ b/tests/api/test_scheduler.py @@ -16,7 +16,7 @@ async def test_successful() -> None: @broker.task(schedule=[{"time": datetime.now(timezone.utc) - timedelta(seconds=1)}]) def _() -> None: ... - msg = await asyncio.wait_for(broker.queue.get(), 1) + msg = await asyncio.wait_for(broker.queue.get(), 2) assert msg scheduler_task.cancel()