diff --git a/demos/tinytv-stream-simple.py b/demos/tinytv-stream-simple.py
new file mode 100755
index 0000000..c0dc1c7
--- /dev/null
+++ b/demos/tinytv-stream-simple.py
@@ -0,0 +1,182 @@
+#! /usr/bin/env python3
+
+# You're the type of person who likes to understand how things work under the hood. You want to see a simple example
+# of how to stream video to a TinyTV. This is that example!
+#
+# There's a more advanced example, tinytv-stream.py, that has more features and better performance. But this simple
+# demo is easier to understand, because it does everything in a straightforward way, without any complicated features.
+#
+# Wait, what's a TinyTV? It's a tiny retro-style TV, about 5cm tall. You can put videos on it, or stream video to it
+# over USB. Advanced users can even reprogram its firmware! You can find out more about it at https://tinytv.us/
+#
+# You may want to read at least the docstring at the top of tinytv-stream.py, since it gives you some details about
+# setting up permissions on Linux to connect to your TinyTV.
+#
+# We use three libraries that don't come with Python: PySerial, Pillow, and (of course) MSS. You'll need to install
+# those with "pip install pyserial pillow mss". Normally, you'll want to install these into a venv; if you don't know
+# about those, there are lots of great tutorials online.
+
+from __future__ import annotations
+
+import io
+import sys
+import time
+
+import serial
+from PIL import Image
+
+import mss
+
+
+def main() -> None:
+ # The TinyTV gets streaming input over its USB connection by emulating an old-style serial port. We can send our
+ # video to that serial port, in the format that the TinyTV expects.
+ #
+ # The advanced demo can find the correct device name by looking at the USB IDs of the devices. In this simple
+ # demo, we just ask the user to supply it.
+ if len(sys.argv) != 2: # noqa: PLR2004
+ print(
+ f"Usage: {sys.argv[0]} DEVICE\n"
+ "where DEVICE is something like /dev/ttyACM0 or COM3.\n"
+ 'Use "python3 -m serial.tools.list_ports -v" to list your available devices.'
+ )
+ sys.exit(2)
+ device = sys.argv[1]
+
+ # Open the serial port. It's usually best to use the serial port in a "with:" block like this, to make sure it's
+ # cleaned up when you're done with it.
+ with serial.Serial(device, timeout=1, write_timeout=1) as ser:
+ # The TinyTV might have sent something to the serial port earlier, such as to a program that it was talking to
+ # that crashed without reading it. If that happens, these messages will still be in the device's input
+ # buffer, waiting to be read. We'll just delete anything waiting to be read, to get a fresh start.
+ ser.reset_input_buffer()
+
+ # Let's find out what type of TinyTV this is. The TinyTV has a special command to get that.
+ ser.write(b'{"GET":"tvType"}')
+ tvtype_response = ser.readline()
+ print("Received response:", tvtype_response.strip())
+
+ # The response is usually something like {"tvType":TinyTV2}. Normally, you'd want to use json.loads to parse
+ # JSON. But this isn't correct JSON (there's no quotes around the TV type), so we can't do that.
+ #
+ # But we still need to know the TV type, so we can figure out the screen size. We'll just see if the response
+ # mentions the right type.
+ if b"TinyTV2" in tvtype_response:
+ tinytv_size = (210, 135)
+ elif b"TinyTVKit" in tvtype_response:
+ tinytv_size = (96, 64)
+ elif b"TinyTVMini" in tvtype_response:
+ tinytv_size = (64, 64)
+ else:
+ print("This doesn't seem to be a supported type of TinyTV.")
+ sys.exit(1)
+ print("Detected TinyTV with screen size", tinytv_size)
+
+ # We're ready to start taking screenshots and sending them to the TinyTV! Let's start by creating an MSS
+ # object. Like the serial object, we use a "with:" block to make sure that it can clean up after we're done
+ # with it.
+ #
+ # Note that we use the same MSS object the whole time. We don't try to keep creating a new MSS object each
+ # time we take a new screenshot. That's because the MSS object has a lot of stuff that it sets up and
+ # remembers, and creating a new MSS object each time would mean that it has to repeat that setup constantly.
+ with mss.mss() as sct:
+ # It's time to get the monitor that we're going to capture. In this demo, we just capture the first
+ # monitor. (We could also use monitors[0] for all the monitors combined.)
+ monitor = sct.monitors[1]
+ print("Monitor:", monitor)
+
+ # The rest of this will run forever, until we get an error or the user presses Ctrl-C. Let's record our
+ # starting time and count frames, so we can report FPS at the end.
+ start_time = time.perf_counter()
+ frame_count = 0
+ try:
+ while True:
+ # First, we get a screenshot. MSS makes this easy!
+ screenshot = sct.grab(monitor)
+
+ # The next step is to resize the image to fit the TinyTV's screen. There's a great image
+ # manipulation library called PIL, or Pillow, that can do that. Let's transfer the raw pixels in
+ # the ScreenShot object into a PIL Image.
+ original_image = Image.frombytes("RGB", screenshot.size, screenshot.bgra, "raw", "BGRX")
+
+ # Now, we can resize it. The resize method may stretch the image to make it match the TinyTV's
+ # screen; the advanced demo gives other options. Using a reducing gap is optional, but speeds up
+ # the resize significantly.
+ scaled_image = original_image.resize(tinytv_size, reducing_gap=3.0)
+
+ # The TinyTV wants its image frames in JPEG format. PIL can save an image to a JPEG file, but we
+ # want the JPEG data as a bunch of bytes we can transmit to the TinyTV. Python provides
+ # io.BytesIO to make something that pretends to be a file to PIL, but lets you just get the bytes
+ # that PIL writes.
+ with io.BytesIO() as fh:
+ scaled_image.save(fh, format="JPEG")
+ jpeg_bytes = fh.getvalue()
+
+ # We're ready to send the frame to the TinyTV! First, though, this is a good time to look for any
+ # error messages that the TinyTV has sent us. In today's firmware, anything the TinyTV sends us
+ # is always an error message; it doesn't send us anything normally. (Of course, this might change
+ # in later firmware versions, so we may need to change this someday.)
+ if ser.in_waiting != 0:
+ # There is indeed an error message. Let's read it and show it to the user.
+ incoming_data = ser.read(ser.in_waiting)
+ print(f"Error from TinyTV: {incoming_data!r}")
+ sys.exit(1)
+
+ # The TinyTV wants us to send a command to tell it that we're about to send it a new video frame.
+ # We also need to tell it how many bytes of JPEG data we're going to send. The command we send
+ # looks like {"FRAME":12345}.
+ delimiter = b'{"FRAME":%i}' % len(jpeg_bytes)
+ ser.write(delimiter)
+
+ # Now that we've written the command delimiter, we're ready to write the JPEG data.
+ ser.write(jpeg_bytes)
+
+ # Once we've written the frame, update our counter.
+ frame_count += 1
+
+ # Now we loop! This program will keep running forever, or until you press Ctrl-C.
+
+ finally:
+ # When the loop exits, report our stats.
+ end_time = time.perf_counter()
+ run_time = end_time - start_time
+ print("Frame count:", frame_count)
+ print("Time (secs):", run_time)
+ if run_time > 0:
+ print("FPS:", frame_count / run_time)
+
+
+# Thanks for reading this far! Let's talk about some improvements; these all appear in the advanced version.
+#
+# * Right now, the user has to figure out the right device name for the TinyTV's serial port and supply it on the
+# command line. The advanced version can find the right device automatically by looking at the USB IDs of the
+# connected devices.
+#
+# * There are a lot of things the user might want to do differently, such as choosing which monitor to capture, or
+# changing the JPEG quality (which can affect how fast the TinyTV can process it). The advanced version uses
+# argparse to provide command-line options for these things.
+#
+# * The advanced program shows a status line with things like the current FPS.
+#
+# * Programs of any significant size have a lot of common things you usually want to think about, like organization
+# into separate functions and classes, error handling, logging, and so forth. In this simple demo, we didn't worry
+# about those, but they're important for a real program.
+#
+# * Here's the biggest difference, though. In the program above, we do a lot of things one at a time. First we take
+# a screenshot, then we resize it, then we send it to the TinyTV.
+#
+# We could overlap these, though: while we're sending one screenshot to the TinyTV, we could be preparing the next
+# one. This can speed up the program from about 15 fps to about 25 fps, which is about as fast as the TinyTV can
+# run!
+#
+# This is called a pipeline. While it's tough to coordinate, just like it's harder to coordinate a group of people
+# working together than to do everything yourself, it also can be much faster. A lot of the code in the advanced
+# version is actually about managing the pipeline.
+#
+# Using a pipeline isn't always helpful: you have to understand which operations the system can run in parallel, and
+# how Python itself coordinates threads. That said, I do find that many times, if I'm using MSS to capture video,
+# it does benefit from pipelining these three stages: taking a screenshot, processing it, and sending it somewhere
+# else (like a web server or an AVI file).
+
+if __name__ == "__main__":
+ main()
diff --git a/demos/tinytv-stream.py b/demos/tinytv-stream.py
new file mode 100755
index 0000000..dcbcb18
--- /dev/null
+++ b/demos/tinytv-stream.py
@@ -0,0 +1,1007 @@
+#! /usr/bin/env python3
+
+"""Stream to a TinyTV
+
+A TinyTV is a roughly 5cm tall TV with a 2cm screen. It can play
+videos from built-in storage, or a computer can stream video to it.
+
+This program will capture your display, and stream it to the TinyTV.
+
+While streaming is supported with the TinyTV 2, Mini, and DIY Kit,
+this has only been tested with the TinyTV 2. Reports regarding tests
+with other devices are welcome!
+
+The firmware code in the TinyTV that we're talking to is at
+https://github.com/TinyCircuits/TinyCircuits-TinyTVs-Firmware/blob/master/USB_CDC.h
+
+In short, the TinyTV takes its input as Motion JPEG (MJPG), a simple
+sequence of frames, each encoded as a single JPEG file. With JPEG, it
+is difficult to tell where one JPEG image ends and the next begins.
+So, each frame is preceded by a delimiter: this is the JSON text
+{"FRAME":1234}, where the number is the size in bytes. This is
+followed by the JPEG data itself.
+
+How fast can it be? The time it takes for the TinyTV to process the
+JPEG seems to be the main bottleneck. In our test, the official
+streamer at https://tinytv.us/Streaming/ gets about 15 fps for a 4k
+capture, which is about the same as the non-threaded simple demo gets
+(depending on the screen contents).
+
+We use a background sending thread so that we can prepare one
+screenshot while the other is being sent. That lets us get about
+20-30 fps for 4k in our tests, which seems to be close to the limit of
+the TinyTV hardware (again, depending on the JPEG settings and screen
+contents).
+
+Before connecting, we can't distinguish between a TinyTV and any other
+Raspberry Pi Pico by USB IDs alone. You may need to use the --device
+or --usb-serial-number flag to tell the program which serial device to
+use. Different OSs have different ways to identify the correct
+device.
+
+Windows:
+
+On Windows, the serial device will be something like COM3. You can
+find the correct port by looking in Device Manager under "Ports (COM &
+LPT)". You can specify the device to use with the --device flag.
+
+You may want to use the --list-devices flag to identify the correct
+device, and use the --usb-serial-number flag in future invocations.
+This is because Windows COM port assignments can change between
+reboots or replugging the device.
+
+macOS:
+
+On macOS, the serial device is usually something like
+/dev/tty.usbserial-1234ABCD or /dev/tty.usbmodem1234ABCD, where
+1234ABCD is a device-specific value that will be the same every time
+that device is plugged in. You can use the --device flag to point to
+these.
+
+Linux:
+
+On Linux, the serial device is usually something like /dev/ttyACM0 or
+/dev/ttyUSB0. You can use the --device flag to point to the symlink
+that is automatically created, e.g.,
+/dev/serial/by-id/usb-Raspberry_Pi_Pico_0123456789ABCDEF-if00.
+
+You need write access to the serial device that represents the TinyTV.
+If you run this program as a normal user, you may need to set up a
+udev rule to give your user access.
+
+You can do this by creating a file named
+/etc/udev/rules.d/70-tinytv.rules or something similar. Name it with
+a number below 73 so it runs before 73-seat-late.rules (where uaccess
+is applied). (Note that this rule will be applied to many Raspberry
+Pi Pico devices; you can add a ATTRS{serial} test to limit it to just
+your TinyTV.)
+
+ # TinyTV2, TinyTVKit
+ SUBSYSTEM=="tty", ATTRS{idVendor}=="2e8a", ATTRS{idProduct}=="0003|000a", TAG+="uaccess"
+ # TinyTVMini
+ SUBSYSTEM=="tty", ATTRS{idVendor}=="03eb", ATTRS{idProduct}=="8009", TAG+="uaccess"
+
+Once you have the rule in place, reload udev rules:
+
+ sudo udevadm control --reload-rules
+
+Then unplug and replug your TinyTV.
+"""
+
+# I see you're the type of person who likes to read the source code, instead of just the documentation. Maybe you're
+# looking here because you're getting ideas about doing something similar in your own projects, or maybe you're just
+# curious about how this works. Terrific! Glad to have you with us!
+#
+# You may want to read the simple demo first, to get an idea of what we're doing here. This program follows the same
+# basic steps, but we organize them into a multithreaded pipeline for higher performance. We also add more
+# configuration options and things like that.
+#
+# Still, the basic flow is still straightforward:
+# Capture from MSS -> Scale and JPEG-encode in PIL -> Send to serial port
+#
+# The core idea behind this program is a pipeline: instead of fully processing one video frame at a time, we work on
+# several different frames at once, each at a different stage of processing.
+#
+# At any given moment, one frame might be getting captured, the previous frame might be getting scaled and
+# JPEG-encoded, and an even earlier frame might be in the process of being sent to the TinyTV.
+#
+# The stages are:
+#
+# * capture a screenshot (MSS)
+# * scale and JPEG-encode it (Pillow)
+# * send it to the TinyTV (serial)
+#
+# Between each stage is a mailbox. A mailbox can hold one item. A stage puts its output into the next mailbox, and
+# the following stage takes it when it's ready.
+#
+# If a stage tries to read from an empty mailbox, it waits. If it tries to write to a full mailbox, it also waits.
+#
+# This lets the stages overlap. While one frame is being sent to the TinyTV (the slowest step), the next frame can
+# already be captured or encoded.
+#
+# Eventually, the slowest stage determines the overall speed. When that happens, earlier stages naturally stop and
+# wait. This is called backpressure: work piles up behind the bottleneck instead of running ahead and wasting effort.
+#
+# The result is that the TinyTV may show a frame that's a few frames behind what's currently on your screen. That
+# latency is the cost of keeping the pipeline efficient.
+#
+# An alternative design would be to drop old frames when a mailbox is full, so the display stays closer to "live".
+# That reduces lag, but it means capturing and encoding frames that are never shown. Which approach is better depends
+# on your goals; this program chooses to block and apply backpressure.
+
+from __future__ import annotations
+
+import argparse
+import contextlib
+import functools
+import io
+import itertools
+import logging
+import os
+import re
+import sys
+import time
+from collections import deque
+from collections.abc import Generator, Iterable, Iterator
+from threading import Condition, Lock, Thread
+from typing import TYPE_CHECKING, Generic, Literal, TypeVar, overload
+
+import serial
+from PIL import Image, ImageOps
+from prettytable import PrettyTable, TableStyle
+from serial.tools import list_ports
+
+import mss
+
+if TYPE_CHECKING:
+ from collections.abc import Callable
+
+# The keys in this are substrings in the tvType query. Make sure that they're all distinct: having both "TinyTV2" and
+# "TinyTV2.1" in here would mean that a 2.1 might be misidentified as a 2. We use substrings instead of parsing the
+# response because the TinyTV currently responds with invalid JSON, and they might change that later.
+SUPPORTED_DEVICES = {
+ # Only the TinyTV2 kit has been tested. Reports with other hardware are welcome!
+ b"TinyTV2": {
+ # Uses an RP2040 board. 2e8a:000a is the ID when it's in normal mode (not recovery), which is the default
+ # VID:PID for an RP2040.
+ "usb_id": (0x2E8A, 0x000A),
+ "size": (210, 135),
+ },
+ b"TinyTVKit": {
+ # Uses an RP2040 board, like the TinyTV2. I assume it also uses the default VID:PID.
+ "usb_id": (0x2E8A, 0x000A),
+ "size": (96, 64),
+ },
+ b"TinyTVMini": {
+ # Uses a board based on the SAMD21, similar to an Arduino Zero. From what I see in the TinyCircuits Arduino
+ # board file, it enumerates as 03eb:8009. I think it uses 03eb:8008 in recovery mode.
+ "usb_id": (0x03EB, 0x8009),
+ "size": (64, 64),
+ },
+}
+
+# Downscaling is one of the most time-intensive steps, and the practical difference between the high-quality
+# algorithms isn't going to be perceptible in this context.
+SCALING_ALGORITHM = Image.Resampling.HAMMING
+
+# When choosing JPEG quality, note that higher quality images are slower for the TinyTV to decode; you can get a few
+# extra FPS by lowering the quality to something like 35. However, with some content (like text windows, which the
+# user will have on their screen when they first start this program), heavy compression makes JPEG artifacts really
+# visible. You just don't want to see that on your TinyTV.
+DEFAULT_JPEG_QUALITY = 75
+
+
+T = TypeVar("T")
+U = TypeVar("U")
+
+LOGGER = logging.getLogger("tinytv-stream")
+
+
+class MailboxShutDown(Exception): # noqa: N818 (An exception, but not an error)
+ """Exception to indicate that a Mailbox has been shut down.
+
+ This will be raised if Mailbox.get() or Mailbox.put() is run on a
+ mailbox after its .shutdown() method has been called, or if it is
+ called while waiting.
+ """
+
+ def __init__(self, mailbox: Mailbox) -> None:
+ #: The mailbox that was shut down
+ self.mailbox = mailbox
+
+ def __str__(self) -> str:
+ return f"Mailbox shut down: {self.mailbox}"
+
+
+class Mailbox(Generic[T]):
+ """Thread-safe container to pass a single object at a time between threads.
+
+ A Mailbox can be shut down to indicate that it is no longer
+ available. This can be used by a producer to indicate that no
+ more items will be forthcoming, or by a consumer to indicate that
+ it is no longer able to accept more objects.
+
+ In Python 3.13, this has the same basic functionality as
+ queue.Queue(1). Prior to 3.13, there was no
+ queue.Queue.shutdown() method. The mechanisms for using mailboxes
+ as iterables, or adding items from iterables, are also not part of
+ queue.Queue in any version of Python.
+ """
+
+ def __init__(self) -> None:
+ #: Lock to protect mailbox state
+ self.lock = Lock()
+ self._condition = Condition(lock=self.lock)
+ #: Indicates whether an item is present in the mailbox
+ self.has_item = False
+ self._item: T | None = None
+ #: Indicates whether the mailbox has been shut down
+ self.is_shutdown = False
+
+ def get(self) -> T:
+ """Return and remove the item being held by the mailbox.
+
+ If an item is not presently available, block until another
+ thread calls .put().
+ """
+ with self._condition:
+ while True:
+ # We test to see if an item is present before testing if the queue is shut down. This is so that a
+ # non-immediate shutdown allows the mailbox to be drained.
+ if self.has_item:
+ rv = self._item
+ self._item = None # Don't hold an unnecessary reference
+ self.has_item = False
+ self._condition.notify_all()
+ return rv # type:ignore[return-value]
+ if self.is_shutdown:
+ raise MailboxShutDown(self)
+ self._condition.wait()
+
+ def get_many(self) -> Iterable[T]:
+ """Yield items as they appear in the mailbox.
+
+ The iterator exits the mailbox is shut down; MailboxShutDown
+ is not raised into the caller.
+ """
+ return iter(self)
+
+ def put(self, item: T) -> None:
+ """Store an item in the mailbox.
+
+ If an item is already in the mailbox, block until another
+ thread calls .get().
+ """
+ with self._condition:
+ while True:
+ if self.is_shutdown:
+ raise MailboxShutDown(self)
+ if not self.has_item:
+ self._item = item
+ self.has_item = True
+ self._condition.notify()
+ return
+ self._condition.wait()
+
+ def put_many(self, items: Iterable[T]) -> Iterator[T]:
+ """Put the elements of iterable in the mailbox, one at a time.
+
+ If the mailbox is shut down before all the elements can be put
+ into it, a MailboxShutDown exception is _not_ raised.
+
+ Returns an iterator containing any remaining items, including
+ the one that was being processed when the mailbox was shut
+ down. The first item (if any) of this iterator can be
+ immediately accessed with next; subsequent items defer to the
+ input iterable, so may block.
+ """
+ iterator = iter(items)
+ for item in iterator:
+ # We put this try/except inside the for loop, to make sure we don't accidentally filter out an exception
+ # that escaped the items iterator.
+ try:
+ self.put(item)
+ except MailboxShutDown:
+ return itertools.chain([item], iterator)
+ # Remove references to the value once it's not needed. This lets objects with advanced buffer semantics
+ # reclaim the object's memory immediately, without waiting for the next iteration of the iterable.
+ del item
+ return iter([])
+
+ def shutdown(self, *, immediate: bool = False) -> None:
+ """Shut down the mailbox, marking it as unavailable for future use.
+
+ Any callers currently blocked in .get or .put, or any future
+ caller to those methods, will recieve a MailboxShutDown
+ exception. Callers using .get_many or iterating over the
+ mailbox will see the iteration end. Callers to .put_many will
+ stop adding items.
+
+ If immediate is False (the default), and an item is currently
+ in the mailbox, it will be returned by the next call to
+ .get(), and the one after that will raise MailboxShutDown.
+
+ It is safe to call this method multiple times, including to
+ promote a non-immediate shutdown to an immediate one.
+ """
+ with self._condition:
+ # We don't actually need to check whether we've been called already.
+ self.is_shutdown = True
+ if immediate:
+ self._item = None
+ self.has_item = False
+ self._condition.notify_all()
+
+ def __iter__(self) -> Iterator[T]:
+ """Yield items as they appear in the mailbox.
+
+ The iterator exits when the mailbox is shut down;
+ MailboxShutDown is not raised into the caller.
+ """
+ with contextlib.suppress(MailboxShutDown):
+ while True:
+ yield self.get()
+
+
+class PipelineStage(Thread, Generic[T, U]):
+ """A stage of a multi-threaded pipeline.
+
+ The target function will be called once, and should yield one
+ value for each element.
+
+ If an in_mailbox is provided, the function will get an iterable of
+ its successive elements. If an out_mailbox is provided, it will
+ be supplied with the successive outputs of the target function.
+
+ If the either mailbox is shut down, the target function's loop
+ will stop being called. Both mailboxes will be shut down when the
+ target function ends.
+
+ Note to readers adapting this class to their own programs:
+
+ This is designed for linear pipelines: it is not meant to support
+ fan-in (multiple stages feeding one mailbox) or fan-out (one
+ mailbox feeding multiple stages). The shutdown semantics of these
+ sorts of pipelines will depend heavily on what it's used for, and
+ this demo only needs a simple pipeline.
+ """
+
+ # Source stage
+ @overload
+ def __init__(
+ self,
+ target: Callable[[], Generator[U]],
+ *,
+ out_mailbox: Mailbox[U],
+ name: str | None = None,
+ ) -> None: ...
+
+ # Transformer stage
+ @overload
+ def __init__(
+ self,
+ target: Callable[[Iterable[T]], Generator[U]],
+ *,
+ in_mailbox: Mailbox[T],
+ out_mailbox: Mailbox[U],
+ name: str | None = None,
+ ) -> None: ...
+
+ # Sink stage
+ @overload
+ def __init__(
+ self,
+ target: Callable[[Iterable[T]], None],
+ *,
+ in_mailbox: Mailbox[T],
+ name: str | None = None,
+ ) -> None: ...
+
+ def __init__(
+ self,
+ target: Callable[[], Generator[U]] | Callable[[Iterable[T]], Generator[U]] | Callable[[Iterable[T]], None],
+ *,
+ in_mailbox: Mailbox[T] | None = None,
+ out_mailbox: Mailbox[U] | None = None,
+ name: str | None = None,
+ ) -> None:
+ """Initialize the PipelineStage.
+
+ Either :param:`in_mailbox` or :param:`out_mailbox` is
+ required. Otherwise, it would be a pipeline stage that can't
+ connect to anything else. (You can always use
+ :class:`threading.Thread` directly if you need that behavior.)
+
+ :param target: Function to run during the stage. This will be
+ called once, in a separate thread. This should take one
+ argument if :param:`in_mailbox` is provided, or no
+ arguments otherwise. If you want additional arguments
+ (such as configuration), use :func:`functools.partial`.
+ :param in_mailbox: An optional :class:`Mailbox` to provide
+ inputs to the target function. The target function will
+ be called with one argument, an iterable that you can use
+ in a for loop or similar construct, to get the successive
+ values.
+ :param out_mailbox: An optional :class:`Mailbox` to receive
+ outputs from the target function. If this is provided,
+ the target function must be a generator (a function that
+ uses ``yield`` instead of ``return``). The successive
+ outputs from the function will be placed in
+ :param:`out_mailbox`.
+ :param name: An optional name for debugging purposes; see
+ :attr:`threading.Thread.name`.
+ """
+ if in_mailbox is None and out_mailbox is None:
+ msg = "Cannot have a pipeline stage with neither inputs nor outputs"
+ raise ValueError(msg)
+ self.in_mailbox = in_mailbox
+ self.out_mailbox = out_mailbox
+ self.target = target
+ #: The exception (if any) raised by the target function
+ self.exc: Exception | None = None
+ super().__init__(name=name, daemon=True)
+
+ def run(self) -> None:
+ """Execute the pipeline stage.
+
+ This should not be run directly. Instead, use the start()
+ method (inherited from threading.Thread) to run this in a
+ background thread.
+
+ This will run the target function, managing input and output
+ mailboxes. When the stage completes, whether normally or with
+ an error, the mailboxes will be shut down.
+ """
+ try:
+ if self.out_mailbox is None:
+ # This is a sink function, the easiest to deal with. Since a mailbox is iterable, we can just pass it
+ # to the target function.
+ assert self.in_mailbox is not None # noqa: S101
+ self.target(self.in_mailbox) # type:ignore[call-arg]
+ return
+ # This is a source or transformation function.
+ out_iterable = self.target() if self.in_mailbox is None else self.target(self.in_mailbox) # type:ignore[call-arg]
+ if not isinstance(out_iterable, Generator):
+ msg = (
+ "Pipeline target function was expected to be a generator; "
+ f"instead, it returned a {type(out_iterable)}."
+ )
+ raise TypeError(msg) # noqa: TRY301
+ # Once a generator is closed, the yield call (where they block when they send an object downstream) will
+ # raise GeneratorExit. That lets finally: blocks, with: exits, etc. run. This happens automatically when
+ # out_iterable is garbage-collected. We still close it explicitly to so it gets the GeneratorExit, in case
+ # something (like an exception object) is holding a reference to out_iterable.
+ with contextlib.closing(out_iterable):
+ self.out_mailbox.put_many(out_iterable)
+ except Exception as e:
+ # We store the exception, so that our caller can choose what to do about it after they call join.
+ self.exc = e
+ raise
+ finally:
+ if self.in_mailbox is not None:
+ self.in_mailbox.shutdown()
+ if self.out_mailbox is not None:
+ self.out_mailbox.shutdown()
+
+ def __str__(self) -> str:
+ return f""
+
+
+def list_devices() -> None:
+ """Display all USB serial ports in a formatted table."""
+ ports = list(list_ports.comports())
+ if not ports:
+ print("No serial ports found.")
+ return
+
+ # Create and populate table
+ table = PrettyTable(["Device", "USB ID", "Serial Number", "Manufacturer", "Product", "Description"])
+ table.align = "l"
+ table.set_style(TableStyle.PLAIN_COLUMNS)
+ table.sortby = "Device"
+ for port in ports:
+ usb_id = f"{port.vid:04x}:{port.pid:04x}".lower() if port.vid and port.pid else ""
+ serial_num = port.serial_number or ""
+ table.add_row(
+ [
+ port.device,
+ usb_id,
+ serial_num,
+ port.manufacturer or "",
+ port.product or "",
+ port.description if port.description and port.description != "n/a" else "",
+ ]
+ )
+
+ print(table)
+
+
+def get_device_name(usb_id: str | None, usb_serial_number: str | None) -> str: # noqa: PLR0912
+ """Find the device name for a USB serial port.
+
+ If multiple serial ports match the criteria, an exception is
+ raised.
+
+ We currently don't provide the user a way to select an interface
+ if the device has multiple USB endpoints. The TinyTV doesn't do
+ that, so it's not urgent.
+
+ :param usb_id: USB vendor:product ID in format "vvvv:pppp".
+ :param usb_serial_number: Optional USB serial number to filter by.
+ :returns: The device name (e.g., "/dev/ttyACM0" on Linux or "COM3"
+ on Windows).
+ """
+ if usb_id is not None:
+ vendor_str, product_str = usb_id.lower().split(":", maxsplit=1)
+ vendor = int(vendor_str, 16)
+ product = int(product_str, 16)
+
+ candidates = []
+
+ # We sort the ports by name so that the --verbose output is nicer to read.
+ for port in sorted(list_ports.comports(), key=lambda port: port.name):
+ if port.vid is None or port.pid is None:
+ LOGGER.debug("%s: device is not USB", port.name)
+ continue
+ if usb_serial_number is not None and port.serial_number != usb_serial_number:
+ LOGGER.debug("%s: serial number does not match (found %r)", port.name, port.serial_number)
+ continue
+ if usb_id is not None:
+ if (port.vid, port.pid) == (vendor, product):
+ LOGGER.debug("%s: device matches")
+ candidates.append(port)
+ else:
+ LOGGER.debug("%s: USB id mismatch: %04x:%04x", port.name, port.vid, port.pid)
+ else:
+ for device_name, device_spec in SUPPORTED_DEVICES.items():
+ if (port.vid, port.pid) == device_spec["usb_id"]:
+ LOGGER.debug(
+ "%s: USB id matches %s: %04x:%04x", port.name, device_name.decode("ascii"), port.vid, port.pid
+ )
+ candidates.append(port)
+ break
+ else:
+ LOGGER.debug("%s: USB id not in supported device list: %04x:%04x", port.name, port.vid, port.pid)
+
+ if len(candidates) == 1:
+ # We've been logging the name attribute, which is the human-friendly name: "ttyACM0". We return the device
+ # attribute, which is the full path: "/dev/ttyACM0".
+ return candidates[0].device
+
+ msg = "Cannot find USB device" if len(candidates) == 0 else "Multiple USB devices found"
+ if usb_id is not None:
+ msg += f": {usb_id}"
+ else:
+ msg += " in supported device list"
+ if usb_serial_number is not None:
+ msg += f" with serial number {usb_serial_number}"
+ if len(candidates) != 0:
+ msg += f": {', '.join(c.name for c in candidates)}"
+
+ msg += "\nHint: Consider --list-devices, find your device, and use the --device flag."
+
+ raise RuntimeError(msg)
+
+
+def get_screen_size(ser: serial.Serial) -> tuple[int, int]:
+ """Identify the TinyTV type and screen size.
+
+ :param ser: An open serial connection to the TinyTV.
+ :return: The screen size as (width, height).
+ """
+ # First, clear out any remaining junk in the buffer, such as from earlier buggy runs.
+ while ser.in_waiting:
+ ser.reset_input_buffer()
+ # Check for the device type.
+ LOGGER.debug('>>> {"GET":"tvType"}')
+ ser.write(b'{"GET":"tvType"}')
+ response = ser.readline() # {"tvType":TinyTV2}\r\n
+ LOGGER.debug("<<< %s", response.decode(errors="replace").rstrip())
+ # Do a very simple check: the return format might change (such as to add quotes around the value).
+ for name, spec in SUPPORTED_DEVICES.items():
+ if name in response:
+ LOGGER.debug("Device detected as %s", name.decode())
+ return spec["size"]
+ msg = f"Device is not a supported TinyTV: {ser.name}"
+ raise RuntimeError(msg)
+
+
+def _scale_letterbox(img: Image.Image, size: tuple[int, int]) -> Image.Image:
+ """Fit image to size, preserving aspect ratio, with black padding."""
+ img.thumbnail(size, SCALING_ALGORITHM)
+ return ImageOps.pad(img, size, color="black")
+
+
+def _scale_crop(img: Image.Image, size: tuple[int, int]) -> Image.Image:
+ """Crop image to fit size, preserving aspect ratio."""
+ return ImageOps.fit(img, size, SCALING_ALGORITHM)
+
+
+def _scale_stretch(img: Image.Image, size: tuple[int, int]) -> Image.Image:
+ """Stretch image to exactly fit size, ignoring aspect ratio."""
+ return img.resize(size, SCALING_ALGORITHM)
+
+
+def capture_image(
+ *,
+ monitor: int | None = None,
+ capture_area: dict[str, int] | None = None,
+) -> Generator[Image.Image]:
+ """Continuously capture images from the specified monitor.
+
+ Either monitor or capture_area must be used, but not both.
+
+ :param monitor: Monitor number to capture from, using the standard
+ MSS convention (all screens=0, first screen=1, etc.).
+ :param capture_area: Capture rectangle dict with 'left', 'top',
+ 'width', 'height'.
+ :yields: PIL Image objects from the captured monitor.
+ """
+ with mss.mss() as sct:
+ rect = capture_area if capture_area is not None else sct.monitors[monitor]
+ LOGGER.debug("Capture area: %i,%i, %ix%i", rect["left"], rect["top"], rect["width"], rect["height"])
+
+ while True:
+ sct_img = sct.grab(rect)
+ pil_img = Image.frombytes("RGB", sct_img.size, sct_img.bgra, "raw", "BGRX")
+ yield pil_img
+
+
+def process_and_encode_image(
+ images: Iterable[Image.Image],
+ *,
+ size: tuple[int, int],
+ scaling_mode: Literal["letterbox", "crop", "stretch"] = "stretch",
+ quality: int = DEFAULT_JPEG_QUALITY,
+) -> Generator[bytes]:
+ """Scale and JPEG-encode images for TinyTV display.
+
+ :param images: Iterable of PIL Image objects to process.
+ :param size: Tuple (width, height) to resize images to.
+ :param scaling_mode: How to scale images ("letterbox", "crop", or
+ "stretch").
+ :param quality: JPEG quality level (1-100). Higher quality
+ provides clearer images, but also is slower for the TinyTV to
+ process.
+ :yields: JPEG-encoded image data as bytes.
+ """
+ # Select the scaling function based on mode.
+ scale_func = {
+ "letterbox": _scale_letterbox,
+ "crop": _scale_crop,
+ "stretch": _scale_stretch,
+ }[scaling_mode]
+
+ for img in images:
+ # Scaling large images can be slow. To speed it up, reduce to ~3x target size before invoking the
+ # high-quality scaling function.
+ reduced_size = tuple(d * 3 for d in size)
+ reduction_factor = max(1, img.width // reduced_size[0], img.height // reduced_size[1])
+ scaled_img = img.reduce(reduction_factor)
+ scaled_img = scale_func(scaled_img, size)
+
+ with io.BytesIO() as fh:
+ scaled_img.save(fh, format="JPEG", quality=quality)
+ jpeg_bytes = fh.getvalue()
+
+ yield jpeg_bytes
+
+
+def send_jpeg(ser: serial.Serial, jpeg_bytes_inputs: Iterable[bytes]) -> Generator[int]:
+ """Send JPEG frames to TinyTV over its USB serial connection.
+
+ :param ser: Serial device for an open and verified connection.
+ :param jpeg_bytes_inputs: Iterable of JPEG-encoded image data. Each
+ element should represent a single, self-contained image frame.
+ :yields: Byte count sent for each frame (delimiter + JPEG data).
+ """
+ for jpeg_bytes in jpeg_bytes_inputs:
+ # The TinyTV doesn't have an unambiguous error protocol: it just prints an English string. Fortunately, it
+ # doesn't print anything during normal operation. Debug builds of the firmware can, but if you're using a
+ # debug build, you know enough to adapt this code to your needs.
+ if ser.in_waiting:
+ # Configure a one-second timeout on the serial device, so that it will stop reading after that time,
+ # instead of waiting for a full 4k of error messages.
+ ser.timeout = 1
+ incoming_data = ser.read(4096)
+ msg = f"Error from TinyTV: {incoming_data!r}"
+ raise RuntimeError(msg)
+
+ delimiter = ('{"FRAME":%s}' % len(jpeg_bytes)).encode("ascii") # noqa: UP031
+ ser.write(delimiter)
+ ser.write(jpeg_bytes)
+ yield (len(delimiter) + len(jpeg_bytes))
+
+
+def show_stats(byte_counts: Iterable[int]) -> None:
+ """Display streaming statistics (FPS and throughput).
+
+ Statistics are displayed over a 100-frame sliding window, which is
+ about four seconds.
+
+ FPS indicates how fast the entire pipeline can run as a whole, not
+ any individual stage.
+
+ Bps, or bytes per second, is the speed at which we are sending
+ data to the TinyTV. The TinyTV is usually the slowest part of the
+ pipeline, but not because of the raw transfer speed. If you try
+ different --quality values, you'll see that at higher quality, the
+ Bps goes up, but the overall FPS drops. This indicates that the
+ per-frame decoding in the TinyTV, rather than the raw transfer
+ speed, is the limiting factor.
+
+ This is run on the main thread. This is partly a matter of
+ convenience, and partly because it simplifies waiting for the
+ pipeline to complete.
+
+ :param byte_counts: Iterable of byte counts per frame.
+ """
+ # If we needed high-precision, such as for benchmarking very short times, we might want to use time.perf_counter().
+ # However, time.monotonic() is sufficient for simple stats reporting.
+ start_time = time.monotonic()
+ time_deque: deque[float] = deque(maxlen=100)
+ byte_count_deque: deque[int] = deque(maxlen=100)
+ next_display_update = 0.0
+ last_status_len = 0
+ for frame_count, byte_count in enumerate(byte_counts):
+ now = time.monotonic()
+ time_deque.append(now)
+ byte_count_deque.append(byte_count)
+ if now >= next_display_update and len(time_deque) > 1:
+ next_display_update = now + 0.1
+ running_time = now - start_time
+ running_minutes = int(running_time / 60)
+ running_seconds = int(running_time % 60)
+ window_secs = time_deque[-1] - time_deque[0]
+ window_frames = len(time_deque)
+ window_bytes = sum(byte_count_deque)
+ fps = window_frames / window_secs
+ bytes_per_sec = int(window_bytes / window_secs)
+ line = (
+ f"{running_minutes:02d}:{running_seconds:02d} frame {frame_count}: {fps:.2f} fps, {bytes_per_sec} Bps"
+ )
+ this_status_len = len(line)
+ full_line = f"\r{line}{' ' * (last_status_len - this_status_len)}"
+ print(full_line, end="")
+ last_status_len = this_status_len
+
+
+def _usb_id_type(value: str) -> str:
+ """Validate and return a USB ID in vvvv:pppp format.
+
+ This is used to tell argparse how to validate the string given on
+ the command line.
+
+ :param value: The USB ID string to validate.
+ :returns: The validated USB ID string.
+ :raises argparse.ArgumentTypeError: If the format is invalid.
+ """
+ # Expect vvvv:pppp using hex digits
+ if re.fullmatch(r"[0-9a-fA-F]{4}:[0-9a-fA-F]{4}", value):
+ return value
+ msg = "Invalid USB ID format; expected vvvv:pppp (hex)"
+ raise argparse.ArgumentTypeError(msg)
+
+
+def _quality_type(value: str) -> int:
+ """Validate and return a JPEG quality value (1-100).
+
+ This is used to tell argparse how to validate the string given on
+ the command line.
+
+ :param value: The quality value string to validate.
+ :returns: The validated quality value as an integer.
+ :raises argparse.ArgumentTypeError: If the value is not between 1
+ and 100.
+ """
+ try:
+ q = int(value)
+ except ValueError:
+ msg = "Quality must be an integer between 1 and 100"
+ raise argparse.ArgumentTypeError(msg) from None
+ if 1 <= q <= 100: # noqa: PLR2004
+ return q
+ msg = "Quality must be between 1 and 100"
+ raise argparse.ArgumentTypeError(msg)
+
+
+def _capture_area_type(value: str) -> dict[str, int]:
+ """Validate and return a capture area dict.
+
+ Expected format is ``left,top,width,height`` where all values are
+ integers.
+
+ :param value: The capture area string to validate.
+ :returns: Dict with 'left', 'top', 'width', 'height' keys.
+ :raises argparse.ArgumentTypeError: If the format is invalid or extents
+ are non-positive.
+ """
+ parts = value.split(",")
+ if len(parts) != 4: # noqa: PLR2004
+ msg = "Capture area must have four comma-separated integers: left,top,width,height"
+ raise argparse.ArgumentTypeError(msg)
+
+ try:
+ left, top, width, height = (int(part) for part in parts)
+ except ValueError:
+ msg = "Capture area values must be integers"
+ raise argparse.ArgumentTypeError(msg) from None
+
+ if width <= 0 or height <= 0:
+ msg = "Capture area width and height must be positive"
+ raise argparse.ArgumentTypeError(msg)
+
+ return {"left": left, "top": top, "width": width, "height": height}
+
+
+def main() -> None:
+ """Main entry point for the TinyTV streaming application.
+
+ Parses command-line arguments, sets up the streaming pipeline, and
+ runs the capture-process-send stages in parallel threads.
+ """
+ parser = argparse.ArgumentParser(
+ description="Stream your display to a TinyTV",
+ usage="""
+%(prog)s --list-devices
+%(prog)s
+ [ [ --usb-id VID:PID ] [ --usb-serial-number SERIAL ] | --device DEVICE ]
+ [ --monitor MONITOR_NUMBER | --capture-area X,Y,R,B ]
+ [ --scaling-mode {letterbox,crop,stretch} ] [ --quality QUALITY ]
+""".strip(),
+ )
+ parser.add_argument(
+ "-L",
+ "--list-devices",
+ action="store_true",
+ help="List all USB serial ports and exit",
+ )
+ parser.add_argument(
+ "-v",
+ "--verbose",
+ action="store_true",
+ help="Report additional details",
+ )
+ parser.add_argument(
+ "-U",
+ "--usb-id",
+ type=_usb_id_type,
+ help="USB VID:PID to search for (default 2e8a:000a)",
+ )
+ parser.add_argument(
+ "-S",
+ "--usb-serial-number",
+ help="Match device by USB serial number instead of VID:PID",
+ )
+ sample_device = (
+ "COM3"
+ if os.name == "nt"
+ else "/dev/serial/by-id/usb-Raspberry_Pi_Pico_0123456789ABCDEF-if00"
+ if os.name == "posix"
+ else None
+ )
+ sample_device_desc = f" (e.g., {sample_device})" if sample_device else ""
+ parser.add_argument(
+ "-d",
+ "--device",
+ help=(f"Serial device{sample_device_desc}"),
+ )
+ monitor_group = parser.add_mutually_exclusive_group()
+ monitor_group.add_argument(
+ "-m",
+ "--monitor",
+ type=int,
+ default=1,
+ help="Monitor index from mss (0 = all, 1+ = individual; default 1; mutually exclusive with --capture-area)",
+ )
+ monitor_group.add_argument(
+ "-a",
+ "--capture-area",
+ type=_capture_area_type,
+ metavar="X,Y,W,H",
+ help="Capture rectangle as left,top,width,height (mutually exclusive with --monitor)",
+ )
+ parser.add_argument(
+ "-s",
+ "--scaling-mode",
+ choices=["letterbox", "crop", "stretch"],
+ default="crop",
+ help="How to scale to TinyTV display: letterbox (black bars), crop (center), or stretch (default crop)",
+ )
+ parser.add_argument(
+ "-q",
+ "--quality",
+ type=_quality_type,
+ default=75,
+ help="JPEG quality (1-100; default 75)",
+ )
+
+ args = parser.parse_args()
+
+ logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO, format="%(message)s")
+
+ # Handle --list-devices
+ if args.list_devices:
+ list_devices()
+ return
+
+ # Compute variables from CLI args.
+ monitor = args.monitor
+ capture_area = args.capture_area
+ quality = args.quality
+ scaling_mode = args.scaling_mode
+
+ # Find the right device.
+ if args.device:
+ if args.usb_id is not None or args.usb_serial_number is not None:
+ parser.error("argument --device: not allowed with --usb-id or --usb-serial-number")
+ device = args.device
+ else:
+ device = get_device_name(args.usb_id, args.usb_serial_number)
+ LOGGER.info("Using device %s", device)
+
+ # We could use serial.Serial as a context manager if we wanted to automatically close it when we don't need it
+ # anymore. But we need it for the entire life of the program, so we just keep it open.
+ ser = serial.Serial(device, timeout=1)
+ size = get_screen_size(ser)
+ LOGGER.debug("TinyTV screen size: %dx%d", size[0], size[1])
+
+ # We divide our work into three stages: capture, processing (scale and encode), and sending. These each take
+ # about the same amount of time per-image. In the capture stage, we are mostly waiting for the image to be
+ # copied. In the processing stage, we are just running PIL image manipulation functions. In the send stage, we
+ # are mostly waiting for the TinyTV to read our data. The overall slowest stage is the send stage. You can get
+ # close to optimal performance even if you combine the capture and processing threads, but separating them gives
+ # us more headroom.
+
+ # Mailboxes are used to pass data between threads.
+ captured_image_mailbox: Mailbox[Image.Image] = Mailbox()
+ jpeg_bytes_mailbox: Mailbox[bytes] = Mailbox()
+ byte_count_mailbox: Mailbox[int] = Mailbox()
+
+ # The stages are run in parallel threads.
+ capture_stage: PipelineStage[None, Image.Image] = PipelineStage(
+ name="capture",
+ target=functools.partial(capture_image, monitor=monitor, capture_area=capture_area),
+ out_mailbox=captured_image_mailbox,
+ )
+ process_and_encode_stage = PipelineStage(
+ name="process_and_encode",
+ in_mailbox=captured_image_mailbox,
+ target=functools.partial(process_and_encode_image, size=size, scaling_mode=scaling_mode, quality=quality),
+ out_mailbox=jpeg_bytes_mailbox,
+ )
+ send_stage: PipelineStage[bytes, int] = PipelineStage(
+ name="send",
+ in_mailbox=jpeg_bytes_mailbox,
+ target=functools.partial(send_jpeg, ser),
+ out_mailbox=byte_count_mailbox,
+ )
+
+ capture_stage.start()
+ process_and_encode_stage.start()
+ send_stage.start()
+
+ LOGGER.debug("Capture thread: %i", capture_stage.native_id)
+ LOGGER.debug("Process thread: %i", process_and_encode_stage.native_id)
+ LOGGER.debug("Send thread: %i", send_stage.native_id)
+
+ # The show_stats function will run until the byte_count_mailbox shuts down, which happens if any of the threads
+ # encounters an error: the PipelineStage will shut down its mailboxes, and that shutdown will propagate through
+ # all the stages.
+ show_stats(byte_count_mailbox)
+
+ # At this point, the byte_count_mailbox has shut down, and the others will be shutting down as well. We join the
+ # outstanding threads, so that if any of them raise an Exception, that thread has time to print it before we exit.
+ capture_stage.join()
+ process_and_encode_stage.join()
+ send_stage.join()
+
+ # Test for errors from any of the stages. If there are errors, then the default threading.excepthook will have
+ # already printed it to stderr. We just need to exit with a non-zero value to let the shell know that something
+ # happened. (Mind you, currently, we never stop without an exception like KeyboardInterrupt.)
+ if capture_stage.exc or process_and_encode_stage.exc or send_stage.exc:
+ sys.exit(1)
+
+
+if __name__ == "__main__":
+ main()