diff --git a/pixie-uefi/src/flash.rs b/pixie-uefi/src/flash.rs index 6167ea9..26826ef 100644 --- a/pixie-uefi/src/flash.rs +++ b/pixie-uefi/src/flash.rs @@ -5,6 +5,7 @@ use core::cell::{Cell, RefCell}; use core::fmt::Write; use core::mem; use core::net::SocketAddrV4; +use core::time::Duration; use futures::future::{select, Either}; use log::info; @@ -120,7 +121,7 @@ pub async fn flash(server_addr: SocketAddrV4) -> Result<()> { let draw_task = async { let mut last_stats = stats.borrow().clone(); while !done.get() { - Executor::sleep_us(100_000).await; + Executor::sleep(Duration::from_millis(100)).await; let stats = stats.borrow().clone(); if stats == last_stats { continue; @@ -176,7 +177,7 @@ pub async fn flash(server_addr: SocketAddrV4) -> Result<()> { ); while !chunks_info.is_empty() { let recv = Box::pin(socket.recv_from(&mut buf)); - let sleep = Box::pin(Executor::sleep_us(100_000)); + let sleep = Box::pin(Executor::sleep(Duration::from_millis(100))); match select(recv, sleep).await { Either::Left(((buf, _addr), _)) => { stats.borrow_mut().pack_recv += 1; diff --git a/pixie-uefi/src/main.rs b/pixie-uefi/src/main.rs index 8b58c96..1970b1b 100644 --- a/pixie-uefi/src/main.rs +++ b/pixie-uefi/src/main.rs @@ -7,6 +7,7 @@ extern crate alloc; use alloc::boxed::Box; use core::net::{Ipv4Addr, SocketAddrV4}; +use core::time::Duration; use futures::future::{self, Either}; use pixie_shared::{Action, TcpRequest, UdpRequest, ACTION_PORT, PING_PORT}; @@ -43,7 +44,7 @@ async fn server_discover() -> Result { socket .send_to(SocketAddrV4::new(Ipv4Addr::BROADCAST, ACTION_PORT), &msg) .await?; - Executor::sleep_us(1_000_000).await; + Executor::sleep(Duration::from_secs(1)).await; }) }; @@ -70,7 +71,7 @@ async fn shutdown() -> ! { export_cov::export().await; log::info!("Shutting down..."); - Executor::sleep_us(1_000_000).await; + Executor::sleep(Duration::from_secs(1)).await; power_control::shutdown() } @@ -108,7 +109,7 @@ async fn run() -> Result<()> { .send_to(SocketAddrV4::new(*server.ip(), PING_PORT), b"pixie") .await .unwrap(); - Executor::sleep_us(10_000_000).await; + Executor::sleep(Duration::from_secs(10)).await; } }); @@ -132,11 +133,7 @@ async fn run() -> Result<()> { log::warn!("Started waiting for another command..."); } last_was_wait = true; - const WAIT_10MSECS: u64 = 50; - for _ in 0..WAIT_10MSECS { - Executor::deep_sleep_us(10_000); - Executor::sched_yield().await; - } + Executor::sleep(Duration::from_millis(500)).await; } else { last_was_wait = false; log::info!("Command: {command:?}"); diff --git a/pixie-uefi/src/os/executor/event.rs b/pixie-uefi/src/os/executor/event.rs new file mode 100644 index 0000000..29b0e6b --- /dev/null +++ b/pixie-uefi/src/os/executor/event.rs @@ -0,0 +1,57 @@ +use alloc::sync::{Arc, Weak}; +use core::future::Future; +use core::pin::Pin; +use core::sync::atomic::{AtomicBool, Ordering}; +use core::task::{Context, Poll}; + +use futures::task::AtomicWaker; + +#[derive(Debug, Default)] +struct Inner { + triggered: AtomicBool, + waker: AtomicWaker, +} + +pub struct Event { + inner: Arc, +} + +impl Event { + pub fn new() -> Self { + Self { + inner: Default::default(), + } + } + + pub fn trigger(&self) -> EventTrigger { + EventTrigger { + inner: Arc::downgrade(&self.inner), + } + } +} + +impl Future for Event { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + if self.inner.triggered.load(Ordering::Relaxed) { + Poll::Ready(()) + } else { + self.inner.waker.register(cx.waker()); + Poll::Pending + } + } +} + +pub struct EventTrigger { + inner: Weak, +} + +impl EventTrigger { + pub fn trigger(&self) { + if let Some(inner) = self.inner.upgrade() { + inner.triggered.store(true, Ordering::Relaxed); + inner.waker.wake(); + } + } +} diff --git a/pixie-uefi/src/os/executor.rs b/pixie-uefi/src/os/executor/mod.rs similarity index 65% rename from pixie-uefi/src/os/executor.rs rename to pixie-uefi/src/os/executor/mod.rs index 76eb25c..7ea0bbc 100644 --- a/pixie-uefi/src/os/executor.rs +++ b/pixie-uefi/src/os/executor/mod.rs @@ -1,4 +1,5 @@ use alloc::boxed::Box; +use alloc::collections::binary_heap::BinaryHeap; use alloc::collections::VecDeque; use alloc::sync::Arc; use alloc::task::Wake; @@ -8,16 +9,19 @@ use core::future::{poll_fn, Future}; use core::pin::Pin; use core::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use core::task::{Context, Poll, Waker}; -use futures::channel::oneshot; +use core::time::Duration; +use futures::channel::oneshot; use spin::Mutex; -use uefi::boot::{EventType, TimerTrigger, Tpl}; use uefi::proto::console::text::Color; +use crate::os::executor::event::{Event, EventTrigger}; use crate::os::send_wrapper::SendWrapper; use crate::os::timer::Timer; use crate::os::ui::DrawArea; +pub mod event; + type BoxFuture = SendWrapper + 'static>>>; struct Task { @@ -53,10 +57,31 @@ impl Wake for Task { } } -static EXECUTOR: Mutex = Mutex::new(Executor { - ready_tasks: VecDeque::new(), - tasks: vec![], -}); +struct TimedWait { + wake_at: i64, + event: EventTrigger, +} + +impl PartialEq for TimedWait { + fn eq(&self, other: &Self) -> bool { + self.wake_at == other.wake_at + } +} + +impl Eq for TimedWait {} + +impl PartialOrd for TimedWait { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for TimedWait { + fn cmp(&self, other: &Self) -> core::cmp::Ordering { + // Reversed order: min-heap. + other.wake_at.cmp(&self.wake_at) + } +} pub struct JoinHandle(oneshot::Receiver); @@ -66,8 +91,16 @@ impl JoinHandle { } } +static EXECUTOR: Mutex = Mutex::new(Executor { + wake_on_interrupt: vec![], + timed_wait: BinaryHeap::new(), + ready_tasks: VecDeque::new(), + tasks: vec![], +}); + pub struct Executor { - // TODO(veluca): scheduling. + wake_on_interrupt: Vec, + timed_wait: BinaryHeap, ready_tasks: VecDeque>, tasks: Vec>, } @@ -81,7 +114,7 @@ impl Executor { assert!((w - 1).is_multiple_of(TASK_LEN + 1)); let num_w = (w - 1) / (TASK_LEN + 1); let mut last = Timer::micros() as u64; - Self::sleep_us(100_000).await; + Self::sleep(Duration::from_millis(100)).await; loop { draw_area.clear(); let cur = Timer::micros() as u64; @@ -161,19 +194,66 @@ impl Executor { tasks.retain(|t| !t.done.load(Ordering::Relaxed)); } last = Timer::micros() as u64; - Self::sleep_us(1_000_000).await; + Self::sleep(Duration::from_secs(1)).await } } pub fn run() -> ! { Self::spawn("[show_tasks]", Self::draw_tasks()); + + // Maximum amount of microseconds between wakeups of interrupt-based wakers. + const INTERRUPT_MICROS: i64 = 500; + + let mut last_interrupt_wakeup = Timer::micros(); + + let mut do_wake = |force_interrupt_wake| { + // Wake timed-waiting tasks. + loop { + let event = { + let mut ex = EXECUTOR.lock(); + let Some(w) = ex.timed_wait.peek() else { + break; + }; + if w.wake_at > Timer::micros() { + break; + } + let w = ex.timed_wait.pop().unwrap(); + w.event + }; + event.trigger(); + } + // Since we don't notice interrupts that happened while we are not hlt-ing, + // make sure that we wake up all the interrupt-based waiting tasks every at + // most INTERRUPT_MICROS micros to make it unlikely to miss interrupts. + if last_interrupt_wakeup + INTERRUPT_MICROS <= Timer::micros() || force_interrupt_wake { + last_interrupt_wakeup = Timer::micros(); + let to_wake = core::mem::take(&mut EXECUTOR.lock().wake_on_interrupt); + for e in to_wake { + e.trigger(); + } + } + }; + loop { - let task = EXECUTOR - .lock() - .ready_tasks - .pop_front() - .expect("Executor should never run out of ready tasks"); + do_wake(false); + let task = EXECUTOR.lock().ready_tasks.pop_front(); + let Some(task) = task else { + // If we don't have anything ready, sleep until the next interrupt. + // SAFETY: hlt is available on all reasonable x86 processors and has no safety + // requirements. + unsafe { + core::arch::asm!("hlt"); + } + do_wake(true); + continue; + }; + // It is possible for a done task to end up in the queue (if it wakes + // itself during execution). If that happens, we just remove it from + // the queue here. + if task.done.load(Ordering::Relaxed) { + continue; + } task.in_queue.store(false, Ordering::Relaxed); let waker = Waker::from(task.clone()); let mut context = Context::from_waker(&waker); @@ -204,27 +284,23 @@ impl Executor { }) } - pub fn sleep_us(us: u64) -> impl Future { - let tgt = Timer::micros() as u64 + us; - poll_fn(move |cx| { - let now = Timer::micros() as u64; - if now >= tgt { - Poll::Ready(()) - } else { - // TODO(veluca): actually suspend the task. - cx.waker().wake_by_ref(); - Poll::Pending - } - }) + // Wakes a task as soon as *any* interrupt is received. + pub fn wait_for_interrupt() -> impl Future { + let event = Event::new(); + EXECUTOR.lock().wake_on_interrupt.push(event.trigger()); + event } - /// **WARNING**: this function halts all tasks - pub fn deep_sleep_us(us: u64) { - // SAFETY: we are not using a callback - let e = - unsafe { uefi::boot::create_event(EventType::TIMER, Tpl::NOTIFY, None, None).unwrap() }; - uefi::boot::set_timer(&e, TimerTrigger::Relative(10 * us)).unwrap(); - uefi::boot::wait_for_event(&mut [e]).unwrap(); + // Note: there are no guarantees on whether the amount of time we will sleep for + // will be exceeded. + pub fn sleep(time: Duration) -> impl Future { + let tgt = Timer::micros() + time.as_micros() as i64; + let event = Event::new(); + EXECUTOR.lock().timed_wait.push(TimedWait { + wake_at: tgt, + event: event.trigger(), + }); + event } /// Spawn a new task. diff --git a/pixie-uefi/src/os/input.rs b/pixie-uefi/src/os/input.rs index 9545de0..ffb8208 100644 --- a/pixie-uefi/src/os/input.rs +++ b/pixie-uefi/src/os/input.rs @@ -1,12 +1,10 @@ -use core::future::{poll_fn, Future}; -use core::task::Poll; - use spin::lazy::Lazy; use spin::Mutex; use uefi::boot::ScopedProtocol; use uefi::proto::console::text::{Input, Key}; use crate::os::error::Result; +use crate::os::executor::Executor; use crate::os::send_wrapper::SendWrapper; static INPUT: Lazy>>> = Lazy::new(|| { @@ -15,16 +13,11 @@ static INPUT: Lazy>>> = Lazy::new(|| { Mutex::new(SendWrapper(input)) }); -pub fn read_key() -> impl Future> { - poll_fn(move |cx| { - let key = INPUT.lock().read_key(); - match key { - Err(e) => Poll::Ready(Err(e.into())), - Ok(Some(key)) => Poll::Ready(Ok(key)), - Ok(None) => { - cx.waker().wake_by_ref(); - Poll::Pending - } +pub async fn read_key() -> Result { + loop { + if let Some(key) = INPUT.lock().read_key()? { + break Ok(key); } - }) + Executor::wait_for_interrupt().await; + } } diff --git a/pixie-uefi/src/os/mod.rs b/pixie-uefi/src/os/mod.rs index d10a154..c2093ad 100644 --- a/pixie-uefi/src/os/mod.rs +++ b/pixie-uefi/src/os/mod.rs @@ -2,6 +2,7 @@ use core::ffi::c_void; use core::future::Future; use core::ptr::NonNull; use core::sync::atomic::{AtomicBool, Ordering}; +use core::time::Duration; use uefi::boot::{EventType, Tpl}; use uefi::{Event, Status}; @@ -72,7 +73,7 @@ where break; } - Executor::sleep_us(30_000_000).await; + Executor::sleep(Duration::from_secs(30)).await; } }); diff --git a/pixie-uefi/src/os/net/interface.rs b/pixie-uefi/src/os/net/interface.rs index 9afe786..9ca40f8 100644 --- a/pixie-uefi/src/os/net/interface.rs +++ b/pixie-uefi/src/os/net/interface.rs @@ -5,8 +5,9 @@ use uefi::proto::network::snp::{ReceiveFlags, SimpleNetwork}; use uefi::Status; use super::ETH_PACKET_SIZE; +use crate::os::send_wrapper::SendWrapper; -type Snp = ScopedProtocol; +type Snp = SendWrapper>; pub struct SnpDevice { snp: Snp, @@ -15,9 +16,6 @@ pub struct SnpDevice { rx_buf: [u8; ETH_PACKET_SIZE + 4], } -// SAFETY: we never create threads anyway. -unsafe impl Send for SnpDevice {} - impl SnpDevice { pub fn new(snp: Snp) -> SnpDevice { // Shut down the SNP protocol if needed. diff --git a/pixie-uefi/src/os/net/mod.rs b/pixie-uefi/src/os/net/mod.rs index 04be355..9ab6294 100644 --- a/pixie-uefi/src/os/net/mod.rs +++ b/pixie-uefi/src/os/net/mod.rs @@ -1,11 +1,13 @@ use alloc::string::{String, ToString}; +use alloc::vec::Vec; use core::fmt::Write; -use core::future::{poll_fn, Future}; use core::net::Ipv4Addr; use core::sync::atomic::{AtomicU64, Ordering}; -use core::task::Poll; +use core::time::Duration; -use smoltcp::iface::{Config, Interface, PollResult, SocketHandle, SocketSet}; +use smoltcp::iface::{ + Config, Interface, PollIngressSingleResult, PollResult, SocketHandle, SocketSet, +}; use smoltcp::socket::dhcpv4::{Event, Socket as Dhcpv4Socket}; use smoltcp::wire::{DhcpOption, HardwareAddress, IpCidr}; use spin::Mutex; @@ -19,10 +21,12 @@ use uefi::Handle; use super::timer::Timer; use crate::os::boot_options::BootOptions; +use crate::os::executor::event::{Event as ExecutorEvent, EventTrigger}; use crate::os::executor::Executor; use crate::os::net::interface::SnpDevice; pub use crate::os::net::tcp::TcpStream; pub use crate::os::net::udp::UdpSocket; +use crate::os::send_wrapper::SendWrapper; use crate::os::timer::rdtsc; use crate::os::ui; @@ -44,6 +48,8 @@ struct NetworkData { static NETWORK_DATA: Mutex> = Mutex::new(None); +static WAITING_FOR_IP: Mutex> = Mutex::new(vec![]); + fn with_net T>(f: F) -> T { let mut mg = NETWORK_DATA.try_lock().expect("Network is locked"); f(mg.as_mut().expect("Network is not initialized")) @@ -97,7 +103,7 @@ pub(super) fn init() { &snp.mode().current_address.0[..6], )); - let mut device = SnpDevice::new(snp); + let mut device = SnpDevice::new(SendWrapper(snp)); let mut interface_config = Config::new(hw_addr); interface_config.random_seed = rdtsc() as u64; @@ -118,15 +124,30 @@ pub(super) fn init() { dhcp_socket_handle, }); - Executor::spawn( - "[net_poll]", - poll_fn(move |cx| { - poll(); - // TODO(veluca): figure out whether we can suspend the task. - cx.waker().wake_by_ref(); - Poll::<()>::Pending - }), - ); + Executor::spawn("[net_poll]", async { + loop { + const MIN_WAIT_US: u64 = 1000; + let wait = poll(); + match wait { + None => { + Executor::wait_for_interrupt().await; + } + Some(wait) if wait < MIN_WAIT_US => { + // Immediately wake if we want call poll() again in a very short time. + Executor::sched_yield().await; + } + Some(wait) => { + futures::future::select( + Executor::wait_for_interrupt(), + // Reduce the waiting time, to try to ensure that we don't exceed the + // suggested waiting time. + Executor::sleep(Duration::from_micros(wait - MIN_WAIT_US)), + ) + .await; + } + } + } + }); Executor::spawn("[show_ip]", async { let mut draw_area = ui::DrawArea::ip(); @@ -136,10 +157,10 @@ pub(super) fn init() { let w = draw_area.size().0; if let Some(ip) = ip { write!(draw_area, "IP: {ip:>0$}", w - 4).unwrap(); - Executor::sleep_us(10_000_000).await + Executor::sleep(Duration::from_secs(10)).await } else { draw_area.write_with_color("DHCP...", Color::Yellow, Color::Black); - Executor::sleep_us(100_000).await + Executor::sleep(Duration::from_millis(100)).await } } }); @@ -147,15 +168,13 @@ pub(super) fn init() { speed::spawn_network_speed_task(); } -pub fn wait_for_ip() -> impl Future { - poll_fn(move |cx| { - if ip().is_some() { - Poll::Ready(()) - } else { - cx.waker().wake_by_ref(); - Poll::Pending - } - }) +pub async fn wait_for_ip() { + if ip().is_some() { + return; + } + let event = ExecutorEvent::new(); + WAITING_FOR_IP.lock().push(event.trigger()); + event.await; } fn ip() -> Option { @@ -167,25 +186,25 @@ fn get_ephemeral_port() -> u16 { ((ans % (60999 - 49152)) + 49152) as u16 } -fn poll() { +/// Returns # of microseconds to wait until we should call poll() again (possibly 0), or +/// None if we can wait until the next interrupt. +fn poll() -> Option { let now = Timer::instant(); let mut data = NETWORK_DATA.lock(); - let Some(NetworkData { + let NetworkData { interface, device, socket_set, dhcp_socket_handle, - }) = data.as_mut() - else { - return; - }; + } = data.as_mut().unwrap(); - let status = interface.poll(now, device, socket_set); + let status_out = interface.poll_egress(now, device, socket_set); + let status_in = interface.poll_ingress_single(now, device, socket_set); - if status == PollResult::None { - return; + if status_in == PollIngressSingleResult::None && status_out == PollResult::None { + return interface.poll_delay(now, socket_set).map(|x| x.micros()); } let dhcp_status = socket_set @@ -203,6 +222,10 @@ fn poll() { .add_default_ipv4_route(router) .unwrap(); } + let to_wake = core::mem::take(&mut *WAITING_FOR_IP.lock()); + for e in to_wake { + e.trigger(); + } } else { interface.update_ip_addrs(|a| { a.clear(); @@ -210,4 +233,5 @@ fn poll() { interface.routes_mut().remove_default_ipv4_route(); } } + Some(0) } diff --git a/pixie-uefi/src/os/net/speed.rs b/pixie-uefi/src/os/net/speed.rs index 89770cb..203ed12 100644 --- a/pixie-uefi/src/os/net/speed.rs +++ b/pixie-uefi/src/os/net/speed.rs @@ -1,5 +1,6 @@ use core::fmt::Write; use core::sync::atomic::{AtomicU64, Ordering}; +use core::time::Duration; use pixie_shared::util::BytesFmt; use uefi::proto::console::text::Color; @@ -71,7 +72,7 @@ pub(super) fn spawn_network_speed_task() { Color::Black, ); writeln!(draw_area, "{:10.1}/s", BytesFmt(vtx)).unwrap(); - Executor::sleep_us(1_000_000).await; + Executor::sleep(Duration::from_secs(1)).await; } }); } diff --git a/pixie-uefi/src/os/net/tcp.rs b/pixie-uefi/src/os/net/tcp.rs index 1afc9b9..8004575 100644 --- a/pixie-uefi/src/os/net/tcp.rs +++ b/pixie-uefi/src/os/net/tcp.rs @@ -65,7 +65,12 @@ impl TcpStream { let state = with_net(|n| n.socket_set.get_mut::(self.handle).state()); let res = f(state); if matches!(res, Poll::Pending) { - cx.waker().wake_by_ref(); + with_net(|n| { + let socket = n.socket_set.get_mut::(self.handle); + // Every meaningful state change causes either send or recv to become possible (possibly erroring). + socket.register_send_waker(cx.waker()); + socket.register_recv_waker(cx.waker()); + }); } res }) diff --git a/pixie-uefi/src/os/ui.rs b/pixie-uefi/src/os/ui.rs index ea2f69a..ae69ac8 100644 --- a/pixie-uefi/src/os/ui.rs +++ b/pixie-uefi/src/os/ui.rs @@ -1,6 +1,7 @@ use alloc::vec::Vec; use core::fmt::Write; use core::sync::atomic::{AtomicUsize, Ordering}; +use core::time::Duration; use pixie_shared::util::BytesFmt; use spin::lazy::Lazy; @@ -81,7 +82,7 @@ pub(super) fn init() { Executor::spawn("[flush_ui]", async move { loop { SCREEN.lock().flush(); - Executor::sleep_us(100_000).await; + Executor::sleep(Duration::from_millis(100)).await; } }); @@ -94,7 +95,7 @@ pub(super) fn init() { let time = Timer::micros() as f32 * 0.000_001; let w = draw_area.size().0; write!(draw_area, "uptime:{0:1$}{time:12.1}s", "", w - 20).unwrap(); - Executor::sleep_us(50_000).await; + Executor::sleep(Duration::from_millis(50)).await; } }); @@ -113,7 +114,7 @@ pub(super) fn init() { w - 27, ) .unwrap(); - Executor::sleep_us(1_000_000).await; + Executor::sleep(Duration::from_secs(1)).await; } }); diff --git a/pixie-uefi/src/power_control.rs b/pixie-uefi/src/power_control.rs index ca88c3a..24bfc4b 100644 --- a/pixie-uefi/src/power_control.rs +++ b/pixie-uefi/src/power_control.rs @@ -1,3 +1,5 @@ +use core::time::Duration; + use uefi::Status; use crate::os::boot_options::BootOptions; @@ -14,7 +16,7 @@ pub async fn reboot_to_os() -> ! { BootOptions::current() ); log::warn!("{:?}", BootOptions::order()); - Executor::sleep_us(100_000_000).await; + Executor::sleep(Duration::from_secs(100)).await; } reset(); }