Skip to content

Commit 7cfacc4

Browse files
committed
import tornado selector thread
v6.5.1 License: Apache-2.0 Copyright (c) 2025 The Tornado Authors unmodified from original, so we can track changes
1 parent 2d50dd2 commit 7cfacc4

File tree

1 file changed

+302
-0
lines changed

1 file changed

+302
-0
lines changed

Lib/asyncio/_selector_thread.py

Lines changed: 302 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,302 @@
1+
"""
2+
Compatibility for [add|remove]_[reader|writer] where unavailable (Proactor).
3+
4+
Runs select in a background thread.
5+
6+
Adapted from Tornado 6.5.1
7+
8+
:copyright: 2025, The Tornado Authors
9+
:license: Apache-2.0
10+
"""
11+
12+
import asyncio
13+
import atexit
14+
import contextvars
15+
import errno
16+
import functools
17+
import select
18+
import socket
19+
import threading
20+
import typing
21+
22+
from typing import (
23+
Any,
24+
Callable,
25+
Dict,
26+
List,
27+
Optional,
28+
Protocol,
29+
Set,
30+
Tuple,
31+
TypeVar,
32+
Union,
33+
)
34+
35+
if typing.TYPE_CHECKING:
36+
from typing_extensions import TypeVarTuple, Unpack
37+
38+
39+
class _HasFileno(Protocol):
40+
def fileno(self) -> int:
41+
pass
42+
43+
44+
_FileDescriptorLike = Union[int, _HasFileno]
45+
46+
_T = TypeVar("_T")
47+
48+
if typing.TYPE_CHECKING:
49+
_Ts = TypeVarTuple("_Ts")
50+
51+
# Collection of selector thread event loops to shut down on exit.
52+
_selector_loops: Set["SelectorThread"] = set()
53+
54+
55+
def _atexit_callback() -> None:
56+
for loop in _selector_loops:
57+
with loop._select_cond:
58+
loop._closing_selector = True
59+
loop._select_cond.notify()
60+
try:
61+
loop._waker_w.send(b"a")
62+
except BlockingIOError:
63+
pass
64+
if loop._thread is not None:
65+
# If we don't join our (daemon) thread here, we may get a deadlock
66+
# during interpreter shutdown. I don't really understand why. This
67+
# deadlock happens every time in CI (both travis and appveyor) but
68+
# I've never been able to reproduce locally.
69+
loop._thread.join()
70+
_selector_loops.clear()
71+
72+
73+
atexit.register(_atexit_callback)
74+
75+
76+
class SelectorThread:
77+
"""Define ``add_reader`` methods to be called in a background select thread.
78+
79+
Instances of this class start a second thread to run a selector.
80+
This thread is completely hidden from the user;
81+
all callbacks are run on the wrapped event loop's thread.
82+
83+
Typically used via ``AddThreadSelectorEventLoop``,
84+
but can be attached to a running asyncio loop.
85+
"""
86+
87+
_closed = False
88+
89+
def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
90+
self._main_thread_ctx = contextvars.copy_context()
91+
92+
self._real_loop = real_loop
93+
94+
self._select_cond = threading.Condition()
95+
self._select_args: Optional[
96+
Tuple[List[_FileDescriptorLike], List[_FileDescriptorLike]]
97+
] = None
98+
self._closing_selector = False
99+
self._thread: Optional[threading.Thread] = None
100+
self._thread_manager_handle = self._thread_manager()
101+
102+
async def thread_manager_anext() -> None:
103+
# the anext builtin wasn't added until 3.10. We just need to iterate
104+
# this generator one step.
105+
await self._thread_manager_handle.__anext__()
106+
107+
# When the loop starts, start the thread. Not too soon because we can't
108+
# clean up if we get to this point but the event loop is closed without
109+
# starting.
110+
self._real_loop.call_soon(
111+
lambda: self._real_loop.create_task(thread_manager_anext()),
112+
context=self._main_thread_ctx,
113+
)
114+
115+
self._readers: Dict[_FileDescriptorLike, Callable] = {}
116+
self._writers: Dict[_FileDescriptorLike, Callable] = {}
117+
118+
# Writing to _waker_w will wake up the selector thread, which
119+
# watches for _waker_r to be readable.
120+
self._waker_r, self._waker_w = socket.socketpair()
121+
self._waker_r.setblocking(False)
122+
self._waker_w.setblocking(False)
123+
_selector_loops.add(self)
124+
self.add_reader(self._waker_r, self._consume_waker)
125+
126+
def close(self) -> None:
127+
if self._closed:
128+
return
129+
with self._select_cond:
130+
self._closing_selector = True
131+
self._select_cond.notify()
132+
self._wake_selector()
133+
if self._thread is not None:
134+
self._thread.join()
135+
_selector_loops.discard(self)
136+
self.remove_reader(self._waker_r)
137+
self._waker_r.close()
138+
self._waker_w.close()
139+
self._closed = True
140+
141+
async def _thread_manager(self) -> typing.AsyncGenerator[None, None]:
142+
# Create a thread to run the select system call. We manage this thread
143+
# manually so we can trigger a clean shutdown from an atexit hook. Note
144+
# that due to the order of operations at shutdown, only daemon threads
145+
# can be shut down in this way (non-daemon threads would require the
146+
# introduction of a new hook: https://bugs.python.org/issue41962)
147+
self._thread = threading.Thread(
148+
name="Tornado selector",
149+
daemon=True,
150+
target=self._run_select,
151+
)
152+
self._thread.start()
153+
self._start_select()
154+
try:
155+
# The presense of this yield statement means that this coroutine
156+
# is actually an asynchronous generator, which has a special
157+
# shutdown protocol. We wait at this yield point until the
158+
# event loop's shutdown_asyncgens method is called, at which point
159+
# we will get a GeneratorExit exception and can shut down the
160+
# selector thread.
161+
yield
162+
except GeneratorExit:
163+
self.close()
164+
raise
165+
166+
def _wake_selector(self) -> None:
167+
if self._closed:
168+
return
169+
try:
170+
self._waker_w.send(b"a")
171+
except BlockingIOError:
172+
pass
173+
174+
def _consume_waker(self) -> None:
175+
try:
176+
self._waker_r.recv(1024)
177+
except BlockingIOError:
178+
pass
179+
180+
def _start_select(self) -> None:
181+
# Capture reader and writer sets here in the event loop
182+
# thread to avoid any problems with concurrent
183+
# modification while the select loop uses them.
184+
with self._select_cond:
185+
assert self._select_args is None
186+
self._select_args = (list(self._readers.keys()), list(self._writers.keys()))
187+
self._select_cond.notify()
188+
189+
def _run_select(self) -> None:
190+
while True:
191+
with self._select_cond:
192+
while self._select_args is None and not self._closing_selector:
193+
self._select_cond.wait()
194+
if self._closing_selector:
195+
return
196+
assert self._select_args is not None
197+
to_read, to_write = self._select_args
198+
self._select_args = None
199+
200+
# We use the simpler interface of the select module instead of
201+
# the more stateful interface in the selectors module because
202+
# this class is only intended for use on windows, where
203+
# select.select is the only option. The selector interface
204+
# does not have well-documented thread-safety semantics that
205+
# we can rely on so ensuring proper synchronization would be
206+
# tricky.
207+
try:
208+
# On windows, selecting on a socket for write will not
209+
# return the socket when there is an error (but selecting
210+
# for reads works). Also select for errors when selecting
211+
# for writes, and merge the results.
212+
#
213+
# This pattern is also used in
214+
# https://github.com/python/cpython/blob/v3.8.0/Lib/selectors.py#L312-L317
215+
rs, ws, xs = select.select(to_read, to_write, to_write)
216+
ws = ws + xs
217+
except OSError as e:
218+
# After remove_reader or remove_writer is called, the file
219+
# descriptor may subsequently be closed on the event loop
220+
# thread. It's possible that this select thread hasn't
221+
# gotten into the select system call by the time that
222+
# happens in which case (at least on macOS), select may
223+
# raise a "bad file descriptor" error. If we get that
224+
# error, check and see if we're also being woken up by
225+
# polling the waker alone. If we are, just return to the
226+
# event loop and we'll get the updated set of file
227+
# descriptors on the next iteration. Otherwise, raise the
228+
# original error.
229+
if e.errno == getattr(errno, "WSAENOTSOCK", errno.EBADF):
230+
rs, _, _ = select.select([self._waker_r.fileno()], [], [], 0)
231+
if rs:
232+
ws = []
233+
else:
234+
raise
235+
else:
236+
raise
237+
238+
try:
239+
self._real_loop.call_soon_threadsafe(
240+
self._handle_select, rs, ws, context=self._main_thread_ctx
241+
)
242+
except RuntimeError:
243+
# "Event loop is closed". Swallow the exception for
244+
# consistency with PollIOLoop (and logical consistency
245+
# with the fact that we can't guarantee that an
246+
# add_callback that completes without error will
247+
# eventually execute).
248+
pass
249+
except AttributeError:
250+
# ProactorEventLoop may raise this instead of RuntimeError
251+
# if call_soon_threadsafe races with a call to close().
252+
# Swallow it too for consistency.
253+
pass
254+
255+
def _handle_select(
256+
self, rs: List[_FileDescriptorLike], ws: List[_FileDescriptorLike]
257+
) -> None:
258+
for r in rs:
259+
self._handle_event(r, self._readers)
260+
for w in ws:
261+
self._handle_event(w, self._writers)
262+
self._start_select()
263+
264+
def _handle_event(
265+
self,
266+
fd: _FileDescriptorLike,
267+
cb_map: Dict[_FileDescriptorLike, Callable],
268+
) -> None:
269+
try:
270+
callback = cb_map[fd]
271+
except KeyError:
272+
return
273+
callback()
274+
275+
def add_reader(
276+
self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
277+
) -> None:
278+
self._readers[fd] = functools.partial(callback, *args)
279+
self._wake_selector()
280+
281+
def add_writer(
282+
self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
283+
) -> None:
284+
self._writers[fd] = functools.partial(callback, *args)
285+
self._wake_selector()
286+
287+
def remove_reader(self, fd: _FileDescriptorLike) -> bool:
288+
try:
289+
del self._readers[fd]
290+
except KeyError:
291+
return False
292+
self._wake_selector()
293+
return True
294+
295+
def remove_writer(self, fd: _FileDescriptorLike) -> bool:
296+
try:
297+
del self._writers[fd]
298+
except KeyError:
299+
return False
300+
self._wake_selector()
301+
return True
302+

0 commit comments

Comments
 (0)