Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions pixie-uefi/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,11 @@ async fn run() -> Result<()> {
Action::Boot => power_control::reboot_to_os().await,
Action::Restart => {}
Action::Shutdown => shutdown().await,
Action::Register => register(server).await?,
Action::Store => store(server).await?,
Action::Flash => flash(server).await?,
Action::Register => {
Executor::spawn("register", register(server)).join().await?
}
Action::Store => Executor::spawn("store", store(server)).join().await?,
Action::Flash => Executor::spawn("flash", flash(server)).join().await?,
}

let tcp = TcpStream::connect(server).await?;
Expand Down
32 changes: 27 additions & 5 deletions pixie-uefi/src/os/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use core::future::{poll_fn, Future};
use core::pin::Pin;
use core::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use core::task::{Context, Poll, Waker};
use futures::channel::oneshot;

use spin::Mutex;
use uefi::boot::{EventType, TimerTrigger, Tpl};
Expand All @@ -25,6 +26,7 @@ struct Task {
future: Mutex<BoxFuture>,
micros: AtomicU64,
last_micros: AtomicU64,
done: AtomicBool,
}

impl Task {
Expand All @@ -38,13 +40,14 @@ impl Task {
micros: AtomicU64::new(0),
last_micros: AtomicU64::new(0),
in_queue: AtomicBool::new(false),
done: AtomicBool::new(false),
})
}
}

impl Wake for Task {
fn wake(self: Arc<Self>) {
if !self.in_queue.swap(true, Ordering::Relaxed) {
if !self.in_queue.swap(true, Ordering::Relaxed) && !self.done.load(Ordering::Relaxed) {
EXECUTOR.lock().ready_tasks.push_back(self);
}
}
Expand All @@ -55,6 +58,14 @@ static EXECUTOR: Mutex<Executor> = Mutex::new(Executor {
tasks: vec![],
});

pub struct JoinHandle<T>(oneshot::Receiver<T>);

impl<T> JoinHandle<T> {
pub async fn join(self) -> T {
self.0.await.expect("tasks should never be cancelled")
}
}

pub struct Executor {
// TODO(veluca): scheduling.
ready_tasks: VecDeque<Arc<Task>>,
Expand Down Expand Up @@ -145,6 +156,9 @@ impl Executor {
t.last_micros
.store(t.micros.load(Ordering::Relaxed), Ordering::Relaxed);
}

// Clear completed tasks.
tasks.retain(|t| !t.done.load(Ordering::Relaxed));
}
last = Timer::micros() as u64;
Self::sleep_us(1_000_000).await;
Expand All @@ -165,10 +179,13 @@ impl Executor {
let mut context = Context::from_waker(&waker);
let mut fut = task.future.try_lock().unwrap();
let begin = Timer::micros();
let _ = fut.0.as_mut().poll(&mut context);
let done = fut.0.as_mut().poll(&mut context);
let end = Timer::micros();
task.micros
.fetch_add((end - begin) as u64, Ordering::Relaxed);
if done.is_ready() {
task.done.swap(true, Ordering::Relaxed);
}
}
}

Expand Down Expand Up @@ -211,13 +228,18 @@ impl Executor {
}

/// Spawn a new task.
pub fn spawn<Fut>(name: &'static str, f: Fut)
pub fn spawn<Fut, T: 'static>(name: &'static str, f: Fut) -> JoinHandle<T>
where
Fut: Future<Output = ()> + 'static,
Fut: Future<Output = T> + 'static,
{
let task = Task::new(name, f);
let (send, recv) = oneshot::channel();
let task = Task::new(name, async move {
let t = f.await;
let _ = send.send(t);
});
let mut executor = EXECUTOR.lock();
executor.tasks.push(task.clone());
executor.ready_tasks.push_back(task);
JoinHandle(recv)
}
}
2 changes: 1 addition & 1 deletion pixie-uefi/src/os/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ pub(super) fn init() {
poll();
// TODO(veluca): figure out whether we can suspend the task.
cx.waker().wake_by_ref();
Poll::Pending
Poll::<()>::Pending
}),
);

Expand Down