From a42605482565157d70efc302e74d5064d9be5cb3 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Sun, 22 Feb 2026 17:33:37 -0400 Subject: [PATCH] Fix RecursionError in execute_concurrent on synchronous errbacks When a ResponseFuture already has _final_exception set by the time add_callbacks() is called (e.g. the request timeout expired synchronously inside send_request()), the errback fires inline creating an unbounded recursion: _execute -> add_callbacks -> _on_error -> _put_result -> _execute_next -> _execute -> ... The existing _exec_depth guard only protected the except-Exception path (when execute_async itself raises), not this synchronous-callback path. Replace the depth counter with a re-entrancy guard: when _execute is re-entered from a synchronous callback, the work is appended to a pending list and drained iteratively by the outermost invocation. Fixes: #712 --- cassandra/concurrent.py | 46 +++++++++++++++++++------------- tests/unit/test_concurrent.py | 49 +++++++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+), 18 deletions(-) diff --git a/cassandra/concurrent.py b/cassandra/concurrent.py index d6345ca452..b96d0b12d4 100644 --- a/cassandra/concurrent.py +++ b/cassandra/concurrent.py @@ -92,8 +92,6 @@ def execute_concurrent(session, statements_and_parameters, concurrency=100, rais class _ConcurrentExecutor(object): - max_error_recursion = 100 - def __init__(self, session, statements_and_params, execution_profile): self.session = session self._enum_statements = enumerate(iter(statements_and_params)) @@ -103,7 +101,7 @@ def __init__(self, session, statements_and_params, execution_profile): self._results_queue = [] self._current = 0 self._exec_count = 0 - self._exec_depth = 0 + self._executing = False def execute(self, concurrency, fail_fast): self._fail_fast = fail_fast @@ -127,22 +125,34 @@ def _execute_next(self): pass def _execute(self, idx, statement, params): - self._exec_depth += 1 + # When execute_async completes synchronously (e.g. immediate timeout), + # the errback fires inline: _on_error -> _put_result -> _execute_next + # -> _execute. Without protection this recurses once per remaining + # statement and blows the stack. + # + # ``_executing`` marks that we are already inside this method higher up + # the call stack. When a synchronous callback re-enters, we just stash + # the pending work in ``_pending_executions`` and let the outermost + # invocation drain it in a loop -- no recursion. + if self._executing: + self._pending_executions.append((idx, statement, params)) + return + + self._executing = True + self._pending_executions = [(idx, statement, params)] try: - future = self.session.execute_async(statement, params, timeout=None, execution_profile=self._execution_profile) - args = (future, idx) - future.add_callbacks( - callback=self._on_success, callback_args=args, - errback=self._on_error, errback_args=args) - except Exception as exc: - # If we're not failing fast and all executions are raising, there is a chance of recursing - # here as subsequent requests are attempted. If we hit this threshold, schedule this result/retry - # and let the event loop thread return. - if self._exec_depth < self.max_error_recursion: - self._put_result(exc, idx, False) - else: - self.session.submit(self._put_result, exc, idx, False) - self._exec_depth -= 1 + while self._pending_executions: + p_idx, p_statement, p_params = self._pending_executions.pop(0) + try: + future = self.session.execute_async(p_statement, p_params, timeout=None, execution_profile=self._execution_profile) + args = (future, p_idx) + future.add_callbacks( + callback=self._on_success, callback_args=args, + errback=self._on_error, errback_args=args) + except Exception as exc: + self._put_result(exc, p_idx, False) + finally: + self._executing = False def _on_success(self, result, future, idx): future.clear_callbacks() diff --git a/tests/unit/test_concurrent.py b/tests/unit/test_concurrent.py index 9c85b1ccac..d3888aa9de 100644 --- a/tests/unit/test_concurrent.py +++ b/tests/unit/test_concurrent.py @@ -258,3 +258,52 @@ def test_recursion_limited(self): for r in results: assert not r[0] assert isinstance(r[1], TypeError) + + def test_no_recursion_on_synchronous_errback(self): + """ + Verify that execute_concurrent does not blow the stack when every + future completes with an error *before* add_callbacks is called + (i.e. the errback fires synchronously inside add_callbacks). + + This exercises a different code path from test_recursion_limited: + that test covers execute_async raising an exception, while this one + covers execute_async returning a future whose errback fires inline. + """ + count = sys.getrecursionlimit() + error = Exception("immediate failure") + + class AlreadyFailedFuture: + """A future that already has _final_exception set.""" + _query_trace = None + _col_names = None + _col_types = None + has_more_pages = False + + def add_callback(self, fn, *args, **kwargs): + pass + + def add_errback(self, fn, *args, **kwargs): + # Fire errback synchronously, mimicking a future that + # completed before add_callbacks was called. + fn(error, *args, **kwargs) + + def add_callbacks(self, callback, errback, + callback_args=(), callback_kwargs=None, + errback_args=(), errback_kwargs=None): + self.add_callback(callback, *callback_args, **(callback_kwargs or {})) + self.add_errback(errback, *errback_args, **(errback_kwargs or {})) + + def clear_callbacks(self): + pass + + mock_session = Mock() + mock_session.execute_async.return_value = AlreadyFailedFuture() + + statements_and_params = [("SELECT 1", ())] * count + results = execute_concurrent(mock_session, statements_and_params, + raise_on_first_error=False) + + assert len(results) == count + for success, result in results: + assert not success + assert result is error