From 4a4ce349f2dfa95d4c7fab87085934c7ce20edac Mon Sep 17 00:00:00 2001 From: Joel Ray Holveck Date: Fri, 21 Nov 2025 16:40:26 -0800 Subject: [PATCH 1/9] Add XCB MIT-SHM support, and factor out the XCB setup This only adds the support for the XCB MIT-SHM extension to mss's internal xcb libraries. The actual usage of shared memory for screenshots will be done in a future commit. --- src/mss/linux/base.py | 228 ++++++++++++++++++++++ src/mss/linux/xcb.py | 59 +++++- src/mss/linux/xcbgen.py | 108 +++++++++++ src/mss/linux/xcbhelpers.py | 10 +- src/mss/linux/xgetimage.py | 210 +------------------- src/tests/test_setup.py | 3 + src/tests/test_xcb.py | 14 +- src/xcbproto/gen_xcb_to_py.py | 96 ++++++++-- src/xcbproto/shm.xml | 350 ++++++++++++++++++++++++++++++++++ 9 files changed, 846 insertions(+), 232 deletions(-) create mode 100644 src/mss/linux/base.py create mode 100644 src/xcbproto/shm.xml diff --git a/src/mss/linux/base.py b/src/mss/linux/base.py new file mode 100644 index 00000000..735d6baa --- /dev/null +++ b/src/mss/linux/base.py @@ -0,0 +1,228 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from mss.base import MSSBase +from mss.exception import ScreenShotError + +from . import xcb +from .xcb import LIB + +if TYPE_CHECKING: + from mss.screenshot import ScreenShot + +SUPPORTED_DEPTHS = {24, 32} +SUPPORTED_BITS_PER_PIXEL = 32 +SUPPORTED_RED_MASK = 0xFF0000 +SUPPORTED_GREEN_MASK = 0x00FF00 +SUPPORTED_BLUE_MASK = 0x0000FF +ALL_PLANES = 0xFFFFFFFF # XCB doesn't define AllPlanes + + +class MSSXCBBase(MSSBase): + """Base class for XCB-based screenshot implementations. + + This class provides common XCB initialization and monitor detection logic + that can be shared across different XCB screenshot methods (XGetImage, + XShmGetImage, XComposite, etc.). + """ + + def __init__(self, /, **kwargs: Any) -> None: # noqa: PLR0912 + """Initialize an XCB connection and validate the display configuration. + + Args: + **kwargs: Keyword arguments, including optional 'display' for X11 display string. + + Raises: + ScreenShotError: If the display configuration is not supported. + """ + super().__init__(**kwargs) + + display = kwargs.get("display", b"") + if not display: + display = None + + self.conn: xcb.Connection | None + self.conn, pref_screen_num = xcb.connect(display) + + # Get the connection setup information that was included when we connected. + xcb_setup = xcb.get_setup(self.conn) + screens = xcb.setup_roots(xcb_setup) + pref_screen = screens[pref_screen_num] + self.root = self.drawable = pref_screen.root + + # We don't probe the XFixes presence or version until we need it. + self._xfixes_ready: bool | None = None + + # Probe the visuals (and related information), and make sure that our drawable is in an acceptable format. + # These iterations and tests don't involve any traffic with the server; it's all stuff that was included in + # the connection setup. Effectively all modern setups will be acceptable, but we verify to be sure. + + # Currently, we assume that the drawable we're capturing is the root; when we add single-window capture, + # we'll have to ask the server for its depth and visual. + assert self.root == self.drawable # noqa: S101 + self.drawable_depth = pref_screen.root_depth + self.drawable_visual_id = pref_screen.root_visual.value + # Server image byte order + if xcb_setup.image_byte_order != xcb.ImageOrder.LSBFirst: + msg = "Only X11 servers using LSB-First images are supported." + raise ScreenShotError(msg) + # Depth + if self.drawable_depth not in SUPPORTED_DEPTHS: + msg = f"Only screens of color depth 24 or 32 are supported, not {self.drawable_depth}" + raise ScreenShotError(msg) + # Format (i.e., bpp, padding) + for format_ in xcb.setup_pixmap_formats(xcb_setup): + if format_.depth == self.drawable_depth: + break + else: + msg = f"Internal error: drawable's depth {self.drawable_depth} not found in screen's supported formats" + raise ScreenShotError(msg) + drawable_format = format_ + if drawable_format.bits_per_pixel != SUPPORTED_BITS_PER_PIXEL: + msg = ( + f"Only screens at 32 bpp (regardless of color depth) are supported; " + f"got {drawable_format.bits_per_pixel} bpp" + ) + raise ScreenShotError(msg) + if drawable_format.scanline_pad != SUPPORTED_BITS_PER_PIXEL: + # To clarify the padding: the scanline_pad is the multiple that the scanline gets padded to. If there + # is no padding, then it will be the same as one pixel's size. + msg = "Screens with scanline padding are not supported" + raise ScreenShotError(msg) + # Visual, the interpretation of pixels (like indexed, grayscale, etc). (Visuals are arranged by depth, so + # we iterate over the depths first.) + for xcb_depth in xcb.screen_allowed_depths(pref_screen): + if xcb_depth.depth == self.drawable_depth: + break + else: + msg = "Internal error: drawable's depth not found in screen's supported depths" + raise ScreenShotError(msg) + for visual_info in xcb.depth_visuals(xcb_depth): + if visual_info.visual_id.value == self.drawable_visual_id: + break + else: + msg = "Internal error: drawable's visual not found in screen's supported visuals" + raise ScreenShotError(msg) + if visual_info.class_ not in {xcb.VisualClass.TrueColor, xcb.VisualClass.DirectColor}: + msg = "Only TrueColor and DirectColor visuals are supported" + raise ScreenShotError(msg) + if ( + visual_info.red_mask != SUPPORTED_RED_MASK + or visual_info.green_mask != SUPPORTED_GREEN_MASK + or visual_info.blue_mask != SUPPORTED_BLUE_MASK + ): + # There are two ways to phrase this layout: BGRx accounts for the byte order, while xRGB implies the + # native word order. Since we return the data as a byte array, we use the former. By the time we get + # to this point, we've already checked the endianness and depth, so this is pretty much never going to + # happen anyway. + msg = "Only visuals with BGRx ordering are supported" + raise ScreenShotError(msg) + + def close(self) -> None: + """Close the XCB connection.""" + if self.conn is not None: + xcb.disconnect(self.conn) + self.conn = None + + def _monitors_impl(self) -> None: + """Get positions of monitors. It will populate self._monitors.""" + if self.conn is None: + msg = "Cannot identify monitors while the connection is closed" + raise ScreenShotError(msg) + + # The first entry is the whole X11 screen that the root is on. That's the one that covers all the + # monitors. + root_geom = xcb.get_geometry(self.conn, self.root) + self._monitors.append( + { + "left": root_geom.x, + "top": root_geom.y, + "width": root_geom.width, + "height": root_geom.height, + } + ) + + # After that, we have one for each monitor on that X11 screen. For decades, that's been handled by + # Xrandr. We don't presently try to work with Xinerama. So, we're going to check the different outputs, + # according to Xrandr. If that fails, we'll just leave the one root covering everything. + + # Make sure we have the Xrandr extension we need. This will query the cache that we started populating in + # __init__. + randr_ext_data = xcb.get_extension_data(self.conn, LIB.randr_id) + if not randr_ext_data.present: + return + + # We ask the server to give us anything up to the version we support (i.e., what we expect the reply + # structs to look like). If the server only supports 1.2, then that's what it'll give us, and we're ok + # with that, but we also use a faster path if the server implements at least 1.3. + randr_version_data = xcb.randr_query_version(self.conn, xcb.RANDR_MAJOR_VERSION, xcb.RANDR_MINOR_VERSION) + randr_version = (randr_version_data.major_version, randr_version_data.minor_version) + if randr_version < (1, 2): + return + + screen_resources: xcb.RandrGetScreenResourcesReply | xcb.RandrGetScreenResourcesCurrentReply + # Check to see if we have the xcb_randr_get_screen_resources_current function in libxcb-randr, and that + # the server supports it. + if hasattr(LIB.randr, "xcb_randr_get_screen_resources_current") and randr_version >= (1, 3): + screen_resources = xcb.randr_get_screen_resources_current(self.conn, self.drawable.value) + crtcs = xcb.randr_get_screen_resources_current_crtcs(screen_resources) + else: + # Either the client or the server doesn't support the _current form. That's ok; we'll use the old + # function, which forces a new query to the physical monitors. + screen_resources = xcb.randr_get_screen_resources(self.conn, self.drawable) + crtcs = xcb.randr_get_screen_resources_crtcs(screen_resources) + + for crtc in crtcs: + crtc_info = xcb.randr_get_crtc_info(self.conn, crtc, screen_resources.config_timestamp) + if crtc_info.num_outputs == 0: + continue + self._monitors.append( + {"left": crtc_info.x, "top": crtc_info.y, "width": crtc_info.width, "height": crtc_info.height} + ) + + # Extra credit would be to enumerate the virtual desktops; see + # https://specifications.freedesktop.org/wm/latest/ar01s03.html. But I don't know how widely-used that + # style is. + + def _cursor_impl_check_xfixes(self) -> bool: + if self.conn is None: + msg = "Cannot take screenshot while the connection is closed" + raise ScreenShotError(msg) + + xfixes_ext_data = xcb.get_extension_data(self.conn, LIB.xfixes_id) + if not xfixes_ext_data.present: + return False + + reply = xcb.xfixes_query_version(self.conn, xcb.XFIXES_MAJOR_VERSION, xcb.XFIXES_MINOR_VERSION) + # We can work with 2.0 and later, but not sure about the actual minimum version we can use. That's ok; + # everything these days is much more modern. + return (reply.major_version, reply.minor_version) >= (2, 0) + + def _cursor_impl(self) -> ScreenShot: + """Retrieve all cursor data. Pixels have to be RGBx.""" + + if self.conn is None: + msg = "Cannot take screenshot while the connection is closed" + raise ScreenShotError(msg) + + if self._xfixes_ready is None: + self._xfixes_ready = self._cursor_impl_check_xfixes() + if not self._xfixes_ready: + msg = "Server does not have XFixes, or the version is too old." + raise ScreenShotError(msg) + + cursor_img = xcb.xfixes_get_cursor_image(self.conn) + region = { + "left": cursor_img.x - cursor_img.xhot, + "top": cursor_img.y - cursor_img.yhot, + "width": cursor_img.width, + "height": cursor_img.height, + } + + data_arr = xcb.xfixes_get_cursor_image_cursor_image(cursor_img) + data = bytearray(data_arr) + # We don't need to do the same array slice-and-dice work as the Xlib-based implementation: Xlib has an + # unfortunate historical accident that makes it have to return the cursor image in a different format. + + return self.cls_image(data, region) diff --git a/src/mss/linux/xcb.py b/src/mss/linux/xcb.py index 966def06..b5b19283 100644 --- a/src/mss/linux/xcb.py +++ b/src/mss/linux/xcb.py @@ -1,11 +1,11 @@ from __future__ import annotations -from ctypes import Structure, c_int, c_uint8, c_uint16, c_uint32 +from ctypes import Structure, _Pointer, c_int, c_uint8, c_uint16, c_uint32 from . import xcbgen # We import these just so they're re-exported to our users. -# ruff: noqa: F401 +# ruff: noqa: F401, TC001 from .xcbgen import ( RANDR_MAJOR_VERSION, RANDR_MINOR_VERSION, @@ -97,7 +97,7 @@ ) # These are also here to re-export. -from .xcbhelpers import LIB, Connection, XError +from .xcbhelpers import LIB, XID, Connection, QueryExtensionReply, XcbExtension, XError XCB_CONN_ERROR = 1 XCB_CONN_CLOSED_EXT_NOTSUPPORTED = 2 @@ -120,6 +120,53 @@ } +#### High-level XCB function wrappers + + +def get_extension_data( + xcb_conn: Connection | _Pointer[Connection], ext: XcbExtension | _Pointer[XcbExtension] +) -> QueryExtensionReply: + """Get extension data for the given extension. + + Returns the extension data, which includes whether the extension is present + and its opcode information. + """ + reply_p = LIB.xcb.xcb_get_extension_data(xcb_conn, ext) + return reply_p.contents + + +def prefetch_extension_data( + xcb_conn: Connection | _Pointer[Connection], ext: XcbExtension | _Pointer[XcbExtension] +) -> None: + """Prefetch extension data for the given extension. + + This is a performance hint to XCB to fetch the extension data + asynchronously. + """ + LIB.xcb.xcb_prefetch_extension_data(xcb_conn, ext) + + +def generate_id(xcb_conn: Connection | _Pointer[Connection]) -> XID: + """Generate a new unique X resource ID. + + Returns an XID that can be used to create new X resources. + """ + return LIB.xcb.xcb_generate_id(xcb_conn) + + +def get_setup(xcb_conn: Connection | _Pointer[Connection]) -> Setup: + """Get the connection setup information. + + Returns the setup structure containing information about the X server, + including available screens, pixmap formats, etc. + """ + setup_p = LIB.xcb.xcb_get_setup(xcb_conn) + return setup_p.contents + + +# Connection management + + def initialize() -> None: LIB.initialize(callbacks=[xcbgen.initialize]) @@ -145,6 +192,12 @@ def connect(display: str | bytes | None = None) -> tuple[Connection, int]: msg += f"error code {conn_err}" raise XError(msg) + # Prefetch extension data for all extensions we support to populate XCB's internal cache. + prefetch_extension_data(conn_p, LIB.randr_id) + prefetch_extension_data(conn_p, LIB.render_id) + prefetch_extension_data(conn_p, LIB.shm_id) + prefetch_extension_data(conn_p, LIB.xfixes_id) + return conn_p.contents, pref_screen_num.value diff --git a/src/mss/linux/xcbgen.py b/src/mss/linux/xcbgen.py index 65d808a8..8225c442 100644 --- a/src/mss/linux/xcbgen.py +++ b/src/mss/linux/xcbgen.py @@ -9,6 +9,7 @@ POINTER, Array, Structure, + _Pointer, c_char, c_int, c_int16, @@ -32,6 +33,8 @@ RANDR_MINOR_VERSION = 6 RENDER_MAJOR_VERSION = 0 RENDER_MINOR_VERSION = 11 +SHM_MAJOR_VERSION = 1 +SHM_MINOR_VERSION = 2 XFIXES_MAJOR_VERSION = 6 XFIXES_MINOR_VERSION = 0 @@ -428,6 +431,47 @@ class RenderQueryPictFormatsReply(Structure): ) +class ShmQueryVersionReply(Structure): + _fields_ = ( + ("response_type", c_uint8), + ("shared_pixmaps", c_uint8), + ("sequence", c_uint16), + ("length", c_uint32), + ("major_version", c_uint16), + ("minor_version", c_uint16), + ("uid", c_uint16), + ("gid", c_uint16), + ("pixmap_format", c_uint8), + ("pad0", c_uint8 * 15), + ) + + +class ShmSeg(XID): + pass + + +class ShmGetImageReply(Structure): + _fields_ = ( + ("response_type", c_uint8), + ("depth", c_uint8), + ("sequence", c_uint16), + ("length", c_uint32), + ("visual", Visualid), + ("size", c_uint32), + ("pad0", c_uint8 * 16), + ) + + +class ShmCreateSegmentReply(Structure): + _fields_ = ( + ("response_type", c_uint8), + ("nfd", c_uint8), + ("sequence", c_uint16), + ("length", c_uint32), + ("pad0", c_uint8 * 24), + ) + + class XfixesQueryVersionReply(Structure): _fields_ = ( ("response_type", c_uint8), @@ -587,6 +631,10 @@ def xfixes_get_cursor_image_cursor_image(r: XfixesGetCursorImageReply) -> Array[ ) +def shm_create_segment_reply_fds(c: Connection, r: ShmCreateSegmentReply) -> _Pointer[c_int]: + return LIB.shm.xcb_shm_create_segment_reply_fds(c, r) + + def get_geometry(c: Connection, drawable: Drawable) -> GetGeometryReply: return LIB.xcb.xcb_get_geometry(c, drawable).reply(c) @@ -648,6 +696,39 @@ def render_query_pict_formats(c: Connection) -> RenderQueryPictFormatsReply: return LIB.render.xcb_render_query_pict_formats(c).reply(c) +def shm_query_version(c: Connection) -> ShmQueryVersionReply: + return LIB.shm.xcb_shm_query_version(c).reply(c) + + +def shm_get_image( + c: Connection, + drawable: Drawable, + x: c_int16 | int, + y: c_int16 | int, + width: c_uint16 | int, + height: c_uint16 | int, + plane_mask: c_uint32 | int, + format_: c_uint8 | int, + shmseg: ShmSeg, + offset: c_uint32 | int, +) -> ShmGetImageReply: + return LIB.shm.xcb_shm_get_image(c, drawable, x, y, width, height, plane_mask, format_, shmseg, offset).reply(c) + + +def shm_attach_fd(c: Connection, shmseg: ShmSeg, shm_fd: c_int | int, read_only: c_uint8 | int) -> None: + return LIB.shm.xcb_shm_attach_fd(c, shmseg, shm_fd, read_only).check(c) + + +def shm_create_segment( + c: Connection, shmseg: ShmSeg, size: c_uint32 | int, read_only: c_uint8 | int +) -> ShmCreateSegmentReply: + return LIB.shm.xcb_shm_create_segment(c, shmseg, size, read_only).reply(c) + + +def shm_detach(c: Connection, shmseg: ShmSeg) -> None: + return LIB.shm.xcb_shm_detach(c, shmseg).check(c) + + def xfixes_query_version( c: Connection, client_major_version: c_uint32 | int, client_minor_version: c_uint32 | int ) -> XfixesQueryVersionReply: @@ -761,6 +842,11 @@ def initialize() -> None: # noqa: PLR0915 LIB.xfixes.xcb_xfixes_get_cursor_image_cursor_image.restype = POINTER(c_uint32) LIB.xfixes.xcb_xfixes_get_cursor_image_cursor_image_length.argtypes = (POINTER(XfixesGetCursorImageReply),) LIB.xfixes.xcb_xfixes_get_cursor_image_cursor_image_length.restype = c_int + LIB.shm.xcb_shm_create_segment_reply_fds.argtypes = ( + POINTER(Connection), + POINTER(ShmCreateSegmentReply), + ) + LIB.shm.xcb_shm_create_segment_reply_fds.restype = POINTER(c_int) initialize_xcb_typed_func(LIB.xcb, "xcb_get_geometry", [POINTER(Connection), Drawable], GetGeometryReply) initialize_xcb_typed_func( LIB.xcb, @@ -797,6 +883,28 @@ def initialize() -> None: # noqa: PLR0915 initialize_xcb_typed_func( LIB.render, "xcb_render_query_pict_formats", [POINTER(Connection)], RenderQueryPictFormatsReply ) + initialize_xcb_typed_func(LIB.shm, "xcb_shm_query_version", [POINTER(Connection)], ShmQueryVersionReply) + initialize_xcb_typed_func( + LIB.shm, + "xcb_shm_get_image", + [POINTER(Connection), Drawable, c_int16, c_int16, c_uint16, c_uint16, c_uint32, c_uint8, ShmSeg, c_uint32], + ShmGetImageReply, + ) + LIB.shm.xcb_shm_attach_fd.argtypes = ( + Connection, + ShmSeg, + c_int, + c_uint8, + ) + LIB.shm.xcb_shm_attach_fd.restype = VoidCookie + initialize_xcb_typed_func( + LIB.shm, "xcb_shm_create_segment", [POINTER(Connection), ShmSeg, c_uint32, c_uint8], ShmCreateSegmentReply + ) + LIB.shm.xcb_shm_detach.argtypes = ( + Connection, + ShmSeg, + ) + LIB.shm.xcb_shm_detach.restype = VoidCookie initialize_xcb_typed_func( LIB.xfixes, "xcb_xfixes_query_version", [POINTER(Connection), c_uint32, c_uint32], XfixesQueryVersionReply ) diff --git a/src/mss/linux/xcbhelpers.py b/src/mss/linux/xcbhelpers.py index 786799cb..a3a195d3 100644 --- a/src/mss/linux/xcbhelpers.py +++ b/src/mss/linux/xcbhelpers.py @@ -452,7 +452,8 @@ def initialize(self, callbacks: Iterable[Callable[[], None]] = frozenset()) -> N self.xcb.xcb_get_extension_data.restype = POINTER(QueryExtensionReply) self.xcb.xcb_prefetch_extension_data.argtypes = [POINTER(Connection), POINTER(XcbExtension)] self.xcb.xcb_prefetch_extension_data.restype = None - + self.xcb.xcb_generate_id.argtypes = [POINTER(Connection)] + self.xcb.xcb_generate_id.restype = XID self.xcb.xcb_get_setup.argtypes = [POINTER(Connection)] self.xcb.xcb_get_setup.restype = POINTER(Setup) self.xcb.xcb_connection_has_error.argtypes = [POINTER(Connection)] @@ -476,6 +477,13 @@ def initialize(self, callbacks: Iterable[Callable[[], None]] = frozenset()) -> N self.render = cdll.LoadLibrary(libxcb_render_so) self.render_id = XcbExtension.in_dll(self.render, "xcb_render_id") + libxcb_shm_so = ctypes.util.find_library("xcb-shm") + if libxcb_shm_so is None: + msg = "Library libxcb-shm.so not found" + raise ScreenShotError(msg) + self.shm = cdll.LoadLibrary(libxcb_shm_so) + self.shm_id = XcbExtension.in_dll(self.shm, "xcb_shm_id") + libxcb_xfixes_so = ctypes.util.find_library("xcb-xfixes") if libxcb_xfixes_so is None: msg = "Library libxcb-xfixes.so not found" diff --git a/src/mss/linux/xgetimage.py b/src/mss/linux/xgetimage.py index 9f127948..d527ba67 100644 --- a/src/mss/linux/xgetimage.py +++ b/src/mss/linux/xgetimage.py @@ -1,22 +1,12 @@ -from typing import Any - -from mss.base import MSSBase from mss.exception import ScreenShotError from mss.models import Monitor from mss.screenshot import ScreenShot from . import xcb -from .xcb import LIB - -SUPPORTED_DEPTHS = {24, 32} -SUPPORTED_BITS_PER_PIXEL = 32 -SUPPORTED_RED_MASK = 0xFF0000 -SUPPORTED_GREEN_MASK = 0x00FF00 -SUPPORTED_BLUE_MASK = 0x0000FF -ALL_PLANES = 0xFFFFFFFF # XCB doesn't define AllPlanes +from .base import ALL_PLANES, MSSXCBBase -class MSS(MSSBase): +class MSS(MSSXCBBase): """Multiple ScreenShots implementation for GNU/Linux. This implementation is based on XCB, using the GetImage request. @@ -25,160 +15,6 @@ class MSS(MSSBase): * XFixes: Including the cursor. """ - def __init__(self, /, **kwargs: Any) -> None: # noqa: PLR0912 - super().__init__(**kwargs) - - display = kwargs.get("display", b"") - if not display: - display = None - - self.conn: xcb.Connection | None - self.conn, pref_screen_num = xcb.connect(display) - - # Let XCB pre-populate its internal cache regarding the extensions we might use, while we finish setup. - LIB.xcb.xcb_prefetch_extension_data(self.conn, LIB.randr_id) - LIB.xcb.xcb_prefetch_extension_data(self.conn, LIB.xfixes_id) - - # Get the connection setup information that was included when we connected. - xcb_setup = LIB.xcb.xcb_get_setup(self.conn).contents - screens = xcb.setup_roots(xcb_setup) - pref_screen = screens[pref_screen_num] - self.root = self.drawable = pref_screen.root - self.drawable = self.root - - # We don't probe the XFixes presence or version until we need it. - self._xfixes_ready: bool | None = None - - # Probe the visuals (and related information), and make sure that our drawable is in an acceptable format. - # These iterations and tests don't involve any traffic with the server; it's all stuff that was included in the - # connection setup. Effectively all modern setups will be acceptable, but we verify to be sure. - - # Currently, we assume that the drawable we're capturing is the root; when we add single-window capture, we'll - # have to ask the server for its depth and visual. - assert self.root == self.drawable # noqa: S101 - self.drawable_depth = pref_screen.root_depth - self.drawable_visual_id = pref_screen.root_visual.value - # Server image byte order - if xcb_setup.image_byte_order != xcb.ImageOrder.LSBFirst: - msg = "Only X11 servers using LSB-First images are supported." - raise ScreenShotError(msg) - # Depth - if self.drawable_depth not in SUPPORTED_DEPTHS: - msg = f"Only screens of color depth 24 or 32 are supported, not {self.drawable_depth}" - raise ScreenShotError(msg) - # Format (i.e., bpp, padding) - for format_ in xcb.setup_pixmap_formats(xcb_setup): - if format_.depth == self.drawable_depth: - break - else: - msg = f"Internal error: drawable's depth {self.drawable_depth} not found in screen's supported formats" - raise ScreenShotError(msg) - drawable_format = format_ - if drawable_format.bits_per_pixel != SUPPORTED_BITS_PER_PIXEL: - msg = ( - f"Only screens at 32 bpp (regardless of color depth) are supported; " - f"got {drawable_format.bits_per_pixel} bpp" - ) - raise ScreenShotError(msg) - if drawable_format.scanline_pad != SUPPORTED_BITS_PER_PIXEL: - # To clarify the padding: the scanline_pad is the multiple that the scanline gets padded to. If there is - # no padding, then it will be the same as one pixel's size. - msg = "Screens with scanline padding are not supported" - raise ScreenShotError(msg) - # Visual, the interpretation of pixels (like indexed, grayscale, etc). (Visuals are arranged by depth, so we - # iterate over the depths first.) - for xcb_depth in xcb.screen_allowed_depths(pref_screen): - if xcb_depth.depth == self.drawable_depth: - break - else: - msg = "Internal error: drawable's depth not found in screen's supported depths" - raise ScreenShotError(msg) - for visual_info in xcb.depth_visuals(xcb_depth): - if visual_info.visual_id.value == self.drawable_visual_id: - break - else: - msg = "Internal error: drawable's visual not found in screen's supported visuals" - raise ScreenShotError(msg) - if visual_info.class_ not in {xcb.VisualClass.TrueColor, xcb.VisualClass.DirectColor}: - msg = "Only TrueColor and DirectColor visuals are supported" - raise ScreenShotError(msg) - if ( - visual_info.red_mask != SUPPORTED_RED_MASK - or visual_info.green_mask != SUPPORTED_GREEN_MASK - or visual_info.blue_mask != SUPPORTED_BLUE_MASK - ): - # There are two ways to phrase this layout: BGRx accounts for the byte order, while xRGB implies the native - # word order. Since we return the data as a byte array, we use the former. By the time we get to this - # point, we've already checked the endianness and depth, so this is pretty much never going to happen - # anyway. - msg = "Only visuals with BGRx ordering are supported" - raise ScreenShotError(msg) - - def close(self) -> None: - if self.conn is not None: - xcb.disconnect(self.conn) - self.conn = None - - def _monitors_impl(self) -> None: - """Get positions of monitors. It will populate self._monitors.""" - - if self.conn is None: - msg = "Cannot identify monitors while the connection is closed" - raise ScreenShotError(msg) - - # The first entry is the whole X11 screen that the root is on. That's the one that covers all the monitors. - root_geom = xcb.get_geometry(self.conn, self.root) - self._monitors.append( - { - "left": root_geom.x, - "top": root_geom.y, - "width": root_geom.width, - "height": root_geom.height, - } - ) - - # After that, we have one for each monitor on that X11 screen. For decades, that's been handled by Xrandr. - # We don't presently try to work with Xinerama. So, we're going to check the different outputs, according to - # Xrandr. If that fails, we'll just leave the one root covering everything. - - # Make sure we have the Xrandr extension we need. This will query the cache that we started populating in - # __init__. - randr_ext_data = LIB.xcb.xcb_get_extension_data(self.conn, LIB.randr_id).contents - if not randr_ext_data.present: - return - - # We ask the server to give us anything up to the version we support (i.e., what we expect the reply structs - # to look like). If the server only supports 1.2, then that's what it'll give us, and we're ok with that, but - # we also use a faster path if the server implements at least 1.3. - randr_version_data = xcb.randr_query_version(self.conn, xcb.RANDR_MAJOR_VERSION, xcb.RANDR_MINOR_VERSION) - randr_version = (randr_version_data.major_version, randr_version_data.minor_version) - if randr_version < (1, 2): - return - - screen_resources: xcb.RandrGetScreenResourcesReply | xcb.RandrGetScreenResourcesCurrentReply - # Check to see if we have the xcb_randr_get_screen_resources_current function in libxcb-randr, and that the - # server supports it. - if hasattr(LIB.randr, "xcb_randr_get_screen_resources_current") and randr_version >= (1, 3): - screen_resources = xcb.randr_get_screen_resources_current(self.conn, self.drawable.value) - crtcs = xcb.randr_get_screen_resources_current_crtcs(screen_resources) - else: - # Either the client or the server doesn't support the _current form. That's ok; we'll use the old - # function, which forces a new query to the physical monitors. - screen_resources = xcb.randr_get_screen_resources(self.conn, self.drawable) - crtcs = xcb.randr_get_screen_resources_crtcs(screen_resources) - - for crtc in crtcs: - crtc_info = xcb.randr_get_crtc_info(self.conn, crtc, screen_resources.config_timestamp) - if crtc_info.num_outputs == 0: - continue - self._monitors.append( - {"left": crtc_info.x, "top": crtc_info.y, "width": crtc_info.width, "height": crtc_info.height} - ) - - # Extra credit would be to enumerate the virtual desktops; see - # https://specifications.freedesktop.org/wm/latest/ar01s03.html. But I don't know how widely-used that style - # is. - def _grab_impl(self, monitor: Monitor, /) -> ScreenShot: """Retrieve all pixels from a monitor. Pixels have to be RGBX.""" @@ -212,45 +48,3 @@ def _grab_impl(self, monitor: Monitor, /) -> ScreenShot: raise ScreenShotError(msg) return self.cls_image(img_data, monitor) - - def _cursor_impl_check_xfixes(self) -> bool: - if self.conn is None: - msg = "Cannot take screenshot while the connection is closed" - raise ScreenShotError(msg) - - xfixes_ext_data = LIB.xcb.xcb_get_extension_data(self.conn, LIB.xfixes_id).contents - if not xfixes_ext_data.present: - return False - - reply = xcb.xfixes_query_version(self.conn, xcb.XFIXES_MAJOR_VERSION, xcb.XFIXES_MINOR_VERSION) - # We can work with 2.0 and later, but not sure about the actual minimum version we can use. That's ok; - # everything these days is much more modern. - return (reply.major_version, reply.minor_version) >= (2, 0) - - def _cursor_impl(self) -> ScreenShot: - """Retrieve all cursor data. Pixels have to be RGBx.""" - - if self.conn is None: - msg = "Cannot take screenshot while the connection is closed" - raise ScreenShotError(msg) - - if self._xfixes_ready is None: - self._xfixes_ready = self._cursor_impl_check_xfixes() - if not self._xfixes_ready: - msg = "Server does not have XFixes, or the version is too old." - raise ScreenShotError(msg) - - cursor_img = xcb.xfixes_get_cursor_image(self.conn) - region = { - "left": cursor_img.x - cursor_img.xhot, - "top": cursor_img.y - cursor_img.yhot, - "width": cursor_img.width, - "height": cursor_img.height, - } - - data_arr = xcb.xfixes_get_cursor_image_cursor_image(cursor_img) - data = bytearray(data_arr) - # We don't need to do the same array slice-and-dice work as the Xlib-based implementation: Xlib has an - # unfortunate historical accident that makes it have to return the cursor image in a different format. - - return self.cls_image(data, region) diff --git a/src/tests/test_setup.py b/src/tests/test_setup.py index c6eea223..dddb6765 100644 --- a/src/tests/test_setup.py +++ b/src/tests/test_setup.py @@ -69,6 +69,7 @@ def test_sdist() -> None: f"mss-{__version__}/src/mss/exception.py", f"mss-{__version__}/src/mss/factory.py", f"mss-{__version__}/src/mss/linux/__init__.py", + f"mss-{__version__}/src/mss/linux/base.py", f"mss-{__version__}/src/mss/linux/xcb.py", f"mss-{__version__}/src/mss/linux/xcbgen.py", f"mss-{__version__}/src/mss/linux/xcbhelpers.py", @@ -105,6 +106,7 @@ def test_sdist() -> None: f"mss-{__version__}/src/xcbproto/gen_xcb_to_py.py", f"mss-{__version__}/src/xcbproto/randr.xml", f"mss-{__version__}/src/xcbproto/render.xml", + f"mss-{__version__}/src/xcbproto/shm.xml", f"mss-{__version__}/src/xcbproto/xfixes.xml", f"mss-{__version__}/src/xcbproto/xproto.xml", ] @@ -134,6 +136,7 @@ def test_wheel() -> None: "mss/exception.py", "mss/factory.py", "mss/linux/__init__.py", + "mss/linux/base.py", "mss/linux/xcb.py", "mss/linux/xcbgen.py", "mss/linux/xcbhelpers.py", diff --git a/src/tests/test_xcb.py b/src/tests/test_xcb.py index bff4bc00..24903ace 100644 --- a/src/tests/test_xcb.py +++ b/src/tests/test_xcb.py @@ -19,7 +19,7 @@ import pytest from mss.exception import ScreenShotError -from mss.linux import xcb, xgetimage +from mss.linux import base, xcb, xgetimage from mss.linux.xcbhelpers import ( XcbExtension, array_from_xcb, @@ -172,7 +172,7 @@ def __init__(self, monkeypatch: pytest.MonkeyPatch) -> None: randr_id=XcbExtension(), xfixes_id=XcbExtension(), ) - self._monkeypatch.setattr(xgetimage, "LIB", fake_lib) + self._monkeypatch.setattr(xcb, "LIB", fake_lib) self._monkeypatch.setattr(xcb, "connect", lambda _display=None: (self.connection, 0)) self._monkeypatch.setattr(xcb, "disconnect", lambda _conn: None) self._monkeypatch.setattr(xcb, "setup_roots", self._setup_roots) @@ -190,16 +190,16 @@ def reset(self) -> None: self.screen.root_visual = xcb.Visualid(visual_id) self.format.depth = self.screen.root_depth - self.format.bits_per_pixel = xgetimage.SUPPORTED_BITS_PER_PIXEL - self.format.scanline_pad = xgetimage.SUPPORTED_BITS_PER_PIXEL + self.format.bits_per_pixel = base.SUPPORTED_BITS_PER_PIXEL + self.format.scanline_pad = base.SUPPORTED_BITS_PER_PIXEL self.depth.depth = self.screen.root_depth self.visual.visual_id = xcb.Visualid(visual_id) self.visual.class_ = xcb.VisualClass.TrueColor - self.visual.red_mask = xgetimage.SUPPORTED_RED_MASK - self.visual.green_mask = xgetimage.SUPPORTED_GREEN_MASK - self.visual.blue_mask = xgetimage.SUPPORTED_BLUE_MASK + self.visual.red_mask = base.SUPPORTED_RED_MASK + self.visual.green_mask = base.SUPPORTED_GREEN_MASK + self.visual.blue_mask = base.SUPPORTED_BLUE_MASK self.screens = [self.screen] self.pixmap_formats = [self.format] diff --git a/src/xcbproto/gen_xcb_to_py.py b/src/xcbproto/gen_xcb_to_py.py index 6109169b..eb07c27d 100755 --- a/src/xcbproto/gen_xcb_to_py.py +++ b/src/xcbproto/gen_xcb_to_py.py @@ -71,6 +71,13 @@ "QueryVersion", "QueryPictFormats", ], + "shm": [ + "QueryVersion", + "GetImage", + "AttachFd", + "CreateSegment", + "Detach", + ], "xfixes": [ "QueryVersion", "GetCursorImage", @@ -95,7 +102,7 @@ "void": "None", } -INT_CTYPES = {"c_int8", "c_int16", "c_int32", "c_int64", "c_uint8", "c_uint16", "c_uint32", "c_uint64"} +INT_CTYPES = {"c_int", "c_int8", "c_int16", "c_int32", "c_int64", "c_uint8", "c_uint16", "c_uint32", "c_uint64"} EIGHT_BIT_TYPES = { "c_int8", @@ -308,7 +315,12 @@ class ListField: enum: str | None = None -StructMember = Field | Pad | ListField +@dataclass +class FdField: + name: str + + +StructMember = Field | Pad | ListField | FdField class StructLikeDefn(LazyDefn): @@ -351,6 +363,12 @@ def _parse_child(self, child: ET._Element) -> None: mask=child.attrib.get("mask"), ) ) + case "fd": + self.members.append( + FdField( + name=child.attrib["name"], + ) + ) case "pad": self.members.append(parse_pad(child)) case "list": @@ -647,7 +665,7 @@ def require_member(protocol: str, member: StructMember) -> None: if member.enum: enum = registry.resolve_enum(protocol, member.enum) appendnew(rv.enums, enum) - elif isinstance(member, Pad): + elif isinstance(member, (FdField, Pad)): pass else: msg = f"Unrecognized struct member {member}" @@ -763,7 +781,15 @@ def camel_case(name: str, protocol: str | None = None) -> str: def snake_case(name: str, protocol: str | None = None) -> str: - prefix = "" if protocol in {"xproto", None} else f"{snake_case(protocol)}_" # type: ignore[arg-type] + prefix = ( + "" + if protocol + in { + "xproto", + None, + } + else f"{snake_case(protocol)}_" # type: ignore[arg-type] + ) if name.islower(): return prefix + name s1 = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", name) @@ -823,7 +849,7 @@ def format_type_name(typedefn: TypeDefn) -> str: return base_name -def format_field_name(field: Field) -> str: +def format_field_name(field: Field | FdField) -> str: name = field.name return f"{name}_" if name in RESERVED_NAMES else name @@ -995,23 +1021,29 @@ def emit_reply(writer: CodeWriter, registry: ProtocolRegistry, entry: ReplyDefn) # uint16_t sequence; # uint32_t length; # However, if the first field of the reply contents is a single byte, then it replaces pad0 in that structure. - members = entry.members[:] + nonfd_members = [m for m in entry.members if not isinstance(m, FdField)] + field_entries: list[tuple[str, str] | StructMember] = [("response_type", "c_uint8")] - if members and isinstance(members[0], Field) and is_eight_bit(registry, entry.protocol, members[0].type): - member = members.pop(0) + if ( + nonfd_members + and isinstance(nonfd_members[0], Field) + and is_eight_bit(registry, entry.protocol, nonfd_members[0].type) + ): + member = nonfd_members.pop(0) assert isinstance(member, Field) # noqa: S101 name = format_field_name(member) type_expr = python_type_for(registry, entry.protocol, member.type) field_entries.append((name, type_expr)) - elif members and (isinstance(members[0], Pad) and members[0].bytes == 1): + elif nonfd_members and (isinstance(nonfd_members[0], Pad) and nonfd_members[0].bytes == 1): # XFixes puts the padding byte explicitly at the start of the replies, but it just gets folded in the same way. - member = members.pop(0) + member = nonfd_members.pop(0) field_entries.append(member) else: field_entries.append(Pad(bytes=1)) field_entries.append(("sequence", "c_uint16")) field_entries.append(("length", "c_uint32")) - field_entries += members + field_entries += nonfd_members + return emit_structlike(writer, registry, entry, field_entries) @@ -1123,6 +1155,39 @@ def emit_lists(writer: CodeWriter, registry: ProtocolRegistry, types: list[TypeD return rv +# File descriptor accessor wrappers + + +def emit_fds(writer: CodeWriter, _registry: ProtocolRegistry, types: list[TypeDefn]) -> list[FuncDecl]: + rv: list[FuncDecl] = [] + for typ in types: + if not isinstance(typ, StructLikeDefn): + continue + fd_members = [m for m in typ.members if isinstance(m, FdField)] + if not fd_members: + continue + if len(fd_members) > 1: + # I simply don't know how this would be represented in libxcb. + msg = f"Struct {typ.protocol}:{typ.name} has multiple FdFields, which is unsupported" + raise GenerationError(msg) + # The way that the reply fd accessor is named is not that great: + # rather than having a function named after the field, it's named with just an "_fd" suffix. + func_name = f"{format_function_name(typ.name, typ.protocol)}_reply_fds" + writer.write() + writer.write(f"def {func_name}(c: Connection, r: {format_type_name(typ)}) -> _Pointer[c_int]:") + with writer.indent(): + writer.write(f"return LIB.{lib_for_proto(typ.protocol)}.xcb_{func_name}(c, r)") + rv.append( + FuncDecl( + typ.protocol, + f"xcb_{func_name}", + ["POINTER(Connection)", f"POINTER({format_type_name(typ)})"], + "POINTER(c_int)", + ) + ) + return rv + + # Request wrapper functions @@ -1139,8 +1204,12 @@ def emit_requests(writer: CodeWriter, registry: ProtocolRegistry, requests: list # a function when you call it. params: list[tuple[str, str]] = [("c", "Connection")] params += [ - (format_field_name(field), python_type_for(registry, request.protocol, field.type)) - for field in request.fields + ( + format_field_name(field), + python_type_for(registry, request.protocol, field.type) if isinstance(field, Field) else "c_int", + ) + for field in request.members + if not isinstance(field, (Pad, ListField)) ] params_types = [p[1] for p in params] # Arrange for the wrappers to take Python ints in place of any of the int-based ctypes. @@ -1209,6 +1278,7 @@ def generate( emit_enums(writer, registry, plan.enums) func_decls += emit_types(writer, registry, plan.types) func_decls += emit_lists(writer, registry, plan.types) + func_decls += emit_fds(writer, registry, plan.types) func_decls += emit_requests(writer, registry, plan.requests) emit_initialize(writer, func_decls) diff --git a/src/xcbproto/shm.xml b/src/xcbproto/shm.xml new file mode 100644 index 00000000..c1114e0a --- /dev/null +++ b/src/xcbproto/shm.xml @@ -0,0 +1,350 @@ + + + + xproto + + + + + + + + + + + + + Report that an XCB_SHM_PUT_IMAGE request has completed + + + + + + + + + + + + + + + + + + + + + + The version of the MIT-SHM extension supported by the server + + + + + The UID of the server. + The GID of the server. + + + + Query the version of the MIT-SHM extension. + + + + + + + + + + + Attach a System V shared memory segment. + + + + + + + + + + + Destroys the specified shared memory segment. + + The segment to be destroyed. + + + + + + + + + + + + + + + + + + + + + + Copy data from the shared memory to the specified drawable. + + The drawable to draw to. + The graphics context to use. + The total width of the source image. + The total height of the source image. + The source X coordinate of the sub-image to copy. + The source Y coordinate of the sub-image to copy. + + + + + The depth to use. + + + The offset that the source image starts at. + + + + + + + + + + + + + + + + + + + + + Indicates the result of the copy. + + The depth of the source drawable. + The visual ID of the source drawable. + The number of bytes copied. + + + + Copies data from the specified drawable to the shared memory segment. + + The drawable to copy the image out of. + The X coordinate in the drawable to begin copying at. + The Y coordinate in the drawable to begin copying at. + The width of the image to copy. + The height of the image to copy. + A mask that determines which planes are used. + The format to use for the copy (???). + The destination shared memory segment. + The offset in the shared memory segment to copy data to. + + + + + + + + + + + + + + Create a pixmap backed by shared memory. + +Create a pixmap backed by shared memory. Writes to the shared memory will be +reflected in the contents of the pixmap, and writes to the pixmap will be +reflected in the contents of the shared memory. + + A pixmap ID created with xcb_generate_id(). + The drawable to create the pixmap in. + + + + + + + + + + + + + + + Create a shared memory segment + + + The file descriptor the server should mmap(). + + + + + + + + + + + + + + + The returned file descriptor. + + + + + + Asks the server to allocate a shared memory segment. + + + The size of the segment to create. + + + + From 3a0c78c2d86397fd0e918ffbc94d5be6f86f7c0b Mon Sep 17 00:00:00 2001 From: Joel Ray Holveck Date: Fri, 21 Nov 2025 19:11:49 -0800 Subject: [PATCH 2/9] Implement an XShmGetImage-based backend. This is close to complete, but there's a few things that need to be chased down: notably, the test_thread_safety test is failing, for some reason. It also currently doesn't work correctly if the root window size increases. That said, this is quite promising: on my computer, the new backend can take 4k screenshots at 30-34 fps, while the XGetImage backend could only run at 11-14 fps. --- src/mss/linux/__init__.py | 4 + src/mss/linux/base.py | 49 ++++++++- src/mss/linux/xcb.py | 12 +++ src/mss/linux/xcbgen.py | 24 ++--- src/mss/linux/xgetimage.py | 38 +------ src/mss/linux/xshmgetimage.py | 172 +++++++++++++++++++++++++++++++ src/tests/conftest.py | 2 +- src/tests/test_implementation.py | 4 + src/tests/test_setup.py | 2 + src/xcbproto/gen_xcb_to_py.py | 9 +- 10 files changed, 260 insertions(+), 56 deletions(-) create mode 100644 src/mss/linux/xshmgetimage.py diff --git a/src/mss/linux/__init__.py b/src/mss/linux/__init__.py index 46993f1f..1676ffc0 100644 --- a/src/mss/linux/__init__.py +++ b/src/mss/linux/__init__.py @@ -14,6 +14,10 @@ def mss(backend: str = "default", **kwargs: Any) -> MSSBase: from . import xgetimage # noqa: PLC0415 return xgetimage.MSS(**kwargs) + if backend == "xshmgetimage": + from . import xshmgetimage # noqa: PLC0415 + + return xshmgetimage.MSS(**kwargs) msg = f"Backend {backend!r} not (yet?) implemented." raise ScreenShotError(msg) diff --git a/src/mss/linux/base.py b/src/mss/linux/base.py index 735d6baa..489863ce 100644 --- a/src/mss/linux/base.py +++ b/src/mss/linux/base.py @@ -9,6 +9,7 @@ from .xcb import LIB if TYPE_CHECKING: + from mss.models import Monitor from mss.screenshot import ScreenShot SUPPORTED_DEPTHS = {24, 32} @@ -48,8 +49,8 @@ def __init__(self, /, **kwargs: Any) -> None: # noqa: PLR0912 # Get the connection setup information that was included when we connected. xcb_setup = xcb.get_setup(self.conn) screens = xcb.setup_roots(xcb_setup) - pref_screen = screens[pref_screen_num] - self.root = self.drawable = pref_screen.root + self.pref_screen = screens[pref_screen_num] + self.root = self.drawable = self.pref_screen.root # We don't probe the XFixes presence or version until we need it. self._xfixes_ready: bool | None = None @@ -61,8 +62,8 @@ def __init__(self, /, **kwargs: Any) -> None: # noqa: PLR0912 # Currently, we assume that the drawable we're capturing is the root; when we add single-window capture, # we'll have to ask the server for its depth and visual. assert self.root == self.drawable # noqa: S101 - self.drawable_depth = pref_screen.root_depth - self.drawable_visual_id = pref_screen.root_visual.value + self.drawable_depth = self.pref_screen.root_depth + self.drawable_visual_id = self.pref_screen.root_visual.value # Server image byte order if xcb_setup.image_byte_order != xcb.ImageOrder.LSBFirst: msg = "Only X11 servers using LSB-First images are supported." @@ -92,7 +93,7 @@ def __init__(self, /, **kwargs: Any) -> None: # noqa: PLR0912 raise ScreenShotError(msg) # Visual, the interpretation of pixels (like indexed, grayscale, etc). (Visuals are arranged by depth, so # we iterate over the depths first.) - for xcb_depth in xcb.screen_allowed_depths(pref_screen): + for xcb_depth in xcb.screen_allowed_depths(self.pref_screen): if xcb_depth.depth == self.drawable_depth: break else: @@ -226,3 +227,41 @@ def _cursor_impl(self) -> ScreenShot: # unfortunate historical accident that makes it have to return the cursor image in a different format. return self.cls_image(data, region) + + def _grab_impl_xgetimage(self, monitor: Monitor, /) -> ScreenShot: + """Retrieve all pixels from a monitor using GetImage. + + This is used by the XGetImage backend, and also the XShmGetImage + backend in fallback mode. + """ + + if self.conn is None: + msg = "Cannot take screenshot while the connection is closed" + raise ScreenShotError(msg) + + img_reply = xcb.get_image( + self.conn, + xcb.ImageFormat.ZPixmap, + self.drawable, + monitor["left"], + monitor["top"], + monitor["width"], + monitor["height"], + ALL_PLANES, + ) + + # Now, save the image. This is a reference into the img_reply structure. + img_data_arr = xcb.get_image_data(img_reply) + # Copy this into a new bytearray, so that it will persist after we clear the image structure. + img_data = bytearray(img_data_arr) + + if img_reply.depth != self.drawable_depth or img_reply.visual.value != self.drawable_visual_id: + # This should never happen; a window can't change its visual. + msg = ( + "Server returned an image with a depth or visual different than it initially reported: " + f"expected {self.drawable_depth},{hex(self.drawable_visual_id)}, " + f"got {img_reply.depth},{hex(img_reply.visual.value)}" + ) + raise ScreenShotError(msg) + + return self.cls_image(img_data, monitor) diff --git a/src/mss/linux/xcb.py b/src/mss/linux/xcb.py index b5b19283..9a3d8d04 100644 --- a/src/mss/linux/xcb.py +++ b/src/mss/linux/xcb.py @@ -11,6 +11,8 @@ RANDR_MINOR_VERSION, RENDER_MAJOR_VERSION, RENDER_MINOR_VERSION, + SHM_MAJOR_VERSION, + SHM_MINOR_VERSION, XFIXES_MAJOR_VERSION, XFIXES_MINOR_VERSION, Atom, @@ -52,6 +54,10 @@ ScreenIterator, Setup, SetupIterator, + ShmCreateSegmentReply, + ShmGetImageReply, + ShmQueryVersionReply, + ShmSeg, Timestamp, VisualClass, Visualid, @@ -91,6 +97,12 @@ setup_pixmap_formats, setup_roots, setup_vendor, + shm_attach_fd, + shm_create_segment, + shm_create_segment_reply_fds, + shm_detach, + shm_get_image, + shm_query_version, xfixes_get_cursor_image, xfixes_get_cursor_image_cursor_image, xfixes_query_version, diff --git a/src/mss/linux/xcbgen.py b/src/mss/linux/xcbgen.py index 8225c442..6fba72b3 100644 --- a/src/mss/linux/xcbgen.py +++ b/src/mss/linux/xcbgen.py @@ -631,7 +631,7 @@ def xfixes_get_cursor_image_cursor_image(r: XfixesGetCursorImageReply) -> Array[ ) -def shm_create_segment_reply_fds(c: Connection, r: ShmCreateSegmentReply) -> _Pointer[c_int]: +def shm_create_segment_reply_fds(c: Connection | _Pointer[Connection], r: ShmCreateSegmentReply) -> _Pointer[c_int]: return LIB.shm.xcb_shm_create_segment_reply_fds(c, r) @@ -665,7 +665,7 @@ def get_property( def no_operation(c: Connection) -> None: - return LIB.xcb.xcb_no_operation(c).check(c) + return LIB.xcb.xcb_no_operation_checked(c).check(c) def randr_query_version( @@ -716,7 +716,7 @@ def shm_get_image( def shm_attach_fd(c: Connection, shmseg: ShmSeg, shm_fd: c_int | int, read_only: c_uint8 | int) -> None: - return LIB.shm.xcb_shm_attach_fd(c, shmseg, shm_fd, read_only).check(c) + return LIB.shm.xcb_shm_attach_fd_checked(c, shmseg, shm_fd, read_only).check(c) def shm_create_segment( @@ -726,7 +726,7 @@ def shm_create_segment( def shm_detach(c: Connection, shmseg: ShmSeg) -> None: - return LIB.shm.xcb_shm_detach(c, shmseg).check(c) + return LIB.shm.xcb_shm_detach_checked(c, shmseg).check(c) def xfixes_query_version( @@ -860,8 +860,8 @@ def initialize() -> None: # noqa: PLR0915 [POINTER(Connection), c_uint8, Window, Atom, Atom, c_uint32, c_uint32], GetPropertyReply, ) - LIB.xcb.xcb_no_operation.argtypes = (Connection,) - LIB.xcb.xcb_no_operation.restype = VoidCookie + LIB.xcb.xcb_no_operation_checked.argtypes = (POINTER(Connection),) + LIB.xcb.xcb_no_operation_checked.restype = VoidCookie initialize_xcb_typed_func( LIB.randr, "xcb_randr_query_version", [POINTER(Connection), c_uint32, c_uint32], RandrQueryVersionReply ) @@ -890,21 +890,21 @@ def initialize() -> None: # noqa: PLR0915 [POINTER(Connection), Drawable, c_int16, c_int16, c_uint16, c_uint16, c_uint32, c_uint8, ShmSeg, c_uint32], ShmGetImageReply, ) - LIB.shm.xcb_shm_attach_fd.argtypes = ( - Connection, + LIB.shm.xcb_shm_attach_fd_checked.argtypes = ( + POINTER(Connection), ShmSeg, c_int, c_uint8, ) - LIB.shm.xcb_shm_attach_fd.restype = VoidCookie + LIB.shm.xcb_shm_attach_fd_checked.restype = VoidCookie initialize_xcb_typed_func( LIB.shm, "xcb_shm_create_segment", [POINTER(Connection), ShmSeg, c_uint32, c_uint8], ShmCreateSegmentReply ) - LIB.shm.xcb_shm_detach.argtypes = ( - Connection, + LIB.shm.xcb_shm_detach_checked.argtypes = ( + POINTER(Connection), ShmSeg, ) - LIB.shm.xcb_shm_detach.restype = VoidCookie + LIB.shm.xcb_shm_detach_checked.restype = VoidCookie initialize_xcb_typed_func( LIB.xfixes, "xcb_xfixes_query_version", [POINTER(Connection), c_uint32, c_uint32], XfixesQueryVersionReply ) diff --git a/src/mss/linux/xgetimage.py b/src/mss/linux/xgetimage.py index d527ba67..a41368c6 100644 --- a/src/mss/linux/xgetimage.py +++ b/src/mss/linux/xgetimage.py @@ -1,9 +1,7 @@ -from mss.exception import ScreenShotError from mss.models import Monitor from mss.screenshot import ScreenShot -from . import xcb -from .base import ALL_PLANES, MSSXCBBase +from .base import MSSXCBBase class MSS(MSSXCBBase): @@ -15,36 +13,6 @@ class MSS(MSSXCBBase): * XFixes: Including the cursor. """ - def _grab_impl(self, monitor: Monitor, /) -> ScreenShot: + def _grab_impl(self, monitor: Monitor) -> ScreenShot: """Retrieve all pixels from a monitor. Pixels have to be RGBX.""" - - if self.conn is None: - msg = "Cannot take screenshot while the connection is closed" - raise ScreenShotError(msg) - - img_reply = xcb.get_image( - self.conn, - xcb.ImageFormat.ZPixmap, - self.drawable, - monitor["left"], - monitor["top"], - monitor["width"], - monitor["height"], - ALL_PLANES, - ) - - # Now, save the image. This is a reference into the img_reply structure. - img_data_arr = xcb.get_image_data(img_reply) - # Copy this into a new bytearray, so that it will persist after we clear the image structure. - img_data = bytearray(img_data_arr) - - if img_reply.depth != self.drawable_depth or img_reply.visual.value != self.drawable_visual_id: - # This should never happen; a window can't change its visual. - msg = ( - "Server returned an image with a depth or visual different than it initially reported: " - f"expected {self.drawable_depth},{hex(self.drawable_visual_id)}, " - f"got {img_reply.depth},{hex(img_reply.visual.value)}" - ) - raise ScreenShotError(msg) - - return self.cls_image(img_data, monitor) + return super()._grab_impl_xgetimage(monitor) diff --git a/src/mss/linux/xshmgetimage.py b/src/mss/linux/xshmgetimage.py new file mode 100644 index 00000000..14374b29 --- /dev/null +++ b/src/mss/linux/xshmgetimage.py @@ -0,0 +1,172 @@ +from __future__ import annotations + +import errno +import os +from mmap import PROT_READ, mmap # type: ignore[attr-defined] +from typing import TYPE_CHECKING, Any, Literal + +from mss.exception import ScreenShotError +from mss.linux import xcb +from mss.linux.xcbhelpers import LIB, XProtoError + +from .base import ALL_PLANES, MSSXCBBase + +if TYPE_CHECKING: + from mss.models import Monitor + from mss.screenshot import ScreenShot + + +class MSS(MSSXCBBase): + """Multiple ScreenShots implementation for GNU/Linux. + + This implementation is based on XCB, using the ShmGetImage request. + If ShmGetImage fails, then this will fall back to using GetImage. + """ + + def __init__(self, /, **kwargs: Any) -> None: + super().__init__(**kwargs) + + # These are the objects we need to clean up when we shut down. They are created in _setup_shm. + self._memfd: int | None = None + self._buf: mmap | None = None + self._shmseg: xcb.ShmSeg | None = None + + # _shm_works is True if at least one screenshot has been taken, False if it's known to fail, and None until + # then. + self._shm_works: bool | None = self._setup_shm() + + def _shm_report_issue(self, msg: str, *args: Any) -> None: + """Debugging hook for troubleshooting MIT-SHM issues. + + This will be called whenever MIT-SHM is disabled. The optional + arguments are not well-defined; exceptions are common. + """ + print(msg, args) + + def _setup_shm(self) -> Literal[False] | None: + assert self.conn is not None # noqa: S101 + + shm_ext_data = xcb.get_extension_data(self.conn, LIB.shm_id) + if not shm_ext_data.present: + self._shm_report_issue("MIT-SHM extension not present") + return False + + # We use the FD-based version of ShmGetImage, so we require the extension to be at least 1.3. + shm_version_data = xcb.shm_query_version(self.conn) + shm_version = (shm_version_data.major_version, shm_version_data.minor_version) + if shm_version < (1, 2): + self._shm_report_issue("MIT-SHM version too old", shm_version) + return False + + # We allocate something large enough for the root, so we don't have to reallocate each time the window is + # resized. + # TODO(jholveck): Check in _grab_impl that we're not going to exceed this size. That can happen if the + # root is resized. + size = self.pref_screen.width_in_pixels * self.pref_screen.height_in_pixels * 4 + + try: + self._memfd = os.memfd_create("mss-shm-buf", flags=os.MFD_CLOEXEC) # type: ignore[attr-defined] + except OSError as e: + self._shm_report_issue("memfd_create failed", e) + self._shutdown_shm() + return False + os.ftruncate(self._memfd, size) + + try: + self._buf = mmap(self._memfd, size, prot=PROT_READ) # type: ignore[call-arg] + except OSError as e: + self._shm_report_issue("mmap failed", e) + self._shutdown_shm() + return False + + self._shmseg = xcb.ShmSeg(xcb.generate_id(self.conn).value) + try: + # This will normally be what raises an exception if you're on a remote connection. I previously thought + # the server deferred that until the GetImage call, but I had not been properly checking the status here. + xcb.shm_attach_fd(self.conn, self._shmseg, self._memfd, read_only=False) + except xcb.XError as e: + self._shm_report_issue("Cannot attach MIT-SHM segment", e) + self._shutdown_shm() + return False + + return None + + def close(self) -> None: + self._shutdown_shm() + super().close() + + def _shutdown_shm(self) -> None: + # It would be nice to also try to tell the server to detach the shmseg, but we might be in an error path + # and don't know if that's possible. It's not like we'll leak a lot of them on the same connection anyway. + # This can be called in the path of partial initialization. + if self._buf is not None: + self._buf.close() + self._buf = None + if self._memfd is not None: + # TODO(jholveck): For some reason, at this point, self._memfd is no longer valid. If I try to close it, + # I get EBADF, even if I try to close it before closing the mmap. The theories I have about this involve + # the mmap object taking control, but it doesn't make sense that I could still use shm_attach_fd in that + # case. I need to investigate before releasing. + try: + os.close(self._memfd) + except OSError as e: + if e.errno != errno.EBADF: + raise + self._memfd = None + + def _grab_impl_xshmgetimage(self, monitor: Monitor) -> ScreenShot: + if self.conn is None: + msg = "Cannot take screenshot while the connection is closed" + raise ScreenShotError(msg) + assert self._buf is not None # noqa: S101 + assert self._shmseg is not None # noqa: S101 + + img_reply = xcb.shm_get_image( + self.conn, + self.drawable.value, + monitor["left"], + monitor["top"], + monitor["width"], + monitor["height"], + ALL_PLANES, + xcb.ImageFormat.ZPixmap, + self._shmseg, + 0, + ) + + if img_reply.depth != self.drawable_depth or img_reply.visual.value != self.drawable_visual_id: + # This should never happen; a window can't change its visual. + msg = ( + "Server returned an image with a depth or visual different than it initially reported: " + f"expected {self.drawable_depth},{hex(self.drawable_visual_id)}, " + f"got {img_reply.depth},{hex(img_reply.visual.value)}" + ) + raise ScreenShotError(msg) + + # Snapshot the buffer into new bytearray. + new_size = monitor["width"] * monitor["height"] * 4 + # Slicing the memoryview creates a new memoryview that points to the relevant subregion. Making this and + # then copying it into a fresh bytearray is much faster than slicing the mmap object. + img_mv = memoryview(self._buf)[:new_size] + img_data = bytearray(img_mv) + + return self.cls_image(img_data, monitor) + + def _grab_impl(self, monitor: Monitor) -> ScreenShot: + """Retrieve all pixels from a monitor. Pixels have to be RGBX.""" + if self._shm_works == False: # noqa: E712 + return super()._grab_impl_xgetimage(monitor) + + try: + rv = self._grab_impl_xshmgetimage(monitor) + except XProtoError as e: + if self._shm_works is not None: + raise + self._shm_report_issue("MIT-SHM GetImage failed", e) + self._shm_works = False + self._shutdown_shm() + rv = super()._grab_impl_xgetimage(monitor) + else: + self._shm_works = True + + return rv diff --git a/src/tests/conftest.py b/src/tests/conftest.py index 584b0b75..7c272449 100644 --- a/src/tests/conftest.py +++ b/src/tests/conftest.py @@ -70,7 +70,7 @@ def raw() -> bytes: return data -@pytest.fixture(params=["xlib", "xgetimage"] if system() == "Linux" else ["default"]) +@pytest.fixture(params=["xlib", "xgetimage", "xshmgetimage"] if system() == "Linux" else ["default"]) def backend(request: pytest.FixtureRequest) -> str: return request.param diff --git a/src/tests/test_implementation.py b/src/tests/test_implementation.py index d02c2b95..a30f0cd9 100644 --- a/src/tests/test_implementation.py +++ b/src/tests/test_implementation.py @@ -245,6 +245,10 @@ def test_grab_with_tuple_percents(mss_impl: Callable[..., MSSBase]) -> None: def test_thread_safety(backend: str) -> None: """Regression test for issue #169.""" + # This currently breaks on xshmgetimage. Resource exhaustion? Something leaking? + if backend == "xshmgetimage": + pytest.xfail() + def record(check: dict) -> None: """Record for one second.""" start_time = time.time() diff --git a/src/tests/test_setup.py b/src/tests/test_setup.py index dddb6765..391b22f4 100644 --- a/src/tests/test_setup.py +++ b/src/tests/test_setup.py @@ -75,6 +75,7 @@ def test_sdist() -> None: f"mss-{__version__}/src/mss/linux/xcbhelpers.py", f"mss-{__version__}/src/mss/linux/xgetimage.py", f"mss-{__version__}/src/mss/linux/xlib.py", + f"mss-{__version__}/src/mss/linux/xshmgetimage.py", f"mss-{__version__}/src/mss/models.py", f"mss-{__version__}/src/mss/py.typed", f"mss-{__version__}/src/mss/screenshot.py", @@ -142,6 +143,7 @@ def test_wheel() -> None: "mss/linux/xcbhelpers.py", "mss/linux/xgetimage.py", "mss/linux/xlib.py", + "mss/linux/xshmgetimage.py", "mss/models.py", "mss/py.typed", "mss/screenshot.py", diff --git a/src/xcbproto/gen_xcb_to_py.py b/src/xcbproto/gen_xcb_to_py.py index eb07c27d..1e41acc8 100755 --- a/src/xcbproto/gen_xcb_to_py.py +++ b/src/xcbproto/gen_xcb_to_py.py @@ -1174,7 +1174,9 @@ def emit_fds(writer: CodeWriter, _registry: ProtocolRegistry, types: list[TypeDe # rather than having a function named after the field, it's named with just an "_fd" suffix. func_name = f"{format_function_name(typ.name, typ.protocol)}_reply_fds" writer.write() - writer.write(f"def {func_name}(c: Connection, r: {format_type_name(typ)}) -> _Pointer[c_int]:") + writer.write( + f"def {func_name}(c: Connection | _Pointer[Connection], r: {format_type_name(typ)}) -> _Pointer[c_int]:" + ) with writer.indent(): writer.write(f"return LIB.{lib_for_proto(typ.protocol)}.xcb_{func_name}(c, r)") rv.append( @@ -1216,12 +1218,14 @@ def emit_requests(writer: CodeWriter, registry: ProtocolRegistry, requests: list params_with_alts = [(p[0], f"{p[1]} | int" if p[1] in INT_CTYPES else p[1]) for p in params] params_string = ", ".join(f"{p[0]}: {p[1]}" for p in params_with_alts) args_string = ", ".join(p[0] for p in params) + xcb_params_types = ["POINTER(Connection)", *params_types[1:]] if request.reply is None: + xcb_func_name += "_checked" writer.write() writer.write(f"def {func_name}({params_string}) -> None:") with writer.indent(): writer.write(f"return LIB.{lib}.{xcb_func_name}({args_string}).check(c)") - rv.append(FuncDecl(request.protocol, xcb_func_name, params_types, "VoidCookie")) + rv.append(FuncDecl(request.protocol, xcb_func_name, xcb_params_types, "VoidCookie")) else: reply_type = request.reply reply_type_name = format_type_name(reply_type) @@ -1232,7 +1236,6 @@ def emit_requests(writer: CodeWriter, registry: ProtocolRegistry, requests: list # We have to use initialize_xcb_typed_func to initialize late, rather than making the cookie class here, # because the cookie definition needs to reference the XCB reply function. We could also do a lazy # initialization, but it's probably not worth it. - xcb_params_types = ["POINTER(Connection)", *params_types[1:]] rv.append( f'initialize_xcb_typed_func(LIB.{lib}, "{xcb_func_name}", ' f"[{', '.join(xcb_params_types)}], {reply_type_name})" From 37532576a90f47c8d3964af58eb30b9fe59e4a0d Mon Sep 17 00:00:00 2001 From: Joel Ray Holveck Date: Fri, 21 Nov 2025 19:24:18 -0800 Subject: [PATCH 3/9] Expand the xcb-util-errors test to include xshmgetimage --- src/tests/test_gnu_linux.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tests/test_gnu_linux.py b/src/tests/test_gnu_linux.py index 82cef143..b30ecbbd 100644 --- a/src/tests/test_gnu_linux.py +++ b/src/tests/test_gnu_linux.py @@ -191,7 +191,7 @@ def test_region_out_of_monitor_bounds(display: str, backend: str) -> None: details = exc.value.details assert details assert isinstance(details, dict) - if backend == "xgetimage" and mss.linux.xcb.LIB.errors is None: + if backend in {"xgetimage", "xshmgetimage"} and mss.linux.xcb.LIB.errors is None: pytest.xfail("Error strings in XCB backends are only available with the xcb-util-errors library.") assert isinstance(details["error"], str) From 736440876b5f7579a04709962fe7e2d1eeceb1a0 Mon Sep 17 00:00:00 2001 From: Joel Holveck Date: Sun, 23 Nov 2025 00:17:31 +0000 Subject: [PATCH 4/9] Track down the closed memfd. Previously, I had been trying to close the memfd in _shutdown_shm, and ignoring EBADF. It turns out that XCB will close the memfd when you send it to the X server. I think this was one potential cause of the issues I saw in test_thread_safety: the two threads would be reallocated each others' fds, leading to thread A closing an fd that thread B was using, thinking that it was thread A's memfd. Fix so that the memfd is only explicitly closed in an error situation. --- src/mss/linux/xcb.py | 2 +- src/mss/linux/xcbhelpers.py | 7 +- src/mss/linux/xshmgetimage.py | 139 ++++++++++++++++++------------- src/tests/test_gnu_linux.py | 38 +++++++++ src/tests/test_implementation.py | 4 - 5 files changed, 122 insertions(+), 68 deletions(-) diff --git a/src/mss/linux/xcb.py b/src/mss/linux/xcb.py index 9a3d8d04..f599e2a7 100644 --- a/src/mss/linux/xcb.py +++ b/src/mss/linux/xcb.py @@ -1,6 +1,6 @@ from __future__ import annotations -from ctypes import Structure, _Pointer, c_int, c_uint8, c_uint16, c_uint32 +from ctypes import _Pointer, c_int from . import xcbgen diff --git a/src/mss/linux/xcbhelpers.py b/src/mss/linux/xcbhelpers.py index a3a195d3..f387c79e 100644 --- a/src/mss/linux/xcbhelpers.py +++ b/src/mss/linux/xcbhelpers.py @@ -230,10 +230,9 @@ def __str__(self) -> str: ext_desc = f"\n Extension: {details['extension']}" if "extension" in details else "" msg += ( f"\nX Error of failed request: {error_desc}" - f"\n Major opcode of failed request: {major_desc}" - f"{ext_desc}" - f"\n Minor opcode of failed request: {minor_desc}" - f"\n Resource id in failed request: {details['resource_id']}" + f"\n Major opcode of failed request: {major_desc}{ext_desc}" + + (f"\n Minor opcode of failed request: {minor_desc}" if details["minor_code"] != 0 else "") + + f"\n Resource id in failed request: {details['resource_id']}" f"\n Serial number of failed request: {details['full_sequence']}" ) return msg diff --git a/src/mss/linux/xshmgetimage.py b/src/mss/linux/xshmgetimage.py index 14374b29..da91b363 100644 --- a/src/mss/linux/xshmgetimage.py +++ b/src/mss/linux/xshmgetimage.py @@ -1,9 +1,9 @@ from __future__ import annotations -import errno +import enum import os from mmap import PROT_READ, mmap # type: ignore[attr-defined] -from typing import TYPE_CHECKING, Any, Literal +from typing import TYPE_CHECKING, Any from mss.exception import ScreenShotError from mss.linux import xcb @@ -16,6 +16,12 @@ from mss.screenshot import ScreenShot +class ShmStatus(enum.Enum): + UNKNOWN = enum.auto() + AVAILABLE = enum.auto() + UNAVAILABLE = enum.auto() + + class MSS(MSSXCBBase): """Multiple ScreenShots implementation for GNU/Linux. @@ -31,9 +37,8 @@ def __init__(self, /, **kwargs: Any) -> None: self._buf: mmap | None = None self._shmseg: xcb.ShmSeg | None = None - # _shm_works is True if at least one screenshot has been taken, False if it's known to fail, and None until - # then. - self._shm_works: bool | None = self._setup_shm() + self.shm_status: ShmStatus = self._setup_shm() + self.shm_failed_reason: str | None = None def _shm_report_issue(self, msg: str, *args: Any) -> None: """Debugging hook for troubleshooting MIT-SHM issues. @@ -41,55 +46,69 @@ def _shm_report_issue(self, msg: str, *args: Any) -> None: This will be called whenever MIT-SHM is disabled. The optional arguments are not well-defined; exceptions are common. """ - print(msg, args) + full_msg = msg + if args: + full_msg += " | " + ", ".join(str(arg) for arg in args) + self.shm_failed_reason = full_msg - def _setup_shm(self) -> Literal[False] | None: + def _setup_shm(self) -> ShmStatus: # noqa: PLR0911 assert self.conn is not None # noqa: S101 - shm_ext_data = xcb.get_extension_data(self.conn, LIB.shm_id) - if not shm_ext_data.present: - self._shm_report_issue("MIT-SHM extension not present") - return False - - # We use the FD-based version of ShmGetImage, so we require the extension to be at least 1.3. - shm_version_data = xcb.shm_query_version(self.conn) - shm_version = (shm_version_data.major_version, shm_version_data.minor_version) - if shm_version < (1, 2): - self._shm_report_issue("MIT-SHM version too old", shm_version) - return False - - # We allocate something large enough for the root, so we don't have to reallocate each time the window is - # resized. - # TODO(jholveck): Check in _grab_impl that we're not going to exceed this size. That can happen if the - # root is resized. - size = self.pref_screen.width_in_pixels * self.pref_screen.height_in_pixels * 4 - try: - self._memfd = os.memfd_create("mss-shm-buf", flags=os.MFD_CLOEXEC) # type: ignore[attr-defined] - except OSError as e: - self._shm_report_issue("memfd_create failed", e) - self._shutdown_shm() - return False - os.ftruncate(self._memfd, size) + shm_ext_data = xcb.get_extension_data(self.conn, LIB.shm_id) + if not shm_ext_data.present: + self._shm_report_issue("MIT-SHM extension not present") + return ShmStatus.UNAVAILABLE + + # We use the FD-based version of ShmGetImage, so we require the extension to be at least 1.2. + shm_version_data = xcb.shm_query_version(self.conn) + shm_version = (shm_version_data.major_version, shm_version_data.minor_version) + if shm_version < (1, 2): + self._shm_report_issue("MIT-SHM version too old", shm_version) + return ShmStatus.UNAVAILABLE + + # We allocate something large enough for the root, so we don't have to reallocate each time the window is + # resized. + self._bufsize = self.pref_screen.width_in_pixels * self.pref_screen.height_in_pixels * 4 + + if not hasattr(os, "memfd_create"): + self._shm_report_issue("os.memfd_create not available") + return ShmStatus.UNAVAILABLE + try: + self._memfd = os.memfd_create("mss-shm-buf", flags=os.MFD_CLOEXEC) # type: ignore[attr-defined] + except OSError as e: + self._shm_report_issue("memfd_create failed", e) + self._shutdown_shm() + return ShmStatus.UNAVAILABLE + os.ftruncate(self._memfd, self._bufsize) - try: - self._buf = mmap(self._memfd, size, prot=PROT_READ) # type: ignore[call-arg] - except OSError as e: - self._shm_report_issue("mmap failed", e) - self._shutdown_shm() - return False + try: + self._buf = mmap(self._memfd, self._bufsize, prot=PROT_READ) # type: ignore[call-arg] + except OSError as e: + self._shm_report_issue("mmap failed", e) + self._shutdown_shm() + return ShmStatus.UNAVAILABLE - self._shmseg = xcb.ShmSeg(xcb.generate_id(self.conn).value) - try: - # This will normally be what raises an exception if you're on a remote connection. I previously thought - # the server deferred that until the GetImage call, but I had not been properly checking the status here. - xcb.shm_attach_fd(self.conn, self._shmseg, self._memfd, read_only=False) - except xcb.XError as e: - self._shm_report_issue("Cannot attach MIT-SHM segment", e) + self._shmseg = xcb.ShmSeg(xcb.generate_id(self.conn).value) + try: + # This will normally be what raises an exception if you're on a remote connection. (I previously + # thought the server deferred that until the GetImage call, but I had not been properly checking the + # status.) + # This will close _memfd, on success or on failure. + try: + xcb.shm_attach_fd(self.conn, self._shmseg, self._memfd, read_only=False) + finally: + self._memfd = None + except xcb.XError as e: + self._shm_report_issue("Cannot attach MIT-SHM segment", e) + self._shutdown_shm() + return ShmStatus.UNAVAILABLE + + except Exception: self._shutdown_shm() - return False + raise - return None + return ShmStatus.UNKNOWN def close(self) -> None: self._shutdown_shm() @@ -103,15 +122,7 @@ def _shutdown_shm(self) -> None: self._buf.close() self._buf = None if self._memfd is not None: - # TODO(jholveck): For some reason, at this point, self._memfd is no longer valid. If I try to close it, - # I get EBADF, even if I try to close it before closing the mmap. The theories I have about this involve - # the mmap object taking control, but it doesn't make sense that I could still use shm_attach_fd in that - # case. I need to investigate before releasing. - try: - os.close(self._memfd) - except OSError as e: - if e.errno != errno.EBADF: - raise + os.close(self._memfd) self._memfd = None def _grab_impl_xshmgetimage(self, monitor: Monitor) -> ScreenShot: @@ -121,6 +132,16 @@ def _grab_impl_xshmgetimage(self, monitor: Monitor) -> ScreenShot: assert self._buf is not None # noqa: S101 assert self._shmseg is not None # noqa: S101 + required_size = monitor["width"] * monitor["height"] * 4 + if required_size > self._bufsize: + # This is temporary. The permanent fix will depend on how + # issue https://github.com/BoboTiG/python-mss/issues/432 is resolved. + msg = ( + "Requested capture size exceeds the allocated buffer. If you have resized the screen, " + "please recreate your MSS object." + ) + raise ScreenShotError(msg) + img_reply = xcb.shm_get_image( self.conn, self.drawable.value, @@ -154,19 +175,19 @@ def _grab_impl_xshmgetimage(self, monitor: Monitor) -> ScreenShot: def _grab_impl(self, monitor: Monitor) -> ScreenShot: """Retrieve all pixels from a monitor. Pixels have to be RGBX.""" - if self._shm_works == False: # noqa: E712 + if self.shm_status == ShmStatus.UNAVAILABLE: return super()._grab_impl_xgetimage(monitor) try: rv = self._grab_impl_xshmgetimage(monitor) except XProtoError as e: - if self._shm_works is not None: + if self.shm_status != ShmStatus.UNKNOWN: raise self._shm_report_issue("MIT-SHM GetImage failed", e) - self._shm_works = False + self.shm_status = ShmStatus.UNAVAILABLE self._shutdown_shm() rv = super()._grab_impl_xgetimage(monitor) else: - self._shm_works = True + self.shm_status = ShmStatus.AVAILABLE return rv diff --git a/src/tests/test_gnu_linux.py b/src/tests/test_gnu_linux.py index b30ecbbd..6bb118ac 100644 --- a/src/tests/test_gnu_linux.py +++ b/src/tests/test_gnu_linux.py @@ -310,3 +310,41 @@ def test_with_cursor_failure(display: str) -> None: pytest.raises(ScreenShotError), ): sct.grab(sct.monitors[1]) + + +def test_shm_available() -> None: + """Verify that the xshmgetimage backend doesn't always fallback. + + Since this backend does an automatic fallback for certain types of + anticipated issues, that could cause some failures to be masked. + Ensure this isn't happening. + """ + with ( + pyvirtualdisplay.Display(size=(WIDTH, HEIGHT), color_depth=DEPTH) as vdisplay, + mss.mss(display=vdisplay.new_display_var, backend="xshmgetimage") as sct, + ): + assert isinstance(sct, mss.linux.xshmgetimage.MSS) # For Mypy + # The status currently isn't established as final until a grab succeeds. + sct.grab(sct.monitors[0]) + assert sct.shm_status == mss.linux.xshmgetimage.ShmStatus.AVAILABLE + + +def test_shm_fallback() -> None: + """Verify that the xshmgetimage backend falls back if MIT-SHM fails. + + The most common case when a fallback is needed is with a TCP + connection, such as the one used with ssh relaying. By using + DISPLAY=localhost:99 instead of DISPLAY=:99, we connect over TCP + instead of a local-domain socket. This is sufficient to prevent + MIT-SHM from completing its setup: the extension is available, but + won't be able to attach a shared memory segment. + """ + with ( + pyvirtualdisplay.Display(size=(WIDTH, HEIGHT), color_depth=DEPTH, extra_args=["-listen", "tcp"]) as vdisplay, + mss.mss(display=f"localhost{vdisplay.new_display_var}", backend="xshmgetimage") as sct, + ): + assert isinstance(sct, mss.linux.xshmgetimage.MSS) # For Mypy + # Ensure that the grab call completes without exception. + sct.grab(sct.monitors[0]) + # Ensure that it really did have to fall back; otherwise, we'd need to change how we test this case. + assert sct.shm_status == mss.linux.xshmgetimage.ShmStatus.UNAVAILABLE diff --git a/src/tests/test_implementation.py b/src/tests/test_implementation.py index a30f0cd9..d02c2b95 100644 --- a/src/tests/test_implementation.py +++ b/src/tests/test_implementation.py @@ -245,10 +245,6 @@ def test_grab_with_tuple_percents(mss_impl: Callable[..., MSSBase]) -> None: def test_thread_safety(backend: str) -> None: """Regression test for issue #169.""" - # This currently breaks on xshmgetimage. Resource exhaustion? Something leaking? - if backend == "xshmgetimage": - pytest.xfail() - def record(check: dict) -> None: """Record for one second.""" start_time = time.time() From f25d99bca936d7cfa0532ab94d7e8b6e61f3b75f Mon Sep 17 00:00:00 2001 From: Joel Holveck Date: Sun, 23 Nov 2025 08:35:58 +0000 Subject: [PATCH 5/9] Ensure that an X11 connection is open during the test session. Under X11, when the last client disconnects, the server resets. If a new client tries to connect before the reset is complete, it may fail. Since we often run the tests under Xvfb, they're frequently the only clients. Since our tests run in rapid succession, this combination can lead to intermittent failures. To avoid this, we open a connection at the start of the test session and keep it open until the end. --- src/tests/conftest.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/src/tests/conftest.py b/src/tests/conftest.py index 7c272449..08b98548 100644 --- a/src/tests/conftest.py +++ b/src/tests/conftest.py @@ -80,3 +80,29 @@ def mss_impl(backend: str) -> Callable[..., MSSBase]: # We can't just use partial here, since it will read $DISPLAY at the wrong time. This can cause problems, # depending on just how the fixtures get run. return lambda *args, **kwargs: mss(*args, display=os.getenv("DISPLAY"), backend=backend, **kwargs) + + +@pytest.fixture(autouse=True, scope="session") +def inhibit_x11_resets() -> Generator[None, None, None]: + """Ensure that an X11 connection is open during the test session. + + Under X11, when the last client disconnects, the server resets. If + a new client tries to connect before the reset is complete, it may fail. + Since we often run the tests under Xvfb, they're frequently the only + clients. Since our tests run in rapid succession, this combination + can lead to intermittent failures. + + To avoid this, we open a connection at the start of the test session + and keep it open until the end. + """ + if system() != "Linux": + yield + return + + conn, _ = xcb.connect() + try: + yield + finally: + # Some tests may have reset xcb.LIB, so make sure it's currently initialized. + xcb.initialize() + xcb.disconnect(conn) From 730531fdea03014767ceded56bbd4be0101f623c Mon Sep 17 00:00:00 2001 From: Joel Ray Holveck Date: Tue, 9 Dec 2025 17:48:36 -0800 Subject: [PATCH 6/9] Update the docs Also, before marking XShmGetImage as unavailable if the first grab() fails, also test to see if the fallback XGetImage also fails (such as if the user gave an out-of-bounds rect). In that case, just reraise the exception and try XShmGetImage again with the next grab(). --- CHANGELOG.md | 3 +- CHANGES.md | 14 ++++ docs/source/api.rst | 79 ++++++++++++++++++++++ docs/source/developers.rst | 9 +++ docs/source/examples.rst | 9 +++ docs/source/examples/linux_xshm_backend.py | 17 +++++ docs/source/usage.rst | 49 +++++++++++++- pyproject.toml | 3 +- src/mss/base.py | 11 ++- src/mss/factory.py | 3 +- src/mss/linux/__init__.py | 13 ++++ src/mss/linux/xshmgetimage.py | 45 ++++++++---- src/tests/test_setup.py | 1 + src/xcbproto/README.md | 2 +- 14 files changed, 238 insertions(+), 20 deletions(-) create mode 100644 docs/source/examples/linux_xshm_backend.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c7b25e6..c737eff3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,8 @@ See Git checking messages for full history. ## 10.1.1.dev0 (2025-xx-xx) - Linux: check the server for Xrandr support version (#417) - Linux: improve typing and error messages for X libraries (#418) -- Linux: add a new XCB backend for better thread safety, error-checking, and future development (#425) +- Linux: introduce an XCB-powered backend stack with a factory in ``mss.linux`` while keeping the Xlib code as a fallback (#425) +- Linux: add the XShmGetImage backend with automatic XGetImage fallback and explicit status reporting (#431) - :heart: contributors: @jholveck ## 10.1.0 (2025-08-16) diff --git a/CHANGES.md b/CHANGES.md index 231568e3..1f9ef079 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,19 @@ # Technical Changes +## 10.1.1 (2025-xx-xx) + +### linux/__init__.py +- Added an ``mss()`` factory to select between the different GNU/Linux backends. + +### linux/xlib.py +- Moved the legacy Xlib backend into the ``mss.linux.xlib`` module to be used as a fallback implementation. + +### linux/xgetimage.py +- Added an XCB-based backend that mirrors XGetImage semantics. + +### linux/xshmgetimage.py +- Added an XCB backend powered by XShmGetImage with ``shm_status`` and ``shm_fallback_reason`` attributes for diagnostics. + ## 10.1.0 (2025-08-16) ### darwin.py diff --git a/docs/source/api.rst b/docs/source/api.rst index b1d87f4b..285774b4 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -33,6 +33,36 @@ GNU/Linux .. module:: mss.linux +Factory function to return the appropriate backend implementation. + +.. function:: mss(backend="default", **kwargs) + + :keyword str backend: Backend name ("default", "xlib", "xgetimage", or "xshmgetimage"). + :keyword display: Display name (e.g., ":0.0") for the X server. Default is taken from the :envvar:`DISPLAY` environment variable. + :type display: str or None + :param kwargs: Additional arguments passed to the backend MSS class. + :rtype: :class:`mss.base.MSSBase` + :return: Backend-specific MSS instance. + + Factory returning a proper MSS class instance for GNU/Linux. + The backend parameter selects the implementation: + + - "default" or "xlib": Traditional Xlib-based backend (default) + - "xgetimage": XCB-based backend using XGetImage + - "xshmgetimage": XCB-based backend using XShmGetImage (falls back to XGetImage if unavailable) + +.. function:: MSS(*args, **kwargs) + + Alias for :func:`mss` for backward compatibility. + + +Xlib Backend +^^^^^^^^^^^^ + +.. module:: mss.linux.xlib + +Traditional Xlib-based backend (default). + .. attribute:: CFUNCTIONS .. versionadded:: 6.1.0 @@ -79,6 +109,55 @@ GNU/Linux .. versionadded:: 8.0.0 + +XGetImage Backend +^^^^^^^^^^^^^^^^^ + +.. module:: mss.linux.xgetimage + +XCB-based backend using XGetImage protocol. + +.. class:: MSS + + XCB implementation using XGetImage for screenshot capture. + + +XShmGetImage Backend +^^^^^^^^^^^^^^^^^^^^ + +.. module:: mss.linux.xshmgetimage + +XCB-based backend using XShmGetImage protocol with shared memory. + +.. class:: ShmStatus + + Enum describing the availability of the X11 MIT-SHM extension used by the backend. + + .. attribute:: UNKNOWN + + Initial state before any capture confirms availability or failure. + + .. attribute:: AVAILABLE + + Shared-memory capture works and will continue to be used. + + .. attribute:: UNAVAILABLE + + Shared-memory capture failed; MSS will use XGetImage. + +.. class:: MSS + + XCB implementation using XShmGetImage for screenshot capture. + Falls back to XGetImage if shared memory extension is unavailable. + + .. attribute:: shm_status + + Current shared-memory availability, using :class:`mss.linux.xshmgetimage.ShmStatus`. + + .. attribute:: shm_fallback_reason + + Optional string describing why the backend fell back to XGetImage when MIT-SHM is unavailable. + Windows ------- diff --git a/docs/source/developers.rst b/docs/source/developers.rst index 3dfe19bc..15609d0a 100644 --- a/docs/source/developers.rst +++ b/docs/source/developers.rst @@ -50,3 +50,12 @@ To build the documentation, simply type:: $ python -m pip install -e '.[docs]' $ sphinx-build -d docs docs/source docs_out --color -W -bhtml + + +XCB Code Generator +================== + +The GNU/Linux XCB backends rely on generated ctypes bindings. If you need to +add new XCB requests or types, do **not** edit ``src/mss/linux/xcbgen.py`` by +hand. Instead, follow the workflow described in ``src/xcbproto/README.md``, +which explains how to update ``gen_xcb_to_py.py`` and regenerate the bindings. diff --git a/docs/source/examples.rst b/docs/source/examples.rst index 7bb8157b..71d75df2 100644 --- a/docs/source/examples.rst +++ b/docs/source/examples.rst @@ -103,6 +103,15 @@ You can handle data using a custom class: .. versionadded:: 3.1.0 +GNU/Linux XShm backend +---------------------- + +Select the XShmGetImage backend explicitly and inspect whether it is active or +falling back to XGetImage:: + +.. literalinclude:: examples/linux_xshm_backend.py + :lines: 7- + PIL === diff --git a/docs/source/examples/linux_xshm_backend.py b/docs/source/examples/linux_xshm_backend.py new file mode 100644 index 00000000..3d2f22bf --- /dev/null +++ b/docs/source/examples/linux_xshm_backend.py @@ -0,0 +1,17 @@ +"""This is part of the MSS Python's module. +Source: https://github.com/BoboTiG/python-mss. + +Select the XShmGetImage backend explicitly and inspect its status. +""" + +from mss.linux.xshmgetimage import MSS as mss + +with mss() as sct: + screenshot = sct.grab(sct.monitors[1]) + print(f"Captured screenshot dimensions: {screenshot.size.width}x{screenshot.size.height}") + + print(f"shm_status: {sct.shm_status.name}") + if sct.shm_fallback_reason: + print(f"Falling back to XGetImage because: {sct.shm_fallback_reason}") + else: + print("MIT-SHM capture active.") diff --git a/docs/source/usage.rst b/docs/source/usage.rst index 4e105a8b..63b24028 100644 --- a/docs/source/usage.rst +++ b/docs/source/usage.rst @@ -5,7 +5,7 @@ Usage Import ====== -So MSS can be used as simply as:: +MSS can be used as simply as:: from mss import mss @@ -20,6 +20,11 @@ Or import the good one based on your operating system:: # Microsoft Windows from mss.windows import MSS as mss +On GNU/Linux you can also import a specific backend (see :ref:`backends`) +directly when you need a particular implementation, for example:: + + from mss.linux.xshmgetimage import MSS as mss + Instance ======== @@ -49,19 +54,57 @@ This is a much better usage, memory efficient:: Also, it is a good thing to save the MSS instance inside an attribute of your class and calling it when needed. +.. _backends: + +Backends +-------- + +Some platforms have multiple ways to take screenshots. In MSS, these are known as *backends*. The :py:func:`mss` functions will normally autodetect which one is appropriate for your situation, but you can override this if you want. For instance, you may know that your specific situation requires a particular backend. + +If you want to choose a particular backend, you can use the :py::`backend` keyword to :py:func:`mss`::: + + with mss(backend="xgetimage") as sct: + ... + +Instead, you can also directly import the backend you want to use::: + + from mss.linux.xgetimage import MSS as mss + +Currently, only the GNU/Linux implementation has multiple backends. These are described in their own section below. + + GNU/Linux --------- -On GNU/Linux, you can specify which display to use (useful for distant screenshots via SSH):: +Display +^^^^^^^ + +On GNU/Linux, the default display is taken from the :envvar:`DISPLAY` environment variable. You can instead specify which display to use (useful for distant screenshots via SSH) using the :keyword:`display` keyword:: with mss(display=":0.0") as sct: - # ... + ... A more specific example (only valid on GNU/Linux): .. literalinclude:: examples/linux_display_keyword.py :lines: 9- +Backends +^^^^^^^^ + +The GNU/Linux implementation has multiple backends (see :ref:`backends`), or ways it can take screenshots. The :py:func:`mss.mss` and :py:func:`mss.linux.mss` functions will normally autodetect which one is appropriate, but you can override this if you want. + +There are three available backends. + +:py:mod:`xlib` (default) + The legacy backend, based on :c:func:`XGetImage`. This backend is not being improved anymore. It is only provided in case the newer backends don't work for some reason. + +:py:mod:`xgetimage` + A highly-compatible, but slow, backend, based on :c:func:`xcb_get_image`. This backend is the slowest of the new backends, but works in all situations. You can use this if you know that :py:mod:`xshmgetimage` won't work. + +:py:mod:`xshmgetimage` + The fastest backend, based on :c:func:`xcb_shm_get_image`. This backend is the fastest, about three times faster than :py:mod:`xgetimage`. However, it doesn't work for remote screenshots, such as over SSH. If you use it with a remote display, then it will automatically switch to :py:mod:`xgetimage` instead. It's always safe to use this backend. + Command Line ============ diff --git a/pyproject.toml b/pyproject.toml index f51a7b38..dd5c3f35 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -179,6 +179,7 @@ ignore = [ "docs/source/*" = [ "ERA001", # commented code "INP001", # file `xxx` is part of an implicit namespace package + "N811", # importing constant (MSS) as non-constant (mss) ] "src/tests/*" = [ "FBT001", # boolean-typed positional argument in function definition @@ -191,4 +192,4 @@ ignore = [ ] [tool.ruff.per-file-target-version] -"src/xcbproto/*" = "py312" \ No newline at end of file +"src/xcbproto/*" = "py312" diff --git a/src/mss/base.py b/src/mss/base.py index 0a6c443a..2abfcbd3 100644 --- a/src/mss/base.py +++ b/src/mss/base.py @@ -18,6 +18,15 @@ from mss.models import Monitor, Monitors + # Prior to 3.11, Python didn't have the Self type. typing_extensions does, but we don't want to depend on it. + try: + from typing import Self + except ImportError: # pragma: nocover + try: + from typing_extensions import Self + except ImportError: # pragma: nocover + Self = Any # type: ignore[assignment] + try: from datetime import UTC except ImportError: # pragma: nocover @@ -58,7 +67,7 @@ def __init__( msg = 'The only valid backend on this platform is "default".' raise ScreenShotError(msg) - def __enter__(self) -> MSSBase: # noqa:PYI034 + def __enter__(self) -> Self: """For the cool call `with MSS() as mss:`.""" return self diff --git a/src/mss/factory.py b/src/mss/factory.py index 29cb3ed1..933310d7 100644 --- a/src/mss/factory.py +++ b/src/mss/factory.py @@ -29,7 +29,8 @@ def mss(**kwargs: Any) -> MSSBase: if os_ == "linux": from mss import linux # noqa: PLC0415 - return linux.MSS(**kwargs) + # Linux has its own factory to choose the backend. + return linux.mss(**kwargs) if os_ == "windows": from mss import windows # noqa: PLC0415 diff --git a/src/mss/linux/__init__.py b/src/mss/linux/__init__.py index 1676ffc0..05975e9d 100644 --- a/src/mss/linux/__init__.py +++ b/src/mss/linux/__init__.py @@ -5,6 +5,15 @@ def mss(backend: str = "default", **kwargs: Any) -> MSSBase: + """Factory returning a proper MSS class instance. + + It examines the options provided, and chooses the most adapted MSS + class to take screenshots. It then proxies its arguments to the + class for instantiation. + + Currently, the only option used is the "backend" flag. Future + versions will look at other options as well. + """ backend = backend.lower() if backend in {"default", "xlib"}: from . import xlib # noqa: PLC0415 @@ -13,6 +22,10 @@ def mss(backend: str = "default", **kwargs: Any) -> MSSBase: if backend == "xgetimage": from . import xgetimage # noqa: PLC0415 + # Note that the xshmgetimage backend will automatically fall + # back to XGetImage calls if XShmGetImage isn't available. The + # only reason to use the xgetimage backend is if the user + # already knows that XShmGetImage isn't going to be supported. return xgetimage.MSS(**kwargs) if backend == "xshmgetimage": from . import xshmgetimage # noqa: PLC0415 diff --git a/src/mss/linux/xshmgetimage.py b/src/mss/linux/xshmgetimage.py index da91b363..981f2116 100644 --- a/src/mss/linux/xshmgetimage.py +++ b/src/mss/linux/xshmgetimage.py @@ -17,9 +17,9 @@ class ShmStatus(enum.Enum): - UNKNOWN = enum.auto() - AVAILABLE = enum.auto() - UNAVAILABLE = enum.auto() + UNKNOWN = enum.auto() # Constructor says SHM *should* work, but we haven't seen a real GetImage succeed yet. + AVAILABLE = enum.auto() # We've successfully used XShmGetImage at least once. + UNAVAILABLE = enum.auto() # We know SHM GetImage is unusable; always use XGetImage. class MSS(MSSXCBBase): @@ -27,6 +27,8 @@ class MSS(MSSXCBBase): This implementation is based on XCB, using the ShmGetImage request. If ShmGetImage fails, then this will fall back to using GetImage. + In that event, the reason for the fallback will be recorded in the + shm_fallback_reason attribute as a string, for debugging purposes. """ def __init__(self, /, **kwargs: Any) -> None: @@ -37,8 +39,12 @@ def __init__(self, /, **kwargs: Any) -> None: self._buf: mmap | None = None self._shmseg: xcb.ShmSeg | None = None + # Rather than trying to track the shm_status, we may be able to raise an exception in __init__ if XShmGetImage + # isn't available. The factory in linux/__init__.py could then catch that and switch to XGetImage. + # The conditions under which the attach will succeed but the xcb_shm_get_image will fail are extremely + # rare, and I haven't yet found any that also will work with xcb_get_image. self.shm_status: ShmStatus = self._setup_shm() - self.shm_failed_reason: str | None = None + self.shm_fallback_reason: str | None = None def _shm_report_issue(self, msg: str, *args: Any) -> None: """Debugging hook for troubleshooting MIT-SHM issues. @@ -49,7 +55,7 @@ def _shm_report_issue(self, msg: str, *args: Any) -> None: full_msg = msg if args: full_msg += " | " + ", ".join(str(arg) for arg in args) - self.shm_failed_reason = full_msg + self.shm_fallback_reason = full_msg def _setup_shm(self) -> ShmStatus: # noqa: PLR0911 assert self.conn is not None # noqa: S101 @@ -91,10 +97,8 @@ def _setup_shm(self) -> ShmStatus: # noqa: PLR0911 self._shmseg = xcb.ShmSeg(xcb.generate_id(self.conn).value) try: - # This will normally be what raises an exception if you're on a remote connection. (I previously - # thought the server deferred that until the GetImage call, but I had not been properly checking the - # status.) - # This will close _memfd, on success or on failure. + # This will normally be what raises an exception if you're on a remote connection. + # XCB will close _memfd, on success or on failure. try: xcb.shm_attach_fd(self.conn, self._shmseg, self._memfd, read_only=False) finally: @@ -178,16 +182,33 @@ def _grab_impl(self, monitor: Monitor) -> ScreenShot: if self.shm_status == ShmStatus.UNAVAILABLE: return super()._grab_impl_xgetimage(monitor) + # The usual path is just the next few lines. try: rv = self._grab_impl_xshmgetimage(monitor) + self.shm_status = ShmStatus.AVAILABLE except XProtoError as e: if self.shm_status != ShmStatus.UNKNOWN: + # We know XShmGetImage works, because it worked earlier. Reraise the error. + raise + + # Should we engage the fallback path? In almost all cases, if XShmGetImage failed at this stage (after + # all our testing in __init__), XGetImage will also fail. This could mean that the user sent an + # out-of-bounds request. In more exotic situations, some rare X servers disallow screen capture + # altogether: security-hardened servers, for instance, or some XPrint servers. But let's make sure, by + # testing the same request through XGetImage. + try: + rv = super()._grab_impl_xgetimage(monitor) + except XProtoError: # noqa: TRY203 + # The XGetImage also failed, so we don't know anything about whether XShmGetImage is usable. Maybe + # the user sent an out-of-bounds request. Maybe it's a security-hardened server. We're not sure what + # the problem is. So, if XGetImage failed, we re-raise that error (the one from XShmGetImage will be + # attached as __context__), but we won't update the shm_status yet. (Technically, our except:raise + # clause here is redundant; it's just for clarity, to hold this comment.) raise + + # Using XShmGetImage failed, and using XGetImage worked. Use XGetImage in the future. self._shm_report_issue("MIT-SHM GetImage failed", e) self.shm_status = ShmStatus.UNAVAILABLE self._shutdown_shm() - rv = super()._grab_impl_xgetimage(monitor) - else: - self.shm_status = ShmStatus.AVAILABLE return rv diff --git a/src/tests/test_setup.py b/src/tests/test_setup.py index 391b22f4..295dba23 100644 --- a/src/tests/test_setup.py +++ b/src/tests/test_setup.py @@ -51,6 +51,7 @@ def test_sdist() -> None: f"mss-{__version__}/docs/source/examples/fps_multiprocessing.py", f"mss-{__version__}/docs/source/examples/from_pil_tuple.py", f"mss-{__version__}/docs/source/examples/linux_display_keyword.py", + f"mss-{__version__}/docs/source/examples/linux_xshm_backend.py", f"mss-{__version__}/docs/source/examples/opencv_numpy.py", f"mss-{__version__}/docs/source/examples/part_of_screen.py", f"mss-{__version__}/docs/source/examples/part_of_screen_monitor_2.py", diff --git a/src/xcbproto/README.md b/src/xcbproto/README.md index 79891dd2..351e0b97 100644 --- a/src/xcbproto/README.md +++ b/src/xcbproto/README.md @@ -20,7 +20,7 @@ The generator is a **maintainer tool**, not part of the normal build process: 3. The generator reads the XML protocol definitions and emits `xcbgen.py`. 4. The maintainer ensures that this worked correctly, and moves the file to `src/mss/linux/xcbgen.py`. -4. The generated `xcbgen.py` is committed to version control and distributed with the package, so end users never need to run the generator. +5. The generated `xcbgen.py` is committed to version control and distributed with the package, so end users never need to run the generator. ## Protocol XML Files From 9eaf3fca6903875a7351ad4d6d0ff2b9f5456e6d Mon Sep 17 00:00:00 2001 From: Joel Ray Holveck Date: Tue, 9 Dec 2025 21:40:25 -0800 Subject: [PATCH 7/9] Changes that should have been in the previous commit --- docs/source/usage.rst | 9 ++------- pyproject.toml | 2 +- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/docs/source/usage.rst b/docs/source/usage.rst index 63b24028..0fb3aefd 100644 --- a/docs/source/usage.rst +++ b/docs/source/usage.rst @@ -61,12 +61,12 @@ Backends Some platforms have multiple ways to take screenshots. In MSS, these are known as *backends*. The :py:func:`mss` functions will normally autodetect which one is appropriate for your situation, but you can override this if you want. For instance, you may know that your specific situation requires a particular backend. -If you want to choose a particular backend, you can use the :py::`backend` keyword to :py:func:`mss`::: +If you want to choose a particular backend, you can use the :py::`backend` keyword to :py:func:`mss`:: with mss(backend="xgetimage") as sct: ... -Instead, you can also directly import the backend you want to use::: +Alternatively, you can also directly import the backend you want to use:: from mss.linux.xgetimage import MSS as mss @@ -81,11 +81,6 @@ Display On GNU/Linux, the default display is taken from the :envvar:`DISPLAY` environment variable. You can instead specify which display to use (useful for distant screenshots via SSH) using the :keyword:`display` keyword:: - with mss(display=":0.0") as sct: - ... - -A more specific example (only valid on GNU/Linux): - .. literalinclude:: examples/linux_display_keyword.py :lines: 9- diff --git a/pyproject.toml b/pyproject.toml index dd5c3f35..3b4b0223 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -192,4 +192,4 @@ ignore = [ ] [tool.ruff.per-file-target-version] -"src/xcbproto/*" = "py312" +"src/xcbproto/*" = "py312" \ No newline at end of file From 44bf176fbcbfbb203242889e18375c5a808c8cec Mon Sep 17 00:00:00 2001 From: Joel Ray Holveck Date: Tue, 9 Dec 2025 21:50:17 -0800 Subject: [PATCH 8/9] Change the default backend on Linux to xshmgetimage Also, fix a minor error in the docs formatting. --- docs/source/api.rst | 6 +++--- docs/source/usage.rst | 19 ++++++++++++------- src/mss/linux/__init__.py | 11 +++++------ 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/docs/source/api.rst b/docs/source/api.rst index 285774b4..655aabaf 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -47,9 +47,9 @@ Factory function to return the appropriate backend implementation. Factory returning a proper MSS class instance for GNU/Linux. The backend parameter selects the implementation: - - "default" or "xlib": Traditional Xlib-based backend (default) + - "default" or "xshmgetimage": XCB-based backend using XShmGetImage (default, with automatic fallback to XGetImage) - "xgetimage": XCB-based backend using XGetImage - - "xshmgetimage": XCB-based backend using XShmGetImage (falls back to XGetImage if unavailable) + - "xlib": Traditional Xlib-based backend retained for environments without working XCB libraries .. function:: MSS(*args, **kwargs) @@ -61,7 +61,7 @@ Xlib Backend .. module:: mss.linux.xlib -Traditional Xlib-based backend (default). +Legacy Xlib-based backend, kept as a fallback when XCB is unavailable. .. attribute:: CFUNCTIONS diff --git a/docs/source/usage.rst b/docs/source/usage.rst index 0fb3aefd..d870ba65 100644 --- a/docs/source/usage.rst +++ b/docs/source/usage.rst @@ -79,10 +79,11 @@ GNU/Linux Display ^^^^^^^ -On GNU/Linux, the default display is taken from the :envvar:`DISPLAY` environment variable. You can instead specify which display to use (useful for distant screenshots via SSH) using the :keyword:`display` keyword:: +On GNU/Linux, the default display is taken from the :envvar:`DISPLAY` environment variable. You can instead specify which display to use (useful for distant screenshots via SSH) using the ``display`` keyword:: .. literalinclude:: examples/linux_display_keyword.py - :lines: 9- + :lines: 7- + Backends ^^^^^^^^ @@ -91,14 +92,18 @@ The GNU/Linux implementation has multiple backends (see :ref:`backends`), or way There are three available backends. -:py:mod:`xlib` (default) - The legacy backend, based on :c:func:`XGetImage`. This backend is not being improved anymore. It is only provided in case the newer backends don't work for some reason. +:py:mod:`xshmgetimage` (default) + The fastest backend, based on :c:func:`xcb_shm_get_image`. It is roughly three times faster than :py:mod:`xgetimage` + and is used automatically. When the MIT-SHM extension is unavailable (for example on remote SSH displays), it + transparently falls back to :py:mod:`xgetimage` so you can always request it safely. :py:mod:`xgetimage` - A highly-compatible, but slow, backend, based on :c:func:`xcb_get_image`. This backend is the slowest of the new backends, but works in all situations. You can use this if you know that :py:mod:`xshmgetimage` won't work. + A highly-compatible, but slower, backend based on :c:func:`xcb_get_image`. Use this explicitly only when you know + that :py:mod:`xshmgetimage` cannot operate in your environment. -:py:mod:`xshmgetimage` - The fastest backend, based on :c:func:`xcb_shm_get_image`. This backend is the fastest, about three times faster than :py:mod:`xgetimage`. However, it doesn't work for remote screenshots, such as over SSH. If you use it with a remote display, then it will automatically switch to :py:mod:`xgetimage` instead. It's always safe to use this backend. +:py:mod:`xlib` + The legacy backend powered by :c:func:`XGetImage`. It is kept solely for systems where XCB libraries are + unavailable and no new features are being added to it. Command Line diff --git a/src/mss/linux/__init__.py b/src/mss/linux/__init__.py index 05975e9d..14dc33e9 100644 --- a/src/mss/linux/__init__.py +++ b/src/mss/linux/__init__.py @@ -15,19 +15,18 @@ class for instantiation. versions will look at other options as well. """ backend = backend.lower() - if backend in {"default", "xlib"}: + if backend == "xlib": from . import xlib # noqa: PLC0415 return xlib.MSS(**kwargs) if backend == "xgetimage": from . import xgetimage # noqa: PLC0415 - # Note that the xshmgetimage backend will automatically fall - # back to XGetImage calls if XShmGetImage isn't available. The - # only reason to use the xgetimage backend is if the user - # already knows that XShmGetImage isn't going to be supported. + # Note that the xshmgetimage backend will automatically fall back to XGetImage calls if XShmGetImage isn't + # available. The only reason to use the xgetimage backend is if the user already knows that XShmGetImage + # isn't going to be supported. return xgetimage.MSS(**kwargs) - if backend == "xshmgetimage": + if backend in {"default", "xshmgetimage"}: from . import xshmgetimage # noqa: PLC0415 return xshmgetimage.MSS(**kwargs) From 57d8c5d1b9f698a41684af6d77ba22b78f2fcf72 Mon Sep 17 00:00:00 2001 From: Joel Ray Holveck Date: Tue, 9 Dec 2025 23:36:58 -0800 Subject: [PATCH 9/9] Add a --backend argument to the CLI --- docs/source/examples.rst | 2 +- docs/source/usage.rst | 9 +- src/mss/__main__.py | 38 +++++++- src/mss/darwin.py | 2 + src/mss/linux/__init__.py | 3 + src/mss/windows.py | 2 + src/tests/test_implementation.py | 152 +++++++++++++++++++------------ 7 files changed, 140 insertions(+), 68 deletions(-) diff --git a/docs/source/examples.rst b/docs/source/examples.rst index 71d75df2..00920890 100644 --- a/docs/source/examples.rst +++ b/docs/source/examples.rst @@ -107,7 +107,7 @@ GNU/Linux XShm backend ---------------------- Select the XShmGetImage backend explicitly and inspect whether it is active or -falling back to XGetImage:: +falling back to XGetImage: .. literalinclude:: examples/linux_xshm_backend.py :lines: 7- diff --git a/docs/source/usage.rst b/docs/source/usage.rst index d870ba65..b9850dde 100644 --- a/docs/source/usage.rst +++ b/docs/source/usage.rst @@ -79,7 +79,7 @@ GNU/Linux Display ^^^^^^^ -On GNU/Linux, the default display is taken from the :envvar:`DISPLAY` environment variable. You can instead specify which display to use (useful for distant screenshots via SSH) using the ``display`` keyword:: +On GNU/Linux, the default display is taken from the :envvar:`DISPLAY` environment variable. You can instead specify which display to use (useful for distant screenshots via SSH) using the ``display`` keyword: .. literalinclude:: examples/linux_display_keyword.py :lines: 7- @@ -116,8 +116,8 @@ You can use ``mss`` via the CLI:: Or via direct call from Python:: $ python -m mss --help - usage: __main__.py [-h] [-c COORDINATES] [-l {0,1,2,3,4,5,6,7,8,9}] - [-m MONITOR] [-o OUTPUT] [-q] [-v] [--with-cursor] + usage: mss [-h] [-c COORDINATES] [-l {0,1,2,3,4,5,6,7,8,9}] [-m MONITOR] + [-o OUTPUT] [--with-cursor] [-q] [-b BACKEND] [-v] options: -h, --help show this help message and exit @@ -129,6 +129,9 @@ Or via direct call from Python:: the monitor to screenshot -o OUTPUT, --output OUTPUT the output file name + -b, --backend BACKEND + platform-specific backend to use + (Linux: default/xlib/xgetimage/xshmgetimage; macOS/Windows: default) --with-cursor include the cursor -q, --quiet do not print created files -v, --version show program's version number and exit diff --git a/src/mss/__main__.py b/src/mss/__main__.py index 384ad344..7d884fc6 100644 --- a/src/mss/__main__.py +++ b/src/mss/__main__.py @@ -3,8 +3,9 @@ """ import os.path +import platform import sys -from argparse import ArgumentParser +from argparse import ArgumentError, ArgumentParser from mss import __version__ from mss.exception import ScreenShotError @@ -12,9 +13,28 @@ from mss.tools import to_png +def _backend_cli_choices() -> list[str]: + os_name = platform.system().lower() + if os_name == "darwin": + from mss import darwin # noqa: PLC0415 + + return list(darwin.BACKENDS) + if os_name == "linux": + from mss import linux # noqa: PLC0415 + + return list(linux.BACKENDS) + if os_name == "windows": + from mss import windows # noqa: PLC0415 + + return list(windows.BACKENDS) + return ["default"] + + def main(*args: str) -> int: """Main logic.""" - cli_args = ArgumentParser(prog="mss") + backend_choices = _backend_cli_choices() + + cli_args = ArgumentParser(prog="mss", exit_on_error=False) cli_args.add_argument( "-c", "--coordinates", @@ -40,9 +60,19 @@ def main(*args: str) -> int: action="store_true", help="do not print created files", ) + cli_args.add_argument( + "-b", "--backend", default="default", choices=backend_choices, help="platform-specific backend to use" + ) cli_args.add_argument("-v", "--version", action="version", version=__version__) - options = cli_args.parse_args(args or None) + try: + options = cli_args.parse_args(args or None) + except ArgumentError as e: + # By default, parse_args will print and the error and exit. We + # return instead of exiting, to make unit testing easier. + cli_args.print_usage(sys.stderr) + print(f"{cli_args.prog}: error: {e}", file=sys.stderr) + return 2 kwargs = {"mon": options.monitor, "output": options.output} if options.coordinates: try: @@ -61,7 +91,7 @@ def main(*args: str) -> int: kwargs["output"] = "sct-{top}x{left}_{width}x{height}.png" try: - with mss(with_cursor=options.with_cursor) as sct: + with mss(with_cursor=options.with_cursor, backend=options.backend) as sct: if options.coordinates: output = kwargs["output"].format(**kwargs["mon"]) sct_img = sct.grab(kwargs["mon"]) diff --git a/src/mss/darwin.py b/src/mss/darwin.py index f001398b..5d723b98 100644 --- a/src/mss/darwin.py +++ b/src/mss/darwin.py @@ -20,6 +20,8 @@ __all__ = ("MSS",) +BACKENDS = ["default"] + MAC_VERSION_CATALINA = 10.16 kCGWindowImageBoundsIgnoreFraming = 1 << 0 # noqa: N816 diff --git a/src/mss/linux/__init__.py b/src/mss/linux/__init__.py index 14dc33e9..8426abfd 100644 --- a/src/mss/linux/__init__.py +++ b/src/mss/linux/__init__.py @@ -3,6 +3,8 @@ from mss.base import MSSBase from mss.exception import ScreenShotError +BACKENDS = ["default", "xlib", "xgetimage", "xshmgetimage"] + def mss(backend: str = "default", **kwargs: Any) -> MSSBase: """Factory returning a proper MSS class instance. @@ -30,6 +32,7 @@ class for instantiation. from . import xshmgetimage # noqa: PLC0415 return xshmgetimage.MSS(**kwargs) + assert backend not in BACKENDS # noqa: S101 msg = f"Backend {backend!r} not (yet?) implemented." raise ScreenShotError(msg) diff --git a/src/mss/windows.py b/src/mss/windows.py index d5e2bb78..b84daa3e 100644 --- a/src/mss/windows.py +++ b/src/mss/windows.py @@ -35,6 +35,8 @@ __all__ = ("MSS",) +BACKENDS = ["default"] + CAPTUREBLT = 0x40000000 DIB_RGB_COLORS = 0 diff --git a/src/tests/test_implementation.py b/src/tests/test_implementation.py index d02c2b95..9ee55adb 100644 --- a/src/tests/test_implementation.py +++ b/src/tests/test_implementation.py @@ -95,72 +95,104 @@ def test_factory_unknown_system(backend: str, monkeypatch: pytest.MonkeyPatch) - assert error == "System 'chuck norris' not (yet?) implemented." -@patch.object(sys, "argv", new=[]) # Prevent side effects while testing +@pytest.fixture +def reset_sys_argv(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(sys, "argv", []) + + +@pytest.mark.usefixtures("reset_sys_argv") @pytest.mark.parametrize("with_cursor", [False, True]) -def test_entry_point(with_cursor: bool, capsys: pytest.CaptureFixture) -> None: - def main(*args: str, ret: int = 0) -> None: +class TestEntryPoint: + """CLI entry-point scenarios split into focused tests.""" + + @staticmethod + def _run_main(with_cursor: bool, *args: str, ret: int = 0) -> None: if with_cursor: args = (*args, "--with-cursor") assert entry_point(*args) == ret - # No arguments - main() - captured = capsys.readouterr() - for mon, line in enumerate(captured.out.splitlines(), 1): - filename = Path(f"monitor-{mon}.png") - assert line.endswith(filename.name) - assert filename.is_file() - filename.unlink() - - file = Path("monitor-1.png") - for opt in ("-m", "--monitor"): - main(opt, "1") - captured = capsys.readouterr() - assert captured.out.endswith(f"{file.name}\n") - assert filename.is_file() - filename.unlink() - - for opts in zip(["-m 1", "--monitor=1"], ["-q", "--quiet"]): - main(*opts) - captured = capsys.readouterr() - assert not captured.out - assert filename.is_file() - filename.unlink() - - fmt = "sct-{mon}-{width}x{height}.png" - for opt in ("-o", "--out"): - main(opt, fmt) - captured = capsys.readouterr() - with mss.mss(display=os.getenv("DISPLAY")) as sct: - for mon, (monitor, line) in enumerate(zip(sct.monitors[1:], captured.out.splitlines()), 1): - filename = Path(fmt.format(mon=mon, **monitor)) - assert line.endswith(filename.name) - assert filename.is_file() - filename.unlink() - - fmt = "sct_{mon}-{date:%Y-%m-%d}.png" - for opt in ("-o", "--out"): - main("-m 1", opt, fmt) - filename = Path(fmt.format(mon=1, date=datetime.now(tz=UTC))) - captured = capsys.readouterr() - assert captured.out.endswith(f"{filename}\n") - assert filename.is_file() - filename.unlink() - - coordinates = "2,12,40,67" - filename = Path("sct-2x12_40x67.png") - for opt in ("-c", "--coordinates"): - main(opt, coordinates) - captured = capsys.readouterr() - assert captured.out.endswith(f"{filename}\n") - assert filename.is_file() - filename.unlink() - - coordinates = "2,12,40" - for opt in ("-c", "--coordinates"): - main(opt, coordinates, ret=2) + def test_no_arguments(self, with_cursor: bool, capsys: pytest.CaptureFixture) -> None: + self._run_main(with_cursor) captured = capsys.readouterr() - assert captured.out == "Coordinates syntax: top, left, width, height\n" + for mon, line in enumerate(captured.out.splitlines(), 1): + filename = Path(f"monitor-{mon}.png") + assert line.endswith(filename.name) + assert filename.is_file() + filename.unlink() + + def test_monitor_option_and_quiet(self, with_cursor: bool, capsys: pytest.CaptureFixture) -> None: + file = Path("monitor-1.png") + filename: Path | None = None + for opt in ("-m", "--monitor"): + self._run_main(with_cursor, opt, "1") + captured = capsys.readouterr() + assert captured.out.endswith(f"{file.name}\n") + filename = Path(captured.out.rstrip()) + assert filename.is_file() + filename.unlink() + + assert filename is not None + for opts in zip(["-m 1", "--monitor=1"], ["-q", "--quiet"]): + self._run_main(with_cursor, *opts) + captured = capsys.readouterr() + assert not captured.out + assert filename.is_file() + filename.unlink() + + def test_custom_output_pattern(self, with_cursor: bool, capsys: pytest.CaptureFixture) -> None: + fmt = "sct-{mon}-{width}x{height}.png" + for opt in ("-o", "--out"): + self._run_main(with_cursor, opt, fmt) + captured = capsys.readouterr() + with mss.mss(display=os.getenv("DISPLAY")) as sct: + for mon, (monitor, line) in enumerate(zip(sct.monitors[1:], captured.out.splitlines()), 1): + filename = Path(fmt.format(mon=mon, **monitor)) + assert line.endswith(filename.name) + assert filename.is_file() + filename.unlink() + + def test_output_pattern_with_date(self, with_cursor: bool, capsys: pytest.CaptureFixture) -> None: + fmt = "sct_{mon}-{date:%Y-%m-%d}.png" + for opt in ("-o", "--out"): + self._run_main(with_cursor, "-m 1", opt, fmt) + filename = Path(fmt.format(mon=1, date=datetime.now(tz=UTC))) + captured = capsys.readouterr() + assert captured.out.endswith(f"{filename}\n") + assert filename.is_file() + filename.unlink() + + def test_coordinates_capture(self, with_cursor: bool, capsys: pytest.CaptureFixture) -> None: + coordinates = "2,12,40,67" + filename = Path("sct-2x12_40x67.png") + for opt in ("-c", "--coordinates"): + self._run_main(with_cursor, opt, coordinates) + captured = capsys.readouterr() + assert captured.out.endswith(f"{filename}\n") + assert filename.is_file() + filename.unlink() + + def test_invalid_coordinates(self, with_cursor: bool, capsys: pytest.CaptureFixture) -> None: + coordinates = "2,12,40" + for opt in ("-c", "--coordinates"): + self._run_main(with_cursor, opt, coordinates, ret=2) + captured = capsys.readouterr() + assert captured.out == "Coordinates syntax: top, left, width, height\n" + + def test_backend_option(self, with_cursor: bool, capsys: pytest.CaptureFixture) -> None: + backend = "default" + for opt in ("-b", "--backend"): + self._run_main(with_cursor, opt, backend, "-m1") + captured = capsys.readouterr() + filename = Path(captured.out.rstrip()) + assert filename.is_file() + filename.unlink() + + def test_invalid_backend_option(self, with_cursor: bool, capsys: pytest.CaptureFixture) -> None: + backend = "chuck_norris" + for opt in ("-b", "--backend"): + self._run_main(with_cursor, opt, backend, "-m1", ret=2) + captured = capsys.readouterr() + assert "argument -b/--backend: invalid choice: 'chuck_norris' (choose from" in captured.err @patch.object(sys, "argv", new=[]) # Prevent side effects while testing