Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions Lib/_pyrepl/base_eventqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,16 @@
from .trace import trace

class BaseEventQueue:
_ESCAPE_TIMEOUT_MS = 50
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's correct to expect timeouts. People can write slowly. Or not. Hold a key, or not. I also can't figure an example of when this would happen.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above.


def __init__(self, encoding: str, keymap_dict: dict[bytes, str]) -> None:
self.compiled_keymap = keymap.compile_keymap(keymap_dict)
self.keymap = self.compiled_keymap
trace("keymap {k!r}", k=self.keymap)
self.encoding = encoding
self.events: deque[Event] = deque()
self.buf = bytearray()
self._pending_escape_deadline: float | None = None

def get(self) -> Event | None:
"""
Expand Down Expand Up @@ -69,6 +72,48 @@ def insert(self, event: Event) -> None:
trace('added event {event}', event=event)
self.events.append(event)

def has_pending_escape_sequence(self) -> bool:
"""
Check if there's a potential escape sequence waiting for more input.

Returns True if we have exactly one byte (ESC) in the buffer and
we're in the middle of keymap navigation, indicating we're waiting
to see if more bytes will arrive to complete an escape sequence.
"""
return (
len(self.buf) == 1
and self.buf[0] == 27 # ESC byte
and self.keymap is not self.compiled_keymap
)

def should_emit_standalone_escape(self, current_time_ms: float) -> bool:
"""
Check if a pending ESC should be emitted as a standalone escape key.
"""
if not self.has_pending_escape_sequence():
return False

if self._pending_escape_deadline is None:
self._pending_escape_deadline = current_time_ms + self._ESCAPE_TIMEOUT_MS
return False

return current_time_ms >= self._pending_escape_deadline

def emit_standalone_escape(self) -> None:
"""
Emit the buffered ESC byte as a standalone escape key event.
"""
self.keymap = self.compiled_keymap
# Standalone ESC event
self.insert(Event('key', '\033', b'\033'))

# Just in case there are remaining bytes in the buffer
remaining = self.flush_buf()[1:]
for byte in remaining:
self.push(byte)

self._pending_escape_deadline = None

def push(self, char: int | bytes) -> None:
"""
Processes a character by updating the buffer and handling special key mappings.
Expand All @@ -78,6 +123,9 @@ def push(self, char: int | bytes) -> None:
char = ord_char.to_bytes()
self.buf.append(ord_char)

if self._pending_escape_deadline is not None:
self._pending_escape_deadline = None

if char in self.keymap:
if self.keymap is self.compiled_keymap:
# sanity check, buffer is empty when a special key comes
Expand Down
26 changes: 22 additions & 4 deletions Lib/_pyrepl/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import os
import time

from .types import VI_MODE_NORMAL

# Categories of actions:
# killing
# yanking
Expand Down Expand Up @@ -325,10 +327,19 @@ def do(self) -> None:
b = r.buffer
for _ in range(r.get_arg()):
p = r.pos + 1
if p <= len(b):
r.pos = p
# In vi normal mode, don't move past the last character
if r.vi_mode == VI_MODE_NORMAL:
eol_pos = r.eol()
max_pos = max(r.bol(), eol_pos - 1) if eol_pos > r.bol() else r.bol()
if p <= max_pos:
r.pos = p
else:
self.reader.error("end of line")
else:
self.reader.error("end of buffer")
if p <= len(b):
r.pos = p
else:
self.reader.error("end of buffer")


class beginning_of_line(MotionCommand):
Expand All @@ -338,7 +349,14 @@ def do(self) -> None:

class end_of_line(MotionCommand):
def do(self) -> None:
self.reader.pos = self.reader.eol()
r = self.reader
eol_pos = r.eol()
if r.vi_mode == VI_MODE_NORMAL:
bol_pos = r.bol()
# Don't go past the last character (but stay at bol if line is empty)
r.pos = max(bol_pos, eol_pos - 1) if eol_pos > bol_pos else bol_pos
else:
r.pos = eol_pos


class home(MotionCommand):
Expand Down
20 changes: 14 additions & 6 deletions Lib/_pyrepl/historical_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,19 +257,27 @@ def __post_init__(self) -> None:
)

def collect_keymap(self) -> tuple[tuple[KeySpec, CommandName], ...]:
return super().collect_keymap() + (
bindings: list[tuple[KeySpec, CommandName]] = [
(r"\C-n", "next-history"),
(r"\C-p", "previous-history"),
(r"\C-o", "operate-and-get-next"),
(r"\C-r", "reverse-history-isearch"),
(r"\C-s", "forward-history-isearch"),
(r"\M-r", "restore-history"),
(r"\M-.", "yank-arg"),
(r"\<page down>", "history-search-forward"),
(r"\x1b[6~", "history-search-forward"),
(r"\<page up>", "history-search-backward"),
(r"\x1b[5~", "history-search-backward"),
)
]

if not self.use_vi_mode:
bindings.extend(
[
(r"\M-r", "restore-history"),
(r"\M-.", "yank-arg"),
(r"\x1b[6~", "history-search-forward"),
(r"\x1b[5~", "history-search-backward"),
]
)

return super().collect_keymap() + tuple(bindings)

def select_item(self, i: int) -> None:
self.transient_history[self.historyi] = self.get_unicode()
Expand Down
Loading
Loading