From 8151f4b415664e976449c2d41e440aee4db51beb Mon Sep 17 00:00:00 2001 From: Yann Combarnous Date: Mon, 5 May 2025 10:21:37 +0200 Subject: [PATCH 1/7] feat: add typing for generators in tasks --- taskiq_faststream/broker.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/taskiq_faststream/broker.py b/taskiq_faststream/broker.py index 4ed4e7e..c28699b 100644 --- a/taskiq_faststream/broker.py +++ b/taskiq_faststream/broker.py @@ -73,6 +73,8 @@ def task( # type: ignore[override] SendableMessage, typing.Callable[[], SendableMessage], typing.Callable[[], typing.Awaitable[SendableMessage]], + typing.Callable[[], typing.Generator[SendableMessage]], + typing.Callable[[], typing.AsyncGenerator[SendableMessage]], ] = None, *, schedule: list[ScheduledTask], From 3aa40bfde6ba03174abf615f50ada82a8c9ed7a2 Mon Sep 17 00:00:00 2001 From: Pastukhov Nikita Date: Mon, 5 May 2025 18:33:53 +0300 Subject: [PATCH 2/7] Update broker.py --- taskiq_faststream/broker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/taskiq_faststream/broker.py b/taskiq_faststream/broker.py index c28699b..cee1606 100644 --- a/taskiq_faststream/broker.py +++ b/taskiq_faststream/broker.py @@ -73,8 +73,8 @@ def task( # type: ignore[override] SendableMessage, typing.Callable[[], SendableMessage], typing.Callable[[], typing.Awaitable[SendableMessage]], - typing.Callable[[], typing.Generator[SendableMessage]], - typing.Callable[[], typing.AsyncGenerator[SendableMessage]], + typing.Callable[[], typing.Generator[SendableMessage, None, None]], + typing.Callable[[], typing.AsyncGenerator[SendableMessage, None, None]], ] = None, *, schedule: list[ScheduledTask], From 2a0df3c9b64c7943b7485c9598e69628dc0152ed Mon Sep 17 00:00:00 2001 From: Pastukhov Nikita Date: Mon, 5 May 2025 18:34:57 +0300 Subject: [PATCH 3/7] Update broker.py --- taskiq_faststream/broker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/taskiq_faststream/broker.py b/taskiq_faststream/broker.py index cee1606..34b6b2a 100644 --- a/taskiq_faststream/broker.py +++ b/taskiq_faststream/broker.py @@ -74,7 +74,7 @@ def task( # type: ignore[override] typing.Callable[[], SendableMessage], typing.Callable[[], typing.Awaitable[SendableMessage]], typing.Callable[[], typing.Generator[SendableMessage, None, None]], - typing.Callable[[], typing.AsyncGenerator[SendableMessage, None, None]], + typing.Callable[[], typing.AsyncGenerator[SendableMessage, None]], ] = None, *, schedule: list[ScheduledTask], From beeb3c342bd48f89610bf3b33e04fc013a1346ea Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 12 May 2025 18:28:30 +0000 Subject: [PATCH 4/7] chore(deps): bump the pip group across 1 directory with 2 updates Bumps the pip group with 2 updates in the / directory: [mypy](https://github.com/python/mypy) and [ruff](https://github.com/astral-sh/ruff). Updates `mypy` from 1.11.2 to 1.15.0 - [Changelog](https://github.com/python/mypy/blob/master/CHANGELOG.md) - [Commits](https://github.com/python/mypy/compare/v1.11.2...v1.15.0) Updates `ruff` from 0.11.8 to 0.11.9 - [Release notes](https://github.com/astral-sh/ruff/releases) - [Changelog](https://github.com/astral-sh/ruff/blob/main/CHANGELOG.md) - [Commits](https://github.com/astral-sh/ruff/compare/0.11.8...0.11.9) --- updated-dependencies: - dependency-name: mypy dependency-version: 1.15.0 dependency-type: direct:production update-type: version-update:semver-minor dependency-group: pip - dependency-name: ruff dependency-version: 0.11.9 dependency-type: direct:production update-type: version-update:semver-patch dependency-group: pip ... Signed-off-by: dependabot[bot] --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 5cb62d0..3b483f2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -77,8 +77,8 @@ test = [ dev = [ "taskiq-faststream[test]", - "mypy==1.11.2", - "ruff==0.11.8", + "mypy==1.15.0", + "ruff==0.11.9", "pre-commit >=3.6.0,<5.0.0", ] From 704f1b6355767b655f53ef2b39f2500e66c131e5 Mon Sep 17 00:00:00 2001 From: Elmir Abkerimov Date: Wed, 14 May 2025 20:57:22 +0300 Subject: [PATCH 5/7] fix: StreamSchedule loses messages --- pyproject.toml | 1 + taskiq_faststream/kicker.py | 8 ------ tests/testcase.py | 54 +++++++++++++++++++++++++++++++++++-- 3 files changed, 53 insertions(+), 10 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 5cb62d0..1b1ef56 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -73,6 +73,7 @@ test = [ "coverage[toml]>=7.2.0,<8.0.0", "pytest>=7.4.0,<9", + "freezegun>=1.2.2" ] dev = [ diff --git a/taskiq_faststream/kicker.py b/taskiq_faststream/kicker.py index f2a96eb..2226e5d 100644 --- a/taskiq_faststream/kicker.py +++ b/taskiq_faststream/kicker.py @@ -1,13 +1,5 @@ -from typing import Any - from taskiq.kicker import AsyncKicker, _FuncParams, _ReturnType -from taskiq.message import TaskiqMessage class LabelRespectKicker(AsyncKicker[_FuncParams, _ReturnType]): """Patched kicker doesn't cast labels to str.""" - - def _prepare_message(self, *args: Any, **kwargs: Any) -> TaskiqMessage: - msg = super()._prepare_message(*args, **kwargs) - msg.labels = self.labels - return msg diff --git a/tests/testcase.py b/tests/testcase.py index c22b7ff..d1e62a0 100644 --- a/tests/testcase.py +++ b/tests/testcase.py @@ -1,16 +1,17 @@ import asyncio -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from typing import Any from unittest.mock import MagicMock import pytest from faststream.utils.functions import timeout_scope +from freezegun import freeze_time from taskiq import AsyncBroker, TaskiqScheduler from taskiq.cli.scheduler.args import SchedulerArgs from taskiq.cli.scheduler.run import run_scheduler from taskiq.schedule_sources import LabelScheduleSource -from taskiq_faststream import BrokerWrapper +from taskiq_faststream import BrokerWrapper, StreamScheduler @pytest.mark.anyio @@ -67,3 +68,52 @@ async def handler(msg: str) -> None: mock.assert_called_once_with("Hi!") task.cancel() + + async def test_task_multiple_schedules_by_cron( + self, + subject: str, + broker: Any, + event: asyncio.Event, + ) -> None: + """Test cron runs twice via StreamScheduler.""" + received_message = [] + + @broker.subscriber(subject) + async def handler(msg: str) -> None: + received_message.append(msg) + event.set() + + taskiq_broker = self.build_taskiq_broker(broker) + + taskiq_broker.task( + "Hi!", + **{self.subj_name: subject}, + schedule=[ + { + "cron": "* * * * *", + }, + ], + ) + + async with self.test_class(broker): + with freeze_time("00:00:00", tick=True) as frozen_datetime: + task = asyncio.create_task( + run_scheduler( + SchedulerArgs( + scheduler=StreamScheduler( + broker=taskiq_broker, + sources=[LabelScheduleSource(taskiq_broker)], + ), + modules=[], + ), + ), + ) + + await asyncio.wait_for(event.wait(), 2.0) + event.clear() + frozen_datetime.tick(timedelta(minutes=2)) + await asyncio.wait_for(event.wait(), 2.0) + + task.cancel() + + assert received_message == ["Hi!", "Hi!"], received_message From 9bc245236444dbf6aeb8b2aa2b20250a6541e783 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 19 May 2025 18:54:58 +0000 Subject: [PATCH 6/7] chore(deps): bump ruff from 0.11.9 to 0.11.10 in the pip group Bumps the pip group with 1 update: [ruff](https://github.com/astral-sh/ruff). Updates `ruff` from 0.11.9 to 0.11.10 - [Release notes](https://github.com/astral-sh/ruff/releases) - [Changelog](https://github.com/astral-sh/ruff/blob/main/CHANGELOG.md) - [Commits](https://github.com/astral-sh/ruff/compare/0.11.9...0.11.10) --- updated-dependencies: - dependency-name: ruff dependency-version: 0.11.10 dependency-type: direct:production update-type: version-update:semver-patch dependency-group: pip ... Signed-off-by: dependabot[bot] --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 3b483f2..91888e1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -78,7 +78,7 @@ test = [ dev = [ "taskiq-faststream[test]", "mypy==1.15.0", - "ruff==0.11.9", + "ruff==0.11.10", "pre-commit >=3.6.0,<5.0.0", ] From 11bcebd5e6a9f4835362a9d50f1c0be952ad7a3a Mon Sep 17 00:00:00 2001 From: Pastukhov Nikita Date: Thu, 22 May 2025 20:25:50 +0300 Subject: [PATCH 7/7] chore: bump version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index a0e9799..1bfe704 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "taskiq-faststream" -version = "0.2.1" +version = "0.2.2" description = "FastStream - taskiq integration to schedule FastStream tasks" readme = "README.md" authors = [