|
42 | 42 | import zmq_anyio |
43 | 43 | from anyio import ( |
44 | 44 | TASK_STATUS_IGNORED, |
45 | | - CancelScope, |
46 | 45 | Event, |
47 | 46 | create_memory_object_stream, |
48 | 47 | create_task_group, |
@@ -441,30 +440,34 @@ async def shell_main(self, subshell_id: str | None): |
441 | 440 | if socket not in self._send_exec_request: |
442 | 441 | send_stream, receive_stream = create_memory_object_stream(max_buffer_size=math.inf) |
443 | 442 | self._send_exec_request[socket] = send_stream |
444 | | - async with create_task_group() as tg: |
445 | | - if not socket.started.is_set(): |
446 | | - await tg.start(socket.start) |
447 | | - tg.start_soon(self._process_shell, socket) |
448 | | - tg.start_soon(self._execute_request_loop, receive_stream) |
449 | | - if not subshell_id: |
450 | | - # Main subshell |
451 | | - with contextlib.suppress(RuntimeError): |
452 | | - self.set_trait("asyncio_event_loop", asyncio.get_running_loop()) |
453 | | - async with create_task_group() as tg_main: |
454 | | - with CancelScope(shield=True) as scope: |
| 443 | + try: |
| 444 | + async with create_task_group() as tg: |
| 445 | + if not socket.started.is_set(): |
| 446 | + await tg.start(socket.start) |
| 447 | + tg.start_soon(self._process_shell, socket) |
| 448 | + tg.start_soon(self._execute_request_loop, receive_stream) |
| 449 | + if not subshell_id: |
| 450 | + # Main subshell |
| 451 | + with contextlib.suppress(RuntimeError): |
| 452 | + self.set_trait("asyncio_event_loop", asyncio.get_running_loop()) |
| 453 | + async with create_task_group() as tg_main: |
| 454 | + tg_main.cancel_scope.shield = True |
455 | 455 | self._tg_main = tg_main |
456 | 456 | async with BlockingPortal() as portal: |
457 | 457 | # Provide a portal for general threadsafe access |
458 | 458 | self._portal = portal |
459 | 459 | self._main_subshell_ready.set() |
460 | 460 | await to_thread.run_sync(self.shell_stop.wait) |
461 | 461 | await portal.stop(True) |
462 | | - scope.cancel() |
463 | | - tg_main.cancel_scope.cancel() |
464 | | - tg.cancel_scope.cancel() |
465 | | - self._send_exec_request.pop(socket, None) |
466 | | - await send_stream.aclose() |
467 | | - await receive_stream.aclose() |
| 462 | + tg_main.cancel_scope.cancel() |
| 463 | + tg.cancel_scope.cancel() |
| 464 | + except BaseException: |
| 465 | + if not self.shell_stop.is_set(): |
| 466 | + raise |
| 467 | + finally: |
| 468 | + self._send_exec_request.pop(socket, None) |
| 469 | + await send_stream.aclose() |
| 470 | + await receive_stream.aclose() |
468 | 471 |
|
469 | 472 | async def _execute_request_loop(self, receive_stream: MemoryObjectReceiveStream): |
470 | 473 | async with receive_stream: |
|
0 commit comments