@@ -247,7 +247,7 @@ def _parent_header(self):
247247 execution_count = 0
248248
249249 # Asyncio lock to ensure only one control queue message is processed at a time.
250- _control_lock = asyncio .Lock ()
250+ _control_lock = asyncio .Lock
251251
252252 msg_types = [
253253 "execute_request" ,
@@ -543,6 +543,10 @@ def schedule_dispatch(self, dispatch, *args):
543543 # ensure the eventloop wakes up
544544 self .io_loop .add_callback (lambda : None )
545545
546+ async def _create_control_lock (self ):
547+ # This can be removed when minimum python increases to 3.10
548+ self ._control_lock = asyncio .Lock ()
549+
546550 def start (self ):
547551 """register dispatchers for streams"""
548552 self .io_loop = ioloop .IOLoop .current ()
@@ -552,6 +556,14 @@ def start(self):
552556 if self .control_stream :
553557 self .control_stream .on_recv (self .dispatch_control , copy = False )
554558
559+ if self .control_thread and sys .version_info < (3 , 10 ):
560+ # Before Python 3.10 we need to ensure the _control_lock is created in the
561+ # thread that uses it. When our minimum python is 3.10 we can remove this
562+ # and always use the else below, or just assign it where it is declared.
563+ self .control_thread .io_loop .add_callback (self ._create_control_lock )
564+ else :
565+ self ._control_lock = asyncio .Lock ()
566+
555567 if self .shell_stream :
556568 self .shell_stream .on_recv (
557569 partial (
0 commit comments