Skip to content

Commit cc0e55a

Browse files
committed
Add ProactorEventLoop.add_reader and friends
via background SelectorThread, imported from tornado
1 parent dabc739 commit cc0e55a

File tree

5 files changed

+92
-3
lines changed

5 files changed

+92
-3
lines changed

Doc/library/asyncio-eventloop.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1051,6 +1051,9 @@ Watching file descriptors
10511051
See also :ref:`Platform Support <asyncio-platform-support>` section
10521052
for some limitations of these methods.
10531053

1054+
.. versionchanged:: 3.15
1055+
1056+
Added support for these methods to :class:`ProactorEventLoop`.
10541057

10551058
Working with socket objects directly
10561059
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Doc/library/asyncio-platforms.rst

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ due to the platforms' underlying architecture and capabilities.
1616
All Platforms
1717
=============
1818

19-
* :meth:`loop.add_reader` and :meth:`loop.add_writer`
19+
* :meth:`~asyncio.loop.add_reader` and :meth:`~asyncio.loop.add_writer`
2020
cannot be used to monitor file I/O.
2121

2222

@@ -59,15 +59,19 @@ All event loops on Windows do not support the following methods:
5959

6060
:class:`ProactorEventLoop` has the following limitations:
6161

62-
* The :meth:`loop.add_reader` and :meth:`loop.add_writer`
63-
methods are not supported.
62+
* :meth:`loop.add_reader` and :meth:`loop.add_writer` only accept
63+
socket handles (e.g. pipe file descriptors are not supported).
64+
When called, :func:`select.select` is run in an additional thread.
6465

6566
The resolution of the monotonic clock on Windows is usually around 15.6
6667
milliseconds. The best resolution is 0.5 milliseconds. The resolution depends on the
6768
hardware (availability of `HPET
6869
<https://en.wikipedia.org/wiki/High_Precision_Event_Timer>`_) and on the
6970
Windows configuration.
7071

72+
.. versionadded:: 3.15
73+
74+
Support for :meth:`loop.add_reader`, :meth:`loop.add_writer` added to :class:`ProactorEventLoop`.
7175

7276
.. _asyncio-windows-subprocess:
7377

Lib/asyncio/proactor_events.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from . import transports
2424
from . import trsock
2525
from .log import logger
26+
from ._selector_thread import SelectorThread as _SelectorThread
2627

2728

2829
def _set_socket_extra(transport, sock):
@@ -633,6 +634,7 @@ def __init__(self, proactor):
633634
logger.debug('Using proactor: %s', proactor.__class__.__name__)
634635
self._proactor = proactor
635636
self._selector = proactor # convenient alias
637+
self._selector_thread = None
636638
self._self_reading_future = None
637639
self._accept_futures = {} # socket file descriptor => Future
638640
proactor.set_loop(self)
@@ -641,6 +643,17 @@ def __init__(self, proactor):
641643
# wakeup fd can only be installed to a file descriptor from the main thread
642644
signal.set_wakeup_fd(self._csock.fileno())
643645

646+
def _get_selector_thread(self):
647+
"""Return the SelectorThread.
648+
649+
creating it on first request,
650+
so no thread is created until/unless
651+
the first call to `add_reader` and friends.
652+
"""
653+
if self._selector_thread is None:
654+
self._selector_thread = _SelectorThread(self)
655+
return self._selector_thread
656+
644657
def _make_socket_transport(self, sock, protocol, waiter=None,
645658
extra=None, server=None):
646659
return _ProactorSocketTransport(self, sock, protocol, waiter,
@@ -697,10 +710,25 @@ def close(self):
697710
self._proactor.close()
698711
self._proactor = None
699712
self._selector = None
713+
if self._selector_thread is not None:
714+
self._selector_thread.close()
715+
self._selector_thread = None
700716

701717
# Close the event loop
702718
super().close()
703719

720+
def add_reader(self, fd, callback, *args):
721+
return self._get_selector_thread().add_reader(fd, callback, *args)
722+
723+
def remove_reader(self, fd):
724+
return self._get_selector_thread().remove_reader(fd)
725+
726+
def add_writer(self, fd, callback, *args):
727+
return self._get_selector_thread().add_writer(fd, callback, *args)
728+
729+
def remove_writer(self, fd):
730+
return self._get_selector_thread().remove_writer(fd)
731+
704732
async def sock_recv(self, sock, n):
705733
return await self._proactor.recv(sock, n)
706734

Lib/test/test_asyncio/test_windows_events.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,58 @@ def threadMain():
323323
stop.set()
324324
thr.join()
325325

326+
def test_add_reader_invalid_argument(self):
327+
def assert_raises():
328+
return self.assertRaisesRegex(ValueError, r'Invalid file object')
329+
330+
def cb(sock):
331+
return None
332+
333+
with assert_raises():
334+
self.loop.add_reader(object(), cb)
335+
with assert_raises():
336+
self.loop.add_writer(object(), cb)
337+
338+
with assert_raises():
339+
self.loop.remove_reader(object())
340+
with assert_raises():
341+
self.loop.remove_writer(object())
342+
343+
def test_selector_thread(self):
344+
assert self.loop._selector_thread is None
345+
a, b = socket.socketpair()
346+
async def _test():
347+
read_future = asyncio.Future()
348+
sent = b"asdf"
349+
350+
def write():
351+
a.sendall(sent)
352+
self.loop.remove_writer(a)
353+
354+
def read():
355+
msg = b.recv(100)
356+
read_future.set_result(msg)
357+
358+
self.loop.add_reader(b, read)
359+
_selector_thread = self.loop._selector_thread
360+
assert b in _selector_thread._readers
361+
assert _selector_thread is not None
362+
self.loop.add_writer(a, write)
363+
assert self.loop._selector_thread is _selector_thread
364+
assert a in _selector_thread._writers
365+
msg = await asyncio.wait_for(read_future, timeout=10)
366+
367+
self.loop.remove_writer(a)
368+
assert a not in _selector_thread._writers
369+
self.loop.remove_reader(b)
370+
assert b not in _selector_thread._readers
371+
a.close()
372+
b.close()
373+
assert self.loop._selector_thread is _selector_thread
374+
assert msg == sent
375+
376+
self.loop.run_until_complete(_test())
377+
326378

327379
class WinPolicyTests(WindowsEventsTestCase):
328380

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Added support for :meth:`~asyncio.loop.add_reader` to :class:`~asyncio.ProactorEventLoop` on Windows by
2+
running :func:`select.select` in a background thread.

0 commit comments

Comments
 (0)