diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c7b25e6..c737eff3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,8 @@ See Git checking messages for full history. ## 10.1.1.dev0 (2025-xx-xx) - Linux: check the server for Xrandr support version (#417) - Linux: improve typing and error messages for X libraries (#418) -- Linux: add a new XCB backend for better thread safety, error-checking, and future development (#425) +- Linux: introduce an XCB-powered backend stack with a factory in ``mss.linux`` while keeping the Xlib code as a fallback (#425) +- Linux: add the XShmGetImage backend with automatic XGetImage fallback and explicit status reporting (#431) - :heart: contributors: @jholveck ## 10.1.0 (2025-08-16) diff --git a/CHANGES.md b/CHANGES.md index 231568e3..1f9ef079 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,19 @@ # Technical Changes +## 10.1.1 (2025-xx-xx) + +### linux/__init__.py +- Added an ``mss()`` factory to select between the different GNU/Linux backends. + +### linux/xlib.py +- Moved the legacy Xlib backend into the ``mss.linux.xlib`` module to be used as a fallback implementation. + +### linux/xgetimage.py +- Added an XCB-based backend that mirrors XGetImage semantics. + +### linux/xshmgetimage.py +- Added an XCB backend powered by XShmGetImage with ``shm_status`` and ``shm_fallback_reason`` attributes for diagnostics. + ## 10.1.0 (2025-08-16) ### darwin.py diff --git a/docs/source/api.rst b/docs/source/api.rst index b1d87f4b..655aabaf 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -33,6 +33,36 @@ GNU/Linux .. module:: mss.linux +Factory function to return the appropriate backend implementation. + +.. function:: mss(backend="default", **kwargs) + + :keyword str backend: Backend name ("default", "xlib", "xgetimage", or "xshmgetimage"). + :keyword display: Display name (e.g., ":0.0") for the X server. Default is taken from the :envvar:`DISPLAY` environment variable. + :type display: str or None + :param kwargs: Additional arguments passed to the backend MSS class. + :rtype: :class:`mss.base.MSSBase` + :return: Backend-specific MSS instance. + + Factory returning a proper MSS class instance for GNU/Linux. + The backend parameter selects the implementation: + + - "default" or "xshmgetimage": XCB-based backend using XShmGetImage (default, with automatic fallback to XGetImage) + - "xgetimage": XCB-based backend using XGetImage + - "xlib": Traditional Xlib-based backend retained for environments without working XCB libraries + +.. function:: MSS(*args, **kwargs) + + Alias for :func:`mss` for backward compatibility. + + +Xlib Backend +^^^^^^^^^^^^ + +.. module:: mss.linux.xlib + +Legacy Xlib-based backend, kept as a fallback when XCB is unavailable. + .. attribute:: CFUNCTIONS .. versionadded:: 6.1.0 @@ -79,6 +109,55 @@ GNU/Linux .. versionadded:: 8.0.0 + +XGetImage Backend +^^^^^^^^^^^^^^^^^ + +.. module:: mss.linux.xgetimage + +XCB-based backend using XGetImage protocol. + +.. class:: MSS + + XCB implementation using XGetImage for screenshot capture. + + +XShmGetImage Backend +^^^^^^^^^^^^^^^^^^^^ + +.. module:: mss.linux.xshmgetimage + +XCB-based backend using XShmGetImage protocol with shared memory. + +.. class:: ShmStatus + + Enum describing the availability of the X11 MIT-SHM extension used by the backend. + + .. attribute:: UNKNOWN + + Initial state before any capture confirms availability or failure. + + .. attribute:: AVAILABLE + + Shared-memory capture works and will continue to be used. + + .. attribute:: UNAVAILABLE + + Shared-memory capture failed; MSS will use XGetImage. + +.. class:: MSS + + XCB implementation using XShmGetImage for screenshot capture. + Falls back to XGetImage if shared memory extension is unavailable. + + .. attribute:: shm_status + + Current shared-memory availability, using :class:`mss.linux.xshmgetimage.ShmStatus`. + + .. attribute:: shm_fallback_reason + + Optional string describing why the backend fell back to XGetImage when MIT-SHM is unavailable. + Windows ------- diff --git a/docs/source/developers.rst b/docs/source/developers.rst index 3dfe19bc..15609d0a 100644 --- a/docs/source/developers.rst +++ b/docs/source/developers.rst @@ -50,3 +50,12 @@ To build the documentation, simply type:: $ python -m pip install -e '.[docs]' $ sphinx-build -d docs docs/source docs_out --color -W -bhtml + + +XCB Code Generator +================== + +The GNU/Linux XCB backends rely on generated ctypes bindings. If you need to +add new XCB requests or types, do **not** edit ``src/mss/linux/xcbgen.py`` by +hand. Instead, follow the workflow described in ``src/xcbproto/README.md``, +which explains how to update ``gen_xcb_to_py.py`` and regenerate the bindings. diff --git a/docs/source/examples.rst b/docs/source/examples.rst index 7bb8157b..00920890 100644 --- a/docs/source/examples.rst +++ b/docs/source/examples.rst @@ -103,6 +103,15 @@ You can handle data using a custom class: .. versionadded:: 3.1.0 +GNU/Linux XShm backend +---------------------- + +Select the XShmGetImage backend explicitly and inspect whether it is active or +falling back to XGetImage: + +.. literalinclude:: examples/linux_xshm_backend.py + :lines: 7- + PIL === diff --git a/docs/source/examples/linux_xshm_backend.py b/docs/source/examples/linux_xshm_backend.py new file mode 100644 index 00000000..3d2f22bf --- /dev/null +++ b/docs/source/examples/linux_xshm_backend.py @@ -0,0 +1,17 @@ +"""This is part of the MSS Python's module. +Source: https://github.com/BoboTiG/python-mss. + +Select the XShmGetImage backend explicitly and inspect its status. +""" + +from mss.linux.xshmgetimage import MSS as mss + +with mss() as sct: + screenshot = sct.grab(sct.monitors[1]) + print(f"Captured screenshot dimensions: {screenshot.size.width}x{screenshot.size.height}") + + print(f"shm_status: {sct.shm_status.name}") + if sct.shm_fallback_reason: + print(f"Falling back to XGetImage because: {sct.shm_fallback_reason}") + else: + print("MIT-SHM capture active.") diff --git a/docs/source/usage.rst b/docs/source/usage.rst index 4e105a8b..b9850dde 100644 --- a/docs/source/usage.rst +++ b/docs/source/usage.rst @@ -5,7 +5,7 @@ Usage Import ====== -So MSS can be used as simply as:: +MSS can be used as simply as:: from mss import mss @@ -20,6 +20,11 @@ Or import the good one based on your operating system:: # Microsoft Windows from mss.windows import MSS as mss +On GNU/Linux you can also import a specific backend (see :ref:`backends`) +directly when you need a particular implementation, for example:: + + from mss.linux.xshmgetimage import MSS as mss + Instance ======== @@ -49,18 +54,56 @@ This is a much better usage, memory efficient:: Also, it is a good thing to save the MSS instance inside an attribute of your class and calling it when needed. +.. _backends: + +Backends +-------- + +Some platforms have multiple ways to take screenshots. In MSS, these are known as *backends*. The :py:func:`mss` functions will normally autodetect which one is appropriate for your situation, but you can override this if you want. For instance, you may know that your specific situation requires a particular backend. + +If you want to choose a particular backend, you can use the :py::`backend` keyword to :py:func:`mss`:: + + with mss(backend="xgetimage") as sct: + ... + +Alternatively, you can also directly import the backend you want to use:: + + from mss.linux.xgetimage import MSS as mss + +Currently, only the GNU/Linux implementation has multiple backends. These are described in their own section below. + + GNU/Linux --------- -On GNU/Linux, you can specify which display to use (useful for distant screenshots via SSH):: - - with mss(display=":0.0") as sct: - # ... +Display +^^^^^^^ -A more specific example (only valid on GNU/Linux): +On GNU/Linux, the default display is taken from the :envvar:`DISPLAY` environment variable. You can instead specify which display to use (useful for distant screenshots via SSH) using the ``display`` keyword: .. literalinclude:: examples/linux_display_keyword.py - :lines: 9- + :lines: 7- + + +Backends +^^^^^^^^ + +The GNU/Linux implementation has multiple backends (see :ref:`backends`), or ways it can take screenshots. The :py:func:`mss.mss` and :py:func:`mss.linux.mss` functions will normally autodetect which one is appropriate, but you can override this if you want. + +There are three available backends. + +:py:mod:`xshmgetimage` (default) + The fastest backend, based on :c:func:`xcb_shm_get_image`. It is roughly three times faster than :py:mod:`xgetimage` + and is used automatically. When the MIT-SHM extension is unavailable (for example on remote SSH displays), it + transparently falls back to :py:mod:`xgetimage` so you can always request it safely. + +:py:mod:`xgetimage` + A highly-compatible, but slower, backend based on :c:func:`xcb_get_image`. Use this explicitly only when you know + that :py:mod:`xshmgetimage` cannot operate in your environment. + +:py:mod:`xlib` + The legacy backend powered by :c:func:`XGetImage`. It is kept solely for systems where XCB libraries are + unavailable and no new features are being added to it. Command Line @@ -73,8 +116,8 @@ You can use ``mss`` via the CLI:: Or via direct call from Python:: $ python -m mss --help - usage: __main__.py [-h] [-c COORDINATES] [-l {0,1,2,3,4,5,6,7,8,9}] - [-m MONITOR] [-o OUTPUT] [-q] [-v] [--with-cursor] + usage: mss [-h] [-c COORDINATES] [-l {0,1,2,3,4,5,6,7,8,9}] [-m MONITOR] + [-o OUTPUT] [--with-cursor] [-q] [-b BACKEND] [-v] options: -h, --help show this help message and exit @@ -86,6 +129,9 @@ Or via direct call from Python:: the monitor to screenshot -o OUTPUT, --output OUTPUT the output file name + -b, --backend BACKEND + platform-specific backend to use + (Linux: default/xlib/xgetimage/xshmgetimage; macOS/Windows: default) --with-cursor include the cursor -q, --quiet do not print created files -v, --version show program's version number and exit diff --git a/pyproject.toml b/pyproject.toml index f51a7b38..3b4b0223 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -179,6 +179,7 @@ ignore = [ "docs/source/*" = [ "ERA001", # commented code "INP001", # file `xxx` is part of an implicit namespace package + "N811", # importing constant (MSS) as non-constant (mss) ] "src/tests/*" = [ "FBT001", # boolean-typed positional argument in function definition diff --git a/src/mss/__main__.py b/src/mss/__main__.py index 384ad344..7d884fc6 100644 --- a/src/mss/__main__.py +++ b/src/mss/__main__.py @@ -3,8 +3,9 @@ """ import os.path +import platform import sys -from argparse import ArgumentParser +from argparse import ArgumentError, ArgumentParser from mss import __version__ from mss.exception import ScreenShotError @@ -12,9 +13,28 @@ from mss.tools import to_png +def _backend_cli_choices() -> list[str]: + os_name = platform.system().lower() + if os_name == "darwin": + from mss import darwin # noqa: PLC0415 + + return list(darwin.BACKENDS) + if os_name == "linux": + from mss import linux # noqa: PLC0415 + + return list(linux.BACKENDS) + if os_name == "windows": + from mss import windows # noqa: PLC0415 + + return list(windows.BACKENDS) + return ["default"] + + def main(*args: str) -> int: """Main logic.""" - cli_args = ArgumentParser(prog="mss") + backend_choices = _backend_cli_choices() + + cli_args = ArgumentParser(prog="mss", exit_on_error=False) cli_args.add_argument( "-c", "--coordinates", @@ -40,9 +60,19 @@ def main(*args: str) -> int: action="store_true", help="do not print created files", ) + cli_args.add_argument( + "-b", "--backend", default="default", choices=backend_choices, help="platform-specific backend to use" + ) cli_args.add_argument("-v", "--version", action="version", version=__version__) - options = cli_args.parse_args(args or None) + try: + options = cli_args.parse_args(args or None) + except ArgumentError as e: + # By default, parse_args will print and the error and exit. We + # return instead of exiting, to make unit testing easier. + cli_args.print_usage(sys.stderr) + print(f"{cli_args.prog}: error: {e}", file=sys.stderr) + return 2 kwargs = {"mon": options.monitor, "output": options.output} if options.coordinates: try: @@ -61,7 +91,7 @@ def main(*args: str) -> int: kwargs["output"] = "sct-{top}x{left}_{width}x{height}.png" try: - with mss(with_cursor=options.with_cursor) as sct: + with mss(with_cursor=options.with_cursor, backend=options.backend) as sct: if options.coordinates: output = kwargs["output"].format(**kwargs["mon"]) sct_img = sct.grab(kwargs["mon"]) diff --git a/src/mss/base.py b/src/mss/base.py index 0a6c443a..2abfcbd3 100644 --- a/src/mss/base.py +++ b/src/mss/base.py @@ -18,6 +18,15 @@ from mss.models import Monitor, Monitors + # Prior to 3.11, Python didn't have the Self type. typing_extensions does, but we don't want to depend on it. + try: + from typing import Self + except ImportError: # pragma: nocover + try: + from typing_extensions import Self + except ImportError: # pragma: nocover + Self = Any # type: ignore[assignment] + try: from datetime import UTC except ImportError: # pragma: nocover @@ -58,7 +67,7 @@ def __init__( msg = 'The only valid backend on this platform is "default".' raise ScreenShotError(msg) - def __enter__(self) -> MSSBase: # noqa:PYI034 + def __enter__(self) -> Self: """For the cool call `with MSS() as mss:`.""" return self diff --git a/src/mss/darwin.py b/src/mss/darwin.py index f001398b..5d723b98 100644 --- a/src/mss/darwin.py +++ b/src/mss/darwin.py @@ -20,6 +20,8 @@ __all__ = ("MSS",) +BACKENDS = ["default"] + MAC_VERSION_CATALINA = 10.16 kCGWindowImageBoundsIgnoreFraming = 1 << 0 # noqa: N816 diff --git a/src/mss/factory.py b/src/mss/factory.py index 29cb3ed1..933310d7 100644 --- a/src/mss/factory.py +++ b/src/mss/factory.py @@ -29,7 +29,8 @@ def mss(**kwargs: Any) -> MSSBase: if os_ == "linux": from mss import linux # noqa: PLC0415 - return linux.MSS(**kwargs) + # Linux has its own factory to choose the backend. + return linux.mss(**kwargs) if os_ == "windows": from mss import windows # noqa: PLC0415 diff --git a/src/mss/linux/__init__.py b/src/mss/linux/__init__.py index 46993f1f..8426abfd 100644 --- a/src/mss/linux/__init__.py +++ b/src/mss/linux/__init__.py @@ -3,17 +3,36 @@ from mss.base import MSSBase from mss.exception import ScreenShotError +BACKENDS = ["default", "xlib", "xgetimage", "xshmgetimage"] + def mss(backend: str = "default", **kwargs: Any) -> MSSBase: + """Factory returning a proper MSS class instance. + + It examines the options provided, and chooses the most adapted MSS + class to take screenshots. It then proxies its arguments to the + class for instantiation. + + Currently, the only option used is the "backend" flag. Future + versions will look at other options as well. + """ backend = backend.lower() - if backend in {"default", "xlib"}: + if backend == "xlib": from . import xlib # noqa: PLC0415 return xlib.MSS(**kwargs) if backend == "xgetimage": from . import xgetimage # noqa: PLC0415 + # Note that the xshmgetimage backend will automatically fall back to XGetImage calls if XShmGetImage isn't + # available. The only reason to use the xgetimage backend is if the user already knows that XShmGetImage + # isn't going to be supported. return xgetimage.MSS(**kwargs) + if backend in {"default", "xshmgetimage"}: + from . import xshmgetimage # noqa: PLC0415 + + return xshmgetimage.MSS(**kwargs) + assert backend not in BACKENDS # noqa: S101 msg = f"Backend {backend!r} not (yet?) implemented." raise ScreenShotError(msg) diff --git a/src/mss/linux/base.py b/src/mss/linux/base.py new file mode 100644 index 00000000..489863ce --- /dev/null +++ b/src/mss/linux/base.py @@ -0,0 +1,267 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from mss.base import MSSBase +from mss.exception import ScreenShotError + +from . import xcb +from .xcb import LIB + +if TYPE_CHECKING: + from mss.models import Monitor + from mss.screenshot import ScreenShot + +SUPPORTED_DEPTHS = {24, 32} +SUPPORTED_BITS_PER_PIXEL = 32 +SUPPORTED_RED_MASK = 0xFF0000 +SUPPORTED_GREEN_MASK = 0x00FF00 +SUPPORTED_BLUE_MASK = 0x0000FF +ALL_PLANES = 0xFFFFFFFF # XCB doesn't define AllPlanes + + +class MSSXCBBase(MSSBase): + """Base class for XCB-based screenshot implementations. + + This class provides common XCB initialization and monitor detection logic + that can be shared across different XCB screenshot methods (XGetImage, + XShmGetImage, XComposite, etc.). + """ + + def __init__(self, /, **kwargs: Any) -> None: # noqa: PLR0912 + """Initialize an XCB connection and validate the display configuration. + + Args: + **kwargs: Keyword arguments, including optional 'display' for X11 display string. + + Raises: + ScreenShotError: If the display configuration is not supported. + """ + super().__init__(**kwargs) + + display = kwargs.get("display", b"") + if not display: + display = None + + self.conn: xcb.Connection | None + self.conn, pref_screen_num = xcb.connect(display) + + # Get the connection setup information that was included when we connected. + xcb_setup = xcb.get_setup(self.conn) + screens = xcb.setup_roots(xcb_setup) + self.pref_screen = screens[pref_screen_num] + self.root = self.drawable = self.pref_screen.root + + # We don't probe the XFixes presence or version until we need it. + self._xfixes_ready: bool | None = None + + # Probe the visuals (and related information), and make sure that our drawable is in an acceptable format. + # These iterations and tests don't involve any traffic with the server; it's all stuff that was included in + # the connection setup. Effectively all modern setups will be acceptable, but we verify to be sure. + + # Currently, we assume that the drawable we're capturing is the root; when we add single-window capture, + # we'll have to ask the server for its depth and visual. + assert self.root == self.drawable # noqa: S101 + self.drawable_depth = self.pref_screen.root_depth + self.drawable_visual_id = self.pref_screen.root_visual.value + # Server image byte order + if xcb_setup.image_byte_order != xcb.ImageOrder.LSBFirst: + msg = "Only X11 servers using LSB-First images are supported." + raise ScreenShotError(msg) + # Depth + if self.drawable_depth not in SUPPORTED_DEPTHS: + msg = f"Only screens of color depth 24 or 32 are supported, not {self.drawable_depth}" + raise ScreenShotError(msg) + # Format (i.e., bpp, padding) + for format_ in xcb.setup_pixmap_formats(xcb_setup): + if format_.depth == self.drawable_depth: + break + else: + msg = f"Internal error: drawable's depth {self.drawable_depth} not found in screen's supported formats" + raise ScreenShotError(msg) + drawable_format = format_ + if drawable_format.bits_per_pixel != SUPPORTED_BITS_PER_PIXEL: + msg = ( + f"Only screens at 32 bpp (regardless of color depth) are supported; " + f"got {drawable_format.bits_per_pixel} bpp" + ) + raise ScreenShotError(msg) + if drawable_format.scanline_pad != SUPPORTED_BITS_PER_PIXEL: + # To clarify the padding: the scanline_pad is the multiple that the scanline gets padded to. If there + # is no padding, then it will be the same as one pixel's size. + msg = "Screens with scanline padding are not supported" + raise ScreenShotError(msg) + # Visual, the interpretation of pixels (like indexed, grayscale, etc). (Visuals are arranged by depth, so + # we iterate over the depths first.) + for xcb_depth in xcb.screen_allowed_depths(self.pref_screen): + if xcb_depth.depth == self.drawable_depth: + break + else: + msg = "Internal error: drawable's depth not found in screen's supported depths" + raise ScreenShotError(msg) + for visual_info in xcb.depth_visuals(xcb_depth): + if visual_info.visual_id.value == self.drawable_visual_id: + break + else: + msg = "Internal error: drawable's visual not found in screen's supported visuals" + raise ScreenShotError(msg) + if visual_info.class_ not in {xcb.VisualClass.TrueColor, xcb.VisualClass.DirectColor}: + msg = "Only TrueColor and DirectColor visuals are supported" + raise ScreenShotError(msg) + if ( + visual_info.red_mask != SUPPORTED_RED_MASK + or visual_info.green_mask != SUPPORTED_GREEN_MASK + or visual_info.blue_mask != SUPPORTED_BLUE_MASK + ): + # There are two ways to phrase this layout: BGRx accounts for the byte order, while xRGB implies the + # native word order. Since we return the data as a byte array, we use the former. By the time we get + # to this point, we've already checked the endianness and depth, so this is pretty much never going to + # happen anyway. + msg = "Only visuals with BGRx ordering are supported" + raise ScreenShotError(msg) + + def close(self) -> None: + """Close the XCB connection.""" + if self.conn is not None: + xcb.disconnect(self.conn) + self.conn = None + + def _monitors_impl(self) -> None: + """Get positions of monitors. It will populate self._monitors.""" + if self.conn is None: + msg = "Cannot identify monitors while the connection is closed" + raise ScreenShotError(msg) + + # The first entry is the whole X11 screen that the root is on. That's the one that covers all the + # monitors. + root_geom = xcb.get_geometry(self.conn, self.root) + self._monitors.append( + { + "left": root_geom.x, + "top": root_geom.y, + "width": root_geom.width, + "height": root_geom.height, + } + ) + + # After that, we have one for each monitor on that X11 screen. For decades, that's been handled by + # Xrandr. We don't presently try to work with Xinerama. So, we're going to check the different outputs, + # according to Xrandr. If that fails, we'll just leave the one root covering everything. + + # Make sure we have the Xrandr extension we need. This will query the cache that we started populating in + # __init__. + randr_ext_data = xcb.get_extension_data(self.conn, LIB.randr_id) + if not randr_ext_data.present: + return + + # We ask the server to give us anything up to the version we support (i.e., what we expect the reply + # structs to look like). If the server only supports 1.2, then that's what it'll give us, and we're ok + # with that, but we also use a faster path if the server implements at least 1.3. + randr_version_data = xcb.randr_query_version(self.conn, xcb.RANDR_MAJOR_VERSION, xcb.RANDR_MINOR_VERSION) + randr_version = (randr_version_data.major_version, randr_version_data.minor_version) + if randr_version < (1, 2): + return + + screen_resources: xcb.RandrGetScreenResourcesReply | xcb.RandrGetScreenResourcesCurrentReply + # Check to see if we have the xcb_randr_get_screen_resources_current function in libxcb-randr, and that + # the server supports it. + if hasattr(LIB.randr, "xcb_randr_get_screen_resources_current") and randr_version >= (1, 3): + screen_resources = xcb.randr_get_screen_resources_current(self.conn, self.drawable.value) + crtcs = xcb.randr_get_screen_resources_current_crtcs(screen_resources) + else: + # Either the client or the server doesn't support the _current form. That's ok; we'll use the old + # function, which forces a new query to the physical monitors. + screen_resources = xcb.randr_get_screen_resources(self.conn, self.drawable) + crtcs = xcb.randr_get_screen_resources_crtcs(screen_resources) + + for crtc in crtcs: + crtc_info = xcb.randr_get_crtc_info(self.conn, crtc, screen_resources.config_timestamp) + if crtc_info.num_outputs == 0: + continue + self._monitors.append( + {"left": crtc_info.x, "top": crtc_info.y, "width": crtc_info.width, "height": crtc_info.height} + ) + + # Extra credit would be to enumerate the virtual desktops; see + # https://specifications.freedesktop.org/wm/latest/ar01s03.html. But I don't know how widely-used that + # style is. + + def _cursor_impl_check_xfixes(self) -> bool: + if self.conn is None: + msg = "Cannot take screenshot while the connection is closed" + raise ScreenShotError(msg) + + xfixes_ext_data = xcb.get_extension_data(self.conn, LIB.xfixes_id) + if not xfixes_ext_data.present: + return False + + reply = xcb.xfixes_query_version(self.conn, xcb.XFIXES_MAJOR_VERSION, xcb.XFIXES_MINOR_VERSION) + # We can work with 2.0 and later, but not sure about the actual minimum version we can use. That's ok; + # everything these days is much more modern. + return (reply.major_version, reply.minor_version) >= (2, 0) + + def _cursor_impl(self) -> ScreenShot: + """Retrieve all cursor data. Pixels have to be RGBx.""" + + if self.conn is None: + msg = "Cannot take screenshot while the connection is closed" + raise ScreenShotError(msg) + + if self._xfixes_ready is None: + self._xfixes_ready = self._cursor_impl_check_xfixes() + if not self._xfixes_ready: + msg = "Server does not have XFixes, or the version is too old." + raise ScreenShotError(msg) + + cursor_img = xcb.xfixes_get_cursor_image(self.conn) + region = { + "left": cursor_img.x - cursor_img.xhot, + "top": cursor_img.y - cursor_img.yhot, + "width": cursor_img.width, + "height": cursor_img.height, + } + + data_arr = xcb.xfixes_get_cursor_image_cursor_image(cursor_img) + data = bytearray(data_arr) + # We don't need to do the same array slice-and-dice work as the Xlib-based implementation: Xlib has an + # unfortunate historical accident that makes it have to return the cursor image in a different format. + + return self.cls_image(data, region) + + def _grab_impl_xgetimage(self, monitor: Monitor, /) -> ScreenShot: + """Retrieve all pixels from a monitor using GetImage. + + This is used by the XGetImage backend, and also the XShmGetImage + backend in fallback mode. + """ + + if self.conn is None: + msg = "Cannot take screenshot while the connection is closed" + raise ScreenShotError(msg) + + img_reply = xcb.get_image( + self.conn, + xcb.ImageFormat.ZPixmap, + self.drawable, + monitor["left"], + monitor["top"], + monitor["width"], + monitor["height"], + ALL_PLANES, + ) + + # Now, save the image. This is a reference into the img_reply structure. + img_data_arr = xcb.get_image_data(img_reply) + # Copy this into a new bytearray, so that it will persist after we clear the image structure. + img_data = bytearray(img_data_arr) + + if img_reply.depth != self.drawable_depth or img_reply.visual.value != self.drawable_visual_id: + # This should never happen; a window can't change its visual. + msg = ( + "Server returned an image with a depth or visual different than it initially reported: " + f"expected {self.drawable_depth},{hex(self.drawable_visual_id)}, " + f"got {img_reply.depth},{hex(img_reply.visual.value)}" + ) + raise ScreenShotError(msg) + + return self.cls_image(img_data, monitor) diff --git a/src/mss/linux/xcb.py b/src/mss/linux/xcb.py index 966def06..f599e2a7 100644 --- a/src/mss/linux/xcb.py +++ b/src/mss/linux/xcb.py @@ -1,16 +1,18 @@ from __future__ import annotations -from ctypes import Structure, c_int, c_uint8, c_uint16, c_uint32 +from ctypes import _Pointer, c_int from . import xcbgen # We import these just so they're re-exported to our users. -# ruff: noqa: F401 +# ruff: noqa: F401, TC001 from .xcbgen import ( RANDR_MAJOR_VERSION, RANDR_MINOR_VERSION, RENDER_MAJOR_VERSION, RENDER_MINOR_VERSION, + SHM_MAJOR_VERSION, + SHM_MINOR_VERSION, XFIXES_MAJOR_VERSION, XFIXES_MINOR_VERSION, Atom, @@ -52,6 +54,10 @@ ScreenIterator, Setup, SetupIterator, + ShmCreateSegmentReply, + ShmGetImageReply, + ShmQueryVersionReply, + ShmSeg, Timestamp, VisualClass, Visualid, @@ -91,13 +97,19 @@ setup_pixmap_formats, setup_roots, setup_vendor, + shm_attach_fd, + shm_create_segment, + shm_create_segment_reply_fds, + shm_detach, + shm_get_image, + shm_query_version, xfixes_get_cursor_image, xfixes_get_cursor_image_cursor_image, xfixes_query_version, ) # These are also here to re-export. -from .xcbhelpers import LIB, Connection, XError +from .xcbhelpers import LIB, XID, Connection, QueryExtensionReply, XcbExtension, XError XCB_CONN_ERROR = 1 XCB_CONN_CLOSED_EXT_NOTSUPPORTED = 2 @@ -120,6 +132,53 @@ } +#### High-level XCB function wrappers + + +def get_extension_data( + xcb_conn: Connection | _Pointer[Connection], ext: XcbExtension | _Pointer[XcbExtension] +) -> QueryExtensionReply: + """Get extension data for the given extension. + + Returns the extension data, which includes whether the extension is present + and its opcode information. + """ + reply_p = LIB.xcb.xcb_get_extension_data(xcb_conn, ext) + return reply_p.contents + + +def prefetch_extension_data( + xcb_conn: Connection | _Pointer[Connection], ext: XcbExtension | _Pointer[XcbExtension] +) -> None: + """Prefetch extension data for the given extension. + + This is a performance hint to XCB to fetch the extension data + asynchronously. + """ + LIB.xcb.xcb_prefetch_extension_data(xcb_conn, ext) + + +def generate_id(xcb_conn: Connection | _Pointer[Connection]) -> XID: + """Generate a new unique X resource ID. + + Returns an XID that can be used to create new X resources. + """ + return LIB.xcb.xcb_generate_id(xcb_conn) + + +def get_setup(xcb_conn: Connection | _Pointer[Connection]) -> Setup: + """Get the connection setup information. + + Returns the setup structure containing information about the X server, + including available screens, pixmap formats, etc. + """ + setup_p = LIB.xcb.xcb_get_setup(xcb_conn) + return setup_p.contents + + +# Connection management + + def initialize() -> None: LIB.initialize(callbacks=[xcbgen.initialize]) @@ -145,6 +204,12 @@ def connect(display: str | bytes | None = None) -> tuple[Connection, int]: msg += f"error code {conn_err}" raise XError(msg) + # Prefetch extension data for all extensions we support to populate XCB's internal cache. + prefetch_extension_data(conn_p, LIB.randr_id) + prefetch_extension_data(conn_p, LIB.render_id) + prefetch_extension_data(conn_p, LIB.shm_id) + prefetch_extension_data(conn_p, LIB.xfixes_id) + return conn_p.contents, pref_screen_num.value diff --git a/src/mss/linux/xcbgen.py b/src/mss/linux/xcbgen.py index 65d808a8..6fba72b3 100644 --- a/src/mss/linux/xcbgen.py +++ b/src/mss/linux/xcbgen.py @@ -9,6 +9,7 @@ POINTER, Array, Structure, + _Pointer, c_char, c_int, c_int16, @@ -32,6 +33,8 @@ RANDR_MINOR_VERSION = 6 RENDER_MAJOR_VERSION = 0 RENDER_MINOR_VERSION = 11 +SHM_MAJOR_VERSION = 1 +SHM_MINOR_VERSION = 2 XFIXES_MAJOR_VERSION = 6 XFIXES_MINOR_VERSION = 0 @@ -428,6 +431,47 @@ class RenderQueryPictFormatsReply(Structure): ) +class ShmQueryVersionReply(Structure): + _fields_ = ( + ("response_type", c_uint8), + ("shared_pixmaps", c_uint8), + ("sequence", c_uint16), + ("length", c_uint32), + ("major_version", c_uint16), + ("minor_version", c_uint16), + ("uid", c_uint16), + ("gid", c_uint16), + ("pixmap_format", c_uint8), + ("pad0", c_uint8 * 15), + ) + + +class ShmSeg(XID): + pass + + +class ShmGetImageReply(Structure): + _fields_ = ( + ("response_type", c_uint8), + ("depth", c_uint8), + ("sequence", c_uint16), + ("length", c_uint32), + ("visual", Visualid), + ("size", c_uint32), + ("pad0", c_uint8 * 16), + ) + + +class ShmCreateSegmentReply(Structure): + _fields_ = ( + ("response_type", c_uint8), + ("nfd", c_uint8), + ("sequence", c_uint16), + ("length", c_uint32), + ("pad0", c_uint8 * 24), + ) + + class XfixesQueryVersionReply(Structure): _fields_ = ( ("response_type", c_uint8), @@ -587,6 +631,10 @@ def xfixes_get_cursor_image_cursor_image(r: XfixesGetCursorImageReply) -> Array[ ) +def shm_create_segment_reply_fds(c: Connection | _Pointer[Connection], r: ShmCreateSegmentReply) -> _Pointer[c_int]: + return LIB.shm.xcb_shm_create_segment_reply_fds(c, r) + + def get_geometry(c: Connection, drawable: Drawable) -> GetGeometryReply: return LIB.xcb.xcb_get_geometry(c, drawable).reply(c) @@ -617,7 +665,7 @@ def get_property( def no_operation(c: Connection) -> None: - return LIB.xcb.xcb_no_operation(c).check(c) + return LIB.xcb.xcb_no_operation_checked(c).check(c) def randr_query_version( @@ -648,6 +696,39 @@ def render_query_pict_formats(c: Connection) -> RenderQueryPictFormatsReply: return LIB.render.xcb_render_query_pict_formats(c).reply(c) +def shm_query_version(c: Connection) -> ShmQueryVersionReply: + return LIB.shm.xcb_shm_query_version(c).reply(c) + + +def shm_get_image( + c: Connection, + drawable: Drawable, + x: c_int16 | int, + y: c_int16 | int, + width: c_uint16 | int, + height: c_uint16 | int, + plane_mask: c_uint32 | int, + format_: c_uint8 | int, + shmseg: ShmSeg, + offset: c_uint32 | int, +) -> ShmGetImageReply: + return LIB.shm.xcb_shm_get_image(c, drawable, x, y, width, height, plane_mask, format_, shmseg, offset).reply(c) + + +def shm_attach_fd(c: Connection, shmseg: ShmSeg, shm_fd: c_int | int, read_only: c_uint8 | int) -> None: + return LIB.shm.xcb_shm_attach_fd_checked(c, shmseg, shm_fd, read_only).check(c) + + +def shm_create_segment( + c: Connection, shmseg: ShmSeg, size: c_uint32 | int, read_only: c_uint8 | int +) -> ShmCreateSegmentReply: + return LIB.shm.xcb_shm_create_segment(c, shmseg, size, read_only).reply(c) + + +def shm_detach(c: Connection, shmseg: ShmSeg) -> None: + return LIB.shm.xcb_shm_detach_checked(c, shmseg).check(c) + + def xfixes_query_version( c: Connection, client_major_version: c_uint32 | int, client_minor_version: c_uint32 | int ) -> XfixesQueryVersionReply: @@ -761,6 +842,11 @@ def initialize() -> None: # noqa: PLR0915 LIB.xfixes.xcb_xfixes_get_cursor_image_cursor_image.restype = POINTER(c_uint32) LIB.xfixes.xcb_xfixes_get_cursor_image_cursor_image_length.argtypes = (POINTER(XfixesGetCursorImageReply),) LIB.xfixes.xcb_xfixes_get_cursor_image_cursor_image_length.restype = c_int + LIB.shm.xcb_shm_create_segment_reply_fds.argtypes = ( + POINTER(Connection), + POINTER(ShmCreateSegmentReply), + ) + LIB.shm.xcb_shm_create_segment_reply_fds.restype = POINTER(c_int) initialize_xcb_typed_func(LIB.xcb, "xcb_get_geometry", [POINTER(Connection), Drawable], GetGeometryReply) initialize_xcb_typed_func( LIB.xcb, @@ -774,8 +860,8 @@ def initialize() -> None: # noqa: PLR0915 [POINTER(Connection), c_uint8, Window, Atom, Atom, c_uint32, c_uint32], GetPropertyReply, ) - LIB.xcb.xcb_no_operation.argtypes = (Connection,) - LIB.xcb.xcb_no_operation.restype = VoidCookie + LIB.xcb.xcb_no_operation_checked.argtypes = (POINTER(Connection),) + LIB.xcb.xcb_no_operation_checked.restype = VoidCookie initialize_xcb_typed_func( LIB.randr, "xcb_randr_query_version", [POINTER(Connection), c_uint32, c_uint32], RandrQueryVersionReply ) @@ -797,6 +883,28 @@ def initialize() -> None: # noqa: PLR0915 initialize_xcb_typed_func( LIB.render, "xcb_render_query_pict_formats", [POINTER(Connection)], RenderQueryPictFormatsReply ) + initialize_xcb_typed_func(LIB.shm, "xcb_shm_query_version", [POINTER(Connection)], ShmQueryVersionReply) + initialize_xcb_typed_func( + LIB.shm, + "xcb_shm_get_image", + [POINTER(Connection), Drawable, c_int16, c_int16, c_uint16, c_uint16, c_uint32, c_uint8, ShmSeg, c_uint32], + ShmGetImageReply, + ) + LIB.shm.xcb_shm_attach_fd_checked.argtypes = ( + POINTER(Connection), + ShmSeg, + c_int, + c_uint8, + ) + LIB.shm.xcb_shm_attach_fd_checked.restype = VoidCookie + initialize_xcb_typed_func( + LIB.shm, "xcb_shm_create_segment", [POINTER(Connection), ShmSeg, c_uint32, c_uint8], ShmCreateSegmentReply + ) + LIB.shm.xcb_shm_detach_checked.argtypes = ( + POINTER(Connection), + ShmSeg, + ) + LIB.shm.xcb_shm_detach_checked.restype = VoidCookie initialize_xcb_typed_func( LIB.xfixes, "xcb_xfixes_query_version", [POINTER(Connection), c_uint32, c_uint32], XfixesQueryVersionReply ) diff --git a/src/mss/linux/xcbhelpers.py b/src/mss/linux/xcbhelpers.py index 786799cb..f387c79e 100644 --- a/src/mss/linux/xcbhelpers.py +++ b/src/mss/linux/xcbhelpers.py @@ -230,10 +230,9 @@ def __str__(self) -> str: ext_desc = f"\n Extension: {details['extension']}" if "extension" in details else "" msg += ( f"\nX Error of failed request: {error_desc}" - f"\n Major opcode of failed request: {major_desc}" - f"{ext_desc}" - f"\n Minor opcode of failed request: {minor_desc}" - f"\n Resource id in failed request: {details['resource_id']}" + f"\n Major opcode of failed request: {major_desc}{ext_desc}" + + (f"\n Minor opcode of failed request: {minor_desc}" if details["minor_code"] != 0 else "") + + f"\n Resource id in failed request: {details['resource_id']}" f"\n Serial number of failed request: {details['full_sequence']}" ) return msg @@ -452,7 +451,8 @@ def initialize(self, callbacks: Iterable[Callable[[], None]] = frozenset()) -> N self.xcb.xcb_get_extension_data.restype = POINTER(QueryExtensionReply) self.xcb.xcb_prefetch_extension_data.argtypes = [POINTER(Connection), POINTER(XcbExtension)] self.xcb.xcb_prefetch_extension_data.restype = None - + self.xcb.xcb_generate_id.argtypes = [POINTER(Connection)] + self.xcb.xcb_generate_id.restype = XID self.xcb.xcb_get_setup.argtypes = [POINTER(Connection)] self.xcb.xcb_get_setup.restype = POINTER(Setup) self.xcb.xcb_connection_has_error.argtypes = [POINTER(Connection)] @@ -476,6 +476,13 @@ def initialize(self, callbacks: Iterable[Callable[[], None]] = frozenset()) -> N self.render = cdll.LoadLibrary(libxcb_render_so) self.render_id = XcbExtension.in_dll(self.render, "xcb_render_id") + libxcb_shm_so = ctypes.util.find_library("xcb-shm") + if libxcb_shm_so is None: + msg = "Library libxcb-shm.so not found" + raise ScreenShotError(msg) + self.shm = cdll.LoadLibrary(libxcb_shm_so) + self.shm_id = XcbExtension.in_dll(self.shm, "xcb_shm_id") + libxcb_xfixes_so = ctypes.util.find_library("xcb-xfixes") if libxcb_xfixes_so is None: msg = "Library libxcb-xfixes.so not found" diff --git a/src/mss/linux/xgetimage.py b/src/mss/linux/xgetimage.py index 9f127948..a41368c6 100644 --- a/src/mss/linux/xgetimage.py +++ b/src/mss/linux/xgetimage.py @@ -1,22 +1,10 @@ -from typing import Any - -from mss.base import MSSBase -from mss.exception import ScreenShotError from mss.models import Monitor from mss.screenshot import ScreenShot -from . import xcb -from .xcb import LIB - -SUPPORTED_DEPTHS = {24, 32} -SUPPORTED_BITS_PER_PIXEL = 32 -SUPPORTED_RED_MASK = 0xFF0000 -SUPPORTED_GREEN_MASK = 0x00FF00 -SUPPORTED_BLUE_MASK = 0x0000FF -ALL_PLANES = 0xFFFFFFFF # XCB doesn't define AllPlanes +from .base import MSSXCBBase -class MSS(MSSBase): +class MSS(MSSXCBBase): """Multiple ScreenShots implementation for GNU/Linux. This implementation is based on XCB, using the GetImage request. @@ -25,232 +13,6 @@ class MSS(MSSBase): * XFixes: Including the cursor. """ - def __init__(self, /, **kwargs: Any) -> None: # noqa: PLR0912 - super().__init__(**kwargs) - - display = kwargs.get("display", b"") - if not display: - display = None - - self.conn: xcb.Connection | None - self.conn, pref_screen_num = xcb.connect(display) - - # Let XCB pre-populate its internal cache regarding the extensions we might use, while we finish setup. - LIB.xcb.xcb_prefetch_extension_data(self.conn, LIB.randr_id) - LIB.xcb.xcb_prefetch_extension_data(self.conn, LIB.xfixes_id) - - # Get the connection setup information that was included when we connected. - xcb_setup = LIB.xcb.xcb_get_setup(self.conn).contents - screens = xcb.setup_roots(xcb_setup) - pref_screen = screens[pref_screen_num] - self.root = self.drawable = pref_screen.root - self.drawable = self.root - - # We don't probe the XFixes presence or version until we need it. - self._xfixes_ready: bool | None = None - - # Probe the visuals (and related information), and make sure that our drawable is in an acceptable format. - # These iterations and tests don't involve any traffic with the server; it's all stuff that was included in the - # connection setup. Effectively all modern setups will be acceptable, but we verify to be sure. - - # Currently, we assume that the drawable we're capturing is the root; when we add single-window capture, we'll - # have to ask the server for its depth and visual. - assert self.root == self.drawable # noqa: S101 - self.drawable_depth = pref_screen.root_depth - self.drawable_visual_id = pref_screen.root_visual.value - # Server image byte order - if xcb_setup.image_byte_order != xcb.ImageOrder.LSBFirst: - msg = "Only X11 servers using LSB-First images are supported." - raise ScreenShotError(msg) - # Depth - if self.drawable_depth not in SUPPORTED_DEPTHS: - msg = f"Only screens of color depth 24 or 32 are supported, not {self.drawable_depth}" - raise ScreenShotError(msg) - # Format (i.e., bpp, padding) - for format_ in xcb.setup_pixmap_formats(xcb_setup): - if format_.depth == self.drawable_depth: - break - else: - msg = f"Internal error: drawable's depth {self.drawable_depth} not found in screen's supported formats" - raise ScreenShotError(msg) - drawable_format = format_ - if drawable_format.bits_per_pixel != SUPPORTED_BITS_PER_PIXEL: - msg = ( - f"Only screens at 32 bpp (regardless of color depth) are supported; " - f"got {drawable_format.bits_per_pixel} bpp" - ) - raise ScreenShotError(msg) - if drawable_format.scanline_pad != SUPPORTED_BITS_PER_PIXEL: - # To clarify the padding: the scanline_pad is the multiple that the scanline gets padded to. If there is - # no padding, then it will be the same as one pixel's size. - msg = "Screens with scanline padding are not supported" - raise ScreenShotError(msg) - # Visual, the interpretation of pixels (like indexed, grayscale, etc). (Visuals are arranged by depth, so we - # iterate over the depths first.) - for xcb_depth in xcb.screen_allowed_depths(pref_screen): - if xcb_depth.depth == self.drawable_depth: - break - else: - msg = "Internal error: drawable's depth not found in screen's supported depths" - raise ScreenShotError(msg) - for visual_info in xcb.depth_visuals(xcb_depth): - if visual_info.visual_id.value == self.drawable_visual_id: - break - else: - msg = "Internal error: drawable's visual not found in screen's supported visuals" - raise ScreenShotError(msg) - if visual_info.class_ not in {xcb.VisualClass.TrueColor, xcb.VisualClass.DirectColor}: - msg = "Only TrueColor and DirectColor visuals are supported" - raise ScreenShotError(msg) - if ( - visual_info.red_mask != SUPPORTED_RED_MASK - or visual_info.green_mask != SUPPORTED_GREEN_MASK - or visual_info.blue_mask != SUPPORTED_BLUE_MASK - ): - # There are two ways to phrase this layout: BGRx accounts for the byte order, while xRGB implies the native - # word order. Since we return the data as a byte array, we use the former. By the time we get to this - # point, we've already checked the endianness and depth, so this is pretty much never going to happen - # anyway. - msg = "Only visuals with BGRx ordering are supported" - raise ScreenShotError(msg) - - def close(self) -> None: - if self.conn is not None: - xcb.disconnect(self.conn) - self.conn = None - - def _monitors_impl(self) -> None: - """Get positions of monitors. It will populate self._monitors.""" - - if self.conn is None: - msg = "Cannot identify monitors while the connection is closed" - raise ScreenShotError(msg) - - # The first entry is the whole X11 screen that the root is on. That's the one that covers all the monitors. - root_geom = xcb.get_geometry(self.conn, self.root) - self._monitors.append( - { - "left": root_geom.x, - "top": root_geom.y, - "width": root_geom.width, - "height": root_geom.height, - } - ) - - # After that, we have one for each monitor on that X11 screen. For decades, that's been handled by Xrandr. - # We don't presently try to work with Xinerama. So, we're going to check the different outputs, according to - # Xrandr. If that fails, we'll just leave the one root covering everything. - - # Make sure we have the Xrandr extension we need. This will query the cache that we started populating in - # __init__. - randr_ext_data = LIB.xcb.xcb_get_extension_data(self.conn, LIB.randr_id).contents - if not randr_ext_data.present: - return - - # We ask the server to give us anything up to the version we support (i.e., what we expect the reply structs - # to look like). If the server only supports 1.2, then that's what it'll give us, and we're ok with that, but - # we also use a faster path if the server implements at least 1.3. - randr_version_data = xcb.randr_query_version(self.conn, xcb.RANDR_MAJOR_VERSION, xcb.RANDR_MINOR_VERSION) - randr_version = (randr_version_data.major_version, randr_version_data.minor_version) - if randr_version < (1, 2): - return - - screen_resources: xcb.RandrGetScreenResourcesReply | xcb.RandrGetScreenResourcesCurrentReply - # Check to see if we have the xcb_randr_get_screen_resources_current function in libxcb-randr, and that the - # server supports it. - if hasattr(LIB.randr, "xcb_randr_get_screen_resources_current") and randr_version >= (1, 3): - screen_resources = xcb.randr_get_screen_resources_current(self.conn, self.drawable.value) - crtcs = xcb.randr_get_screen_resources_current_crtcs(screen_resources) - else: - # Either the client or the server doesn't support the _current form. That's ok; we'll use the old - # function, which forces a new query to the physical monitors. - screen_resources = xcb.randr_get_screen_resources(self.conn, self.drawable) - crtcs = xcb.randr_get_screen_resources_crtcs(screen_resources) - - for crtc in crtcs: - crtc_info = xcb.randr_get_crtc_info(self.conn, crtc, screen_resources.config_timestamp) - if crtc_info.num_outputs == 0: - continue - self._monitors.append( - {"left": crtc_info.x, "top": crtc_info.y, "width": crtc_info.width, "height": crtc_info.height} - ) - - # Extra credit would be to enumerate the virtual desktops; see - # https://specifications.freedesktop.org/wm/latest/ar01s03.html. But I don't know how widely-used that style - # is. - - def _grab_impl(self, monitor: Monitor, /) -> ScreenShot: + def _grab_impl(self, monitor: Monitor) -> ScreenShot: """Retrieve all pixels from a monitor. Pixels have to be RGBX.""" - - if self.conn is None: - msg = "Cannot take screenshot while the connection is closed" - raise ScreenShotError(msg) - - img_reply = xcb.get_image( - self.conn, - xcb.ImageFormat.ZPixmap, - self.drawable, - monitor["left"], - monitor["top"], - monitor["width"], - monitor["height"], - ALL_PLANES, - ) - - # Now, save the image. This is a reference into the img_reply structure. - img_data_arr = xcb.get_image_data(img_reply) - # Copy this into a new bytearray, so that it will persist after we clear the image structure. - img_data = bytearray(img_data_arr) - - if img_reply.depth != self.drawable_depth or img_reply.visual.value != self.drawable_visual_id: - # This should never happen; a window can't change its visual. - msg = ( - "Server returned an image with a depth or visual different than it initially reported: " - f"expected {self.drawable_depth},{hex(self.drawable_visual_id)}, " - f"got {img_reply.depth},{hex(img_reply.visual.value)}" - ) - raise ScreenShotError(msg) - - return self.cls_image(img_data, monitor) - - def _cursor_impl_check_xfixes(self) -> bool: - if self.conn is None: - msg = "Cannot take screenshot while the connection is closed" - raise ScreenShotError(msg) - - xfixes_ext_data = LIB.xcb.xcb_get_extension_data(self.conn, LIB.xfixes_id).contents - if not xfixes_ext_data.present: - return False - - reply = xcb.xfixes_query_version(self.conn, xcb.XFIXES_MAJOR_VERSION, xcb.XFIXES_MINOR_VERSION) - # We can work with 2.0 and later, but not sure about the actual minimum version we can use. That's ok; - # everything these days is much more modern. - return (reply.major_version, reply.minor_version) >= (2, 0) - - def _cursor_impl(self) -> ScreenShot: - """Retrieve all cursor data. Pixels have to be RGBx.""" - - if self.conn is None: - msg = "Cannot take screenshot while the connection is closed" - raise ScreenShotError(msg) - - if self._xfixes_ready is None: - self._xfixes_ready = self._cursor_impl_check_xfixes() - if not self._xfixes_ready: - msg = "Server does not have XFixes, or the version is too old." - raise ScreenShotError(msg) - - cursor_img = xcb.xfixes_get_cursor_image(self.conn) - region = { - "left": cursor_img.x - cursor_img.xhot, - "top": cursor_img.y - cursor_img.yhot, - "width": cursor_img.width, - "height": cursor_img.height, - } - - data_arr = xcb.xfixes_get_cursor_image_cursor_image(cursor_img) - data = bytearray(data_arr) - # We don't need to do the same array slice-and-dice work as the Xlib-based implementation: Xlib has an - # unfortunate historical accident that makes it have to return the cursor image in a different format. - - return self.cls_image(data, region) + return super()._grab_impl_xgetimage(monitor) diff --git a/src/mss/linux/xshmgetimage.py b/src/mss/linux/xshmgetimage.py new file mode 100644 index 00000000..981f2116 --- /dev/null +++ b/src/mss/linux/xshmgetimage.py @@ -0,0 +1,214 @@ +from __future__ import annotations + +import enum +import os +from mmap import PROT_READ, mmap # type: ignore[attr-defined] +from typing import TYPE_CHECKING, Any + +from mss.exception import ScreenShotError +from mss.linux import xcb +from mss.linux.xcbhelpers import LIB, XProtoError + +from .base import ALL_PLANES, MSSXCBBase + +if TYPE_CHECKING: + from mss.models import Monitor + from mss.screenshot import ScreenShot + + +class ShmStatus(enum.Enum): + UNKNOWN = enum.auto() # Constructor says SHM *should* work, but we haven't seen a real GetImage succeed yet. + AVAILABLE = enum.auto() # We've successfully used XShmGetImage at least once. + UNAVAILABLE = enum.auto() # We know SHM GetImage is unusable; always use XGetImage. + + +class MSS(MSSXCBBase): + """Multiple ScreenShots implementation for GNU/Linux. + + This implementation is based on XCB, using the ShmGetImage request. + If ShmGetImage fails, then this will fall back to using GetImage. + In that event, the reason for the fallback will be recorded in the + shm_fallback_reason attribute as a string, for debugging purposes. + """ + + def __init__(self, /, **kwargs: Any) -> None: + super().__init__(**kwargs) + + # These are the objects we need to clean up when we shut down. They are created in _setup_shm. + self._memfd: int | None = None + self._buf: mmap | None = None + self._shmseg: xcb.ShmSeg | None = None + + # Rather than trying to track the shm_status, we may be able to raise an exception in __init__ if XShmGetImage + # isn't available. The factory in linux/__init__.py could then catch that and switch to XGetImage. + # The conditions under which the attach will succeed but the xcb_shm_get_image will fail are extremely + # rare, and I haven't yet found any that also will work with xcb_get_image. + self.shm_status: ShmStatus = self._setup_shm() + self.shm_fallback_reason: str | None = None + + def _shm_report_issue(self, msg: str, *args: Any) -> None: + """Debugging hook for troubleshooting MIT-SHM issues. + + This will be called whenever MIT-SHM is disabled. The optional + arguments are not well-defined; exceptions are common. + """ + full_msg = msg + if args: + full_msg += " | " + ", ".join(str(arg) for arg in args) + self.shm_fallback_reason = full_msg + + def _setup_shm(self) -> ShmStatus: # noqa: PLR0911 + assert self.conn is not None # noqa: S101 + + try: + shm_ext_data = xcb.get_extension_data(self.conn, LIB.shm_id) + if not shm_ext_data.present: + self._shm_report_issue("MIT-SHM extension not present") + return ShmStatus.UNAVAILABLE + + # We use the FD-based version of ShmGetImage, so we require the extension to be at least 1.2. + shm_version_data = xcb.shm_query_version(self.conn) + shm_version = (shm_version_data.major_version, shm_version_data.minor_version) + if shm_version < (1, 2): + self._shm_report_issue("MIT-SHM version too old", shm_version) + return ShmStatus.UNAVAILABLE + + # We allocate something large enough for the root, so we don't have to reallocate each time the window is + # resized. + self._bufsize = self.pref_screen.width_in_pixels * self.pref_screen.height_in_pixels * 4 + + if not hasattr(os, "memfd_create"): + self._shm_report_issue("os.memfd_create not available") + return ShmStatus.UNAVAILABLE + try: + self._memfd = os.memfd_create("mss-shm-buf", flags=os.MFD_CLOEXEC) # type: ignore[attr-defined] + except OSError as e: + self._shm_report_issue("memfd_create failed", e) + self._shutdown_shm() + return ShmStatus.UNAVAILABLE + os.ftruncate(self._memfd, self._bufsize) + + try: + self._buf = mmap(self._memfd, self._bufsize, prot=PROT_READ) # type: ignore[call-arg] + except OSError as e: + self._shm_report_issue("mmap failed", e) + self._shutdown_shm() + return ShmStatus.UNAVAILABLE + + self._shmseg = xcb.ShmSeg(xcb.generate_id(self.conn).value) + try: + # This will normally be what raises an exception if you're on a remote connection. + # XCB will close _memfd, on success or on failure. + try: + xcb.shm_attach_fd(self.conn, self._shmseg, self._memfd, read_only=False) + finally: + self._memfd = None + except xcb.XError as e: + self._shm_report_issue("Cannot attach MIT-SHM segment", e) + self._shutdown_shm() + return ShmStatus.UNAVAILABLE + + except Exception: + self._shutdown_shm() + raise + + return ShmStatus.UNKNOWN + + def close(self) -> None: + self._shutdown_shm() + super().close() + + def _shutdown_shm(self) -> None: + # It would be nice to also try to tell the server to detach the shmseg, but we might be in an error path + # and don't know if that's possible. It's not like we'll leak a lot of them on the same connection anyway. + # This can be called in the path of partial initialization. + if self._buf is not None: + self._buf.close() + self._buf = None + if self._memfd is not None: + os.close(self._memfd) + self._memfd = None + + def _grab_impl_xshmgetimage(self, monitor: Monitor) -> ScreenShot: + if self.conn is None: + msg = "Cannot take screenshot while the connection is closed" + raise ScreenShotError(msg) + assert self._buf is not None # noqa: S101 + assert self._shmseg is not None # noqa: S101 + + required_size = monitor["width"] * monitor["height"] * 4 + if required_size > self._bufsize: + # This is temporary. The permanent fix will depend on how + # issue https://github.com/BoboTiG/python-mss/issues/432 is resolved. + msg = ( + "Requested capture size exceeds the allocated buffer. If you have resized the screen, " + "please recreate your MSS object." + ) + raise ScreenShotError(msg) + + img_reply = xcb.shm_get_image( + self.conn, + self.drawable.value, + monitor["left"], + monitor["top"], + monitor["width"], + monitor["height"], + ALL_PLANES, + xcb.ImageFormat.ZPixmap, + self._shmseg, + 0, + ) + + if img_reply.depth != self.drawable_depth or img_reply.visual.value != self.drawable_visual_id: + # This should never happen; a window can't change its visual. + msg = ( + "Server returned an image with a depth or visual different than it initially reported: " + f"expected {self.drawable_depth},{hex(self.drawable_visual_id)}, " + f"got {img_reply.depth},{hex(img_reply.visual.value)}" + ) + raise ScreenShotError(msg) + + # Snapshot the buffer into new bytearray. + new_size = monitor["width"] * monitor["height"] * 4 + # Slicing the memoryview creates a new memoryview that points to the relevant subregion. Making this and + # then copying it into a fresh bytearray is much faster than slicing the mmap object. + img_mv = memoryview(self._buf)[:new_size] + img_data = bytearray(img_mv) + + return self.cls_image(img_data, monitor) + + def _grab_impl(self, monitor: Monitor) -> ScreenShot: + """Retrieve all pixels from a monitor. Pixels have to be RGBX.""" + if self.shm_status == ShmStatus.UNAVAILABLE: + return super()._grab_impl_xgetimage(monitor) + + # The usual path is just the next few lines. + try: + rv = self._grab_impl_xshmgetimage(monitor) + self.shm_status = ShmStatus.AVAILABLE + except XProtoError as e: + if self.shm_status != ShmStatus.UNKNOWN: + # We know XShmGetImage works, because it worked earlier. Reraise the error. + raise + + # Should we engage the fallback path? In almost all cases, if XShmGetImage failed at this stage (after + # all our testing in __init__), XGetImage will also fail. This could mean that the user sent an + # out-of-bounds request. In more exotic situations, some rare X servers disallow screen capture + # altogether: security-hardened servers, for instance, or some XPrint servers. But let's make sure, by + # testing the same request through XGetImage. + try: + rv = super()._grab_impl_xgetimage(monitor) + except XProtoError: # noqa: TRY203 + # The XGetImage also failed, so we don't know anything about whether XShmGetImage is usable. Maybe + # the user sent an out-of-bounds request. Maybe it's a security-hardened server. We're not sure what + # the problem is. So, if XGetImage failed, we re-raise that error (the one from XShmGetImage will be + # attached as __context__), but we won't update the shm_status yet. (Technically, our except:raise + # clause here is redundant; it's just for clarity, to hold this comment.) + raise + + # Using XShmGetImage failed, and using XGetImage worked. Use XGetImage in the future. + self._shm_report_issue("MIT-SHM GetImage failed", e) + self.shm_status = ShmStatus.UNAVAILABLE + self._shutdown_shm() + + return rv diff --git a/src/mss/windows.py b/src/mss/windows.py index d5e2bb78..b84daa3e 100644 --- a/src/mss/windows.py +++ b/src/mss/windows.py @@ -35,6 +35,8 @@ __all__ = ("MSS",) +BACKENDS = ["default"] + CAPTUREBLT = 0x40000000 DIB_RGB_COLORS = 0 diff --git a/src/tests/conftest.py b/src/tests/conftest.py index 584b0b75..08b98548 100644 --- a/src/tests/conftest.py +++ b/src/tests/conftest.py @@ -70,7 +70,7 @@ def raw() -> bytes: return data -@pytest.fixture(params=["xlib", "xgetimage"] if system() == "Linux" else ["default"]) +@pytest.fixture(params=["xlib", "xgetimage", "xshmgetimage"] if system() == "Linux" else ["default"]) def backend(request: pytest.FixtureRequest) -> str: return request.param @@ -80,3 +80,29 @@ def mss_impl(backend: str) -> Callable[..., MSSBase]: # We can't just use partial here, since it will read $DISPLAY at the wrong time. This can cause problems, # depending on just how the fixtures get run. return lambda *args, **kwargs: mss(*args, display=os.getenv("DISPLAY"), backend=backend, **kwargs) + + +@pytest.fixture(autouse=True, scope="session") +def inhibit_x11_resets() -> Generator[None, None, None]: + """Ensure that an X11 connection is open during the test session. + + Under X11, when the last client disconnects, the server resets. If + a new client tries to connect before the reset is complete, it may fail. + Since we often run the tests under Xvfb, they're frequently the only + clients. Since our tests run in rapid succession, this combination + can lead to intermittent failures. + + To avoid this, we open a connection at the start of the test session + and keep it open until the end. + """ + if system() != "Linux": + yield + return + + conn, _ = xcb.connect() + try: + yield + finally: + # Some tests may have reset xcb.LIB, so make sure it's currently initialized. + xcb.initialize() + xcb.disconnect(conn) diff --git a/src/tests/test_gnu_linux.py b/src/tests/test_gnu_linux.py index 82cef143..6bb118ac 100644 --- a/src/tests/test_gnu_linux.py +++ b/src/tests/test_gnu_linux.py @@ -191,7 +191,7 @@ def test_region_out_of_monitor_bounds(display: str, backend: str) -> None: details = exc.value.details assert details assert isinstance(details, dict) - if backend == "xgetimage" and mss.linux.xcb.LIB.errors is None: + if backend in {"xgetimage", "xshmgetimage"} and mss.linux.xcb.LIB.errors is None: pytest.xfail("Error strings in XCB backends are only available with the xcb-util-errors library.") assert isinstance(details["error"], str) @@ -310,3 +310,41 @@ def test_with_cursor_failure(display: str) -> None: pytest.raises(ScreenShotError), ): sct.grab(sct.monitors[1]) + + +def test_shm_available() -> None: + """Verify that the xshmgetimage backend doesn't always fallback. + + Since this backend does an automatic fallback for certain types of + anticipated issues, that could cause some failures to be masked. + Ensure this isn't happening. + """ + with ( + pyvirtualdisplay.Display(size=(WIDTH, HEIGHT), color_depth=DEPTH) as vdisplay, + mss.mss(display=vdisplay.new_display_var, backend="xshmgetimage") as sct, + ): + assert isinstance(sct, mss.linux.xshmgetimage.MSS) # For Mypy + # The status currently isn't established as final until a grab succeeds. + sct.grab(sct.monitors[0]) + assert sct.shm_status == mss.linux.xshmgetimage.ShmStatus.AVAILABLE + + +def test_shm_fallback() -> None: + """Verify that the xshmgetimage backend falls back if MIT-SHM fails. + + The most common case when a fallback is needed is with a TCP + connection, such as the one used with ssh relaying. By using + DISPLAY=localhost:99 instead of DISPLAY=:99, we connect over TCP + instead of a local-domain socket. This is sufficient to prevent + MIT-SHM from completing its setup: the extension is available, but + won't be able to attach a shared memory segment. + """ + with ( + pyvirtualdisplay.Display(size=(WIDTH, HEIGHT), color_depth=DEPTH, extra_args=["-listen", "tcp"]) as vdisplay, + mss.mss(display=f"localhost{vdisplay.new_display_var}", backend="xshmgetimage") as sct, + ): + assert isinstance(sct, mss.linux.xshmgetimage.MSS) # For Mypy + # Ensure that the grab call completes without exception. + sct.grab(sct.monitors[0]) + # Ensure that it really did have to fall back; otherwise, we'd need to change how we test this case. + assert sct.shm_status == mss.linux.xshmgetimage.ShmStatus.UNAVAILABLE diff --git a/src/tests/test_implementation.py b/src/tests/test_implementation.py index d02c2b95..9ee55adb 100644 --- a/src/tests/test_implementation.py +++ b/src/tests/test_implementation.py @@ -95,72 +95,104 @@ def test_factory_unknown_system(backend: str, monkeypatch: pytest.MonkeyPatch) - assert error == "System 'chuck norris' not (yet?) implemented." -@patch.object(sys, "argv", new=[]) # Prevent side effects while testing +@pytest.fixture +def reset_sys_argv(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(sys, "argv", []) + + +@pytest.mark.usefixtures("reset_sys_argv") @pytest.mark.parametrize("with_cursor", [False, True]) -def test_entry_point(with_cursor: bool, capsys: pytest.CaptureFixture) -> None: - def main(*args: str, ret: int = 0) -> None: +class TestEntryPoint: + """CLI entry-point scenarios split into focused tests.""" + + @staticmethod + def _run_main(with_cursor: bool, *args: str, ret: int = 0) -> None: if with_cursor: args = (*args, "--with-cursor") assert entry_point(*args) == ret - # No arguments - main() - captured = capsys.readouterr() - for mon, line in enumerate(captured.out.splitlines(), 1): - filename = Path(f"monitor-{mon}.png") - assert line.endswith(filename.name) - assert filename.is_file() - filename.unlink() - - file = Path("monitor-1.png") - for opt in ("-m", "--monitor"): - main(opt, "1") - captured = capsys.readouterr() - assert captured.out.endswith(f"{file.name}\n") - assert filename.is_file() - filename.unlink() - - for opts in zip(["-m 1", "--monitor=1"], ["-q", "--quiet"]): - main(*opts) - captured = capsys.readouterr() - assert not captured.out - assert filename.is_file() - filename.unlink() - - fmt = "sct-{mon}-{width}x{height}.png" - for opt in ("-o", "--out"): - main(opt, fmt) - captured = capsys.readouterr() - with mss.mss(display=os.getenv("DISPLAY")) as sct: - for mon, (monitor, line) in enumerate(zip(sct.monitors[1:], captured.out.splitlines()), 1): - filename = Path(fmt.format(mon=mon, **monitor)) - assert line.endswith(filename.name) - assert filename.is_file() - filename.unlink() - - fmt = "sct_{mon}-{date:%Y-%m-%d}.png" - for opt in ("-o", "--out"): - main("-m 1", opt, fmt) - filename = Path(fmt.format(mon=1, date=datetime.now(tz=UTC))) - captured = capsys.readouterr() - assert captured.out.endswith(f"{filename}\n") - assert filename.is_file() - filename.unlink() - - coordinates = "2,12,40,67" - filename = Path("sct-2x12_40x67.png") - for opt in ("-c", "--coordinates"): - main(opt, coordinates) - captured = capsys.readouterr() - assert captured.out.endswith(f"{filename}\n") - assert filename.is_file() - filename.unlink() - - coordinates = "2,12,40" - for opt in ("-c", "--coordinates"): - main(opt, coordinates, ret=2) + def test_no_arguments(self, with_cursor: bool, capsys: pytest.CaptureFixture) -> None: + self._run_main(with_cursor) captured = capsys.readouterr() - assert captured.out == "Coordinates syntax: top, left, width, height\n" + for mon, line in enumerate(captured.out.splitlines(), 1): + filename = Path(f"monitor-{mon}.png") + assert line.endswith(filename.name) + assert filename.is_file() + filename.unlink() + + def test_monitor_option_and_quiet(self, with_cursor: bool, capsys: pytest.CaptureFixture) -> None: + file = Path("monitor-1.png") + filename: Path | None = None + for opt in ("-m", "--monitor"): + self._run_main(with_cursor, opt, "1") + captured = capsys.readouterr() + assert captured.out.endswith(f"{file.name}\n") + filename = Path(captured.out.rstrip()) + assert filename.is_file() + filename.unlink() + + assert filename is not None + for opts in zip(["-m 1", "--monitor=1"], ["-q", "--quiet"]): + self._run_main(with_cursor, *opts) + captured = capsys.readouterr() + assert not captured.out + assert filename.is_file() + filename.unlink() + + def test_custom_output_pattern(self, with_cursor: bool, capsys: pytest.CaptureFixture) -> None: + fmt = "sct-{mon}-{width}x{height}.png" + for opt in ("-o", "--out"): + self._run_main(with_cursor, opt, fmt) + captured = capsys.readouterr() + with mss.mss(display=os.getenv("DISPLAY")) as sct: + for mon, (monitor, line) in enumerate(zip(sct.monitors[1:], captured.out.splitlines()), 1): + filename = Path(fmt.format(mon=mon, **monitor)) + assert line.endswith(filename.name) + assert filename.is_file() + filename.unlink() + + def test_output_pattern_with_date(self, with_cursor: bool, capsys: pytest.CaptureFixture) -> None: + fmt = "sct_{mon}-{date:%Y-%m-%d}.png" + for opt in ("-o", "--out"): + self._run_main(with_cursor, "-m 1", opt, fmt) + filename = Path(fmt.format(mon=1, date=datetime.now(tz=UTC))) + captured = capsys.readouterr() + assert captured.out.endswith(f"{filename}\n") + assert filename.is_file() + filename.unlink() + + def test_coordinates_capture(self, with_cursor: bool, capsys: pytest.CaptureFixture) -> None: + coordinates = "2,12,40,67" + filename = Path("sct-2x12_40x67.png") + for opt in ("-c", "--coordinates"): + self._run_main(with_cursor, opt, coordinates) + captured = capsys.readouterr() + assert captured.out.endswith(f"{filename}\n") + assert filename.is_file() + filename.unlink() + + def test_invalid_coordinates(self, with_cursor: bool, capsys: pytest.CaptureFixture) -> None: + coordinates = "2,12,40" + for opt in ("-c", "--coordinates"): + self._run_main(with_cursor, opt, coordinates, ret=2) + captured = capsys.readouterr() + assert captured.out == "Coordinates syntax: top, left, width, height\n" + + def test_backend_option(self, with_cursor: bool, capsys: pytest.CaptureFixture) -> None: + backend = "default" + for opt in ("-b", "--backend"): + self._run_main(with_cursor, opt, backend, "-m1") + captured = capsys.readouterr() + filename = Path(captured.out.rstrip()) + assert filename.is_file() + filename.unlink() + + def test_invalid_backend_option(self, with_cursor: bool, capsys: pytest.CaptureFixture) -> None: + backend = "chuck_norris" + for opt in ("-b", "--backend"): + self._run_main(with_cursor, opt, backend, "-m1", ret=2) + captured = capsys.readouterr() + assert "argument -b/--backend: invalid choice: 'chuck_norris' (choose from" in captured.err @patch.object(sys, "argv", new=[]) # Prevent side effects while testing diff --git a/src/tests/test_setup.py b/src/tests/test_setup.py index c6eea223..295dba23 100644 --- a/src/tests/test_setup.py +++ b/src/tests/test_setup.py @@ -51,6 +51,7 @@ def test_sdist() -> None: f"mss-{__version__}/docs/source/examples/fps_multiprocessing.py", f"mss-{__version__}/docs/source/examples/from_pil_tuple.py", f"mss-{__version__}/docs/source/examples/linux_display_keyword.py", + f"mss-{__version__}/docs/source/examples/linux_xshm_backend.py", f"mss-{__version__}/docs/source/examples/opencv_numpy.py", f"mss-{__version__}/docs/source/examples/part_of_screen.py", f"mss-{__version__}/docs/source/examples/part_of_screen_monitor_2.py", @@ -69,11 +70,13 @@ def test_sdist() -> None: f"mss-{__version__}/src/mss/exception.py", f"mss-{__version__}/src/mss/factory.py", f"mss-{__version__}/src/mss/linux/__init__.py", + f"mss-{__version__}/src/mss/linux/base.py", f"mss-{__version__}/src/mss/linux/xcb.py", f"mss-{__version__}/src/mss/linux/xcbgen.py", f"mss-{__version__}/src/mss/linux/xcbhelpers.py", f"mss-{__version__}/src/mss/linux/xgetimage.py", f"mss-{__version__}/src/mss/linux/xlib.py", + f"mss-{__version__}/src/mss/linux/xshmgetimage.py", f"mss-{__version__}/src/mss/models.py", f"mss-{__version__}/src/mss/py.typed", f"mss-{__version__}/src/mss/screenshot.py", @@ -105,6 +108,7 @@ def test_sdist() -> None: f"mss-{__version__}/src/xcbproto/gen_xcb_to_py.py", f"mss-{__version__}/src/xcbproto/randr.xml", f"mss-{__version__}/src/xcbproto/render.xml", + f"mss-{__version__}/src/xcbproto/shm.xml", f"mss-{__version__}/src/xcbproto/xfixes.xml", f"mss-{__version__}/src/xcbproto/xproto.xml", ] @@ -134,11 +138,13 @@ def test_wheel() -> None: "mss/exception.py", "mss/factory.py", "mss/linux/__init__.py", + "mss/linux/base.py", "mss/linux/xcb.py", "mss/linux/xcbgen.py", "mss/linux/xcbhelpers.py", "mss/linux/xgetimage.py", "mss/linux/xlib.py", + "mss/linux/xshmgetimage.py", "mss/models.py", "mss/py.typed", "mss/screenshot.py", diff --git a/src/tests/test_xcb.py b/src/tests/test_xcb.py index bff4bc00..24903ace 100644 --- a/src/tests/test_xcb.py +++ b/src/tests/test_xcb.py @@ -19,7 +19,7 @@ import pytest from mss.exception import ScreenShotError -from mss.linux import xcb, xgetimage +from mss.linux import base, xcb, xgetimage from mss.linux.xcbhelpers import ( XcbExtension, array_from_xcb, @@ -172,7 +172,7 @@ def __init__(self, monkeypatch: pytest.MonkeyPatch) -> None: randr_id=XcbExtension(), xfixes_id=XcbExtension(), ) - self._monkeypatch.setattr(xgetimage, "LIB", fake_lib) + self._monkeypatch.setattr(xcb, "LIB", fake_lib) self._monkeypatch.setattr(xcb, "connect", lambda _display=None: (self.connection, 0)) self._monkeypatch.setattr(xcb, "disconnect", lambda _conn: None) self._monkeypatch.setattr(xcb, "setup_roots", self._setup_roots) @@ -190,16 +190,16 @@ def reset(self) -> None: self.screen.root_visual = xcb.Visualid(visual_id) self.format.depth = self.screen.root_depth - self.format.bits_per_pixel = xgetimage.SUPPORTED_BITS_PER_PIXEL - self.format.scanline_pad = xgetimage.SUPPORTED_BITS_PER_PIXEL + self.format.bits_per_pixel = base.SUPPORTED_BITS_PER_PIXEL + self.format.scanline_pad = base.SUPPORTED_BITS_PER_PIXEL self.depth.depth = self.screen.root_depth self.visual.visual_id = xcb.Visualid(visual_id) self.visual.class_ = xcb.VisualClass.TrueColor - self.visual.red_mask = xgetimage.SUPPORTED_RED_MASK - self.visual.green_mask = xgetimage.SUPPORTED_GREEN_MASK - self.visual.blue_mask = xgetimage.SUPPORTED_BLUE_MASK + self.visual.red_mask = base.SUPPORTED_RED_MASK + self.visual.green_mask = base.SUPPORTED_GREEN_MASK + self.visual.blue_mask = base.SUPPORTED_BLUE_MASK self.screens = [self.screen] self.pixmap_formats = [self.format] diff --git a/src/xcbproto/README.md b/src/xcbproto/README.md index 79891dd2..351e0b97 100644 --- a/src/xcbproto/README.md +++ b/src/xcbproto/README.md @@ -20,7 +20,7 @@ The generator is a **maintainer tool**, not part of the normal build process: 3. The generator reads the XML protocol definitions and emits `xcbgen.py`. 4. The maintainer ensures that this worked correctly, and moves the file to `src/mss/linux/xcbgen.py`. -4. The generated `xcbgen.py` is committed to version control and distributed with the package, so end users never need to run the generator. +5. The generated `xcbgen.py` is committed to version control and distributed with the package, so end users never need to run the generator. ## Protocol XML Files diff --git a/src/xcbproto/gen_xcb_to_py.py b/src/xcbproto/gen_xcb_to_py.py index 6109169b..1e41acc8 100755 --- a/src/xcbproto/gen_xcb_to_py.py +++ b/src/xcbproto/gen_xcb_to_py.py @@ -71,6 +71,13 @@ "QueryVersion", "QueryPictFormats", ], + "shm": [ + "QueryVersion", + "GetImage", + "AttachFd", + "CreateSegment", + "Detach", + ], "xfixes": [ "QueryVersion", "GetCursorImage", @@ -95,7 +102,7 @@ "void": "None", } -INT_CTYPES = {"c_int8", "c_int16", "c_int32", "c_int64", "c_uint8", "c_uint16", "c_uint32", "c_uint64"} +INT_CTYPES = {"c_int", "c_int8", "c_int16", "c_int32", "c_int64", "c_uint8", "c_uint16", "c_uint32", "c_uint64"} EIGHT_BIT_TYPES = { "c_int8", @@ -308,7 +315,12 @@ class ListField: enum: str | None = None -StructMember = Field | Pad | ListField +@dataclass +class FdField: + name: str + + +StructMember = Field | Pad | ListField | FdField class StructLikeDefn(LazyDefn): @@ -351,6 +363,12 @@ def _parse_child(self, child: ET._Element) -> None: mask=child.attrib.get("mask"), ) ) + case "fd": + self.members.append( + FdField( + name=child.attrib["name"], + ) + ) case "pad": self.members.append(parse_pad(child)) case "list": @@ -647,7 +665,7 @@ def require_member(protocol: str, member: StructMember) -> None: if member.enum: enum = registry.resolve_enum(protocol, member.enum) appendnew(rv.enums, enum) - elif isinstance(member, Pad): + elif isinstance(member, (FdField, Pad)): pass else: msg = f"Unrecognized struct member {member}" @@ -763,7 +781,15 @@ def camel_case(name: str, protocol: str | None = None) -> str: def snake_case(name: str, protocol: str | None = None) -> str: - prefix = "" if protocol in {"xproto", None} else f"{snake_case(protocol)}_" # type: ignore[arg-type] + prefix = ( + "" + if protocol + in { + "xproto", + None, + } + else f"{snake_case(protocol)}_" # type: ignore[arg-type] + ) if name.islower(): return prefix + name s1 = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", name) @@ -823,7 +849,7 @@ def format_type_name(typedefn: TypeDefn) -> str: return base_name -def format_field_name(field: Field) -> str: +def format_field_name(field: Field | FdField) -> str: name = field.name return f"{name}_" if name in RESERVED_NAMES else name @@ -995,23 +1021,29 @@ def emit_reply(writer: CodeWriter, registry: ProtocolRegistry, entry: ReplyDefn) # uint16_t sequence; # uint32_t length; # However, if the first field of the reply contents is a single byte, then it replaces pad0 in that structure. - members = entry.members[:] + nonfd_members = [m for m in entry.members if not isinstance(m, FdField)] + field_entries: list[tuple[str, str] | StructMember] = [("response_type", "c_uint8")] - if members and isinstance(members[0], Field) and is_eight_bit(registry, entry.protocol, members[0].type): - member = members.pop(0) + if ( + nonfd_members + and isinstance(nonfd_members[0], Field) + and is_eight_bit(registry, entry.protocol, nonfd_members[0].type) + ): + member = nonfd_members.pop(0) assert isinstance(member, Field) # noqa: S101 name = format_field_name(member) type_expr = python_type_for(registry, entry.protocol, member.type) field_entries.append((name, type_expr)) - elif members and (isinstance(members[0], Pad) and members[0].bytes == 1): + elif nonfd_members and (isinstance(nonfd_members[0], Pad) and nonfd_members[0].bytes == 1): # XFixes puts the padding byte explicitly at the start of the replies, but it just gets folded in the same way. - member = members.pop(0) + member = nonfd_members.pop(0) field_entries.append(member) else: field_entries.append(Pad(bytes=1)) field_entries.append(("sequence", "c_uint16")) field_entries.append(("length", "c_uint32")) - field_entries += members + field_entries += nonfd_members + return emit_structlike(writer, registry, entry, field_entries) @@ -1123,6 +1155,41 @@ def emit_lists(writer: CodeWriter, registry: ProtocolRegistry, types: list[TypeD return rv +# File descriptor accessor wrappers + + +def emit_fds(writer: CodeWriter, _registry: ProtocolRegistry, types: list[TypeDefn]) -> list[FuncDecl]: + rv: list[FuncDecl] = [] + for typ in types: + if not isinstance(typ, StructLikeDefn): + continue + fd_members = [m for m in typ.members if isinstance(m, FdField)] + if not fd_members: + continue + if len(fd_members) > 1: + # I simply don't know how this would be represented in libxcb. + msg = f"Struct {typ.protocol}:{typ.name} has multiple FdFields, which is unsupported" + raise GenerationError(msg) + # The way that the reply fd accessor is named is not that great: + # rather than having a function named after the field, it's named with just an "_fd" suffix. + func_name = f"{format_function_name(typ.name, typ.protocol)}_reply_fds" + writer.write() + writer.write( + f"def {func_name}(c: Connection | _Pointer[Connection], r: {format_type_name(typ)}) -> _Pointer[c_int]:" + ) + with writer.indent(): + writer.write(f"return LIB.{lib_for_proto(typ.protocol)}.xcb_{func_name}(c, r)") + rv.append( + FuncDecl( + typ.protocol, + f"xcb_{func_name}", + ["POINTER(Connection)", f"POINTER({format_type_name(typ)})"], + "POINTER(c_int)", + ) + ) + return rv + + # Request wrapper functions @@ -1139,20 +1206,26 @@ def emit_requests(writer: CodeWriter, registry: ProtocolRegistry, requests: list # a function when you call it. params: list[tuple[str, str]] = [("c", "Connection")] params += [ - (format_field_name(field), python_type_for(registry, request.protocol, field.type)) - for field in request.fields + ( + format_field_name(field), + python_type_for(registry, request.protocol, field.type) if isinstance(field, Field) else "c_int", + ) + for field in request.members + if not isinstance(field, (Pad, ListField)) ] params_types = [p[1] for p in params] # Arrange for the wrappers to take Python ints in place of any of the int-based ctypes. params_with_alts = [(p[0], f"{p[1]} | int" if p[1] in INT_CTYPES else p[1]) for p in params] params_string = ", ".join(f"{p[0]}: {p[1]}" for p in params_with_alts) args_string = ", ".join(p[0] for p in params) + xcb_params_types = ["POINTER(Connection)", *params_types[1:]] if request.reply is None: + xcb_func_name += "_checked" writer.write() writer.write(f"def {func_name}({params_string}) -> None:") with writer.indent(): writer.write(f"return LIB.{lib}.{xcb_func_name}({args_string}).check(c)") - rv.append(FuncDecl(request.protocol, xcb_func_name, params_types, "VoidCookie")) + rv.append(FuncDecl(request.protocol, xcb_func_name, xcb_params_types, "VoidCookie")) else: reply_type = request.reply reply_type_name = format_type_name(reply_type) @@ -1163,7 +1236,6 @@ def emit_requests(writer: CodeWriter, registry: ProtocolRegistry, requests: list # We have to use initialize_xcb_typed_func to initialize late, rather than making the cookie class here, # because the cookie definition needs to reference the XCB reply function. We could also do a lazy # initialization, but it's probably not worth it. - xcb_params_types = ["POINTER(Connection)", *params_types[1:]] rv.append( f'initialize_xcb_typed_func(LIB.{lib}, "{xcb_func_name}", ' f"[{', '.join(xcb_params_types)}], {reply_type_name})" @@ -1209,6 +1281,7 @@ def generate( emit_enums(writer, registry, plan.enums) func_decls += emit_types(writer, registry, plan.types) func_decls += emit_lists(writer, registry, plan.types) + func_decls += emit_fds(writer, registry, plan.types) func_decls += emit_requests(writer, registry, plan.requests) emit_initialize(writer, func_decls) diff --git a/src/xcbproto/shm.xml b/src/xcbproto/shm.xml new file mode 100644 index 00000000..c1114e0a --- /dev/null +++ b/src/xcbproto/shm.xml @@ -0,0 +1,350 @@ + + + + xproto + + + + + + + + + + + + + Report that an XCB_SHM_PUT_IMAGE request has completed + + + + + + + + + + + + + + + + + + + + + + The version of the MIT-SHM extension supported by the server + + + + + The UID of the server. + The GID of the server. + + + + Query the version of the MIT-SHM extension. + + + + + + + + + + + Attach a System V shared memory segment. + + + + + + + + + + + Destroys the specified shared memory segment. + + The segment to be destroyed. + + + + + + + + + + + + + + + + + + + + + + Copy data from the shared memory to the specified drawable. + + The drawable to draw to. + The graphics context to use. + The total width of the source image. + The total height of the source image. + The source X coordinate of the sub-image to copy. + The source Y coordinate of the sub-image to copy. + + + + + The depth to use. + + + The offset that the source image starts at. + + + + + + + + + + + + + + + + + + + + + Indicates the result of the copy. + + The depth of the source drawable. + The visual ID of the source drawable. + The number of bytes copied. + + + + Copies data from the specified drawable to the shared memory segment. + + The drawable to copy the image out of. + The X coordinate in the drawable to begin copying at. + The Y coordinate in the drawable to begin copying at. + The width of the image to copy. + The height of the image to copy. + A mask that determines which planes are used. + The format to use for the copy (???). + The destination shared memory segment. + The offset in the shared memory segment to copy data to. + + + + + + + + + + + + + + Create a pixmap backed by shared memory. + +Create a pixmap backed by shared memory. Writes to the shared memory will be +reflected in the contents of the pixmap, and writes to the pixmap will be +reflected in the contents of the shared memory. + + A pixmap ID created with xcb_generate_id(). + The drawable to create the pixmap in. + + + + + + + + + + + + + + + Create a shared memory segment + + + The file descriptor the server should mmap(). + + + + + + + + + + + + + + + The returned file descriptor. + + + + + + Asks the server to allocate a shared memory segment. + + + The size of the segment to create. + + + +