From 061d85d8481134db5f517eab302934f40c37ed99 Mon Sep 17 00:00:00 2001 From: Luca Versari Date: Mon, 8 Dec 2025 02:08:41 +0100 Subject: [PATCH 1/3] Implement an actual scheduler. As disk operations are still blocking, it doesn't quite work as well as one would want, but it's a start. --- pixie-uefi/src/flash.rs | 5 +- pixie-uefi/src/main.rs | 13 +-- pixie-uefi/src/os/executor.rs | 169 ++++++++++++++++++++++------- pixie-uefi/src/os/input.rs | 3 +- pixie-uefi/src/os/mod.rs | 3 +- pixie-uefi/src/os/net/interface.rs | 6 +- pixie-uefi/src/os/net/mod.rs | 77 +++++++++---- pixie-uefi/src/os/net/speed.rs | 3 +- pixie-uefi/src/os/net/tcp.rs | 7 +- pixie-uefi/src/os/ui.rs | 7 +- pixie-uefi/src/power_control.rs | 4 +- 11 files changed, 215 insertions(+), 82 deletions(-) diff --git a/pixie-uefi/src/flash.rs b/pixie-uefi/src/flash.rs index 6167ea90..26826ef0 100644 --- a/pixie-uefi/src/flash.rs +++ b/pixie-uefi/src/flash.rs @@ -5,6 +5,7 @@ use core::cell::{Cell, RefCell}; use core::fmt::Write; use core::mem; use core::net::SocketAddrV4; +use core::time::Duration; use futures::future::{select, Either}; use log::info; @@ -120,7 +121,7 @@ pub async fn flash(server_addr: SocketAddrV4) -> Result<()> { let draw_task = async { let mut last_stats = stats.borrow().clone(); while !done.get() { - Executor::sleep_us(100_000).await; + Executor::sleep(Duration::from_millis(100)).await; let stats = stats.borrow().clone(); if stats == last_stats { continue; @@ -176,7 +177,7 @@ pub async fn flash(server_addr: SocketAddrV4) -> Result<()> { ); while !chunks_info.is_empty() { let recv = Box::pin(socket.recv_from(&mut buf)); - let sleep = Box::pin(Executor::sleep_us(100_000)); + let sleep = Box::pin(Executor::sleep(Duration::from_millis(100))); match select(recv, sleep).await { Either::Left(((buf, _addr), _)) => { stats.borrow_mut().pack_recv += 1; diff --git a/pixie-uefi/src/main.rs b/pixie-uefi/src/main.rs index 8b58c964..1970b1b0 100644 --- a/pixie-uefi/src/main.rs +++ b/pixie-uefi/src/main.rs @@ -7,6 +7,7 @@ extern crate alloc; use alloc::boxed::Box; use core::net::{Ipv4Addr, SocketAddrV4}; +use core::time::Duration; use futures::future::{self, Either}; use pixie_shared::{Action, TcpRequest, UdpRequest, ACTION_PORT, PING_PORT}; @@ -43,7 +44,7 @@ async fn server_discover() -> Result { socket .send_to(SocketAddrV4::new(Ipv4Addr::BROADCAST, ACTION_PORT), &msg) .await?; - Executor::sleep_us(1_000_000).await; + Executor::sleep(Duration::from_secs(1)).await; }) }; @@ -70,7 +71,7 @@ async fn shutdown() -> ! { export_cov::export().await; log::info!("Shutting down..."); - Executor::sleep_us(1_000_000).await; + Executor::sleep(Duration::from_secs(1)).await; power_control::shutdown() } @@ -108,7 +109,7 @@ async fn run() -> Result<()> { .send_to(SocketAddrV4::new(*server.ip(), PING_PORT), b"pixie") .await .unwrap(); - Executor::sleep_us(10_000_000).await; + Executor::sleep(Duration::from_secs(10)).await; } }); @@ -132,11 +133,7 @@ async fn run() -> Result<()> { log::warn!("Started waiting for another command..."); } last_was_wait = true; - const WAIT_10MSECS: u64 = 50; - for _ in 0..WAIT_10MSECS { - Executor::deep_sleep_us(10_000); - Executor::sched_yield().await; - } + Executor::sleep(Duration::from_millis(500)).await; } else { last_was_wait = false; log::info!("Command: {command:?}"); diff --git a/pixie-uefi/src/os/executor.rs b/pixie-uefi/src/os/executor.rs index 76eb25c8..a9d54613 100644 --- a/pixie-uefi/src/os/executor.rs +++ b/pixie-uefi/src/os/executor.rs @@ -1,4 +1,5 @@ use alloc::boxed::Box; +use alloc::collections::binary_heap::BinaryHeap; use alloc::collections::VecDeque; use alloc::sync::Arc; use alloc::task::Wake; @@ -8,10 +9,11 @@ use core::future::{poll_fn, Future}; use core::pin::Pin; use core::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use core::task::{Context, Poll, Waker}; -use futures::channel::oneshot; +use core::time::Duration; +use futures::channel::oneshot; +use futures::task::AtomicWaker; use spin::Mutex; -use uefi::boot::{EventType, TimerTrigger, Tpl}; use uefi::proto::console::text::Color; use crate::os::send_wrapper::SendWrapper; @@ -53,10 +55,33 @@ impl Wake for Task { } } -static EXECUTOR: Mutex = Mutex::new(Executor { - ready_tasks: VecDeque::new(), - tasks: vec![], -}); +pub(super) type WrappedWaker = Arc; + +struct TimedWait { + wake_at: i64, + waker: WrappedWaker, +} + +impl PartialEq for TimedWait { + fn eq(&self, other: &Self) -> bool { + self.wake_at == other.wake_at + } +} + +impl Eq for TimedWait {} + +impl PartialOrd for TimedWait { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for TimedWait { + fn cmp(&self, other: &Self) -> core::cmp::Ordering { + // Reversed order: min-heap. + other.wake_at.cmp(&self.wake_at) + } +} pub struct JoinHandle(oneshot::Receiver); @@ -66,8 +91,16 @@ impl JoinHandle { } } +static EXECUTOR: Mutex = Mutex::new(Executor { + wake_on_interrupt: vec![], + timed_wait: BinaryHeap::new(), + ready_tasks: VecDeque::new(), + tasks: vec![], +}); + pub struct Executor { - // TODO(veluca): scheduling. + wake_on_interrupt: Vec, + timed_wait: BinaryHeap, ready_tasks: VecDeque>, tasks: Vec>, } @@ -81,7 +114,7 @@ impl Executor { assert!((w - 1).is_multiple_of(TASK_LEN + 1)); let num_w = (w - 1) / (TASK_LEN + 1); let mut last = Timer::micros() as u64; - Self::sleep_us(100_000).await; + Self::sleep(Duration::from_millis(100)).await; loop { draw_area.clear(); let cur = Timer::micros() as u64; @@ -161,30 +194,76 @@ impl Executor { tasks.retain(|t| !t.done.load(Ordering::Relaxed)); } last = Timer::micros() as u64; - Self::sleep_us(1_000_000).await; + Self::sleep(Duration::from_secs(1)).await } } pub fn run() -> ! { Self::spawn("[show_tasks]", Self::draw_tasks()); + + // Maximum amount of microseconds between wakeups of interrupt-based wakers. + const INTERRUPT_MICROS: i64 = 500; + + let mut last_interrupt_wakeup = Timer::micros(); + + let mut do_wake = |force_interrupt_wake| { + // Wake timed-waiting tasks. + loop { + let waker = { + let mut ex = EXECUTOR.lock(); + let Some(w) = ex.timed_wait.peek() else { + break; + }; + if w.wake_at > Timer::micros() { + break; + } + let w = ex.timed_wait.pop().unwrap(); + w.waker + }; + waker.wake(); + } + // Since we don't notice interrupts that happened while we are not hlt-ing, + // make sure that we wake up all the interrupt-based waiting tasks every at + // most INTERRUPT_MICROS micros to make it unlikely to miss interrupts. + if last_interrupt_wakeup + INTERRUPT_MICROS <= Timer::micros() || force_interrupt_wake { + last_interrupt_wakeup = Timer::micros(); + let to_wake = core::mem::take(&mut EXECUTOR.lock().wake_on_interrupt); + for w in to_wake { + w.wake(); + } + } + }; + loop { - let task = EXECUTOR - .lock() - .ready_tasks - .pop_front() - .expect("Executor should never run out of ready tasks"); - - task.in_queue.store(false, Ordering::Relaxed); - let waker = Waker::from(task.clone()); - let mut context = Context::from_waker(&waker); - let mut fut = task.future.try_lock().unwrap(); - let begin = Timer::micros(); - let done = fut.0.as_mut().poll(&mut context); - let end = Timer::micros(); - task.micros - .fetch_add((end - begin) as u64, Ordering::Relaxed); - if done.is_ready() { - task.done.swap(true, Ordering::Relaxed); + do_wake(false); + let task = EXECUTOR.lock().ready_tasks.pop_front(); + if let Some(task) = task { + // It is possible for a done task to end up in the queue (if it wakes + // itself during execution). If that happens, we just remove it from + // the queue here. + if task.done.load(Ordering::Relaxed) { + continue; + } + task.in_queue.store(false, Ordering::Relaxed); + let waker = Waker::from(task.clone()); + let mut context = Context::from_waker(&waker); + let mut fut = task.future.try_lock().unwrap(); + let begin = Timer::micros(); + let done = fut.0.as_mut().poll(&mut context); + let end = Timer::micros(); + task.micros + .fetch_add((end - begin) as u64, Ordering::Relaxed); + if done.is_ready() { + task.done.swap(true, Ordering::Relaxed); + } + } else { + // If we don't have anything ready, sleep until the next interrupt. + // SAFETY: hlt is available on all reasonable x86 processors and has no safety + // requirements. + unsafe { + core::arch::asm!("hlt"); + } + do_wake(true); } } } @@ -204,27 +283,41 @@ impl Executor { }) } - pub fn sleep_us(us: u64) -> impl Future { - let tgt = Timer::micros() as u64 + us; + // Note: there are no guarantees on whether the amount of time we will sleep for + // will be exceeded. + pub fn sleep(time: Duration) -> impl Future { + let tgt = Timer::micros() + time.as_micros() as i64; + let mut ww = None; poll_fn(move |cx| { - let now = Timer::micros() as u64; + let now = Timer::micros(); if now >= tgt { Poll::Ready(()) } else { - // TODO(veluca): actually suspend the task. - cx.waker().wake_by_ref(); + Self::wake_at_micros(tgt, cx.waker(), &mut ww); Poll::Pending } }) } - /// **WARNING**: this function halts all tasks - pub fn deep_sleep_us(us: u64) { - // SAFETY: we are not using a callback - let e = - unsafe { uefi::boot::create_event(EventType::TIMER, Tpl::NOTIFY, None, None).unwrap() }; - uefi::boot::set_timer(&e, TimerTrigger::Relative(10 * us)).unwrap(); - uefi::boot::wait_for_event(&mut [e]).unwrap(); + // Wakes a task as soon as *any* interrupt is received. + pub(super) fn wake_on_interrupt(waker: &Waker) { + EXECUTOR.lock().wake_on_interrupt.push(waker.clone()); + } + + pub(super) fn wake_at_micros( + micros: i64, + waker: &Waker, + previous_waker: &mut Option, + ) { + if !previous_waker.is_some() { + let w = Arc::new(AtomicWaker::new()); + EXECUTOR.lock().timed_wait.push(TimedWait { + wake_at: micros, + waker: w.clone(), + }); + *previous_waker = Some(w); + } + previous_waker.as_ref().unwrap().register(waker); } /// Spawn a new task. diff --git a/pixie-uefi/src/os/input.rs b/pixie-uefi/src/os/input.rs index 9545de07..a819297d 100644 --- a/pixie-uefi/src/os/input.rs +++ b/pixie-uefi/src/os/input.rs @@ -7,6 +7,7 @@ use uefi::boot::ScopedProtocol; use uefi::proto::console::text::{Input, Key}; use crate::os::error::Result; +use crate::os::executor::Executor; use crate::os::send_wrapper::SendWrapper; static INPUT: Lazy>>> = Lazy::new(|| { @@ -22,7 +23,7 @@ pub fn read_key() -> impl Future> { Err(e) => Poll::Ready(Err(e.into())), Ok(Some(key)) => Poll::Ready(Ok(key)), Ok(None) => { - cx.waker().wake_by_ref(); + Executor::wake_on_interrupt(cx.waker()); Poll::Pending } } diff --git a/pixie-uefi/src/os/mod.rs b/pixie-uefi/src/os/mod.rs index d10a154d..c2093adc 100644 --- a/pixie-uefi/src/os/mod.rs +++ b/pixie-uefi/src/os/mod.rs @@ -2,6 +2,7 @@ use core::ffi::c_void; use core::future::Future; use core::ptr::NonNull; use core::sync::atomic::{AtomicBool, Ordering}; +use core::time::Duration; use uefi::boot::{EventType, Tpl}; use uefi::{Event, Status}; @@ -72,7 +73,7 @@ where break; } - Executor::sleep_us(30_000_000).await; + Executor::sleep(Duration::from_secs(30)).await; } }); diff --git a/pixie-uefi/src/os/net/interface.rs b/pixie-uefi/src/os/net/interface.rs index 9afe7868..9ca40f82 100644 --- a/pixie-uefi/src/os/net/interface.rs +++ b/pixie-uefi/src/os/net/interface.rs @@ -5,8 +5,9 @@ use uefi::proto::network::snp::{ReceiveFlags, SimpleNetwork}; use uefi::Status; use super::ETH_PACKET_SIZE; +use crate::os::send_wrapper::SendWrapper; -type Snp = ScopedProtocol; +type Snp = SendWrapper>; pub struct SnpDevice { snp: Snp, @@ -15,9 +16,6 @@ pub struct SnpDevice { rx_buf: [u8; ETH_PACKET_SIZE + 4], } -// SAFETY: we never create threads anyway. -unsafe impl Send for SnpDevice {} - impl SnpDevice { pub fn new(snp: Snp) -> SnpDevice { // Shut down the SNP protocol if needed. diff --git a/pixie-uefi/src/os/net/mod.rs b/pixie-uefi/src/os/net/mod.rs index 04be355d..f604284e 100644 --- a/pixie-uefi/src/os/net/mod.rs +++ b/pixie-uefi/src/os/net/mod.rs @@ -1,11 +1,17 @@ use alloc::string::{String, ToString}; +use alloc::sync::Arc; +use alloc::vec::Vec; use core::fmt::Write; use core::future::{poll_fn, Future}; use core::net::Ipv4Addr; use core::sync::atomic::{AtomicU64, Ordering}; use core::task::Poll; +use core::time::Duration; -use smoltcp::iface::{Config, Interface, PollResult, SocketHandle, SocketSet}; +use futures::task::AtomicWaker; +use smoltcp::iface::{ + Config, Interface, PollIngressSingleResult, PollResult, SocketHandle, SocketSet, +}; use smoltcp::socket::dhcpv4::{Event, Socket as Dhcpv4Socket}; use smoltcp::wire::{DhcpOption, HardwareAddress, IpCidr}; use spin::Mutex; @@ -19,10 +25,11 @@ use uefi::Handle; use super::timer::Timer; use crate::os::boot_options::BootOptions; -use crate::os::executor::Executor; +use crate::os::executor::{Executor, WrappedWaker}; use crate::os::net::interface::SnpDevice; pub use crate::os::net::tcp::TcpStream; pub use crate::os::net::udp::UdpSocket; +use crate::os::send_wrapper::SendWrapper; use crate::os::timer::rdtsc; use crate::os::ui; @@ -44,6 +51,8 @@ struct NetworkData { static NETWORK_DATA: Mutex> = Mutex::new(None); +static WAITING_FOR_IP: Mutex> = Mutex::new(vec![]); + fn with_net T>(f: F) -> T { let mut mg = NETWORK_DATA.try_lock().expect("Network is locked"); f(mg.as_mut().expect("Network is not initialized")) @@ -97,7 +106,7 @@ pub(super) fn init() { &snp.mode().current_address.0[..6], )); - let mut device = SnpDevice::new(snp); + let mut device = SnpDevice::new(SendWrapper(snp)); let mut interface_config = Config::new(hw_addr); interface_config.random_seed = rdtsc() as u64; @@ -118,15 +127,28 @@ pub(super) fn init() { dhcp_socket_handle, }); - Executor::spawn( - "[net_poll]", + Executor::spawn("[net_poll]", { poll_fn(move |cx| { - poll(); - // TODO(veluca): figure out whether we can suspend the task. - cx.waker().wake_by_ref(); + let wait = poll(); + match wait { + None => { + Executor::wake_on_interrupt(cx.waker()); + } + Some(x) if x < 200 => { + // Immediately wake if we want call poll() again in a very short time. + cx.waker().wake_by_ref(); + } + Some(wait) => { + // Halve the waiting time, to try to ensure that we don't exceed the suggested + // waiting time. + let deadline = Timer::micros() + wait as i64 / 2; + Executor::wake_on_interrupt(cx.waker()); + Executor::wake_at_micros(deadline, cx.waker(), &mut None); + } + } Poll::<()>::Pending - }), - ); + }) + }); Executor::spawn("[show_ip]", async { let mut draw_area = ui::DrawArea::ip(); @@ -136,10 +158,10 @@ pub(super) fn init() { let w = draw_area.size().0; if let Some(ip) = ip { write!(draw_area, "IP: {ip:>0$}", w - 4).unwrap(); - Executor::sleep_us(10_000_000).await + Executor::sleep(Duration::from_secs(10)).await } else { draw_area.write_with_color("DHCP...", Color::Yellow, Color::Black); - Executor::sleep_us(100_000).await + Executor::sleep(Duration::from_millis(100)).await } } }); @@ -148,11 +170,17 @@ pub(super) fn init() { } pub fn wait_for_ip() -> impl Future { + let mut last_waker = None; poll_fn(move |cx| { if ip().is_some() { Poll::Ready(()) } else { - cx.waker().wake_by_ref(); + if last_waker.is_none() { + let waker = Arc::new(AtomicWaker::new()); + WAITING_FOR_IP.lock().push(waker.clone()); + last_waker = Some(waker); + } + last_waker.as_ref().unwrap().register(cx.waker()); Poll::Pending } }) @@ -167,25 +195,25 @@ fn get_ephemeral_port() -> u16 { ((ans % (60999 - 49152)) + 49152) as u16 } -fn poll() { +/// Returns # of microseconds to wait until we should call poll() again (possibly 0), or +/// None if we can wait until the next interrupt. +fn poll() -> Option { let now = Timer::instant(); let mut data = NETWORK_DATA.lock(); - let Some(NetworkData { + let NetworkData { interface, device, socket_set, dhcp_socket_handle, - }) = data.as_mut() - else { - return; - }; + } = data.as_mut().unwrap(); - let status = interface.poll(now, device, socket_set); + let status_out = interface.poll_egress(now, device, socket_set); + let status_in = interface.poll_ingress_single(now, device, socket_set); - if status == PollResult::None { - return; + if status_in == PollIngressSingleResult::None && status_out == PollResult::None { + return interface.poll_delay(now, socket_set).map(|x| x.micros()); } let dhcp_status = socket_set @@ -203,6 +231,10 @@ fn poll() { .add_default_ipv4_route(router) .unwrap(); } + let to_wake = core::mem::take(&mut *WAITING_FOR_IP.lock()); + for w in to_wake { + w.wake(); + } } else { interface.update_ip_addrs(|a| { a.clear(); @@ -210,4 +242,5 @@ fn poll() { interface.routes_mut().remove_default_ipv4_route(); } } + Some(0) } diff --git a/pixie-uefi/src/os/net/speed.rs b/pixie-uefi/src/os/net/speed.rs index 89770cb7..203ed129 100644 --- a/pixie-uefi/src/os/net/speed.rs +++ b/pixie-uefi/src/os/net/speed.rs @@ -1,5 +1,6 @@ use core::fmt::Write; use core::sync::atomic::{AtomicU64, Ordering}; +use core::time::Duration; use pixie_shared::util::BytesFmt; use uefi::proto::console::text::Color; @@ -71,7 +72,7 @@ pub(super) fn spawn_network_speed_task() { Color::Black, ); writeln!(draw_area, "{:10.1}/s", BytesFmt(vtx)).unwrap(); - Executor::sleep_us(1_000_000).await; + Executor::sleep(Duration::from_secs(1)).await; } }); } diff --git a/pixie-uefi/src/os/net/tcp.rs b/pixie-uefi/src/os/net/tcp.rs index 1afc9b9b..8004575a 100644 --- a/pixie-uefi/src/os/net/tcp.rs +++ b/pixie-uefi/src/os/net/tcp.rs @@ -65,7 +65,12 @@ impl TcpStream { let state = with_net(|n| n.socket_set.get_mut::(self.handle).state()); let res = f(state); if matches!(res, Poll::Pending) { - cx.waker().wake_by_ref(); + with_net(|n| { + let socket = n.socket_set.get_mut::(self.handle); + // Every meaningful state change causes either send or recv to become possible (possibly erroring). + socket.register_send_waker(cx.waker()); + socket.register_recv_waker(cx.waker()); + }); } res }) diff --git a/pixie-uefi/src/os/ui.rs b/pixie-uefi/src/os/ui.rs index ea2f69a1..ae69ac8c 100644 --- a/pixie-uefi/src/os/ui.rs +++ b/pixie-uefi/src/os/ui.rs @@ -1,6 +1,7 @@ use alloc::vec::Vec; use core::fmt::Write; use core::sync::atomic::{AtomicUsize, Ordering}; +use core::time::Duration; use pixie_shared::util::BytesFmt; use spin::lazy::Lazy; @@ -81,7 +82,7 @@ pub(super) fn init() { Executor::spawn("[flush_ui]", async move { loop { SCREEN.lock().flush(); - Executor::sleep_us(100_000).await; + Executor::sleep(Duration::from_millis(100)).await; } }); @@ -94,7 +95,7 @@ pub(super) fn init() { let time = Timer::micros() as f32 * 0.000_001; let w = draw_area.size().0; write!(draw_area, "uptime:{0:1$}{time:12.1}s", "", w - 20).unwrap(); - Executor::sleep_us(50_000).await; + Executor::sleep(Duration::from_millis(50)).await; } }); @@ -113,7 +114,7 @@ pub(super) fn init() { w - 27, ) .unwrap(); - Executor::sleep_us(1_000_000).await; + Executor::sleep(Duration::from_secs(1)).await; } }); diff --git a/pixie-uefi/src/power_control.rs b/pixie-uefi/src/power_control.rs index ca88c3a8..24bfc4b6 100644 --- a/pixie-uefi/src/power_control.rs +++ b/pixie-uefi/src/power_control.rs @@ -1,3 +1,5 @@ +use core::time::Duration; + use uefi::Status; use crate::os::boot_options::BootOptions; @@ -14,7 +16,7 @@ pub async fn reboot_to_os() -> ! { BootOptions::current() ); log::warn!("{:?}", BootOptions::order()); - Executor::sleep_us(100_000_000).await; + Executor::sleep(Duration::from_secs(100)).await; } reset(); } From 6b8cd3009c8c67293aa678e4eef6ff34fe37b999 Mon Sep 17 00:00:00 2001 From: Filippo Casarin Date: Sun, 21 Dec 2025 18:51:58 +0100 Subject: [PATCH 2/3] Add Event --- pixie-uefi/src/os/executor/event.rs | 57 ++++++++++ .../src/os/{executor.rs => executor/mod.rs} | 105 ++++++++---------- pixie-uefi/src/os/input.rs | 20 +--- pixie-uefi/src/os/net/mod.rs | 60 +++++----- 4 files changed, 132 insertions(+), 110 deletions(-) create mode 100644 pixie-uefi/src/os/executor/event.rs rename pixie-uefi/src/os/{executor.rs => executor/mod.rs} (81%) diff --git a/pixie-uefi/src/os/executor/event.rs b/pixie-uefi/src/os/executor/event.rs new file mode 100644 index 00000000..29b0e6b2 --- /dev/null +++ b/pixie-uefi/src/os/executor/event.rs @@ -0,0 +1,57 @@ +use alloc::sync::{Arc, Weak}; +use core::future::Future; +use core::pin::Pin; +use core::sync::atomic::{AtomicBool, Ordering}; +use core::task::{Context, Poll}; + +use futures::task::AtomicWaker; + +#[derive(Debug, Default)] +struct Inner { + triggered: AtomicBool, + waker: AtomicWaker, +} + +pub struct Event { + inner: Arc, +} + +impl Event { + pub fn new() -> Self { + Self { + inner: Default::default(), + } + } + + pub fn trigger(&self) -> EventTrigger { + EventTrigger { + inner: Arc::downgrade(&self.inner), + } + } +} + +impl Future for Event { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + if self.inner.triggered.load(Ordering::Relaxed) { + Poll::Ready(()) + } else { + self.inner.waker.register(cx.waker()); + Poll::Pending + } + } +} + +pub struct EventTrigger { + inner: Weak, +} + +impl EventTrigger { + pub fn trigger(&self) { + if let Some(inner) = self.inner.upgrade() { + inner.triggered.store(true, Ordering::Relaxed); + inner.waker.wake(); + } + } +} diff --git a/pixie-uefi/src/os/executor.rs b/pixie-uefi/src/os/executor/mod.rs similarity index 81% rename from pixie-uefi/src/os/executor.rs rename to pixie-uefi/src/os/executor/mod.rs index a9d54613..7ea0bbcc 100644 --- a/pixie-uefi/src/os/executor.rs +++ b/pixie-uefi/src/os/executor/mod.rs @@ -12,14 +12,16 @@ use core::task::{Context, Poll, Waker}; use core::time::Duration; use futures::channel::oneshot; -use futures::task::AtomicWaker; use spin::Mutex; use uefi::proto::console::text::Color; +use crate::os::executor::event::{Event, EventTrigger}; use crate::os::send_wrapper::SendWrapper; use crate::os::timer::Timer; use crate::os::ui::DrawArea; +pub mod event; + type BoxFuture = SendWrapper + 'static>>>; struct Task { @@ -55,11 +57,9 @@ impl Wake for Task { } } -pub(super) type WrappedWaker = Arc; - struct TimedWait { wake_at: i64, - waker: WrappedWaker, + event: EventTrigger, } impl PartialEq for TimedWait { @@ -99,7 +99,7 @@ static EXECUTOR: Mutex = Mutex::new(Executor { }); pub struct Executor { - wake_on_interrupt: Vec, + wake_on_interrupt: Vec, timed_wait: BinaryHeap, ready_tasks: VecDeque>, tasks: Vec>, @@ -209,7 +209,7 @@ impl Executor { let mut do_wake = |force_interrupt_wake| { // Wake timed-waiting tasks. loop { - let waker = { + let event = { let mut ex = EXECUTOR.lock(); let Some(w) = ex.timed_wait.peek() else { break; @@ -218,9 +218,9 @@ impl Executor { break; } let w = ex.timed_wait.pop().unwrap(); - w.waker + w.event }; - waker.wake(); + event.trigger(); } // Since we don't notice interrupts that happened while we are not hlt-ing, // make sure that we wake up all the interrupt-based waiting tasks every at @@ -228,8 +228,8 @@ impl Executor { if last_interrupt_wakeup + INTERRUPT_MICROS <= Timer::micros() || force_interrupt_wake { last_interrupt_wakeup = Timer::micros(); let to_wake = core::mem::take(&mut EXECUTOR.lock().wake_on_interrupt); - for w in to_wake { - w.wake(); + for e in to_wake { + e.trigger(); } } }; @@ -237,26 +237,7 @@ impl Executor { loop { do_wake(false); let task = EXECUTOR.lock().ready_tasks.pop_front(); - if let Some(task) = task { - // It is possible for a done task to end up in the queue (if it wakes - // itself during execution). If that happens, we just remove it from - // the queue here. - if task.done.load(Ordering::Relaxed) { - continue; - } - task.in_queue.store(false, Ordering::Relaxed); - let waker = Waker::from(task.clone()); - let mut context = Context::from_waker(&waker); - let mut fut = task.future.try_lock().unwrap(); - let begin = Timer::micros(); - let done = fut.0.as_mut().poll(&mut context); - let end = Timer::micros(); - task.micros - .fetch_add((end - begin) as u64, Ordering::Relaxed); - if done.is_ready() { - task.done.swap(true, Ordering::Relaxed); - } - } else { + let Some(task) = task else { // If we don't have anything ready, sleep until the next interrupt. // SAFETY: hlt is available on all reasonable x86 processors and has no safety // requirements. @@ -264,6 +245,26 @@ impl Executor { core::arch::asm!("hlt"); } do_wake(true); + continue; + }; + + // It is possible for a done task to end up in the queue (if it wakes + // itself during execution). If that happens, we just remove it from + // the queue here. + if task.done.load(Ordering::Relaxed) { + continue; + } + task.in_queue.store(false, Ordering::Relaxed); + let waker = Waker::from(task.clone()); + let mut context = Context::from_waker(&waker); + let mut fut = task.future.try_lock().unwrap(); + let begin = Timer::micros(); + let done = fut.0.as_mut().poll(&mut context); + let end = Timer::micros(); + task.micros + .fetch_add((end - begin) as u64, Ordering::Relaxed); + if done.is_ready() { + task.done.swap(true, Ordering::Relaxed); } } } @@ -283,41 +284,23 @@ impl Executor { }) } + // Wakes a task as soon as *any* interrupt is received. + pub fn wait_for_interrupt() -> impl Future { + let event = Event::new(); + EXECUTOR.lock().wake_on_interrupt.push(event.trigger()); + event + } + // Note: there are no guarantees on whether the amount of time we will sleep for // will be exceeded. pub fn sleep(time: Duration) -> impl Future { let tgt = Timer::micros() + time.as_micros() as i64; - let mut ww = None; - poll_fn(move |cx| { - let now = Timer::micros(); - if now >= tgt { - Poll::Ready(()) - } else { - Self::wake_at_micros(tgt, cx.waker(), &mut ww); - Poll::Pending - } - }) - } - - // Wakes a task as soon as *any* interrupt is received. - pub(super) fn wake_on_interrupt(waker: &Waker) { - EXECUTOR.lock().wake_on_interrupt.push(waker.clone()); - } - - pub(super) fn wake_at_micros( - micros: i64, - waker: &Waker, - previous_waker: &mut Option, - ) { - if !previous_waker.is_some() { - let w = Arc::new(AtomicWaker::new()); - EXECUTOR.lock().timed_wait.push(TimedWait { - wake_at: micros, - waker: w.clone(), - }); - *previous_waker = Some(w); - } - previous_waker.as_ref().unwrap().register(waker); + let event = Event::new(); + EXECUTOR.lock().timed_wait.push(TimedWait { + wake_at: tgt, + event: event.trigger(), + }); + event } /// Spawn a new task. diff --git a/pixie-uefi/src/os/input.rs b/pixie-uefi/src/os/input.rs index a819297d..ffb82087 100644 --- a/pixie-uefi/src/os/input.rs +++ b/pixie-uefi/src/os/input.rs @@ -1,6 +1,3 @@ -use core::future::{poll_fn, Future}; -use core::task::Poll; - use spin::lazy::Lazy; use spin::Mutex; use uefi::boot::ScopedProtocol; @@ -16,16 +13,11 @@ static INPUT: Lazy>>> = Lazy::new(|| { Mutex::new(SendWrapper(input)) }); -pub fn read_key() -> impl Future> { - poll_fn(move |cx| { - let key = INPUT.lock().read_key(); - match key { - Err(e) => Poll::Ready(Err(e.into())), - Ok(Some(key)) => Poll::Ready(Ok(key)), - Ok(None) => { - Executor::wake_on_interrupt(cx.waker()); - Poll::Pending - } +pub async fn read_key() -> Result { + loop { + if let Some(key) = INPUT.lock().read_key()? { + break Ok(key); } - }) + Executor::wait_for_interrupt().await; + } } diff --git a/pixie-uefi/src/os/net/mod.rs b/pixie-uefi/src/os/net/mod.rs index f604284e..dffb7224 100644 --- a/pixie-uefi/src/os/net/mod.rs +++ b/pixie-uefi/src/os/net/mod.rs @@ -1,14 +1,10 @@ use alloc::string::{String, ToString}; -use alloc::sync::Arc; use alloc::vec::Vec; use core::fmt::Write; -use core::future::{poll_fn, Future}; use core::net::Ipv4Addr; use core::sync::atomic::{AtomicU64, Ordering}; -use core::task::Poll; use core::time::Duration; -use futures::task::AtomicWaker; use smoltcp::iface::{ Config, Interface, PollIngressSingleResult, PollResult, SocketHandle, SocketSet, }; @@ -25,7 +21,8 @@ use uefi::Handle; use super::timer::Timer; use crate::os::boot_options::BootOptions; -use crate::os::executor::{Executor, WrappedWaker}; +use crate::os::executor::event::{Event as ExecutorEvent, EventTrigger}; +use crate::os::executor::Executor; use crate::os::net::interface::SnpDevice; pub use crate::os::net::tcp::TcpStream; pub use crate::os::net::udp::UdpSocket; @@ -51,7 +48,7 @@ struct NetworkData { static NETWORK_DATA: Mutex> = Mutex::new(None); -static WAITING_FOR_IP: Mutex> = Mutex::new(vec![]); +static WAITING_FOR_IP: Mutex> = Mutex::new(vec![]); fn with_net T>(f: F) -> T { let mut mg = NETWORK_DATA.try_lock().expect("Network is locked"); @@ -127,27 +124,28 @@ pub(super) fn init() { dhcp_socket_handle, }); - Executor::spawn("[net_poll]", { - poll_fn(move |cx| { + Executor::spawn("[net_poll]", async { + loop { let wait = poll(); match wait { None => { - Executor::wake_on_interrupt(cx.waker()); + Executor::wait_for_interrupt().await; } - Some(x) if x < 200 => { + Some(wait) if wait < 200 => { // Immediately wake if we want call poll() again in a very short time. - cx.waker().wake_by_ref(); + Executor::sched_yield().await; } Some(wait) => { - // Halve the waiting time, to try to ensure that we don't exceed the suggested - // waiting time. - let deadline = Timer::micros() + wait as i64 / 2; - Executor::wake_on_interrupt(cx.waker()); - Executor::wake_at_micros(deadline, cx.waker(), &mut None); + futures::future::select( + Executor::wait_for_interrupt(), + // Halve the waiting time, to try to ensure that we don't exceed the suggested + // waiting time. + Executor::sleep(Duration::from_micros(wait / 2)), + ) + .await; } } - Poll::<()>::Pending - }) + } }); Executor::spawn("[show_ip]", async { @@ -169,21 +167,13 @@ pub(super) fn init() { speed::spawn_network_speed_task(); } -pub fn wait_for_ip() -> impl Future { - let mut last_waker = None; - poll_fn(move |cx| { - if ip().is_some() { - Poll::Ready(()) - } else { - if last_waker.is_none() { - let waker = Arc::new(AtomicWaker::new()); - WAITING_FOR_IP.lock().push(waker.clone()); - last_waker = Some(waker); - } - last_waker.as_ref().unwrap().register(cx.waker()); - Poll::Pending - } - }) +pub async fn wait_for_ip() { + if ip().is_some() { + return; + } + let event = ExecutorEvent::new(); + WAITING_FOR_IP.lock().push(event.trigger()); + event.await; } fn ip() -> Option { @@ -232,8 +222,8 @@ fn poll() -> Option { .unwrap(); } let to_wake = core::mem::take(&mut *WAITING_FOR_IP.lock()); - for w in to_wake { - w.wake(); + for e in to_wake { + e.trigger(); } } else { interface.update_ip_addrs(|a| { From 6d305ff1e25459f5f55b96b22ca2278af8991f77 Mon Sep 17 00:00:00 2001 From: Luca Versari Date: Sun, 21 Dec 2025 20:36:28 +0100 Subject: [PATCH 3/3] Change net sleeping logic --- pixie-uefi/src/os/net/mod.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pixie-uefi/src/os/net/mod.rs b/pixie-uefi/src/os/net/mod.rs index dffb7224..9ab6294e 100644 --- a/pixie-uefi/src/os/net/mod.rs +++ b/pixie-uefi/src/os/net/mod.rs @@ -126,21 +126,22 @@ pub(super) fn init() { Executor::spawn("[net_poll]", async { loop { + const MIN_WAIT_US: u64 = 1000; let wait = poll(); match wait { None => { Executor::wait_for_interrupt().await; } - Some(wait) if wait < 200 => { + Some(wait) if wait < MIN_WAIT_US => { // Immediately wake if we want call poll() again in a very short time. Executor::sched_yield().await; } Some(wait) => { futures::future::select( Executor::wait_for_interrupt(), - // Halve the waiting time, to try to ensure that we don't exceed the suggested - // waiting time. - Executor::sleep(Duration::from_micros(wait / 2)), + // Reduce the waiting time, to try to ensure that we don't exceed the + // suggested waiting time. + Executor::sleep(Duration::from_micros(wait - MIN_WAIT_US)), ) .await; }