Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions build/deps/gen/deps/dep_capnp_cpp.MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
http = use_extension("@//:build/exts/http.bzl", "http")
http.archive(
name = "capnp-cpp",
sha256 = "0884582731fb8b9a6ef7a0d58056535d0480533256958d3eaae189d706bc43aa",
strip_prefix = "capnproto-capnproto-0776402/c++",
sha256 = "cbf7dcef02deb3a3addcfefacb76672c46a48c953024860bf80fceabc255d41d",
strip_prefix = "capnproto-capnproto-c1bce20/c++",
type = "tgz",
url = "https://github.com/capnproto/capnproto/tarball/07764022ba75c6924a250d5be0bf2e83250602a0",
url = "https://github.com/capnproto/capnproto/tarball/c1bce2095a8dd76851fe3c1c61550f79b69d671d",
)
use_repo(http, "capnp-cpp")
56 changes: 42 additions & 14 deletions src/workerd/api/basics.c++
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,10 @@ class AbortTriggerRpcClient final {

namespace {
// The jsrpc handler that receives aborts from the remote and triggers them locally
//
// TODO(cleanup): This class has been copied to external-pusher.c++. The copy here can be
// deleted as soon as we've switched from StreamSink to ExternalPusher and can delete all the
// StreamSink-related code. For now I'm not trying to avoid duplication.
class AbortTriggerRpcServer final: public rpc::AbortTrigger::Server {
public:
AbortTriggerRpcServer(kj::Own<kj::PromiseFulfiller<void>> fulfiller,
Expand Down Expand Up @@ -858,15 +862,28 @@ void AbortSignal::serialize(jsg::Lock& js, jsg::Serializer& serializer) {
return;
}

auto streamCap = externalHandler
->writeStream([&](rpc::JsValue::External::Builder builder) mutable {
builder.setAbortTrigger();
}).castAs<rpc::AbortTrigger>();
auto triggerCap = [&]() -> rpc::AbortTrigger::Client {
KJ_IF_SOME(pusher, externalHandler->getExternalPusher()) {
auto pipeline = pusher.pushAbortSignalRequest(capnp::MessageSize{2, 0}).sendForPipeline();

externalHandler->write(
[signal = pipeline.getSignal()](rpc::JsValue::External::Builder builder) mutable {
builder.setAbortSignal(kj::mv(signal));
});

return pipeline.getTrigger();
} else {
return externalHandler
->writeStream([&](rpc::JsValue::External::Builder builder) mutable {
builder.setAbortTrigger();
}).castAs<rpc::AbortTrigger>();
}
}();

auto& ioContext = IoContext::current();
// Keep track of every AbortSignal cloned from this one.
// If this->triggerAbort(...) is called, each rpcClient will be informed.
rpcClients.add(ioContext.addObject(kj::heap<AbortTriggerRpcClient>(kj::mv(streamCap))));
rpcClients.add(ioContext.addObject(kj::heap<AbortTriggerRpcClient>(kj::mv(triggerCap))));
}

jsg::Ref<AbortSignal> AbortSignal::deserialize(
Expand All @@ -890,20 +907,31 @@ jsg::Ref<AbortSignal> AbortSignal::deserialize(
return js.alloc<AbortSignal>(/* exception */ kj::none, /* maybeReason */ kj::none, flag);
}

auto reader = externalHandler->read();
KJ_REQUIRE(reader.isAbortTrigger(), "external table slot type does't match serialization tag");

// The AbortSignalImpl will receive any remote triggerAbort requests and fulfill the promise with the reason for abort

auto signal = js.alloc<AbortSignal>(/* exception */ kj::none, /* maybeReason */ kj::none, flag);

auto paf = kj::newPromiseAndFulfiller<void>();
auto pendingReason = IoContext::current().addObject(kj::refcounted<PendingReason>());
auto& ioctx = IoContext::current();

auto reader = externalHandler->read();
if (reader.isAbortTrigger()) {
// Old-style StreamSink.
// TODO(cleanup): Remove this once the ExternalPusher autogate has rolled out.
auto paf = kj::newPromiseAndFulfiller<void>();
auto pendingReason = ioctx.addObject(kj::refcounted<PendingReason>());

externalHandler->setLastStream(
kj::heap<AbortTriggerRpcServer>(kj::mv(paf.fulfiller), kj::addRef(*pendingReason)));
signal->rpcAbortPromise = ioctx.addObject(kj::heap(kj::mv(paf.promise)));
signal->pendingReason = kj::mv(pendingReason);
} else {
KJ_REQUIRE(reader.isAbortSignal(), "external table slot type does't match serialization tag");

auto resolvedSignal = ioctx.getExternalPusher()->unwrapAbortSignal(reader.getAbortSignal());

externalHandler->setLastStream(
kj::heap<AbortTriggerRpcServer>(kj::mv(paf.fulfiller), kj::addRef(*pendingReason)));
signal->rpcAbortPromise = IoContext::current().addObject(kj::heap(kj::mv(paf.promise)));
signal->pendingReason = kj::mv(pendingReason);
signal->rpcAbortPromise = ioctx.addObject(kj::heap(kj::mv(resolvedSignal.signal)));
signal->pendingReason = ioctx.addObject(kj::mv(resolvedSignal.reason));
}

return signal;
}
Expand Down
5 changes: 2 additions & 3 deletions src/workerd/api/basics.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// TODO(cleanup): Rename to events.h?

#include <workerd/io/compatibility-date.capnp.h>
#include <workerd/io/external-pusher.h>
#include <workerd/io/io-own.h>
#include <workerd/io/worker-interface.capnp.h>
#include <workerd/jsg/jsg.h>
Expand Down Expand Up @@ -571,9 +572,7 @@ class AbortSignal final: public EventTarget {
jsg::Optional<jsg::JsRef<jsg::JsValue>> maybeReason = kj::none,
Flag flag = Flag::NONE);

using PendingReason = kj::RefcountedWrapper<
kj::OneOf<kj::Array<kj::byte> /* v8Serialized */, kj::Exception /* if capability is dropped */
>>;
using PendingReason = ExternalPusherImpl::PendingAbortReason;

// The AbortSignal explicitly does not expose a constructor(). It is
// illegal for user code to create an AbortSignal directly.
Expand Down
67 changes: 47 additions & 20 deletions src/workerd/api/streams/readable.c++
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,10 @@ jsg::Optional<uint32_t> ByteLengthQueuingStrategy::size(

namespace {

// TODO(cleanup): These classes have been copied to external-pusher.c++. The copies here can be
// deleted as soon as we've switched from StreamSink to ExternalPusher and can delete all the
// StreamSink-related code. For now I'm not trying to avoid duplication.

// HACK: We need as async pipe, like kj::newOneWayPipe(), except supporting explicit end(). So we
// wrap the two ends of the pipe in special adapters that track whether end() was called.
class ExplicitEndOutputPipeAdapter final: public capnp::ExplicitEndOutputStream {
Expand Down Expand Up @@ -676,18 +680,37 @@ void ReadableStream::serialize(jsg::Lock& js, jsg::Serializer& serializer) {
StreamEncoding encoding = controller.getPreferredEncoding();
auto expectedLength = controller.tryGetLength(encoding);

auto streamCap = externalHandler->writeStream(
[encoding, expectedLength](rpc::JsValue::External::Builder builder) mutable {
auto rs = builder.initReadableStream();
rs.setEncoding(encoding);
KJ_IF_SOME(l, expectedLength) {
rs.getExpectedLength().setKnown(l);
capnp::ByteStream::Client streamCap = [&]() {
KJ_IF_SOME(pusher, externalHandler->getExternalPusher()) {
auto req = pusher.pushByteStreamRequest(capnp::MessageSize{2, 0});
KJ_IF_SOME(el, expectedLength) {
req.setLengthPlusOne(el + 1);
}
auto pipeline = req.sendForPipeline();

externalHandler->write([encoding, expectedLength, source = pipeline.getSource()](
rpc::JsValue::External::Builder builder) mutable {
auto rs = builder.initReadableStream();
rs.setStream(kj::mv(source));
rs.setEncoding(encoding);
});

return pipeline.getSink();
} else {
return externalHandler
->writeStream(
[encoding, expectedLength](rpc::JsValue::External::Builder builder) mutable {
auto rs = builder.initReadableStream();
rs.setEncoding(encoding);
KJ_IF_SOME(l, expectedLength) {
rs.getExpectedLength().setKnown(l);
}
}).castAs<capnp::ByteStream>();
}
});
}();

kj::Own<capnp::ExplicitEndOutputStream> kjStream =
ioctx.getByteStreamFactory().capnpToKjExplicitEnd(
kj::mv(streamCap).castAs<capnp::ByteStream>());
ioctx.getByteStreamFactory().capnpToKjExplicitEnd(kj::mv(streamCap));

auto sink = newSystemStream(kj::mv(kjStream), encoding, ioctx);

Expand Down Expand Up @@ -718,21 +741,25 @@ jsg::Ref<ReadableStream> ReadableStream::deserialize(

auto& ioctx = IoContext::current();

kj::Maybe<uint64_t> expectedLength;
auto el = rs.getExpectedLength();
if (el.isKnown()) {
expectedLength = el.getKnown();
}
kj::Own<kj::AsyncInputStream> in;
if (rs.hasStream()) {
in = ioctx.getExternalPusher()->unwrapStream(rs.getStream());
} else {
kj::Maybe<uint64_t> expectedLength;
auto el = rs.getExpectedLength();
if (el.isKnown()) {
expectedLength = el.getKnown();
}

auto pipe = kj::newOneWayPipe(expectedLength);
auto pipe = kj::newOneWayPipe(expectedLength);

auto endedFlag = kj::refcounted<kj::RefcountedWrapper<bool>>(false);
auto endedFlag = kj::refcounted<kj::RefcountedWrapper<bool>>(false);

auto out = kj::heap<ExplicitEndOutputPipeAdapter>(kj::mv(pipe.out), kj::addRef(*endedFlag));
auto in =
kj::heap<ExplicitEndInputPipeAdapter>(kj::mv(pipe.in), kj::mv(endedFlag), expectedLength);
auto out = kj::heap<ExplicitEndOutputPipeAdapter>(kj::mv(pipe.out), kj::addRef(*endedFlag));
in = kj::heap<ExplicitEndInputPipeAdapter>(kj::mv(pipe.in), kj::mv(endedFlag), expectedLength);

externalHandler->setLastStream(ioctx.getByteStreamFactory().kjToCapnp(kj::mv(out)));
externalHandler->setLastStream(ioctx.getByteStreamFactory().kjToCapnp(kj::mv(out)));
}

return js.alloc<ReadableStream>(ioctx,
kj::heap<NoDeferredProxyReadableStream>(newSystemStream(kj::mv(in), encoding, ioctx), ioctx));
Expand Down
29 changes: 29 additions & 0 deletions src/workerd/api/tests/js-rpc-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2066,3 +2066,32 @@ export let sendServiceStubOverRpc = {
}
},
};

// Make sure that calls are delivered in e-order, even in the presence of pushed externals.
export let eOrderTest = {
async test(controller, env, ctx) {
let abortController = new AbortController();
let abortSignal = abortController.signal;

let readableController;
let readableStream = new ReadableStream({
start(c) {
readableController = c;
},
});

let stub = await env.MyService.makeCounter(0);

let promises = [];
promises.push(stub.increment(1));
promises.push(stub.increment(1));
promises.push(stub.increment(1, abortSignal));
promises.push(stub.increment(1));
promises.push(stub.increment(1, readableStream));
promises.push(stub.increment(1));

let results = await Promise.all(promises);
Comment on lines +2085 to +2093
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

non-blocking nit:

Suggested change
let promises = [];
promises.push(stub.increment(1));
promises.push(stub.increment(1));
promises.push(stub.increment(1, abortSignal));
promises.push(stub.increment(1));
promises.push(stub.increment(1, readableStream));
promises.push(stub.increment(1));
let results = await Promise.all(promises);
const results = await Promise.all([
stub.increment(1),
stub.increment(1),
stub.increment(1, abortSignal),
stub.increment(1),
stub.increment(1, readableStream),
stub.increment(1),
]);

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I intentionally didn't write it that way since the order in which the increments are evaluated matters. While JavaScript probably guarantees that elements of an array literal are evaluated in left-to-right order (unlike C++), I didn't really want to write code that assumes anything about expression evaluation order.


assert.deepEqual(results, [1, 2, 3, 4, 5, 6]);
},
};
Loading