From f045000c87de541450ac78c22e66b645afc6387e Mon Sep 17 00:00:00 2001 From: Sludge Date: Tue, 22 Jul 2025 22:44:17 +0200 Subject: [PATCH] Proper tests for the `async` functionality --- src/hotplug.rs | 17 ++++-- src/hotplug/async.rs | 42 +++++++++------ src/reader.rs | 4 ++ src/reader/async.rs | 56 +++++++++---------- src/test.rs | 31 ++--------- src/util/async.rs | 126 ++++++++++++++++++++++++++++++++++++++----- 6 files changed, 188 insertions(+), 88 deletions(-) diff --git a/src/hotplug.rs b/src/hotplug.rs index a36a5cf..b15187a 100644 --- a/src/hotplug.rs +++ b/src/hotplug.rs @@ -1,10 +1,17 @@ //! Support for hotplug events. //! -//! This module is currently Linux-specific. It uses the udev netlink socket to listen for events -//! from a udev implementation. +//! The recommended way to support device hotplug in applications is to use the +//! [`hotplug::enumerate`] function, which returns an iterator over all devices that are or will be +//! plugged into the system. //! -//! The recommended way to support device hotplug is to use the [`hotplug::enumerate`] function, -//! which returns an iterator over all devices that are or will be plugged into the system. +//! # Platform Support +//! +//! Hotplug functionality is supported on Linux and FreeBSD, as follows: +//! +//! | OS | Details | +//! |----|---------| +//! | Linux | Uses the `NETLINK_KOBJECT_UEVENT` socket. Requires `udev`. | +//! | FreeBSD | Uses `devd`'s seqpacket socket at `/var/run/devd.seqpacket.pipe`. | //! //! [`hotplug::enumerate`]: crate::hotplug::enumerate @@ -114,6 +121,8 @@ impl HotplugMonitor { /// /// The [`HotplugMonitor`] will be put in non-blocking mode while the [`AsyncIter`] is alive /// (if it isn't already). + /// + /// When using the `"tokio"` Cargo feature, this must be called while inside a tokio context. #[cfg_attr(docsrs, doc(cfg(any(doc, feature = "tokio", feature = "async-io"))))] #[cfg(any(doc, feature = "tokio", feature = "async-io"))] pub fn async_iter(&self) -> io::Result> { diff --git a/src/hotplug/async.rs b/src/hotplug/async.rs index 7f5e187..857ea16 100644 --- a/src/hotplug/async.rs +++ b/src/hotplug/async.rs @@ -32,29 +32,41 @@ impl<'a> AsyncIter<'a> { #[cfg(test)] mod tests { - use std::{io, pin::pin}; + use std::io; - use crate::{ - hotplug::HotplugMonitor, test::AssertPending, uinput::UinputDevice, - util::r#async::with_runtime, - }; + use crate::{hotplug::HotplugMonitor, uinput::UinputDevice, util::r#async::test::AsyncTest}; #[test] fn smoke() -> io::Result<()> { - with_runtime(|rt| { - const DEVNAME: &str = "-@-rust-async-hotplug-test-@-"; + env_logger::try_init().ok(); - let mon = HotplugMonitor::new()?; + const DEVNAME: &str = "-@-rust-async-hotplug-test-@-"; - let events = mon.async_iter()?; - let mut fut = pin!(events.next_event()); - rt.block_on(AssertPending(fut.as_mut())); - - let _uinput = UinputDevice::builder()?.build(DEVNAME)?; - - rt.block_on(fut)?; + let mon = HotplugMonitor::new()?; + let mut uinput = None; + let fut = async { + // Wait for our test device to arrive: + loop { + if let Ok(evdev) = mon.async_iter()?.next_event().await { + if let Ok(name) = evdev.name() { + if name == DEVNAME { + return Ok(evdev); + } + } + } + } + }; + AsyncTest::new(fut, || { + uinput = Some(UinputDevice::builder()?.build(DEVNAME)?); + println!("unblocked"); Ok(()) }) + // This test might take a few tries since unrelated events need to be filtered out, and + // unrelated messages may arrive at the socket, causing a wakeup that results in `Pending`. + .allowed_polls(1024) + .run()?; + + Ok(()) } } diff --git a/src/reader.rs b/src/reader.rs index dc4f076..9afd199 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -942,6 +942,8 @@ impl EventReader { /// /// The underlying device will be put in non-blocking mode while the returned [`AsyncEvents`] /// is alive (if it isn't already). + /// + /// When using the `"tokio"` Cargo feature, this must be called while inside a tokio context. #[cfg_attr(docsrs, doc(cfg(any(feature = "tokio", feature = "async-io"))))] #[cfg(any(feature = "tokio", feature = "async-io"))] pub fn async_events(&mut self) -> io::Result> { @@ -952,6 +954,8 @@ impl EventReader { /// /// The underlying device will be put in non-blocking mode while the returned [`AsyncReports`] /// is alive (if it isn't already). + /// + /// When using the `"tokio"` Cargo feature, this must be called while inside a tokio context. #[cfg_attr(docsrs, doc(cfg(any(feature = "tokio", feature = "async-io"))))] #[cfg(any(feature = "tokio", feature = "async-io"))] pub fn async_reports(&mut self) -> io::Result> { diff --git a/src/reader/async.rs b/src/reader/async.rs index 393caa5..0630548 100644 --- a/src/reader/async.rs +++ b/src/reader/async.rs @@ -77,49 +77,43 @@ impl<'a> AsyncEvents<'a> { #[cfg(test)] mod tests { - use std::{io, pin::pin}; + use std::io; use crate::{ event::{Rel, RelEvent, Syn}, - test::{AssertPending, check_events, pair}, - util::r#async::with_runtime, + test::{check_events, pair}, + util::r#async::test::AsyncTest, }; #[test] fn smoke() -> io::Result<()> { - with_runtime(|rt| { - let (uinput, evdev) = pair(|b| b.with_rel_axes([Rel::DIAL]))?; - let mut reader = evdev.into_reader()?; - let mut events = reader.async_events()?; + let (uinput, evdev) = pair(|b| b.with_rel_axes([Rel::DIAL]))?; + let mut reader = evdev.into_reader()?; - { - let mut fut = pin!(events.next_event()); - rt.block_on(AssertPending(fut.as_mut())); - - uinput.write(&[RelEvent::new(Rel::DIAL, 1).into()])?; - - let event = rt.block_on(fut)?; - check_events([event], [RelEvent::new(Rel::DIAL, 1).into()]); - } + { + let event = AsyncTest::new(async { reader.async_events()?.next_event().await }, || { + uinput.write(&[RelEvent::new(Rel::DIAL, 1).into()]) + }) + .run()?; - drop(events); - let ev = reader.events().next().unwrap()?; - check_events([ev], [Syn::REPORT.into()]); + check_events([event], [RelEvent::new(Rel::DIAL, 1).into()]); + } - let mut reports = reader.async_reports()?; - let mut fut = pin!(reports.next_report()); - rt.block_on(AssertPending(fut.as_mut())); + let ev = reader.events().next().unwrap()?; + check_events([ev], [Syn::REPORT.into()]); - uinput.write(&[RelEvent::new(Rel::DIAL, 2).into()])?; + let report = AsyncTest::new( + async { reader.async_reports()?.next_report().await }, + || uinput.write(&[RelEvent::new(Rel::DIAL, 2).into()]), + ) + .run()?; - let report = rt.block_on(fut)?; - assert_eq!(report.len(), 2); - check_events( - report, - [RelEvent::new(Rel::DIAL, 2).into(), Syn::REPORT.into()], - ); + assert_eq!(report.len(), 2); + check_events( + report, + [RelEvent::new(Rel::DIAL, 2).into(), Syn::REPORT.into()], + ); - Ok(()) - }) + Ok(()) } } diff --git a/src/test.rs b/src/test.rs index 26b3dfe..23bf211 100644 --- a/src/test.rs +++ b/src/test.rs @@ -1,12 +1,7 @@ -#![allow(dead_code)] - use std::{ - fmt, hash::{BuildHasher, Hasher, RandomState}, io, iter::zip, - pin::Pin, - task::{Context, Poll}, }; use crate::{ @@ -16,12 +11,13 @@ use crate::{ uinput::{Builder, UinputDevice}, }; -fn hash() -> u64 { - RandomState::new().build_hasher().finish() -} - /// Creates a [`UinputDevice`] and [`Evdev`] that are connected to each other. +#[allow(dead_code)] pub fn pair(b: impl FnOnce(Builder) -> io::Result) -> io::Result<(UinputDevice, Evdev)> { + fn hash() -> u64 { + RandomState::new().build_hasher().finish() + } + let hash = hash(); let name = format!("-@-rust-evdevil-device-{hash}-@-"); @@ -73,20 +69,3 @@ pub fn check_events( panic!("expected {expected:?}, got {actual:?}"); } } - -/// A `Future` that polls its argument once and panics unless the inner poll results in `Pending`. -pub struct AssertPending<'a, F>(pub Pin<&'a mut F>); - -impl<'a, F: Future> Future for AssertPending<'a, F> -where - F::Output: fmt::Debug, -{ - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.0.as_mut().poll(cx) { - Poll::Ready(val) => panic!("expected `Pending`, got `Ready`: {val:?}"), - Poll::Pending => Poll::Ready(()), - } - } -} diff --git a/src/util/async.rs b/src/util/async.rs index d92fd11..2afc5b6 100644 --- a/src/util/async.rs +++ b/src/util/async.rs @@ -116,8 +116,9 @@ use asyncio_impl::*; #[cfg(feature = "async-io")] mod asyncio_impl { use std::{ - io, + future, io, os::fd::{BorrowedFd, RawFd}, + pin::pin, task::Poll, }; @@ -129,7 +130,7 @@ mod asyncio_impl { impl Impl { pub fn new(fd: RawFd) -> io::Result { let fd = unsafe { BorrowedFd::borrow_raw(fd) }; - Async::new(fd).map(Self) + Async::new_nonblocking(fd).map(Self) } pub async fn asyncify( @@ -138,13 +139,33 @@ mod asyncio_impl { ) -> io::Result { loop { match op() { - Poll::Pending => self.0.readable().await?, + Poll::Pending => optimistic(self.0.readable()).await?, Poll::Ready(res) => return res, } } } } + // This "optimization" is copied from async-io. + // async-io is apparently very buggy (see smol-rs/async-io#78), so it ends up being required for + // things to work right. + // Specifically, the `.readable()` future is permanently `Pending`, even after the reactor + // schedules the future again, so `asyncify` would just never complete. + async fn optimistic(fut: impl Future>) -> io::Result<()> { + let mut polled = false; + let mut fut = pin!(fut); + + future::poll_fn(|cx| { + if !polled { + polled = true; + fut.as_mut().poll(cx) + } else { + Poll::Ready(Ok(())) + } + }) + .await + } + #[cfg(test)] pub struct Runtime; @@ -169,14 +190,95 @@ pub struct Impl; #[cfg(doc)] pub struct Runtime; -/// Calls `f` with an instance of the selected async runtime. -/// -/// Allows writing async-runtime-agnostic tests. -/// -/// The only supported API is `runtime.block_on(future)`. #[cfg(test)] -pub fn with_runtime(f: impl FnOnce(&Runtime) -> io::Result) -> io::Result { - let rt = Runtime::new()?; - let _guard = rt.enter(); - f(&rt) +pub mod test { + use std::{fmt, future, panic::resume_unwind, pin::pin, sync::mpsc, thread}; + + use super::*; + + pub struct AsyncTest { + future: F, + unblocker: U, + allowed_polls: usize, + } + + impl AsyncTest { + pub fn new(future: F, unblocker: U) -> Self { + Self { + future, + unblocker, + allowed_polls: 1, + } + } + + /// Sets the number of allowed future polls after the `unblocker` has been run. + /// + /// By default, this is 1, expecting the future to complete immediately after the waker has + /// been notified. + /// Higher values may be needed if the API-under-test is system-global and may have to + /// process some irrelevant events until it becomes `Ready`. + pub fn allowed_polls(mut self, allowed_polls: usize) -> Self { + self.allowed_polls = allowed_polls; + self + } + + /// Polls `future`, expecting `Poll::Pending`. Then runs `unblocker`, and expects the waker to + /// be invoked and the `future` to be `Poll::Ready`. + pub fn run(self) -> io::Result + where + F: Future> + Send, + F::Output: Send, + U: FnOnce() -> io::Result<()>, + T: fmt::Debug, + { + let (sender, recv) = mpsc::sync_channel(0); + thread::scope(|s| { + let h = s.spawn(move || -> io::Result<_> { + let rt = Runtime::new()?; + let _guard = rt.enter(); + let mut fut = pin!(self.future); + let mut poll_count = 0; + + rt.block_on(future::poll_fn(|cx| { + if poll_count == 0 { + match fut.as_mut().poll(cx) { + Poll::Ready(val) => { + panic!("expected future to be `Pending`, but it is `Ready({val:?})`") + } + Poll::Pending => { + // Waker is now scheduled to be woken when the event of interest occurs. + println!("future is pending; scheduling wakeup"); + poll_count += 1; + sender.send(()).unwrap(); + return Poll::Pending; + } + } + } else { + // This is called when the `Waker` has been woken up. + match fut.as_mut().poll(cx) { + Poll::Ready(out) => Poll::Ready(out), + Poll::Pending => { + if poll_count >= self.allowed_polls { + panic!("future still `Pending` after {poll_count} polls"); + } + poll_count += 1; + Poll::Pending + } + } + } + })) + }); + + recv.recv().unwrap(); + + // We've been signaled to invoke `unblocker`. + (self.unblocker)()?; + + match h.join() { + Ok(res) => res, + Err(payload) => resume_unwind(payload), + } + }) + } + } }