From 4ab9f66edb8787b081227eb0246a076c99ee4c56 Mon Sep 17 00:00:00 2001 From: Eric Hare Date: Fri, 27 Feb 2026 09:31:49 -0800 Subject: [PATCH] feat: Modernize the async routines --- cassandra/cluster.py | 31 +++++++++++++++------------ cassandra/io/asyncioreactor.py | 27 ++++++----------------- tests/__init__.py | 2 +- tests/unit/io/test_asyncioreactor.py | 32 ++++++++++++++++++++++++---- 4 files changed, 52 insertions(+), 40 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 6b2ab4b288..5da01dbc6c 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -149,6 +149,13 @@ def _try_libev_import(): except DependencyException as e: return (None, e) +def _try_asyncio_import(): + try: + from cassandra.io.asyncioreactor import AsyncioConnection + return (AsyncioConnection, None) + except (ImportError, DependencyException) as e: + return (None, e) + def _try_asyncore_import(): try: from cassandra.io.asyncorereactor import AsyncoreConnection @@ -168,7 +175,7 @@ def _connection_reduce_fn(val,import_fn): log = logging.getLogger(__name__) -conn_fns = (_try_gevent_import, _try_eventlet_import, _try_libev_import, _try_asyncore_import) +conn_fns = (_try_gevent_import, _try_eventlet_import, _try_libev_import, _try_asyncio_import, _try_asyncore_import) (conn_class, excs) = reduce(_connection_reduce_fn, conn_fns, (None,[])) if not conn_class: raise DependencyException("Unable to load a default connection class", excs) @@ -876,25 +883,21 @@ def default_retry_policy(self, policy): This determines what event loop system will be used for managing I/O with Cassandra. These are the current options: - * :class:`cassandra.io.asyncorereactor.AsyncoreConnection` * :class:`cassandra.io.libevreactor.LibevConnection` + * :class:`cassandra.io.asyncioreactor.AsyncioConnection` + * :class:`cassandra.io.asyncorereactor.AsyncoreConnection` (Python < 3.12 only) * :class:`cassandra.io.eventletreactor.EventletConnection` (requires monkey-patching - see doc for details) * :class:`cassandra.io.geventreactor.GeventConnection` (requires monkey-patching - see doc for details) * :class:`cassandra.io.twistedreactor.TwistedConnection` - * EXPERIMENTAL: :class:`cassandra.io.asyncioreactor.AsyncioConnection` - - By default, ``AsyncoreConnection`` will be used, which uses - the ``asyncore`` module in the Python standard library. - - If ``libev`` is installed, ``LibevConnection`` will be used instead. - If ``gevent`` or ``eventlet`` monkey-patching is detected, the corresponding - connection class will be used automatically. + The default is selected automatically using the following priority: - ``AsyncioConnection``, which uses the ``asyncio`` module in the Python - standard library, is also available, but currently experimental. Note that - it requires ``asyncio`` features that were only introduced in the 3.4 line - in 3.4.6, and in the 3.5 line in 3.5.1. + 1. If ``gevent`` or ``eventlet`` monkey-patching is detected, the + corresponding connection class will be used. + 2. If the ``libev`` C extension is available, ``LibevConnection`` is used. + 3. ``AsyncioConnection`` is used as the standard-library fallback. This is + the preferred default on Python 3.12+ where ``asyncore`` was removed. + 4. On Python < 3.12, ``AsyncoreConnection`` is used as a last resort. """ control_connection_timeout = 2.0 diff --git a/cassandra/io/asyncioreactor.py b/cassandra/io/asyncioreactor.py index 95f92e26e0..e1c4d4ec3c 100644 --- a/cassandra/io/asyncioreactor.py +++ b/cassandra/io/asyncioreactor.py @@ -11,21 +11,6 @@ log = logging.getLogger(__name__) -# This module uses ``yield from`` and ``@asyncio.coroutine`` over ``await`` and -# ``async def`` for pre-Python-3.5 compatibility, so keep in mind that the -# managed coroutines are generator-based, not native coroutines. See PEP 492: -# https://www.python.org/dev/peps/pep-0492/#coroutine-objects - - -try: - asyncio.run_coroutine_threadsafe -except AttributeError: - raise ImportError( - 'Cannot use asyncioreactor without access to ' - 'asyncio.run_coroutine_threadsafe (added in 3.4.6 and 3.5.1)' - ) - - class AsyncioTimer(object): """ An ``asyncioreactor``-specific Timer. Similar to :class:`.connection.Timer, @@ -67,11 +52,12 @@ def finish(self): class AsyncioConnection(Connection): """ - An experimental implementation of :class:`.Connection` that uses the - ``asyncio`` module in the Python standard library for its event loop. + An implementation of :class:`.Connection` that uses the ``asyncio`` + module in the Python standard library for its event loop. - Note that it requires ``asyncio`` features that were only introduced in the - 3.4 line in 3.4.6, and in the 3.5 line in 3.5.1. + This is the preferred connection class on Python 3.12+ where the + ``asyncore`` module has been removed. It is also used as a fallback + when the libev C extension is not available. """ _loop = None @@ -109,7 +95,6 @@ def initialize_reactor(cls): cls._loop = None if cls._loop is None: cls._loop = asyncio.new_event_loop() - asyncio.set_event_loop(cls._loop) if not cls._loop_thread: # daemonize so the loop will be shut down on interpreter @@ -173,7 +158,7 @@ def push(self, data): async def _push_msg(self, chunks): # This lock ensures all chunks of a message are sequential in the Queue - with await self._write_queue_lock: + async with self._write_queue_lock: for chunk in chunks: self._write_queue.put_nowait(chunk) diff --git a/tests/__init__.py b/tests/__init__.py index 7799b51399..8388059826 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -54,7 +54,7 @@ def is_monkey_patched(): return is_gevent_monkey_patched() or is_eventlet_monkey_patched() MONKEY_PATCH_LOOP = bool(os.getenv('MONKEY_PATCH_LOOP', False)) -EVENT_LOOP_MANAGER = os.getenv('EVENT_LOOP_MANAGER', "libev") +EVENT_LOOP_MANAGER = os.getenv('EVENT_LOOP_MANAGER', "asyncio") # If set to to true this will force the Cython tests to run regardless of whether they are installed diff --git a/tests/unit/io/test_asyncioreactor.py b/tests/unit/io/test_asyncioreactor.py index 65708d41dc..0886102dbf 100644 --- a/tests/unit/io/test_asyncioreactor.py +++ b/tests/unit/io/test_asyncioreactor.py @@ -1,16 +1,15 @@ AsyncioConnection, ASYNCIO_AVAILABLE = None, False try: from cassandra.io.asyncioreactor import AsyncioConnection - import asynctest ASYNCIO_AVAILABLE = True except (ImportError, SyntaxError): AsyncioConnection = None ASYNCIO_AVAILABLE = False from tests import is_monkey_patched, connection_class -from tests.unit.io.utils import TimerCallback, TimerTestMixin +from tests.unit.io.utils import TimerCallback, TimerTestMixin, submit_and_wait_for_completion -from unittest.mock import patch +from unittest.mock import patch, MagicMock import unittest import time @@ -56,7 +55,7 @@ def setUp(self): socket_patcher.start() old_selector = AsyncioConnection._loop._selector - AsyncioConnection._loop._selector = asynctest.TestSelector() + AsyncioConnection._loop._selector = MagicMock() def reset_selector(): AsyncioConnection._loop._selector = old_selector @@ -65,6 +64,31 @@ def reset_selector(): super(AsyncioTimerTests, self).setUp() + def test_multi_timer_validation(self): + """ + Override with a wider tolerance for asyncio's thread-based scheduling, + which has inherently more jitter than libev's native event loop. + """ + from tests.unit.io.utils import get_timeout + pending_callbacks = [] + completed_callbacks = [] + + for gross_time in range(0, 100, 1): + timeout = get_timeout(gross_time, 0, 100, 100, False) + callback = TimerCallback(timeout) + self.create_timer(timeout, callback.invoke) + pending_callbacks.append(callback) + + while len(pending_callbacks) != 0: + for callback in pending_callbacks: + if callback.was_invoked(): + pending_callbacks.remove(callback) + completed_callbacks.append(callback) + time.sleep(.1) + + for callback in completed_callbacks: + self.assertAlmostEqual(callback.expected_wait, callback.get_wait_time(), delta=.25) + def test_timer_cancellation(self): # Various lists for tracking callback stage timeout = .1