From ea47a28f4e04f1b6c2beab311e69ad98d83f868a Mon Sep 17 00:00:00 2001 From: Kenton Varda Date: Sat, 20 Dec 2025 11:40:36 -0600 Subject: [PATCH 1/7] Intorduce new JsValue.ExternalPusher interface to replace StreamSink. The design of `StreamSink` makes it fairly complicated and somewhat inefficient to implement. For example: * It requires the use of `setPipeline()` so that the caller can start to enable promise pipelining on the `StreamSink` before it actually returns results. * Every call must create a `resultsStreamSink` object in advance, before it knows if there will be any streams in the results. * Generally there's just a lot of contortions involved in supporting it. It occurred to me that a different design is possible: one where we have an object that is created *per-IoContext* (instead of per-call) which can be used to "push" values into that IoContext, so that they can then be referenced as externals by subsequent JsValues. This commit introduces that design, including specifying how it would work to implement `ReadableStream`. Subsequent commits will actually implement this design. Eventually, after everyone in production is updated to understand and then use the new design, we can deprecate and remove StreamSink, thus cleaning up all the mess it created. --- src/workerd/api/worker-rpc.c++ | 4 +- src/workerd/io/worker-interface.capnp | 88 ++++++++++++++++++++++++++- 2 files changed, 87 insertions(+), 5 deletions(-) diff --git a/src/workerd/api/worker-rpc.c++ b/src/workerd/api/worker-rpc.c++ index aa856d2504a..2c8fe55babe 100644 --- a/src/workerd/api/worker-rpc.c++ +++ b/src/workerd/api/worker-rpc.c++ @@ -519,7 +519,7 @@ JsRpcPromiseAndPipeline callImpl(jsg::Lock& js, // after the call completes whether or not it will return any streams. If it's unused, // though, it should only be a couple allocations. auto resultStreamSink = kj::refcounted(); - builder.setResultsStreamSink(kj::addRef(*resultStreamSink)); + builder.getResultsStreamHandler().setStreamSink(kj::addRef(*resultStreamSink)); auto callResult = builder.send(); @@ -1078,7 +1078,7 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { [callContext, ownCallContext = kj::mv(ownCallContext), paramDisposalGroup = kj::mv(invocationResult.paramDisposalGroup), paramsStreamSink = kj::mv(invocationResult.streamSink), - resultStreamSink = params.getResultsStreamSink(), + resultStreamSink = params.getResultsStreamHandler().getStreamSink(), callPipelineFulfiller = kj::mv(callPipelineFulfiller)]( jsg::Lock& js, jsg::Value value) mutable { jsg::JsValue resultValue(value.getHandle(js)); diff --git a/src/workerd/io/worker-interface.capnp b/src/workerd/io/worker-interface.capnp index ab1c8ef4da0..f41e4fb9aaf 100644 --- a/src/workerd/io/worker-interface.capnp +++ b/src/workerd/io/worker-interface.capnp @@ -474,10 +474,19 @@ struct JsValue { # A ReadableStream. The sender of the JsValue will use the associated StreamSink to open a # stream of type `ByteStream`. + stream @10 :ExternalPusher.InputStream; + # If present, a stream pushed using the destination isolate's ExternalPusher. + # + # If null (deprecated), then the sender will use the associated StreamSink to open a stream + # of type `ByteStream`. StreamSink is in the process of being replaced by ExternalPusher. + encoding @4 :StreamEncoding; # Bytes read from the stream have this encoding. expectedLength :union { + # NOTE: This is obsolete when `stream` is set. Instead, the length is passed to + # ExternalPusher.pushByteStream(). + unknown @5 :Void; known @6 :UInt64; } @@ -488,6 +497,12 @@ struct JsValue { # mechanism used to trigger the abort later. This is modeled as a stream, since the sender is # the one that will later on send the abort signal. This external will have an associated # stream in the corresponding `StreamSink` with type `AbortTrigger`. + # + # TODO(soon): This will be obsolete when we stop using `StreamSink`; `abortSignal` will + # replace it. (The name is wrong anyway -- this is the signal end, not the trigger end.) + + abortSignal @11 :ExternalPusher.AbortSignal; + # Indicates that an `AbortSignal` is being passed. subrequestChannelToken @8 :Data; actorClassChannelToken @9 :Data; @@ -507,12 +522,60 @@ struct JsValue { # # Similarly, the caller passes a `resultsStreamSink` to the callee. If the response contains # any streams, it can start pushing to this immediately after responding. + # + # TODO(soon): This design is overcomplicated since it requires allocating StreamSinks for every + # request, even when not used, and requires a lot of weird promise magic. The newer + # ExternalPusher design is simpler, and only incurs overhead when used. Once all of + # production has been updated to understand ExternalPusher, then we can flip an autogate to + # use it by default. Once that has rolled out globally, we can remove StreamSink. startStream @0 (externalIndex :UInt32) -> (stream :Capability); # Opens a stream corresponding to the given index in the JsValue's `externals` array. The type # of capability returned depends on the type of external. E.g. for `readableStream`, it is a # `ByteStream`. } + + interface ExternalPusher { + # This object allows "pushing" external objects to a target isolate, so that they can + # sublequently be referenced by a `JsValue.External`. This allows implementing externals where + # the sender might need to send subsequent information to the receiver *before* the receiver + # has had a chance to call back to request it. For example, when a ReadableStream is sent over + # RPC, the sender will immediately start sending body bytes without waiting for a round trip. + # + # The key to ExternalPusher is that it constructs and returns capabilities pointing at objects + # living directly in the target isolate's runtime. These capabilities have empty interfaces, + # but can be passed back to the target in the `External` table of a `JsValue`. Since the + # capabilities point to objects directly in the recipient's memory space, they can then be + # unwrapped to obtain the underlying local object, which the recipient then uses to back the + # external value delivered to the application. + # + # Note that externals must be pushed BEFORE the JsValue that uses them is sent, so that they + # can be unwrapped immediately when deserializing the value. + + pushByteStream @0 (lengthPlusOne :UInt64 = 0) -> (source :InputStream, sink :ByteStream); + # Creates a readable stream within the remote's memory space. `source` should be placed in a + # sublequent `External` of type `readableStream`. The caller should write bytes to `sink`. + # + # `lengthPlusOne` is the expected length of the stream, plus 1, with zero indicating no + # expectation. This is used e.g. when the `ReadableStream` was created with `FixedLengthStream`. + # (The weird "plus one" encoding is used because Cap'n Proto doesn't have a Maybe. Perhaps we + # can fix this eventually.) + + interface InputStream { + # No methods. This will be unwrapped by the recipient to obtain the underlying local value. + } + + pushAbortSignal @1 () -> (signal :AbortSignal, trigger :AbortTrigger); + + interface AbortSignal { + # No methods. This can be unwrapped by the recipient to obtain a Promise which + # rejects when the signal is aborted. + } + + # TODO(soon): + # - AbortTrigger + # - Promises + } } interface AbortTrigger $Cxx.allowCancellation { @@ -532,7 +595,15 @@ interface AbortTrigger $Cxx.allowCancellation { # be triggered. Otherwise, the cloned signal will treat a dropped cabability as an abort. } -interface JsRpcTarget $Cxx.allowCancellation { +interface JsRpcTarget extends(JsValue.ExternalPusher) $Cxx.allowCancellation { + # Target on which RPC methods may be invoked. + # + # This is the backing capnp type for a JsRpcStub, as well as used to represent top-level RPC + # events. + # + # JsRpcTarget must implement `JsValue.ExternalPusher` to allow externals to be pushed to the + # target in advance of a call that uses them. + struct CallParams { union { methodName @0 :Text; @@ -578,8 +649,19 @@ interface JsRpcTarget $Cxx.allowCancellation { # "bar". } - resultsStreamSink @4 :JsValue.StreamSink; - # StreamSink used for ReadableStreams found in the results. + resultsStreamHandler :union { + # We're in the process of switching from `StreamSink` to `ExternalPusher`. A caller will only + # offer one or the other, and expect the callee to use that. (Initially, callers will still + # send StreamSink for backwards-compatibility, but once all recipients are able to understand + # ExternalPusher, we'll flip an autogate to make callers send it.) + + streamSink @4 :JsValue.StreamSink; + # StreamSink used for ReadableStreams found in the results. + + externalPusher @5 :JsValue.ExternalPusher; + # ExternalPusher object which will push into the caller's isolate. Use this to push externals + # that will be included in the results. + } } struct CallResults { From 50dd74aa3cc3d1e59efebdbc642322d90e2f8f98 Mon Sep 17 00:00:00 2001 From: Kenton Varda Date: Mon, 22 Dec 2025 12:40:56 -0600 Subject: [PATCH 2/7] Update Cap'n Proto to get CapabilityServerSet::tryGetLocalServerSync(). capnp PR: https://github.com/capnproto/capnproto/pull/2475 The comment in this file says not to hand-edit it, but I don't understand how to use update-deps.py to update to a not-yet-merged PR... --- build/deps/gen/deps/dep_capnp_cpp.MODULE.bazel | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/build/deps/gen/deps/dep_capnp_cpp.MODULE.bazel b/build/deps/gen/deps/dep_capnp_cpp.MODULE.bazel index 6d1cb726fbf..2163b753ae8 100644 --- a/build/deps/gen/deps/dep_capnp_cpp.MODULE.bazel +++ b/build/deps/gen/deps/dep_capnp_cpp.MODULE.bazel @@ -3,9 +3,9 @@ http = use_extension("@//:build/exts/http.bzl", "http") http.archive( name = "capnp-cpp", - sha256 = "0884582731fb8b9a6ef7a0d58056535d0480533256958d3eaae189d706bc43aa", - strip_prefix = "capnproto-capnproto-0776402/c++", + sha256 = "cbf7dcef02deb3a3addcfefacb76672c46a48c953024860bf80fceabc255d41d", + strip_prefix = "capnproto-capnproto-c1bce20/c++", type = "tgz", - url = "https://github.com/capnproto/capnproto/tarball/07764022ba75c6924a250d5be0bf2e83250602a0", + url = "https://github.com/capnproto/capnproto/tarball/c1bce2095a8dd76851fe3c1c61550f79b69d671d", ) use_repo(http, "capnp-cpp") From 42186b77c184bdd404c650a56eee15877d220da3 Mon Sep 17 00:00:00 2001 From: Kenton Varda Date: Sun, 21 Dec 2025 15:50:10 -0600 Subject: [PATCH 3/7] Wire up ExternalPusher in workers-rpc.c++. This doesn't yet support any actual pushed types, this is just the infrastructure needed to make it available. This introduces an autogate which opts into using ExternalPusher for calls. The caller decides (based on the autogate) whether to use ExternalPusher for the whole call. We can turn on the autogate once all call receivers in production understand the new protocol. --- src/workerd/api/worker-rpc.c++ | 177 +++++++++++++++++++------ src/workerd/api/worker-rpc.h | 17 ++- src/workerd/io/BUILD.bazel | 2 + src/workerd/io/external-pusher.c++ | 12 ++ src/workerd/io/external-pusher.h | 25 ++++ src/workerd/io/io-context.c++ | 8 ++ src/workerd/io/io-context.h | 5 + src/workerd/util/autogate.c++ | 2 + src/workerd/util/autogate.h | 2 + src/workerd/util/completion-membrane.h | 4 + 10 files changed, 212 insertions(+), 42 deletions(-) create mode 100644 src/workerd/io/external-pusher.c++ create mode 100644 src/workerd/io/external-pusher.h diff --git a/src/workerd/api/worker-rpc.c++ b/src/workerd/api/worker-rpc.c++ index 2c8fe55babe..1b82db92909 100644 --- a/src/workerd/api/worker-rpc.c++ +++ b/src/workerd/api/worker-rpc.c++ @@ -100,13 +100,27 @@ class StreamSinkImpl final: public rpc::JsValue::StreamSink::Server, public kj:: kj::Vector table; }; +kj::Maybe RpcSerializerExternalHandler::getExternalPusher() { + KJ_IF_SOME(ep, externalPusher) { + return ep; + } else KJ_IF_SOME(func, getStreamHandlerFunc.tryGet()) { + // First call, set up ExternalPusher. + return externalPusher.emplace(func()); + } else { + // Using StreamSink. + return kj::none; + } +} + capnp::Capability::Client RpcSerializerExternalHandler::writeStream(BuilderCallback callback) { rpc::JsValue::StreamSink::Client* streamSinkPtr; KJ_IF_SOME(ss, streamSink) { streamSinkPtr = &ss; } else { // First stream written, set up the StreamSink. - streamSinkPtr = &streamSink.emplace(getStreamSinkFunc()); + auto& func = KJ_REQUIRE_NONNULL(getStreamHandlerFunc.tryGet(), + "this serialization is not using StreamSink; use getExternalPusher() instead"); + streamSinkPtr = &streamSink.emplace(func()); } auto result = ({ @@ -233,9 +247,15 @@ DeserializeResult deserializeJsValue( // Does deserializeJsValue() and then adds a `dispose()` method to the returned object (if it is // an object) which disposes all stubs therein. -jsg::JsValue deserializeRpcReturnValue( - jsg::Lock& js, rpc::JsRpcTarget::CallResults::Reader callResults, StreamSinkImpl& streamSink) { - auto [value, disposalGroup, _] = deserializeJsValue(js, callResults.getResult(), streamSink); +jsg::JsValue deserializeRpcReturnValue(jsg::Lock& js, + rpc::JsRpcTarget::CallResults::Reader callResults, + kj::Maybe streamSink) { + auto [value, disposalGroup, ss] = deserializeJsValue(js, callResults.getResult(), streamSink); + + if (streamSink == kj::none) { + KJ_REQUIRE(ss == kj::none, + "RPC returned result using StreamSink even though ExternalPusher was provided"); + } // If the object had a disposer on the callee side, it will run when we discard the callPipeline, // so attach that to the disposal group on the caller side. If the returned object did NOT have @@ -480,6 +500,9 @@ JsRpcPromiseAndPipeline callImpl(jsg::Lock& js, kj::Maybe paramsStreamSinkFulfiller; + bool useExternalPusher = + util::Autogate::isEnabled(util::AutogateKey::RPC_USE_EXTERNAL_PUSHER); + KJ_IF_SOME(args, maybeArgs) { // If we have arguments, serialize them. // Note that we may fail to serialize some element, in which case this will throw back to @@ -496,15 +519,22 @@ JsRpcPromiseAndPipeline callImpl(jsg::Lock& js, ? RpcSerializerExternalHandler::DUPLICATE : RpcSerializerExternalHandler::TRANSFER; - RpcSerializerExternalHandler externalHandler( - stubOwnership, [&]() -> rpc::JsValue::StreamSink::Client { - // A stream was encountered in the params, so we must expect the response to contain - // paramsStreamSink. But we don't have the response yet. So, we need to set up a - // temporary promise client, which we hook to the response a little bit later. - auto paf = kj::newPromiseAndFulfiller(); - paramsStreamSinkFulfiller = kj::mv(paf.fulfiller); - return kj::mv(paf.promise); - }); + RpcSerializerExternalHandler::GetStreamHandlerFunc getStreamHandlerFunc; + if (useExternalPusher) { + getStreamHandlerFunc.init( + [&]() -> rpc::JsValue::ExternalPusher::Client { return client; }); + } else { + getStreamHandlerFunc.init([&]() { + // A stream was encountered in the params, so we must expect the response to contain + // paramsStreamSink. But we don't have the response yet. So, we need to set up a + // temporary promise client, which we hook to the response a little bit later. + auto paf = kj::newPromiseAndFulfiller(); + paramsStreamSinkFulfiller = kj::mv(paf.fulfiller); + return kj::mv(paf.promise); + }); + } + + RpcSerializerExternalHandler externalHandler(stubOwnership, kj::mv(getStreamHandlerFunc)); serializeJsValue(js, jsg::JsValue(arr), externalHandler, [&](capnp::MessageSize hint) { // TODO(perf): Actually use the size hint. return builder.getOperation().initCallWithArgs(); @@ -515,11 +545,20 @@ JsRpcPromiseAndPipeline callImpl(jsg::Lock& js, builder.getOperation().setGetProperty(); } - // Unfortunately, we always have to send a `resultsStreamSink` because we don't know until - // after the call completes whether or not it will return any streams. If it's unused, - // though, it should only be a couple allocations. - auto resultStreamSink = kj::refcounted(); - builder.getResultsStreamHandler().setStreamSink(kj::addRef(*resultStreamSink)); + kj::Maybe> resultStreamSink; + if (useExternalPusher) { + // Unfortunately, we always have to send the ExternalPusher since we don't know whether the + // call will return any streams (or other pushed externals). Luckily, it's a + // one-per-IoContext object, not a big deal. (It'll take a slot on the capnp export table + // though.) + builder.getResultsStreamHandler().setExternalPusher(ioContext.getExternalPusher()); + } else { + // Unfortunately, we always have to send a `resultsStreamSink` because we don't know until + // after the call completes whether or not it will return any streams. If it's unused, + // though, it should only be a couple allocations. + builder.getResultsStreamHandler().setStreamSink( + kj::addRef(*resultStreamSink.emplace(kj::refcounted()))); + } auto callResult = builder.send(); @@ -533,13 +572,24 @@ JsRpcPromiseAndPipeline callImpl(jsg::Lock& js, // here, which is filled in later on to point at the JsRpcPromise, if and when one is created. auto weakRef = kj::atomicRefcounted(); + // HACK: Make sure that any calls to the ExternalPusher get to us before we try to + // deserialize the result. A weird quirk of Cap'n Proto is that return values arrive faster + // than calls by 1 turn of the event loop, so if we just insert a turn here we should be OK. + // + // Note that key to this working is the fact that the continuation returns a Promise, even + // though it is initialized with an immediate value. This forces the extra turn. + auto promise = callResult.then( + [](auto resp) -> kj::Promise> { + return kj::mv(resp); + }); + // RemotePromise lets us consume its pipeline and promise portions independently; we consume // the promise here and we consume the pipeline below, both via kj::mv(). - auto jsPromise = ioContext.awaitIo(js, kj::mv(callResult), + auto jsPromise = ioContext.awaitIo(js, kj::mv(promise), [weakRef = kj::atomicAddRef(*weakRef), resultStreamSink = kj::mv(resultStreamSink)]( jsg::Lock& js, capnp::Response response) mutable -> jsg::Value { - auto jsResult = deserializeRpcReturnValue(js, response, *resultStreamSink); + auto jsResult = deserializeRpcReturnValue(js, response, resultStreamSink); if (weakRef->disposed) { // The promise was explicitly disposed before it even resolved. This means we must dispose @@ -914,7 +964,7 @@ template MakeCallPipeline::Result serializeJsValueWithPipeline(jsg::Lock& js, jsg::JsValue value, Func makeBuilder, - RpcSerializerExternalHandler::GetStreamSinkFunc getStreamSinkFunc); + RpcSerializerExternalHandler::GetStreamHandlerFunc getStreamSinkFunc); // Callee-side implementation of JsRpcTarget. // @@ -933,7 +983,8 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { : enterIsolateAndCall(ctx.makeReentryCallback( [this, &ctx](Worker::Lock& lock, CallContext callContext) { return callImpl(lock, ctx, callContext); - })) {} + })), + externalPusher(ctx.getExternalPusher()) {} // Constructor use by EntrypointJsRpcTarget, which is revoked and destroyed before the IoContext // can possibly be canceled. It can just use ctx.run(). @@ -944,7 +995,8 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { return ctx.run([this, &ctx, callContext](Worker::Lock& lock) mutable { return callImpl(lock, ctx, callContext); }); - }) {} + }), + externalPusher(ctx.getExternalPusher()) {} struct EnvCtx { v8::Local env; @@ -968,8 +1020,10 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { // Handles the delivery of JS RPC method calls. kj::Promise call(CallContext callContext) override { + co_await kj::yield(); + // Try to execute the requested method. - return enterIsolateAndCall(callContext).catch_([](kj::Exception&& e) { + co_return co_await enterIsolateAndCall(callContext).catch_([](kj::Exception&& e) { if (jsg::isTunneledException(e.getDescription())) { // Annotate exceptions in RPC worker calls as remote exceptions. auto description = jsg::stripRemoteExceptionPrefix(e.getDescription()); @@ -983,6 +1037,18 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { }); } + // Implements ExternalPusher by forwarding to the shared implementation. + // + // Note JsRpcTarget has to implement `ExternalPusher` directly rather than providing a method + // like `getExternalPusher()` because it's important that the pushes arrive before the call, and + // the ordering can only be guaranteed if they're on the same object. + kj::Promise pushByteStream(PushByteStreamContext context) override { + return externalPusher->pushByteStream(context); + } + kj::Promise pushAbortSignal(PushAbortSignalContext context) override { + return externalPusher->pushAbortSignal(context); + } + KJ_DISALLOW_COPY_AND_MOVE(JsRpcTargetBase); private: @@ -992,6 +1058,8 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { // using IoContext::makeReentryCallback(). kj::Function(CallContext callContext)> enterIsolateAndCall; + kj::Rc externalPusher; + // Returns true if the given name cannot be used as a method on this type. virtual bool isReservedName(kj::StringPtr name) = 0; @@ -1036,6 +1104,19 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { // Given a handle for the result, if it's a promise, await the promise, then serialize the // final result for return. + RpcSerializerExternalHandler::GetStreamHandlerFunc getResultsStreamHandlerFunc; + auto resultStreamHandler = params.getResultsStreamHandler(); + switch (resultStreamHandler.which()) { + case rpc::JsRpcTarget::CallParams::ResultsStreamHandler::EXTERNAL_PUSHER: + getResultsStreamHandlerFunc.init( + [cap = resultStreamHandler.getExternalPusher()]() mutable { return kj::mv(cap); }); + break; + case rpc::JsRpcTarget::CallParams::ResultsStreamHandler::STREAM_SINK: + getResultsStreamHandlerFunc.init( + [cap = resultStreamHandler.getStreamSink()]() mutable { return kj::mv(cap); }); + break; + } + kj::Maybe>> callPipelineFulfiller; // We need another ref to this fulfiller for the error callback. It can rely on being @@ -1047,6 +1128,12 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { // immediately. Annoyingly, that also means we need to hook up a pipeline for // callPipeline, which we don't actually have yet, so we need to promise-ify it. + // If the caller requested using ExternalPusher for the results, then it should also use + // ExternalPusher for the params. (Theoretically we could support mix-and-match but... + // let's keep it simple.) + KJ_REQUIRE(resultStreamHandler.isStreamSink(), + "RPC params used StreamSink when result is supposed to use ExternalPusher"); + auto paf = kj::newPromiseAndFulfiller(); callPipelineFulfillerRef = *paf.fulfiller; callPipelineFulfiller = kj::mv(paf.fulfiller); @@ -1078,7 +1165,7 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { [callContext, ownCallContext = kj::mv(ownCallContext), paramDisposalGroup = kj::mv(invocationResult.paramDisposalGroup), paramsStreamSink = kj::mv(invocationResult.streamSink), - resultStreamSink = params.getResultsStreamHandler().getStreamSink(), + getResultsStreamHandlerFunc = kj::mv(getResultsStreamHandlerFunc), callPipelineFulfiller = kj::mv(callPipelineFulfiller)]( jsg::Lock& js, jsg::Value value) mutable { jsg::JsValue resultValue(value.getHandle(js)); @@ -1090,10 +1177,7 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { hint.capCount += 1; // for callPipeline results = callContext.initResults(hint); return results.initResult(); - }, [&]() -> rpc::JsValue::StreamSink::Client { - // The results contain streams. We return the resultsStreamSink passed in the request. - return kj::mv(resultStreamSink); - }); + }, kj::mv(getResultsStreamHandlerFunc)); KJ_SWITCH_ONEOF(maybePipeline) { KJ_CASE_ONEOF(obj, MakeCallPipeline::Object) { @@ -1575,7 +1659,7 @@ template MakeCallPipeline::Result serializeJsValueWithPipeline(jsg::Lock& js, jsg::JsValue value, Func makeBuilder, - RpcSerializerExternalHandler::GetStreamSinkFunc getStreamSinkFunc) { + RpcSerializerExternalHandler::GetStreamHandlerFunc getStreamHandlerFunc) { auto maybeDispose = js.withinHandleScope([&]() -> kj::Maybe> { jsg::JsObject obj = KJ_UNWRAP_OR(value.tryCast(), { return kj::none; }); @@ -1599,7 +1683,7 @@ MakeCallPipeline::Result serializeJsValueWithPipeline(jsg::Lock& js, // Now that we've extracted our dispose function, we can serialize our value. RpcSerializerExternalHandler externalHandler( - RpcSerializerExternalHandler::TRANSFER, kj::mv(getStreamSinkFunc)); + RpcSerializerExternalHandler::TRANSFER, kj::mv(getStreamHandlerFunc)); serializeJsValue(js, value, externalHandler, kj::mv(makeBuilder)); auto stubDisposers = externalHandler.releaseStubDisposers(); @@ -2021,20 +2105,35 @@ class JsRpcSessionCustomEvent::ServerTopLevelMembrane final: public capnp::Membr public kj::Refcounted { public: explicit ServerTopLevelMembrane(kj::Own> doneFulfiller) - : doneFulfiller(kj::mv(doneFulfiller)) {} + : completionMembrane(kj::refcounted(kj::mv(doneFulfiller))) {} + ~ServerTopLevelMembrane() noexcept(false) { - KJ_IF_SOME(f, doneFulfiller) { - f->reject( + KJ_IF_SOME(cm, completionMembrane) { + cm->reject( KJ_EXCEPTION(DISCONNECTED, "JS RPC session canceled without calling an RPC method.")); } } kj::Maybe inboundCall( uint64_t interfaceId, uint16_t methodId, capnp::Capability::Client target) override { - auto f = kj::mv(JSG_REQUIRE_NONNULL( - doneFulfiller, Error, "Only one RPC method call is allowed on this object.")); - doneFulfiller = kj::none; - return capnp::membrane(kj::mv(target), kj::refcounted(kj::mv(f))); + if (interfaceId == capnp::typeId()) { + // JsRpcTarget::call() + auto cm = kj::mv(JSG_REQUIRE_NONNULL( + completionMembrane, Error, "Only one RPC method call is allowed on this object.")); + completionMembrane = kj::none; + return capnp::membrane(kj::mv(target), kj::mv(cm)); + } else if (interfaceId == capnp::typeId()) { + // ExternalPusher methods + // + // It's important that we use the same membrane that we'll use for call(), so that + // capabilities returned by the ExternalPusher will be wrapped in the membrane, hence they + // will be unwrapped when passed back through the membrane again to call(). + auto& cm = *JSG_REQUIRE_NONNULL( + completionMembrane, Error, "getExternalPusher() must be called before call()"); + return capnp::membrane(kj::mv(target), kj::addRef(cm)); + } else { + KJ_FAIL_ASSERT("unkown interface ID for JsRpcTarget"); + } } kj::Maybe outboundCall( @@ -2047,7 +2146,7 @@ class JsRpcSessionCustomEvent::ServerTopLevelMembrane final: public capnp::Membr } private: - kj::Maybe>> doneFulfiller; + kj::Maybe> completionMembrane; }; kj::Promise JsRpcSessionCustomEvent::run( diff --git a/src/workerd/api/worker-rpc.h b/src/workerd/api/worker-rpc.h index 8e5ab8398ae..a1d42818b2e 100644 --- a/src/workerd/api/worker-rpc.h +++ b/src/workerd/api/worker-rpc.h @@ -37,14 +37,17 @@ constexpr size_t MAX_JS_RPC_MESSAGE_SIZE = 1u << 25; class RpcSerializerExternalHandler final: public jsg::Serializer::ExternalHandler { public: using GetStreamSinkFunc = kj::Function; + using GetExternalPusherFunc = kj::Function; + using GetStreamHandlerFunc = kj::OneOf; enum StubOwnership { TRANSFER, DUPLICATE }; // `getStreamSinkFunc` will be called at most once, the first time a stream is encountered in // serialization, to get the StreamSink that should be used. - RpcSerializerExternalHandler(StubOwnership stubOwnership, GetStreamSinkFunc getStreamSinkFunc) + RpcSerializerExternalHandler( + StubOwnership stubOwnership, GetStreamHandlerFunc getStreamHandlerFunc) : stubOwnership(stubOwnership), - getStreamSinkFunc(kj::mv(getStreamSinkFunc)) {} + getStreamHandlerFunc(kj::mv(getStreamHandlerFunc)) {} inline StubOwnership getStubOwnership() { return stubOwnership; @@ -52,6 +55,10 @@ class RpcSerializerExternalHandler final: public jsg::Serializer::ExternalHandle using BuilderCallback = kj::Function; + // Returns the ExternalPusher for the remote side. Returns kj::none if this serialization is + // using the older StreamSink approach, in which case you need to call `writeStream()` instead. + kj::Maybe getExternalPusher(); + // Add an external. The value is a callback which will be invoked later to fill in the // JsValue::External in the Cap'n Proto structure. The external array cannot be allocated until // the number of externals are known, which is only after all calls to `add()` have completed, @@ -62,6 +69,9 @@ class RpcSerializerExternalHandler final: public jsg::Serializer::ExternalHandle // Like write(), but use this when there is also a stream associated with the external, i.e. // using StreamSink. This returns a capability which will eventually resolve to the stream. + // + // StreamSink is being replaced by ExternalPusher. You should only call writeStream() if + // getExternalPusher() returns kj::none. If ExternalPusher is available, this method will throw. capnp::Capability::Client writeStream(BuilderCallback callback); // Build the final list. @@ -98,12 +108,13 @@ class RpcSerializerExternalHandler final: public jsg::Serializer::ExternalHandle private: StubOwnership stubOwnership; - GetStreamSinkFunc getStreamSinkFunc; + GetStreamHandlerFunc getStreamHandlerFunc; kj::Vector externals; kj::Vector> stubDisposers; kj::Maybe streamSink; + kj::Maybe externalPusher; }; class RpcStubDisposalGroup; diff --git a/src/workerd/io/BUILD.bazel b/src/workerd/io/BUILD.bazel index 02e46772056..ea453e85133 100644 --- a/src/workerd/io/BUILD.bazel +++ b/src/workerd/io/BUILD.bazel @@ -34,6 +34,7 @@ wd_cc_library( # TODO(cleanup): Fix this. srcs = [ "compatibility-date.c++", + "external-pusher.c++", "features.c++", "hibernation-manager.c++", "io-channels.c++", @@ -47,6 +48,7 @@ wd_cc_library( ] + ["//src/workerd/api:srcs"], hdrs = [ "compatibility-date.h", + "external-pusher.h", "hibernation-manager.h", "io-channels.h", "io-context.h", diff --git a/src/workerd/io/external-pusher.c++ b/src/workerd/io/external-pusher.c++ new file mode 100644 index 00000000000..81d89cb191b --- /dev/null +++ b/src/workerd/io/external-pusher.c++ @@ -0,0 +1,12 @@ +// Copyright (c) 2025 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#include +#include + +namespace workerd { + +// TODO(now): implement + +} // namespace workerd diff --git a/src/workerd/io/external-pusher.h b/src/workerd/io/external-pusher.h new file mode 100644 index 00000000000..c20a257a755 --- /dev/null +++ b/src/workerd/io/external-pusher.h @@ -0,0 +1,25 @@ +// Copyright (c) 2025 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#pragma once + +#include + +#include +#include + +namespace workerd { + +// Implements JsValue.ExternalPusher from worker-interface.capnp. +// +// ExternalPusher allows a remote peer to "push" certain kinds of objects into our address space +// so that they can then be embedded in `JsValue` as `External` values. +class ExternalPusherImpl: public rpc::JsValue::ExternalPusher::Server, public kj::Refcounted { + public: + ExternalPusherImpl(capnp::ByteStreamFactory& byteStreamFactory) {} + + // TODO(now): Implement methods. +}; + +} // namespace workerd diff --git a/src/workerd/io/io-context.c++ b/src/workerd/io/io-context.c++ index 88ea365d210..293b58f4c42 100644 --- a/src/workerd/io/io-context.c++ +++ b/src/workerd/io/io-context.c++ @@ -901,6 +901,14 @@ kj::Date IoContext::now() { return now(getCurrentIncomingRequest()); } +kj::Rc IoContext::getExternalPusher() { + KJ_IF_SOME(ep, externalPusher) { + return ep.addRef(); + } else { + return externalPusher.emplace(kj::rc(getByteStreamFactory())).addRef(); + } +} + kj::Own IoContext::getSubrequestNoChecks( kj::FunctionParam(TraceContext&, IoChannelFactory&)> func, SubrequestOptions options) { diff --git a/src/workerd/io/io-context.h b/src/workerd/io/io-context.h index 1910b5310f0..622996829fb 100644 --- a/src/workerd/io/io-context.h +++ b/src/workerd/io/io-context.h @@ -9,6 +9,7 @@ #include #include +#include #include #include #include @@ -772,6 +773,8 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler return thread.getHeaderIds(); } + kj::Rc getExternalPusher(); + // Subrequest channel numbers for the two special channels. // NULL = The channel used by global fetch() when the Request has no fetcher attached. // NEXT = DEPRECATED: The fetcher attached to Requests delivered by a FetchEvent, so that we can @@ -1012,6 +1015,8 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler // objects which reference `deleteQueue` in their destructors. OwnedObjectList ownedObjects; + kj::Maybe> externalPusher; + // Implementation detail of makeCachePutStream(). // TODO: Used for Cache PUT serialization. diff --git a/src/workerd/util/autogate.c++ b/src/workerd/util/autogate.c++ index aac95cbe387..db39eaeb157 100644 --- a/src/workerd/util/autogate.c++ +++ b/src/workerd/util/autogate.c++ @@ -31,6 +31,8 @@ kj::StringPtr KJ_STRINGIFY(AutogateKey key) { return "fetch-request-memory-adjustment"_kj; case AutogateKey::RUST_BACKED_NODE_DNS: return "rust-backed-node-dns"_kj; + case AutogateKey::RPC_USE_EXTERNAL_PUSHER: + return "rpc-use-external-pusher"_kj; case AutogateKey::NumOfKeys: KJ_FAIL_ASSERT("NumOfKeys should not be used in getName"); } diff --git a/src/workerd/util/autogate.h b/src/workerd/util/autogate.h index 53fc4c08760..8593db2aafb 100644 --- a/src/workerd/util/autogate.h +++ b/src/workerd/util/autogate.h @@ -26,6 +26,8 @@ enum class AutogateKey { FETCH_REQUEST_MEMORY_ADJUSTMENT, // Enable Rust-backed Node.js DNS implementation RUST_BACKED_NODE_DNS, + // Use ExternalPusher instead of StreamSink to handle streams in RPC. + RPC_USE_EXTERNAL_PUSHER, NumOfKeys // Reserved for iteration. }; diff --git a/src/workerd/util/completion-membrane.h b/src/workerd/util/completion-membrane.h index fe63afcf1e8..50276935c57 100644 --- a/src/workerd/util/completion-membrane.h +++ b/src/workerd/util/completion-membrane.h @@ -30,6 +30,10 @@ class CompletionMembrane final: public capnp::MembranePolicy, public kj::Refcoun return kj::addRef(*this); } + void reject(kj::Exception&& e) { + doneFulfiller->reject(kj::mv(e)); + } + private: kj::Own> doneFulfiller; }; From ffb11e0c946bcbb61fb46d3b0a414f74e218720b Mon Sep 17 00:00:00 2001 From: Kenton Varda Date: Sun, 21 Dec 2025 16:02:47 -0600 Subject: [PATCH 4/7] Implement ReadableStream serialization via ExternalPusher. --- src/workerd/api/streams/readable.c++ | 67 ++++++++++---- src/workerd/io/external-pusher.c++ | 132 ++++++++++++++++++++++++++- src/workerd/io/external-pusher.h | 18 +++- 3 files changed, 194 insertions(+), 23 deletions(-) diff --git a/src/workerd/api/streams/readable.c++ b/src/workerd/api/streams/readable.c++ index 86237b85cfd..65cc0489061 100644 --- a/src/workerd/api/streams/readable.c++ +++ b/src/workerd/api/streams/readable.c++ @@ -519,6 +519,10 @@ jsg::Optional ByteLengthQueuingStrategy::size( namespace { +// TODO(cleanup): These classes have been copied to external-pusher.c++. The copies here can be +// deleted as soon as we've switched from StreamSink to ExternalPusher and can delete all the +// StreamSink-related code. For now I'm not trying to avoid duplication. + // HACK: We need as async pipe, like kj::newOneWayPipe(), except supporting explicit end(). So we // wrap the two ends of the pipe in special adapters that track whether end() was called. class ExplicitEndOutputPipeAdapter final: public capnp::ExplicitEndOutputStream { @@ -676,18 +680,37 @@ void ReadableStream::serialize(jsg::Lock& js, jsg::Serializer& serializer) { StreamEncoding encoding = controller.getPreferredEncoding(); auto expectedLength = controller.tryGetLength(encoding); - auto streamCap = externalHandler->writeStream( - [encoding, expectedLength](rpc::JsValue::External::Builder builder) mutable { - auto rs = builder.initReadableStream(); - rs.setEncoding(encoding); - KJ_IF_SOME(l, expectedLength) { - rs.getExpectedLength().setKnown(l); + capnp::ByteStream::Client streamCap = [&]() { + KJ_IF_SOME(pusher, externalHandler->getExternalPusher()) { + auto req = pusher.pushByteStreamRequest(capnp::MessageSize{2, 0}); + KJ_IF_SOME(el, expectedLength) { + req.setLengthPlusOne(el + 1); + } + auto pipeline = req.sendForPipeline(); + + externalHandler->write([encoding, expectedLength, source = pipeline.getSource()]( + rpc::JsValue::External::Builder builder) mutable { + auto rs = builder.initReadableStream(); + rs.setStream(kj::mv(source)); + rs.setEncoding(encoding); + }); + + return pipeline.getSink(); + } else { + return externalHandler + ->writeStream( + [encoding, expectedLength](rpc::JsValue::External::Builder builder) mutable { + auto rs = builder.initReadableStream(); + rs.setEncoding(encoding); + KJ_IF_SOME(l, expectedLength) { + rs.getExpectedLength().setKnown(l); + } + }).castAs(); } - }); + }(); kj::Own kjStream = - ioctx.getByteStreamFactory().capnpToKjExplicitEnd( - kj::mv(streamCap).castAs()); + ioctx.getByteStreamFactory().capnpToKjExplicitEnd(kj::mv(streamCap)); auto sink = newSystemStream(kj::mv(kjStream), encoding, ioctx); @@ -718,21 +741,25 @@ jsg::Ref ReadableStream::deserialize( auto& ioctx = IoContext::current(); - kj::Maybe expectedLength; - auto el = rs.getExpectedLength(); - if (el.isKnown()) { - expectedLength = el.getKnown(); - } + kj::Own in; + if (rs.hasStream()) { + in = ioctx.getExternalPusher()->unwrapStream(rs.getStream()); + } else { + kj::Maybe expectedLength; + auto el = rs.getExpectedLength(); + if (el.isKnown()) { + expectedLength = el.getKnown(); + } - auto pipe = kj::newOneWayPipe(expectedLength); + auto pipe = kj::newOneWayPipe(expectedLength); - auto endedFlag = kj::refcounted>(false); + auto endedFlag = kj::refcounted>(false); - auto out = kj::heap(kj::mv(pipe.out), kj::addRef(*endedFlag)); - auto in = - kj::heap(kj::mv(pipe.in), kj::mv(endedFlag), expectedLength); + auto out = kj::heap(kj::mv(pipe.out), kj::addRef(*endedFlag)); + in = kj::heap(kj::mv(pipe.in), kj::mv(endedFlag), expectedLength); - externalHandler->setLastStream(ioctx.getByteStreamFactory().kjToCapnp(kj::mv(out))); + externalHandler->setLastStream(ioctx.getByteStreamFactory().kjToCapnp(kj::mv(out))); + } return js.alloc(ioctx, kj::heap(newSystemStream(kj::mv(in), encoding, ioctx), ioctx)); diff --git a/src/workerd/io/external-pusher.c++ b/src/workerd/io/external-pusher.c++ index 81d89cb191b..c53ec2eefb9 100644 --- a/src/workerd/io/external-pusher.c++ +++ b/src/workerd/io/external-pusher.c++ @@ -7,6 +7,136 @@ namespace workerd { -// TODO(now): implement +// ======================================================================================= +// ReadableStream handling + +namespace { + +// TODO(cleanup): These classes have been copied from streams/readable.c++. The copies there can be +// deleted as soon as we've switched from StreamSink to ExternalPusher and can delete all the +// StreamSink-related code. For now I'm not trying to avoid duplication. + +// HACK: We need as async pipe, like kj::newOneWayPipe(), except supporting explicit end(). So we +// wrap the two ends of the pipe in special adapters that track whether end() was called. +class ExplicitEndOutputPipeAdapter final: public capnp::ExplicitEndOutputStream { + public: + ExplicitEndOutputPipeAdapter( + kj::Own inner, kj::Own> ended) + : inner(kj::mv(inner)), + ended(kj::mv(ended)) {} + + kj::Promise write(kj::ArrayPtr buffer) override { + return KJ_REQUIRE_NONNULL(inner)->write(buffer); + } + kj::Promise write(kj::ArrayPtr> pieces) override { + return KJ_REQUIRE_NONNULL(inner)->write(pieces); + } + + kj::Maybe> tryPumpFrom( + kj::AsyncInputStream& input, uint64_t amount) override { + return KJ_REQUIRE_NONNULL(inner)->tryPumpFrom(input, amount); + } + + kj::Promise whenWriteDisconnected() override { + return KJ_REQUIRE_NONNULL(inner)->whenWriteDisconnected(); + } + + kj::Promise end() override { + // Signal to the other side that end() was actually called. + ended->getWrapped() = true; + inner = kj::none; + return kj::READY_NOW; + } + + private: + kj::Maybe> inner; + kj::Own> ended; +}; + +class ExplicitEndInputPipeAdapter final: public kj::AsyncInputStream { + public: + ExplicitEndInputPipeAdapter(kj::Own inner, + kj::Own> ended, + kj::Maybe expectedLength) + : inner(kj::mv(inner)), + ended(kj::mv(ended)), + expectedLength(expectedLength) {} + + kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { + size_t result = co_await inner->tryRead(buffer, minBytes, maxBytes); + + KJ_IF_SOME(l, expectedLength) { + KJ_ASSERT(result <= l); + l -= result; + if (l == 0) { + // If we got all the bytes we expected, we treat this as a successful end, because the + // underlying KJ pipe is not actually going to wait for the other side to drop. This is + // consistent with the behavior of Content-Length in HTTP anyway. + ended->getWrapped() = true; + } + } + + if (result < minBytes) { + // Verify that end() was called. + if (!ended->getWrapped()) { + JSG_FAIL_REQUIRE(Error, "ReadableStream received over RPC disconnected prematurely."); + } + } + co_return result; + } + + kj::Maybe tryGetLength() override { + return inner->tryGetLength(); + } + + kj::Promise pumpTo(kj::AsyncOutputStream& output, uint64_t amount) override { + return inner->pumpTo(output, amount); + } + + private: + kj::Own inner; + kj::Own> ended; + kj::Maybe expectedLength; +}; + +} // namespace + +class ExternalPusherImpl::InputStreamImpl final: public ExternalPusher::InputStream::Server { + public: + InputStreamImpl(kj::Own stream): stream(kj::mv(stream)) {} + + kj::Maybe> stream; +}; + +kj::Promise ExternalPusherImpl::pushByteStream(PushByteStreamContext context) { + kj::Maybe expectedLength; + auto lp1 = context.getParams().getLengthPlusOne(); + if (lp1 > 0) { + expectedLength = lp1 - 1; + } + + auto pipe = kj::newOneWayPipe(expectedLength); + + auto endedFlag = kj::refcounted>(false); + + auto out = kj::heap(kj::mv(pipe.out), kj::addRef(*endedFlag)); + auto in = + kj::heap(kj::mv(pipe.in), kj::mv(endedFlag), expectedLength); + + auto results = context.initResults(capnp::MessageSize{4, 2}); + + results.setSource(inputStreamSet.add(kj::heap(kj::mv(in)))); + results.setSink(byteStreamFactory.kjToCapnp(kj::mv(out))); + return kj::READY_NOW; +} + +kj::Own ExternalPusherImpl::unwrapStream( + ExternalPusher::InputStream::Client cap) { + auto& unwrapped = KJ_REQUIRE_NONNULL( + inputStreamSet.tryGetLocalServerSync(cap), "pushed external is not a byte stream"); + + return KJ_REQUIRE_NONNULL(kj::mv(kj::downcast(unwrapped).stream), + "pushed byte stream has already been consumed"); +} } // namespace workerd diff --git a/src/workerd/io/external-pusher.h b/src/workerd/io/external-pusher.h index c20a257a755..ff415296821 100644 --- a/src/workerd/io/external-pusher.h +++ b/src/workerd/io/external-pusher.h @@ -11,15 +11,29 @@ namespace workerd { +using kj::byte; + // Implements JsValue.ExternalPusher from worker-interface.capnp. // // ExternalPusher allows a remote peer to "push" certain kinds of objects into our address space // so that they can then be embedded in `JsValue` as `External` values. class ExternalPusherImpl: public rpc::JsValue::ExternalPusher::Server, public kj::Refcounted { public: - ExternalPusherImpl(capnp::ByteStreamFactory& byteStreamFactory) {} + ExternalPusherImpl(capnp::ByteStreamFactory& byteStreamFactory) + : byteStreamFactory(byteStreamFactory) {} + + using ExternalPusher = rpc::JsValue::ExternalPusher; + + kj::Own unwrapStream(ExternalPusher::InputStream::Client cap); + + kj::Promise pushByteStream(PushByteStreamContext context) override; + + private: + capnp::ByteStreamFactory& byteStreamFactory; + + capnp::CapabilityServerSet inputStreamSet; - // TODO(now): Implement methods. + class InputStreamImpl; }; } // namespace workerd From 6b76260d3a607017fb67666eb8a667ab9422ea35 Mon Sep 17 00:00:00 2001 From: Kenton Varda Date: Sun, 21 Dec 2025 19:12:02 -0600 Subject: [PATCH 5/7] Implement AbortSignal serialization via ExternalPusher. --- src/workerd/api/basics.c++ | 56 +++++++++++++++----- src/workerd/api/basics.h | 5 +- src/workerd/io/external-pusher.c++ | 85 ++++++++++++++++++++++++++++++ src/workerd/io/external-pusher.h | 18 +++++++ 4 files changed, 147 insertions(+), 17 deletions(-) diff --git a/src/workerd/api/basics.c++ b/src/workerd/api/basics.c++ index 73bf6bca19f..ae2c57feb06 100644 --- a/src/workerd/api/basics.c++ +++ b/src/workerd/api/basics.c++ @@ -574,6 +574,10 @@ class AbortTriggerRpcClient final { namespace { // The jsrpc handler that receives aborts from the remote and triggers them locally +// +// TODO(cleanup): This class has been copied to external-pusher.c++. The copy here can be +// deleted as soon as we've switched from StreamSink to ExternalPusher and can delete all the +// StreamSink-related code. For now I'm not trying to avoid duplication. class AbortTriggerRpcServer final: public rpc::AbortTrigger::Server { public: AbortTriggerRpcServer(kj::Own> fulfiller, @@ -858,15 +862,28 @@ void AbortSignal::serialize(jsg::Lock& js, jsg::Serializer& serializer) { return; } - auto streamCap = externalHandler - ->writeStream([&](rpc::JsValue::External::Builder builder) mutable { - builder.setAbortTrigger(); - }).castAs(); + auto triggerCap = [&]() -> rpc::AbortTrigger::Client { + KJ_IF_SOME(pusher, externalHandler->getExternalPusher()) { + auto pipeline = pusher.pushAbortSignalRequest(capnp::MessageSize{2, 0}).sendForPipeline(); + + externalHandler->write( + [signal = pipeline.getSignal()](rpc::JsValue::External::Builder builder) mutable { + builder.setAbortSignal(kj::mv(signal)); + }); + + return pipeline.getTrigger(); + } else { + return externalHandler + ->writeStream([&](rpc::JsValue::External::Builder builder) mutable { + builder.setAbortTrigger(); + }).castAs(); + } + }(); auto& ioContext = IoContext::current(); // Keep track of every AbortSignal cloned from this one. // If this->triggerAbort(...) is called, each rpcClient will be informed. - rpcClients.add(ioContext.addObject(kj::heap(kj::mv(streamCap)))); + rpcClients.add(ioContext.addObject(kj::heap(kj::mv(triggerCap)))); } jsg::Ref AbortSignal::deserialize( @@ -890,20 +907,31 @@ jsg::Ref AbortSignal::deserialize( return js.alloc(/* exception */ kj::none, /* maybeReason */ kj::none, flag); } - auto reader = externalHandler->read(); - KJ_REQUIRE(reader.isAbortTrigger(), "external table slot type does't match serialization tag"); - // The AbortSignalImpl will receive any remote triggerAbort requests and fulfill the promise with the reason for abort auto signal = js.alloc(/* exception */ kj::none, /* maybeReason */ kj::none, flag); - auto paf = kj::newPromiseAndFulfiller(); - auto pendingReason = IoContext::current().addObject(kj::refcounted()); + auto& ioctx = IoContext::current(); + + auto reader = externalHandler->read(); + if (reader.isAbortTrigger()) { + // Old-style StreamSink. + // TODO(cleanup): Remove this once the ExternalPusher autogate has rolled out. + auto paf = kj::newPromiseAndFulfiller(); + auto pendingReason = ioctx.addObject(kj::refcounted()); + + externalHandler->setLastStream( + kj::heap(kj::mv(paf.fulfiller), kj::addRef(*pendingReason))); + signal->rpcAbortPromise = ioctx.addObject(kj::heap(kj::mv(paf.promise))); + signal->pendingReason = kj::mv(pendingReason); + } else { + KJ_REQUIRE(reader.isAbortSignal(), "external table slot type does't match serialization tag"); + + auto resolvedSignal = ioctx.getExternalPusher()->unwrapAbortSignal(reader.getAbortSignal()); - externalHandler->setLastStream( - kj::heap(kj::mv(paf.fulfiller), kj::addRef(*pendingReason))); - signal->rpcAbortPromise = IoContext::current().addObject(kj::heap(kj::mv(paf.promise))); - signal->pendingReason = kj::mv(pendingReason); + signal->rpcAbortPromise = ioctx.addObject(kj::heap(kj::mv(resolvedSignal.signal))); + signal->pendingReason = ioctx.addObject(kj::mv(resolvedSignal.reason)); + } return signal; } diff --git a/src/workerd/api/basics.h b/src/workerd/api/basics.h index 5aac73e3ea7..8e75294a051 100644 --- a/src/workerd/api/basics.h +++ b/src/workerd/api/basics.h @@ -8,6 +8,7 @@ // TODO(cleanup): Rename to events.h? #include +#include #include #include #include @@ -571,9 +572,7 @@ class AbortSignal final: public EventTarget { jsg::Optional> maybeReason = kj::none, Flag flag = Flag::NONE); - using PendingReason = kj::RefcountedWrapper< - kj::OneOf /* v8Serialized */, kj::Exception /* if capability is dropped */ - >>; + using PendingReason = ExternalPusherImpl::PendingAbortReason; // The AbortSignal explicitly does not expose a constructor(). It is // illegal for user code to create an AbortSignal directly. diff --git a/src/workerd/io/external-pusher.c++ b/src/workerd/io/external-pusher.c++ index c53ec2eefb9..382577d35a9 100644 --- a/src/workerd/io/external-pusher.c++ +++ b/src/workerd/io/external-pusher.c++ @@ -139,4 +139,89 @@ kj::Own ExternalPusherImpl::unwrapStream( "pushed byte stream has already been consumed"); } +// ======================================================================================= +// AbortSignal handling + +namespace { + +// The jsrpc handler that receives aborts from the remote and triggers them locally +// +// TODO(cleanup): This class has been copied from external-pusher.c++. The copy there can be +// deleted as soon as we've switched from StreamSink to ExternalPusher and can delete all the +// StreamSink-related code. For now I'm not trying to avoid duplication. +class AbortTriggerRpcServer final: public rpc::AbortTrigger::Server { + public: + AbortTriggerRpcServer(kj::Own> fulfiller, + kj::Own&& pendingReason) + : fulfiller(kj::mv(fulfiller)), + pendingReason(kj::mv(pendingReason)) {} + + kj::Promise abort(AbortContext abortCtx) override { + auto params = abortCtx.getParams(); + auto reason = params.getReason().getV8Serialized(); + + pendingReason->getWrapped() = kj::heapArray(reason.asBytes()); + fulfiller->fulfill(); + return kj::READY_NOW; + } + + kj::Promise release(ReleaseContext releaseCtx) override { + released = true; + return kj::READY_NOW; + } + + ~AbortTriggerRpcServer() noexcept(false) { + if (pendingReason->getWrapped() != nullptr) { + // Already triggered + return; + } + + if (!released) { + pendingReason->getWrapped() = JSG_KJ_EXCEPTION(FAILED, DOMAbortError, + "An AbortSignal received over RPC was implicitly aborted because the connection back to " + "its trigger was lost."); + } + + // Always fulfill the promise in case the AbortSignal was waiting + fulfiller->fulfill(); + } + + private: + kj::Own> fulfiller; + kj::Own pendingReason; + bool released = false; +}; + +} // namespace + +class ExternalPusherImpl::AbortSignalImpl final: public ExternalPusher::AbortSignal::Server { + public: + AbortSignalImpl(AbortSignal content): content(kj::mv(content)) {} + + kj::Maybe content; +}; + +kj::Promise ExternalPusherImpl::pushAbortSignal(PushAbortSignalContext context) { + auto paf = kj::newPromiseAndFulfiller(); + auto pendingReason = kj::refcounted(); + + auto results = context.initResults(capnp::MessageSize{4, 2}); + + results.setTrigger( + kj::heap(kj::mv(paf.fulfiller), kj::addRef(*pendingReason))); + results.setSignal(abortSignalSet.add( + kj::heap(AbortSignal{kj::mv(paf.promise), kj::mv(pendingReason)}))); + + return kj::READY_NOW; +} + +ExternalPusherImpl::AbortSignal ExternalPusherImpl::unwrapAbortSignal( + ExternalPusher::AbortSignal::Client cap) { + auto& unwrapped = KJ_REQUIRE_NONNULL( + abortSignalSet.tryGetLocalServerSync(cap), "pushed external is not an AbortSignal"); + + return KJ_REQUIRE_NONNULL(kj::mv(kj::downcast(unwrapped).content), + "pushed AbortSignal has already been consumed"); +} + } // namespace workerd diff --git a/src/workerd/io/external-pusher.h b/src/workerd/io/external-pusher.h index ff415296821..3e12a85940b 100644 --- a/src/workerd/io/external-pusher.h +++ b/src/workerd/io/external-pusher.h @@ -26,14 +26,32 @@ class ExternalPusherImpl: public rpc::JsValue::ExternalPusher::Server, public kj kj::Own unwrapStream(ExternalPusher::InputStream::Client cap); + // Box which holds the reason why an AbortSignal was aborted. May be either: + // - A serialized V8 value if the signal was aborted from JavaScript. + // - A KJ exception if the connection from the trigger was lost. + using PendingAbortReason = kj::RefcountedWrapper, kj::Exception>>; + + struct AbortSignal { + // Resolves when `reason` has been filled in. + kj::Promise signal; + + // The abort reason box, will be uninitialized until `signal` resolves. + kj::Own reason; + }; + + AbortSignal unwrapAbortSignal(ExternalPusher::AbortSignal::Client cap); + kj::Promise pushByteStream(PushByteStreamContext context) override; + kj::Promise pushAbortSignal(PushAbortSignalContext context) override; private: capnp::ByteStreamFactory& byteStreamFactory; capnp::CapabilityServerSet inputStreamSet; + capnp::CapabilityServerSet abortSignalSet; class InputStreamImpl; + class AbortSignalImpl; }; } // namespace workerd From e7fe4d007ede8bd58b37e1ad34f9d4445fb48a12 Mon Sep 17 00:00:00 2001 From: Kenton Varda Date: Sun, 21 Dec 2025 19:12:19 -0600 Subject: [PATCH 6/7] Run js-rpc-test and abortsignal-test again with ExternalPusher. Currently, it appears that we do not run wd-tests with all-autogates. We probably should but that'll have to be a separate change. For now, this arranges just to run js-rpc-test and abortsignal-test a second time with the ExternalPusher autogate on. --- src/workerd/api/tests/BUILD.bazel | 18 +++++ .../abortsignal-external-pusher-test.wd-test | 10 +++ .../api/tests/abortsignal-test.wd-test | 30 ++++---- .../tests/js-rpc-external-pusher-test.wd-test | 9 +++ src/workerd/api/tests/js-rpc-test.js | 29 ++++++++ src/workerd/api/tests/js-rpc-test.wd-test | 70 ++++++++++--------- 6 files changed, 118 insertions(+), 48 deletions(-) create mode 100644 src/workerd/api/tests/abortsignal-external-pusher-test.wd-test create mode 100644 src/workerd/api/tests/js-rpc-external-pusher-test.wd-test diff --git a/src/workerd/api/tests/BUILD.bazel b/src/workerd/api/tests/BUILD.bazel index e7b0e3ee22b..5c415d9130e 100644 --- a/src/workerd/api/tests/BUILD.bazel +++ b/src/workerd/api/tests/BUILD.bazel @@ -157,6 +157,15 @@ wd_test( data = ["abortsignal-test.js"], ) +wd_test( + src = "abortsignal-external-pusher-test.wd-test", + args = ["--experimental"], + data = [ + "abortsignal-test.js", + "abortsignal-test.wd-test", + ], +) + wd_test( src = "actor-stub-test.wd-test", args = ["--experimental"], @@ -275,6 +284,15 @@ wd_test( data = ["js-rpc-test.js"], ) +wd_test( + src = "js-rpc-external-pusher-test.wd-test", + args = ["--experimental"], + data = [ + "js-rpc-test.js", + "js-rpc-test.wd-test", + ], +) + wd_test( src = "js-rpc-params-ownership-test.wd-test", args = ["--experimental"], diff --git a/src/workerd/api/tests/abortsignal-external-pusher-test.wd-test b/src/workerd/api/tests/abortsignal-external-pusher-test.wd-test new file mode 100644 index 00000000000..26426e8ae4b --- /dev/null +++ b/src/workerd/api/tests/abortsignal-external-pusher-test.wd-test @@ -0,0 +1,10 @@ +# Same as abortsignal-test.wd-test, but enabling the autogate +# workerd-autogate-rpc-use-external-pusher. + +using Workerd = import "/workerd/workerd.capnp"; + +const unitTests :Workerd.Config = ( + services = import "abortsignal-test.wd-test".services, + v8Flags = [ "--expose-gc" ], + autogates = [ "workerd-autogate-rpc-use-external-pusher" ] +); diff --git a/src/workerd/api/tests/abortsignal-test.wd-test b/src/workerd/api/tests/abortsignal-test.wd-test index f7cf881cb8d..38628557091 100644 --- a/src/workerd/api/tests/abortsignal-test.wd-test +++ b/src/workerd/api/tests/abortsignal-test.wd-test @@ -1,19 +1,21 @@ using Workerd = import "/workerd/workerd.capnp"; +const services :List(Workerd.Service) = [ + ( name = "abortsignal-test", + worker = ( + modules = [ + (name = "worker", esModule = embed "abortsignal-test.js") + ], + compatibilityDate = "2025-01-01", + compatibilityFlags = ["nodejs_compat", "enable_abortsignal_rpc", "experimental"], + bindings = [ + (name = "RpcRemoteEnd", service = (name = "abortsignal-test", entrypoint = "RpcRemoteEnd")), + ] + ) + ), +]; + const unitTests :Workerd.Config = ( - services = [ - ( name = "abortsignal-test", - worker = ( - modules = [ - (name = "worker", esModule = embed "abortsignal-test.js") - ], - compatibilityDate = "2025-01-01", - compatibilityFlags = ["nodejs_compat", "enable_abortsignal_rpc", "experimental"], - bindings = [ - (name = "RpcRemoteEnd", service = (name = "abortsignal-test", entrypoint = "RpcRemoteEnd")), - ] - ) - ), - ], + services = .services, v8Flags = ["--expose-gc"] ); diff --git a/src/workerd/api/tests/js-rpc-external-pusher-test.wd-test b/src/workerd/api/tests/js-rpc-external-pusher-test.wd-test new file mode 100644 index 00000000000..0847345009d --- /dev/null +++ b/src/workerd/api/tests/js-rpc-external-pusher-test.wd-test @@ -0,0 +1,9 @@ +# Same as js-rpc-test.wd-test, but enabling the autogate workerd-autogate-rpc-use-external-pusher. + +using Workerd = import "/workerd/workerd.capnp"; + +const unitTests :Workerd.Config = ( + services = import "js-rpc-test.wd-test".services, + v8Flags = [ "--expose-gc" ], + autogates = [ "workerd-autogate-rpc-use-external-pusher" ] +); diff --git a/src/workerd/api/tests/js-rpc-test.js b/src/workerd/api/tests/js-rpc-test.js index cbe74d18e33..2f9e84ccabc 100644 --- a/src/workerd/api/tests/js-rpc-test.js +++ b/src/workerd/api/tests/js-rpc-test.js @@ -2066,3 +2066,32 @@ export let sendServiceStubOverRpc = { } }, }; + +// Make sure that calls are delivered in e-order, even in the presence of pushed externals. +export let eOrderTest = { + async test(controller, env, ctx) { + let abortController = new AbortController(); + let abortSignal = abortController.signal; + + let readableController; + let readableStream = new ReadableStream({ + start(c) { + readableController = c; + }, + }); + + let stub = await env.MyService.makeCounter(0); + + let promises = []; + promises.push(stub.increment(1)); + promises.push(stub.increment(1)); + promises.push(stub.increment(1, abortSignal)); + promises.push(stub.increment(1)); + promises.push(stub.increment(1, readableStream)); + promises.push(stub.increment(1)); + + let results = await Promise.all(promises); + + assert.deepEqual(results, [1, 2, 3, 4, 5, 6]); + }, +}; diff --git a/src/workerd/api/tests/js-rpc-test.wd-test b/src/workerd/api/tests/js-rpc-test.wd-test index da9c62b62cd..9ff825fbeff 100644 --- a/src/workerd/api/tests/js-rpc-test.wd-test +++ b/src/workerd/api/tests/js-rpc-test.wd-test @@ -1,40 +1,42 @@ using Workerd = import "/workerd/workerd.capnp"; -const unitTests :Workerd.Config = ( - services = [ - ( name = "js-rpc-test", - worker = ( - modules = [ - (name = "worker", esModule = embed "js-rpc-test.js") - ], - compatibilityDate = "2024-01-01", - compatibilityFlags = [ - "nodejs_compat", - "fetcher_no_get_put_delete", - "enable_abortsignal_rpc", - "enhanced_error_serialization", - "enable_ctx_exports", - "experimental" - ], - bindings = [ - (name = "self", service = (name = "js-rpc-test", entrypoint = "nonClass")), - (name = "MyService", service = (name = "js-rpc-test", entrypoint = "MyService")), - (name = "MyServiceProxy", service = (name = "js-rpc-test", entrypoint = "MyServiceProxy")), - (name = "MyActor", durableObjectNamespace = "MyActor"), - (name = "ActorNoExtends", durableObjectNamespace = "ActorNoExtends"), - (name = "defaultExport", service = "js-rpc-test"), - (name = "twelve", json = "12"), - (name = "GreeterFactory", service = (name = "js-rpc-test", entrypoint = "GreeterFactory")), - ], +const services :List(Workerd.Service) = [ + ( name = "js-rpc-test", + worker = ( + modules = [ + (name = "worker", esModule = embed "js-rpc-test.js") + ], + compatibilityDate = "2024-01-01", + compatibilityFlags = [ + "nodejs_compat", + "fetcher_no_get_put_delete", + "enable_abortsignal_rpc", + "enhanced_error_serialization", + "enable_ctx_exports", + "experimental" + ], + bindings = [ + (name = "self", service = (name = "js-rpc-test", entrypoint = "nonClass")), + (name = "MyService", service = (name = "js-rpc-test", entrypoint = "MyService")), + (name = "MyServiceProxy", service = (name = "js-rpc-test", entrypoint = "MyServiceProxy")), + (name = "MyActor", durableObjectNamespace = "MyActor"), + (name = "ActorNoExtends", durableObjectNamespace = "ActorNoExtends"), + (name = "defaultExport", service = "js-rpc-test"), + (name = "twelve", json = "12"), + (name = "GreeterFactory", service = (name = "js-rpc-test", entrypoint = "GreeterFactory")), + ], + + durableObjectNamespaces = [ + (className = "MyActor", uniqueKey = "foo"), + (className = "ActorNoExtends", uniqueKey = "bar"), + ], - durableObjectNamespaces = [ - (className = "MyActor", uniqueKey = "foo"), - (className = "ActorNoExtends", uniqueKey = "bar"), - ], + durableObjectStorage = (inMemory = void), + ) + ), +]; - durableObjectStorage = (inMemory = void), - ) - ), - ], +const unitTests :Workerd.Config = ( + services = .services, v8Flags = [ "--expose-gc" ], ); From 349407a755049baada4f97a9cd253a5f35509478 Mon Sep 17 00:00:00 2001 From: Kenton Varda Date: Mon, 22 Dec 2025 13:41:40 -0600 Subject: [PATCH 7/7] WIP: Remove StreamSink DO NOT MERGE until the autogate introduced in #5738 is fully rolled out and working. Until then I'm putting this up just to show what we'll be able to remove. --- src/workerd/api/basics.c++ | 95 +----- src/workerd/api/streams/readable.c++ | 149 +-------- src/workerd/api/tests/BUILD.bazel | 18 -- .../abortsignal-external-pusher-test.wd-test | 10 - .../tests/js-rpc-external-pusher-test.wd-test | 9 - src/workerd/api/worker-rpc.c++ | 286 ++---------------- src/workerd/api/worker-rpc.h | 54 +--- src/workerd/io/worker-interface.capnp | 67 +--- 8 files changed, 76 insertions(+), 612 deletions(-) delete mode 100644 src/workerd/api/tests/abortsignal-external-pusher-test.wd-test delete mode 100644 src/workerd/api/tests/js-rpc-external-pusher-test.wd-test diff --git a/src/workerd/api/basics.c++ b/src/workerd/api/basics.c++ index ae2c57feb06..b741370ad1e 100644 --- a/src/workerd/api/basics.c++ +++ b/src/workerd/api/basics.c++ @@ -572,56 +572,6 @@ class AbortTriggerRpcClient final { rpc::AbortTrigger::Client client; }; -namespace { -// The jsrpc handler that receives aborts from the remote and triggers them locally -// -// TODO(cleanup): This class has been copied to external-pusher.c++. The copy here can be -// deleted as soon as we've switched from StreamSink to ExternalPusher and can delete all the -// StreamSink-related code. For now I'm not trying to avoid duplication. -class AbortTriggerRpcServer final: public rpc::AbortTrigger::Server { - public: - AbortTriggerRpcServer(kj::Own> fulfiller, - kj::Own&& pendingReason) - : fulfiller(kj::mv(fulfiller)), - pendingReason(kj::mv(pendingReason)) {} - - kj::Promise abort(AbortContext abortCtx) override { - auto params = abortCtx.getParams(); - auto reason = params.getReason().getV8Serialized(); - - pendingReason->getWrapped() = kj::heapArray(reason.asBytes()); - fulfiller->fulfill(); - return kj::READY_NOW; - } - - kj::Promise release(ReleaseContext releaseCtx) override { - released = true; - return kj::READY_NOW; - } - - ~AbortTriggerRpcServer() noexcept(false) { - if (pendingReason->getWrapped() != nullptr) { - // Already triggered - return; - } - - if (!released) { - pendingReason->getWrapped() = JSG_KJ_EXCEPTION(FAILED, DOMAbortError, - "An AbortSignal received over RPC was implicitly aborted because the connection back to " - "its trigger was lost."); - } - - // Always fulfill the promise in case the AbortSignal was waiting - fulfiller->fulfill(); - } - - private: - kj::Own> fulfiller; - kj::Own pendingReason; - bool released = false; -}; -} // namespace - AbortSignal::AbortSignal(kj::Maybe exception, jsg::Optional> maybeReason, Flag flag) @@ -862,28 +812,19 @@ void AbortSignal::serialize(jsg::Lock& js, jsg::Serializer& serializer) { return; } - auto triggerCap = [&]() -> rpc::AbortTrigger::Client { - KJ_IF_SOME(pusher, externalHandler->getExternalPusher()) { - auto pipeline = pusher.pushAbortSignalRequest(capnp::MessageSize{2, 0}).sendForPipeline(); - - externalHandler->write( - [signal = pipeline.getSignal()](rpc::JsValue::External::Builder builder) mutable { - builder.setAbortSignal(kj::mv(signal)); - }); + auto pipeline = externalHandler->getExternalPusher() + .pushAbortSignalRequest(capnp::MessageSize{2, 0}) + .sendForPipeline(); - return pipeline.getTrigger(); - } else { - return externalHandler - ->writeStream([&](rpc::JsValue::External::Builder builder) mutable { - builder.setAbortTrigger(); - }).castAs(); - } - }(); + externalHandler->write( + [signal = pipeline.getSignal()](rpc::JsValue::External::Builder builder) mutable { + builder.setAbortSignal(kj::mv(signal)); + }); auto& ioContext = IoContext::current(); // Keep track of every AbortSignal cloned from this one. // If this->triggerAbort(...) is called, each rpcClient will be informed. - rpcClients.add(ioContext.addObject(kj::heap(kj::mv(triggerCap)))); + rpcClients.add(ioContext.addObject(kj::heap(pipeline.getTrigger()))); } jsg::Ref AbortSignal::deserialize( @@ -914,24 +855,12 @@ jsg::Ref AbortSignal::deserialize( auto& ioctx = IoContext::current(); auto reader = externalHandler->read(); - if (reader.isAbortTrigger()) { - // Old-style StreamSink. - // TODO(cleanup): Remove this once the ExternalPusher autogate has rolled out. - auto paf = kj::newPromiseAndFulfiller(); - auto pendingReason = ioctx.addObject(kj::refcounted()); - - externalHandler->setLastStream( - kj::heap(kj::mv(paf.fulfiller), kj::addRef(*pendingReason))); - signal->rpcAbortPromise = ioctx.addObject(kj::heap(kj::mv(paf.promise))); - signal->pendingReason = kj::mv(pendingReason); - } else { - KJ_REQUIRE(reader.isAbortSignal(), "external table slot type does't match serialization tag"); + KJ_REQUIRE(reader.isAbortSignal(), "external table slot type does't match serialization tag"); - auto resolvedSignal = ioctx.getExternalPusher()->unwrapAbortSignal(reader.getAbortSignal()); + auto resolvedSignal = ioctx.getExternalPusher()->unwrapAbortSignal(reader.getAbortSignal()); - signal->rpcAbortPromise = ioctx.addObject(kj::heap(kj::mv(resolvedSignal.signal))); - signal->pendingReason = ioctx.addObject(kj::mv(resolvedSignal.reason)); - } + signal->rpcAbortPromise = ioctx.addObject(kj::heap(kj::mv(resolvedSignal.signal))); + signal->pendingReason = ioctx.addObject(kj::mv(resolvedSignal.reason)); return signal; } diff --git a/src/workerd/api/streams/readable.c++ b/src/workerd/api/streams/readable.c++ index 65cc0489061..cfa7ccc6276 100644 --- a/src/workerd/api/streams/readable.c++ +++ b/src/workerd/api/streams/readable.c++ @@ -519,93 +519,6 @@ jsg::Optional ByteLengthQueuingStrategy::size( namespace { -// TODO(cleanup): These classes have been copied to external-pusher.c++. The copies here can be -// deleted as soon as we've switched from StreamSink to ExternalPusher and can delete all the -// StreamSink-related code. For now I'm not trying to avoid duplication. - -// HACK: We need as async pipe, like kj::newOneWayPipe(), except supporting explicit end(). So we -// wrap the two ends of the pipe in special adapters that track whether end() was called. -class ExplicitEndOutputPipeAdapter final: public capnp::ExplicitEndOutputStream { - public: - ExplicitEndOutputPipeAdapter( - kj::Own inner, kj::Own> ended) - : inner(kj::mv(inner)), - ended(kj::mv(ended)) {} - - kj::Promise write(kj::ArrayPtr buffer) override { - return KJ_REQUIRE_NONNULL(inner)->write(buffer); - } - kj::Promise write(kj::ArrayPtr> pieces) override { - return KJ_REQUIRE_NONNULL(inner)->write(pieces); - } - - kj::Maybe> tryPumpFrom( - kj::AsyncInputStream& input, uint64_t amount) override { - return KJ_REQUIRE_NONNULL(inner)->tryPumpFrom(input, amount); - } - - kj::Promise whenWriteDisconnected() override { - return KJ_REQUIRE_NONNULL(inner)->whenWriteDisconnected(); - } - - kj::Promise end() override { - // Signal to the other side that end() was actually called. - ended->getWrapped() = true; - inner = kj::none; - return kj::READY_NOW; - } - - private: - kj::Maybe> inner; - kj::Own> ended; -}; - -class ExplicitEndInputPipeAdapter final: public kj::AsyncInputStream { - public: - ExplicitEndInputPipeAdapter(kj::Own inner, - kj::Own> ended, - kj::Maybe expectedLength) - : inner(kj::mv(inner)), - ended(kj::mv(ended)), - expectedLength(expectedLength) {} - - kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { - size_t result = co_await inner->tryRead(buffer, minBytes, maxBytes); - - KJ_IF_SOME(l, expectedLength) { - KJ_ASSERT(result <= l); - l -= result; - if (l == 0) { - // If we got all the bytes we expected, we treat this as a successful end, because the - // underlying KJ pipe is not actually going to wait for the other side to drop. This is - // consistent with the behavior of Content-Length in HTTP anyway. - ended->getWrapped() = true; - } - } - - if (result < minBytes) { - // Verify that end() was called. - if (!ended->getWrapped()) { - JSG_FAIL_REQUIRE(Error, "ReadableStream received over RPC disconnected prematurely."); - } - } - co_return result; - } - - kj::Maybe tryGetLength() override { - return inner->tryGetLength(); - } - - kj::Promise pumpTo(kj::AsyncOutputStream& output, uint64_t amount) override { - return inner->pumpTo(output, amount); - } - - private: - kj::Own inner; - kj::Own> ended; - kj::Maybe expectedLength; -}; - // Wrapper around ReadableStreamSource that prevents deferred proxying. We need this for RPC // streams because although they are "system streams", they become disconnected when the IoContext // is destroyed, due to the JsRpcCustomEvent being canceled. @@ -680,37 +593,21 @@ void ReadableStream::serialize(jsg::Lock& js, jsg::Serializer& serializer) { StreamEncoding encoding = controller.getPreferredEncoding(); auto expectedLength = controller.tryGetLength(encoding); - capnp::ByteStream::Client streamCap = [&]() { - KJ_IF_SOME(pusher, externalHandler->getExternalPusher()) { - auto req = pusher.pushByteStreamRequest(capnp::MessageSize{2, 0}); - KJ_IF_SOME(el, expectedLength) { - req.setLengthPlusOne(el + 1); - } - auto pipeline = req.sendForPipeline(); - - externalHandler->write([encoding, expectedLength, source = pipeline.getSource()]( - rpc::JsValue::External::Builder builder) mutable { - auto rs = builder.initReadableStream(); - rs.setStream(kj::mv(source)); - rs.setEncoding(encoding); - }); - - return pipeline.getSink(); - } else { - return externalHandler - ->writeStream( - [encoding, expectedLength](rpc::JsValue::External::Builder builder) mutable { - auto rs = builder.initReadableStream(); - rs.setEncoding(encoding); - KJ_IF_SOME(l, expectedLength) { - rs.getExpectedLength().setKnown(l); - } - }).castAs(); - } - }(); + auto req = externalHandler->getExternalPusher().pushByteStreamRequest(capnp::MessageSize{2, 0}); + KJ_IF_SOME(el, expectedLength) { + req.setLengthPlusOne(el + 1); + } + auto pipeline = req.sendForPipeline(); + + externalHandler->write([encoding, expectedLength, source = pipeline.getSource()]( + rpc::JsValue::External::Builder builder) mutable { + auto rs = builder.initReadableStream(); + rs.setStream(kj::mv(source)); + rs.setEncoding(encoding); + }); kj::Own kjStream = - ioctx.getByteStreamFactory().capnpToKjExplicitEnd(kj::mv(streamCap)); + ioctx.getByteStreamFactory().capnpToKjExplicitEnd(pipeline.getSink()); auto sink = newSystemStream(kj::mv(kjStream), encoding, ioctx); @@ -741,25 +638,7 @@ jsg::Ref ReadableStream::deserialize( auto& ioctx = IoContext::current(); - kj::Own in; - if (rs.hasStream()) { - in = ioctx.getExternalPusher()->unwrapStream(rs.getStream()); - } else { - kj::Maybe expectedLength; - auto el = rs.getExpectedLength(); - if (el.isKnown()) { - expectedLength = el.getKnown(); - } - - auto pipe = kj::newOneWayPipe(expectedLength); - - auto endedFlag = kj::refcounted>(false); - - auto out = kj::heap(kj::mv(pipe.out), kj::addRef(*endedFlag)); - in = kj::heap(kj::mv(pipe.in), kj::mv(endedFlag), expectedLength); - - externalHandler->setLastStream(ioctx.getByteStreamFactory().kjToCapnp(kj::mv(out))); - } + auto in = ioctx.getExternalPusher()->unwrapStream(rs.getStream()); return js.alloc(ioctx, kj::heap(newSystemStream(kj::mv(in), encoding, ioctx), ioctx)); diff --git a/src/workerd/api/tests/BUILD.bazel b/src/workerd/api/tests/BUILD.bazel index 5c415d9130e..e7b0e3ee22b 100644 --- a/src/workerd/api/tests/BUILD.bazel +++ b/src/workerd/api/tests/BUILD.bazel @@ -157,15 +157,6 @@ wd_test( data = ["abortsignal-test.js"], ) -wd_test( - src = "abortsignal-external-pusher-test.wd-test", - args = ["--experimental"], - data = [ - "abortsignal-test.js", - "abortsignal-test.wd-test", - ], -) - wd_test( src = "actor-stub-test.wd-test", args = ["--experimental"], @@ -284,15 +275,6 @@ wd_test( data = ["js-rpc-test.js"], ) -wd_test( - src = "js-rpc-external-pusher-test.wd-test", - args = ["--experimental"], - data = [ - "js-rpc-test.js", - "js-rpc-test.wd-test", - ], -) - wd_test( src = "js-rpc-params-ownership-test.wd-test", args = ["--experimental"], diff --git a/src/workerd/api/tests/abortsignal-external-pusher-test.wd-test b/src/workerd/api/tests/abortsignal-external-pusher-test.wd-test deleted file mode 100644 index 26426e8ae4b..00000000000 --- a/src/workerd/api/tests/abortsignal-external-pusher-test.wd-test +++ /dev/null @@ -1,10 +0,0 @@ -# Same as abortsignal-test.wd-test, but enabling the autogate -# workerd-autogate-rpc-use-external-pusher. - -using Workerd = import "/workerd/workerd.capnp"; - -const unitTests :Workerd.Config = ( - services = import "abortsignal-test.wd-test".services, - v8Flags = [ "--expose-gc" ], - autogates = [ "workerd-autogate-rpc-use-external-pusher" ] -); diff --git a/src/workerd/api/tests/js-rpc-external-pusher-test.wd-test b/src/workerd/api/tests/js-rpc-external-pusher-test.wd-test deleted file mode 100644 index 0847345009d..00000000000 --- a/src/workerd/api/tests/js-rpc-external-pusher-test.wd-test +++ /dev/null @@ -1,9 +0,0 @@ -# Same as js-rpc-test.wd-test, but enabling the autogate workerd-autogate-rpc-use-external-pusher. - -using Workerd = import "/workerd/workerd.capnp"; - -const unitTests :Workerd.Config = ( - services = import "js-rpc-test.wd-test".services, - v8Flags = [ "--expose-gc" ], - autogates = [ "workerd-autogate-rpc-use-external-pusher" ] -); diff --git a/src/workerd/api/worker-rpc.c++ b/src/workerd/api/worker-rpc.c++ index 1b82db92909..baa4b4315c5 100644 --- a/src/workerd/api/worker-rpc.c++ +++ b/src/workerd/api/worker-rpc.c++ @@ -15,125 +15,6 @@ namespace workerd::api { -namespace { - -using StreamSinkFulfiller = kj::Own>; - -} // namespace - -// Implementation of StreamSink RPC interface. The stream sender calls `startStream()` when -// serializing each stream, and the recipient calls `setSlot()` when deserializing streams to -// provide the appropriate destination capability. This class is designed to allow these two -// calls to happen in either order for each slot. -class StreamSinkImpl final: public rpc::JsValue::StreamSink::Server, public kj::Refcounted { - public: - ~StreamSinkImpl() noexcept(false) { - for (auto& slot: table) { - KJ_IF_SOME(f, slot.tryGet()) { - f->reject(KJ_EXCEPTION(FAILED, "expected startStream() was never received")); - } - } - } - - void setSlot(uint i, capnp::Capability::Client stream) { - if (table.size() <= i) table.resize(i + 1); - - if (table[i] == nullptr) { - table[i] = kj::mv(stream); - } else KJ_SWITCH_ONEOF(table[i]) { - KJ_CASE_ONEOF(stream, capnp::Capability::Client) { - KJ_FAIL_REQUIRE("setSlot() tried to set the same slot twice", i); - } - KJ_CASE_ONEOF(fulfiller, StreamFulfiller) { - fulfiller->fulfill(kj::mv(stream)); - table[i] = Consumed(); - } - KJ_CASE_ONEOF(_, Consumed) { - KJ_FAIL_REQUIRE("setSlot() tried to set the same slot twice", i); - } - } - } - - kj::Promise startStream(StartStreamContext context) override { - uint i = context.getParams().getExternalIndex(); - - if (table.size() <= i) { - // guard against ridiculous table allocation - JSG_REQUIRE(i < 1024, Error, "Too many streams in one message."); - table.resize(i + 1); - } - - if (table[i] == nullptr) { - auto paf = kj::newPromiseAndFulfiller(); - table[i] = kj::mv(paf.fulfiller); - context.getResults(capnp::MessageSize{4, 1}).setStream(kj::mv(paf.promise)); - } else KJ_SWITCH_ONEOF(table[i]) { - KJ_CASE_ONEOF(stream, capnp::Capability::Client) { - context.getResults(capnp::MessageSize{4, 1}).setStream(kj::mv(stream)); - table[i] = Consumed(); - } - KJ_CASE_ONEOF(fulfiller, StreamFulfiller) { - KJ_FAIL_REQUIRE("startStream() tried to start the same stream twice", i); - } - KJ_CASE_ONEOF(_, Consumed) { - KJ_FAIL_REQUIRE("startStream() tried to start the same stream twice", i); - } - } - - return kj::READY_NOW; - } - - private: - using StreamFulfiller = kj::Own>; - struct Consumed {}; - - // Each slot starts out null (uninitialized). It becomes a Capability::Client if setSlot() is - // called first, or a StreamFulfiller if startStream() is called first. It becomes `Consumed` - // when the other method is called. - // HACK: Slots in the table take advantage of the little-known fact that OneOf has a "null" - // value, which is the value a OneOf has when default-initialized. This is useful because we - // don't want to explicitly initialize skipped slots. Maybe would be another option - // here, but would add 8 bytes to every slot just to store a boolean... feels bloated. There - // are only two methods in this class so I think it's OK. - using Slot = kj::OneOf; - - kj::Vector table; -}; - -kj::Maybe RpcSerializerExternalHandler::getExternalPusher() { - KJ_IF_SOME(ep, externalPusher) { - return ep; - } else KJ_IF_SOME(func, getStreamHandlerFunc.tryGet()) { - // First call, set up ExternalPusher. - return externalPusher.emplace(func()); - } else { - // Using StreamSink. - return kj::none; - } -} - -capnp::Capability::Client RpcSerializerExternalHandler::writeStream(BuilderCallback callback) { - rpc::JsValue::StreamSink::Client* streamSinkPtr; - KJ_IF_SOME(ss, streamSink) { - streamSinkPtr = &ss; - } else { - // First stream written, set up the StreamSink. - auto& func = KJ_REQUIRE_NONNULL(getStreamHandlerFunc.tryGet(), - "this serialization is not using StreamSink; use getExternalPusher() instead"); - streamSinkPtr = &streamSink.emplace(func()); - } - - auto result = ({ - auto req = streamSinkPtr->startStreamRequest(capnp::MessageSize{4, 0}); - req.setExternalIndex(externals.size()); - req.send().getStream(); - }); - - write(kj::mv(callback)); - - return result; -} - capnp::Orphan> RpcSerializerExternalHandler::build( capnp::Orphanage orphanage) { auto result = orphanage.newOrphan>(externals.size()); @@ -155,17 +36,6 @@ rpc::JsValue::External::Reader RpcDeserializerExternalHandler::read() { return externals[i++]; } -void RpcDeserializerExternalHandler::setLastStream(capnp::Capability::Client stream) { - KJ_IF_SOME(ss, streamSink) { - ss.setSlot(i - 1, kj::mv(stream)); - } else { - auto ss = kj::refcounted(); - ss->setSlot(i - 1, kj::mv(stream)); - streamSink = *ss; - streamSinkCap = rpc::JsValue::StreamSink::Client(kj::mv(ss)); - } -} - namespace { // Call to construct an `rpc::JsValue` from a JS value. @@ -213,15 +83,13 @@ void serializeJsValue(jsg::Lock& js, struct DeserializeResult { jsg::JsValue value; kj::Own disposalGroup; - kj::Maybe streamSink; }; // Call to construct a JS value from an `rpc::JsValue`. -DeserializeResult deserializeJsValue( - jsg::Lock& js, rpc::JsValue::Reader reader, kj::Maybe streamSink = kj::none) { +DeserializeResult deserializeJsValue(jsg::Lock& js, rpc::JsValue::Reader reader) { auto disposalGroup = kj::heap(); - RpcDeserializerExternalHandler externalHandler(reader.getExternals(), *disposalGroup, streamSink); + RpcDeserializerExternalHandler externalHandler(reader.getExternals(), *disposalGroup); jsg::Deserializer deserializer(js, reader.getV8Serialized(), kj::none, kj::none, jsg::Deserializer::Options{ @@ -241,21 +109,14 @@ DeserializeResult deserializeJsValue( return { .value = deserializer.readValue(js), .disposalGroup = kj::mv(disposalGroup), - .streamSink = externalHandler.getStreamSink(), }; } // Does deserializeJsValue() and then adds a `dispose()` method to the returned object (if it is // an object) which disposes all stubs therein. -jsg::JsValue deserializeRpcReturnValue(jsg::Lock& js, - rpc::JsRpcTarget::CallResults::Reader callResults, - kj::Maybe streamSink) { - auto [value, disposalGroup, ss] = deserializeJsValue(js, callResults.getResult(), streamSink); - - if (streamSink == kj::none) { - KJ_REQUIRE(ss == kj::none, - "RPC returned result using StreamSink even though ExternalPusher was provided"); - } +jsg::JsValue deserializeRpcReturnValue( + jsg::Lock& js, rpc::JsRpcTarget::CallResults::Reader callResults) { + auto [value, disposalGroup] = deserializeJsValue(js, callResults.getResult()); // If the object had a disposer on the callee side, it will run when we discard the callPipeline, // so attach that to the disposal group on the caller side. If the returned object did NOT have @@ -498,11 +359,6 @@ JsRpcPromiseAndPipeline callImpl(jsg::Lock& js, } } - kj::Maybe paramsStreamSinkFulfiller; - - bool useExternalPusher = - util::Autogate::isEnabled(util::AutogateKey::RPC_USE_EXTERNAL_PUSHER); - KJ_IF_SOME(args, maybeArgs) { // If we have arguments, serialize them. // Note that we may fail to serialize some element, in which case this will throw back to @@ -519,22 +375,7 @@ JsRpcPromiseAndPipeline callImpl(jsg::Lock& js, ? RpcSerializerExternalHandler::DUPLICATE : RpcSerializerExternalHandler::TRANSFER; - RpcSerializerExternalHandler::GetStreamHandlerFunc getStreamHandlerFunc; - if (useExternalPusher) { - getStreamHandlerFunc.init( - [&]() -> rpc::JsValue::ExternalPusher::Client { return client; }); - } else { - getStreamHandlerFunc.init([&]() { - // A stream was encountered in the params, so we must expect the response to contain - // paramsStreamSink. But we don't have the response yet. So, we need to set up a - // temporary promise client, which we hook to the response a little bit later. - auto paf = kj::newPromiseAndFulfiller(); - paramsStreamSinkFulfiller = kj::mv(paf.fulfiller); - return kj::mv(paf.promise); - }); - } - - RpcSerializerExternalHandler externalHandler(stubOwnership, kj::mv(getStreamHandlerFunc)); + RpcSerializerExternalHandler externalHandler(stubOwnership, client); serializeJsValue(js, jsg::JsValue(arr), externalHandler, [&](capnp::MessageSize hint) { // TODO(perf): Actually use the size hint. return builder.getOperation().initCallWithArgs(); @@ -545,27 +386,13 @@ JsRpcPromiseAndPipeline callImpl(jsg::Lock& js, builder.getOperation().setGetProperty(); } - kj::Maybe> resultStreamSink; - if (useExternalPusher) { - // Unfortunately, we always have to send the ExternalPusher since we don't know whether the - // call will return any streams (or other pushed externals). Luckily, it's a - // one-per-IoContext object, not a big deal. (It'll take a slot on the capnp export table - // though.) - builder.getResultsStreamHandler().setExternalPusher(ioContext.getExternalPusher()); - } else { - // Unfortunately, we always have to send a `resultsStreamSink` because we don't know until - // after the call completes whether or not it will return any streams. If it's unused, - // though, it should only be a couple allocations. - builder.getResultsStreamHandler().setStreamSink( - kj::addRef(*resultStreamSink.emplace(kj::refcounted()))); - } - + // Unfortunately, we always have to send the ExternalPusher since we don't know whether the + // call will return any streams (or other pushed externals). Luckily, it's a + // one-per-IoContext object, not a big deal. (It'll take a slot on the capnp export table + // though.) + builder.getResultsStreamHandler().setExternalPusher(ioContext.getExternalPusher()); auto callResult = builder.send(); - KJ_IF_SOME(ssf, paramsStreamSinkFulfiller) { - ssf->fulfill(callResult.getParamsStreamSink()); - } - // We need to arrange that our JsRpcPromise will updated in-place with the final settlement // of this RPC promise. However, we can't actually construct the JsRpcPromise until we have // the final promise to give it. To resolve the cycle, we only create a JsRpcPromise::WeakRef @@ -586,10 +413,9 @@ JsRpcPromiseAndPipeline callImpl(jsg::Lock& js, // RemotePromise lets us consume its pipeline and promise portions independently; we consume // the promise here and we consume the pipeline below, both via kj::mv(). auto jsPromise = ioContext.awaitIo(js, kj::mv(promise), - [weakRef = kj::atomicAddRef(*weakRef), resultStreamSink = kj::mv(resultStreamSink)]( - jsg::Lock& js, + [weakRef = kj::atomicAddRef(*weakRef)](jsg::Lock& js, capnp::Response response) mutable -> jsg::Value { - auto jsResult = deserializeRpcReturnValue(js, response, resultStreamSink); + auto jsResult = deserializeRpcReturnValue(js, response); if (weakRef->disposed) { // The promise was explicitly disposed before it even resolved. This means we must dispose @@ -962,9 +788,9 @@ using Result = kj::OneOf; template MakeCallPipeline::Result serializeJsValueWithPipeline(jsg::Lock& js, + rpc::JsValue::ExternalPusher::Client externalPusher, jsg::JsValue value, - Func makeBuilder, - RpcSerializerExternalHandler::GetStreamHandlerFunc getStreamSinkFunc); + Func makeBuilder); // Callee-side implementation of JsRpcTarget. // @@ -1104,46 +930,6 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { // Given a handle for the result, if it's a promise, await the promise, then serialize the // final result for return. - RpcSerializerExternalHandler::GetStreamHandlerFunc getResultsStreamHandlerFunc; - auto resultStreamHandler = params.getResultsStreamHandler(); - switch (resultStreamHandler.which()) { - case rpc::JsRpcTarget::CallParams::ResultsStreamHandler::EXTERNAL_PUSHER: - getResultsStreamHandlerFunc.init( - [cap = resultStreamHandler.getExternalPusher()]() mutable { return kj::mv(cap); }); - break; - case rpc::JsRpcTarget::CallParams::ResultsStreamHandler::STREAM_SINK: - getResultsStreamHandlerFunc.init( - [cap = resultStreamHandler.getStreamSink()]() mutable { return kj::mv(cap); }); - break; - } - - kj::Maybe>> callPipelineFulfiller; - - // We need another ref to this fulfiller for the error callback. It can rely on being - // destroyed at the same time as the success callback. - kj::Maybe&> callPipelineFulfillerRef; - - KJ_IF_SOME(ss, invocationResult.streamSink) { - // Since we have a StreamSink, it's important that we hook up the pipeline for that - // immediately. Annoyingly, that also means we need to hook up a pipeline for - // callPipeline, which we don't actually have yet, so we need to promise-ify it. - - // If the caller requested using ExternalPusher for the results, then it should also use - // ExternalPusher for the params. (Theoretically we could support mix-and-match but... - // let's keep it simple.) - KJ_REQUIRE(resultStreamHandler.isStreamSink(), - "RPC params used StreamSink when result is supposed to use ExternalPusher"); - - auto paf = kj::newPromiseAndFulfiller(); - callPipelineFulfillerRef = *paf.fulfiller; - callPipelineFulfiller = kj::mv(paf.fulfiller); - - capnp::PipelineBuilder builder(16); - builder.setCallPipeline(kj::mv(paf.promise)); - builder.setParamsStreamSink(ss); - callContext.setPipeline(builder.build()); - } - // HACK: Cap'n Proto call contexts are documented as being pointer-like types where the // backing object's lifetime is that of the RPC call, but in reality they are refcounted // under the hood. Since we'll be executing the call in the JS microtask queue, we have no @@ -1164,20 +950,19 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { // must take full ownership. [callContext, ownCallContext = kj::mv(ownCallContext), paramDisposalGroup = kj::mv(invocationResult.paramDisposalGroup), - paramsStreamSink = kj::mv(invocationResult.streamSink), - getResultsStreamHandlerFunc = kj::mv(getResultsStreamHandlerFunc), - callPipelineFulfiller = kj::mv(callPipelineFulfiller)]( + resultsExternalPusher = + params.getResultsStreamHandler().getExternalPusher()]( jsg::Lock& js, jsg::Value value) mutable { jsg::JsValue resultValue(value.getHandle(js)); rpc::JsRpcTarget::CallResults::Builder results = nullptr; - auto maybePipeline = - serializeJsValueWithPipeline(js, resultValue, [&](capnp::MessageSize hint) { + auto maybePipeline = serializeJsValueWithPipeline( + js, resultsExternalPusher, resultValue, [&](capnp::MessageSize hint) { hint.wordCount += capnp::sizeInWords(); hint.capCount += 1; // for callPipeline results = callContext.initResults(hint); return results.initResult(); - }, kj::mv(getResultsStreamHandlerFunc)); + }); KJ_SWITCH_ONEOF(maybePipeline) { KJ_CASE_ONEOF(obj, MakeCallPipeline::Object) { @@ -1202,25 +987,9 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { } } - KJ_IF_SOME(cpf, callPipelineFulfiller) { - cpf->fulfill(results.getCallPipeline()); - } - - KJ_IF_SOME(ss, paramsStreamSink) { - results.setParamsStreamSink(kj::mv(ss)); - } - // paramDisposalGroup will be destroyed when we return (or when this lambda is destroyed // as a result of the promise being rejected). This will implicitly dispose the param // stubs. - }), - ctx.addFunctor([callPipelineFulfillerRef](jsg::Lock& js, jsg::Value&& error) { - // If we set up a `callPipeline` early, we have to make sure it propagates the error. - // (Otherwise we get a PromiseFulfiller error instead, which is pretty useless...) - KJ_IF_SOME(cpf, callPipelineFulfillerRef) { - cpf.reject(js.exceptionToKj(error.addRef(js))); - } - js.throwException(kj::mv(error)); }))); if (ctx.hasOutputGate()) { @@ -1416,7 +1185,6 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { struct InvocationResult { v8::Local returnValue; kj::Maybe> paramDisposalGroup; - kj::Maybe streamSink; }; // Deserializes the arguments and passes them to the given function. @@ -1426,7 +1194,7 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { kj::Maybe args) { // We received arguments from the client, deserialize them back to JS. KJ_IF_SOME(a, args) { - auto [value, disposalGroup, streamSink] = deserializeJsValue(js, a); + auto [value, disposalGroup] = deserializeJsValue(js, a); auto args = KJ_REQUIRE_NONNULL( value.tryCast(), "expected JsArray when deserializing arguments."); // Call() expects a `Local []`... so we populate an array. @@ -1439,7 +1207,6 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { InvocationResult result{ .returnValue = jsg::check(fn->Call(js.v8Context(), thisArg, arguments.size(), arguments.data())), - .streamSink = kj::mv(streamSink), }; if (!disposalGroup->empty()) { result.paramDisposalGroup = kj::mv(disposalGroup); @@ -1478,7 +1245,6 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { } kj::Maybe> paramDisposalGroup; - kj::Maybe streamSink; // We're going to pass all the arguments from the client to the function, but we are going to // insert `env` and `ctx`. We assume the last two arguments that the function declared are @@ -1486,8 +1252,7 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { kj::Maybe argsArrayFromClient; size_t argCountFromClient = 0; KJ_IF_SOME(a, args) { - auto [value, disposalGroup, ss] = deserializeJsValue(js, a); - streamSink = kj::mv(ss); + auto [value, disposalGroup] = deserializeJsValue(js, a); auto array = KJ_REQUIRE_NONNULL( value.tryCast(), "expected JsArray when deserializing arguments."); @@ -1540,7 +1305,6 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { .returnValue = jsg::check(fn->Call(js.v8Context(), thisArg, arguments.size(), arguments.data())), .paramDisposalGroup = kj::mv(paramDisposalGroup), - .streamSink = kj::mv(streamSink), }; }; }; @@ -1657,9 +1421,9 @@ static rpc::JsRpcTarget::Client makeJsRpcTargetForSingleLoopbackCall( template MakeCallPipeline::Result serializeJsValueWithPipeline(jsg::Lock& js, + rpc::JsValue::ExternalPusher::Client externalPusher, jsg::JsValue value, - Func makeBuilder, - RpcSerializerExternalHandler::GetStreamHandlerFunc getStreamHandlerFunc) { + Func makeBuilder) { auto maybeDispose = js.withinHandleScope([&]() -> kj::Maybe> { jsg::JsObject obj = KJ_UNWRAP_OR(value.tryCast(), { return kj::none; }); @@ -1683,7 +1447,7 @@ MakeCallPipeline::Result serializeJsValueWithPipeline(jsg::Lock& js, // Now that we've extracted our dispose function, we can serialize our value. RpcSerializerExternalHandler externalHandler( - RpcSerializerExternalHandler::TRANSFER, kj::mv(getStreamHandlerFunc)); + RpcSerializerExternalHandler::TRANSFER, kj::mv(externalPusher)); serializeJsValue(js, value, externalHandler, kj::mv(makeBuilder)); auto stubDisposers = externalHandler.releaseStubDisposers(); diff --git a/src/workerd/api/worker-rpc.h b/src/workerd/api/worker-rpc.h index a1d42818b2e..185c5aacc3f 100644 --- a/src/workerd/api/worker-rpc.h +++ b/src/workerd/api/worker-rpc.h @@ -36,18 +36,12 @@ constexpr size_t MAX_JS_RPC_MESSAGE_SIZE = 1u << 25; // handle RPC specially should use this. class RpcSerializerExternalHandler final: public jsg::Serializer::ExternalHandler { public: - using GetStreamSinkFunc = kj::Function; - using GetExternalPusherFunc = kj::Function; - using GetStreamHandlerFunc = kj::OneOf; - enum StubOwnership { TRANSFER, DUPLICATE }; - // `getStreamSinkFunc` will be called at most once, the first time a stream is encountered in - // serialization, to get the StreamSink that should be used. RpcSerializerExternalHandler( - StubOwnership stubOwnership, GetStreamHandlerFunc getStreamHandlerFunc) + StubOwnership stubOwnership, rpc::JsValue::ExternalPusher::Client externalPusher) : stubOwnership(stubOwnership), - getStreamHandlerFunc(kj::mv(getStreamHandlerFunc)) {} + externalPusher(kj::mv(externalPusher)) {} inline StubOwnership getStubOwnership() { return stubOwnership; @@ -55,9 +49,10 @@ class RpcSerializerExternalHandler final: public jsg::Serializer::ExternalHandle using BuilderCallback = kj::Function; - // Returns the ExternalPusher for the remote side. Returns kj::none if this serialization is - // using the older StreamSink approach, in which case you need to call `writeStream()` instead. - kj::Maybe getExternalPusher(); + // Returns the ExternalPusher for the remote side. + rpc::JsValue::ExternalPusher::Client getExternalPusher() { + return externalPusher; + } // Add an external. The value is a callback which will be invoked later to fill in the // JsValue::External in the Cap'n Proto structure. The external array cannot be allocated until @@ -67,13 +62,6 @@ class RpcSerializerExternalHandler final: public jsg::Serializer::ExternalHandle externals.add(kj::mv(callback)); } - // Like write(), but use this when there is also a stream associated with the external, i.e. - // using StreamSink. This returns a capability which will eventually resolve to the stream. - // - // StreamSink is being replaced by ExternalPusher. You should only call writeStream() if - // getExternalPusher() returns kj::none. If ExternalPusher is available, this method will throw. - capnp::Capability::Client writeStream(BuilderCallback callback); - // Build the final list. capnp::Orphan> build(capnp::Orphanage orphanage); @@ -108,61 +96,39 @@ class RpcSerializerExternalHandler final: public jsg::Serializer::ExternalHandle private: StubOwnership stubOwnership; - GetStreamHandlerFunc getStreamHandlerFunc; + rpc::JsValue::ExternalPusher::Client externalPusher; kj::Vector externals; kj::Vector> stubDisposers; - - kj::Maybe streamSink; - kj::Maybe externalPusher; }; class RpcStubDisposalGroup; -class StreamSinkImpl; // ExternalHandler used when deserializing RPC messages. Deserialization functions with which to // handle RPC specially should use this. class RpcDeserializerExternalHandler final: public jsg::Deserializer::ExternalHandler { public: - // The `streamSink` parameter should be provided if a StreamSink already exists, e.g. when - // deserializing results. If omitted, it will be constructed on-demand. - RpcDeserializerExternalHandler(capnp::List::Reader externals, - RpcStubDisposalGroup& disposalGroup, - kj::Maybe streamSink) + RpcDeserializerExternalHandler( + capnp::List::Reader externals, RpcStubDisposalGroup& disposalGroup) : externals(externals), - disposalGroup(disposalGroup), - streamSink(streamSink) {} + disposalGroup(disposalGroup) {} ~RpcDeserializerExternalHandler() noexcept(false); // Read and return the next external. rpc::JsValue::External::Reader read(); - // Call immediately after `read()` when reading an external that is associated with a stream. - // `stream` is published back to the sender via StreamSink. - void setLastStream(capnp::Capability::Client stream); - // All stubs deserialized as part of a particular parameter or result set are placed in a // common disposal group so that they can be disposed together. RpcStubDisposalGroup& getDisposalGroup() { return disposalGroup; } - // Call after serialization is complete to get the StreamSink that should handle streams found - // while deserializing. Returns none if there were no streams. This should only be called if - // a `streamSink` was NOT passed to the constructor. - kj::Maybe getStreamSink() { - return kj::mv(streamSinkCap); - } - private: capnp::List::Reader externals; uint i = 0; kj::UnwindDetector unwindDetector; RpcStubDisposalGroup& disposalGroup; - - kj::Maybe streamSink; - kj::Maybe streamSinkCap; }; // Base class for objects which can be sent over RPC, but doing so actually sends a stub which diff --git a/src/workerd/io/worker-interface.capnp b/src/workerd/io/worker-interface.capnp index f41e4fb9aaf..db4a01c8746 100644 --- a/src/workerd/io/worker-interface.capnp +++ b/src/workerd/io/worker-interface.capnp @@ -471,35 +471,24 @@ struct JsValue { } readableStream :group { - # A ReadableStream. The sender of the JsValue will use the associated StreamSink to open a - # stream of type `ByteStream`. + # A ReadableStream. stream @10 :ExternalPusher.InputStream; - # If present, a stream pushed using the destination isolate's ExternalPusher. - # - # If null (deprecated), then the sender will use the associated StreamSink to open a stream - # of type `ByteStream`. StreamSink is in the process of being replaced by ExternalPusher. + # A stream pushed using the destination isolate's ExternalPusher. encoding @4 :StreamEncoding; # Bytes read from the stream have this encoding. - expectedLength :union { - # NOTE: This is obsolete when `stream` is set. Instead, the length is passed to - # ExternalPusher.pushByteStream(). - - unknown @5 :Void; - known @6 :UInt64; + obsoleteUnion :union { + # With `StreamSink`, the length was sent here. With `ExternalPusher` it's sent to + # `ExternalPusher.pushByteStream()`. + obsolete5 @5 :Void; + obsolete6 @6 :UInt64; } } - abortTrigger @7 :Void; - # Indicates that an `AbortTrigger` is being passed, see the `AbortTrigger` interface for the - # mechanism used to trigger the abort later. This is modeled as a stream, since the sender is - # the one that will later on send the abort signal. This external will have an associated - # stream in the corresponding `StreamSink` with type `AbortTrigger`. - # - # TODO(soon): This will be obsolete when we stop using `StreamSink`; `abortSignal` will - # replace it. (The name is wrong anyway -- this is the signal end, not the trigger end.) + obsolete7 @7 :Void; + # Obsolete `abortTrigger` (the `StreamSink` way of handling `AbortSignal`). abortSignal @11 :ExternalPusher.AbortSignal; # Indicates that an `AbortSignal` is being passed. @@ -512,29 +501,6 @@ struct JsValue { } } - interface StreamSink { - # A JsValue may contain streams that flow from the sender to the receiver. We don't want such - # streams to require a network round trip before the stream can begin pumping. So, we need a - # place to start sending bytes right away. - # - # To that end, JsRpcTarget::call() returns a `paramsStreamSink`. Immediately upon sending the - # request, the client can use promise pipelining to begin pushing bytes to this object. - # - # Similarly, the caller passes a `resultsStreamSink` to the callee. If the response contains - # any streams, it can start pushing to this immediately after responding. - # - # TODO(soon): This design is overcomplicated since it requires allocating StreamSinks for every - # request, even when not used, and requires a lot of weird promise magic. The newer - # ExternalPusher design is simpler, and only incurs overhead when used. Once all of - # production has been updated to understand ExternalPusher, then we can flip an autogate to - # use it by default. Once that has rolled out globally, we can remove StreamSink. - - startStream @0 (externalIndex :UInt32) -> (stream :Capability); - # Opens a stream corresponding to the given index in the JsValue's `externals` array. The type - # of capability returned depends on the type of external. E.g. for `readableStream`, it is a - # `ByteStream`. - } - interface ExternalPusher { # This object allows "pushing" external objects to a target isolate, so that they can # sublequently be referenced by a `JsValue.External`. This allows implementing externals where @@ -650,13 +616,11 @@ interface JsRpcTarget extends(JsValue.ExternalPusher) $Cxx.allowCancellation { } resultsStreamHandler :union { - # We're in the process of switching from `StreamSink` to `ExternalPusher`. A caller will only - # offer one or the other, and expect the callee to use that. (Initially, callers will still - # send StreamSink for backwards-compatibility, but once all recipients are able to understand - # ExternalPusher, we'll flip an autogate to make callers send it.) + # Historically there was a different way to handle streams, before `ExternalPusher`. Now + # we always use `ExteranlPusher`. - streamSink @4 :JsValue.StreamSink; - # StreamSink used for ReadableStreams found in the results. + obsolete4 @4 :Capability; + # Obsolete StreamSink, replaced by ExternalPusher. externalPusher @5 :JsValue.ExternalPusher; # ExternalPusher object which will push into the caller's isolate. Use this to push externals @@ -685,9 +649,8 @@ interface JsRpcTarget extends(JsValue.ExternalPusher) $Cxx.allowCancellation { # `callPipeline` until the disposer is invoked. If `hasDisposer` is false, `callPipeline` can # safely be dropped immediately. - paramsStreamSink @3 :JsValue.StreamSink; - # StreamSink used for ReadableStreams found in the params. The caller begins sending bytes for - # these streams immediately using promise pipelining. + obsolete3 @3 :Capability; + # Obsolete StreamSink, replaced by ExternalPusher. } call @0 CallParams -> CallResults;