Skip to content

Commit 5331ba6

Browse files
committed
Fix routing of background thread output when no parent is set explicitly
- removes guessing about whether threads should route output to their originating cell - thread-local value is persisted, if set explicitly - if no thread-local value is set, fallback on a global value (usually the latest cell) There's lots of duplication around, but I couldn't see a way to resolve that without breaking things.
1 parent f14b95b commit 5331ba6

6 files changed

Lines changed: 181 additions & 146 deletions

File tree

ipykernel/displayhook.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ def __init__(self, session, pub_socket):
2929

3030
self._parent_header: ContextVar[dict[str, Any]] = ContextVar("parent_header")
3131
self._parent_header.set({})
32+
self._parent_header_global = {}
3233

3334
def get_execution_count(self):
3435
"""This method is replaced in kernelapp"""
@@ -57,11 +58,16 @@ def __call__(self, obj):
5758

5859
@property
5960
def parent_header(self):
60-
return self._parent_header.get()
61+
try:
62+
return self._parent_header.get()
63+
except LookupError:
64+
return self._parent_header_global
6165

6266
def set_parent(self, parent):
6367
"""Set the parent header."""
64-
self._parent_header.set(extract_header(parent))
68+
parent_header = extract_header(parent)
69+
self._parent_header.set(parent_header)
70+
self._parent_header_global = parent_header
6571

6672

6773
class ZMQShellDisplayHook(DisplayHook):
@@ -83,11 +89,16 @@ def __init__(self, *args, **kwargs):
8389

8490
@property
8591
def parent_header(self):
86-
return self._parent_header.get()
92+
try:
93+
return self._parent_header.get()
94+
except LookupError:
95+
return self._parent_header_global
8796

8897
def set_parent(self, parent):
89-
"""Set the parent for outbound messages."""
90-
self._parent_header.set(extract_header(parent))
98+
"""Set the parent header."""
99+
parent_header = extract_header(parent)
100+
self._parent_header.set(parent_header)
101+
self._parent_header_global = parent_header
91102

92103
def start_displayhook(self):
93104
"""Start the display hook."""

ipykernel/iostream.py

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -456,8 +456,6 @@ def __init__(
456456
"parent_header"
457457
)
458458
self._parent_header.set({})
459-
self._thread_to_parent = {}
460-
self._thread_to_parent_header = {}
461459
self._parent_header_global = {}
462460
self._master_pid = os.getpid()
463461
self._flush_pending = False
@@ -512,21 +510,11 @@ def __init__(
512510
@property
513511
def parent_header(self):
514512
try:
515-
# asyncio-specific
513+
# asyncio or thread-specific
516514
return self._parent_header.get()
517515
except LookupError:
518-
try:
519-
# thread-specific
520-
identity = threading.current_thread().ident
521-
# retrieve the outermost (oldest ancestor,
522-
# discounting the kernel thread) thread identity
523-
while identity in self._thread_to_parent:
524-
identity = self._thread_to_parent[identity]
525-
# use the header of the oldest ancestor
526-
return self._thread_to_parent_header[identity]
527-
except KeyError:
528-
# global (fallback)
529-
return self._parent_header_global
516+
# global (fallback)
517+
return self._parent_header_global
530518

531519
@parent_header.setter
532520
def parent_header(self, value):

ipykernel/ipkernel.py

Lines changed: 0 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
import asyncio
66
import builtins
7-
import gc
87
import getpass
98
import os
109
import signal
@@ -17,15 +16,13 @@
1716
import comm
1817
from IPython.core import release
1918
from IPython.utils.tokenutil import line_at_cursor, token_at_cursor
20-
from jupyter_client.session import extract_header
2119
from traitlets import Any, Bool, HasTraits, Instance, List, Type, default, observe, observe_compat
2220
from zmq.eventloop.zmqstream import ZMQStream
2321

2422
from .comm.comm import BaseComm
2523
from .comm.manager import CommManager
2624
from .compiler import XCachingCompiler
2725
from .eventloops import _use_appnope
28-
from .iostream import OutStream
2926
from .kernelbase import Kernel as KernelBase
3027
from .kernelbase import _accepts_parameters
3128
from .zmqshell import ZMQInteractiveShell
@@ -167,14 +164,6 @@ def __init__(self, **kwargs):
167164

168165
appnope.nope()
169166

170-
self._new_threads_parent_header = {}
171-
self._initialize_thread_hooks()
172-
173-
if hasattr(gc, "callbacks"):
174-
# while `gc.callbacks` exists since Python 3.3, pypy does not
175-
# implement it even as of 3.9.
176-
gc.callbacks.append(self._clean_thread_parent_frames)
177-
178167
help_links = List(
179168
[
180169
{
@@ -374,8 +363,6 @@ def _dummy_context_manager(self, *args):
374363

375364
async def execute_request(self, stream, ident, parent):
376365
"""Override for cell output - cell reconciliation."""
377-
parent_header = extract_header(parent)
378-
self._associate_new_top_level_threads_with(parent_header)
379366
await super().execute_request(stream, ident, parent)
380367

381368
async def do_execute(
@@ -750,83 +737,6 @@ def do_clear(self):
750737
self.shell.reset(False)
751738
return dict(status="ok")
752739

753-
def _associate_new_top_level_threads_with(self, parent_header):
754-
"""Store the parent header to associate it with new top-level threads"""
755-
self._new_threads_parent_header = parent_header
756-
757-
def _initialize_thread_hooks(self):
758-
"""Store thread hierarchy and thread-parent_header associations."""
759-
stdout = self._stdout
760-
stderr = self._stderr
761-
kernel_thread_ident = threading.get_ident()
762-
kernel = self
763-
_threading_Thread_run = threading.Thread.run
764-
_threading_Thread__init__ = threading.Thread.__init__
765-
766-
def run_closure(self: threading.Thread):
767-
"""Wrap the `threading.Thread.start` to intercept thread identity.
768-
769-
This is needed because there is no "start" hook yet, but there
770-
might be one in the future: https://bugs.python.org/issue14073
771-
772-
This is a no-op if the `self._stdout` and `self._stderr` are not
773-
sub-classes of `OutStream`.
774-
"""
775-
776-
try:
777-
parent = self._ipykernel_parent_thread_ident # type:ignore[attr-defined]
778-
except AttributeError:
779-
return
780-
for stream in [stdout, stderr]:
781-
if isinstance(stream, OutStream):
782-
if parent == kernel_thread_ident:
783-
stream._thread_to_parent_header[self.ident] = (
784-
kernel._new_threads_parent_header
785-
)
786-
else:
787-
stream._thread_to_parent[self.ident] = parent
788-
_threading_Thread_run(self)
789-
790-
def init_closure(self: threading.Thread, *args, **kwargs):
791-
_threading_Thread__init__(self, *args, **kwargs)
792-
self._ipykernel_parent_thread_ident = threading.get_ident() # type:ignore[attr-defined]
793-
794-
threading.Thread.__init__ = init_closure # type:ignore[method-assign]
795-
threading.Thread.run = run_closure # type:ignore[method-assign]
796-
797-
def _clean_thread_parent_frames(
798-
self, phase: t.Literal["start", "stop"], info: dict[str, t.Any]
799-
):
800-
"""Clean parent frames of threads which are no longer running.
801-
This is meant to be invoked by garbage collector callback hook.
802-
803-
The implementation enumerates the threads because there is no "exit" hook yet,
804-
but there might be one in the future: https://bugs.python.org/issue14073
805-
806-
This is a no-op if the `self._stdout` and `self._stderr` are not
807-
sub-classes of `OutStream`.
808-
"""
809-
# Only run before the garbage collector starts
810-
if phase != "start":
811-
return
812-
active_threads = {thread.ident for thread in threading.enumerate()}
813-
for stream in [self._stdout, self._stderr]:
814-
if isinstance(stream, OutStream):
815-
thread_to_parent_header = stream._thread_to_parent_header
816-
for identity in list(thread_to_parent_header.keys()):
817-
if identity not in active_threads:
818-
try:
819-
del thread_to_parent_header[identity]
820-
except KeyError:
821-
pass
822-
thread_to_parent = stream._thread_to_parent
823-
for identity in list(thread_to_parent.keys()):
824-
if identity not in active_threads:
825-
try:
826-
del thread_to_parent[identity]
827-
except KeyError:
828-
pass
829-
830740

831741
# This exists only for backwards compatibility - use IPythonKernel instead
832742

ipykernel/kernelbase.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import uuid
1818
import warnings
1919
from collections.abc import Mapping
20-
from contextvars import ContextVar
20+
from contextvars import Context, ContextVar, copy_context
2121
from datetime import datetime
2222
from functools import partial
2323
from signal import SIGINT, SIGTERM, Signals, default_int_handler, signal
@@ -201,6 +201,7 @@ def _default_ident(self):
201201
_control_parent_ident: bytes = b""
202202
_shell_parent: ContextVar[dict[str, Any]]
203203
_shell_parent_ident: ContextVar[bytes]
204+
_shell_context: Context
204205
# Kept for backward-compatibility, accesses _control_parent_ident and _shell_parent_ident,
205206
# see https://github.com/jupyterlab/jupyterlab/issues/17785
206207
_parent_ident: Mapping[str, bytes]
@@ -320,13 +321,14 @@ def __init__(self, **kwargs):
320321
self._shell_parent.set({})
321322
self._shell_parent_ident = ContextVar("shell_parent_ident")
322323
self._shell_parent_ident.set(b"")
324+
self._shell_context = copy_context()
323325

324326
# For backward compatibility so that _parent_ident["shell"] and _parent_ident["control"]
325327
# work as they used to for ipykernel >= 7
326328
self._parent_ident = LazyDict(
327329
{
328330
"control": lambda: self._control_parent_ident,
329-
"shell": lambda: self._shell_parent_ident.get(),
331+
"shell": lambda: self._get_shell_context_var(self._shell_parent_ident),
330332
}
331333
)
332334

@@ -768,6 +770,8 @@ def set_parent(self, ident, parent, channel="shell"):
768770
else:
769771
self._shell_parent_ident.set(ident)
770772
self._shell_parent.set(parent)
773+
# preserve the last call to set_parent
774+
self._shell_context = copy_context()
771775

772776
def get_parent(self, channel=None):
773777
"""Get the parent request associated with a channel.
@@ -794,7 +798,20 @@ def get_parent(self, channel=None):
794798

795799
if channel == "control":
796800
return self._control_parent
797-
return self._shell_parent.get()
801+
802+
return self._get_shell_context_var(self._shell_parent)
803+
804+
def _get_shell_context_var(self, var: ContextVar):
805+
"""Lookup a ContextVar, falling back on the shell context
806+
807+
Allows for user-launched Threads to still resolve to the shell's mai context
808+
809+
necessary for e.g. display from threads.
810+
"""
811+
try:
812+
return var.get()
813+
except LookupError:
814+
return self._shell_context[var]
798815

799816
def send_response(
800817
self,
@@ -1455,7 +1472,7 @@ def getpass(self, prompt="", stream=None):
14551472
)
14561473
return self._input_request(
14571474
prompt,
1458-
self._shell_parent_ident.get(),
1475+
self._get_shell_context_var(self._shell_parent_ident),
14591476
self.get_parent("shell"),
14601477
password=True,
14611478
)
@@ -1472,7 +1489,7 @@ def raw_input(self, prompt=""):
14721489
raise StdinNotImplementedError(msg)
14731490
return self._input_request(
14741491
str(prompt),
1475-
self._shell_parent_ident.get(),
1492+
self._get_shell_context_var(self._shell_parent_ident),
14761493
self.get_parent("shell"),
14771494
password=False,
14781495
)

ipykernel/zmqshell.py

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,20 @@ def __init__(self, *args, **kwargs):
6363
super().__init__(*args, **kwargs)
6464
self._parent_header = contextvars.ContextVar("parent_header")
6565
self._parent_header.set({})
66+
self._parent_header_global = {}
6667

6768
@property
6869
def parent_header(self):
69-
return self._parent_header.get()
70+
try:
71+
return self._parent_header.get()
72+
except LookupError:
73+
return self._parent_header_global
7074

7175
def set_parent(self, parent):
7276
"""Set the parent for outbound messages."""
73-
self._parent_header.set(extract_header(parent))
77+
parent_header = extract_header(parent)
78+
self._parent_header.set(parent_header)
79+
self._parent_header_global = parent_header
7480

7581
def _flush_streams(self):
7682
"""flush IO Streams prior to display"""
@@ -673,11 +679,23 @@ def set_next_input(self, text, replace=False):
673679

674680
@property
675681
def parent_header(self):
676-
return self._parent_header.get()
682+
try:
683+
return self._parent_header.get()
684+
except LookupError:
685+
return self._parent_header_global
686+
687+
@parent_header.setter
688+
def parent_header(self, value):
689+
self._parent_header_global = value
690+
self._parent_header.set(value)
677691

678692
def set_parent(self, parent):
679-
"""Set the parent header for associating output with its triggering input"""
680-
self._parent_header.set(parent)
693+
"""Set the parent header for associating output with its triggering input
694+
695+
When called from a thread, sets the thread-local value, which persists
696+
until the next call from this thread.
697+
"""
698+
self.parent_header = parent
681699
self.displayhook.set_parent(parent) # type:ignore[attr-defined]
682700
self.display_pub.set_parent(parent) # type:ignore[attr-defined]
683701
if hasattr(self, "_data_pub"):
@@ -688,7 +706,12 @@ def set_parent(self, parent):
688706
sys.stderr.set_parent(parent)
689707

690708
def get_parent(self):
691-
"""Get the parent header."""
709+
"""Get the parent header.
710+
711+
If set_parent has never been called from the current thread,
712+
the value from the last call to set_parent from _any_ thread will be used
713+
(typically the currently running cell).
714+
"""
692715
return self.parent_header
693716

694717
def init_magics(self):

0 commit comments

Comments
 (0)