@@ -79,7 +79,6 @@ def _accepts_parameters(meth, param_names):
7979
8080class Kernel (SingletonConfigurable ):
8181 """The base kernel class."""
82-
8382 # ---------------------------------------------------------------------------
8483 # Kernel interface
8584 # ---------------------------------------------------------------------------
@@ -277,6 +276,12 @@ def __init__(self, **kwargs):
277276 """Initialize the kernel."""
278277 super ().__init__ (** kwargs )
279278
279+
280+
281+ self ._iant_lock = threading .Lock ()
282+
283+
284+
280285 # Kernel application may swap stdout and stderr to OutStream,
281286 # which is the case in `IPKernelApp.init_io`, hence `sys.stdout`
282287 # can already by different from TextIO at initialization time.
@@ -571,6 +576,10 @@ def schedule_dispatch(self, dispatch, *args):
571576
572577 def start (self ):
573578 """register dispatchers for streams"""
579+
580+ with open ("debug.txt" , "a" ) as f :
581+ f .write (f"--- kernelbase start --- { threading .main_thread ().ident } \n " )
582+
574583 self .io_loop = ioloop .IOLoop .current ()
575584 self .msg_queue : Queue [t .Any ] = Queue ()
576585 if not self .shell_channel_thread :
@@ -609,6 +618,9 @@ async def shell_channel_thread_main(self, msg):
609618 msg3 = self .session .deserialize (msg2 , content = False , copy = False )
610619 subshell_id = msg3 ["header" ].get ("subshell_id" )
611620
621+ with open ("debug.txt" , "a" ) as f :
622+ f .write (f"{ threading .current_thread ().ident } shell_channel_thread_main msg received for { subshell_id } \n " )
623+
612624 # Find inproc pair socket to use to send message to correct subshell.
613625 subshell_manager = self .shell_channel_thread .manager
614626 socket = subshell_manager .get_shell_channel_stream (subshell_id )
@@ -622,6 +634,10 @@ async def shell_channel_thread_main(self, msg):
622634
623635 async def shell_main (self , subshell_id : str | None , msg ):
624636 """Handler of shell messages for a single subshell"""
637+
638+ with open ("debug.txt" , "a" ) as f :
639+ f .write (f"{ threading .current_thread ().ident } shell_main msg recvd on { subshell_id } \n " )
640+
625641 if self ._supports_kernel_subshells :
626642 if subshell_id is None :
627643 assert threading .current_thread () == threading .main_thread ()
@@ -667,25 +683,35 @@ def _publish_execute_input(self, code, parent, execution_count):
667683 """Publish the code request on the iopub stream."""
668684 if not self .session :
669685 return
670- self .session .send (
671- self .iopub_socket ,
672- "execute_input" ,
673- {"code" : code , "execution_count" : execution_count },
674- parent = parent ,
675- ident = self ._topic ("execute_input" ),
676- )
686+ with self ._iant_lock :
687+
688+ with open ("debug.txt" , "a" ) as f :
689+ f .write (f"{ threading .current_thread ().ident } iopub_socket execute_input\n " )
690+
691+ self .session .send (
692+ self .iopub_socket ,
693+ "execute_input" ,
694+ {"code" : code , "execution_count" : execution_count },
695+ parent = parent ,
696+ ident = self ._topic ("execute_input" ),
697+ )
677698
678699 def _publish_status (self , status , channel , parent = None ):
679700 """send status (busy/idle) on IOPub"""
680701 if not self .session :
681702 return
682- self .session .send (
683- self .iopub_socket ,
684- "status" ,
685- {"execution_state" : status },
686- parent = parent or self .get_parent (channel ),
687- ident = self ._topic ("status" ),
688- )
703+ with self ._iant_lock :
704+
705+ with open ("debug.txt" , "a" ) as f :
706+ f .write (f"{ threading .current_thread ().ident } iopub_socket status { status } \n " )
707+
708+ self .session .send (
709+ self .iopub_socket ,
710+ "status" ,
711+ {"execution_state" : status },
712+ parent = parent or self .get_parent (channel ),
713+ ident = self ._topic ("status" ),
714+ )
689715
690716 def _publish_status_and_flush (self , status , channel , stream , parent = None ):
691717 """send status on IOPub and flush specified stream to ensure reply is sent before handling the next reply"""
@@ -696,13 +722,18 @@ def _publish_status_and_flush(self, status, channel, stream, parent=None):
696722 def _publish_debug_event (self , event ):
697723 if not self .session :
698724 return
699- self .session .send (
700- self .iopub_socket ,
701- "debug_event" ,
702- event ,
703- parent = self .get_parent (),
704- ident = self ._topic ("debug_event" ),
705- )
725+ with self ._iant_lock :
726+
727+ with open ("debug.txt" , "a" ) as f :
728+ f .write (f"{ threading .current_thread ().ident } iopub_socket debug_event\n " )
729+
730+ self .session .send (
731+ self .iopub_socket ,
732+ "debug_event" ,
733+ event ,
734+ parent = self .get_parent (),
735+ ident = self ._topic ("debug_event" ),
736+ )
706737
707738 def set_parent (self , ident , parent , channel = "shell" ):
708739 """Set the current parent request
@@ -763,6 +794,10 @@ def send_response(
763794 """
764795 if not self .session :
765796 return None
797+
798+ with open ("debug.txt" , "a" ) as f :
799+ f .write (f"{ threading .current_thread ().ident } ? ?send_response\n " )
800+
766801 return self .session .send (
767802 stream ,
768803 msg_or_type ,
@@ -835,6 +870,13 @@ async def execute_request(self, stream, ident, parent):
835870 if self ._do_exec_accepted_params ["cell_id" ]:
836871 do_execute_args ["cell_id" ] = cell_id
837872
873+
874+ subshell_id = parent ["header" ].get ("subshell_id" )
875+ msg_id = parent ["header" ].get ("msg_id" )
876+
877+ with open ("debug.txt" , "a" ) as f :
878+ f .write (f"{ threading .current_thread ().ident } about to execute_request { msg_id } { subshell_id } { code } \n " )
879+
838880 # Call do_execute with the appropriate arguments
839881 reply_content = self .do_execute (** do_execute_args )
840882
@@ -854,6 +896,9 @@ async def execute_request(self, stream, ident, parent):
854896 reply_content = json_clean (reply_content )
855897 metadata = self .finish_metadata (parent , metadata , reply_content )
856898
899+ with open ("debug.txt" , "a" ) as f :
900+ f .write (f"{ threading .current_thread ().ident } execute_reply { msg_id } { subshell_id } \n " )
901+
857902 reply_msg : dict [str , t .Any ] = self .session .send ( # type:ignore[assignment]
858903 stream ,
859904 "execute_reply" ,
@@ -943,6 +988,10 @@ async def history_request(self, stream, ident, parent):
943988 reply_content = await reply_content
944989
945990 reply_content = json_clean (reply_content )
991+
992+ with open ("debug.txt" , "a" ) as f :
993+ f .write (f"{ threading .current_thread ().ident } ? history_reply\n " )
994+
946995 msg = self .session .send (stream , "history_reply" , reply_content , parent , ident )
947996 self .log .debug ("%s" , msg )
948997
@@ -967,6 +1016,10 @@ async def connect_request(self, stream, ident, parent):
9671016 return
9681017 content = self ._recorded_ports .copy () if self ._recorded_ports else {}
9691018 content ["status" ] = "ok"
1019+
1020+ with open ("debug.txt" , "a" ) as f :
1021+ f .write (f"{ threading .current_thread ().ident } ? connect_reply\n " )
1022+
9701023 msg = self .session .send (stream , "connect_reply" , content , parent , ident )
9711024 self .log .debug ("%s" , msg )
9721025
@@ -991,6 +1044,10 @@ async def kernel_info_request(self, stream, ident, parent):
9911044 return
9921045 content = {"status" : "ok" }
9931046 content .update (self .kernel_info )
1047+
1048+ with open ("debug.txt" , "a" ) as f :
1049+ f .write (f"{ threading .current_thread ().ident } ? kernel_info_reply\n " )
1050+
9941051 msg = self .session .send (stream , "kernel_info_reply" , content , parent , ident )
9951052 self .log .debug ("%s" , msg )
9961053
@@ -1058,6 +1115,10 @@ async def shutdown_request(self, stream, ident, parent):
10581115 content = self .do_shutdown (parent ["content" ]["restart" ])
10591116 if inspect .isawaitable (content ):
10601117 content = await content
1118+
1119+ with open ("debug.txt" , "a" ) as f :
1120+ f .write (f"{ threading .current_thread ().ident } ? shutdown_reply\n " )
1121+
10611122 self .session .send (stream , "shutdown_reply" , content , parent , ident = ident )
10621123 # same content, but different msg_id for broadcasting on IOPub
10631124 self ._shutdown_message = self .session .msg ("shutdown_reply" , content , parent )
@@ -1108,6 +1169,10 @@ async def debug_request(self, stream, ident, parent):
11081169 if inspect .isawaitable (reply_content ):
11091170 reply_content = await reply_content
11101171 reply_content = json_clean (reply_content )
1172+
1173+ with open ("debug.txt" , "a" ) as f :
1174+ f .write (f"{ threading .current_thread ().ident } ? debug_reply\n " )
1175+
11111176 reply_msg = self .session .send (stream , "debug_reply" , reply_content , parent , ident )
11121177 self .log .debug ("%s" , reply_msg )
11131178
@@ -1177,6 +1242,10 @@ async def create_subshell_request(self, socket, ident, parent) -> None:
11771242 other_socket = self .shell_channel_thread .manager .get_control_other_socket ()
11781243 other_socket .send_json ({"type" : "create" })
11791244 reply = other_socket .recv_json ()
1245+
1246+ with open ("debug.txt" , "a" ) as f :
1247+ f .write (f"{ threading .current_thread ().ident } ? create_subshell_reply\n " )
1248+
11801249 self .session .send (socket , "create_subshell_reply" , reply , parent , ident )
11811250
11821251 async def delete_subshell_request (self , socket , ident , parent ) -> None :
@@ -1200,6 +1269,10 @@ async def delete_subshell_request(self, socket, ident, parent) -> None:
12001269 other_socket = self .shell_channel_thread .manager .get_control_other_socket ()
12011270 other_socket .send_json ({"type" : "delete" , "subshell_id" : subshell_id })
12021271 reply = other_socket .recv_json ()
1272+
1273+ with open ("debug.txt" , "a" ) as f :
1274+ f .write (f"{ threading .current_thread ().ident } ? delete_subshell_reply\n " )
1275+
12031276 self .session .send (socket , "delete_subshell_reply" , reply , parent , ident )
12041277
12051278 async def list_subshell_request (self , socket , ident , parent ) -> None :
@@ -1216,6 +1289,10 @@ async def list_subshell_request(self, socket, ident, parent) -> None:
12161289 other_socket = self .shell_channel_thread .manager .get_control_other_socket ()
12171290 other_socket .send_json ({"type" : "list" })
12181291 reply = other_socket .recv_json ()
1292+
1293+ with open ("debug.txt" , "a" ) as f :
1294+ f .write (f"{ threading .current_thread ().ident } ? list_subshell_reply\n " )
1295+
12191296 self .session .send (socket , "list_subshell_reply" , reply , parent , ident )
12201297
12211298 # ---------------------------------------------------------------------------
@@ -1280,6 +1357,10 @@ async def abort_request(self, stream, ident, parent): # pragma: no cover
12801357 content = dict (status = "ok" )
12811358 if not self .session :
12821359 return
1360+
1361+ with open ("debug.txt" , "a" ) as f :
1362+ f .write (f"{ threading .current_thread ().ident } ? abort_reply\n " )
1363+
12831364 reply_msg = self .session .send (
12841365 stream , "abort_reply" , content = content , parent = parent , ident = ident
12851366 )
@@ -1378,6 +1459,9 @@ def _send_abort_reply(self, stream, msg, idents):
13781459 md = self .finish_metadata (msg , md , status )
13791460 md .update (status )
13801461
1462+ with open ("debug.txt" , "a" ) as f :
1463+ f .write (f"{ threading .current_thread ().ident } ? { reply_type } \n " )
1464+
13811465 self .session .send (
13821466 stream ,
13831467 reply_type ,
@@ -1558,11 +1642,16 @@ async def _at_shutdown(self):
15581642
15591643 finally :
15601644 if self ._shutdown_message is not None and self .session :
1561- self .session .send (
1562- self .iopub_socket ,
1563- self ._shutdown_message ,
1564- ident = self ._topic ("shutdown" ),
1565- )
1645+ with self ._iant_lock :
1646+
1647+ with open ("debug.txt" , "a" ) as f :
1648+ f .write (f"{ threading .current_thread ().ident } ? _shutdown\n " )
1649+
1650+ self .session .send (
1651+ self .iopub_socket ,
1652+ self ._shutdown_message ,
1653+ ident = self ._topic ("shutdown" ),
1654+ )
15661655 self .log .debug ("%s" , self ._shutdown_message )
15671656 if self .control_stream :
15681657 self .control_stream .flush (zmq .POLLOUT )
0 commit comments