diff --git a/build/deps/gen/deps/dep_capnp_cpp.MODULE.bazel b/build/deps/gen/deps/dep_capnp_cpp.MODULE.bazel index 6d1cb726fbf..2163b753ae8 100644 --- a/build/deps/gen/deps/dep_capnp_cpp.MODULE.bazel +++ b/build/deps/gen/deps/dep_capnp_cpp.MODULE.bazel @@ -3,9 +3,9 @@ http = use_extension("@//:build/exts/http.bzl", "http") http.archive( name = "capnp-cpp", - sha256 = "0884582731fb8b9a6ef7a0d58056535d0480533256958d3eaae189d706bc43aa", - strip_prefix = "capnproto-capnproto-0776402/c++", + sha256 = "cbf7dcef02deb3a3addcfefacb76672c46a48c953024860bf80fceabc255d41d", + strip_prefix = "capnproto-capnproto-c1bce20/c++", type = "tgz", - url = "https://github.com/capnproto/capnproto/tarball/07764022ba75c6924a250d5be0bf2e83250602a0", + url = "https://github.com/capnproto/capnproto/tarball/c1bce2095a8dd76851fe3c1c61550f79b69d671d", ) use_repo(http, "capnp-cpp") diff --git a/src/workerd/api/basics.c++ b/src/workerd/api/basics.c++ index 73bf6bca19f..b741370ad1e 100644 --- a/src/workerd/api/basics.c++ +++ b/src/workerd/api/basics.c++ @@ -572,52 +572,6 @@ class AbortTriggerRpcClient final { rpc::AbortTrigger::Client client; }; -namespace { -// The jsrpc handler that receives aborts from the remote and triggers them locally -class AbortTriggerRpcServer final: public rpc::AbortTrigger::Server { - public: - AbortTriggerRpcServer(kj::Own> fulfiller, - kj::Own&& pendingReason) - : fulfiller(kj::mv(fulfiller)), - pendingReason(kj::mv(pendingReason)) {} - - kj::Promise abort(AbortContext abortCtx) override { - auto params = abortCtx.getParams(); - auto reason = params.getReason().getV8Serialized(); - - pendingReason->getWrapped() = kj::heapArray(reason.asBytes()); - fulfiller->fulfill(); - return kj::READY_NOW; - } - - kj::Promise release(ReleaseContext releaseCtx) override { - released = true; - return kj::READY_NOW; - } - - ~AbortTriggerRpcServer() noexcept(false) { - if (pendingReason->getWrapped() != nullptr) { - // Already triggered - return; - } - - if (!released) { - pendingReason->getWrapped() = JSG_KJ_EXCEPTION(FAILED, DOMAbortError, - "An AbortSignal received over RPC was implicitly aborted because the connection back to " - "its trigger was lost."); - } - - // Always fulfill the promise in case the AbortSignal was waiting - fulfiller->fulfill(); - } - - private: - kj::Own> fulfiller; - kj::Own pendingReason; - bool released = false; -}; -} // namespace - AbortSignal::AbortSignal(kj::Maybe exception, jsg::Optional> maybeReason, Flag flag) @@ -858,15 +812,19 @@ void AbortSignal::serialize(jsg::Lock& js, jsg::Serializer& serializer) { return; } - auto streamCap = externalHandler - ->writeStream([&](rpc::JsValue::External::Builder builder) mutable { - builder.setAbortTrigger(); - }).castAs(); + auto pipeline = externalHandler->getExternalPusher() + .pushAbortSignalRequest(capnp::MessageSize{2, 0}) + .sendForPipeline(); + + externalHandler->write( + [signal = pipeline.getSignal()](rpc::JsValue::External::Builder builder) mutable { + builder.setAbortSignal(kj::mv(signal)); + }); auto& ioContext = IoContext::current(); // Keep track of every AbortSignal cloned from this one. // If this->triggerAbort(...) is called, each rpcClient will be informed. - rpcClients.add(ioContext.addObject(kj::heap(kj::mv(streamCap)))); + rpcClients.add(ioContext.addObject(kj::heap(pipeline.getTrigger()))); } jsg::Ref AbortSignal::deserialize( @@ -890,20 +848,19 @@ jsg::Ref AbortSignal::deserialize( return js.alloc(/* exception */ kj::none, /* maybeReason */ kj::none, flag); } - auto reader = externalHandler->read(); - KJ_REQUIRE(reader.isAbortTrigger(), "external table slot type does't match serialization tag"); - // The AbortSignalImpl will receive any remote triggerAbort requests and fulfill the promise with the reason for abort auto signal = js.alloc(/* exception */ kj::none, /* maybeReason */ kj::none, flag); - auto paf = kj::newPromiseAndFulfiller(); - auto pendingReason = IoContext::current().addObject(kj::refcounted()); + auto& ioctx = IoContext::current(); + + auto reader = externalHandler->read(); + KJ_REQUIRE(reader.isAbortSignal(), "external table slot type does't match serialization tag"); + + auto resolvedSignal = ioctx.getExternalPusher()->unwrapAbortSignal(reader.getAbortSignal()); - externalHandler->setLastStream( - kj::heap(kj::mv(paf.fulfiller), kj::addRef(*pendingReason))); - signal->rpcAbortPromise = IoContext::current().addObject(kj::heap(kj::mv(paf.promise))); - signal->pendingReason = kj::mv(pendingReason); + signal->rpcAbortPromise = ioctx.addObject(kj::heap(kj::mv(resolvedSignal.signal))); + signal->pendingReason = ioctx.addObject(kj::mv(resolvedSignal.reason)); return signal; } diff --git a/src/workerd/api/basics.h b/src/workerd/api/basics.h index 5aac73e3ea7..8e75294a051 100644 --- a/src/workerd/api/basics.h +++ b/src/workerd/api/basics.h @@ -8,6 +8,7 @@ // TODO(cleanup): Rename to events.h? #include +#include #include #include #include @@ -571,9 +572,7 @@ class AbortSignal final: public EventTarget { jsg::Optional> maybeReason = kj::none, Flag flag = Flag::NONE); - using PendingReason = kj::RefcountedWrapper< - kj::OneOf /* v8Serialized */, kj::Exception /* if capability is dropped */ - >>; + using PendingReason = ExternalPusherImpl::PendingAbortReason; // The AbortSignal explicitly does not expose a constructor(). It is // illegal for user code to create an AbortSignal directly. diff --git a/src/workerd/api/streams/readable.c++ b/src/workerd/api/streams/readable.c++ index 86237b85cfd..cfa7ccc6276 100644 --- a/src/workerd/api/streams/readable.c++ +++ b/src/workerd/api/streams/readable.c++ @@ -519,89 +519,6 @@ jsg::Optional ByteLengthQueuingStrategy::size( namespace { -// HACK: We need as async pipe, like kj::newOneWayPipe(), except supporting explicit end(). So we -// wrap the two ends of the pipe in special adapters that track whether end() was called. -class ExplicitEndOutputPipeAdapter final: public capnp::ExplicitEndOutputStream { - public: - ExplicitEndOutputPipeAdapter( - kj::Own inner, kj::Own> ended) - : inner(kj::mv(inner)), - ended(kj::mv(ended)) {} - - kj::Promise write(kj::ArrayPtr buffer) override { - return KJ_REQUIRE_NONNULL(inner)->write(buffer); - } - kj::Promise write(kj::ArrayPtr> pieces) override { - return KJ_REQUIRE_NONNULL(inner)->write(pieces); - } - - kj::Maybe> tryPumpFrom( - kj::AsyncInputStream& input, uint64_t amount) override { - return KJ_REQUIRE_NONNULL(inner)->tryPumpFrom(input, amount); - } - - kj::Promise whenWriteDisconnected() override { - return KJ_REQUIRE_NONNULL(inner)->whenWriteDisconnected(); - } - - kj::Promise end() override { - // Signal to the other side that end() was actually called. - ended->getWrapped() = true; - inner = kj::none; - return kj::READY_NOW; - } - - private: - kj::Maybe> inner; - kj::Own> ended; -}; - -class ExplicitEndInputPipeAdapter final: public kj::AsyncInputStream { - public: - ExplicitEndInputPipeAdapter(kj::Own inner, - kj::Own> ended, - kj::Maybe expectedLength) - : inner(kj::mv(inner)), - ended(kj::mv(ended)), - expectedLength(expectedLength) {} - - kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { - size_t result = co_await inner->tryRead(buffer, minBytes, maxBytes); - - KJ_IF_SOME(l, expectedLength) { - KJ_ASSERT(result <= l); - l -= result; - if (l == 0) { - // If we got all the bytes we expected, we treat this as a successful end, because the - // underlying KJ pipe is not actually going to wait for the other side to drop. This is - // consistent with the behavior of Content-Length in HTTP anyway. - ended->getWrapped() = true; - } - } - - if (result < minBytes) { - // Verify that end() was called. - if (!ended->getWrapped()) { - JSG_FAIL_REQUIRE(Error, "ReadableStream received over RPC disconnected prematurely."); - } - } - co_return result; - } - - kj::Maybe tryGetLength() override { - return inner->tryGetLength(); - } - - kj::Promise pumpTo(kj::AsyncOutputStream& output, uint64_t amount) override { - return inner->pumpTo(output, amount); - } - - private: - kj::Own inner; - kj::Own> ended; - kj::Maybe expectedLength; -}; - // Wrapper around ReadableStreamSource that prevents deferred proxying. We need this for RPC // streams because although they are "system streams", they become disconnected when the IoContext // is destroyed, due to the JsRpcCustomEvent being canceled. @@ -676,18 +593,21 @@ void ReadableStream::serialize(jsg::Lock& js, jsg::Serializer& serializer) { StreamEncoding encoding = controller.getPreferredEncoding(); auto expectedLength = controller.tryGetLength(encoding); - auto streamCap = externalHandler->writeStream( - [encoding, expectedLength](rpc::JsValue::External::Builder builder) mutable { + auto req = externalHandler->getExternalPusher().pushByteStreamRequest(capnp::MessageSize{2, 0}); + KJ_IF_SOME(el, expectedLength) { + req.setLengthPlusOne(el + 1); + } + auto pipeline = req.sendForPipeline(); + + externalHandler->write([encoding, expectedLength, source = pipeline.getSource()]( + rpc::JsValue::External::Builder builder) mutable { auto rs = builder.initReadableStream(); + rs.setStream(kj::mv(source)); rs.setEncoding(encoding); - KJ_IF_SOME(l, expectedLength) { - rs.getExpectedLength().setKnown(l); - } }); kj::Own kjStream = - ioctx.getByteStreamFactory().capnpToKjExplicitEnd( - kj::mv(streamCap).castAs()); + ioctx.getByteStreamFactory().capnpToKjExplicitEnd(pipeline.getSink()); auto sink = newSystemStream(kj::mv(kjStream), encoding, ioctx); @@ -718,21 +638,7 @@ jsg::Ref ReadableStream::deserialize( auto& ioctx = IoContext::current(); - kj::Maybe expectedLength; - auto el = rs.getExpectedLength(); - if (el.isKnown()) { - expectedLength = el.getKnown(); - } - - auto pipe = kj::newOneWayPipe(expectedLength); - - auto endedFlag = kj::refcounted>(false); - - auto out = kj::heap(kj::mv(pipe.out), kj::addRef(*endedFlag)); - auto in = - kj::heap(kj::mv(pipe.in), kj::mv(endedFlag), expectedLength); - - externalHandler->setLastStream(ioctx.getByteStreamFactory().kjToCapnp(kj::mv(out))); + auto in = ioctx.getExternalPusher()->unwrapStream(rs.getStream()); return js.alloc(ioctx, kj::heap(newSystemStream(kj::mv(in), encoding, ioctx), ioctx)); diff --git a/src/workerd/api/tests/abortsignal-test.wd-test b/src/workerd/api/tests/abortsignal-test.wd-test index f7cf881cb8d..38628557091 100644 --- a/src/workerd/api/tests/abortsignal-test.wd-test +++ b/src/workerd/api/tests/abortsignal-test.wd-test @@ -1,19 +1,21 @@ using Workerd = import "/workerd/workerd.capnp"; +const services :List(Workerd.Service) = [ + ( name = "abortsignal-test", + worker = ( + modules = [ + (name = "worker", esModule = embed "abortsignal-test.js") + ], + compatibilityDate = "2025-01-01", + compatibilityFlags = ["nodejs_compat", "enable_abortsignal_rpc", "experimental"], + bindings = [ + (name = "RpcRemoteEnd", service = (name = "abortsignal-test", entrypoint = "RpcRemoteEnd")), + ] + ) + ), +]; + const unitTests :Workerd.Config = ( - services = [ - ( name = "abortsignal-test", - worker = ( - modules = [ - (name = "worker", esModule = embed "abortsignal-test.js") - ], - compatibilityDate = "2025-01-01", - compatibilityFlags = ["nodejs_compat", "enable_abortsignal_rpc", "experimental"], - bindings = [ - (name = "RpcRemoteEnd", service = (name = "abortsignal-test", entrypoint = "RpcRemoteEnd")), - ] - ) - ), - ], + services = .services, v8Flags = ["--expose-gc"] ); diff --git a/src/workerd/api/tests/js-rpc-test.js b/src/workerd/api/tests/js-rpc-test.js index cbe74d18e33..2f9e84ccabc 100644 --- a/src/workerd/api/tests/js-rpc-test.js +++ b/src/workerd/api/tests/js-rpc-test.js @@ -2066,3 +2066,32 @@ export let sendServiceStubOverRpc = { } }, }; + +// Make sure that calls are delivered in e-order, even in the presence of pushed externals. +export let eOrderTest = { + async test(controller, env, ctx) { + let abortController = new AbortController(); + let abortSignal = abortController.signal; + + let readableController; + let readableStream = new ReadableStream({ + start(c) { + readableController = c; + }, + }); + + let stub = await env.MyService.makeCounter(0); + + let promises = []; + promises.push(stub.increment(1)); + promises.push(stub.increment(1)); + promises.push(stub.increment(1, abortSignal)); + promises.push(stub.increment(1)); + promises.push(stub.increment(1, readableStream)); + promises.push(stub.increment(1)); + + let results = await Promise.all(promises); + + assert.deepEqual(results, [1, 2, 3, 4, 5, 6]); + }, +}; diff --git a/src/workerd/api/tests/js-rpc-test.wd-test b/src/workerd/api/tests/js-rpc-test.wd-test index da9c62b62cd..9ff825fbeff 100644 --- a/src/workerd/api/tests/js-rpc-test.wd-test +++ b/src/workerd/api/tests/js-rpc-test.wd-test @@ -1,40 +1,42 @@ using Workerd = import "/workerd/workerd.capnp"; -const unitTests :Workerd.Config = ( - services = [ - ( name = "js-rpc-test", - worker = ( - modules = [ - (name = "worker", esModule = embed "js-rpc-test.js") - ], - compatibilityDate = "2024-01-01", - compatibilityFlags = [ - "nodejs_compat", - "fetcher_no_get_put_delete", - "enable_abortsignal_rpc", - "enhanced_error_serialization", - "enable_ctx_exports", - "experimental" - ], - bindings = [ - (name = "self", service = (name = "js-rpc-test", entrypoint = "nonClass")), - (name = "MyService", service = (name = "js-rpc-test", entrypoint = "MyService")), - (name = "MyServiceProxy", service = (name = "js-rpc-test", entrypoint = "MyServiceProxy")), - (name = "MyActor", durableObjectNamespace = "MyActor"), - (name = "ActorNoExtends", durableObjectNamespace = "ActorNoExtends"), - (name = "defaultExport", service = "js-rpc-test"), - (name = "twelve", json = "12"), - (name = "GreeterFactory", service = (name = "js-rpc-test", entrypoint = "GreeterFactory")), - ], +const services :List(Workerd.Service) = [ + ( name = "js-rpc-test", + worker = ( + modules = [ + (name = "worker", esModule = embed "js-rpc-test.js") + ], + compatibilityDate = "2024-01-01", + compatibilityFlags = [ + "nodejs_compat", + "fetcher_no_get_put_delete", + "enable_abortsignal_rpc", + "enhanced_error_serialization", + "enable_ctx_exports", + "experimental" + ], + bindings = [ + (name = "self", service = (name = "js-rpc-test", entrypoint = "nonClass")), + (name = "MyService", service = (name = "js-rpc-test", entrypoint = "MyService")), + (name = "MyServiceProxy", service = (name = "js-rpc-test", entrypoint = "MyServiceProxy")), + (name = "MyActor", durableObjectNamespace = "MyActor"), + (name = "ActorNoExtends", durableObjectNamespace = "ActorNoExtends"), + (name = "defaultExport", service = "js-rpc-test"), + (name = "twelve", json = "12"), + (name = "GreeterFactory", service = (name = "js-rpc-test", entrypoint = "GreeterFactory")), + ], + + durableObjectNamespaces = [ + (className = "MyActor", uniqueKey = "foo"), + (className = "ActorNoExtends", uniqueKey = "bar"), + ], - durableObjectNamespaces = [ - (className = "MyActor", uniqueKey = "foo"), - (className = "ActorNoExtends", uniqueKey = "bar"), - ], + durableObjectStorage = (inMemory = void), + ) + ), +]; - durableObjectStorage = (inMemory = void), - ) - ), - ], +const unitTests :Workerd.Config = ( + services = .services, v8Flags = [ "--expose-gc" ], ); diff --git a/src/workerd/api/worker-rpc.c++ b/src/workerd/api/worker-rpc.c++ index aa856d2504a..baa4b4315c5 100644 --- a/src/workerd/api/worker-rpc.c++ +++ b/src/workerd/api/worker-rpc.c++ @@ -15,111 +15,6 @@ namespace workerd::api { -namespace { - -using StreamSinkFulfiller = kj::Own>; - -} // namespace - -// Implementation of StreamSink RPC interface. The stream sender calls `startStream()` when -// serializing each stream, and the recipient calls `setSlot()` when deserializing streams to -// provide the appropriate destination capability. This class is designed to allow these two -// calls to happen in either order for each slot. -class StreamSinkImpl final: public rpc::JsValue::StreamSink::Server, public kj::Refcounted { - public: - ~StreamSinkImpl() noexcept(false) { - for (auto& slot: table) { - KJ_IF_SOME(f, slot.tryGet()) { - f->reject(KJ_EXCEPTION(FAILED, "expected startStream() was never received")); - } - } - } - - void setSlot(uint i, capnp::Capability::Client stream) { - if (table.size() <= i) table.resize(i + 1); - - if (table[i] == nullptr) { - table[i] = kj::mv(stream); - } else KJ_SWITCH_ONEOF(table[i]) { - KJ_CASE_ONEOF(stream, capnp::Capability::Client) { - KJ_FAIL_REQUIRE("setSlot() tried to set the same slot twice", i); - } - KJ_CASE_ONEOF(fulfiller, StreamFulfiller) { - fulfiller->fulfill(kj::mv(stream)); - table[i] = Consumed(); - } - KJ_CASE_ONEOF(_, Consumed) { - KJ_FAIL_REQUIRE("setSlot() tried to set the same slot twice", i); - } - } - } - - kj::Promise startStream(StartStreamContext context) override { - uint i = context.getParams().getExternalIndex(); - - if (table.size() <= i) { - // guard against ridiculous table allocation - JSG_REQUIRE(i < 1024, Error, "Too many streams in one message."); - table.resize(i + 1); - } - - if (table[i] == nullptr) { - auto paf = kj::newPromiseAndFulfiller(); - table[i] = kj::mv(paf.fulfiller); - context.getResults(capnp::MessageSize{4, 1}).setStream(kj::mv(paf.promise)); - } else KJ_SWITCH_ONEOF(table[i]) { - KJ_CASE_ONEOF(stream, capnp::Capability::Client) { - context.getResults(capnp::MessageSize{4, 1}).setStream(kj::mv(stream)); - table[i] = Consumed(); - } - KJ_CASE_ONEOF(fulfiller, StreamFulfiller) { - KJ_FAIL_REQUIRE("startStream() tried to start the same stream twice", i); - } - KJ_CASE_ONEOF(_, Consumed) { - KJ_FAIL_REQUIRE("startStream() tried to start the same stream twice", i); - } - } - - return kj::READY_NOW; - } - - private: - using StreamFulfiller = kj::Own>; - struct Consumed {}; - - // Each slot starts out null (uninitialized). It becomes a Capability::Client if setSlot() is - // called first, or a StreamFulfiller if startStream() is called first. It becomes `Consumed` - // when the other method is called. - // HACK: Slots in the table take advantage of the little-known fact that OneOf has a "null" - // value, which is the value a OneOf has when default-initialized. This is useful because we - // don't want to explicitly initialize skipped slots. Maybe would be another option - // here, but would add 8 bytes to every slot just to store a boolean... feels bloated. There - // are only two methods in this class so I think it's OK. - using Slot = kj::OneOf; - - kj::Vector table; -}; - -capnp::Capability::Client RpcSerializerExternalHandler::writeStream(BuilderCallback callback) { - rpc::JsValue::StreamSink::Client* streamSinkPtr; - KJ_IF_SOME(ss, streamSink) { - streamSinkPtr = &ss; - } else { - // First stream written, set up the StreamSink. - streamSinkPtr = &streamSink.emplace(getStreamSinkFunc()); - } - - auto result = ({ - auto req = streamSinkPtr->startStreamRequest(capnp::MessageSize{4, 0}); - req.setExternalIndex(externals.size()); - req.send().getStream(); - }); - - write(kj::mv(callback)); - - return result; -} - capnp::Orphan> RpcSerializerExternalHandler::build( capnp::Orphanage orphanage) { auto result = orphanage.newOrphan>(externals.size()); @@ -141,17 +36,6 @@ rpc::JsValue::External::Reader RpcDeserializerExternalHandler::read() { return externals[i++]; } -void RpcDeserializerExternalHandler::setLastStream(capnp::Capability::Client stream) { - KJ_IF_SOME(ss, streamSink) { - ss.setSlot(i - 1, kj::mv(stream)); - } else { - auto ss = kj::refcounted(); - ss->setSlot(i - 1, kj::mv(stream)); - streamSink = *ss; - streamSinkCap = rpc::JsValue::StreamSink::Client(kj::mv(ss)); - } -} - namespace { // Call to construct an `rpc::JsValue` from a JS value. @@ -199,15 +83,13 @@ void serializeJsValue(jsg::Lock& js, struct DeserializeResult { jsg::JsValue value; kj::Own disposalGroup; - kj::Maybe streamSink; }; // Call to construct a JS value from an `rpc::JsValue`. -DeserializeResult deserializeJsValue( - jsg::Lock& js, rpc::JsValue::Reader reader, kj::Maybe streamSink = kj::none) { +DeserializeResult deserializeJsValue(jsg::Lock& js, rpc::JsValue::Reader reader) { auto disposalGroup = kj::heap(); - RpcDeserializerExternalHandler externalHandler(reader.getExternals(), *disposalGroup, streamSink); + RpcDeserializerExternalHandler externalHandler(reader.getExternals(), *disposalGroup); jsg::Deserializer deserializer(js, reader.getV8Serialized(), kj::none, kj::none, jsg::Deserializer::Options{ @@ -227,15 +109,14 @@ DeserializeResult deserializeJsValue( return { .value = deserializer.readValue(js), .disposalGroup = kj::mv(disposalGroup), - .streamSink = externalHandler.getStreamSink(), }; } // Does deserializeJsValue() and then adds a `dispose()` method to the returned object (if it is // an object) which disposes all stubs therein. jsg::JsValue deserializeRpcReturnValue( - jsg::Lock& js, rpc::JsRpcTarget::CallResults::Reader callResults, StreamSinkImpl& streamSink) { - auto [value, disposalGroup, _] = deserializeJsValue(js, callResults.getResult(), streamSink); + jsg::Lock& js, rpc::JsRpcTarget::CallResults::Reader callResults) { + auto [value, disposalGroup] = deserializeJsValue(js, callResults.getResult()); // If the object had a disposer on the callee side, it will run when we discard the callPipeline, // so attach that to the disposal group on the caller side. If the returned object did NOT have @@ -478,8 +359,6 @@ JsRpcPromiseAndPipeline callImpl(jsg::Lock& js, } } - kj::Maybe paramsStreamSinkFulfiller; - KJ_IF_SOME(args, maybeArgs) { // If we have arguments, serialize them. // Note that we may fail to serialize some element, in which case this will throw back to @@ -496,15 +375,7 @@ JsRpcPromiseAndPipeline callImpl(jsg::Lock& js, ? RpcSerializerExternalHandler::DUPLICATE : RpcSerializerExternalHandler::TRANSFER; - RpcSerializerExternalHandler externalHandler( - stubOwnership, [&]() -> rpc::JsValue::StreamSink::Client { - // A stream was encountered in the params, so we must expect the response to contain - // paramsStreamSink. But we don't have the response yet. So, we need to set up a - // temporary promise client, which we hook to the response a little bit later. - auto paf = kj::newPromiseAndFulfiller(); - paramsStreamSinkFulfiller = kj::mv(paf.fulfiller); - return kj::mv(paf.promise); - }); + RpcSerializerExternalHandler externalHandler(stubOwnership, client); serializeJsValue(js, jsg::JsValue(arr), externalHandler, [&](capnp::MessageSize hint) { // TODO(perf): Actually use the size hint. return builder.getOperation().initCallWithArgs(); @@ -515,31 +386,36 @@ JsRpcPromiseAndPipeline callImpl(jsg::Lock& js, builder.getOperation().setGetProperty(); } - // Unfortunately, we always have to send a `resultsStreamSink` because we don't know until - // after the call completes whether or not it will return any streams. If it's unused, - // though, it should only be a couple allocations. - auto resultStreamSink = kj::refcounted(); - builder.setResultsStreamSink(kj::addRef(*resultStreamSink)); - + // Unfortunately, we always have to send the ExternalPusher since we don't know whether the + // call will return any streams (or other pushed externals). Luckily, it's a + // one-per-IoContext object, not a big deal. (It'll take a slot on the capnp export table + // though.) + builder.getResultsStreamHandler().setExternalPusher(ioContext.getExternalPusher()); auto callResult = builder.send(); - KJ_IF_SOME(ssf, paramsStreamSinkFulfiller) { - ssf->fulfill(callResult.getParamsStreamSink()); - } - // We need to arrange that our JsRpcPromise will updated in-place with the final settlement // of this RPC promise. However, we can't actually construct the JsRpcPromise until we have // the final promise to give it. To resolve the cycle, we only create a JsRpcPromise::WeakRef // here, which is filled in later on to point at the JsRpcPromise, if and when one is created. auto weakRef = kj::atomicRefcounted(); + // HACK: Make sure that any calls to the ExternalPusher get to us before we try to + // deserialize the result. A weird quirk of Cap'n Proto is that return values arrive faster + // than calls by 1 turn of the event loop, so if we just insert a turn here we should be OK. + // + // Note that key to this working is the fact that the continuation returns a Promise, even + // though it is initialized with an immediate value. This forces the extra turn. + auto promise = callResult.then( + [](auto resp) -> kj::Promise> { + return kj::mv(resp); + }); + // RemotePromise lets us consume its pipeline and promise portions independently; we consume // the promise here and we consume the pipeline below, both via kj::mv(). - auto jsPromise = ioContext.awaitIo(js, kj::mv(callResult), - [weakRef = kj::atomicAddRef(*weakRef), resultStreamSink = kj::mv(resultStreamSink)]( - jsg::Lock& js, + auto jsPromise = ioContext.awaitIo(js, kj::mv(promise), + [weakRef = kj::atomicAddRef(*weakRef)](jsg::Lock& js, capnp::Response response) mutable -> jsg::Value { - auto jsResult = deserializeRpcReturnValue(js, response, *resultStreamSink); + auto jsResult = deserializeRpcReturnValue(js, response); if (weakRef->disposed) { // The promise was explicitly disposed before it even resolved. This means we must dispose @@ -912,9 +788,9 @@ using Result = kj::OneOf; template MakeCallPipeline::Result serializeJsValueWithPipeline(jsg::Lock& js, + rpc::JsValue::ExternalPusher::Client externalPusher, jsg::JsValue value, - Func makeBuilder, - RpcSerializerExternalHandler::GetStreamSinkFunc getStreamSinkFunc); + Func makeBuilder); // Callee-side implementation of JsRpcTarget. // @@ -933,7 +809,8 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { : enterIsolateAndCall(ctx.makeReentryCallback( [this, &ctx](Worker::Lock& lock, CallContext callContext) { return callImpl(lock, ctx, callContext); - })) {} + })), + externalPusher(ctx.getExternalPusher()) {} // Constructor use by EntrypointJsRpcTarget, which is revoked and destroyed before the IoContext // can possibly be canceled. It can just use ctx.run(). @@ -944,7 +821,8 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { return ctx.run([this, &ctx, callContext](Worker::Lock& lock) mutable { return callImpl(lock, ctx, callContext); }); - }) {} + }), + externalPusher(ctx.getExternalPusher()) {} struct EnvCtx { v8::Local env; @@ -968,8 +846,10 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { // Handles the delivery of JS RPC method calls. kj::Promise call(CallContext callContext) override { + co_await kj::yield(); + // Try to execute the requested method. - return enterIsolateAndCall(callContext).catch_([](kj::Exception&& e) { + co_return co_await enterIsolateAndCall(callContext).catch_([](kj::Exception&& e) { if (jsg::isTunneledException(e.getDescription())) { // Annotate exceptions in RPC worker calls as remote exceptions. auto description = jsg::stripRemoteExceptionPrefix(e.getDescription()); @@ -983,6 +863,18 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { }); } + // Implements ExternalPusher by forwarding to the shared implementation. + // + // Note JsRpcTarget has to implement `ExternalPusher` directly rather than providing a method + // like `getExternalPusher()` because it's important that the pushes arrive before the call, and + // the ordering can only be guaranteed if they're on the same object. + kj::Promise pushByteStream(PushByteStreamContext context) override { + return externalPusher->pushByteStream(context); + } + kj::Promise pushAbortSignal(PushAbortSignalContext context) override { + return externalPusher->pushAbortSignal(context); + } + KJ_DISALLOW_COPY_AND_MOVE(JsRpcTargetBase); private: @@ -992,6 +884,8 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { // using IoContext::makeReentryCallback(). kj::Function(CallContext callContext)> enterIsolateAndCall; + kj::Rc externalPusher; + // Returns true if the given name cannot be used as a method on this type. virtual bool isReservedName(kj::StringPtr name) = 0; @@ -1036,27 +930,6 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { // Given a handle for the result, if it's a promise, await the promise, then serialize the // final result for return. - kj::Maybe>> callPipelineFulfiller; - - // We need another ref to this fulfiller for the error callback. It can rely on being - // destroyed at the same time as the success callback. - kj::Maybe&> callPipelineFulfillerRef; - - KJ_IF_SOME(ss, invocationResult.streamSink) { - // Since we have a StreamSink, it's important that we hook up the pipeline for that - // immediately. Annoyingly, that also means we need to hook up a pipeline for - // callPipeline, which we don't actually have yet, so we need to promise-ify it. - - auto paf = kj::newPromiseAndFulfiller(); - callPipelineFulfillerRef = *paf.fulfiller; - callPipelineFulfiller = kj::mv(paf.fulfiller); - - capnp::PipelineBuilder builder(16); - builder.setCallPipeline(kj::mv(paf.promise)); - builder.setParamsStreamSink(ss); - callContext.setPipeline(builder.build()); - } - // HACK: Cap'n Proto call contexts are documented as being pointer-like types where the // backing object's lifetime is that of the RPC call, but in reality they are refcounted // under the hood. Since we'll be executing the call in the JS microtask queue, we have no @@ -1077,22 +950,18 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { // must take full ownership. [callContext, ownCallContext = kj::mv(ownCallContext), paramDisposalGroup = kj::mv(invocationResult.paramDisposalGroup), - paramsStreamSink = kj::mv(invocationResult.streamSink), - resultStreamSink = params.getResultsStreamSink(), - callPipelineFulfiller = kj::mv(callPipelineFulfiller)]( + resultsExternalPusher = + params.getResultsStreamHandler().getExternalPusher()]( jsg::Lock& js, jsg::Value value) mutable { jsg::JsValue resultValue(value.getHandle(js)); rpc::JsRpcTarget::CallResults::Builder results = nullptr; - auto maybePipeline = - serializeJsValueWithPipeline(js, resultValue, [&](capnp::MessageSize hint) { + auto maybePipeline = serializeJsValueWithPipeline( + js, resultsExternalPusher, resultValue, [&](capnp::MessageSize hint) { hint.wordCount += capnp::sizeInWords(); hint.capCount += 1; // for callPipeline results = callContext.initResults(hint); return results.initResult(); - }, [&]() -> rpc::JsValue::StreamSink::Client { - // The results contain streams. We return the resultsStreamSink passed in the request. - return kj::mv(resultStreamSink); }); KJ_SWITCH_ONEOF(maybePipeline) { @@ -1118,25 +987,9 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { } } - KJ_IF_SOME(cpf, callPipelineFulfiller) { - cpf->fulfill(results.getCallPipeline()); - } - - KJ_IF_SOME(ss, paramsStreamSink) { - results.setParamsStreamSink(kj::mv(ss)); - } - // paramDisposalGroup will be destroyed when we return (or when this lambda is destroyed // as a result of the promise being rejected). This will implicitly dispose the param // stubs. - }), - ctx.addFunctor([callPipelineFulfillerRef](jsg::Lock& js, jsg::Value&& error) { - // If we set up a `callPipeline` early, we have to make sure it propagates the error. - // (Otherwise we get a PromiseFulfiller error instead, which is pretty useless...) - KJ_IF_SOME(cpf, callPipelineFulfillerRef) { - cpf.reject(js.exceptionToKj(error.addRef(js))); - } - js.throwException(kj::mv(error)); }))); if (ctx.hasOutputGate()) { @@ -1332,7 +1185,6 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { struct InvocationResult { v8::Local returnValue; kj::Maybe> paramDisposalGroup; - kj::Maybe streamSink; }; // Deserializes the arguments and passes them to the given function. @@ -1342,7 +1194,7 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { kj::Maybe args) { // We received arguments from the client, deserialize them back to JS. KJ_IF_SOME(a, args) { - auto [value, disposalGroup, streamSink] = deserializeJsValue(js, a); + auto [value, disposalGroup] = deserializeJsValue(js, a); auto args = KJ_REQUIRE_NONNULL( value.tryCast(), "expected JsArray when deserializing arguments."); // Call() expects a `Local []`... so we populate an array. @@ -1355,7 +1207,6 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { InvocationResult result{ .returnValue = jsg::check(fn->Call(js.v8Context(), thisArg, arguments.size(), arguments.data())), - .streamSink = kj::mv(streamSink), }; if (!disposalGroup->empty()) { result.paramDisposalGroup = kj::mv(disposalGroup); @@ -1394,7 +1245,6 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { } kj::Maybe> paramDisposalGroup; - kj::Maybe streamSink; // We're going to pass all the arguments from the client to the function, but we are going to // insert `env` and `ctx`. We assume the last two arguments that the function declared are @@ -1402,8 +1252,7 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { kj::Maybe argsArrayFromClient; size_t argCountFromClient = 0; KJ_IF_SOME(a, args) { - auto [value, disposalGroup, ss] = deserializeJsValue(js, a); - streamSink = kj::mv(ss); + auto [value, disposalGroup] = deserializeJsValue(js, a); auto array = KJ_REQUIRE_NONNULL( value.tryCast(), "expected JsArray when deserializing arguments."); @@ -1456,7 +1305,6 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { .returnValue = jsg::check(fn->Call(js.v8Context(), thisArg, arguments.size(), arguments.data())), .paramDisposalGroup = kj::mv(paramDisposalGroup), - .streamSink = kj::mv(streamSink), }; }; }; @@ -1573,9 +1421,9 @@ static rpc::JsRpcTarget::Client makeJsRpcTargetForSingleLoopbackCall( template MakeCallPipeline::Result serializeJsValueWithPipeline(jsg::Lock& js, + rpc::JsValue::ExternalPusher::Client externalPusher, jsg::JsValue value, - Func makeBuilder, - RpcSerializerExternalHandler::GetStreamSinkFunc getStreamSinkFunc) { + Func makeBuilder) { auto maybeDispose = js.withinHandleScope([&]() -> kj::Maybe> { jsg::JsObject obj = KJ_UNWRAP_OR(value.tryCast(), { return kj::none; }); @@ -1599,7 +1447,7 @@ MakeCallPipeline::Result serializeJsValueWithPipeline(jsg::Lock& js, // Now that we've extracted our dispose function, we can serialize our value. RpcSerializerExternalHandler externalHandler( - RpcSerializerExternalHandler::TRANSFER, kj::mv(getStreamSinkFunc)); + RpcSerializerExternalHandler::TRANSFER, kj::mv(externalPusher)); serializeJsValue(js, value, externalHandler, kj::mv(makeBuilder)); auto stubDisposers = externalHandler.releaseStubDisposers(); @@ -2021,20 +1869,35 @@ class JsRpcSessionCustomEvent::ServerTopLevelMembrane final: public capnp::Membr public kj::Refcounted { public: explicit ServerTopLevelMembrane(kj::Own> doneFulfiller) - : doneFulfiller(kj::mv(doneFulfiller)) {} + : completionMembrane(kj::refcounted(kj::mv(doneFulfiller))) {} + ~ServerTopLevelMembrane() noexcept(false) { - KJ_IF_SOME(f, doneFulfiller) { - f->reject( + KJ_IF_SOME(cm, completionMembrane) { + cm->reject( KJ_EXCEPTION(DISCONNECTED, "JS RPC session canceled without calling an RPC method.")); } } kj::Maybe inboundCall( uint64_t interfaceId, uint16_t methodId, capnp::Capability::Client target) override { - auto f = kj::mv(JSG_REQUIRE_NONNULL( - doneFulfiller, Error, "Only one RPC method call is allowed on this object.")); - doneFulfiller = kj::none; - return capnp::membrane(kj::mv(target), kj::refcounted(kj::mv(f))); + if (interfaceId == capnp::typeId()) { + // JsRpcTarget::call() + auto cm = kj::mv(JSG_REQUIRE_NONNULL( + completionMembrane, Error, "Only one RPC method call is allowed on this object.")); + completionMembrane = kj::none; + return capnp::membrane(kj::mv(target), kj::mv(cm)); + } else if (interfaceId == capnp::typeId()) { + // ExternalPusher methods + // + // It's important that we use the same membrane that we'll use for call(), so that + // capabilities returned by the ExternalPusher will be wrapped in the membrane, hence they + // will be unwrapped when passed back through the membrane again to call(). + auto& cm = *JSG_REQUIRE_NONNULL( + completionMembrane, Error, "getExternalPusher() must be called before call()"); + return capnp::membrane(kj::mv(target), kj::addRef(cm)); + } else { + KJ_FAIL_ASSERT("unkown interface ID for JsRpcTarget"); + } } kj::Maybe outboundCall( @@ -2047,7 +1910,7 @@ class JsRpcSessionCustomEvent::ServerTopLevelMembrane final: public capnp::Membr } private: - kj::Maybe>> doneFulfiller; + kj::Maybe> completionMembrane; }; kj::Promise JsRpcSessionCustomEvent::run( diff --git a/src/workerd/api/worker-rpc.h b/src/workerd/api/worker-rpc.h index 8e5ab8398ae..185c5aacc3f 100644 --- a/src/workerd/api/worker-rpc.h +++ b/src/workerd/api/worker-rpc.h @@ -36,15 +36,12 @@ constexpr size_t MAX_JS_RPC_MESSAGE_SIZE = 1u << 25; // handle RPC specially should use this. class RpcSerializerExternalHandler final: public jsg::Serializer::ExternalHandler { public: - using GetStreamSinkFunc = kj::Function; - enum StubOwnership { TRANSFER, DUPLICATE }; - // `getStreamSinkFunc` will be called at most once, the first time a stream is encountered in - // serialization, to get the StreamSink that should be used. - RpcSerializerExternalHandler(StubOwnership stubOwnership, GetStreamSinkFunc getStreamSinkFunc) + RpcSerializerExternalHandler( + StubOwnership stubOwnership, rpc::JsValue::ExternalPusher::Client externalPusher) : stubOwnership(stubOwnership), - getStreamSinkFunc(kj::mv(getStreamSinkFunc)) {} + externalPusher(kj::mv(externalPusher)) {} inline StubOwnership getStubOwnership() { return stubOwnership; @@ -52,6 +49,11 @@ class RpcSerializerExternalHandler final: public jsg::Serializer::ExternalHandle using BuilderCallback = kj::Function; + // Returns the ExternalPusher for the remote side. + rpc::JsValue::ExternalPusher::Client getExternalPusher() { + return externalPusher; + } + // Add an external. The value is a callback which will be invoked later to fill in the // JsValue::External in the Cap'n Proto structure. The external array cannot be allocated until // the number of externals are known, which is only after all calls to `add()` have completed, @@ -60,10 +62,6 @@ class RpcSerializerExternalHandler final: public jsg::Serializer::ExternalHandle externals.add(kj::mv(callback)); } - // Like write(), but use this when there is also a stream associated with the external, i.e. - // using StreamSink. This returns a capability which will eventually resolve to the stream. - capnp::Capability::Client writeStream(BuilderCallback callback); - // Build the final list. capnp::Orphan> build(capnp::Orphanage orphanage); @@ -98,60 +96,39 @@ class RpcSerializerExternalHandler final: public jsg::Serializer::ExternalHandle private: StubOwnership stubOwnership; - GetStreamSinkFunc getStreamSinkFunc; + rpc::JsValue::ExternalPusher::Client externalPusher; kj::Vector externals; kj::Vector> stubDisposers; - - kj::Maybe streamSink; }; class RpcStubDisposalGroup; -class StreamSinkImpl; // ExternalHandler used when deserializing RPC messages. Deserialization functions with which to // handle RPC specially should use this. class RpcDeserializerExternalHandler final: public jsg::Deserializer::ExternalHandler { public: - // The `streamSink` parameter should be provided if a StreamSink already exists, e.g. when - // deserializing results. If omitted, it will be constructed on-demand. - RpcDeserializerExternalHandler(capnp::List::Reader externals, - RpcStubDisposalGroup& disposalGroup, - kj::Maybe streamSink) + RpcDeserializerExternalHandler( + capnp::List::Reader externals, RpcStubDisposalGroup& disposalGroup) : externals(externals), - disposalGroup(disposalGroup), - streamSink(streamSink) {} + disposalGroup(disposalGroup) {} ~RpcDeserializerExternalHandler() noexcept(false); // Read and return the next external. rpc::JsValue::External::Reader read(); - // Call immediately after `read()` when reading an external that is associated with a stream. - // `stream` is published back to the sender via StreamSink. - void setLastStream(capnp::Capability::Client stream); - // All stubs deserialized as part of a particular parameter or result set are placed in a // common disposal group so that they can be disposed together. RpcStubDisposalGroup& getDisposalGroup() { return disposalGroup; } - // Call after serialization is complete to get the StreamSink that should handle streams found - // while deserializing. Returns none if there were no streams. This should only be called if - // a `streamSink` was NOT passed to the constructor. - kj::Maybe getStreamSink() { - return kj::mv(streamSinkCap); - } - private: capnp::List::Reader externals; uint i = 0; kj::UnwindDetector unwindDetector; RpcStubDisposalGroup& disposalGroup; - - kj::Maybe streamSink; - kj::Maybe streamSinkCap; }; // Base class for objects which can be sent over RPC, but doing so actually sends a stub which diff --git a/src/workerd/io/BUILD.bazel b/src/workerd/io/BUILD.bazel index 02e46772056..ea453e85133 100644 --- a/src/workerd/io/BUILD.bazel +++ b/src/workerd/io/BUILD.bazel @@ -34,6 +34,7 @@ wd_cc_library( # TODO(cleanup): Fix this. srcs = [ "compatibility-date.c++", + "external-pusher.c++", "features.c++", "hibernation-manager.c++", "io-channels.c++", @@ -47,6 +48,7 @@ wd_cc_library( ] + ["//src/workerd/api:srcs"], hdrs = [ "compatibility-date.h", + "external-pusher.h", "hibernation-manager.h", "io-channels.h", "io-context.h", diff --git a/src/workerd/io/external-pusher.c++ b/src/workerd/io/external-pusher.c++ new file mode 100644 index 00000000000..382577d35a9 --- /dev/null +++ b/src/workerd/io/external-pusher.c++ @@ -0,0 +1,227 @@ +// Copyright (c) 2025 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#include +#include + +namespace workerd { + +// ======================================================================================= +// ReadableStream handling + +namespace { + +// TODO(cleanup): These classes have been copied from streams/readable.c++. The copies there can be +// deleted as soon as we've switched from StreamSink to ExternalPusher and can delete all the +// StreamSink-related code. For now I'm not trying to avoid duplication. + +// HACK: We need as async pipe, like kj::newOneWayPipe(), except supporting explicit end(). So we +// wrap the two ends of the pipe in special adapters that track whether end() was called. +class ExplicitEndOutputPipeAdapter final: public capnp::ExplicitEndOutputStream { + public: + ExplicitEndOutputPipeAdapter( + kj::Own inner, kj::Own> ended) + : inner(kj::mv(inner)), + ended(kj::mv(ended)) {} + + kj::Promise write(kj::ArrayPtr buffer) override { + return KJ_REQUIRE_NONNULL(inner)->write(buffer); + } + kj::Promise write(kj::ArrayPtr> pieces) override { + return KJ_REQUIRE_NONNULL(inner)->write(pieces); + } + + kj::Maybe> tryPumpFrom( + kj::AsyncInputStream& input, uint64_t amount) override { + return KJ_REQUIRE_NONNULL(inner)->tryPumpFrom(input, amount); + } + + kj::Promise whenWriteDisconnected() override { + return KJ_REQUIRE_NONNULL(inner)->whenWriteDisconnected(); + } + + kj::Promise end() override { + // Signal to the other side that end() was actually called. + ended->getWrapped() = true; + inner = kj::none; + return kj::READY_NOW; + } + + private: + kj::Maybe> inner; + kj::Own> ended; +}; + +class ExplicitEndInputPipeAdapter final: public kj::AsyncInputStream { + public: + ExplicitEndInputPipeAdapter(kj::Own inner, + kj::Own> ended, + kj::Maybe expectedLength) + : inner(kj::mv(inner)), + ended(kj::mv(ended)), + expectedLength(expectedLength) {} + + kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { + size_t result = co_await inner->tryRead(buffer, minBytes, maxBytes); + + KJ_IF_SOME(l, expectedLength) { + KJ_ASSERT(result <= l); + l -= result; + if (l == 0) { + // If we got all the bytes we expected, we treat this as a successful end, because the + // underlying KJ pipe is not actually going to wait for the other side to drop. This is + // consistent with the behavior of Content-Length in HTTP anyway. + ended->getWrapped() = true; + } + } + + if (result < minBytes) { + // Verify that end() was called. + if (!ended->getWrapped()) { + JSG_FAIL_REQUIRE(Error, "ReadableStream received over RPC disconnected prematurely."); + } + } + co_return result; + } + + kj::Maybe tryGetLength() override { + return inner->tryGetLength(); + } + + kj::Promise pumpTo(kj::AsyncOutputStream& output, uint64_t amount) override { + return inner->pumpTo(output, amount); + } + + private: + kj::Own inner; + kj::Own> ended; + kj::Maybe expectedLength; +}; + +} // namespace + +class ExternalPusherImpl::InputStreamImpl final: public ExternalPusher::InputStream::Server { + public: + InputStreamImpl(kj::Own stream): stream(kj::mv(stream)) {} + + kj::Maybe> stream; +}; + +kj::Promise ExternalPusherImpl::pushByteStream(PushByteStreamContext context) { + kj::Maybe expectedLength; + auto lp1 = context.getParams().getLengthPlusOne(); + if (lp1 > 0) { + expectedLength = lp1 - 1; + } + + auto pipe = kj::newOneWayPipe(expectedLength); + + auto endedFlag = kj::refcounted>(false); + + auto out = kj::heap(kj::mv(pipe.out), kj::addRef(*endedFlag)); + auto in = + kj::heap(kj::mv(pipe.in), kj::mv(endedFlag), expectedLength); + + auto results = context.initResults(capnp::MessageSize{4, 2}); + + results.setSource(inputStreamSet.add(kj::heap(kj::mv(in)))); + results.setSink(byteStreamFactory.kjToCapnp(kj::mv(out))); + return kj::READY_NOW; +} + +kj::Own ExternalPusherImpl::unwrapStream( + ExternalPusher::InputStream::Client cap) { + auto& unwrapped = KJ_REQUIRE_NONNULL( + inputStreamSet.tryGetLocalServerSync(cap), "pushed external is not a byte stream"); + + return KJ_REQUIRE_NONNULL(kj::mv(kj::downcast(unwrapped).stream), + "pushed byte stream has already been consumed"); +} + +// ======================================================================================= +// AbortSignal handling + +namespace { + +// The jsrpc handler that receives aborts from the remote and triggers them locally +// +// TODO(cleanup): This class has been copied from external-pusher.c++. The copy there can be +// deleted as soon as we've switched from StreamSink to ExternalPusher and can delete all the +// StreamSink-related code. For now I'm not trying to avoid duplication. +class AbortTriggerRpcServer final: public rpc::AbortTrigger::Server { + public: + AbortTriggerRpcServer(kj::Own> fulfiller, + kj::Own&& pendingReason) + : fulfiller(kj::mv(fulfiller)), + pendingReason(kj::mv(pendingReason)) {} + + kj::Promise abort(AbortContext abortCtx) override { + auto params = abortCtx.getParams(); + auto reason = params.getReason().getV8Serialized(); + + pendingReason->getWrapped() = kj::heapArray(reason.asBytes()); + fulfiller->fulfill(); + return kj::READY_NOW; + } + + kj::Promise release(ReleaseContext releaseCtx) override { + released = true; + return kj::READY_NOW; + } + + ~AbortTriggerRpcServer() noexcept(false) { + if (pendingReason->getWrapped() != nullptr) { + // Already triggered + return; + } + + if (!released) { + pendingReason->getWrapped() = JSG_KJ_EXCEPTION(FAILED, DOMAbortError, + "An AbortSignal received over RPC was implicitly aborted because the connection back to " + "its trigger was lost."); + } + + // Always fulfill the promise in case the AbortSignal was waiting + fulfiller->fulfill(); + } + + private: + kj::Own> fulfiller; + kj::Own pendingReason; + bool released = false; +}; + +} // namespace + +class ExternalPusherImpl::AbortSignalImpl final: public ExternalPusher::AbortSignal::Server { + public: + AbortSignalImpl(AbortSignal content): content(kj::mv(content)) {} + + kj::Maybe content; +}; + +kj::Promise ExternalPusherImpl::pushAbortSignal(PushAbortSignalContext context) { + auto paf = kj::newPromiseAndFulfiller(); + auto pendingReason = kj::refcounted(); + + auto results = context.initResults(capnp::MessageSize{4, 2}); + + results.setTrigger( + kj::heap(kj::mv(paf.fulfiller), kj::addRef(*pendingReason))); + results.setSignal(abortSignalSet.add( + kj::heap(AbortSignal{kj::mv(paf.promise), kj::mv(pendingReason)}))); + + return kj::READY_NOW; +} + +ExternalPusherImpl::AbortSignal ExternalPusherImpl::unwrapAbortSignal( + ExternalPusher::AbortSignal::Client cap) { + auto& unwrapped = KJ_REQUIRE_NONNULL( + abortSignalSet.tryGetLocalServerSync(cap), "pushed external is not an AbortSignal"); + + return KJ_REQUIRE_NONNULL(kj::mv(kj::downcast(unwrapped).content), + "pushed AbortSignal has already been consumed"); +} + +} // namespace workerd diff --git a/src/workerd/io/external-pusher.h b/src/workerd/io/external-pusher.h new file mode 100644 index 00000000000..3e12a85940b --- /dev/null +++ b/src/workerd/io/external-pusher.h @@ -0,0 +1,57 @@ +// Copyright (c) 2025 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#pragma once + +#include + +#include +#include + +namespace workerd { + +using kj::byte; + +// Implements JsValue.ExternalPusher from worker-interface.capnp. +// +// ExternalPusher allows a remote peer to "push" certain kinds of objects into our address space +// so that they can then be embedded in `JsValue` as `External` values. +class ExternalPusherImpl: public rpc::JsValue::ExternalPusher::Server, public kj::Refcounted { + public: + ExternalPusherImpl(capnp::ByteStreamFactory& byteStreamFactory) + : byteStreamFactory(byteStreamFactory) {} + + using ExternalPusher = rpc::JsValue::ExternalPusher; + + kj::Own unwrapStream(ExternalPusher::InputStream::Client cap); + + // Box which holds the reason why an AbortSignal was aborted. May be either: + // - A serialized V8 value if the signal was aborted from JavaScript. + // - A KJ exception if the connection from the trigger was lost. + using PendingAbortReason = kj::RefcountedWrapper, kj::Exception>>; + + struct AbortSignal { + // Resolves when `reason` has been filled in. + kj::Promise signal; + + // The abort reason box, will be uninitialized until `signal` resolves. + kj::Own reason; + }; + + AbortSignal unwrapAbortSignal(ExternalPusher::AbortSignal::Client cap); + + kj::Promise pushByteStream(PushByteStreamContext context) override; + kj::Promise pushAbortSignal(PushAbortSignalContext context) override; + + private: + capnp::ByteStreamFactory& byteStreamFactory; + + capnp::CapabilityServerSet inputStreamSet; + capnp::CapabilityServerSet abortSignalSet; + + class InputStreamImpl; + class AbortSignalImpl; +}; + +} // namespace workerd diff --git a/src/workerd/io/io-context.c++ b/src/workerd/io/io-context.c++ index 88ea365d210..293b58f4c42 100644 --- a/src/workerd/io/io-context.c++ +++ b/src/workerd/io/io-context.c++ @@ -901,6 +901,14 @@ kj::Date IoContext::now() { return now(getCurrentIncomingRequest()); } +kj::Rc IoContext::getExternalPusher() { + KJ_IF_SOME(ep, externalPusher) { + return ep.addRef(); + } else { + return externalPusher.emplace(kj::rc(getByteStreamFactory())).addRef(); + } +} + kj::Own IoContext::getSubrequestNoChecks( kj::FunctionParam(TraceContext&, IoChannelFactory&)> func, SubrequestOptions options) { diff --git a/src/workerd/io/io-context.h b/src/workerd/io/io-context.h index 1910b5310f0..622996829fb 100644 --- a/src/workerd/io/io-context.h +++ b/src/workerd/io/io-context.h @@ -9,6 +9,7 @@ #include #include +#include #include #include #include @@ -772,6 +773,8 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler return thread.getHeaderIds(); } + kj::Rc getExternalPusher(); + // Subrequest channel numbers for the two special channels. // NULL = The channel used by global fetch() when the Request has no fetcher attached. // NEXT = DEPRECATED: The fetcher attached to Requests delivered by a FetchEvent, so that we can @@ -1012,6 +1015,8 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler // objects which reference `deleteQueue` in their destructors. OwnedObjectList ownedObjects; + kj::Maybe> externalPusher; + // Implementation detail of makeCachePutStream(). // TODO: Used for Cache PUT serialization. diff --git a/src/workerd/io/worker-interface.capnp b/src/workerd/io/worker-interface.capnp index ab1c8ef4da0..db4a01c8746 100644 --- a/src/workerd/io/worker-interface.capnp +++ b/src/workerd/io/worker-interface.capnp @@ -471,23 +471,27 @@ struct JsValue { } readableStream :group { - # A ReadableStream. The sender of the JsValue will use the associated StreamSink to open a - # stream of type `ByteStream`. + # A ReadableStream. + + stream @10 :ExternalPusher.InputStream; + # A stream pushed using the destination isolate's ExternalPusher. encoding @4 :StreamEncoding; # Bytes read from the stream have this encoding. - expectedLength :union { - unknown @5 :Void; - known @6 :UInt64; + obsoleteUnion :union { + # With `StreamSink`, the length was sent here. With `ExternalPusher` it's sent to + # `ExternalPusher.pushByteStream()`. + obsolete5 @5 :Void; + obsolete6 @6 :UInt64; } } - abortTrigger @7 :Void; - # Indicates that an `AbortTrigger` is being passed, see the `AbortTrigger` interface for the - # mechanism used to trigger the abort later. This is modeled as a stream, since the sender is - # the one that will later on send the abort signal. This external will have an associated - # stream in the corresponding `StreamSink` with type `AbortTrigger`. + obsolete7 @7 :Void; + # Obsolete `abortTrigger` (the `StreamSink` way of handling `AbortSignal`). + + abortSignal @11 :ExternalPusher.AbortSignal; + # Indicates that an `AbortSignal` is being passed. subrequestChannelToken @8 :Data; actorClassChannelToken @9 :Data; @@ -497,21 +501,46 @@ struct JsValue { } } - interface StreamSink { - # A JsValue may contain streams that flow from the sender to the receiver. We don't want such - # streams to require a network round trip before the stream can begin pumping. So, we need a - # place to start sending bytes right away. + interface ExternalPusher { + # This object allows "pushing" external objects to a target isolate, so that they can + # sublequently be referenced by a `JsValue.External`. This allows implementing externals where + # the sender might need to send subsequent information to the receiver *before* the receiver + # has had a chance to call back to request it. For example, when a ReadableStream is sent over + # RPC, the sender will immediately start sending body bytes without waiting for a round trip. # - # To that end, JsRpcTarget::call() returns a `paramsStreamSink`. Immediately upon sending the - # request, the client can use promise pipelining to begin pushing bytes to this object. + # The key to ExternalPusher is that it constructs and returns capabilities pointing at objects + # living directly in the target isolate's runtime. These capabilities have empty interfaces, + # but can be passed back to the target in the `External` table of a `JsValue`. Since the + # capabilities point to objects directly in the recipient's memory space, they can then be + # unwrapped to obtain the underlying local object, which the recipient then uses to back the + # external value delivered to the application. + # + # Note that externals must be pushed BEFORE the JsValue that uses them is sent, so that they + # can be unwrapped immediately when deserializing the value. + + pushByteStream @0 (lengthPlusOne :UInt64 = 0) -> (source :InputStream, sink :ByteStream); + # Creates a readable stream within the remote's memory space. `source` should be placed in a + # sublequent `External` of type `readableStream`. The caller should write bytes to `sink`. # - # Similarly, the caller passes a `resultsStreamSink` to the callee. If the response contains - # any streams, it can start pushing to this immediately after responding. + # `lengthPlusOne` is the expected length of the stream, plus 1, with zero indicating no + # expectation. This is used e.g. when the `ReadableStream` was created with `FixedLengthStream`. + # (The weird "plus one" encoding is used because Cap'n Proto doesn't have a Maybe. Perhaps we + # can fix this eventually.) - startStream @0 (externalIndex :UInt32) -> (stream :Capability); - # Opens a stream corresponding to the given index in the JsValue's `externals` array. The type - # of capability returned depends on the type of external. E.g. for `readableStream`, it is a - # `ByteStream`. + interface InputStream { + # No methods. This will be unwrapped by the recipient to obtain the underlying local value. + } + + pushAbortSignal @1 () -> (signal :AbortSignal, trigger :AbortTrigger); + + interface AbortSignal { + # No methods. This can be unwrapped by the recipient to obtain a Promise which + # rejects when the signal is aborted. + } + + # TODO(soon): + # - AbortTrigger + # - Promises } } @@ -532,7 +561,15 @@ interface AbortTrigger $Cxx.allowCancellation { # be triggered. Otherwise, the cloned signal will treat a dropped cabability as an abort. } -interface JsRpcTarget $Cxx.allowCancellation { +interface JsRpcTarget extends(JsValue.ExternalPusher) $Cxx.allowCancellation { + # Target on which RPC methods may be invoked. + # + # This is the backing capnp type for a JsRpcStub, as well as used to represent top-level RPC + # events. + # + # JsRpcTarget must implement `JsValue.ExternalPusher` to allow externals to be pushed to the + # target in advance of a call that uses them. + struct CallParams { union { methodName @0 :Text; @@ -578,8 +615,17 @@ interface JsRpcTarget $Cxx.allowCancellation { # "bar". } - resultsStreamSink @4 :JsValue.StreamSink; - # StreamSink used for ReadableStreams found in the results. + resultsStreamHandler :union { + # Historically there was a different way to handle streams, before `ExternalPusher`. Now + # we always use `ExteranlPusher`. + + obsolete4 @4 :Capability; + # Obsolete StreamSink, replaced by ExternalPusher. + + externalPusher @5 :JsValue.ExternalPusher; + # ExternalPusher object which will push into the caller's isolate. Use this to push externals + # that will be included in the results. + } } struct CallResults { @@ -603,9 +649,8 @@ interface JsRpcTarget $Cxx.allowCancellation { # `callPipeline` until the disposer is invoked. If `hasDisposer` is false, `callPipeline` can # safely be dropped immediately. - paramsStreamSink @3 :JsValue.StreamSink; - # StreamSink used for ReadableStreams found in the params. The caller begins sending bytes for - # these streams immediately using promise pipelining. + obsolete3 @3 :Capability; + # Obsolete StreamSink, replaced by ExternalPusher. } call @0 CallParams -> CallResults; diff --git a/src/workerd/util/autogate.c++ b/src/workerd/util/autogate.c++ index aac95cbe387..db39eaeb157 100644 --- a/src/workerd/util/autogate.c++ +++ b/src/workerd/util/autogate.c++ @@ -31,6 +31,8 @@ kj::StringPtr KJ_STRINGIFY(AutogateKey key) { return "fetch-request-memory-adjustment"_kj; case AutogateKey::RUST_BACKED_NODE_DNS: return "rust-backed-node-dns"_kj; + case AutogateKey::RPC_USE_EXTERNAL_PUSHER: + return "rpc-use-external-pusher"_kj; case AutogateKey::NumOfKeys: KJ_FAIL_ASSERT("NumOfKeys should not be used in getName"); } diff --git a/src/workerd/util/autogate.h b/src/workerd/util/autogate.h index 53fc4c08760..8593db2aafb 100644 --- a/src/workerd/util/autogate.h +++ b/src/workerd/util/autogate.h @@ -26,6 +26,8 @@ enum class AutogateKey { FETCH_REQUEST_MEMORY_ADJUSTMENT, // Enable Rust-backed Node.js DNS implementation RUST_BACKED_NODE_DNS, + // Use ExternalPusher instead of StreamSink to handle streams in RPC. + RPC_USE_EXTERNAL_PUSHER, NumOfKeys // Reserved for iteration. }; diff --git a/src/workerd/util/completion-membrane.h b/src/workerd/util/completion-membrane.h index fe63afcf1e8..50276935c57 100644 --- a/src/workerd/util/completion-membrane.h +++ b/src/workerd/util/completion-membrane.h @@ -30,6 +30,10 @@ class CompletionMembrane final: public capnp::MembranePolicy, public kj::Refcoun return kj::addRef(*this); } + void reject(kj::Exception&& e) { + doneFulfiller->reject(kj::mv(e)); + } + private: kj::Own> doneFulfiller; };