Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions Lib/_pyrepl/base_eventqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,11 @@ def empty(self) -> bool:
"""
return not self.events

def flush_buf(self) -> bytearray:
def flush_buf(self) -> None:
"""
Flushes the buffer and returns its contents.
Flushes the buffer.
"""
old = self.buf
self.buf = bytearray()
return old

def insert(self, event: Event) -> None:
"""
Expand All @@ -87,7 +85,8 @@ def push(self, char: int | bytes) -> None:
if isinstance(k, dict):
self.keymap = k
else:
self.insert(Event('key', k, bytes(self.flush_buf())))
self.flush_buf()
self.insert(Event('key', k))
self.keymap = self.compiled_keymap

elif self.buf and self.buf[0] == 27: # escape
Expand All @@ -96,8 +95,10 @@ def push(self, char: int | bytes) -> None:
# the docstring in keymap.py
trace('unrecognized escape sequence, propagating...')
self.keymap = self.compiled_keymap
self.insert(Event('key', '\033', b'\033'))
for _c in self.flush_buf()[1:]:
self.insert(Event('key', '\033'))
remaining = self.buf[1:]
self.flush_buf()
for _c in remaining:
self.push(_c)

else:
Expand All @@ -106,5 +107,6 @@ def push(self, char: int | bytes) -> None:
except UnicodeError:
return
else:
self.insert(Event('key', decoded, bytes(self.flush_buf())))
self.flush_buf()
self.insert(Event('key', decoded))
self.keymap = self.compiled_keymap
1 change: 0 additions & 1 deletion Lib/_pyrepl/console.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
class Event:
evt: str
data: str
raw: bytes = b""


@dataclass
Expand Down
14 changes: 4 additions & 10 deletions Lib/_pyrepl/unix_console.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,19 +537,16 @@ def getpending(self):
Returns:
- Event: Pending event from the event queue.
"""
e = Event("key", "", b"")
e = Event("key", "")

while not self.event_queue.empty():
e2 = self.event_queue.get()
e.data += e2.data
e.raw += e.raw

amount = struct.unpack("i", ioctl(self.input_fd, FIONREAD, b"\0\0\0\0"))[0]
trace("getpending({a})", a=amount)
raw = self.__read(amount)
data = str(raw, self.encoding, "replace")
e.data += data
e.raw += raw
e.data += str(raw, self.encoding, "replace")
return e

else:
Expand All @@ -561,18 +558,15 @@ def getpending(self):
Returns:
- Event: Pending event from the event queue.
"""
e = Event("key", "", b"")
e = Event("key", "")

while not self.event_queue.empty():
e2 = self.event_queue.get()
e.data += e2.data
e.raw += e.raw

amount = 10000
raw = self.__read(amount)
data = str(raw, self.encoding, "replace")
e.data += data
e.raw += raw
e.data += str(raw, self.encoding, "replace")
return e

def clear(self):
Expand Down
2 changes: 1 addition & 1 deletion Lib/_pyrepl/windows_console.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ def forgetinput(self) -> None:
def getpending(self) -> Event:
"""Return the characters that have been typed but not yet
processed."""
e = Event("key", "", b"")
e = Event("key", "")

while not self.event_queue.empty():
e2 = self.event_queue.get()
Expand Down
2 changes: 1 addition & 1 deletion Lib/test/test_pyrepl/support.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def more_lines(text: str, namespace: dict | None = None):

def code_to_events(code: str):
for c in code:
yield Event(evt="key", data=c, raw=bytearray(c.encode("utf-8")))
yield Event(evt="key", data=c)


def clean_screen(reader: ReadlineAlikeReader) -> list[str]:
Expand Down
14 changes: 6 additions & 8 deletions Lib/test/test_pyrepl/test_eventqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,25 @@ def make_eventqueue(self) -> base_eventqueue.BaseEventQueue:

def test_get(self):
eq = self.make_eventqueue()
event = Event("key", "a", b"a")
event = Event("key", "a")
eq.insert(event)
self.assertEqual(eq.get(), event)

def test_empty(self):
eq = self.make_eventqueue()
self.assertTrue(eq.empty())
eq.insert(Event("key", "a", b"a"))
eq.insert(Event("key", "a"))
self.assertFalse(eq.empty())

def test_flush_buf(self):
eq = self.make_eventqueue()
eq.buf.extend(b"test")
self.assertEqual(eq.flush_buf(), b"test")
eq.flush_buf()
self.assertEqual(eq.buf, bytearray())

def test_insert(self):
eq = self.make_eventqueue()
event = Event("key", "a", b"a")
event = Event("key", "a")
eq.insert(event)
self.assertEqual(eq.events[0], event)

Expand Down Expand Up @@ -152,10 +152,8 @@ def test_push_single_chars_and_unicode_character_as_str(self):
eq = self.make_eventqueue()
eq.keymap = {}

def _event(evt, data, raw=None):
r = raw if raw is not None else data.encode(eq.encoding)
e = Event(evt, data, r)
return e
def _event(evt, data):
return Event(evt, data)

def _push(keys):
for k in keys:
Expand Down
Loading
Loading