Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ Internal
* Skip more tests when a database connection is not present.
* Move SQL utilities to a new `sql_utils.py`.
* Move CLI utilities to a new `cli_utils.py`.
* Move keybinding utilities to a new `key_binding_utils.py`.


1.67.1 (2026/03/28)
Expand Down
24 changes: 17 additions & 7 deletions mycli/key_bindings.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from functools import partial
import logging
import webbrowser

Expand All @@ -11,11 +12,12 @@
emacs_mode,
)
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.key_binding.bindings.named_commands import register as ptoolkit_register
from prompt_toolkit.key_binding.key_processor import KeyPressEvent
from prompt_toolkit.selection import SelectionType

from mycli.constants import DOCS_URL
from mycli.packages import shortcuts
from mycli.packages import key_binding_utils
from mycli.packages.ptoolkit.fzf import search_history
from mycli.packages.ptoolkit.utils import safe_invalidate_display

Expand Down Expand Up @@ -53,6 +55,14 @@ def print_f1_help():
app.print_text('\n')


@ptoolkit_register("edit-and-execute-command")
def edit_and_execute(event: KeyPressEvent) -> None:
"""Different from the prompt-toolkit default, we want to have a choice not
to execute a query after editing, hence validate_and_handle=False."""
buff = event.current_buffer
buff.open_in_editor(validate_and_handle=False)


def mycli_bindings(mycli) -> KeyBindings:
"""Custom key bindings for mycli."""
kb = KeyBindings()
Expand Down Expand Up @@ -207,7 +217,7 @@ def _(event: KeyPressEvent) -> None:

b = event.app.current_buffer
if b.text:
b.transform_region(0, len(b.text), mycli.handle_prettify_binding)
b.transform_region(0, len(b.text), partial(key_binding_utils.handle_prettify_binding, mycli))

@kb.add("c-x", "u", filter=emacs_mode)
def _(event: KeyPressEvent) -> None:
Expand All @@ -220,7 +230,7 @@ def _(event: KeyPressEvent) -> None:

b = event.app.current_buffer
if b.text:
b.transform_region(0, len(b.text), mycli.handle_unprettify_binding)
b.transform_region(0, len(b.text), partial(key_binding_utils.handle_unprettify_binding, mycli))

@kb.add("c-o", "d", filter=emacs_mode)
def _(event: KeyPressEvent) -> None:
Expand All @@ -229,7 +239,7 @@ def _(event: KeyPressEvent) -> None:
"""
_logger.debug("Detected <C-o d> key.")

event.app.current_buffer.insert_text(shortcuts.server_date(mycli.sqlexecute))
event.app.current_buffer.insert_text(key_binding_utils.server_date(mycli.sqlexecute))

@kb.add("c-o", "c-d", filter=emacs_mode)
def _(event: KeyPressEvent) -> None:
Expand All @@ -238,7 +248,7 @@ def _(event: KeyPressEvent) -> None:
"""
_logger.debug("Detected <C-o C-d> key.")

event.app.current_buffer.insert_text(shortcuts.server_date(mycli.sqlexecute, quoted=True))
event.app.current_buffer.insert_text(key_binding_utils.server_date(mycli.sqlexecute, quoted=True))

@kb.add("c-o", "t", filter=emacs_mode)
def _(event: KeyPressEvent) -> None:
Expand All @@ -247,7 +257,7 @@ def _(event: KeyPressEvent) -> None:
"""
_logger.debug("Detected <C-o t> key.")

event.app.current_buffer.insert_text(shortcuts.server_datetime(mycli.sqlexecute))
event.app.current_buffer.insert_text(key_binding_utils.server_datetime(mycli.sqlexecute))

@kb.add("c-o", "c-t", filter=emacs_mode)
def _(event: KeyPressEvent) -> None:
Expand All @@ -256,7 +266,7 @@ def _(event: KeyPressEvent) -> None:
"""
_logger.debug("Detected <C-o C-t> key.")

event.app.current_buffer.insert_text(shortcuts.server_datetime(mycli.sqlexecute, quoted=True))
event.app.current_buffer.insert_text(key_binding_utils.server_datetime(mycli.sqlexecute, quoted=True))

@kb.add("c-r", filter=control_is_searchable)
def _(event: KeyPressEvent) -> None:
Expand Down
115 changes: 8 additions & 107 deletions mycli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import sys
import threading
import traceback
from typing import IO, Any, Callable, Generator, Iterable, Literal
from typing import IO, Any, Generator, Iterable, Literal

try:
from pwd import getpwuid
Expand Down Expand Up @@ -50,8 +50,6 @@
to_formatted_text,
to_plain_text,
)
from prompt_toolkit.key_binding.bindings.named_commands import register as prompt_register
from prompt_toolkit.key_binding.key_processor import KeyPressEvent
from prompt_toolkit.layout.processors import ConditionalProcessor, HighlightMatchingBracketProcessor
from prompt_toolkit.lexers import PygmentsLexer
from prompt_toolkit.output import ColorDepth
Expand All @@ -60,7 +58,6 @@
from pymysql.constants.CR import CR_SERVER_LOST
from pymysql.constants.ER import ACCESS_DENIED_ERROR, HANDSHAKE_ERROR
from pymysql.cursors import Cursor
import sqlglot
import sqlparse

from mycli import __version__
Expand Down Expand Up @@ -93,6 +90,10 @@
from mycli.packages.cli_utils import filtered_sys_argv, is_valid_connection_scheme
from mycli.packages.filepaths import dir_path_exists, guess_socket_location
from mycli.packages.hybrid_redirection import get_redirect_components, is_redirect_command
from mycli.packages.key_binding_utils import (
handle_clip_command,
handle_editor_command,
)
from mycli.packages.prompt_utils import confirm, confirm_destructive_query
from mycli.packages.ptoolkit.history import FileHistoryWithTimestamp
from mycli.packages.special.favoritequeries import FavoriteQueries
Expand Down Expand Up @@ -871,99 +872,6 @@ def _connect(
self.echo(str(e), err=True, fg="red")
sys.exit(1)

def handle_editor_command(
self,
text: str,
inputhook: Callable | None,
loaded_message_fn: Callable,
) -> str:
r"""Editor command is any query that is prefixed or suffixed by a '\e'.
The reason for a while loop is because a user might edit a query
multiple times. For eg:

"select * from \e"<enter> to edit it in vim, then come
back to the prompt with the edited query "select * from
blah where q = 'abc'\e" to edit it again.
:param text: Document
:return: Document

"""

while special.editor_command(text):
filename = special.get_filename(text)
query = special.get_editor_query(text) or self.get_last_query()
sql, message = special.open_external_editor(filename=filename, sql=query)
if message:
# Something went wrong. Raise an exception and bail.
raise RuntimeError(message)
while True:
try:
assert isinstance(self.prompt_app, PromptSession)
text = self.prompt_app.prompt(
default=sql,
inputhook=inputhook,
message=loaded_message_fn,
)
break
except KeyboardInterrupt:
sql = ""

continue
return text

def handle_clip_command(self, text: str) -> bool:
r"""A clip command is any query that is prefixed or suffixed by a
'\clip'.

:param text: Document
:return: Boolean

"""

if special.clip_command(text):
query = special.get_clip_query(text) or self.get_last_query()
message = special.copy_query_to_clipboard(sql=query)
if message:
raise RuntimeError(message)
return True
return False

def handle_prettify_binding(self, text: str) -> str:
if not text:
return ''
try:
statements = sqlglot.parse(text, read='mysql')
except Exception:
statements = []
if len(statements) == 1 and statements[0]:
parse_succeeded = True
pretty_text = statements[0].sql(pretty=True, pad=4, dialect='mysql')
else:
parse_succeeded = False
pretty_text = text.rstrip(';')
self.toolbar_error_message = 'Prettify failed to parse single statement'
if pretty_text and parse_succeeded:
pretty_text = pretty_text + ';'
return pretty_text

def handle_unprettify_binding(self, text: str) -> str:
if not text:
return ''
try:
statements = sqlglot.parse(text, read='mysql')
except Exception:
statements = []
if len(statements) == 1 and statements[0]:
parse_succeeded = True
unpretty_text = statements[0].sql(pretty=False, dialect='mysql')
else:
parse_succeeded = False
unpretty_text = text.rstrip(';')
self.toolbar_error_message = 'Unprettify failed to parse single statement'
if unpretty_text and parse_succeeded:
unpretty_text = unpretty_text + ';'
return unpretty_text

def output_timing(self, timing: str, is_warnings_style: bool = False) -> None:
self.log_output(timing)
add_style = 'class:warnings.timing' if is_warnings_style else 'class:output.timing'
Expand Down Expand Up @@ -1168,7 +1076,8 @@ def one_iteration(text: str | None = None) -> None:
special.set_forced_horizontal_output(False)

try:
text = self.handle_editor_command(
text = handle_editor_command(
self,
text,
inputhook,
loaded_message_fn,
Expand All @@ -1180,7 +1089,7 @@ def one_iteration(text: str | None = None) -> None:
return

try:
if self.handle_clip_command(text):
if handle_clip_command(self, text):
return
except RuntimeError as e:
logger.error("sql: %r, error: %r", text, e)
Expand Down Expand Up @@ -2698,14 +2607,6 @@ def tips_picker() -> str:
return choice(tips) if tips else r'\? or "help" for help!'


@prompt_register("edit-and-execute-command")
def edit_and_execute(event: KeyPressEvent) -> None:
"""Different from the prompt-toolkit default, we want to have a choice not
to execute a query after editing, hence validate_and_handle=False."""
buff = event.current_buffer
buff.open_in_editor(validate_and_handle=False)


def main() -> int | None:
try:
result = click_entrypoint.main(
Expand Down
Loading
Loading