From 97ddfbb474a71df64933a87faefe039b3c3320d0 Mon Sep 17 00:00:00 2001 From: Luke Wagner Date: Thu, 22 Jan 2026 15:08:16 -0600 Subject: [PATCH] CABI: put cooperative threads into their own per-component-instance table --- design/mvp/CanonicalABI.md | 241 +++++++++++++----------- design/mvp/Concurrency.md | 6 +- design/mvp/canonical-abi/definitions.py | 87 +++++---- design/mvp/canonical-abi/run_tests.py | 90 ++++----- 4 files changed, 219 insertions(+), 205 deletions(-) diff --git a/design/mvp/CanonicalABI.md b/design/mvp/CanonicalABI.md index 9552859e..133cd3f1 100644 --- a/design/mvp/CanonicalABI.md +++ b/design/mvp/CanonicalABI.md @@ -292,7 +292,8 @@ behavior and enforce invariants. class ComponentInstance: store: Store parent: Optional[ComponentInstance] - table: Table + handles: Table[ResourceHandle | Waitable | WaitableSet | ErrorContext] + threads: Table[Thread] may_leave: bool backpressure: int exclusive: bool @@ -302,7 +303,8 @@ class ComponentInstance: assert(parent is None or parent.store is store) self.store = store self.parent = parent - self.table = Table() + self.handles = Table() + self.threads = Table() self.may_leave = True self.backpressure = 0 self.exclusive = False @@ -436,9 +438,12 @@ The other fields of `ComponentInstance` are described below as they are used. #### Table State The `Table` class encapsulates a mutable, growable array of opaque elements -that are represented in Core WebAssembly as `i32` indices into the array. There -is one `Table` object per `ComponentInstance` object and "the current -component instance's table" refers to this object. +that are represented in Core WebAssembly as `i32` indices into the array. +Currently, every component instance contains two tables: a `threads` table +containing all the component's [threads](#thread-state) and a `handles` +table containing everything else ([resource handles](#resource-state), +[waitables and waitable sets](#waitable-state) and +[error contexts](#-canon-error-contextnew)). ```python class Table: array: list[any] @@ -493,7 +498,7 @@ sentinel values). #### Resource State The `ResourceHandle` class defines the elements of the component instance's -table used to represent handles to resources: +`handles` table used to represent handles to resources: ```python class ResourceHandle: rt: ResourceType @@ -561,6 +566,15 @@ a reference to a `Thread` through all Core WebAssembly calls so that the provides a set of primitive control-flow operations that are used by the rest of the Canonical ABI definitions. +While `Thread`s are semantically created for each component export call by the +Python `canon_lift` code, an optimizing runtime should be able to allocate +`Thread`s lazily, only when needed for actual thread switching operations, +thereby avoiding cross-component call overhead for simple, short-running +cross-component calls. To assist in this optimization, `Thread`s are put into +their own per-component-instance `threads` table so that thread table indices +and elements can be more-readily reused between calls without interference from +the other kinds of handles. + `Thread` is implemented using the Python standard library's [`threading`] module. While a Python [`threading.Thread`] is a preemptively-scheduled [kernel thread], it is coerced to behave like a cooperatively-scheduled [fiber] by @@ -617,11 +631,11 @@ class Thread: ``` The `in_event_loop` field is used by `Task.request_cancellation` to prevent unexpected reentrance of `callback` functions. The `index` field stores the -index of the thread in the component instance's table and is initialized only -once a thread is allowed to start executing (after the backpressure gate). The -`context` field holds the [thread-local storage] accessed by the -`context.{get,set}` built-ins. All the other fields are used directly by -`Thread` methods as shown next. +index of the thread in its containing component instance's `threads` table and +is initialized only once a thread is allowed to start executing (after the +backpressure gate). The `context` field holds the [thread-local storage] +accessed by the `context.{get,set}` built-ins. All the other fields are used +directly by `Thread` methods as shown next. When a `Thread` is created, an internal `threading.Thread` is started and immediately blocked `acquire()`ing `fiber_lock` (which will be `release()`ed by @@ -646,7 +660,7 @@ immediately blocked `acquire()`ing `fiber_lock` (which will be `release()`ed by assert(self.running()) self.task.thread_stop(self) if self.index is not None: - self.task.inst.table.remove(self.index) + self.task.inst.threads.remove(self.index) self.parent_lock.release() self.fiber = threading.Thread(target = fiber_func) self.fiber.start() @@ -660,8 +674,9 @@ is used for delivering cancellation requests sent to the `Task` by the caller when the last `Thread` in a `Task` exits. If a `Thread` was not cancelled while waiting for backpressure, it will be -allocated an `index` in the component instance table and, when the `Thread`'s -root function returns, this `index` is deallocated by the code above. +allocated an `index` in its containing component instance's `threads` table and, +when the `Thread`'s root function returns, this `index` is deallocated by the +code above. Once a `Thread` is created, it will only start `running` when `Thread.resume` is called. Once a thread is `running` it can then be `suspended` again by @@ -1267,8 +1282,8 @@ function signature has `borrow`s in `list`s or `stream`s) and thus can be stored inline in the native stack frame. The `Subtask.drop` method is only called for `Subtask`s that have been added to -the current component instance's table and checks that the callee has been -allowed to resolve and explicitly relinquish any borrowed handles. +the current component instance's `handles` table and checks that the callee has +been allowed to resolve and explicitly relinquish any borrowed handles. ```python def drop(self): trap_if(not self.resolve_delivered()) @@ -1389,7 +1404,7 @@ and lowering and defined below. #### Stream State Values of `stream` type are represented in the Canonical ABI as `i32` indices -into the current component instance's table referring to either the +into the current component instance's `handles` table referring to either the [readable or writable end] of a stream. Reading from the readable end of a stream is achieved by calling `stream.read` and supplying a `WritableBuffer`. Conversely, writing to the writable end of a stream is achieved by calling @@ -1617,9 +1632,9 @@ def none_or_number_type(t): ``` The two ends of a stream are stored as separate elements in the component -instance's table and each end has a separate `CopyState` that reflects what -*that end* is currently doing or has done. This `state` field is factored out -into the `CopyEnd` class that is derived below. The two ends also share some +instance `handles` table and each end has a separate `CopyState` that reflects +what *that end* is currently doing or has done. This `state` field is factored +out into the `CopyEnd` class that is derived below. The two ends also share some state which is referenced by the `shared` field and either points to a `SharedStreamImpl` (for component-created streams) or something host-defined for (host-created streams). @@ -2132,10 +2147,10 @@ def load_string_from_range(cx, ptr, tagged_code_units) -> String: ``` Error context values are lifted directly from the current component instance's -table: +`handles` table: ```python def lift_error_context(cx, i): - errctx = cx.inst.table.get(i) + errctx = cx.inst.handles.get(i) trap_if(not isinstance(errctx, ErrorContext)) return errctx ``` @@ -2207,13 +2222,13 @@ def unpack_flags_from_int(i, labels): ``` `own` handles are lifted by removing the handle from the current component -instance's table so that ownership is *transferred* to the lowering component. -The lifting operation fails if unique ownership of the handle isn't possible, -for example if the index was actually a `borrow` or if the `own` handle is -currently being lent out as borrows. +instance's `handles` table so that ownership is *transferred* to the lowering +component. The lifting operation fails if unique ownership of the handle isn't +possible, for example if the index was actually a `borrow` or if the `own` +handle is currently being lent out as borrows. ```python def lift_own(cx, i, t): - h = cx.inst.table.remove(i) + h = cx.inst.handles.remove(i) trap_if(not isinstance(h, ResourceHandle)) trap_if(h.rt is not t.rt) trap_if(h.num_lends != 0) @@ -2222,18 +2237,18 @@ def lift_own(cx, i, t): ``` The abstract lifted value for handle types is currently just the internal resource representation `i32`, which is kept opaque from the receiving -component (it's stored in the handle table and only accessed indirectly via +component (it's stored in the `handles` table and only accessed indirectly via index). (This assumes that resource representations are immutable. If representations were to become mutable, the address of the mutable cell would be passed as the lifted value instead.) In contrast to `own`, `borrow` handles are lifted by reading the representation from the source handle, leaving the source handle intact in the current -component instance's table: +component instance's `handles` table: ```python def lift_borrow(cx, i, t): assert(isinstance(cx.borrow_scope, Subtask)) - h = cx.inst.table.get(i) + h = cx.inst.handles.get(i) trap_if(not isinstance(h, ResourceHandle)) trap_if(h.rt is not t.rt) cx.borrow_scope.add_lender(h) @@ -2261,7 +2276,7 @@ def lift_future(cx, i, t): def lift_async_value(ReadableEndT, cx, i, t): assert(not contains_borrow(t)) - e = cx.inst.table.remove(i) + e = cx.inst.handles.remove(i) trap_if(not isinstance(e, ReadableEndT)) trap_if(e.shared.t != t) trap_if(e.state != CopyState.IDLE) @@ -2580,10 +2595,10 @@ def store_probably_utf16_to_latin1_or_utf16(cx, src, src_code_units): ``` Error context values are lowered by storing them directly into the current -component instance's table and passing the `i32` index to wasm: +component instance's `handles` table and passing the `i32` index to wasm: ```python def lower_error_context(cx, v): - return cx.inst.table.add(v) + return cx.inst.handles.add(v) ``` Lists and records are stored by recursively storing their elements and @@ -2666,13 +2681,13 @@ def pack_flags_into_int(v, labels): ``` Finally, `own` and `borrow` handles are lowered by initializing new handle -elements in the current component instance's table. The increment of +elements in the current component instance's `handles` table. The increment of `num_borrows` is complemented by a decrement in `canon_resource_drop` and ensures that all borrowed handles are dropped before the end of the task. ```python def lower_own(cx, rep, t): h = ResourceHandle(t.rt, rep, own = True) - return cx.inst.table.add(h) + return cx.inst.handles.add(h) def lower_borrow(cx, rep, t): assert(isinstance(cx.borrow_scope, Task)) @@ -2680,7 +2695,7 @@ def lower_borrow(cx, rep, t): return rep h = ResourceHandle(t.rt, rep, own = False, borrow_scope = cx.borrow_scope) h.borrow_scope.num_borrows += 1 - return cx.inst.table.add(h) + return cx.inst.handles.add(h) ``` The special case in `lower_borrow` is an optimization, recognizing that, when a borrowed handle is passed to the component that implemented the resource @@ -2689,18 +2704,18 @@ type, the only thing the borrowed handle is good for is calling intermediate borrow handle. Lowering a `stream` or `future` is entirely symmetric and simply adds a new -readable end to the current component instance's table, passing the index of -the new element to core wasm: +readable end to the current component instance's `handles` table, passing the +index of the new element to core wasm: ```python def lower_stream(cx, v, t): assert(isinstance(v, ReadableStream)) assert(not contains_borrow(t)) - return cx.inst.table.add(ReadableStreamEnd(v)) + return cx.inst.handles.add(ReadableStreamEnd(v)) def lower_future(cx, v, t): assert(isinstance(v, ReadableFuture)) assert(not contains_borrow(t)) - return cx.inst.table.add(ReadableFutureEnd(v)) + return cx.inst.handles.add(ReadableFutureEnd(v)) ``` @@ -3242,7 +3257,7 @@ def canon_lift(opts, inst, ft, callee, caller, on_start, on_resolve) -> Call: return assert(thread.index is None) - thread.index = inst.table.add(thread) + thread.index = inst.threads.add(thread) cx = LiftLowerContext(opts, inst, task) args = on_start() @@ -3262,10 +3277,10 @@ the arguments are lowered (which means that owned-handle arguments are not transferred). Once the backpressure gate is cleared, the `Thread` is added to the callee's -component instance's table (storing the index for later retrieval by the -`thread.index` built-in) and the arguments are lowered into core wasm values -and memory according to the `canonopt` immediates of `canon lift` (as defined -by `lower_flat_values` above). +component instance's `threads` table (storing the index for later retrieval by +the `thread.index` built-in) and the arguments are lowered into core wasm values +and memory according to the `canonopt` immediates of `canon lift` (as defined by +`lower_flat_values` above). If the `async` `canonopt` is *not* specified, a `lift`ed function then calls the core wasm callee, passing the lowered arguments in core function parameters @@ -3337,7 +3352,7 @@ function (specified as a `funcidx` immediate in `canon lift`) until the event = (EventCode.NONE, 0, 0) case CallbackCode.WAIT: trap_if(not task.may_block()) - wset = inst.table.get(si) + wset = inst.handles.get(si) trap_if(not isinstance(wset, WaitableSet)) event = task.wait_until(lambda: not inst.exclusive, thread, wset, cancellable = True) case _: @@ -3462,10 +3477,10 @@ blocked at runtime). It is however fine to make an `async`-lowered call to an to, e.g., `waitable-set.wait` would block). Each call to `canon_lower` creates a new `Subtask`. However, this `Subtask` is -only added to the current component instance's table (below) if `async` is -specified *and* `callee` blocks. In any case, this `Subtask` is used as the -`LiftLowerContext.borrow_scope` for `borrow` arguments, ensuring that owned -handles are not dropped before `Subtask.deliver_return` is called (below). +only added to the current component instance's `handles` table (below) if +`async` is specified *and* `callee` blocks. In any case, this `Subtask` is used +as the `LiftLowerContext.borrow_scope` for `borrow` arguments, ensuring that +owned handles are not dropped before `Subtask.deliver_return` is called (below). ```python subtask = Subtask() cx = LiftLowerContext(opts, thread.task.inst, subtask) @@ -3551,12 +3566,13 @@ from dropping lent handles while the synchronous call is blocked. In the `async` case, if the `callee` already called `on_resolve`, then the `RETURNED` code is eagerly returned to the core wasm caller without needing to -add a `Subtask` to the component instance's table. Otherwise, the index of a -new `Subtask` is returned, bit-packed with the current state of the `Subtask` -(which will either be `STARTING` or `STARTED`). `STARTING` tells the caller -that they need to keep the memory for both the arguments and results allocated; -`STARTED` tells the caller that the arguments have been ready and thus any -argument memory can be reused, but the result buffer has to be kept reserved. +add a `Subtask` to the current component instance's `handles` table. Otherwise, +the index of a new `Subtask` is returned, bit-packed with the current state of +the `Subtask` (which will either be `STARTING` or `STARTED`). `STARTING` tells +the caller that they need to keep the memory for both the arguments and results +allocated; `STARTED` tells the caller that the arguments have been ready and +thus any argument memory can be reused, but the result buffer has to be kept +reserved. ```python else: if subtask.resolved(): @@ -3564,7 +3580,7 @@ argument memory can be reused, but the result buffer has to be kept reserved. subtask.deliver_resolve() return [Subtask.State.RETURNED] else: - subtaski = thread.task.inst.table.add(subtask) + subtaski = thread.task.inst.handles.add(subtask) def on_progress(): def subtask_event(): if subtask.resolved(): @@ -3600,12 +3616,12 @@ validation specifies: Calling `$f` invokes the following function, which adds an owning handle containing the given resource representation to the current component -instance's table: +instance's `handles` table: ```python def canon_resource_new(rt, thread, rep): trap_if(not thread.task.inst.may_leave) h = ResourceHandle(rt, rep, own = True) - i = thread.task.inst.table.add(h) + i = thread.task.inst.handles.add(h) return [i] ``` @@ -3621,13 +3637,13 @@ validation specifies: * `$f` is given type `(func (param i32))` Calling `$f` invokes the following function, which removes the handle from the -current component instance's table and, if the handle was owning, calls the -resource's destructor. +current component instance's `handles` table and, if the handle was owning, +calls the resource's destructor. ```python def canon_resource_drop(rt, thread, i): trap_if(not thread.task.inst.may_leave) inst = thread.task.inst - h = inst.table.remove(i) + h = inst.handles.remove(i) trap_if(not isinstance(h, ResourceHandle)) trap_if(h.rt is not rt) trap_if(h.num_lends != 0) @@ -3678,10 +3694,11 @@ validation specifies: currently fixed to be `i32`. Calling `$f` invokes the following function, which extracts the resource -representation from the handle in the current component instance's table: +representation from the handle in the current component instance's `handles` +table: ```python def canon_resource_rep(rt, thread, i): - h = thread.task.inst.table.get(i) + h = thread.task.inst.handles.get(i) trap_if(not isinstance(h, ResourceHandle)) trap_if(h.rt is not rt) return [h.rep] @@ -3875,11 +3892,11 @@ validation specifies: * `$f` is given type `(func (result i32))` Calling `$f` invokes the following function, which adds an empty waitable set -to the current component instance's table: +to the current component instance's `handles` table: ```python def canon_waitable_set_new(thread): trap_if(not thread.task.inst.may_leave) - return [ thread.task.inst.table.add(WaitableSet()) ] + return [ thread.task.inst.handles.add(WaitableSet()) ] ``` @@ -3900,7 +3917,7 @@ returning its `EventCode` and writing the payload values into linear memory: def canon_waitable_set_wait(cancellable, mem, thread, si, ptr): trap_if(not thread.task.inst.may_leave) trap_if(not thread.task.may_block()) - wset = thread.task.inst.table.get(si) + wset = thread.task.inst.handles.get(si) trap_if(not isinstance(wset, WaitableSet)) event = thread.task.wait_until(lambda: True, thread, wset, cancellable) return unpack_event(mem, thread, ptr, event) @@ -3943,7 +3960,7 @@ was pending on one of the waitables in the given waitable set (the same way as ```python def canon_waitable_set_poll(cancellable, mem, thread, si, ptr): trap_if(not thread.task.inst.may_leave) - wset = thread.task.inst.table.get(si) + wset = thread.task.inst.handles.get(si) trap_if(not isinstance(wset, WaitableSet)) if thread.task.deliver_pending_cancel(cancellable): event = (EventCode.TASK_CANCELLED, 0, 0) @@ -3971,12 +3988,12 @@ validation specifies: * `$f` is given type `(func (param i32))` Calling `$f` invokes the following function, which removes the indicated -waitable set from the current component instance's table, performing the guards -defined by `WaitableSet.drop` above: +waitable set from the current component instance's `handles` table, performing +the guards defined by `WaitableSet.drop` above: ```python def canon_waitable_set_drop(thread, i): trap_if(not thread.task.inst.may_leave) - wset = thread.task.inst.table.remove(i) + wset = thread.task.inst.handles.remove(i) trap_if(not isinstance(wset, WaitableSet)) wset.drop() return [] @@ -4001,12 +4018,12 @@ waitable from any waitable set that it is currently a member of. ```python def canon_waitable_join(thread, wi, si): trap_if(not thread.task.inst.may_leave) - w = thread.task.inst.table.get(wi) + w = thread.task.inst.handles.get(wi) trap_if(not isinstance(w, Waitable)) if si == 0: w.join(None) else: - wset = thread.task.inst.table.get(si) + wset = thread.task.inst.handles.get(si) trap_if(not isinstance(wset, WaitableSet)) w.join(wset) return [] @@ -4051,7 +4068,7 @@ BLOCKED = 0xffff_ffff def canon_subtask_cancel(async_, thread, i): trap_if(not thread.task.inst.may_leave) trap_if(not thread.task.may_block() and not async_) - subtask = thread.task.inst.table.get(i) + subtask = thread.task.inst.handles.get(i) trap_if(not isinstance(subtask, Subtask)) trap_if(subtask.resolve_delivered()) trap_if(subtask.cancellation_requested) @@ -4095,12 +4112,12 @@ validation specifies: * `$f` is given type `(func (param i32))` Calling `$f` removes the subtask at the given index from the current component -instance's table, performing the guards and bookkeeping defined by +instance's `handles` table, performing the guards and bookkeeping defined by `Subtask.drop()`. ```python def canon_subtask_drop(thread, i): trap_if(not thread.task.inst.may_leave) - s = thread.task.inst.table.remove(i) + s = thread.task.inst.handles.remove(i) trap_if(not isinstance(s, Subtask)) s.drop() return [] @@ -4119,25 +4136,26 @@ validation specifies: * `$stream_t`/`$future_t` must be a type of the form `(stream $t?)`/`(future $t?)` Calling `$f` calls `canon_{stream,future}_new` which adds two elements to the -current component instance's table and returns their indices packed into a -single `i64`. The first element (in the low 32 bits) is the readable end (of -the new {stream, future}) and the second element (in the high 32 bits) is the -writable end. The expectation is that, after calling `{stream,future}.new`, the -readable end is subsequently transferred to another component (or the host) via -`stream` or `future` parameter/result type (see `lift_{stream,future}` above). +current component instance's `handles` table and returns their indices packed +into a single `i64`. The first element (in the low 32 bits) is the readable end +(of the new {stream, future}) and the second element (in the high 32 bits) is +the writable end. The expectation is that, after calling `{stream,future}.new`, +the readable end is subsequently transferred to another component (or the host) +via `stream` or `future` parameter/result type (see `lift_{stream,future}` +above). ```python def canon_stream_new(stream_t, thread): trap_if(not thread.task.inst.may_leave) shared = SharedStreamImpl(stream_t.t) - ri = thread.task.inst.table.add(ReadableStreamEnd(shared)) - wi = thread.task.inst.table.add(WritableStreamEnd(shared)) + ri = thread.task.inst.handles.add(ReadableStreamEnd(shared)) + wi = thread.task.inst.handles.add(WritableStreamEnd(shared)) return [ ri | (wi << 32) ] def canon_future_new(future_t, thread): trap_if(not thread.task.inst.may_leave) shared = SharedFutureImpl(future_t.t) - ri = thread.task.inst.table.add(ReadableFutureEnd(shared)) - wi = thread.task.inst.table.add(WritableFutureEnd(shared)) + ri = thread.task.inst.handles.add(ReadableFutureEnd(shared)) + wi = thread.task.inst.handles.add(WritableFutureEnd(shared)) return [ ri | (wi << 32) ] ``` @@ -4184,7 +4202,7 @@ Next, `stream_copy` checks that the element at index `i` is of the right type and allowed to start a new copy. (In the future, the "trap if not `IDLE`" condition could be relaxed to allow multiple pipelined reads or writes.) ```python - e = thread.task.inst.table.get(i) + e = thread.task.inst.handles.get(i) trap_if(not isinstance(e, EndT)) trap_if(e.shared.t != stream_t.t) trap_if(e.state != CopyState.IDLE) @@ -4291,7 +4309,7 @@ def future_copy(EndT, BufferT, event_code, future_t, opts, thread, i, ptr): trap_if(not thread.task.inst.may_leave) trap_if(not thread.task.may_block() and not opts.async_) - e = thread.task.inst.table.get(i) + e = thread.task.inst.handles.get(i) trap_if(not isinstance(e, EndT)) trap_if(e.shared.t != future_t.t) trap_if(e.state != CopyState.IDLE) @@ -4375,7 +4393,7 @@ def canon_future_cancel_write(future_t, async_, thread, i): def cancel_copy(EndT, event_code, stream_or_future_t, async_, thread, i): trap_if(not thread.task.inst.may_leave) trap_if(not thread.task.may_block() and not async_) - e = thread.task.inst.table.get(i) + e = thread.task.inst.handles.get(i) trap_if(not isinstance(e, EndT)) trap_if(e.shared.t != stream_or_future_t.t) trap_if(e.state != CopyState.ASYNC_COPYING) @@ -4435,9 +4453,9 @@ validation specifies: * `$stream_t`/`$future_t` must be a type of the form `(stream $t?)`/`(future $t?)` Calling `$f` removes the readable or writable end of the stream or future at -the given index from the current component instance's table, performing the -guards and bookkeeping defined by `{Readable,Writable}{Stream,Future}End.drop()` -above. +the given index from the current component instance's `handles` table, +performing the guards and bookkeeping defined by +`{Readable,Writable}{Stream,Future}End.drop()` above. ```python def canon_stream_drop_readable(stream_t, thread, i): return drop(ReadableStreamEnd, stream_t, thread, i) @@ -4453,7 +4471,7 @@ def canon_future_drop_writable(future_t, thread, hi): def drop(EndT, stream_or_future_t, thread, hi): trap_if(not thread.task.inst.may_leave) - e = thread.task.inst.table.remove(hi) + e = thread.task.inst.handles.remove(hi) trap_if(not isinstance(e, EndT)) trap_if(e.shared.t != stream_or_future_t.t) e.drop() @@ -4493,7 +4511,7 @@ validation specifies Calling `$new_indirect` invokes the following function which reads a `funcref` from `$ftbl` (trapping if out-of-bounds, null or the wrong type), calls the `funcref` passing the closure parameter `$c`, and returns the index of the new -thread in the current component instance's table. +thread in the current component instance's `threads` table. ```python @dataclass class CoreFuncRef: @@ -4509,7 +4527,7 @@ def canon_thread_new_indirect(ft, ftbl: Table[CoreFuncRef], thread, fi, c): [] = call_and_trap_on_throw(f.callee, thread, [c]) new_thread = Thread(thread.task, thread_func) assert(new_thread.suspended()) - new_thread.index = thread.task.inst.table.add(new_thread) + new_thread.index = thread.task.inst.threads.add(new_thread) return [new_thread.index] ``` The newly-created thread starts out in a "suspended" state and so, to @@ -4527,14 +4545,13 @@ validation specifies: * `$switch-to` is given type `(func (param $i i32) (result i32))` Calling `$switch-to` invokes the following function which loads a thread at -index `$i` from the current component instance's table, traps if it's not -[suspended], and then switches to that thread, leaving the [current thread] +index `$i` from the current component instance's `threads` table, traps if it's +not [suspended], and then switches to that thread, leaving the [current thread] suspended. ```python def canon_thread_switch_to(cancellable, thread, i): trap_if(not thread.task.inst.may_leave) - other_thread = thread.task.inst.table.get(i) - trap_if(not isinstance(other_thread, Thread)) + other_thread = thread.task.inst.threads.get(i) trap_if(not other_thread.suspended()) suspend_result = thread.task.switch_to(thread, cancellable, other_thread) return [suspend_result] @@ -4587,14 +4604,13 @@ validation specifies: * `$resume-later` is given type `(func (param $i i32))` Calling `$resume-later` invokes the following function which loads a thread at -index `$i` from the current component instance's table, traps if it's not -[suspended], and then marks that thread as ready to run at some +index `$i` from the current component instance's `threads` table, traps if it's +not [suspended], and then marks that thread as ready to run at some nondeterministic point in the future chosen by the embedder. ```python def canon_thread_resume_later(thread, i): trap_if(not thread.task.inst.may_leave) - other_thread = thread.task.inst.table.get(i) - trap_if(not isinstance(other_thread, Thread)) + other_thread = thread.task.inst.threads.get(i) trap_if(not other_thread.suspended()) other_thread.resume_later() return [] @@ -4614,15 +4630,14 @@ validation specifies: * ๐ŸšŸ - `cancellable` is allowed (otherwise it must be absent) Calling `$yield-to` invokes the following function which loads a thread at -index `$i` from the current component instance's table, traps if it's not -[suspended], and then switches to that thread, leaving the [current thread] +index `$i` from the current component instance's `threads` table, traps if it's +not [suspended], and then switches to that thread, leaving the [current thread] ready to run at some nondeterministic point in the future chosen by the embedder. ```python def canon_thread_yield_to(cancellable, thread, i): trap_if(not thread.task.inst.may_leave) - other_thread = thread.task.inst.table.get(i) - trap_if(not isinstance(other_thread, Thread)) + other_thread = thread.task.inst.threads.get(i) trap_if(not other_thread.suspended()) suspend_result = thread.task.yield_to(thread, cancellable, other_thread) return [suspend_result] @@ -4693,8 +4708,8 @@ validation specifies: Calling `$f` calls the following function which uses the `$opts` immediate to (non-deterministically) lift the debug message, create a new `ErrorContext` -value, store it in the current component instance's table and returns its -index. +value, store it in the current component instance's `handles` table and returns +its index. ```python @dataclass class ErrorContext: @@ -4708,7 +4723,7 @@ def canon_error_context_new(opts, thread, ptr, tagged_code_units): cx = LiftLowerContext(opts, thread.task.inst) s = load_string_from_range(cx, ptr, tagged_code_units) s = host_defined_transformation(s) - i = thread.task.inst.table.add(ErrorContext(s)) + i = thread.task.inst.handles.add(ErrorContext(s)) return [i] ``` Supporting the requirement (introduced in the @@ -4741,7 +4756,7 @@ single `error-context` value must return the same debug message from ```python def canon_error_context_debug_message(opts, thread, i, ptr): trap_if(not thread.task.inst.may_leave) - errctx = thread.task.inst.table.get(i) + errctx = thread.task.inst.handles.get(i) trap_if(not isinstance(errctx, ErrorContext)) cx = LiftLowerContext(opts, thread.task.inst) store_string(cx, errctx.debug_message, ptr) @@ -4761,11 +4776,11 @@ validation specifies: * `$f` is given type `(func (param i32))` Calling `$f` calls the following function, which drops the error context value -at the given index from the current component instance's table: +at the given index from the current component instance's `handles` table: ```python def canon_error_context_drop(thread, i): trap_if(not thread.task.inst.may_leave) - errctx = thread.task.inst.table.remove(i) + errctx = thread.task.inst.handles.remove(i) trap_if(not isinstance(errctx, ErrorContext)) return [] ``` diff --git a/design/mvp/Concurrency.md b/design/mvp/Concurrency.md index dd13f2e7..0ec5972b 100644 --- a/design/mvp/Concurrency.md +++ b/design/mvp/Concurrency.md @@ -369,8 +369,8 @@ creating and running threads. New threads are created with the [`thread.new-indirect`] built-in. As mentioned [above](#threads-and-tasks), a spawned thread inherits the task of the spawning thread which is why threads and tasks are N:1. `thread.new-indirect` adds a new -thread to the component instance's table and returns the `i32` index of this -table entry to the Core WebAssembly caller. Like [`pthread_create`], +thread to the component instance's threads table and returns the `i32` index of +this table entry to the Core WebAssembly caller. Like [`pthread_create`], `thread.new-indirect` takes a Core WebAssembly function (via `i32` index into a `funcref` table) and a "closure" parameter to pass to the function when called on the new thread. However, unlike `pthread_create`, the new thread is @@ -378,7 +378,7 @@ initially in a "suspended" state and must be explicitly "resumed" using one of the following 3 thread built-ins. Once the thread is resumed, the thread can learn its own index by calling the [`thread.index`] built-in. -A suspended thread (identified by table index) can be resumed at some +A suspended thread (identified by thread-table index) can be resumed at some non-deterministic point in future via the [`thread.resume-later`] built-in. In contrast, the [`thread.yield-to`] built-in switches execution to the given thread immediately, leaving the *calling* thread to be resumed at some diff --git a/design/mvp/canonical-abi/definitions.py b/design/mvp/canonical-abi/definitions.py index f7718729..4a36c572 100644 --- a/design/mvp/canonical-abi/definitions.py +++ b/design/mvp/canonical-abi/definitions.py @@ -256,7 +256,8 @@ class CanonicalOptions(LiftLowerOptions): class ComponentInstance: store: Store parent: Optional[ComponentInstance] - table: Table + handles: Table[ResourceHandle | Waitable | WaitableSet | ErrorContext] + threads: Table[Thread] may_leave: bool backpressure: int exclusive: bool @@ -266,7 +267,8 @@ def __init__(self, store, parent = None): assert(parent is None or parent.store is store) self.store = store self.parent = parent - self.table = Table() + self.handles = Table() + self.threads = Table() self.may_leave = True self.backpressure = 0 self.exclusive = False @@ -412,7 +414,7 @@ def fiber_func(): assert(self.running()) self.task.thread_stop(self) if self.index is not None: - self.task.inst.table.remove(self.index) + self.task.inst.threads.remove(self.index) self.parent_lock.release() self.fiber = threading.Thread(target = fiber_func) self.fiber.start() @@ -1278,7 +1280,7 @@ def load_string_from_range(cx, ptr, tagged_code_units) -> String: return (s, cx.opts.string_encoding, tagged_code_units) def lift_error_context(cx, i): - errctx = cx.inst.table.get(i) + errctx = cx.inst.handles.get(i) trap_if(not isinstance(errctx, ErrorContext)) return errctx @@ -1331,7 +1333,7 @@ def unpack_flags_from_int(i, labels): return record def lift_own(cx, i, t): - h = cx.inst.table.remove(i) + h = cx.inst.handles.remove(i) trap_if(not isinstance(h, ResourceHandle)) trap_if(h.rt is not t.rt) trap_if(h.num_lends != 0) @@ -1340,7 +1342,7 @@ def lift_own(cx, i, t): def lift_borrow(cx, i, t): assert(isinstance(cx.borrow_scope, Subtask)) - h = cx.inst.table.get(i) + h = cx.inst.handles.get(i) trap_if(not isinstance(h, ResourceHandle)) trap_if(h.rt is not t.rt) cx.borrow_scope.add_lender(h) @@ -1354,7 +1356,7 @@ def lift_future(cx, i, t): def lift_async_value(ReadableEndT, cx, i, t): assert(not contains_borrow(t)) - e = cx.inst.table.remove(i) + e = cx.inst.handles.remove(i) trap_if(not isinstance(e, ReadableEndT)) trap_if(e.shared.t != t) trap_if(e.state != CopyState.IDLE) @@ -1580,7 +1582,7 @@ def store_probably_utf16_to_latin1_or_utf16(cx, src, src_code_units): return (ptr, latin1_size) def lower_error_context(cx, v): - return cx.inst.table.add(v) + return cx.inst.handles.add(v) def store_list(cx, v, ptr, elem_type, maybe_length): if maybe_length is not None: @@ -1640,7 +1642,7 @@ def pack_flags_into_int(v, labels): def lower_own(cx, rep, t): h = ResourceHandle(t.rt, rep, own = True) - return cx.inst.table.add(h) + return cx.inst.handles.add(h) def lower_borrow(cx, rep, t): assert(isinstance(cx.borrow_scope, Task)) @@ -1648,17 +1650,17 @@ def lower_borrow(cx, rep, t): return rep h = ResourceHandle(t.rt, rep, own = False, borrow_scope = cx.borrow_scope) h.borrow_scope.num_borrows += 1 - return cx.inst.table.add(h) + return cx.inst.handles.add(h) def lower_stream(cx, v, t): assert(isinstance(v, ReadableStream)) assert(not contains_borrow(t)) - return cx.inst.table.add(ReadableStreamEnd(v)) + return cx.inst.handles.add(ReadableStreamEnd(v)) def lower_future(cx, v, t): assert(isinstance(v, ReadableFuture)) assert(not contains_borrow(t)) - return cx.inst.table.add(ReadableFutureEnd(v)) + return cx.inst.handles.add(ReadableFutureEnd(v)) ### Flattening @@ -1983,7 +1985,7 @@ def thread_func(thread): return assert(thread.index is None) - thread.index = inst.table.add(thread) + thread.index = inst.threads.add(thread) cx = LiftLowerContext(opts, inst, task) args = on_start() @@ -2022,7 +2024,7 @@ def thread_func(thread): event = (EventCode.NONE, 0, 0) case CallbackCode.WAIT: trap_if(not task.may_block()) - wset = inst.table.get(si) + wset = inst.handles.get(si) trap_if(not isinstance(wset, WaitableSet)) event = task.wait_until(lambda: not inst.exclusive, thread, wset, cancellable = True) case _: @@ -2118,7 +2120,7 @@ def on_resolve(result): subtask.deliver_resolve() return [Subtask.State.RETURNED] else: - subtaski = thread.task.inst.table.add(subtask) + subtaski = thread.task.inst.handles.add(subtask) def on_progress(): def subtask_event(): if subtask.resolved(): @@ -2134,7 +2136,7 @@ def subtask_event(): def canon_resource_new(rt, thread, rep): trap_if(not thread.task.inst.may_leave) h = ResourceHandle(rt, rep, own = True) - i = thread.task.inst.table.add(h) + i = thread.task.inst.handles.add(h) return [i] ### `canon resource.drop` @@ -2142,7 +2144,7 @@ def canon_resource_new(rt, thread, rep): def canon_resource_drop(rt, thread, i): trap_if(not thread.task.inst.may_leave) inst = thread.task.inst - h = inst.table.remove(i) + h = inst.handles.remove(i) trap_if(not isinstance(h, ResourceHandle)) trap_if(h.rt is not rt) trap_if(h.num_lends != 0) @@ -2167,7 +2169,7 @@ def canon_resource_drop(rt, thread, i): ### `canon resource.rep` def canon_resource_rep(rt, thread, i): - h = thread.task.inst.table.get(i) + h = thread.task.inst.handles.get(i) trap_if(not isinstance(h, ResourceHandle)) trap_if(h.rt is not rt) return [h.rep] @@ -2234,14 +2236,14 @@ def canon_task_cancel(thread): def canon_waitable_set_new(thread): trap_if(not thread.task.inst.may_leave) - return [ thread.task.inst.table.add(WaitableSet()) ] + return [ thread.task.inst.handles.add(WaitableSet()) ] ### ๐Ÿ”€ `canon waitable-set.wait` def canon_waitable_set_wait(cancellable, mem, thread, si, ptr): trap_if(not thread.task.inst.may_leave) trap_if(not thread.task.may_block()) - wset = thread.task.inst.table.get(si) + wset = thread.task.inst.handles.get(si) trap_if(not isinstance(wset, WaitableSet)) event = thread.task.wait_until(lambda: True, thread, wset, cancellable) return unpack_event(mem, thread, ptr, event) @@ -2257,7 +2259,7 @@ def unpack_event(mem, thread, ptr, e: EventTuple): def canon_waitable_set_poll(cancellable, mem, thread, si, ptr): trap_if(not thread.task.inst.may_leave) - wset = thread.task.inst.table.get(si) + wset = thread.task.inst.handles.get(si) trap_if(not isinstance(wset, WaitableSet)) if thread.task.deliver_pending_cancel(cancellable): event = (EventCode.TASK_CANCELLED, 0, 0) @@ -2271,7 +2273,7 @@ def canon_waitable_set_poll(cancellable, mem, thread, si, ptr): def canon_waitable_set_drop(thread, i): trap_if(not thread.task.inst.may_leave) - wset = thread.task.inst.table.remove(i) + wset = thread.task.inst.handles.remove(i) trap_if(not isinstance(wset, WaitableSet)) wset.drop() return [] @@ -2280,12 +2282,12 @@ def canon_waitable_set_drop(thread, i): def canon_waitable_join(thread, wi, si): trap_if(not thread.task.inst.may_leave) - w = thread.task.inst.table.get(wi) + w = thread.task.inst.handles.get(wi) trap_if(not isinstance(w, Waitable)) if si == 0: w.join(None) else: - wset = thread.task.inst.table.get(si) + wset = thread.task.inst.handles.get(si) trap_if(not isinstance(wset, WaitableSet)) w.join(wset) return [] @@ -2297,7 +2299,7 @@ def canon_waitable_join(thread, wi, si): def canon_subtask_cancel(async_, thread, i): trap_if(not thread.task.inst.may_leave) trap_if(not thread.task.may_block() and not async_) - subtask = thread.task.inst.table.get(i) + subtask = thread.task.inst.handles.get(i) trap_if(not isinstance(subtask, Subtask)) trap_if(subtask.resolve_delivered()) trap_if(subtask.cancellation_requested) @@ -2320,7 +2322,7 @@ def canon_subtask_cancel(async_, thread, i): def canon_subtask_drop(thread, i): trap_if(not thread.task.inst.may_leave) - s = thread.task.inst.table.remove(i) + s = thread.task.inst.handles.remove(i) trap_if(not isinstance(s, Subtask)) s.drop() return [] @@ -2330,15 +2332,15 @@ def canon_subtask_drop(thread, i): def canon_stream_new(stream_t, thread): trap_if(not thread.task.inst.may_leave) shared = SharedStreamImpl(stream_t.t) - ri = thread.task.inst.table.add(ReadableStreamEnd(shared)) - wi = thread.task.inst.table.add(WritableStreamEnd(shared)) + ri = thread.task.inst.handles.add(ReadableStreamEnd(shared)) + wi = thread.task.inst.handles.add(WritableStreamEnd(shared)) return [ ri | (wi << 32) ] def canon_future_new(future_t, thread): trap_if(not thread.task.inst.may_leave) shared = SharedFutureImpl(future_t.t) - ri = thread.task.inst.table.add(ReadableFutureEnd(shared)) - wi = thread.task.inst.table.add(WritableFutureEnd(shared)) + ri = thread.task.inst.handles.add(ReadableFutureEnd(shared)) + wi = thread.task.inst.handles.add(WritableFutureEnd(shared)) return [ ri | (wi << 32) ] ### ๐Ÿ”€ `canon stream.{read,write}` @@ -2355,7 +2357,7 @@ def stream_copy(EndT, BufferT, event_code, stream_t, opts, thread, i, ptr, n): trap_if(not thread.task.inst.may_leave) trap_if(not thread.task.may_block() and not opts.async_) - e = thread.task.inst.table.get(i) + e = thread.task.inst.handles.get(i) trap_if(not isinstance(e, EndT)) trap_if(e.shared.t != stream_t.t) trap_if(e.state != CopyState.IDLE) @@ -2408,7 +2410,7 @@ def future_copy(EndT, BufferT, event_code, future_t, opts, thread, i, ptr): trap_if(not thread.task.inst.may_leave) trap_if(not thread.task.may_block() and not opts.async_) - e = thread.task.inst.table.get(i) + e = thread.task.inst.handles.get(i) trap_if(not isinstance(e, EndT)) trap_if(e.shared.t != future_t.t) trap_if(e.state != CopyState.IDLE) @@ -2459,7 +2461,7 @@ def canon_future_cancel_write(future_t, async_, thread, i): def cancel_copy(EndT, event_code, stream_or_future_t, async_, thread, i): trap_if(not thread.task.inst.may_leave) trap_if(not thread.task.may_block() and not async_) - e = thread.task.inst.table.get(i) + e = thread.task.inst.handles.get(i) trap_if(not isinstance(e, EndT)) trap_if(e.shared.t != stream_or_future_t.t) trap_if(e.state != CopyState.ASYNC_COPYING) @@ -2491,7 +2493,7 @@ def canon_future_drop_writable(future_t, thread, hi): def drop(EndT, stream_or_future_t, thread, hi): trap_if(not thread.task.inst.may_leave) - e = thread.task.inst.table.remove(hi) + e = thread.task.inst.handles.remove(hi) trap_if(not isinstance(e, EndT)) trap_if(e.shared.t != stream_or_future_t.t) e.drop() @@ -2519,15 +2521,14 @@ def thread_func(thread): [] = call_and_trap_on_throw(f.callee, thread, [c]) new_thread = Thread(thread.task, thread_func) assert(new_thread.suspended()) - new_thread.index = thread.task.inst.table.add(new_thread) + new_thread.index = thread.task.inst.threads.add(new_thread) return [new_thread.index] ### ๐Ÿงต `canon thread.switch-to` def canon_thread_switch_to(cancellable, thread, i): trap_if(not thread.task.inst.may_leave) - other_thread = thread.task.inst.table.get(i) - trap_if(not isinstance(other_thread, Thread)) + other_thread = thread.task.inst.threads.get(i) trap_if(not other_thread.suspended()) suspend_result = thread.task.switch_to(thread, cancellable, other_thread) return [suspend_result] @@ -2544,8 +2545,7 @@ def canon_thread_suspend(cancellable, thread): def canon_thread_resume_later(thread, i): trap_if(not thread.task.inst.may_leave) - other_thread = thread.task.inst.table.get(i) - trap_if(not isinstance(other_thread, Thread)) + other_thread = thread.task.inst.threads.get(i) trap_if(not other_thread.suspended()) other_thread.resume_later() return [] @@ -2554,8 +2554,7 @@ def canon_thread_resume_later(thread, i): def canon_thread_yield_to(cancellable, thread, i): trap_if(not thread.task.inst.may_leave) - other_thread = thread.task.inst.table.get(i) - trap_if(not isinstance(other_thread, Thread)) + other_thread = thread.task.inst.threads.get(i) trap_if(not other_thread.suspended()) suspend_result = thread.task.yield_to(thread, cancellable, other_thread) return [suspend_result] @@ -2587,14 +2586,14 @@ def canon_error_context_new(opts, thread, ptr, tagged_code_units): cx = LiftLowerContext(opts, thread.task.inst) s = load_string_from_range(cx, ptr, tagged_code_units) s = host_defined_transformation(s) - i = thread.task.inst.table.add(ErrorContext(s)) + i = thread.task.inst.handles.add(ErrorContext(s)) return [i] ### ๐Ÿ“ `canon error-context.debug-message` def canon_error_context_debug_message(opts, thread, i, ptr): trap_if(not thread.task.inst.may_leave) - errctx = thread.task.inst.table.get(i) + errctx = thread.task.inst.handles.get(i) trap_if(not isinstance(errctx, ErrorContext)) cx = LiftLowerContext(opts, thread.task.inst) store_string(cx, errctx.debug_message, ptr) @@ -2604,6 +2603,6 @@ def canon_error_context_debug_message(opts, thread, i, ptr): def canon_error_context_drop(thread, i): trap_if(not thread.task.inst.may_leave) - errctx = thread.task.inst.table.remove(i) + errctx = thread.task.inst.handles.remove(i) trap_if(not isinstance(errctx, ErrorContext)) return [] diff --git a/design/mvp/canonical-abi/run_tests.py b/design/mvp/canonical-abi/run_tests.py index 77f40374..cd7ee74a 100644 --- a/design/mvp/canonical-abi/run_tests.py +++ b/design/mvp/canonical-abi/run_tests.py @@ -467,11 +467,11 @@ def core_wasm(thread, args): nonlocal dtor_value assert(len(args) == 4) - assert(len(inst.table.array) == 5) - assert(inst.table.array[0] is None) - assert(args[0] == 2) - assert(args[1] == 3) - assert(args[2] == 4) + assert(len(inst.handles.array) == 4) + assert(inst.handles.array[0] is None) + assert(args[0] == 1) + assert(args[1] == 2) + assert(args[2] == 3) assert(args[3] == 13) h1 = args[0] h2 = args[1] @@ -492,29 +492,29 @@ def core_wasm(thread, args): ] results = canon_lower(opts, host_ft, host_import, thread, args) assert(len(results) == 1) - assert(results[0] == 5) + assert(results[0] == 4) h4 = results[0] assert((canon_resource_rep(rt, thread, h4))[0] == 45) dtor_value = None [] = canon_resource_drop(rt, thread, h1) assert(dtor_value == 42) - assert(len(inst.table.array) == 6) - assert(inst.table.array[h1] is None) - assert(len(inst.table.free) == 1) + assert(len(inst.handles.array) == 5) + assert(inst.handles.array[h1] is None) + assert(len(inst.handles.free) == 1) h = (canon_resource_new(rt, thread, 46))[0] assert(h == h1) - assert(len(inst.table.array) == 6) - assert(inst.table.array[h] is not None) - assert(len(inst.table.free) == 0) + assert(len(inst.handles.array) == 5) + assert(inst.handles.array[h] is not None) + assert(len(inst.handles.free) == 0) dtor_value = None [] = canon_resource_drop(rt, thread, h3) assert(dtor_value is None) - assert(len(inst.table.array) == 6) - assert(inst.table.array[h3] is None) - assert(len(inst.table.free) == 1) + assert(len(inst.handles.array) == 5) + assert(inst.handles.array[h3] is None) + assert(len(inst.handles.free) == 1) return [h, h2, h4] @@ -543,9 +543,9 @@ def on_resolve(results): assert(got[0] == 46) assert(got[1] == 43) assert(got[2] == 45) - assert(len(inst.table.array) == 6) - assert(all(inst.table.array[i] is None for i in range(4))) - assert(len(inst.table.free) == 5) + assert(len(inst.handles.array) == 5) + assert(all(inst.handles.array[i] is None for i in range(4))) + assert(len(inst.handles.free) == 4) definitions.MAX_FLAT_RESULTS = before @@ -603,14 +603,14 @@ def consumer(thread, args): assert(u8 == 43) [ret] = canon_lower(consumer_opts, toggle_ft, toggle_callee, thread, []) state,subi1 = unpack_result(ret) - assert(subi1 == 3) + assert(subi1 == 2) assert(state == Subtask.State.STARTED) [] = canon_waitable_join(thread, subi1, seti) retp = ptr consumer_heap.memory[retp] = 13 [ret] = canon_lower(consumer_opts, blocking_ft, blocking_callee, thread, [83, retp]) state,subi2 = unpack_result(ret) - assert(subi2 == 4) + assert(subi2 == 3) assert(state == Subtask.State.STARTING) assert(consumer_heap.memory[retp] == 13) [] = canon_waitable_join(thread, subi2, seti) @@ -685,17 +685,17 @@ def consumer(thread, args): [ret] = canon_lower(opts, producer_ft, producer1, thread, []) state,subi1 = unpack_result(ret) - assert(subi1 == 2) + assert(subi1 == 1) assert(state == Subtask.State.STARTED) [ret] = canon_lower(opts, producer_ft, producer2, thread, []) state,subi2 = unpack_result(ret) - assert(subi2 == 3) + assert(subi2 == 2) assert(state == Subtask.State.STARTED) nonlocal seti [seti] = canon_waitable_set_new(thread) - assert(seti == 4) + assert(seti == 3) [] = canon_waitable_join(thread, subi1, seti) [] = canon_waitable_join(thread, subi2, seti) @@ -709,7 +709,7 @@ def callback(thread, args): match ctx: case 42: assert(args[0] == EventCode.SUBTASK) - assert(args[1] == 2) + assert(args[1] == 1) assert(args[2] == Subtask.State.RETURNED) subi = args[1] canon_subtask_drop(thread, subi) @@ -724,7 +724,7 @@ def callback(thread, args): return [definitions.CallbackCode.WAIT | (seti << 4)] case 62: assert(args[0] == EventCode.SUBTASK) - assert(args[1] == 3) + assert(args[1] == 2) assert(args[2] == Subtask.State.RETURNED) subi = args[1] canon_subtask_drop(thread, subi) @@ -1013,12 +1013,12 @@ def consumer(thread, args): [ret] = canon_lower(consumer_opts, producer_ft, producer1, thread, []) state,subi1 = unpack_result(ret) - assert(subi1 == 2) + assert(subi1 == 1) assert(state == Subtask.State.STARTED) [ret] = canon_lower(consumer_opts, producer_ft, producer2, thread, []) state,subi2 = unpack_result(ret) - assert(subi2 == 3) + assert(subi2 == 2) assert(state == Subtask.State.STARTING) [seti] = canon_waitable_set_new(thread) @@ -1101,12 +1101,12 @@ def consumer(thread, args): [ret] = canon_lower(consumer_opts, producer_ft, producer1, thread, []) state,subi1 = unpack_result(ret) - assert(subi1 == 2) + assert(subi1 == 1) assert(state == Subtask.State.STARTED) [ret] = canon_lower(consumer_opts, producer_ft, producer2, thread, []) state,subi2 = unpack_result(ret) - assert(subi2 == 3) + assert(subi2 == 2) assert(state == Subtask.State.STARTING) [seti] = canon_waitable_set_new(thread) @@ -1172,11 +1172,11 @@ def core_hostcall_pre(fut, thread, args): def core_func(thread, args): [ret] = canon_lower(lower_opts, ft, hostcall1, thread, []) state,subi1 = unpack_result(ret) - assert(subi1 == 2) + assert(subi1 == 1) assert(state == Subtask.State.STARTED) [ret] = canon_lower(lower_opts, ft, hostcall2, thread, []) state,subi2 = unpack_result(ret) - assert(subi2 == 3) + assert(subi2 == 2) assert(state == Subtask.State.STARTED) [seti] = canon_waitable_set_new(thread) @@ -1403,7 +1403,7 @@ def on_resolve(results): def core_func(thread, args): assert(len(args) == 1) rsi1 = args[0] - assert(rsi1 == 2) + assert(rsi1 == 1) [packed] = canon_stream_new(StreamType(U8Type()), thread) rsi2,wsi2 = unpack_new_ends(packed) [] = canon_task_return(thread, [StreamType(U8Type())], opts, [rsi2]) @@ -1499,7 +1499,7 @@ def on_resolve(results): def core_func(thread, args): [rsi1] = args - assert(rsi1 == 2) + assert(rsi1 == 1) [packed] = canon_stream_new(StreamType(U8Type()), thread) rsi2,wsi2 = unpack_new_ends(packed) [] = canon_task_return(thread, [StreamType(U8Type())], opts, [rsi2]) @@ -1521,7 +1521,7 @@ def core_func(thread, args): [ret] = canon_lower(opts, ft, host_import, thread, [rsi3, retp]) assert(ret == Subtask.State.RETURNED) rsi4 = mem[16] - assert(rsi4 == 5) + assert(rsi4 == 4) [ret] = canon_stream_write(StreamType(U8Type()), opts, thread, wsi3, 0, 4) assert(ret == definitions.BLOCKED) host_import_incoming.set_remain(100) @@ -1590,7 +1590,7 @@ def on_resolve(results): def core_func(thread, args): assert(len(args) == 1) rsi1 = args[0] - assert(rsi1 == 2) + assert(rsi1 == 1) return [rsi1] opts = mk_opts() @@ -1618,15 +1618,15 @@ def core_func(thread, args): assert(len(args) == 0) [packed] = canon_stream_new(StreamType(U8Type()), thread) rsi,wsi = unpack_new_ends(packed) - assert(rsi == 2) - assert(wsi == 3) + assert(rsi == 1) + assert(wsi == 2) [ret] = canon_stream_write(StreamType(U8Type()), opts, thread, wsi, 0, 4) assert(ret == definitions.BLOCKED) retp = 8 [ret] = canon_lower(opts, host_ft, host_import, thread, [rsi, retp]) assert(ret == Subtask.State.RETURNED) rsi2 = int.from_bytes(mem[retp : retp+4], 'little', signed=False) - assert(rsi2 == 2) + assert(rsi2 == 1) [ret] = canon_stream_cancel_write(StreamType(U8Type()), False, thread, wsi) result,n = unpack_result(ret) assert(result == CopyResult.CANCELLED and n == 0) @@ -1666,7 +1666,7 @@ def core_func(thread, args): [ret] = canon_lower(opts, source_ft, host_source, thread, [retp]) assert(ret == Subtask.State.RETURNED) rsi = mem[retp] - assert(rsi == 2) + assert(rsi == 1) [ret] = canon_stream_read(StreamType(U8Type()), opts, thread, rsi, 0, 4) result,n = unpack_result(ret) assert(n == 2 and result == CopyResult.COMPLETED) @@ -1690,8 +1690,8 @@ def core_func(thread, args): [packed] = canon_stream_new(StreamType(U8Type()), thread) rsi,wsi = unpack_new_ends(packed) - assert(rsi == 2) - assert(wsi == 4) + assert(rsi == 1) + assert(wsi == 3) [ret] = canon_lower(opts, sink_ft, host_sink, thread, [rsi]) assert(ret == Subtask.State.RETURNED) mem[0:6] = b'\x01\x02\x03\x04\x05\x06' @@ -1803,7 +1803,7 @@ def core_func2(thread, args): [ret] = canon_lower(opts2, ft1, func1, thread, [retp]) assert(ret == Subtask.State.RETURNED) rsi = mem2[retp] - assert(rsi == 2) + assert(rsi == 1) [ret] = canon_stream_read(StreamType(U8Type()), opts2, thread, rsi, 0, 8) assert(ret == definitions.BLOCKED) @@ -1914,7 +1914,7 @@ def core_func2(thread, args): [ret] = canon_lower(opts2, ft1, func1, thread, [retp]) assert(ret == Subtask.State.RETURNED) rsi = mem2[0] - assert(rsi == 2) + assert(rsi == 1) [ret] = canon_stream_read(StreamType(None), opts2, thread, rsi, 0, 8) assert(ret == definitions.BLOCKED) @@ -2170,7 +2170,7 @@ def core_func(thread, args): [ret] = canon_future_write(FutureType(U8Type()), lower_opts, thread, wfi, writep) assert(ret == CopyResult.COMPLETED) - while not thread.task.inst.table.get(rfi).has_pending_event(): + while not thread.task.inst.handles.get(rfi).has_pending_event(): canon_thread_yield(True, thread) [ret] = canon_future_cancel_read(FutureType(U8Type()), False, thread, rfi) @@ -2430,7 +2430,7 @@ def core_caller(thread, args): [ret] = canon_lower(caller_opts, ft, callee3, thread, [0, 0]) state,subi = unpack_result(ret) assert(state == Subtask.State.STARTED) - while caller_inst.table.get(subi).state == Subtask.State.STARTED: + while caller_inst.handles.get(subi).state == Subtask.State.STARTED: [_] = canon_thread_yield(True, thread) [ret] = canon_subtask_cancel(True, thread, subi) assert(ret == Subtask.State.RETURNED)