From 49875a6584f8b969e160237572c2a0a1f193299b Mon Sep 17 00:00:00 2001 From: Ilya Volnistov Date: Tue, 25 Mar 2025 17:26:18 +0100 Subject: [PATCH 1/2] fix: log configuring when running under spawn and add --log-format Fixed issue with logging configuration when using multiprocessing 'spawn' method on macOS. The spawn method reimports all modules which caused logging configuration to be lost. Added `--log-format` argument to easily customize logging format across all processes, including spawned child processes. --- taskiq/cli/worker/args.py | 6 ++++++ taskiq/cli/worker/run.py | 11 +++++++---- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/taskiq/cli/worker/args.py b/taskiq/cli/worker/args.py index bb50300b..09fad6d0 100644 --- a/taskiq/cli/worker/args.py +++ b/taskiq/cli/worker/args.py @@ -30,6 +30,7 @@ class WorkerArgs: fs_discover: bool = False configure_logging: bool = True log_level: LogLevel = LogLevel.INFO + log_format: str = "[%(asctime)s][%(name)s][%(levelname)-7s][%(processName)s] %(message)s" workers: int = 2 max_threadpool_threads: Optional[int] = None max_process_pool_processes: Optional[int] = None @@ -118,6 +119,11 @@ def from_cli( choices=[level.name for level in LogLevel], help="worker log level", ) + parser.add_argument( + "--log-format", + default="[%(asctime)s][%(name)s][%(levelname)-7s][%(processName)s] %(message)s", + help="worker log format", + ) parser.add_argument( "--workers", "-w", diff --git a/taskiq/cli/worker/run.py b/taskiq/cli/worker/run.py index 4b5f2d57..88bf5837 100644 --- a/taskiq/cli/worker/run.py +++ b/taskiq/cli/worker/run.py @@ -5,7 +5,7 @@ import signal import sys from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor -from multiprocessing import set_start_method +from multiprocessing import set_start_method, get_start_method from sys import platform from typing import Any, Optional, Type @@ -86,7 +86,11 @@ def start_listen(args: WorkerArgs) -> None: """ shutdown_event = asyncio.Event() hardkill_counter = 0 - + if args.configure_logging and get_start_method() == "spawn": + logging.basicConfig( + level=logging.getLevelName(args.log_level), + format=args.log_format, + ) def interrupt_handler(signum: int, _frame: Any) -> None: """ Signal handler. @@ -186,8 +190,7 @@ def run_worker(args: WorkerArgs) -> Optional[int]: if args.configure_logging: logging.basicConfig( level=logging.getLevelName(args.log_level), - format="[%(asctime)s][%(name)s][%(levelname)-7s]" - "[%(processName)s] %(message)s", + format=args.log_format, ) logging.getLogger("taskiq").setLevel(level=logging.getLevelName(args.log_level)) logging.getLogger("watchdog.observers.inotify_buffer").setLevel(level=logging.INFO) From af5439bd93db04aa884ecec345d49aae7175fb3f Mon Sep 17 00:00:00 2001 From: Ilya Volnistov Date: Tue, 25 Mar 2025 17:52:52 +0100 Subject: [PATCH 2/2] fix linting and formatting --- taskiq/cli/worker/args.py | 7 +++++-- taskiq/cli/worker/run.py | 3 ++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/taskiq/cli/worker/args.py b/taskiq/cli/worker/args.py index 09fad6d0..06b7fa2a 100644 --- a/taskiq/cli/worker/args.py +++ b/taskiq/cli/worker/args.py @@ -30,7 +30,9 @@ class WorkerArgs: fs_discover: bool = False configure_logging: bool = True log_level: LogLevel = LogLevel.INFO - log_format: str = "[%(asctime)s][%(name)s][%(levelname)-7s][%(processName)s] %(message)s" + log_format: str = ( + "[%(asctime)s][%(name)s][%(levelname)-7s][%(processName)s] %(message)s" + ) workers: int = 2 max_threadpool_threads: Optional[int] = None max_process_pool_processes: Optional[int] = None @@ -121,7 +123,8 @@ def from_cli( ) parser.add_argument( "--log-format", - default="[%(asctime)s][%(name)s][%(levelname)-7s][%(processName)s] %(message)s", + default="[%(asctime)s][%(name)s][%(levelname)-7s]" + "[%(processName)s] %(message)s", help="worker log format", ) parser.add_argument( diff --git a/taskiq/cli/worker/run.py b/taskiq/cli/worker/run.py index 88bf5837..3d58fedf 100644 --- a/taskiq/cli/worker/run.py +++ b/taskiq/cli/worker/run.py @@ -5,7 +5,7 @@ import signal import sys from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor -from multiprocessing import set_start_method, get_start_method +from multiprocessing import get_start_method, set_start_method from sys import platform from typing import Any, Optional, Type @@ -91,6 +91,7 @@ def start_listen(args: WorkerArgs) -> None: level=logging.getLevelName(args.log_level), format=args.log_format, ) + def interrupt_handler(signum: int, _frame: Any) -> None: """ Signal handler.