diff --git a/changelog.md b/changelog.md index 0a8dad08..43f014e6 100644 --- a/changelog.md +++ b/changelog.md @@ -43,6 +43,7 @@ Internal * Skip more tests when a database connection is not present. * Move SQL utilities to a new `sql_utils.py`. * Move CLI utilities to a new `cli_utils.py`. +* Move keybinding utilities to a new `key_binding_utils.py`. 1.67.1 (2026/03/28) diff --git a/mycli/key_bindings.py b/mycli/key_bindings.py index 1399319f..950a9af1 100644 --- a/mycli/key_bindings.py +++ b/mycli/key_bindings.py @@ -1,3 +1,4 @@ +from functools import partial import logging import webbrowser @@ -11,11 +12,12 @@ emacs_mode, ) from prompt_toolkit.key_binding import KeyBindings +from prompt_toolkit.key_binding.bindings.named_commands import register as ptoolkit_register from prompt_toolkit.key_binding.key_processor import KeyPressEvent from prompt_toolkit.selection import SelectionType from mycli.constants import DOCS_URL -from mycli.packages import shortcuts +from mycli.packages import key_binding_utils from mycli.packages.ptoolkit.fzf import search_history from mycli.packages.ptoolkit.utils import safe_invalidate_display @@ -53,6 +55,14 @@ def print_f1_help(): app.print_text('\n') +@ptoolkit_register("edit-and-execute-command") +def edit_and_execute(event: KeyPressEvent) -> None: + """Different from the prompt-toolkit default, we want to have a choice not + to execute a query after editing, hence validate_and_handle=False.""" + buff = event.current_buffer + buff.open_in_editor(validate_and_handle=False) + + def mycli_bindings(mycli) -> KeyBindings: """Custom key bindings for mycli.""" kb = KeyBindings() @@ -207,7 +217,7 @@ def _(event: KeyPressEvent) -> None: b = event.app.current_buffer if b.text: - b.transform_region(0, len(b.text), mycli.handle_prettify_binding) + b.transform_region(0, len(b.text), partial(key_binding_utils.handle_prettify_binding, mycli)) @kb.add("c-x", "u", filter=emacs_mode) def _(event: KeyPressEvent) -> None: @@ -220,7 +230,7 @@ def _(event: KeyPressEvent) -> None: b = event.app.current_buffer if b.text: - b.transform_region(0, len(b.text), mycli.handle_unprettify_binding) + b.transform_region(0, len(b.text), partial(key_binding_utils.handle_unprettify_binding, mycli)) @kb.add("c-o", "d", filter=emacs_mode) def _(event: KeyPressEvent) -> None: @@ -229,7 +239,7 @@ def _(event: KeyPressEvent) -> None: """ _logger.debug("Detected key.") - event.app.current_buffer.insert_text(shortcuts.server_date(mycli.sqlexecute)) + event.app.current_buffer.insert_text(key_binding_utils.server_date(mycli.sqlexecute)) @kb.add("c-o", "c-d", filter=emacs_mode) def _(event: KeyPressEvent) -> None: @@ -238,7 +248,7 @@ def _(event: KeyPressEvent) -> None: """ _logger.debug("Detected key.") - event.app.current_buffer.insert_text(shortcuts.server_date(mycli.sqlexecute, quoted=True)) + event.app.current_buffer.insert_text(key_binding_utils.server_date(mycli.sqlexecute, quoted=True)) @kb.add("c-o", "t", filter=emacs_mode) def _(event: KeyPressEvent) -> None: @@ -247,7 +257,7 @@ def _(event: KeyPressEvent) -> None: """ _logger.debug("Detected key.") - event.app.current_buffer.insert_text(shortcuts.server_datetime(mycli.sqlexecute)) + event.app.current_buffer.insert_text(key_binding_utils.server_datetime(mycli.sqlexecute)) @kb.add("c-o", "c-t", filter=emacs_mode) def _(event: KeyPressEvent) -> None: @@ -256,7 +266,7 @@ def _(event: KeyPressEvent) -> None: """ _logger.debug("Detected key.") - event.app.current_buffer.insert_text(shortcuts.server_datetime(mycli.sqlexecute, quoted=True)) + event.app.current_buffer.insert_text(key_binding_utils.server_datetime(mycli.sqlexecute, quoted=True)) @kb.add("c-r", filter=control_is_searchable) def _(event: KeyPressEvent) -> None: diff --git a/mycli/main.py b/mycli/main.py index 2dff8bd0..ba1484b5 100755 --- a/mycli/main.py +++ b/mycli/main.py @@ -14,7 +14,7 @@ import sys import threading import traceback -from typing import IO, Any, Callable, Generator, Iterable, Literal +from typing import IO, Any, Generator, Iterable, Literal try: from pwd import getpwuid @@ -50,8 +50,6 @@ to_formatted_text, to_plain_text, ) -from prompt_toolkit.key_binding.bindings.named_commands import register as prompt_register -from prompt_toolkit.key_binding.key_processor import KeyPressEvent from prompt_toolkit.layout.processors import ConditionalProcessor, HighlightMatchingBracketProcessor from prompt_toolkit.lexers import PygmentsLexer from prompt_toolkit.output import ColorDepth @@ -60,7 +58,6 @@ from pymysql.constants.CR import CR_SERVER_LOST from pymysql.constants.ER import ACCESS_DENIED_ERROR, HANDSHAKE_ERROR from pymysql.cursors import Cursor -import sqlglot import sqlparse from mycli import __version__ @@ -93,6 +90,10 @@ from mycli.packages.cli_utils import filtered_sys_argv, is_valid_connection_scheme from mycli.packages.filepaths import dir_path_exists, guess_socket_location from mycli.packages.hybrid_redirection import get_redirect_components, is_redirect_command +from mycli.packages.key_binding_utils import ( + handle_clip_command, + handle_editor_command, +) from mycli.packages.prompt_utils import confirm, confirm_destructive_query from mycli.packages.ptoolkit.history import FileHistoryWithTimestamp from mycli.packages.special.favoritequeries import FavoriteQueries @@ -871,99 +872,6 @@ def _connect( self.echo(str(e), err=True, fg="red") sys.exit(1) - def handle_editor_command( - self, - text: str, - inputhook: Callable | None, - loaded_message_fn: Callable, - ) -> str: - r"""Editor command is any query that is prefixed or suffixed by a '\e'. - The reason for a while loop is because a user might edit a query - multiple times. For eg: - - "select * from \e" to edit it in vim, then come - back to the prompt with the edited query "select * from - blah where q = 'abc'\e" to edit it again. - :param text: Document - :return: Document - - """ - - while special.editor_command(text): - filename = special.get_filename(text) - query = special.get_editor_query(text) or self.get_last_query() - sql, message = special.open_external_editor(filename=filename, sql=query) - if message: - # Something went wrong. Raise an exception and bail. - raise RuntimeError(message) - while True: - try: - assert isinstance(self.prompt_app, PromptSession) - text = self.prompt_app.prompt( - default=sql, - inputhook=inputhook, - message=loaded_message_fn, - ) - break - except KeyboardInterrupt: - sql = "" - - continue - return text - - def handle_clip_command(self, text: str) -> bool: - r"""A clip command is any query that is prefixed or suffixed by a - '\clip'. - - :param text: Document - :return: Boolean - - """ - - if special.clip_command(text): - query = special.get_clip_query(text) or self.get_last_query() - message = special.copy_query_to_clipboard(sql=query) - if message: - raise RuntimeError(message) - return True - return False - - def handle_prettify_binding(self, text: str) -> str: - if not text: - return '' - try: - statements = sqlglot.parse(text, read='mysql') - except Exception: - statements = [] - if len(statements) == 1 and statements[0]: - parse_succeeded = True - pretty_text = statements[0].sql(pretty=True, pad=4, dialect='mysql') - else: - parse_succeeded = False - pretty_text = text.rstrip(';') - self.toolbar_error_message = 'Prettify failed to parse single statement' - if pretty_text and parse_succeeded: - pretty_text = pretty_text + ';' - return pretty_text - - def handle_unprettify_binding(self, text: str) -> str: - if not text: - return '' - try: - statements = sqlglot.parse(text, read='mysql') - except Exception: - statements = [] - if len(statements) == 1 and statements[0]: - parse_succeeded = True - unpretty_text = statements[0].sql(pretty=False, dialect='mysql') - else: - parse_succeeded = False - unpretty_text = text.rstrip(';') - self.toolbar_error_message = 'Unprettify failed to parse single statement' - if unpretty_text and parse_succeeded: - unpretty_text = unpretty_text + ';' - return unpretty_text - def output_timing(self, timing: str, is_warnings_style: bool = False) -> None: self.log_output(timing) add_style = 'class:warnings.timing' if is_warnings_style else 'class:output.timing' @@ -1168,7 +1076,8 @@ def one_iteration(text: str | None = None) -> None: special.set_forced_horizontal_output(False) try: - text = self.handle_editor_command( + text = handle_editor_command( + self, text, inputhook, loaded_message_fn, @@ -1180,7 +1089,7 @@ def one_iteration(text: str | None = None) -> None: return try: - if self.handle_clip_command(text): + if handle_clip_command(self, text): return except RuntimeError as e: logger.error("sql: %r, error: %r", text, e) @@ -2698,14 +2607,6 @@ def tips_picker() -> str: return choice(tips) if tips else r'\? or "help" for help!' -@prompt_register("edit-and-execute-command") -def edit_and_execute(event: KeyPressEvent) -> None: - """Different from the prompt-toolkit default, we want to have a choice not - to execute a query after editing, hence validate_and_handle=False.""" - buff = event.current_buffer - buff.open_in_editor(validate_and_handle=False) - - def main() -> int | None: try: result = click_entrypoint.main( diff --git a/mycli/packages/key_binding_utils.py b/mycli/packages/key_binding_utils.py new file mode 100644 index 00000000..887b1fa7 --- /dev/null +++ b/mycli/packages/key_binding_utils.py @@ -0,0 +1,133 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Callable + +from prompt_toolkit.shortcuts import PromptSession +import sqlglot + +from mycli.packages import special +from mycli.sqlexecute import SQLExecute + +if TYPE_CHECKING: + from mycli.main import MyCli + + +def server_date(sqlexecute: SQLExecute, quoted: bool = False) -> str: + server_date_str = sqlexecute.now().strftime('%Y-%m-%d') + if quoted: + return f"'{server_date_str}'" + else: + return server_date_str + + +def server_datetime(sqlexecute: SQLExecute, quoted: bool = False) -> str: + server_datetime_str = sqlexecute.now().strftime('%Y-%m-%d %H:%M:%S') + if quoted: + return f"'{server_datetime_str}'" + else: + return server_datetime_str + + +# todo: maybe these handlers belong in a repl_handlers.py (which does not exist yet) +# \clip doesn't even have a keybinding +def handle_clip_command(mycli: 'MyCli', text: str) -> bool: + r"""A clip command is any query that is prefixed or suffixed by a + '\clip'. + + :param text: Document + :return: Boolean + + """ + + if special.clip_command(text): + query = special.get_clip_query(text) or mycli.get_last_query() + message = special.copy_query_to_clipboard(sql=query) + if message: + raise RuntimeError(message) + return True + return False + + +def handle_editor_command( + mycli: 'MyCli', + text: str, + inputhook: Callable | None, + loaded_message_fn: Callable, +) -> str: + r"""Editor command is any query that is prefixed or suffixed by a '\e'. + The reason for a while loop is because a user might edit a query + multiple times. For eg: + + "select * from \e" to edit it in vim, then come + back to the prompt with the edited query "select * from + blah where q = 'abc'\e" to edit it again. + :param text: Document + :return: Document + + """ + + while special.editor_command(text): + filename = special.get_filename(text) + query = special.get_editor_query(text) or mycli.get_last_query() + sql, message = special.open_external_editor(filename=filename, sql=query) + if message: + # Something went wrong. Raise an exception and bail. + raise RuntimeError(message) + while True: + try: + assert isinstance(mycli.prompt_app, PromptSession) + text = mycli.prompt_app.prompt( + default=sql, + inputhook=inputhook, + message=loaded_message_fn, + ) + break + except KeyboardInterrupt: + sql = "" + + continue + return text + + +def handle_prettify_binding( + mycli: 'MyCli', + text: str, +) -> str: + if not text: + return '' + try: + statements = sqlglot.parse(text, read='mysql') + except Exception: + statements = [] + if len(statements) == 1 and statements[0]: + parse_succeeded = True + pretty_text = statements[0].sql(pretty=True, pad=4, dialect='mysql') + else: + parse_succeeded = False + pretty_text = text.rstrip(';') + mycli.toolbar_error_message = 'Prettify failed to parse single statement' + if pretty_text and parse_succeeded: + pretty_text = pretty_text + ';' + return pretty_text + + +def handle_unprettify_binding( + mycli: 'MyCli', + text: str, +) -> str: + if not text: + return '' + try: + statements = sqlglot.parse(text, read='mysql') + except Exception: + statements = [] + if len(statements) == 1 and statements[0]: + parse_succeeded = True + unpretty_text = statements[0].sql(pretty=False, dialect='mysql') + else: + parse_succeeded = False + unpretty_text = text.rstrip(';') + mycli.toolbar_error_message = 'Unprettify failed to parse single statement' + if unpretty_text and parse_succeeded: + unpretty_text = unpretty_text + ';' + return unpretty_text diff --git a/mycli/packages/shortcuts.py b/mycli/packages/shortcuts.py deleted file mode 100644 index b4dbf785..00000000 --- a/mycli/packages/shortcuts.py +++ /dev/null @@ -1,17 +0,0 @@ -from mycli.sqlexecute import SQLExecute - - -def server_date(sqlexecute: SQLExecute, quoted: bool = False) -> str: - server_date_str = sqlexecute.now().strftime('%Y-%m-%d') - if quoted: - return f"'{server_date_str}'" - else: - return server_date_str - - -def server_datetime(sqlexecute: SQLExecute, quoted: bool = False) -> str: - server_datetime_str = sqlexecute.now().strftime('%Y-%m-%d %H:%M:%S') - if quoted: - return f"'{server_datetime_str}'" - else: - return server_datetime_str diff --git a/test/pytests/test_key_binding_utils.py b/test/pytests/test_key_binding_utils.py new file mode 100644 index 00000000..248d3616 --- /dev/null +++ b/test/pytests/test_key_binding_utils.py @@ -0,0 +1,228 @@ +import datetime +from typing import Any, cast + +import pytest + +from mycli.packages import key_binding_utils + + +class FakeSQLExecute: + def __init__(self, now_value: datetime.datetime) -> None: + self.now_value = now_value + + def now(self) -> datetime.datetime: + return self.now_value + + +class FakePromptSession: + def __init__(self, responses: list[object]) -> None: + self.responses = list(responses) + self.prompt_calls: list[dict[str, Any]] = [] + + def prompt(self, *, default: str, inputhook: Any, message: Any) -> str: + self.prompt_calls.append({ + 'default': default, + 'inputhook': inputhook, + 'message': message, + }) + response = self.responses.pop(0) + if isinstance(response, BaseException): + raise response + return cast(str, response) + + +class FakeMyCli: + def __init__( + self, + *, + prompt_app: FakePromptSession | None = None, + last_query: str = 'last query', + ) -> None: + self.prompt_app = prompt_app + self.last_query = last_query + self.toolbar_error_message: str | None = None + + def get_last_query(self) -> str: + return self.last_query + + +def test_server_date_returns_quoted_and_unquoted_values() -> None: + sqlexecute = FakeSQLExecute(datetime.datetime(2026, 4, 3, 14, 5, 6)) + + assert key_binding_utils.server_date(cast(Any, sqlexecute)) == '2026-04-03' + assert key_binding_utils.server_date(cast(Any, sqlexecute), quoted=True) == "'2026-04-03'" + + +def test_server_datetime_returns_quoted_and_unquoted_values() -> None: + sqlexecute = FakeSQLExecute(datetime.datetime(2026, 4, 3, 14, 5, 6)) + + assert key_binding_utils.server_datetime(cast(Any, sqlexecute)) == '2026-04-03 14:05:06' + assert key_binding_utils.server_datetime(cast(Any, sqlexecute), quoted=True) == "'2026-04-03 14:05:06'" + + +def test_prettify_statement(): + statement = 'SELECT 1' + mycli = FakeMyCli() + pretty_statement = key_binding_utils.handle_prettify_binding(cast(Any, mycli), statement) + assert pretty_statement == 'SELECT\n 1;' + + +def test_unprettify_statement(): + statement = 'SELECT\n 1' + mycli = FakeMyCli() + unpretty_statement = key_binding_utils.handle_unprettify_binding(cast(Any, mycli), statement) + assert unpretty_statement == 'SELECT 1;' + + +def test_handle_editor_command_returns_text_unchanged_when_not_editor_command(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(key_binding_utils.special, 'editor_command', lambda text: False) + + mycli = FakeMyCli() + + assert key_binding_utils.handle_editor_command(cast(Any, mycli), 'select 1', None, lambda: 'loaded') == 'select 1' + + +def test_handle_editor_command_opens_editor_reprompts_after_keyboard_interrupt_and_returns_text(monkeypatch: pytest.MonkeyPatch) -> None: + prompt_app = FakePromptSession([KeyboardInterrupt(), 'edited sql']) + mycli = FakeMyCli(prompt_app=prompt_app) + open_calls: list[dict[str, str]] = [] + + def inputhook(*args: object, **kwargs: object) -> None: + return None + + def loaded_message_fn() -> str: + return 'loaded' + + def open_external_editor(*, filename: str | None, sql: str) -> tuple[str, str | None]: + open_calls.append({'filename': cast(str, filename), 'sql': sql}) + return 'SELECT 1', None + + monkeypatch.setattr(key_binding_utils, 'PromptSession', FakePromptSession) + monkeypatch.setattr(key_binding_utils.special, 'editor_command', lambda text: text in {'\\e', ''}) + monkeypatch.setattr(key_binding_utils.special, 'get_filename', lambda text: 'query.sql') + monkeypatch.setattr(key_binding_utils.special, 'get_editor_query', lambda text: '' if text == '\\e' else None) + monkeypatch.setattr( + key_binding_utils.special, + 'open_external_editor', + open_external_editor, + ) + + result = key_binding_utils.handle_editor_command(cast(Any, mycli), '\\e', inputhook, loaded_message_fn) + + assert result == 'edited sql' + assert open_calls == [{'filename': 'query.sql', 'sql': 'last query'}] + assert prompt_app.prompt_calls == [ + {'default': 'SELECT 1', 'inputhook': inputhook, 'message': loaded_message_fn}, + {'default': '', 'inputhook': inputhook, 'message': loaded_message_fn}, + ] + + +def test_handle_editor_command_uses_explicit_editor_query_and_raises_on_editor_error(monkeypatch: pytest.MonkeyPatch) -> None: + mycli = FakeMyCli(prompt_app=FakePromptSession([])) + + monkeypatch.setattr(key_binding_utils.special, 'editor_command', lambda text: True) + monkeypatch.setattr(key_binding_utils.special, 'get_filename', lambda text: 'query.sql') + monkeypatch.setattr(key_binding_utils.special, 'get_editor_query', lambda text: 'select from text') + monkeypatch.setattr( + key_binding_utils.special, + 'open_external_editor', + lambda *, filename, sql: ('', 'editor failed'), + ) + + with pytest.raises(RuntimeError, match='editor failed'): + key_binding_utils.handle_editor_command(cast(Any, mycli), '\\eselect 1', None, lambda: 'loaded') + + +def test_handle_clip_command_returns_false_when_not_clip_command(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(key_binding_utils.special, 'clip_command', lambda text: False) + + mycli = FakeMyCli() + + assert key_binding_utils.handle_clip_command(cast(Any, mycli), 'select 1') is False + + +def test_handle_clip_command_copies_explicit_query(monkeypatch: pytest.MonkeyPatch) -> None: + clipboard_calls: list[str] = [] + + def copy_query_to_clipboard(*, sql: str) -> None: + clipboard_calls.append(sql) + + monkeypatch.setattr(key_binding_utils.special, 'clip_command', lambda text: True) + monkeypatch.setattr(key_binding_utils.special, 'get_clip_query', lambda text: 'select 1') + monkeypatch.setattr( + key_binding_utils.special, + 'copy_query_to_clipboard', + copy_query_to_clipboard, + ) + + mycli = FakeMyCli() + + assert key_binding_utils.handle_clip_command(cast(Any, mycli), '\\clip select 1') is True + assert clipboard_calls == ['select 1'] + + +def test_handle_clip_command_uses_last_query_and_raises_on_clipboard_error(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(key_binding_utils.special, 'clip_command', lambda text: True) + monkeypatch.setattr(key_binding_utils.special, 'get_clip_query', lambda text: '') + monkeypatch.setattr( + key_binding_utils.special, + 'copy_query_to_clipboard', + lambda *, sql: 'clipboard failed', + ) + + mycli = FakeMyCli() + + with pytest.raises(RuntimeError, match='clipboard failed'): + key_binding_utils.handle_clip_command(cast(Any, mycli), '\\clip') + + +def test_prettify_statement_returns_empty_string_for_empty_input() -> None: + mycli = FakeMyCli() + assert key_binding_utils.handle_prettify_binding(cast(Any, mycli), '') == '' + + +def test_unprettify_statement_returns_empty_string_for_empty_input() -> None: + mycli = FakeMyCli() + assert key_binding_utils.handle_unprettify_binding(cast(Any, mycli), '') == '' + + +@pytest.mark.parametrize( + ('handler_name', 'text'), + [ + ('handle_prettify_binding', 'SELECT 1;'), + ('handle_unprettify_binding', 'SELECT 1;'), + ], +) +def test_prettify_helpers_fall_back_to_input_without_trailing_semicolon_on_parse_error( + monkeypatch: pytest.MonkeyPatch, + handler_name: str, + text: str, +) -> None: + monkeypatch.setattr(key_binding_utils.sqlglot, 'parse', lambda *_args, **_kwargs: (_ for _ in ()).throw(ValueError('bad sql'))) + + handler = getattr(key_binding_utils, handler_name) + + mycli = FakeMyCli() + + assert handler(cast(Any, mycli), text) == 'SELECT 1' + + +@pytest.mark.parametrize( + ('handler_name', 'text'), + [ + ('handle_prettify_binding', 'SELECT 1; SELECT 2;'), + ('handle_unprettify_binding', 'SELECT 1; SELECT 2;'), + ], +) +def test_prettify_helpers_fall_back_when_parse_returns_multiple_statements( + monkeypatch: pytest.MonkeyPatch, + handler_name: str, + text: str, +) -> None: + monkeypatch.setattr(key_binding_utils.sqlglot, 'parse', lambda *_args, **_kwargs: [object(), object()]) + + handler = getattr(key_binding_utils, handler_name) + + mycli = FakeMyCli() + + assert handler(cast(Any, mycli), text) == 'SELECT 1; SELECT 2' diff --git a/test/pytests/test_key_bindings.py b/test/pytests/test_key_bindings.py index fa5d0351..dd169d09 100644 --- a/test/pytests/test_key_bindings.py +++ b/test/pytests/test_key_bindings.py @@ -6,6 +6,7 @@ import prompt_toolkit from prompt_toolkit.enums import EditingMode +from prompt_toolkit.key_binding.key_processor import KeyPressEvent from prompt_toolkit.keys import Keys from prompt_toolkit.layout.controls import BufferControl, SearchBufferControl from prompt_toolkit.selection import SelectionType @@ -40,6 +41,7 @@ class DummyBuffer: complete_state: object | None = None complete_next_calls: int = 0 cancel_completion_calls: int = 0 + open_in_editor_calls: list[bool] = field(default_factory=list) start_completion_calls: list[dict[str, bool]] = field(default_factory=list) start_selection_calls: list[SelectionType] = field(default_factory=list) transform_calls: list[tuple[int, int, Callable[[str], str]]] = field(default_factory=list) @@ -64,6 +66,9 @@ def cancel_completion(self) -> None: self.cancel_completion_calls += 1 self.complete_state = None + def open_in_editor(self, validate_and_handle: bool) -> None: + self.open_in_editor_calls.append(validate_and_handle) + def start_selection(self, selection_type: SelectionType) -> None: self.start_selection_calls.append(selection_type) @@ -197,6 +202,14 @@ def test_print_f1_help_prints_inline_help_and_docs_url(monkeypatch) -> None: ] +def test_edit_and_execute_opens_editor_without_validation() -> None: + event = make_event() + + key_bindings.edit_and_execute(cast(KeyPressEvent, event)) + + assert event.current_buffer.open_in_editor_calls == [False] + + @pytest.mark.parametrize('keys', ((Keys.F1,), (Keys.Escape, '[', 'P'))) def test_f1_bindings_open_docs_show_help_and_invalidate(monkeypatch, keys: tuple[str | Keys, ...]) -> None: mycli = DummyMyCli(DummyKeysConfig()) @@ -388,57 +401,42 @@ def test_control_space_supports_completion_behaviors( @pytest.mark.parametrize( - ('keys', 'text', 'handler_name'), + ('keys', 'handler_name'), ( - ((Keys.ControlX, 'p'), 'select 1', 'handle_prettify_binding'), - ((Keys.ControlX, 'u'), 'select 1', 'handle_unprettify_binding'), + ((Keys.ControlX, 'p'), 'handle_prettify_binding'), + ((Keys.ControlX, 'u'), 'handle_unprettify_binding'), ), ) -def test_prettify_bindings_transform_non_empty_text( +def test_prettify_bindings_transform_non_empty_buffer( monkeypatch, keys: tuple[str | Keys, ...], - text: str, handler_name: str, ) -> None: mycli = DummyMyCli(DummyKeysConfig(), key_bindings_mode='emacs') kb = key_bindings.mycli_bindings(mycli) - event = make_event(DummyBuffer(text=text)) + event = make_event(DummyBuffer(text='select 1')) event.app.editing_mode = EditingMode.EMACS patch_filter_app(monkeypatch, event.app) assert binding_filter(kb, *keys)() is True - inactive_event = make_event(DummyBuffer(text=text)) - inactive_event.app.editing_mode = EditingMode.VI - patch_filter_app(monkeypatch, inactive_event.app) - assert binding_filter(kb, *keys)() is False - - patch_filter_app(monkeypatch, event.app) - binding_handler(kb, *keys)(event) + assert len(event.app.current_buffer.transform_calls) == 1 start, end, handler = event.app.current_buffer.transform_calls[0] - assert (start, end) == (0, len(text)) - assert handler.__func__ is getattr(DummyMyCli, handler_name) + assert (start, end) == (0, len('select 1')) + assert handler.func is getattr(key_bindings.key_binding_utils, handler_name) + assert handler.args == (mycli,) -@pytest.mark.parametrize(('keys'), (((Keys.ControlX, 'p')), ((Keys.ControlX, 'u')))) -def test_prettify_bindings_ignore_empty_text(monkeypatch, keys: tuple[str | Keys, ...]) -> None: +@pytest.mark.parametrize('keys', ((Keys.ControlX, 'p'), (Keys.ControlX, 'u'))) +def test_prettify_bindings_skip_empty_buffer(monkeypatch, keys: tuple[str | Keys, ...]) -> None: mycli = DummyMyCli(DummyKeysConfig(), key_bindings_mode='emacs') kb = key_bindings.mycli_bindings(mycli) event = make_event(DummyBuffer(text='')) event.app.editing_mode = EditingMode.EMACS patch_filter_app(monkeypatch, event.app) - assert binding_filter(kb, *keys)() is True - - inactive_event = make_event(DummyBuffer(text='')) - inactive_event.app.editing_mode = EditingMode.VI - patch_filter_app(monkeypatch, inactive_event.app) - assert binding_filter(kb, *keys)() is False - - patch_filter_app(monkeypatch, event.app) - binding_handler(kb, *keys)(event) assert event.app.current_buffer.transform_calls == [] @@ -465,12 +463,12 @@ def test_date_and_datetime_bindings_insert_shortcuts( patch_filter_app(monkeypatch, event.app) monkeypatch.setattr( - key_bindings.shortcuts, + key_bindings.key_binding_utils, 'server_date', lambda _sqlexecute, quoted=False: "'DATE'" if quoted else 'DATE', ) monkeypatch.setattr( - key_bindings.shortcuts, + key_bindings.key_binding_utils, 'server_datetime', lambda _sqlexecute, quoted=False: "'DATETIME'" if quoted else 'DATETIME', ) diff --git a/test/pytests/test_main.py b/test/pytests/test_main.py index 67889761..3af76d21 100644 --- a/test/pytests/test_main.py +++ b/test/pytests/test_main.py @@ -805,20 +805,6 @@ def test_list_dsn(monkeypatch): print(f"An error occurred while attempting to delete the file: {e}") -def test_prettify_statement(): - statement = "SELECT 1" - m = MyCli() - pretty_statement = m.handle_prettify_binding(statement) - assert pretty_statement == "SELECT\n 1;" - - -def test_unprettify_statement(): - statement = "SELECT\n 1" - m = MyCli() - unpretty_statement = m.handle_unprettify_binding(statement) - assert unpretty_statement == "SELECT 1;" - - def test_list_ssh_config(): runner = CliRunner() # keep Windows from locking the file with delete=False diff --git a/test/pytests/test_main_regression.py b/test/pytests/test_main_regression.py index 40350077..f12bd1a5 100644 --- a/test/pytests/test_main_regression.py +++ b/test/pytests/test_main_regression.py @@ -31,7 +31,8 @@ import pymysql import pytest -from mycli import main +from mycli import key_bindings, main +from mycli.packages import key_binding_utils from mycli.packages.sqlresult import SQLResult @@ -1033,59 +1034,32 @@ def __int__(self) -> int: assert any('Invalid port number' in msg for msg in echo_calls) -def test_handle_editor_clip_prettify_unprettify_and_output_timing(monkeypatch: pytest.MonkeyPatch) -> None: +def test_handle_editor_clip_and_output_timing(monkeypatch: pytest.MonkeyPatch) -> None: cli = make_bare_mycli() - monkeypatch.setattr(main, 'PromptSession', FakePromptSession) + monkeypatch.setattr(key_binding_utils, 'PromptSession', FakePromptSession) cli.prompt_app = cast(Any, FakePromptSession(responses=[KeyboardInterrupt(), 'edited sql'])) cli.get_last_query = lambda: 'last query' # type: ignore[assignment] monkeypatch.setattr(main.special, 'editor_command', lambda text: text.endswith(r'\e')) monkeypatch.setattr(main.special, 'get_filename', lambda text: 'query.sql') monkeypatch.setattr(main.special, 'get_editor_query', lambda text: 'select 1') monkeypatch.setattr(main.special, 'open_external_editor', lambda filename, sql: ('edited sql', None)) - assert main.MyCli.handle_editor_command(cli, r'select 1\e', None, lambda: None) == 'edited sql' + assert key_binding_utils.handle_editor_command(cli, r'select 1\e', None, lambda: None) == 'edited sql' monkeypatch.setattr(main.special, 'open_external_editor', lambda filename, sql: ('', 'boom')) with pytest.raises(RuntimeError, match='boom'): - main.MyCli.handle_editor_command(cli, r'select 1\e', None, lambda: None) + key_binding_utils.handle_editor_command(cli, r'select 1\e', None, lambda: None) monkeypatch.setattr(main.special, 'clip_command', lambda text: True) monkeypatch.setattr(main.special, 'get_clip_query', lambda text: None) monkeypatch.setattr(main.special, 'copy_query_to_clipboard', lambda sql: None) - assert main.MyCli.handle_clip_command(cli, r'select 1\clip') is True + assert key_binding_utils.handle_clip_command(cli, r'select 1\clip') is True monkeypatch.setattr(main.special, 'copy_query_to_clipboard', lambda sql: 'clipboard failed') with pytest.raises(RuntimeError, match='clipboard failed'): - main.MyCli.handle_clip_command(cli, r'select 1\clip') + key_binding_utils.handle_clip_command(cli, r'select 1\clip') monkeypatch.setattr(main.special, 'clip_command', lambda text: False) - assert main.MyCli.handle_clip_command(cli, 'select 1') is False - - class FakeStatement: - def __init__(self, rendered: str) -> None: - self.rendered = rendered - - def sql(self, **kwargs: Any) -> str: - return self.rendered - - monkeypatch.setattr( - main.sqlglot, - 'parse', - lambda text, read: [ - FakeStatement('SELECT\n 1'), - ], - ) - assert main.MyCli.handle_prettify_binding(cli, 'select 1') == 'SELECT\n 1;' - - monkeypatch.setattr(main.sqlglot, 'parse', lambda text, read: []) - assert main.MyCli.handle_prettify_binding(cli, 'select 1;') == 'select 1' - assert cli.toolbar_error_message == 'Prettify failed to parse single statement' - - monkeypatch.setattr(main.sqlglot, 'parse', lambda text, read: [FakeStatement('SELECT 1')]) - assert main.MyCli.handle_unprettify_binding(cli, 'SELECT\n 1;') == 'SELECT 1;' - - monkeypatch.setattr(main.sqlglot, 'parse', lambda text, read: []) - assert main.MyCli.handle_unprettify_binding(cli, 'SELECT 1;') == 'SELECT 1' - assert cli.toolbar_error_message == 'Unprettify failed to parse single statement' + assert key_binding_utils.handle_clip_command(cli, 'select 1') is False printed: list[tuple[Any, Any]] = [] monkeypatch.setattr(main, 'print_formatted_text', lambda text, style=None: printed.append((text, style))) @@ -1093,18 +1067,6 @@ def sql(self, **kwargs: Any) -> str: assert printed[-1][1] == cli.ptoolkit_style -def test_prettify_unprettify_empty_and_parse_error_branches(monkeypatch: pytest.MonkeyPatch) -> None: - cli = make_bare_mycli() - assert main.MyCli.handle_prettify_binding(cli, '') == '' - assert main.MyCli.handle_unprettify_binding(cli, '') == '' - - monkeypatch.setattr(main.sqlglot, 'parse', lambda text, read: (_ for _ in ()).throw(ValueError('parse failed'))) - assert main.MyCli.handle_prettify_binding(cli, 'select 1;') == 'select 1' - assert cli.toolbar_error_message == 'Prettify failed to parse single statement' - assert main.MyCli.handle_unprettify_binding(cli, 'select 1;') == 'select 1' - assert cli.toolbar_error_message == 'Unprettify failed to parse single statement' - - def test_format_sqlresult_run_query_reserved_space_and_last_query(monkeypatch: pytest.MonkeyPatch) -> None: cli = make_bare_mycli() cli.main_formatter = DummyFormatter() @@ -1593,7 +1555,7 @@ class ErrorNoCode(click.ClickException): current_buffer=SimpleNamespace(open_in_editor=lambda validate_and_handle=False: opened.append(validate_and_handle)) ), ) - main.edit_and_execute(event) + key_bindings.edit_and_execute(event) assert opened == [False] @@ -2315,124 +2277,6 @@ def fake_create_toolbar_tokens(mycli: Any, show_help: Any, fmt: str) -> str: assert echoed[-1] == 'Goodbye!' -def test_run_cli_watch_keepalive_editor_clip_redirect_and_destructive_paths(monkeypatch: pytest.MonkeyPatch) -> None: - cli = make_bare_mycli() - cli.config = {'history_file': '~/.mycli-history-testing'} - cli.keepalive_ticks = 1 - cli.less_chatty = True - cli.prompt_app = None - cli.destructive_warning = True - cli.destructive_keywords = ['drop'] - cli.logfile = False - echoes: list[str] = [] - cli.echo = lambda message, **kwargs: echoes.append(str(message)) # type: ignore[assignment] - cli.log_query = lambda text: None # type: ignore[assignment] - cli.log_output = lambda text: None # type: ignore[assignment] - cli.set_all_external_titles = lambda: None # type: ignore[assignment] - - def raise_keyboard_output(formatted: Any, result: Any, is_warnings_style: bool = False) -> None: - raise KeyboardInterrupt() - - def raise_keyboard_timing(timing: str, is_warnings_style: bool = False) -> None: - raise KeyboardInterrupt() - - cli.output = raise_keyboard_output # type: ignore[assignment] - cli.output_timing = raise_keyboard_timing # type: ignore[assignment] - cli.format_sqlresult = lambda result, **kwargs: iter(['formatted']) # type: ignore[assignment] - prompt_responses = ['editor boom', 'clip boom', 'clip ok', 'redirect bad', 'drop yes', 'drop no', 'watch bad', EOFError()] - - class HookPromptSession(FakePromptSession): - def prompt(self, **kwargs: Any) -> str: - inputhook = kwargs.get('inputhook') - if inputhook is not None: - inputhook(None) - inputhook(None) - return super().prompt(**kwargs) - - prompt_session = HookPromptSession(responses=prompt_responses) - ping_calls: list[bool] = [] - - class PingConnection: - def ping(self, reconnect: bool = False) -> None: - ping_calls.append(reconnect) - raise RuntimeError('ping fail') - - class FakeRunSQLExecute: - def __init__(self) -> None: - self.server_info = SimpleNamespace(species=SimpleNamespace(name='MySQL')) - self.dbname = 'db' - self.connection_id = 0 - self.conn = PingConnection() - - def run(self, text: str) -> Iterator[SQLResult]: - if text == 'watch bad': - cli.prompt_app = None - return iter([ - SQLResult(status='watch', command={'name': 'watch', 'seconds': '1'}), - SQLResult(status='watch', command={'name': 'watch', 'seconds': 'bad'}), - ]) - return iter([SQLResult(status='ok', rows=[(1,)])]) - - monkeypatch.setattr(main, 'SQLExecute', FakeRunSQLExecute) - cli.sqlexecute = cast(Any, FakeRunSQLExecute()) - monkeypatch.setattr(main, 'PromptSession', lambda **kwargs: prompt_session) - monkeypatch.setattr(main, 'mycli_bindings', lambda mycli: 'bindings') - monkeypatch.setattr(main, 'create_toolbar_tokens_func', lambda *args: 'toolbar') - monkeypatch.setattr(main, 'style_factory_ptoolkit', lambda *args, **kwargs: 'style') - monkeypatch.setattr(main, 'dir_path_exists', lambda path: True) - monkeypatch.setattr(main, 'cli_is_multiline', lambda mycli: False) - monkeypatch.setattr(main.special, 'set_expanded_output', lambda value: None) - monkeypatch.setattr(main.special, 'set_forced_horizontal_output', lambda value: None) - monkeypatch.setattr(main.special, 'is_llm_command', lambda text: False) - monkeypatch.setattr(main.special, 'is_expanded_output', lambda: False) - monkeypatch.setattr(main.special, 'is_redirected', lambda: False) - monkeypatch.setattr(main.special, 'is_timing_enabled', lambda: True) - monkeypatch.setattr(main.special, 'write_tee', lambda *args, **kwargs: None) - monkeypatch.setattr(main.special, 'unset_once_if_written', lambda *args, **kwargs: None) - monkeypatch.setattr(main.special, 'flush_pipe_once_if_written', lambda *args, **kwargs: None) - monkeypatch.setattr(main.special, 'close_tee', lambda: None) - monkeypatch.setattr(main, 'is_dropping_database', lambda text, dbname: False) - monkeypatch.setattr(main, 'need_completion_refresh', lambda text: False) - monkeypatch.setattr(main, 'confirm', lambda text: False) - monkeypatch.setattr(main, 'time', iter([0.0, 2.0, 3.0, 4.0, 5.0, 6.0]).__next__) - - def fake_editor(text: str, inputhook: Any, loaded_message_fn: Any) -> str: - if text == 'editor boom': - raise RuntimeError('editor failed') - return text - - cli.handle_editor_command = fake_editor # type: ignore[assignment] - - def fake_handle_clip(text: str) -> bool: - if text == 'clip boom': - raise RuntimeError('clip failed') - return text == 'clip ok' - - cli.handle_clip_command = fake_handle_clip # type: ignore[assignment] - monkeypatch.setattr(main, 'is_redirect_command', lambda text: text == 'redirect bad') - monkeypatch.setattr(main, 'get_redirect_components', lambda text: ('sql', '>', '>', '/tmp/out')) - - def fake_set_redirect(*args: Any) -> None: - raise RuntimeError('redirect failed') - - monkeypatch.setattr(main.special, 'set_redirect', fake_set_redirect) - monkeypatch.setattr( - main, - 'confirm_destructive_query', - lambda keywords, text: True if text == 'drop yes' else (False if text == 'drop no' else None), - ) - with pytest.raises(SystemExit): - main.MyCli.run_cli(cli) - assert ping_calls - assert any('editor failed' in line for line in echoes) - assert any('clip failed' in line for line in echoes) - assert 'Your call!' in echoes - assert 'Wise choice!' in echoes - assert any('redirect failed' in line for line in echoes) - assert any('Invalid watch sleep time provided' in line for line in echoes) - assert any('Warning: This query was not logged.' in line for line in echoes) - - def test_run_cli_llm_paths_and_finish_iteration(monkeypatch: pytest.MonkeyPatch) -> None: cli = make_bare_mycli() cli.config = {'history_file': '~/.mycli-history-testing'} diff --git a/test/pytests/test_shortcuts.py b/test/pytests/test_shortcuts.py deleted file mode 100644 index ac90ea15..00000000 --- a/test/pytests/test_shortcuts.py +++ /dev/null @@ -1,26 +0,0 @@ -import datetime -from typing import Any, cast - -from mycli.packages import shortcuts - - -class FakeSQLExecute: - def __init__(self, now_value: datetime.datetime) -> None: - self.now_value = now_value - - def now(self) -> datetime.datetime: - return self.now_value - - -def test_server_date_returns_quoted_and_unquoted_values() -> None: - sqlexecute = FakeSQLExecute(datetime.datetime(2026, 4, 3, 14, 5, 6)) - - assert shortcuts.server_date(cast(Any, sqlexecute)) == '2026-04-03' - assert shortcuts.server_date(cast(Any, sqlexecute), quoted=True) == "'2026-04-03'" - - -def test_server_datetime_returns_quoted_and_unquoted_values() -> None: - sqlexecute = FakeSQLExecute(datetime.datetime(2026, 4, 3, 14, 5, 6)) - - assert shortcuts.server_datetime(cast(Any, sqlexecute)) == '2026-04-03 14:05:06' - assert shortcuts.server_datetime(cast(Any, sqlexecute), quoted=True) == "'2026-04-03 14:05:06'"