Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pixie-uefi/src/flash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use core::cell::{Cell, RefCell};
use core::fmt::Write;
use core::mem;
use core::net::SocketAddrV4;
use core::time::Duration;

use futures::future::{select, Either};
use log::info;
Expand Down Expand Up @@ -120,7 +121,7 @@ pub async fn flash(server_addr: SocketAddrV4) -> Result<()> {
let draw_task = async {
let mut last_stats = stats.borrow().clone();
while !done.get() {
Executor::sleep_us(100_000).await;
Executor::sleep(Duration::from_millis(100)).await;
let stats = stats.borrow().clone();
if stats == last_stats {
continue;
Expand Down Expand Up @@ -176,7 +177,7 @@ pub async fn flash(server_addr: SocketAddrV4) -> Result<()> {
);
while !chunks_info.is_empty() {
let recv = Box::pin(socket.recv_from(&mut buf));
let sleep = Box::pin(Executor::sleep_us(100_000));
let sleep = Box::pin(Executor::sleep(Duration::from_millis(100)));
match select(recv, sleep).await {
Either::Left(((buf, _addr), _)) => {
stats.borrow_mut().pack_recv += 1;
Expand Down
13 changes: 5 additions & 8 deletions pixie-uefi/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ extern crate alloc;

use alloc::boxed::Box;
use core::net::{Ipv4Addr, SocketAddrV4};
use core::time::Duration;

use futures::future::{self, Either};
use pixie_shared::{Action, TcpRequest, UdpRequest, ACTION_PORT, PING_PORT};
Expand Down Expand Up @@ -43,7 +44,7 @@ async fn server_discover() -> Result<SocketAddrV4> {
socket
.send_to(SocketAddrV4::new(Ipv4Addr::BROADCAST, ACTION_PORT), &msg)
.await?;
Executor::sleep_us(1_000_000).await;
Executor::sleep(Duration::from_secs(1)).await;
})
};

Expand All @@ -70,7 +71,7 @@ async fn shutdown() -> ! {
export_cov::export().await;

log::info!("Shutting down...");
Executor::sleep_us(1_000_000).await;
Executor::sleep(Duration::from_secs(1)).await;
power_control::shutdown()
}

Expand Down Expand Up @@ -108,7 +109,7 @@ async fn run() -> Result<()> {
.send_to(SocketAddrV4::new(*server.ip(), PING_PORT), b"pixie")
.await
.unwrap();
Executor::sleep_us(10_000_000).await;
Executor::sleep(Duration::from_secs(10)).await;
}
});

Expand All @@ -132,11 +133,7 @@ async fn run() -> Result<()> {
log::warn!("Started waiting for another command...");
}
last_was_wait = true;
const WAIT_10MSECS: u64 = 50;
for _ in 0..WAIT_10MSECS {
Executor::deep_sleep_us(10_000);
Executor::sched_yield().await;
}
Executor::sleep(Duration::from_millis(500)).await;
} else {
last_was_wait = false;
log::info!("Command: {command:?}");
Expand Down
57 changes: 57 additions & 0 deletions pixie-uefi/src/os/executor/event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use alloc::sync::{Arc, Weak};
use core::future::Future;
use core::pin::Pin;
use core::sync::atomic::{AtomicBool, Ordering};
use core::task::{Context, Poll};

use futures::task::AtomicWaker;

#[derive(Debug, Default)]
struct Inner {
triggered: AtomicBool,
waker: AtomicWaker,
}

pub struct Event {
inner: Arc<Inner>,
}

impl Event {
pub fn new() -> Self {
Self {
inner: Default::default(),
}
}

pub fn trigger(&self) -> EventTrigger {
EventTrigger {
inner: Arc::downgrade(&self.inner),
}
}
}

impl Future for Event {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if self.inner.triggered.load(Ordering::Relaxed) {
Poll::Ready(())
} else {
self.inner.waker.register(cx.waker());
Poll::Pending
}
}
}

pub struct EventTrigger {
inner: Weak<Inner>,
}

impl EventTrigger {
pub fn trigger(&self) {
if let Some(inner) = self.inner.upgrade() {
inner.triggered.store(true, Ordering::Relaxed);
inner.waker.wake();
}
}
}
142 changes: 109 additions & 33 deletions pixie-uefi/src/os/executor.rs → pixie-uefi/src/os/executor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use alloc::boxed::Box;
use alloc::collections::binary_heap::BinaryHeap;
use alloc::collections::VecDeque;
use alloc::sync::Arc;
use alloc::task::Wake;
Expand All @@ -8,16 +9,19 @@ use core::future::{poll_fn, Future};
use core::pin::Pin;
use core::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use core::task::{Context, Poll, Waker};
use futures::channel::oneshot;
use core::time::Duration;

use futures::channel::oneshot;
use spin::Mutex;
use uefi::boot::{EventType, TimerTrigger, Tpl};
use uefi::proto::console::text::Color;

use crate::os::executor::event::{Event, EventTrigger};
use crate::os::send_wrapper::SendWrapper;
use crate::os::timer::Timer;
use crate::os::ui::DrawArea;

pub mod event;

type BoxFuture = SendWrapper<Pin<Box<dyn Future<Output = ()> + 'static>>>;

struct Task {
Expand Down Expand Up @@ -53,10 +57,31 @@ impl Wake for Task {
}
}

static EXECUTOR: Mutex<Executor> = Mutex::new(Executor {
ready_tasks: VecDeque::new(),
tasks: vec![],
});
struct TimedWait {
wake_at: i64,
event: EventTrigger,
}

impl PartialEq for TimedWait {
fn eq(&self, other: &Self) -> bool {
self.wake_at == other.wake_at
}
}

impl Eq for TimedWait {}

impl PartialOrd for TimedWait {
fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl Ord for TimedWait {
fn cmp(&self, other: &Self) -> core::cmp::Ordering {
// Reversed order: min-heap.
other.wake_at.cmp(&self.wake_at)
}
}

pub struct JoinHandle<T>(oneshot::Receiver<T>);

Expand All @@ -66,8 +91,16 @@ impl<T> JoinHandle<T> {
}
}

static EXECUTOR: Mutex<Executor> = Mutex::new(Executor {
wake_on_interrupt: vec![],
timed_wait: BinaryHeap::new(),
ready_tasks: VecDeque::new(),
tasks: vec![],
});

pub struct Executor {
// TODO(veluca): scheduling.
wake_on_interrupt: Vec<EventTrigger>,
timed_wait: BinaryHeap<TimedWait>,
ready_tasks: VecDeque<Arc<Task>>,
tasks: Vec<Arc<Task>>,
}
Expand All @@ -81,7 +114,7 @@ impl Executor {
assert!((w - 1).is_multiple_of(TASK_LEN + 1));
let num_w = (w - 1) / (TASK_LEN + 1);
let mut last = Timer::micros() as u64;
Self::sleep_us(100_000).await;
Self::sleep(Duration::from_millis(100)).await;
loop {
draw_area.clear();
let cur = Timer::micros() as u64;
Expand Down Expand Up @@ -161,19 +194,66 @@ impl Executor {
tasks.retain(|t| !t.done.load(Ordering::Relaxed));
}
last = Timer::micros() as u64;
Self::sleep_us(1_000_000).await;
Self::sleep(Duration::from_secs(1)).await
}
}

pub fn run() -> ! {
Self::spawn("[show_tasks]", Self::draw_tasks());

// Maximum amount of microseconds between wakeups of interrupt-based wakers.
const INTERRUPT_MICROS: i64 = 500;

let mut last_interrupt_wakeup = Timer::micros();

let mut do_wake = |force_interrupt_wake| {
// Wake timed-waiting tasks.
loop {
let event = {
let mut ex = EXECUTOR.lock();
let Some(w) = ex.timed_wait.peek() else {
break;
};
if w.wake_at > Timer::micros() {
break;
}
let w = ex.timed_wait.pop().unwrap();
w.event
};
event.trigger();
}
// Since we don't notice interrupts that happened while we are not hlt-ing,
// make sure that we wake up all the interrupt-based waiting tasks every at
// most INTERRUPT_MICROS micros to make it unlikely to miss interrupts.
if last_interrupt_wakeup + INTERRUPT_MICROS <= Timer::micros() || force_interrupt_wake {
last_interrupt_wakeup = Timer::micros();
let to_wake = core::mem::take(&mut EXECUTOR.lock().wake_on_interrupt);
for e in to_wake {
e.trigger();
}
}
};

loop {
let task = EXECUTOR
.lock()
.ready_tasks
.pop_front()
.expect("Executor should never run out of ready tasks");
do_wake(false);
let task = EXECUTOR.lock().ready_tasks.pop_front();
let Some(task) = task else {
// If we don't have anything ready, sleep until the next interrupt.
// SAFETY: hlt is available on all reasonable x86 processors and has no safety
// requirements.
unsafe {
core::arch::asm!("hlt");
}
do_wake(true);
continue;
};

// It is possible for a done task to end up in the queue (if it wakes
// itself during execution). If that happens, we just remove it from
// the queue here.
if task.done.load(Ordering::Relaxed) {
continue;
}
task.in_queue.store(false, Ordering::Relaxed);
let waker = Waker::from(task.clone());
let mut context = Context::from_waker(&waker);
Expand Down Expand Up @@ -204,27 +284,23 @@ impl Executor {
})
}

pub fn sleep_us(us: u64) -> impl Future<Output = ()> {
let tgt = Timer::micros() as u64 + us;
poll_fn(move |cx| {
let now = Timer::micros() as u64;
if now >= tgt {
Poll::Ready(())
} else {
// TODO(veluca): actually suspend the task.
cx.waker().wake_by_ref();
Poll::Pending
}
})
// Wakes a task as soon as *any* interrupt is received.
pub fn wait_for_interrupt() -> impl Future<Output = ()> {
let event = Event::new();
EXECUTOR.lock().wake_on_interrupt.push(event.trigger());
event
}

/// **WARNING**: this function halts all tasks
pub fn deep_sleep_us(us: u64) {
// SAFETY: we are not using a callback
let e =
unsafe { uefi::boot::create_event(EventType::TIMER, Tpl::NOTIFY, None, None).unwrap() };
uefi::boot::set_timer(&e, TimerTrigger::Relative(10 * us)).unwrap();
uefi::boot::wait_for_event(&mut [e]).unwrap();
// Note: there are no guarantees on whether the amount of time we will sleep for
// will be exceeded.
pub fn sleep(time: Duration) -> impl Future<Output = ()> {
let tgt = Timer::micros() + time.as_micros() as i64;
let event = Event::new();
EXECUTOR.lock().timed_wait.push(TimedWait {
wake_at: tgt,
event: event.trigger(),
});
event
}

/// Spawn a new task.
Expand Down
21 changes: 7 additions & 14 deletions pixie-uefi/src/os/input.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use core::future::{poll_fn, Future};
use core::task::Poll;

use spin::lazy::Lazy;
use spin::Mutex;
use uefi::boot::ScopedProtocol;
use uefi::proto::console::text::{Input, Key};

use crate::os::error::Result;
use crate::os::executor::Executor;
use crate::os::send_wrapper::SendWrapper;

static INPUT: Lazy<Mutex<SendWrapper<ScopedProtocol<Input>>>> = Lazy::new(|| {
Expand All @@ -15,16 +13,11 @@ static INPUT: Lazy<Mutex<SendWrapper<ScopedProtocol<Input>>>> = Lazy::new(|| {
Mutex::new(SendWrapper(input))
});

pub fn read_key() -> impl Future<Output = Result<Key>> {
poll_fn(move |cx| {
let key = INPUT.lock().read_key();
match key {
Err(e) => Poll::Ready(Err(e.into())),
Ok(Some(key)) => Poll::Ready(Ok(key)),
Ok(None) => {
cx.waker().wake_by_ref();
Poll::Pending
}
pub async fn read_key() -> Result<Key> {
loop {
if let Some(key) = INPUT.lock().read_key()? {
break Ok(key);
}
})
Executor::wait_for_interrupt().await;
}
}
3 changes: 2 additions & 1 deletion pixie-uefi/src/os/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use core::ffi::c_void;
use core::future::Future;
use core::ptr::NonNull;
use core::sync::atomic::{AtomicBool, Ordering};
use core::time::Duration;

use uefi::boot::{EventType, Tpl};
use uefi::{Event, Status};
Expand Down Expand Up @@ -72,7 +73,7 @@ where
break;
}

Executor::sleep_us(30_000_000).await;
Executor::sleep(Duration::from_secs(30)).await;
}
});

Expand Down
Loading