Skip to content

Commit 984cf53

Browse files
committed
Properly detect single ESC key
1 parent 49c066b commit 984cf53

File tree

4 files changed

+101
-0
lines changed

4 files changed

+101
-0
lines changed

Lib/_pyrepl/base_eventqueue.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,16 @@
3131
from .trace import trace
3232

3333
class BaseEventQueue:
34+
_ESCAPE_TIMEOUT_MS = 50
35+
3436
def __init__(self, encoding: str, keymap_dict: dict[bytes, str]) -> None:
3537
self.compiled_keymap = keymap.compile_keymap(keymap_dict)
3638
self.keymap = self.compiled_keymap
3739
trace("keymap {k!r}", k=self.keymap)
3840
self.encoding = encoding
3941
self.events: deque[Event] = deque()
4042
self.buf = bytearray()
43+
self._pending_escape_deadline: float | None = None
4144

4245
def get(self) -> Event | None:
4346
"""
@@ -69,6 +72,50 @@ def insert(self, event: Event) -> None:
6972
trace('added event {event}', event=event)
7073
self.events.append(event)
7174

75+
def has_pending_escape_sequence(self) -> bool:
76+
"""
77+
Check if there's a potential escape sequence waiting for more input.
78+
79+
Returns True if we have exactly one byte (ESC) in the buffer and
80+
we're in the middle of keymap navigation, indicating we're waiting
81+
to see if more bytes will arrive to complete an escape sequence.
82+
"""
83+
return (
84+
len(self.buf) == 1
85+
and self.buf[0] == 27 # ESC byte
86+
and self.keymap is not self.compiled_keymap
87+
)
88+
89+
def should_emit_standalone_escape(self, current_time_ms: float) -> bool:
90+
"""
91+
Check if a pending ESC should be emitted as a standalone escape key.
92+
"""
93+
if not self.has_pending_escape_sequence():
94+
return False
95+
96+
if self._pending_escape_deadline is None:
97+
# First time checking - set the deadline
98+
self._pending_escape_deadline = current_time_ms + self._ESCAPE_TIMEOUT_MS
99+
return False
100+
101+
# Check if the deadline has passed
102+
return current_time_ms >= self._pending_escape_deadline
103+
104+
def emit_standalone_escape(self) -> None:
105+
"""
106+
Emit the buffered ESC byte as a standalone escape key event.
107+
"""
108+
self.keymap = self.compiled_keymap
109+
# Standalone ESC event
110+
self.insert(Event('key', '\033', b'\033'))
111+
112+
# Just in case there are remaining bytes in the buffer
113+
remaining = self.flush_buf()[1:]
114+
for byte in remaining:
115+
self.push(byte)
116+
117+
self._pending_escape_deadline = None
118+
72119
def push(self, char: int | bytes) -> None:
73120
"""
74121
Processes a character by updating the buffer and handling special key mappings.
@@ -78,6 +125,9 @@ def push(self, char: int | bytes) -> None:
78125
char = ord_char.to_bytes()
79126
self.buf.append(ord_char)
80127

128+
if self._pending_escape_deadline is not None:
129+
self._pending_escape_deadline = None
130+
81131
if char in self.keymap:
82132
if self.keymap is self.compiled_keymap:
83133
# sanity check, buffer is empty when a special key comes

Lib/_pyrepl/unix_console.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,21 @@ def get_event(self, block: bool = True) -> Event | None:
419419
return None
420420

421421
while self.event_queue.empty():
422+
# Check if we have a pending escape sequence that needs timeout handling
423+
if self.event_queue.has_pending_escape_sequence():
424+
current_time_ms = time.monotonic() * 1000
425+
426+
if self.event_queue.should_emit_standalone_escape(current_time_ms):
427+
# Timeout expired - emit the ESC as a standalone key
428+
self.event_queue.emit_standalone_escape()
429+
break
430+
431+
if not self.wait(timeout=10):
432+
current_time_ms = time.monotonic() * 1000
433+
if self.event_queue.should_emit_standalone_escape(current_time_ms):
434+
self.event_queue.emit_standalone_escape()
435+
continue
436+
422437
while True:
423438
try:
424439
self.push_char(self.__read(1))
@@ -445,6 +460,7 @@ def wait(self, timeout: float | None = None) -> bool:
445460
or bool(self.pollob.poll(timeout))
446461
)
447462

463+
448464
def set_cursor_vis(self, visible):
449465
"""
450466
Set the visibility of the cursor.

Lib/_pyrepl/windows_console.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,24 @@ def get_event(self, block: bool = True) -> Event | None:
446446
return None
447447

448448
while self.event_queue.empty():
449+
# Check if we have a pending escape sequence that needs timeout handling
450+
if self.event_queue.has_pending_escape_sequence():
451+
import time
452+
current_time_ms = time.monotonic() * 1000
453+
454+
if self.event_queue.should_emit_standalone_escape(current_time_ms):
455+
# Timeout expired - emit the ESC as a standalone key
456+
self.event_queue.emit_standalone_escape()
457+
break
458+
459+
# Wait for a short time to check for more input
460+
if not self.wait(timeout=10):
461+
# Check again after timeout
462+
current_time_ms = time.monotonic() * 1000
463+
if self.event_queue.should_emit_standalone_escape(current_time_ms):
464+
self.event_queue.emit_standalone_escape()
465+
continue
466+
449467
rec = self._read_input()
450468
if rec is None:
451469
return None
@@ -583,6 +601,7 @@ def repaint(self) -> None:
583601
raise NotImplementedError("No repaint support")
584602

585603

604+
586605
# Windows interop
587606
class CONSOLE_SCREEN_BUFFER_INFO(Structure):
588607
_fields_ = [

Lib/test/test_pyrepl/test_reader.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,22 @@ def test_syntax_highlighting_literal_brace_in_fstring_or_tstring(self):
551551
self.maxDiff=None
552552
self.assert_screen_equal(reader, expected)
553553

554+
def test_vi_escape_switches_to_normal_mode(self):
555+
events = itertools.chain(
556+
code_to_events("hello"),
557+
[
558+
Event(evt="key", data="\x1b", raw=bytearray(b"\x1b")),
559+
Event(evt="key", data="h", raw=bytearray(b"h")),
560+
],
561+
)
562+
reader, _ = handle_all_events(
563+
events,
564+
prepare_reader=prepare_vi_reader,
565+
)
566+
self.assertEqual(reader.get_unicode(), "hello")
567+
self.assertTrue(reader.editor_mode.is_normal())
568+
self.assertEqual(reader.pos, len("hello") - 2) # After 'h' left movement
569+
554570
def test_control_characters(self):
555571
code = 'flag = "🏳️‍🌈"'
556572
events = code_to_events(code)

0 commit comments

Comments
 (0)