From 30ddcdca88fe581b7775a65367e208035de89703 Mon Sep 17 00:00:00 2001 From: Ernest Provo Date: Sun, 22 Feb 2026 10:41:08 -0500 Subject: [PATCH 1/2] Truncate large coro repr in retry log output When retry() falls back to str(coro) for the log message, truncate the representation to 200 characters to prevent excessively large logs. This was observed in P2P shuffles where functools.partial repr includes serialized binary data. Closes #8529 --- distributed/tests/test_utils_comm.py | 44 ++++++++++++++++++++++++++++ distributed/utils_comm.py | 5 +++- 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/distributed/tests/test_utils_comm.py b/distributed/tests/test_utils_comm.py index 36f5fca77e..1d02d4e209 100644 --- a/distributed/tests/test_utils_comm.py +++ b/distributed/tests/test_utils_comm.py @@ -229,6 +229,50 @@ async def f(): assert sleep_calls == [0.0, 1.0, 3.0, 6.0, 6.0] +def test_retry_truncates_large_coro_repr(cleanup): + """Test that retry truncates excessively large string representations of coro.""" + + class MyEx(Exception): + pass + + class LargeReprCallable: + def __repr__(self): + return "x" * 500 + + async def __call__(self): + raise MyEx("fail") + + log_messages = [] + + async def f(): + return await retry( + LargeReprCallable(), + retry_on_exceptions=(MyEx,), + count=1, + delay_min=0, + delay_max=0, + jitter_fraction=0, + ) + + import logging + + handler = logging.Handler() + handler.emit = lambda record: log_messages.append(record.getMessage()) + + logger = logging.getLogger("distributed.utils_comm") + logger.addHandler(handler) + try: + with pytest.raises(MyEx): + asyncio_run(f(), loop_factory=get_loop_factory()) + finally: + logger.removeHandler(handler) + + assert len(log_messages) == 1 + # The 500-char repr should be truncated to 200 + "..." + assert len(log_messages[0]) < 500 + assert "..." in log_messages[0] + + def test_unpack_remotedata(): def assert_eq(keys1: set[TaskRef], keys2: set[TaskRef]) -> None: if len(keys1) != len(keys2): diff --git a/distributed/utils_comm.py b/distributed/utils_comm.py index 92679b13ad..b1b4320877 100644 --- a/distributed/utils_comm.py +++ b/distributed/utils_comm.py @@ -384,7 +384,10 @@ async def retry( try: return await coro() except retry_on_exceptions as ex: - operation = operation or str(coro) + if not operation: + operation = str(coro) + if len(operation) > 200: + operation = operation[:200] + "..." logger.info( f"Retrying {operation} after exception in attempt {i_try}/{count}: {ex}" ) From 5cb36b31d7bb702b8f413bf919f4383b2a6b5b63 Mon Sep 17 00:00:00 2001 From: Ernest Provo Date: Fri, 6 Mar 2026 19:22:39 -0500 Subject: [PATCH 2/2] Use reprlib for truncating large coro repr in retry logs Replace manual str[:200] + "..." truncation with stdlib reprlib.Repr() per reviewer feedback. reprlib handles truncation consistently and is a well-known stdlib pattern for this use case. --- distributed/tests/test_utils_comm.py | 5 ++++- distributed/utils_comm.py | 7 ++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/distributed/tests/test_utils_comm.py b/distributed/tests/test_utils_comm.py index 1d02d4e209..0cf4235c5f 100644 --- a/distributed/tests/test_utils_comm.py +++ b/distributed/tests/test_utils_comm.py @@ -268,9 +268,12 @@ async def f(): logger.removeHandler(handler) assert len(log_messages) == 1 - # The 500-char repr should be truncated to 200 + "..." + # reprlib truncates the 500-char repr to maxother (200) chars assert len(log_messages[0]) < 500 + # reprlib uses "..." to indicate truncation assert "..." in log_messages[0] + # Verify the full 500-char repr is NOT present + assert "x" * 500 not in log_messages[0] def test_unpack_remotedata(): diff --git a/distributed/utils_comm.py b/distributed/utils_comm.py index b1b4320877..043ab04caf 100644 --- a/distributed/utils_comm.py +++ b/distributed/utils_comm.py @@ -3,6 +3,7 @@ import asyncio import logging import random +import reprlib from collections import defaultdict from collections.abc import Callable, Collection, Coroutine, Mapping from functools import partial @@ -385,9 +386,9 @@ async def retry( return await coro() except retry_on_exceptions as ex: if not operation: - operation = str(coro) - if len(operation) > 200: - operation = operation[:200] + "..." + aRepr = reprlib.Repr() + aRepr.maxother = 200 + operation = aRepr.repr(coro) logger.info( f"Retrying {operation} after exception in attempt {i_try}/{count}: {ex}" )