From c428e8eba3f4d6ac7483354c0dcb01af812195ff Mon Sep 17 00:00:00 2001
From: Viicos <65306057+Viicos@users.noreply.github.com>
Date: Thu, 29 Jan 2026 21:26:45 +0100
Subject: [PATCH 1/4] Update Ruff

---
 pyproject.toml           | 1 +
 requirements/linting.txt | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index 188d3fff..5e3f3ffe 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -76,6 +76,7 @@ exclude_lines = [
 ]
 
 [tool.ruff]
+target-version = "py39"
 line-length = 120
 
 [tool.ruff.lint]
diff --git a/requirements/linting.txt b/requirements/linting.txt
index 853bbc79..794b2c06 100644
--- a/requirements/linting.txt
+++ b/requirements/linting.txt
@@ -16,7 +16,7 @@ mypy-extensions==1.0.0
     # via mypy
 pycparser==2.22
     # via cffi
-ruff==0.3.4
+ruff==0.14.14
     # via -r requirements/linting.in
 tomli==2.4.0
     # via mypy
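
Note on PATCH 1: target-version = "py39" tells Ruff the minimum Python version it
may assume, which is what allows the pyupgrade-family rules (UP006 and UP035,
assuming they are enabled under [tool.ruff.lint], which this diff does not show)
to rewrite typing.List/typing.Dict annotations into built-in generics. A minimal
sketch of that before/after, valid at runtime from Python 3.9:

    # Before: typing generics, needed on Python 3.8 and earlier
    from typing import Dict, List

    def group(words: List[str]) -> Dict[str, List[str]]:
        out: Dict[str, List[str]] = {}
        for word in words:
            out.setdefault(word[0], []).append(word)
        return out

    # After: PEP 585 built-in generics, the style the next patch applies
    def group_585(words: list[str]) -> dict[str, list[str]]:
        out: dict[str, list[str]] = {}
        for word in words:
            out.setdefault(word[0], []).append(word)
        return out

    print(group_585(['arq', 'async', 'redis']))  # {'a': ['arq', 'async'], 'r': ['redis']}

The rewrites themselves would come from `ruff check --fix`; `ruff format` only
touches layout (see PATCH 3).
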
""" diff --git a/arq/jobs.py b/arq/jobs.py index 15b7231e..3d3c639e 100644 --- a/arq/jobs.py +++ b/arq/jobs.py @@ -5,7 +5,7 @@ from dataclasses import dataclass from datetime import datetime from enum import Enum -from typing import Any, Callable, Dict, Optional, Tuple +from typing import Any, Callable, Optional from redis.asyncio import Redis @@ -14,8 +14,8 @@ logger = logging.getLogger('arq.jobs') -Serializer = Callable[[Dict[str, Any]], bytes] -Deserializer = Callable[[bytes], Dict[str, Any]] +Serializer = Callable[[dict[str, Any]], bytes] +Deserializer = Callable[[bytes], dict[str, Any]] class ResultNotFound(RuntimeError): @@ -42,8 +42,8 @@ class JobStatus(str, Enum): @dataclass class JobDef: function: str - args: Tuple[Any, ...] - kwargs: Dict[str, Any] + args: tuple[Any, ...] + kwargs: dict[str, Any] job_try: int enqueue_time: datetime score: Optional[int] @@ -210,8 +210,8 @@ class DeserializationError(SerializationError): def serialize_job( function_name: str, - args: Tuple[Any, ...], - kwargs: Dict[str, Any], + args: tuple[Any, ...], + kwargs: dict[str, Any], job_try: Optional[int], enqueue_time_ms: int, *, @@ -228,8 +228,8 @@ def serialize_job( def serialize_result( function: str, - args: Tuple[Any, ...], - kwargs: Dict[str, Any], + args: tuple[Any, ...], + kwargs: dict[str, Any], job_try: int, enqueue_time_ms: int, success: bool, @@ -291,7 +291,7 @@ def deserialize_job(r: bytes, *, deserializer: Optional[Deserializer] = None) -> def deserialize_job_raw( r: bytes, *, deserializer: Optional[Deserializer] = None -) -> Tuple[str, Tuple[Any, ...], Dict[str, Any], int, int]: +) -> tuple[str, tuple[Any, ...], dict[str, Any], int, int]: if deserializer is None: deserializer = pickle.loads try: diff --git a/arq/logs.py b/arq/logs.py index 2231cbed..c57f627e 100644 --- a/arq/logs.py +++ b/arq/logs.py @@ -1,7 +1,7 @@ -from typing import Any, Dict +from typing import Any -def default_log_config(verbose: bool) -> Dict[str, Any]: +def default_log_config(verbose: bool) -> dict[str, Any]: """ Setup default config. for dictConfig. diff --git a/arq/typing.py b/arq/typing.py index 454cc5b6..e6ca76e4 100644 --- a/arq/typing.py +++ b/arq/typing.py @@ -1,5 +1,6 @@ +from collections.abc import Sequence from datetime import timedelta -from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Protocol, Sequence, Set, Type, Union +from typing import TYPE_CHECKING, Any, Literal, Optional, Protocol, Union __all__ = ( 'OptionType', @@ -16,7 +17,7 @@ from .cron import CronJob from .worker import Function -OptionType = Union[None, Set[int], int] +OptionType = Union[None, set[int], int] WEEKDAYS = 'mon', 'tues', 'wed', 'thurs', 'fri', 'sat', 'sun' WeekdayOptionType = Union[OptionType, Literal['mon', 'tues', 'wed', 'thurs', 'fri', 'sat', 'sun']] SecondsTimedelta = Union[int, float, timedelta] @@ -25,14 +26,14 @@ class WorkerCoroutine(Protocol): __qualname__: str - async def __call__(self, ctx: Dict[Any, Any], *args: Any, **kwargs: Any) -> Any: # pragma: no cover + async def __call__(self, ctx: dict[Any, Any], *args: Any, **kwargs: Any) -> Any: # pragma: no cover pass class StartupShutdown(Protocol): __qualname__: str - async def __call__(self, ctx: Dict[Any, Any]) -> Any: # pragma: no cover + async def __call__(self, ctx: dict[Any, Any]) -> Any: # pragma: no cover pass @@ -44,4 +45,4 @@ class WorkerSettingsBase(Protocol): # and many more... 
-WorkerSettingsType = Union[Dict[str, Any], Type[WorkerSettingsBase]]
+WorkerSettingsType = Union[dict[str, Any], type[WorkerSettingsBase]]
diff --git a/arq/utils.py b/arq/utils.py
index 2cbde056..e4305e21 100644
--- a/arq/utils.py
+++ b/arq/utils.py
@@ -1,22 +1,25 @@
 import asyncio
 import logging
 import os
+from collections.abc import AsyncGenerator, Sequence
 from datetime import datetime, timedelta, timezone
 from functools import lru_cache
 from time import time
-from typing import TYPE_CHECKING, Any, AsyncGenerator, Dict, Optional, Sequence, overload
+from typing import TYPE_CHECKING, Any, Optional, overload
 
 from .constants import timezone_env_vars
 
-try:
-    import pytz
-except ImportError:  # pragma: no cover
-    pytz = None  # type: ignore
-
 logger = logging.getLogger('arq.utils')
 
 if TYPE_CHECKING:
+    import pytz
+
     from .typing import SecondsTimedelta
+else:
+    try:
+        import pytz
+    except ImportError:  # pragma: no cover
+        pytz = None  # type: ignore
 
 
 def as_int(f: float) -> int:
@@ -121,7 +124,7 @@ def truncate(s: str, length: int = DEFAULT_CURTAIL) -> str:
     return s
 
 
-def args_to_string(args: Sequence[Any], kwargs: Dict[str, Any]) -> str:
+def args_to_string(args: Sequence[Any], kwargs: dict[str, Any]) -> str:
     arguments = ''
     if args:
         arguments = ', '.join(map(repr, args))
diff --git a/arq/worker.py b/arq/worker.py
index f1e613c9..2550369a 100644
--- a/arq/worker.py
+++ b/arq/worker.py
@@ -3,12 +3,13 @@
 import inspect
 import logging
 import signal
+from collections.abc import Sequence
 from dataclasses import dataclass
 from datetime import datetime, timedelta, timezone
 from functools import partial
 from signal import Signals
 from time import time
-from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sequence, Set, Tuple, Union, cast
+from typing import TYPE_CHECKING, Any, Callable, Optional, Union, cast
 
 from redis.exceptions import ResponseError, WatchError
 
@@ -81,7 +82,7 @@ def func(
 
     if isinstance(coroutine, str):
         name = name or coroutine
-        coroutine_: 'WorkerCoroutine' = import_string(coroutine)
+        coroutine_: WorkerCoroutine = import_string(coroutine)
     else:
         coroutine_ = coroutine
 
@@ -118,7 +119,7 @@ def __eq__(self, other: Any) -> bool:
 
 
 class FailedJobs(RuntimeError):
-    def __init__(self, count: int, job_results: List[JobResult]):
+    def __init__(self, count: int, job_results: list[JobResult]):
         self.count = count
         self.job_results = job_results
 
@@ -208,7 +209,7 @@ def __init__(
         max_tries: int = 5,
         health_check_interval: 'SecondsTimedelta' = 3600,
         health_check_key: Optional[str] = None,
-        ctx: Optional[Dict[Any, Any]] = None,
+        ctx: Optional[dict[Any, Any]] = None,
         retry_jobs: bool = True,
         allow_abort_jobs: bool = False,
         max_burst_jobs: int = -1,
@@ -218,14 +219,14 @@ def __init__(
         timezone: Optional[timezone] = None,
         log_results: bool = True,
     ):
-        self.functions: Dict[str, Union[Function, CronJob]] = {f.name: f for f in map(func, functions)}
+        self.functions: dict[str, Union[Function, CronJob]] = {f.name: f for f in map(func, functions)}
         if queue_name is None:
             if redis_pool is not None:
                 queue_name = redis_pool.default_queue_name
             else:
                 raise ValueError('If queue_name is absent, redis_pool must be present.')
         self.queue_name = queue_name
-        self.cron_jobs: List[CronJob] = []
+        self.cron_jobs: list[CronJob] = []
         if cron_jobs is not None:
             if not all(isinstance(cj, CronJob) for cj in cron_jobs):
                 raise RuntimeError('cron_jobs, must be instances of CronJob')
@@ -262,9 +263,9 @@ def __init__(
         else:
             self.redis_settings = None
         # self.tasks holds references to run_job coroutines currently running
-        self.tasks: Dict[str, asyncio.Task[Any]] = {}
+        self.tasks: dict[str, asyncio.Task[Any]] = {}
         # self.job_tasks holds references the actual jobs running
-        self.job_tasks: Dict[str, asyncio.Task[Any]] = {}
+        self.job_tasks: dict[str, asyncio.Task[Any]] = {}
         self.main_task: Optional[asyncio.Task[None]] = None
         self.loop = asyncio.get_event_loop()
         self.ctx = ctx or {}
@@ -289,7 +290,7 @@ def __init__(
         self.retry_jobs = retry_jobs
         self.allow_abort_jobs = allow_abort_jobs
         self.allow_pick_jobs: bool = True
-        self.aborting_tasks: Set[str] = set()
+        self.aborting_tasks: set[str] = set()
         self.max_burst_jobs = max_burst_jobs
         self.job_serializer = job_serializer
         self.job_deserializer = job_deserializer
@@ -409,7 +410,7 @@ async def _cancel_aborted_jobs(self) -> None:
             pipe.zremrangebyscore(abort_jobs_ss, min=timestamp_ms() + abort_job_max_age, max=float('inf'))
             abort_job_ids, _ = await pipe.execute()
 
-        aborted: Set[str] = set()
+        aborted: set[str] = set()
         for job_id_bytes in abort_job_ids:
             job_id = job_id_bytes.decode()
             try:
@@ -428,7 +429,7 @@ def _release_sem_dec_counter_on_complete(self) -> None:
         self.job_counter = self.job_counter - 1
         self.sem.release()
 
-    async def start_jobs(self, job_ids: List[bytes]) -> None:
+    async def start_jobs(self, job_ids: list[bytes]) -> None:
         """
         For each job id, get the job definition, check it's not running and start it in a task
         """
@@ -484,8 +485,8 @@ async def run_job(self, job_id: str, score: int) -> None:  # noqa: C901
         abort_job = False
 
         function_name, enqueue_time_ms = '', 0
-        args: Tuple[Any, ...] = ()
-        kwargs: Dict[Any, Any] = {}
+        args: tuple[Any, ...] = ()
+        kwargs: dict[Any, Any] = {}
 
         async def job_failed(exc: BaseException) -> None:
             self.jobs_failed += 1
@@ -879,7 +880,7 @@ def __repr__(self) -> str:
         )
 
 
-def get_kwargs(settings_cls: 'WorkerSettingsType') -> Dict[str, NameError]:
+def get_kwargs(settings_cls: 'WorkerSettingsType') -> dict[str, NameError]:
     worker_args = set(inspect.signature(Worker).parameters.keys())
     d = settings_cls if isinstance(settings_cls, dict) else settings_cls.__dict__
     return {k: v for k, v in d.items() if k in worker_args}
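
Note on PATCH 2: the arq/utils.py hunk is the one change here that is more than
mechanical. The optional pytz import moves under `if TYPE_CHECKING` so that static
type checkers always see the real module, while the runtime try/except keeps the
dependency optional. A self-contained sketch of the same pattern, using a
hypothetical optional dependency named fastjson (illustrative only, not something
arq depends on):

    import json
    from typing import TYPE_CHECKING, Any

    if TYPE_CHECKING:
        # Type checkers take this branch unconditionally, so the name always
        # resolves to a real module when annotations are checked.
        import fastjson
    else:
        # At runtime the import may fail; fall back to a None sentinel.
        try:
            import fastjson
        except ImportError:
            fastjson = None

    def dumps(obj: Any) -> str:
        # Callers guard on the sentinel before touching the optional module.
        if fastjson is not None:
            return fastjson.dumps(obj)
        return json.dumps(obj)
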
From 6f7b36db824a86e503a9a85c1732d1dbc9b34276 Mon Sep 17 00:00:00 2001
From: Viicos <65306057+Viicos@users.noreply.github.com>
Date: Thu, 29 Jan 2026 21:27:25 +0100
Subject: [PATCH 3/4] Format

---
 arq/connections.py   | 5 +----
 tests/test_worker.py | 2 +-
 2 files changed, 2 insertions(+), 5 deletions(-)

diff --git a/arq/connections.py b/arq/connections.py
index a3a3026b..96e55bd8 100644
--- a/arq/connections.py
+++ b/arq/connections.py
@@ -312,8 +312,5 @@ async def log_redis_info(redis: 'Redis[bytes]', log_func: Callable[[str], Any])
     clients_connected = info_clients.get('connected_clients', '?')
 
     log_func(
-        f'redis_version={redis_version} '
-        f'mem_usage={mem_usage} '
-        f'clients_connected={clients_connected} '
-        f'db_keys={key_count}'
+        f'redis_version={redis_version} mem_usage={mem_usage} clients_connected={clients_connected} db_keys={key_count}'
     )
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 93fbc7f0..98b00ac7 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -421,7 +421,7 @@ async def test_job_old(arq_redis: ArqRedis, worker, caplog):
     assert worker.jobs_retried == 0
 
     log = re.sub(r'(\d+).\d\ds', r'\1.XXs', '\n'.join(r.message for r in caplog.records))
-    assert log.endswith(' 0.XXs → testing:foobar() delayed=2.XXs\n' ' 0.XXs ← testing:foobar ● 42')
+    assert log.endswith(' 0.XXs → testing:foobar() delayed=2.XXs\n 0.XXs ← testing:foobar ● 42')
 
 
 async def test_retry_repr():
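
Note on PATCH 3: both hunks are behavior-preserving, because Python concatenates
adjacent string literals (f-strings included) into a single string, so joining
them only changes layout. Presumably this is the newer Ruff formatter collapsing
implicit concatenations that fit within the 120-character line-length. A quick
demonstration with plain literals:

    # Implicit concatenation: adjacent literals are fused into one string.
    joined = 'redis_version=7.2 ' 'mem_usage=1.1M'
    single = 'redis_version=7.2 mem_usage=1.1M'
    assert joined == single  # for plain literals the fusion happens at compile time
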
From 71628170c8c14825f04eb9c5be49b910875ceb01 Mon Sep 17 00:00:00 2001
From: Viicos <65306057+Viicos@users.noreply.github.com>
Date: Thu, 29 Jan 2026 21:29:49 +0100
Subject: [PATCH 4/4] Missing lint

---
 tests/conftest.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/conftest.py b/tests/conftest.py
index 9b6b7f5b..da8bbe3a 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -2,7 +2,7 @@
 import functools
 import os
 import sys
-from typing import Generator
+from collections.abc import Generator
 
 import msgpack
 import pytest
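
Note on PATCH 4: same pyupgrade family as PATCH 2. Since Python 3.9 the ABCs in
collections.abc are subscriptable and typing.Generator is a deprecated alias, so
the import moves (UP035 is the usual rule here). A hypothetical fixture in the
style tests/conftest.py adopts after this change:

    from collections.abc import Generator

    import pytest

    @pytest.fixture
    def resource_name() -> Generator[str, None, None]:
        # Setup runs before the yield, teardown after it.
        name = 'test-resource'
        yield name
        # Teardown: nothing to clean up in this sketch.
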