7171import rich .box
7272from prompt_toolkit import print_formatted_text
7373from prompt_toolkit .application import get_app
74+ from prompt_toolkit .auto_suggest import AutoSuggestFromHistory
75+ from prompt_toolkit .completion import Completer , DummyCompleter
76+ from prompt_toolkit .formatted_text import ANSI , FormattedText
77+ from prompt_toolkit .history import InMemoryHistory
78+ from prompt_toolkit .input import DummyInput , create_input
79+ from prompt_toolkit .key_binding import KeyBindings
80+ from prompt_toolkit .output import DummyOutput , create_output
81+ from prompt_toolkit .patch_stdout import patch_stdout
82+ from prompt_toolkit .shortcuts import CompleteStyle , PromptSession , set_title
7483from rich .console import (
7584 Group ,
7685 RenderableType ,
158167with contextlib .suppress (ImportError ):
159168 from IPython import start_ipython
160169
161- from prompt_toolkit .auto_suggest import AutoSuggestFromHistory
162- from prompt_toolkit .completion import Completer , DummyCompleter
163- from prompt_toolkit .formatted_text import ANSI , FormattedText
164- from prompt_toolkit .history import InMemoryHistory
165- from prompt_toolkit .input import DummyInput , create_input
166- from prompt_toolkit .key_binding import KeyBindings
167- from prompt_toolkit .output import DummyOutput , create_output
168- from prompt_toolkit .patch_stdout import patch_stdout
169- from prompt_toolkit .shortcuts import CompleteStyle , PromptSession , set_title
170-
171170try :
172171 if sys .platform == "win32" :
173172 from prompt_toolkit .output .win32 import NoConsoleScreenBufferError # type: ignore[attr-defined]
@@ -413,9 +412,6 @@ def __init__(
413412 else :
414413 self .stdout = sys .stdout
415414
416- # Key used for completion
417- self .completekey = completekey
418-
419415 # Attributes which should NOT be dynamically settable via the set command at runtime
420416 self .default_to_shell = False # Attempt to run unrecognized commands as shell commands
421417 self .allow_redirection = allow_redirection # Security setting to prevent redirection of stdout
@@ -468,17 +464,14 @@ def __init__(
468464 self ._persistent_history_length = persistent_history_length
469465 self ._initialize_history (persistent_history_file )
470466
471- # Initialize prompt-toolkit PromptSession
472- self .history_adapter = Cmd2History (self )
473- self .completer = Cmd2Completer (self )
474- self .lexer = Cmd2Lexer (self )
467+ # Initialize the main PromptSession
475468 self .bottom_toolbar = bottom_toolbar
469+ self .main_session = self ._initialize_main_session (auto_suggest , completekey )
476470
477- self .auto_suggest = None
478- if auto_suggest :
479- self .auto_suggest = AutoSuggestFromHistory ()
480-
481- self .session = self ._init_session ()
471+ # The session currently holding focus (either the main REPL or a command's
472+ # custom prompt). Completion and UI logic should reference this variable
473+ # to ensure they modify the correct session state.
474+ self .active_session = self .main_session
482475
483476 # Commands to exclude from the history command
484477 self .exclude_from_history = ['_eof' , 'history' ]
@@ -651,18 +644,18 @@ def __init__(
651644 # the current command being executed
652645 self .current_command : Statement | None = None
653646
654- def _init_session (self ) -> PromptSession [str ]:
647+ def _initialize_main_session (self , auto_suggest : bool , completekey : str ) -> PromptSession [str ]:
655648 """Initialize and return the core PromptSession for the application.
656649
657650 Builds an interactive session if stdin is a TTY. Otherwise, uses
658651 dummy drivers to support non-interactive streams like pipes or files.
659652 """
660653 key_bindings = None
661- if self . completekey != self .DEFAULT_COMPLETEKEY :
654+ if completekey != self .DEFAULT_COMPLETEKEY :
662655 # Configure prompt_toolkit `KeyBindings` with the custom key for completion
663656 key_bindings = KeyBindings ()
664657
665- @key_bindings .add (self . completekey )
658+ @key_bindings .add (completekey )
666659 def _ (event : Any ) -> None : # pragma: no cover
667660 """Trigger completion."""
668661 b = event .current_buffer
@@ -673,15 +666,15 @@ def _(event: Any) -> None: # pragma: no cover
673666
674667 # Base configuration
675668 kwargs : dict [str , Any ] = {
676- "auto_suggest" : self . auto_suggest ,
669+ "auto_suggest" : AutoSuggestFromHistory () if auto_suggest else None ,
677670 "bottom_toolbar" : self .get_bottom_toolbar if self .bottom_toolbar else None ,
678671 "complete_style" : CompleteStyle .MULTI_COLUMN ,
679672 "complete_in_thread" : True ,
680673 "complete_while_typing" : False ,
681- "completer" : self . completer ,
682- "history" : self . history_adapter ,
674+ "completer" : Cmd2Completer ( self ) ,
675+ "history" : Cmd2History ( self ) ,
683676 "key_bindings" : key_bindings ,
684- "lexer" : self . lexer ,
677+ "lexer" : Cmd2Lexer ( self ) ,
685678 "rprompt" : self .get_rprompt ,
686679 }
687680
@@ -2448,9 +2441,9 @@ def complete(
24482441
24492442 # Swap between COLUMN and MULTI_COLUMN style based on the number of matches.
24502443 if len (completions ) > self .max_column_completion_results :
2451- self .session .complete_style = CompleteStyle .MULTI_COLUMN
2444+ self .active_session .complete_style = CompleteStyle .MULTI_COLUMN
24522445 else :
2453- self .session .complete_style = CompleteStyle .COLUMN
2446+ self .active_session .complete_style = CompleteStyle .COLUMN
24542447
24552448 return completions # noqa: TRY300
24562449
@@ -3227,11 +3220,23 @@ def completedefault(self, *_ignored: Sequence[str]) -> Completions:
32273220 def _suggest_similar_command (self , command : str ) -> str | None :
32283221 return suggest_similar (command , self .get_visible_commands ())
32293222
3223+ @staticmethod
3224+ def _is_tty_session (session : PromptSession [str ]) -> bool :
3225+ """Determine if the session supports full terminal interactions.
3226+
3227+ Returns True if the session is attached to a real TTY or a virtual
3228+ terminal (like PipeInput in tests). Returns False if the session is
3229+ running in a headless environment (DummyInput).
3230+ """
3231+ # Validate against the session's assigned input driver rather than sys.stdin.
3232+ # This respects the fallback logic in _initialize_session() and allows unit
3233+ # tests to inject PipeInput for programmatic interaction.
3234+ return not isinstance (session .input , DummyInput )
3235+
32303236 def _read_raw_input (
32313237 self ,
32323238 prompt : Callable [[], ANSI | str ] | ANSI | str ,
32333239 session : PromptSession [str ],
3234- completer : Completer ,
32353240 ** prompt_kwargs : Any ,
32363241 ) -> str :
32373242 """Execute the low-level input read from either a terminal or a redirected stream.
@@ -3242,17 +3247,23 @@ def _read_raw_input(
32423247
32433248 :param prompt: the prompt text or a callable that returns the prompt.
32443249 :param session: the PromptSession instance to use for reading.
3245- :param completer: the completer to use for this specific input.
32463250 :param prompt_kwargs: additional arguments passed directly to session.prompt().
32473251 :return: the stripped input string.
32483252 :raises EOFError: if the input stream is closed or the user signals EOF (e.g., Ctrl+D)
32493253 """
32503254 # Check if the session is configured for interactive terminal use.
3251- if not isinstance (session .input , DummyInput ):
3255+ if self ._is_tty_session (session ):
3256+ if not callable (prompt ):
3257+ prompt = pt_filter_style (prompt )
3258+
32523259 with patch_stdout ():
3253- if not callable (prompt ):
3254- prompt = pt_filter_style (prompt )
3255- return session .prompt (prompt , completer = completer , ** prompt_kwargs )
3260+ try :
3261+ # Set this session as the active one for UI/completion logic.
3262+ self .active_session = session
3263+ return session .prompt (prompt , ** prompt_kwargs )
3264+ finally :
3265+ # Revert back to the main session.
3266+ self .active_session = self .main_session
32563267
32573268 # We're not at a terminal, so we're likely reading from a file or a pipe.
32583269 prompt_obj = prompt () if callable (prompt ) else prompt
@@ -3350,14 +3361,18 @@ def read_input(
33503361 )
33513362
33523363 temp_session : PromptSession [str ] = PromptSession (
3353- complete_style = self .session .complete_style ,
3354- complete_while_typing = self .session .complete_while_typing ,
3364+ auto_suggest = self .main_session .auto_suggest ,
3365+ complete_style = self .main_session .complete_style ,
3366+ complete_in_thread = self .main_session .complete_in_thread ,
3367+ complete_while_typing = self .main_session .complete_while_typing ,
3368+ completer = completer_to_use ,
33553369 history = InMemoryHistory (history ) if history is not None else InMemoryHistory (),
3356- input = self .session .input ,
3357- output = self .session .output ,
3370+ key_bindings = self .main_session .key_bindings ,
3371+ input = self .main_session .input ,
3372+ output = self .main_session .output ,
33583373 )
33593374
3360- return self ._read_raw_input (prompt , temp_session , completer_to_use )
3375+ return self ._read_raw_input (prompt , temp_session )
33613376
33623377 def _process_alerts (self ) -> None :
33633378 """Background worker that processes queued alerts and dynamic prompt updates."""
@@ -3452,8 +3467,7 @@ def _pre_prompt() -> None:
34523467 try :
34533468 return self ._read_raw_input (
34543469 prompt = prompt_to_use ,
3455- session = self .session ,
3456- completer = self .completer ,
3470+ session = self .main_session ,
34573471 pre_run = _pre_prompt ,
34583472 )
34593473 finally :
0 commit comments