Skip to content

Commit cd7b85d

Browse files
committed
Properly detect single ESC key
1 parent a054e5d commit cd7b85d

File tree

3 files changed

+81
-0
lines changed

3 files changed

+81
-0
lines changed

Lib/_pyrepl/base_eventqueue.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,16 @@
3131
from .trace import trace
3232

3333
class BaseEventQueue:
34+
_ESCAPE_TIMEOUT_MS = 50
35+
3436
def __init__(self, encoding: str, keymap_dict: dict[bytes, str]) -> None:
3537
self.compiled_keymap = keymap.compile_keymap(keymap_dict)
3638
self.keymap = self.compiled_keymap
3739
trace("keymap {k!r}", k=self.keymap)
3840
self.encoding = encoding
3941
self.events: deque[Event] = deque()
4042
self.buf = bytearray()
43+
self._pending_escape_deadline: float | None = None
4144

4245
def get(self) -> Event | None:
4346
"""
@@ -69,6 +72,48 @@ def insert(self, event: Event) -> None:
6972
trace('added event {event}', event=event)
7073
self.events.append(event)
7174

75+
def has_pending_escape_sequence(self) -> bool:
76+
"""
77+
Check if there's a potential escape sequence waiting for more input.
78+
79+
Returns True if we have exactly one byte (ESC) in the buffer and
80+
we're in the middle of keymap navigation, indicating we're waiting
81+
to see if more bytes will arrive to complete an escape sequence.
82+
"""
83+
return (
84+
len(self.buf) == 1
85+
and self.buf[0] == 27 # ESC byte
86+
and self.keymap is not self.compiled_keymap
87+
)
88+
89+
def should_emit_standalone_escape(self, current_time_ms: float) -> bool:
90+
"""
91+
Check if a pending ESC should be emitted as a standalone escape key.
92+
"""
93+
if not self.has_pending_escape_sequence():
94+
return False
95+
96+
if self._pending_escape_deadline is None:
97+
self._pending_escape_deadline = current_time_ms + self._ESCAPE_TIMEOUT_MS
98+
return False
99+
100+
return current_time_ms >= self._pending_escape_deadline
101+
102+
def emit_standalone_escape(self) -> None:
103+
"""
104+
Emit the buffered ESC byte as a standalone escape key event.
105+
"""
106+
self.keymap = self.compiled_keymap
107+
# Standalone ESC event
108+
self.insert(Event('key', '\033', b'\033'))
109+
110+
# Just in case there are remaining bytes in the buffer
111+
remaining = self.flush_buf()[1:]
112+
for byte in remaining:
113+
self.push(byte)
114+
115+
self._pending_escape_deadline = None
116+
72117
def push(self, char: int | bytes) -> None:
73118
"""
74119
Processes a character by updating the buffer and handling special key mappings.
@@ -78,6 +123,9 @@ def push(self, char: int | bytes) -> None:
78123
char = ord_char.to_bytes()
79124
self.buf.append(ord_char)
80125

126+
if self._pending_escape_deadline is not None:
127+
self._pending_escape_deadline = None
128+
81129
if char in self.keymap:
82130
if self.keymap is self.compiled_keymap:
83131
# sanity check, buffer is empty when a special key comes

Lib/_pyrepl/unix_console.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,19 @@ def get_event(self, block: bool = True) -> Event | None:
419419
return None
420420

421421
while self.event_queue.empty():
422+
if self.event_queue.has_pending_escape_sequence():
423+
current_time_ms = time.monotonic() * 1000
424+
425+
if self.event_queue.should_emit_standalone_escape(current_time_ms):
426+
self.event_queue.emit_standalone_escape()
427+
break
428+
429+
if not self.wait(timeout=10):
430+
current_time_ms = time.monotonic() * 1000
431+
if self.event_queue.should_emit_standalone_escape(current_time_ms):
432+
self.event_queue.emit_standalone_escape()
433+
continue
434+
422435
while True:
423436
try:
424437
self.push_char(self.__read(1))
@@ -445,6 +458,7 @@ def wait(self, timeout: float | None = None) -> bool:
445458
or bool(self.pollob.poll(timeout))
446459
)
447460

461+
448462
def set_cursor_vis(self, visible):
449463
"""
450464
Set the visibility of the cursor.

Lib/_pyrepl/windows_console.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,24 @@ def get_event(self, block: bool = True) -> Event | None:
446446
return None
447447

448448
while self.event_queue.empty():
449+
# Check if we have a pending escape sequence that needs timeout handling
450+
if self.event_queue.has_pending_escape_sequence():
451+
import time
452+
current_time_ms = time.monotonic() * 1000
453+
454+
if self.event_queue.should_emit_standalone_escape(current_time_ms):
455+
# Timeout expired - emit the ESC as a standalone key
456+
self.event_queue.emit_standalone_escape()
457+
break
458+
459+
# Wait for a short time to check for more input
460+
if not self.wait(timeout=10):
461+
# Check again after timeout
462+
current_time_ms = time.monotonic() * 1000
463+
if self.event_queue.should_emit_standalone_escape(current_time_ms):
464+
self.event_queue.emit_standalone_escape()
465+
continue
466+
449467
rec = self._read_input()
450468
if rec is None:
451469
return None
@@ -583,6 +601,7 @@ def repaint(self) -> None:
583601
raise NotImplementedError("No repaint support")
584602

585603

604+
586605
# Windows interop
587606
class CONSOLE_SCREEN_BUFFER_INFO(Structure):
588607
_fields_ = [

0 commit comments

Comments
 (0)