Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion crates/c/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,6 @@ typedef uint32_t {snake}_callback_code_t;
#define {shouty}_CALLBACK_CODE_EXIT 0
#define {shouty}_CALLBACK_CODE_YIELD 1
#define {shouty}_CALLBACK_CODE_WAIT(set) (2 | (set << 4))
#define {shouty}_CALLBACK_CODE_POLL(set) (3 | (set << 4))
typedef enum {snake}_event_code {{
{shouty}_EVENT_NONE,
Expand Down
118 changes: 67 additions & 51 deletions crates/go/src/wit_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"runtime"
"unsafe"
"wit_component/wit_runtime"
)

const EVENT_NONE uint32 = 0
Expand All @@ -20,7 +21,6 @@ const STATUS_RETURNED uint32 = 2
const CALLBACK_CODE_EXIT uint32 = 0
const CALLBACK_CODE_YIELD uint32 = 1
const CALLBACK_CODE_WAIT uint32 = 2
const CALLBACK_CODE_POLL uint32 = 3

const RETURN_CODE_BLOCKED uint32 = 0xFFFFFFFF
const RETURN_CODE_COMPLETED uint32 = 0
Expand Down Expand Up @@ -74,37 +74,6 @@ func callback(event0, event1, event2 uint32) uint32 {
yielding <- unit{}
}

switch event0 {
case EVENT_NONE:

case EVENT_SUBTASK:
switch event2 {
case STATUS_STARTING:
panic(fmt.Sprintf("unexpected subtask status: %v", event2))

case STATUS_STARTED:

case STATUS_RETURNED:
waitableJoin(event1, 0)
subtaskDrop(event1)
channel := state.pending[event1]
delete(state.pending, event1)
channel <- event2

default:
panic("todo")
}

case EVENT_STREAM_READ, EVENT_STREAM_WRITE, EVENT_FUTURE_READ, EVENT_FUTURE_WRITE:
waitableJoin(event1, 0)
channel := state.pending[event1]
delete(state.pending, event1)
channel <- event2

default:
panic("todo")
}

// Tell the Go scheduler to write to `state.channel` only after all
// goroutines have either blocked or exited. This allows us to reliably
// delay returning control to the host until there's truly nothing more
Expand All @@ -121,31 +90,75 @@ func callback(event0, event1, event2 uint32) uint32 {
return false
})

// Block this goroutine until the scheduler wakes us up.
(<-state.channel)
for {
switch event0 {
case EVENT_NONE:

if state.yielding != nil {
contextSet(unsafe.Pointer(state))
if len(state.pending) == 0 {
return CALLBACK_CODE_YIELD
case EVENT_SUBTASK:
switch event2 {
case STATUS_STARTING:
panic(fmt.Sprintf("unexpected subtask status: %v", event2))

case STATUS_STARTED:

case STATUS_RETURNED:
waitableJoin(event1, 0)
subtaskDrop(event1)
channel := state.pending[event1]
delete(state.pending, event1)
channel <- event2

default:
panic("todo")
}

case EVENT_STREAM_READ, EVENT_STREAM_WRITE, EVENT_FUTURE_READ, EVENT_FUTURE_WRITE:
waitableJoin(event1, 0)
channel := state.pending[event1]
delete(state.pending, event1)
channel <- event2

default:
panic("todo")
}

// Block this goroutine until the scheduler wakes us up.
(<-state.channel)

if state.yielding != nil {
contextSet(unsafe.Pointer(state))
if len(state.pending) == 0 {
return CALLBACK_CODE_YIELD
} else {
if state.waitableSet == 0 {
panic("unreachable")
}
event0, event1, event2 = func() (uint32, uint32, uint32) {
pinner := runtime.Pinner{}
defer pinner.Unpin()
buffer := wit_runtime.Allocate(&pinner, 8, 4)
event0 := waitableSetPoll(state.waitableSet, buffer)
return event0,
unsafe.Slice((*uint32)(buffer), 2)[0],
unsafe.Slice((*uint32)(buffer), 2)[1]
}()
if event0 == EVENT_NONE {
return CALLBACK_CODE_YIELD
}
}
} else if len(state.pending) == 0 {
state.pinner.Unpin()
if state.waitableSet != 0 {
waitableSetDrop(state.waitableSet)
}
return CALLBACK_CODE_EXIT
} else {
if state.waitableSet == 0 {
panic("unreachable")
}
return CALLBACK_CODE_POLL | (state.waitableSet << 4)
contextSet(unsafe.Pointer(state))
return CALLBACK_CODE_WAIT | (state.waitableSet << 4)
}
} else if len(state.pending) == 0 {
state.pinner.Unpin()
if state.waitableSet != 0 {
waitableSetDrop(state.waitableSet)
}
return CALLBACK_CODE_EXIT
} else {
if state.waitableSet == 0 {
panic("unreachable")
}
contextSet(unsafe.Pointer(state))
return CALLBACK_CODE_WAIT | (state.waitableSet << 4)
}
}

Expand Down Expand Up @@ -196,6 +209,9 @@ func Yield() {
//go:wasmimport $root [waitable-set-new]
func waitableSetNew() uint32

//go:wasmimport $root [waitable-set-poll]
func waitableSetPoll(waitableSet uint32, eventPayload unsafe.Pointer) uint32

//go:wasmimport $root [waitable-set-drop]
func waitableSetDrop(waitableSet uint32)

Expand Down
18 changes: 9 additions & 9 deletions crates/guest-rust/src/rt/async_support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,9 @@ impl FutureState<'_> {
// processing the future here anyway.
me.cancel_inter_task_stream_read();

let mut context = Context::from_waker(&me.waker_clone);

loop {
let mut context = Context::from_waker(&me.waker_clone);

// On each turn of this loop reset the state to "polling"
// which clears out any pending wakeup if one was sent. This
// in theory helps minimize wakeups from previous iterations
Expand Down Expand Up @@ -255,8 +255,12 @@ impl FutureState<'_> {
assert!(!me.tasks.is_empty());
if me.waker.sleep_state.load(Ordering::Relaxed) == SLEEP_STATE_WOKEN {
if me.remaining_work() {
let waitable = me.waitable_set.as_ref().unwrap().as_raw();
break CallbackCode::Poll(waitable);
let (event0, event1, event2) =
me.waitable_set.as_ref().unwrap().poll();
if event0 != EVENT_NONE {
me.deliver_waitable_event(event1, event2);
continue;
}
}
break CallbackCode::Yield;
}
Expand Down Expand Up @@ -415,7 +419,6 @@ enum CallbackCode {
Exit,
Yield,
Wait(u32),
Poll(u32),
}

impl CallbackCode {
Expand All @@ -424,7 +427,6 @@ impl CallbackCode {
CallbackCode::Exit => 0,
CallbackCode::Yield => 1,
CallbackCode::Wait(waitable) => 2 | (waitable << 4),
CallbackCode::Poll(waitable) => 3 | (waitable << 4),
}
}
}
Expand Down Expand Up @@ -546,9 +548,7 @@ pub fn block_on<T: 'static>(future: impl Future<Output = T>) -> T {
drop(state);
break result.unwrap();
}
CallbackCode::Yield | CallbackCode::Poll(_) => {
event = state.waitable_set.as_ref().unwrap().poll()
}
CallbackCode::Yield => event = state.waitable_set.as_ref().unwrap().poll(),
CallbackCode::Wait(_) => event = state.waitable_set.as_ref().unwrap().wait(),
}
}
Expand Down
4 changes: 2 additions & 2 deletions tests/runtime-async/async/incomplete-writes/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func DroppedReaderTest(f1, f2 *FutureReader[*TestThing]) (*FutureReader[*TestThi
thing := f2.Read()

// Write the thing to the first future, the read end of which
// the calle4 will drop without reading from, forcing us to
// the callee will drop without reading from, forcing us to
// re-take ownership.
assert(!tx1.Write(thing))

Expand All @@ -116,7 +116,7 @@ func DroppedReaderLeaf(f1, f2 *FutureReader[*LeafThing]) (*FutureReader[*LeafThi
thing := f2.Read()

// Write the thing to the first future, the read end of which
// the calle4 will drop without reading from, forcing us to
// the callee will drop without reading from, forcing us to
// re-take ownership.
assert(!tx1.Write(thing))

Expand Down
Loading