Skip to content
5 changes: 1 addition & 4 deletions Lib/concurrent/interpreters/_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
QueueError, QueueNotFoundError,
)
from ._crossinterp import (
UNBOUND_ERROR, UNBOUND_REMOVE,
UNBOUND, UNBOUND_ERROR, UNBOUND_REMOVE,
)

__all__ = [
Expand Down Expand Up @@ -46,9 +46,6 @@ class ItemInterpreterDestroyed(QueueError,
_PICKLED = 1


UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__)


def _serialize_unbound(unbound):
if unbound is UNBOUND:
unbound = _crossinterp.UNBOUND
Expand Down
2 changes: 2 additions & 0 deletions Lib/test/test_concurrent_futures/test_interpreter_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,8 @@ def run(taskid, ready, blocker):
ready.get(timeout=1) # blocking
except interpreters.QueueEmpty:
pass
except queues.QueueEmpty:
pass
Comment on lines +430 to +431
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happened here?

Copy link
Contributor Author

@note35 note35 Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

./python -m test test_interpreters test_concurrent_futures.test_interpreter_pool 
Using random seed: 1867170309
0:00:00 load avg: 0.27 Run 2 tests sequentially in a single process
0:00:00 load avg: 0.27 [1/2] test_interpreters
0:00:24 load avg: 0.48 [1/2] test_interpreters passed
0:00:24 load avg: 0.48 [2/2] test_concurrent_futures.test_interpreter_pool
... (running forever without catching queues.QueueEmpty)
def test_blocking_with_limited_workers(self):
    ready = queues.create()
    blocker = queues.create()

    def run(taskid, ready, blocker):
        ready.put_nowait(taskid)
        blocker.get()

    numtasks = 10
    futures = []
    with self.executor_type(4) as executor:
        for i in range(numtasks):
            fut = executor.submit(run, i, ready, blocker)
            futures.append(fut)
        
        pending = numtasks
        while pending > 0:
            done = 0
            for _ in range(pending):
                try:
                    ready.get(timeout=1)
                    # https://github.com/python/cpython/blob/2a820e2b9cf9e470d9f5342019fca3fe7f4ed7bc/Lib/concurrent/interpreters/_queues.py#L261C20-L261C30

                    # test_interpreters test_concurrent_futures.test_interpreter_pool 
                    # Before: queue.get() throws interpreters.QueueEmpty
                    # After: queue.get() throws queues.QueueEmpty

                    # test_concurrent_futures.test_interpreter_pool 
                    # Before: queue.get() throws interpreters.QueueEmpty.
                    # After: queue.get() throws interpreters.QueueEmpty
                except interpreters.QueueEmpty:
                    pass                    
                except queues.QueueEmpty:
                    pass                    
                else:
                    # If the error is not caught, the loop is running forever.
                    done += 1
            pending -= done
            
            for _ in range(done):
                blocker.put_nowait(None)

Why does running test_interpreters + test_concurrent_futures.test_interpreter_pool make the queue.get() throws queues.QueueEmpty after having the same UNBOUND object in _queues.py and _crossinterp.py?

FWIK, cpython/Lib/concurrent/interpreters/init.py loads queues.QueueEmpty as interpreters.QueueEmpty, and before this PR, queue.get() is expected to throw interpreters.QueueEmpty,

For some reasons, queue.get() throws _queues.QueueEmpty after running test_interpreters. This may be another thing to explore further (I think it depends on how the sub-interpreter is implemented. Do you have any knowledge about this?

Given this is a test fix, maybe it's okay to have a further dive deep separately? I think this so far can be reproduced only during a combination of test_interpreters + test_concurrent_futures.test_interpreter_pool. (which is likely rare and not reported?) I'd also like to add a TODO gh-XXX, but we don't yet have enough context to make an issue...)

else:
done += 1
pending -= done
Expand Down
9 changes: 9 additions & 0 deletions Lib/test/test_interpreters/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
# Raise SkipTest if subinterpreters not supported.
_queues = import_helper.import_module('_interpqueues')
from concurrent import interpreters
from concurrent.futures import InterpreterPoolExecutor
from concurrent.interpreters import _queues as queues, _crossinterp
from .utils import _run_output, TestBase as _TestBase

Expand Down Expand Up @@ -93,6 +94,14 @@ def test_bind_release(self):
with self.assertRaises(queues.QueueError):
_queues.release(qid)

def test_interpreter_pool_executor_after_reload(self):
# Regression test for gh-142414 (KeyError in serialize_unbound).
importlib.reload(queues)
code = "import struct"
with InterpreterPoolExecutor(max_workers=1) as executor:
results = executor.map(exec, [code] * 1)
self.assertEqual(list(results), [None] * 1)


class QueueTests(TestBase):

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix spurious :exc:`KeyError` when :mod:`concurrent.interpreters` is reloaded after import.
Loading