From b433e4a92f179c6af07b9ea8ef9ab4d75ed05d9c Mon Sep 17 00:00:00 2001 From: Kenton Varda Date: Sat, 20 Dec 2025 11:40:36 -0600 Subject: [PATCH 1/6] Introduce new JsValue.ExternalPusher interface to replace StreamSink. The design of `StreamSink` makes it fairly complicated and somewhat inefficient to implement. For example: * It requires the use of `setPipeline()` so that the caller can start to enable promise pipelining on the `StreamSink` before it actually returns results. * Every call must create a `resultsStreamSink` object in advance, before it knows if there will be any streams in the results. * Generally there's just a lot of contortions involved in supporting it. It occurred to me that a different design is possible: one where we have an object that is created *per-IoContext* (instead of per-call) which can be used to "push" values into that IoContext, so that they can then be referenced as externals by subsequent JsValues. This commit introduces that design, including specifying how it would work to implement `ReadableStream`. Subsequent commits will actually implement this design. Eventually, after everyone in production is updated to understand and then use the new design, we can deprecate and remove StreamSink, thus cleaning up all the mess it created. --- src/workerd/api/worker-rpc.c++ | 4 +- src/workerd/io/worker-interface.capnp | 88 ++++++++++++++++++++++++++- 2 files changed, 87 insertions(+), 5 deletions(-) diff --git a/src/workerd/api/worker-rpc.c++ b/src/workerd/api/worker-rpc.c++ index aa856d2504a..2c8fe55babe 100644 --- a/src/workerd/api/worker-rpc.c++ +++ b/src/workerd/api/worker-rpc.c++ @@ -519,7 +519,7 @@ JsRpcPromiseAndPipeline callImpl(jsg::Lock& js, // after the call completes whether or not it will return any streams. If it's unused, // though, it should only be a couple allocations. 
auto resultStreamSink = kj::refcounted(); - builder.setResultsStreamSink(kj::addRef(*resultStreamSink)); + builder.getResultsStreamHandler().setStreamSink(kj::addRef(*resultStreamSink)); auto callResult = builder.send(); @@ -1078,7 +1078,7 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { [callContext, ownCallContext = kj::mv(ownCallContext), paramDisposalGroup = kj::mv(invocationResult.paramDisposalGroup), paramsStreamSink = kj::mv(invocationResult.streamSink), - resultStreamSink = params.getResultsStreamSink(), + resultStreamSink = params.getResultsStreamHandler().getStreamSink(), callPipelineFulfiller = kj::mv(callPipelineFulfiller)]( jsg::Lock& js, jsg::Value value) mutable { jsg::JsValue resultValue(value.getHandle(js)); diff --git a/src/workerd/io/worker-interface.capnp b/src/workerd/io/worker-interface.capnp index ab1c8ef4da0..f41e4fb9aaf 100644 --- a/src/workerd/io/worker-interface.capnp +++ b/src/workerd/io/worker-interface.capnp @@ -474,10 +474,19 @@ struct JsValue { # A ReadableStream. The sender of the JsValue will use the associated StreamSink to open a # stream of type `ByteStream`. + stream @10 :ExternalPusher.InputStream; + # If present, a stream pushed using the destination isolate's ExternalPusher. + # + # If null (deprecated), then the sender will use the associated StreamSink to open a stream + # of type `ByteStream`. StreamSink is in the process of being replaced by ExternalPusher. + encoding @4 :StreamEncoding; # Bytes read from the stream have this encoding. expectedLength :union { + # NOTE: This is obsolete when `stream` is set. Instead, the length is passed to + # ExternalPusher.pushByteStream(). + unknown @5 :Void; known @6 :UInt64; } @@ -488,6 +497,12 @@ struct JsValue { # mechanism used to trigger the abort later. This is modeled as a stream, since the sender is # the one that will later on send the abort signal. This external will have an associated # stream in the corresponding `StreamSink` with type `AbortTrigger`. 
+ # + # TODO(soon): This will be obsolete when we stop using `StreamSink`; `abortSignal` will + # replace it. (The name is wrong anyway -- this is the signal end, not the trigger end.) + + abortSignal @11 :ExternalPusher.AbortSignal; + # Indicates that an `AbortSignal` is being passed. subrequestChannelToken @8 :Data; actorClassChannelToken @9 :Data; @@ -507,12 +522,60 @@ struct JsValue { # # Similarly, the caller passes a `resultsStreamSink` to the callee. If the response contains # any streams, it can start pushing to this immediately after responding. + # + # TODO(soon): This design is overcomplicated since it requires allocating StreamSinks for every + # request, even when not used, and requires a lot of weird promise magic. The newer + # ExternalPusher design is simpler, and only incurs overhead when used. Once all of + # production has been updated to understand ExternalPusher, then we can flip an autogate to + # use it by default. Once that has rolled out globally, we can remove StreamSink. startStream @0 (externalIndex :UInt32) -> (stream :Capability); # Opens a stream corresponding to the given index in the JsValue's `externals` array. The type # of capability returned depends on the type of external. E.g. for `readableStream`, it is a # `ByteStream`. } + + interface ExternalPusher { + # This object allows "pushing" external objects to a target isolate, so that they can + # subsequently be referenced by a `JsValue.External`. This allows implementing externals where + # the sender might need to send subsequent information to the receiver *before* the receiver + # has had a chance to call back to request it. For example, when a ReadableStream is sent over + # RPC, the sender will immediately start sending body bytes without waiting for a round trip. + # + # The key to ExternalPusher is that it constructs and returns capabilities pointing at objects + # living directly in the target isolate's runtime. 
These capabilities have empty interfaces, + # but can be passed back to the target in the `External` table of a `JsValue`. Since the + # capabilities point to objects directly in the recipient's memory space, they can then be + # unwrapped to obtain the underlying local object, which the recipient then uses to back the + # external value delivered to the application. + # + # Note that externals must be pushed BEFORE the JsValue that uses them is sent, so that they + # can be unwrapped immediately when deserializing the value. + + pushByteStream @0 (lengthPlusOne :UInt64 = 0) -> (source :InputStream, sink :ByteStream); + # Creates a readable stream within the remote's memory space. `source` should be placed in a + # subsequent `External` of type `readableStream`. The caller should write bytes to `sink`. + # + # `lengthPlusOne` is the expected length of the stream, plus 1, with zero indicating no + # expectation. This is used e.g. when the `ReadableStream` was created with `FixedLengthStream`. + # (The weird "plus one" encoding is used because Cap'n Proto doesn't have a Maybe. Perhaps we + # can fix this eventually.) + + interface InputStream { + # No methods. This will be unwrapped by the recipient to obtain the underlying local value. + } + + pushAbortSignal @1 () -> (signal :AbortSignal, trigger :AbortTrigger); + + interface AbortSignal { + # No methods. This can be unwrapped by the recipient to obtain a Promise which + # rejects when the signal is aborted. + } + + # TODO(soon): + # - AbortTrigger + # - Promises + } } interface AbortTrigger $Cxx.allowCancellation { @@ -532,7 +595,15 @@ interface AbortTrigger $Cxx.allowCancellation { # be triggered. Otherwise, the cloned signal will treat a dropped cabability as an abort. } -interface JsRpcTarget $Cxx.allowCancellation { +interface JsRpcTarget extends(JsValue.ExternalPusher) $Cxx.allowCancellation { + # Target on which RPC methods may be invoked. 
+ # + # This is the backing capnp type for a JsRpcStub, as well as used to represent top-level RPC + # events. + # + # JsRpcTarget must implement `JsValue.ExternalPusher` to allow externals to be pushed to the + # target in advance of a call that uses them. + struct CallParams { union { methodName @0 :Text; @@ -578,8 +649,19 @@ interface JsRpcTarget $Cxx.allowCancellation { # "bar". } - resultsStreamSink @4 :JsValue.StreamSink; - # StreamSink used for ReadableStreams found in the results. + resultsStreamHandler :union { + # We're in the process of switching from `StreamSink` to `ExternalPusher`. A caller will only + # offer one or the other, and expect the callee to use that. (Initially, callers will still + # send StreamSink for backwards-compatibility, but once all recipients are able to understand + # ExternalPusher, we'll flip an autogate to make callers send it.) + + streamSink @4 :JsValue.StreamSink; + # StreamSink used for ReadableStreams found in the results. + + externalPusher @5 :JsValue.ExternalPusher; + # ExternalPusher object which will push into the caller's isolate. Use this to push externals + # that will be included in the results. + } } struct CallResults { From e8411263bb15fc681ee9ebcc144c331669dfbbb3 Mon Sep 17 00:00:00 2001 From: Kenton Varda Date: Mon, 22 Dec 2025 12:40:56 -0600 Subject: [PATCH 2/6] Update Cap'n Proto to get CapabilityServerSet::tryGetLocalServerSync(). capnp PR: https://github.com/capnproto/capnproto/pull/2475 The comment in this file says not to hand-edit it, but I don't understand how to use update-deps.py to update to a not-yet-merged PR... 
--- build/deps/gen/deps/dep_capnp_cpp.MODULE.bazel | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/build/deps/gen/deps/dep_capnp_cpp.MODULE.bazel b/build/deps/gen/deps/dep_capnp_cpp.MODULE.bazel index c8b717df7b3..2163b753ae8 100644 --- a/build/deps/gen/deps/dep_capnp_cpp.MODULE.bazel +++ b/build/deps/gen/deps/dep_capnp_cpp.MODULE.bazel @@ -3,9 +3,9 @@ http = use_extension("@//:build/exts/http.bzl", "http") http.archive( name = "capnp-cpp", - sha256 = "fc67ba4a0dbf683ebac4ed3acc0c9b7366a0271766e23c4f4038febf5149a13b", - strip_prefix = "capnproto-capnproto-fd3cc9c/c++", + sha256 = "cbf7dcef02deb3a3addcfefacb76672c46a48c953024860bf80fceabc255d41d", + strip_prefix = "capnproto-capnproto-c1bce20/c++", type = "tgz", - url = "https://github.com/capnproto/capnproto/tarball/fd3cc9ce8e66363bc70a88166e912788fa903173", + url = "https://github.com/capnproto/capnproto/tarball/c1bce2095a8dd76851fe3c1c61550f79b69d671d", ) use_repo(http, "capnp-cpp") From 9cf133b4f5dbe994dfc62211f508ac942c6c7464 Mon Sep 17 00:00:00 2001 From: Kenton Varda Date: Sun, 21 Dec 2025 15:50:10 -0600 Subject: [PATCH 3/6] Wire up ExternalPusher in worker-rpc.c++. This doesn't yet support any actual pushed types, this is just the infrastructure needed to make it available. This introduces an autogate which opts into using ExternalPusher for calls. The caller decides (based on the autogate) whether to use ExternalPusher for the whole call. We can turn on the autogate once all call receivers in production understand the new protocol. 
--- src/workerd/api/worker-rpc.c++ | 177 +++++++++++++++++++------ src/workerd/api/worker-rpc.h | 17 ++- src/workerd/io/BUILD.bazel | 2 + src/workerd/io/external-pusher.c++ | 12 ++ src/workerd/io/external-pusher.h | 25 ++++ src/workerd/io/io-context.c++ | 8 ++ src/workerd/io/io-context.h | 5 + src/workerd/util/autogate.c++ | 2 + src/workerd/util/autogate.h | 2 + src/workerd/util/completion-membrane.h | 4 + 10 files changed, 212 insertions(+), 42 deletions(-) create mode 100644 src/workerd/io/external-pusher.c++ create mode 100644 src/workerd/io/external-pusher.h diff --git a/src/workerd/api/worker-rpc.c++ b/src/workerd/api/worker-rpc.c++ index 2c8fe55babe..1b82db92909 100644 --- a/src/workerd/api/worker-rpc.c++ +++ b/src/workerd/api/worker-rpc.c++ @@ -100,13 +100,27 @@ class StreamSinkImpl final: public rpc::JsValue::StreamSink::Server, public kj:: kj::Vector table; }; +kj::Maybe RpcSerializerExternalHandler::getExternalPusher() { + KJ_IF_SOME(ep, externalPusher) { + return ep; + } else KJ_IF_SOME(func, getStreamHandlerFunc.tryGet()) { + // First call, set up ExternalPusher. + return externalPusher.emplace(func()); + } else { + // Using StreamSink. + return kj::none; + } +} + capnp::Capability::Client RpcSerializerExternalHandler::writeStream(BuilderCallback callback) { rpc::JsValue::StreamSink::Client* streamSinkPtr; KJ_IF_SOME(ss, streamSink) { streamSinkPtr = &ss; } else { // First stream written, set up the StreamSink. - streamSinkPtr = &streamSink.emplace(getStreamSinkFunc()); + auto& func = KJ_REQUIRE_NONNULL(getStreamHandlerFunc.tryGet(), + "this serialization is not using StreamSink; use getExternalPusher() instead"); + streamSinkPtr = &streamSink.emplace(func()); } auto result = ({ @@ -233,9 +247,15 @@ DeserializeResult deserializeJsValue( // Does deserializeJsValue() and then adds a `dispose()` method to the returned object (if it is // an object) which disposes all stubs therein. 
-jsg::JsValue deserializeRpcReturnValue( - jsg::Lock& js, rpc::JsRpcTarget::CallResults::Reader callResults, StreamSinkImpl& streamSink) { - auto [value, disposalGroup, _] = deserializeJsValue(js, callResults.getResult(), streamSink); +jsg::JsValue deserializeRpcReturnValue(jsg::Lock& js, + rpc::JsRpcTarget::CallResults::Reader callResults, + kj::Maybe streamSink) { + auto [value, disposalGroup, ss] = deserializeJsValue(js, callResults.getResult(), streamSink); + + if (streamSink == kj::none) { + KJ_REQUIRE(ss == kj::none, + "RPC returned result using StreamSink even though ExternalPusher was provided"); + } // If the object had a disposer on the callee side, it will run when we discard the callPipeline, // so attach that to the disposal group on the caller side. If the returned object did NOT have @@ -480,6 +500,9 @@ JsRpcPromiseAndPipeline callImpl(jsg::Lock& js, kj::Maybe paramsStreamSinkFulfiller; + bool useExternalPusher = + util::Autogate::isEnabled(util::AutogateKey::RPC_USE_EXTERNAL_PUSHER); + KJ_IF_SOME(args, maybeArgs) { // If we have arguments, serialize them. // Note that we may fail to serialize some element, in which case this will throw back to @@ -496,15 +519,22 @@ JsRpcPromiseAndPipeline callImpl(jsg::Lock& js, ? RpcSerializerExternalHandler::DUPLICATE : RpcSerializerExternalHandler::TRANSFER; - RpcSerializerExternalHandler externalHandler( - stubOwnership, [&]() -> rpc::JsValue::StreamSink::Client { - // A stream was encountered in the params, so we must expect the response to contain - // paramsStreamSink. But we don't have the response yet. So, we need to set up a - // temporary promise client, which we hook to the response a little bit later. 
- auto paf = kj::newPromiseAndFulfiller(); - paramsStreamSinkFulfiller = kj::mv(paf.fulfiller); - return kj::mv(paf.promise); - }); + RpcSerializerExternalHandler::GetStreamHandlerFunc getStreamHandlerFunc; + if (useExternalPusher) { + getStreamHandlerFunc.init( + [&]() -> rpc::JsValue::ExternalPusher::Client { return client; }); + } else { + getStreamHandlerFunc.init([&]() { + // A stream was encountered in the params, so we must expect the response to contain + // paramsStreamSink. But we don't have the response yet. So, we need to set up a + // temporary promise client, which we hook to the response a little bit later. + auto paf = kj::newPromiseAndFulfiller(); + paramsStreamSinkFulfiller = kj::mv(paf.fulfiller); + return kj::mv(paf.promise); + }); + } + + RpcSerializerExternalHandler externalHandler(stubOwnership, kj::mv(getStreamHandlerFunc)); serializeJsValue(js, jsg::JsValue(arr), externalHandler, [&](capnp::MessageSize hint) { // TODO(perf): Actually use the size hint. return builder.getOperation().initCallWithArgs(); @@ -515,11 +545,20 @@ JsRpcPromiseAndPipeline callImpl(jsg::Lock& js, builder.getOperation().setGetProperty(); } - // Unfortunately, we always have to send a `resultsStreamSink` because we don't know until - // after the call completes whether or not it will return any streams. If it's unused, - // though, it should only be a couple allocations. - auto resultStreamSink = kj::refcounted(); - builder.getResultsStreamHandler().setStreamSink(kj::addRef(*resultStreamSink)); + kj::Maybe> resultStreamSink; + if (useExternalPusher) { + // Unfortunately, we always have to send the ExternalPusher since we don't know whether the + // call will return any streams (or other pushed externals). Luckily, it's a + // one-per-IoContext object, not a big deal. (It'll take a slot on the capnp export table + // though.) 
+ builder.getResultsStreamHandler().setExternalPusher(ioContext.getExternalPusher()); + } else { + // Unfortunately, we always have to send a `resultsStreamSink` because we don't know until + // after the call completes whether or not it will return any streams. If it's unused, + // though, it should only be a couple allocations. + builder.getResultsStreamHandler().setStreamSink( + kj::addRef(*resultStreamSink.emplace(kj::refcounted()))); + } auto callResult = builder.send(); @@ -533,13 +572,24 @@ JsRpcPromiseAndPipeline callImpl(jsg::Lock& js, // here, which is filled in later on to point at the JsRpcPromise, if and when one is created. auto weakRef = kj::atomicRefcounted(); + // HACK: Make sure that any calls to the ExternalPusher get to us before we try to + // deserialize the result. A weird quirk of Cap'n Proto is that return values arrive faster + // than calls by 1 turn of the event loop, so if we just insert a turn here we should be OK. + // + // Note that key to this working is the fact that the continuation returns a Promise, even + // though it is initialized with an immediate value. This forces the extra turn. + auto promise = callResult.then( + [](auto resp) -> kj::Promise> { + return kj::mv(resp); + }); + // RemotePromise lets us consume its pipeline and promise portions independently; we consume // the promise here and we consume the pipeline below, both via kj::mv(). - auto jsPromise = ioContext.awaitIo(js, kj::mv(callResult), + auto jsPromise = ioContext.awaitIo(js, kj::mv(promise), [weakRef = kj::atomicAddRef(*weakRef), resultStreamSink = kj::mv(resultStreamSink)]( jsg::Lock& js, capnp::Response response) mutable -> jsg::Value { - auto jsResult = deserializeRpcReturnValue(js, response, *resultStreamSink); + auto jsResult = deserializeRpcReturnValue(js, response, resultStreamSink); if (weakRef->disposed) { // The promise was explicitly disposed before it even resolved. 
This means we must dispose @@ -914,7 +964,7 @@ template MakeCallPipeline::Result serializeJsValueWithPipeline(jsg::Lock& js, jsg::JsValue value, Func makeBuilder, - RpcSerializerExternalHandler::GetStreamSinkFunc getStreamSinkFunc); + RpcSerializerExternalHandler::GetStreamHandlerFunc getStreamSinkFunc); // Callee-side implementation of JsRpcTarget. // @@ -933,7 +983,8 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { : enterIsolateAndCall(ctx.makeReentryCallback( [this, &ctx](Worker::Lock& lock, CallContext callContext) { return callImpl(lock, ctx, callContext); - })) {} + })), + externalPusher(ctx.getExternalPusher()) {} // Constructor use by EntrypointJsRpcTarget, which is revoked and destroyed before the IoContext // can possibly be canceled. It can just use ctx.run(). @@ -944,7 +995,8 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { return ctx.run([this, &ctx, callContext](Worker::Lock& lock) mutable { return callImpl(lock, ctx, callContext); }); - }) {} + }), + externalPusher(ctx.getExternalPusher()) {} struct EnvCtx { v8::Local env; @@ -968,8 +1020,10 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { // Handles the delivery of JS RPC method calls. kj::Promise call(CallContext callContext) override { + co_await kj::yield(); + // Try to execute the requested method. - return enterIsolateAndCall(callContext).catch_([](kj::Exception&& e) { + co_return co_await enterIsolateAndCall(callContext).catch_([](kj::Exception&& e) { if (jsg::isTunneledException(e.getDescription())) { // Annotate exceptions in RPC worker calls as remote exceptions. auto description = jsg::stripRemoteExceptionPrefix(e.getDescription()); @@ -983,6 +1037,18 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { }); } + // Implements ExternalPusher by forwarding to the shared implementation. 
+ // + // Note JsRpcTarget has to implement `ExternalPusher` directly rather than providing a method + // like `getExternalPusher()` because it's important that the pushes arrive before the call, and + // the ordering can only be guaranteed if they're on the same object. + kj::Promise pushByteStream(PushByteStreamContext context) override { + return externalPusher->pushByteStream(context); + } + kj::Promise pushAbortSignal(PushAbortSignalContext context) override { + return externalPusher->pushAbortSignal(context); + } + KJ_DISALLOW_COPY_AND_MOVE(JsRpcTargetBase); private: @@ -992,6 +1058,8 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { // using IoContext::makeReentryCallback(). kj::Function(CallContext callContext)> enterIsolateAndCall; + kj::Rc externalPusher; + // Returns true if the given name cannot be used as a method on this type. virtual bool isReservedName(kj::StringPtr name) = 0; @@ -1036,6 +1104,19 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { // Given a handle for the result, if it's a promise, await the promise, then serialize the // final result for return. + RpcSerializerExternalHandler::GetStreamHandlerFunc getResultsStreamHandlerFunc; + auto resultStreamHandler = params.getResultsStreamHandler(); + switch (resultStreamHandler.which()) { + case rpc::JsRpcTarget::CallParams::ResultsStreamHandler::EXTERNAL_PUSHER: + getResultsStreamHandlerFunc.init( + [cap = resultStreamHandler.getExternalPusher()]() mutable { return kj::mv(cap); }); + break; + case rpc::JsRpcTarget::CallParams::ResultsStreamHandler::STREAM_SINK: + getResultsStreamHandlerFunc.init( + [cap = resultStreamHandler.getStreamSink()]() mutable { return kj::mv(cap); }); + break; + } + kj::Maybe>> callPipelineFulfiller; // We need another ref to this fulfiller for the error callback. It can rely on being @@ -1047,6 +1128,12 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { // immediately. 
Annoyingly, that also means we need to hook up a pipeline for // callPipeline, which we don't actually have yet, so we need to promise-ify it. + // If the caller requested using ExternalPusher for the results, then it should also use + // ExternalPusher for the params. (Theoretically we could support mix-and-match but... + // let's keep it simple.) + KJ_REQUIRE(resultStreamHandler.isStreamSink(), + "RPC params used StreamSink when result is supposed to use ExternalPusher"); + auto paf = kj::newPromiseAndFulfiller(); callPipelineFulfillerRef = *paf.fulfiller; callPipelineFulfiller = kj::mv(paf.fulfiller); @@ -1078,7 +1165,7 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { [callContext, ownCallContext = kj::mv(ownCallContext), paramDisposalGroup = kj::mv(invocationResult.paramDisposalGroup), paramsStreamSink = kj::mv(invocationResult.streamSink), - resultStreamSink = params.getResultsStreamHandler().getStreamSink(), + getResultsStreamHandlerFunc = kj::mv(getResultsStreamHandlerFunc), callPipelineFulfiller = kj::mv(callPipelineFulfiller)]( jsg::Lock& js, jsg::Value value) mutable { jsg::JsValue resultValue(value.getHandle(js)); @@ -1090,10 +1177,7 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { hint.capCount += 1; // for callPipeline results = callContext.initResults(hint); return results.initResult(); - }, [&]() -> rpc::JsValue::StreamSink::Client { - // The results contain streams. We return the resultsStreamSink passed in the request. 
- return kj::mv(resultStreamSink); - }); + }, kj::mv(getResultsStreamHandlerFunc)); KJ_SWITCH_ONEOF(maybePipeline) { KJ_CASE_ONEOF(obj, MakeCallPipeline::Object) { @@ -1575,7 +1659,7 @@ template MakeCallPipeline::Result serializeJsValueWithPipeline(jsg::Lock& js, jsg::JsValue value, Func makeBuilder, - RpcSerializerExternalHandler::GetStreamSinkFunc getStreamSinkFunc) { + RpcSerializerExternalHandler::GetStreamHandlerFunc getStreamHandlerFunc) { auto maybeDispose = js.withinHandleScope([&]() -> kj::Maybe> { jsg::JsObject obj = KJ_UNWRAP_OR(value.tryCast(), { return kj::none; }); @@ -1599,7 +1683,7 @@ MakeCallPipeline::Result serializeJsValueWithPipeline(jsg::Lock& js, // Now that we've extracted our dispose function, we can serialize our value. RpcSerializerExternalHandler externalHandler( - RpcSerializerExternalHandler::TRANSFER, kj::mv(getStreamSinkFunc)); + RpcSerializerExternalHandler::TRANSFER, kj::mv(getStreamHandlerFunc)); serializeJsValue(js, value, externalHandler, kj::mv(makeBuilder)); auto stubDisposers = externalHandler.releaseStubDisposers(); @@ -2021,20 +2105,35 @@ class JsRpcSessionCustomEvent::ServerTopLevelMembrane final: public capnp::Membr public kj::Refcounted { public: explicit ServerTopLevelMembrane(kj::Own> doneFulfiller) - : doneFulfiller(kj::mv(doneFulfiller)) {} + : completionMembrane(kj::refcounted(kj::mv(doneFulfiller))) {} + ~ServerTopLevelMembrane() noexcept(false) { - KJ_IF_SOME(f, doneFulfiller) { - f->reject( + KJ_IF_SOME(cm, completionMembrane) { + cm->reject( KJ_EXCEPTION(DISCONNECTED, "JS RPC session canceled without calling an RPC method.")); } } kj::Maybe inboundCall( uint64_t interfaceId, uint16_t methodId, capnp::Capability::Client target) override { - auto f = kj::mv(JSG_REQUIRE_NONNULL( - doneFulfiller, Error, "Only one RPC method call is allowed on this object.")); - doneFulfiller = kj::none; - return capnp::membrane(kj::mv(target), kj::refcounted(kj::mv(f))); + if (interfaceId == capnp::typeId()) { + // 
JsRpcTarget::call() + auto cm = kj::mv(JSG_REQUIRE_NONNULL( + completionMembrane, Error, "Only one RPC method call is allowed on this object.")); + completionMembrane = kj::none; + return capnp::membrane(kj::mv(target), kj::mv(cm)); + } else if (interfaceId == capnp::typeId()) { + // ExternalPusher methods + // + // It's important that we use the same membrane that we'll use for call(), so that + // capabilities returned by the ExternalPusher will be wrapped in the membrane, hence they + // will be unwrapped when passed back through the membrane again to call(). + auto& cm = *JSG_REQUIRE_NONNULL( + completionMembrane, Error, "getExternalPusher() must be called before call()"); + return capnp::membrane(kj::mv(target), kj::addRef(cm)); + } else { + KJ_FAIL_ASSERT("unkown interface ID for JsRpcTarget"); + } } kj::Maybe outboundCall( @@ -2047,7 +2146,7 @@ class JsRpcSessionCustomEvent::ServerTopLevelMembrane final: public capnp::Membr } private: - kj::Maybe>> doneFulfiller; + kj::Maybe> completionMembrane; }; kj::Promise JsRpcSessionCustomEvent::run( diff --git a/src/workerd/api/worker-rpc.h b/src/workerd/api/worker-rpc.h index 3b724cea7cf..b02a3095bce 100644 --- a/src/workerd/api/worker-rpc.h +++ b/src/workerd/api/worker-rpc.h @@ -37,14 +37,17 @@ constexpr size_t MAX_JS_RPC_MESSAGE_SIZE = 1u << 25; class RpcSerializerExternalHandler final: public jsg::Serializer::ExternalHandler { public: using GetStreamSinkFunc = kj::Function; + using GetExternalPusherFunc = kj::Function; + using GetStreamHandlerFunc = kj::OneOf; enum StubOwnership { TRANSFER, DUPLICATE }; // `getStreamSinkFunc` will be called at most once, the first time a stream is encountered in // serialization, to get the StreamSink that should be used. 
- RpcSerializerExternalHandler(StubOwnership stubOwnership, GetStreamSinkFunc getStreamSinkFunc) + RpcSerializerExternalHandler( + StubOwnership stubOwnership, GetStreamHandlerFunc getStreamHandlerFunc) : stubOwnership(stubOwnership), - getStreamSinkFunc(kj::mv(getStreamSinkFunc)) {} + getStreamHandlerFunc(kj::mv(getStreamHandlerFunc)) {} inline StubOwnership getStubOwnership() { return stubOwnership; @@ -52,6 +55,10 @@ class RpcSerializerExternalHandler final: public jsg::Serializer::ExternalHandle using BuilderCallback = kj::Function; + // Returns the ExternalPusher for the remote side. Returns kj::none if this serialization is + // using the older StreamSink approach, in which case you need to call `writeStream()` instead. + kj::Maybe getExternalPusher(); + // Add an external. The value is a callback which will be invoked later to fill in the // JsValue::External in the Cap'n Proto structure. The external array cannot be allocated until // the number of externals are known, which is only after all calls to `add()` have completed, @@ -62,6 +69,9 @@ class RpcSerializerExternalHandler final: public jsg::Serializer::ExternalHandle // Like write(), but use this when there is also a stream associated with the external, i.e. // using StreamSink. This returns a capability which will eventually resolve to the stream. + // + // StreamSink is being replaced by ExternalPusher. You should only call writeStream() if + // getExternalPusher() returns kj::none. If ExternalPusher is available, this method will throw. capnp::Capability::Client writeStream(BuilderCallback callback); // Build the final list. 
@@ -98,12 +108,13 @@ class RpcSerializerExternalHandler final: public jsg::Serializer::ExternalHandle private: StubOwnership stubOwnership; - GetStreamSinkFunc getStreamSinkFunc; + GetStreamHandlerFunc getStreamHandlerFunc; kj::Vector externals; kj::Vector> stubDisposers; kj::Maybe streamSink; + kj::Maybe externalPusher; }; class RpcStubDisposalGroup; diff --git a/src/workerd/io/BUILD.bazel b/src/workerd/io/BUILD.bazel index 715156fb7dd..73b432077eb 100644 --- a/src/workerd/io/BUILD.bazel +++ b/src/workerd/io/BUILD.bazel @@ -34,6 +34,7 @@ wd_cc_library( # TODO(cleanup): Fix this. srcs = [ "compatibility-date.c++", + "external-pusher.c++", "features.c++", "hibernation-manager.c++", "io-channels.c++", @@ -47,6 +48,7 @@ wd_cc_library( ] + ["//src/workerd/api:srcs"], hdrs = [ "compatibility-date.h", + "external-pusher.h", "hibernation-manager.h", "io-channels.h", "io-context.h", diff --git a/src/workerd/io/external-pusher.c++ b/src/workerd/io/external-pusher.c++ new file mode 100644 index 00000000000..81d89cb191b --- /dev/null +++ b/src/workerd/io/external-pusher.c++ @@ -0,0 +1,12 @@ +// Copyright (c) 2025 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#include +#include + +namespace workerd { + +// TODO(now): implement + +} // namespace workerd diff --git a/src/workerd/io/external-pusher.h b/src/workerd/io/external-pusher.h new file mode 100644 index 00000000000..c20a257a755 --- /dev/null +++ b/src/workerd/io/external-pusher.h @@ -0,0 +1,25 @@ +// Copyright (c) 2025 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#pragma once + +#include + +#include +#include + +namespace workerd { + +// Implements JsValue.ExternalPusher from worker-interface.capnp. 
+// +// ExternalPusher allows a remote peer to "push" certain kinds of objects into our address space +// so that they can then be embedded in `JsValue` as `External` values. +class ExternalPusherImpl: public rpc::JsValue::ExternalPusher::Server, public kj::Refcounted { + public: + ExternalPusherImpl(capnp::ByteStreamFactory& byteStreamFactory) {} + + // TODO(now): Implement methods. +}; + +} // namespace workerd diff --git a/src/workerd/io/io-context.c++ b/src/workerd/io/io-context.c++ index 760e314bc16..a26b1bc25ec 100644 --- a/src/workerd/io/io-context.c++ +++ b/src/workerd/io/io-context.c++ @@ -908,6 +908,14 @@ kj::Date IoContext::now() { return now(getCurrentIncomingRequest()); } +kj::Rc IoContext::getExternalPusher() { + KJ_IF_SOME(ep, externalPusher) { + return ep.addRef(); + } else { + return externalPusher.emplace(kj::rc(getByteStreamFactory())).addRef(); + } +} + kj::Own IoContext::getSubrequestNoChecks( kj::FunctionParam(TraceContext&, IoChannelFactory&)> func, SubrequestOptions options) { diff --git a/src/workerd/io/io-context.h b/src/workerd/io/io-context.h index ff8d6f6ae0c..774f6996a6b 100644 --- a/src/workerd/io/io-context.h +++ b/src/workerd/io/io-context.h @@ -9,6 +9,7 @@ #include #include +#include #include #include #include @@ -772,6 +773,8 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler return thread.getHeaderIds(); } + kj::Rc getExternalPusher(); + // Subrequest channel numbers for the two special channels. // NULL = The channel used by global fetch() when the Request has no fetcher attached. // NEXT = DEPRECATED: The fetcher attached to Requests delivered by a FetchEvent, so that we can @@ -1012,6 +1015,8 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler // objects which reference `deleteQueue` in their destructors. OwnedObjectList ownedObjects; + kj::Maybe> externalPusher; + // Implementation detail of makeCachePutStream(). // TODO: Used for Cache PUT serialization. 
diff --git a/src/workerd/util/autogate.c++ b/src/workerd/util/autogate.c++ index 5a8b5a8d9dd..25bedf3e996 100644 --- a/src/workerd/util/autogate.c++ +++ b/src/workerd/util/autogate.c++ @@ -35,6 +35,8 @@ kj::StringPtr KJ_STRINGIFY(AutogateKey key) { return "compression-stream-use-state-machine"_kj; case AutogateKey::IDENTITY_TRANSFORM_STREAM_USE_STATE_MACHINE: return "identity-transform-stream-use-state-machine"_kj; + case AutogateKey::RPC_USE_EXTERNAL_PUSHER: + return "rpc-use-external-pusher"_kj; case AutogateKey::NumOfKeys: KJ_FAIL_ASSERT("NumOfKeys should not be used in getName"); } diff --git a/src/workerd/util/autogate.h b/src/workerd/util/autogate.h index 82a4af828e8..a23921a3d90 100644 --- a/src/workerd/util/autogate.h +++ b/src/workerd/util/autogate.h @@ -30,6 +30,8 @@ enum class AutogateKey { COMPRESSION_STREAM_USE_STATE_MACHINE, // Switch the IdentityTransformStream to use the new state machine-based impl IDENTITY_TRANSFORM_STREAM_USE_STATE_MACHINE, + // Use ExternalPusher instead of StreamSink to handle streams in RPC. + RPC_USE_EXTERNAL_PUSHER, NumOfKeys // Reserved for iteration. }; diff --git a/src/workerd/util/completion-membrane.h b/src/workerd/util/completion-membrane.h index fe63afcf1e8..50276935c57 100644 --- a/src/workerd/util/completion-membrane.h +++ b/src/workerd/util/completion-membrane.h @@ -30,6 +30,10 @@ class CompletionMembrane final: public capnp::MembranePolicy, public kj::Refcoun return kj::addRef(*this); } + void reject(kj::Exception&& e) { + doneFulfiller->reject(kj::mv(e)); + } + private: kj::Own> doneFulfiller; }; From 0751311c0d3d79de8133005e4f768847c301fe61 Mon Sep 17 00:00:00 2001 From: Kenton Varda Date: Sun, 21 Dec 2025 16:02:47 -0600 Subject: [PATCH 4/6] Implement ReadableStream serialization via ExternalPusher. 
--- src/workerd/api/streams/readable.c++ | 67 ++++++++++---- src/workerd/io/external-pusher.c++ | 132 ++++++++++++++++++++++++++- src/workerd/io/external-pusher.h | 18 +++- 3 files changed, 194 insertions(+), 23 deletions(-) diff --git a/src/workerd/api/streams/readable.c++ b/src/workerd/api/streams/readable.c++ index 71f068a862e..54b9af9a75e 100644 --- a/src/workerd/api/streams/readable.c++ +++ b/src/workerd/api/streams/readable.c++ @@ -531,6 +531,10 @@ jsg::Optional ByteLengthQueuingStrategy::size( namespace { +// TODO(cleanup): These classes have been copied to external-pusher.c++. The copies here can be +// deleted as soon as we've switched from StreamSink to ExternalPusher and can delete all the +// StreamSink-related code. For now I'm not trying to avoid duplication. + // HACK: We need as async pipe, like kj::newOneWayPipe(), except supporting explicit end(). So we // wrap the two ends of the pipe in special adapters that track whether end() was called. class ExplicitEndOutputPipeAdapter final: public capnp::ExplicitEndOutputStream { @@ -688,18 +692,37 @@ void ReadableStream::serialize(jsg::Lock& js, jsg::Serializer& serializer) { StreamEncoding encoding = controller.getPreferredEncoding(); auto expectedLength = controller.tryGetLength(encoding); - auto streamCap = externalHandler->writeStream( - [encoding, expectedLength](rpc::JsValue::External::Builder builder) mutable { - auto rs = builder.initReadableStream(); - rs.setEncoding(encoding); - KJ_IF_SOME(l, expectedLength) { - rs.getExpectedLength().setKnown(l); + capnp::ByteStream::Client streamCap = [&]() { + KJ_IF_SOME(pusher, externalHandler->getExternalPusher()) { + auto req = pusher.pushByteStreamRequest(capnp::MessageSize{2, 0}); + KJ_IF_SOME(el, expectedLength) { + req.setLengthPlusOne(el + 1); + } + auto pipeline = req.sendForPipeline(); + + externalHandler->write([encoding, expectedLength, source = pipeline.getSource()]( + rpc::JsValue::External::Builder builder) mutable { + auto rs = 
builder.initReadableStream(); + rs.setStream(kj::mv(source)); + rs.setEncoding(encoding); + }); + + return pipeline.getSink(); + } else { + return externalHandler + ->writeStream( + [encoding, expectedLength](rpc::JsValue::External::Builder builder) mutable { + auto rs = builder.initReadableStream(); + rs.setEncoding(encoding); + KJ_IF_SOME(l, expectedLength) { + rs.getExpectedLength().setKnown(l); + } + }).castAs(); } - }); + }(); kj::Own kjStream = - ioctx.getByteStreamFactory().capnpToKjExplicitEnd( - kj::mv(streamCap).castAs()); + ioctx.getByteStreamFactory().capnpToKjExplicitEnd(kj::mv(streamCap)); auto sink = newSystemStream(kj::mv(kjStream), encoding, ioctx); @@ -730,21 +753,25 @@ jsg::Ref ReadableStream::deserialize( auto& ioctx = IoContext::current(); - kj::Maybe expectedLength; - auto el = rs.getExpectedLength(); - if (el.isKnown()) { - expectedLength = el.getKnown(); - } + kj::Own in; + if (rs.hasStream()) { + in = ioctx.getExternalPusher()->unwrapStream(rs.getStream()); + } else { + kj::Maybe expectedLength; + auto el = rs.getExpectedLength(); + if (el.isKnown()) { + expectedLength = el.getKnown(); + } - auto pipe = kj::newOneWayPipe(expectedLength); + auto pipe = kj::newOneWayPipe(expectedLength); - auto endedFlag = kj::refcounted>(false); + auto endedFlag = kj::refcounted>(false); - auto out = kj::heap(kj::mv(pipe.out), kj::addRef(*endedFlag)); - auto in = - kj::heap(kj::mv(pipe.in), kj::mv(endedFlag), expectedLength); + auto out = kj::heap(kj::mv(pipe.out), kj::addRef(*endedFlag)); + in = kj::heap(kj::mv(pipe.in), kj::mv(endedFlag), expectedLength); - externalHandler->setLastStream(ioctx.getByteStreamFactory().kjToCapnp(kj::mv(out))); + externalHandler->setLastStream(ioctx.getByteStreamFactory().kjToCapnp(kj::mv(out))); + } return js.alloc(ioctx, kj::heap(newSystemStream(kj::mv(in), encoding, ioctx), ioctx)); diff --git a/src/workerd/io/external-pusher.c++ b/src/workerd/io/external-pusher.c++ index 81d89cb191b..c53ec2eefb9 100644 --- 
a/src/workerd/io/external-pusher.c++ +++ b/src/workerd/io/external-pusher.c++ @@ -7,6 +7,136 @@ namespace workerd { -// TODO(now): implement +// ======================================================================================= +// ReadableStream handling + +namespace { + +// TODO(cleanup): These classes have been copied from streams/readable.c++. The copies there can be +// deleted as soon as we've switched from StreamSink to ExternalPusher and can delete all the +// StreamSink-related code. For now I'm not trying to avoid duplication. + +// HACK: We need an async pipe, like kj::newOneWayPipe(), except supporting explicit end(). So we +// wrap the two ends of the pipe in special adapters that track whether end() was called. +class ExplicitEndOutputPipeAdapter final: public capnp::ExplicitEndOutputStream { + public: + ExplicitEndOutputPipeAdapter( + kj::Own inner, kj::Own> ended) + : inner(kj::mv(inner)), + ended(kj::mv(ended)) {} + + kj::Promise write(kj::ArrayPtr buffer) override { + return KJ_REQUIRE_NONNULL(inner)->write(buffer); + } + kj::Promise write(kj::ArrayPtr> pieces) override { + return KJ_REQUIRE_NONNULL(inner)->write(pieces); + } + + kj::Maybe> tryPumpFrom( + kj::AsyncInputStream& input, uint64_t amount) override { + return KJ_REQUIRE_NONNULL(inner)->tryPumpFrom(input, amount); + } + + kj::Promise whenWriteDisconnected() override { + return KJ_REQUIRE_NONNULL(inner)->whenWriteDisconnected(); + } + + kj::Promise end() override { + // Signal to the other side that end() was actually called.
+ ended->getWrapped() = true; + inner = kj::none; + return kj::READY_NOW; + } + + private: + kj::Maybe> inner; + kj::Own> ended; +}; + +class ExplicitEndInputPipeAdapter final: public kj::AsyncInputStream { + public: + ExplicitEndInputPipeAdapter(kj::Own inner, + kj::Own> ended, + kj::Maybe expectedLength) + : inner(kj::mv(inner)), + ended(kj::mv(ended)), + expectedLength(expectedLength) {} + + kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { + size_t result = co_await inner->tryRead(buffer, minBytes, maxBytes); + + KJ_IF_SOME(l, expectedLength) { + KJ_ASSERT(result <= l); + l -= result; + if (l == 0) { + // If we got all the bytes we expected, we treat this as a successful end, because the + // underlying KJ pipe is not actually going to wait for the other side to drop. This is + // consistent with the behavior of Content-Length in HTTP anyway. + ended->getWrapped() = true; + } + } + + if (result < minBytes) { + // Verify that end() was called. + if (!ended->getWrapped()) { + JSG_FAIL_REQUIRE(Error, "ReadableStream received over RPC disconnected prematurely."); + } + } + co_return result; + } + + kj::Maybe tryGetLength() override { + return inner->tryGetLength(); + } + + kj::Promise pumpTo(kj::AsyncOutputStream& output, uint64_t amount) override { + return inner->pumpTo(output, amount); + } + + private: + kj::Own inner; + kj::Own> ended; + kj::Maybe expectedLength; +}; + +} // namespace + +class ExternalPusherImpl::InputStreamImpl final: public ExternalPusher::InputStream::Server { + public: + InputStreamImpl(kj::Own stream): stream(kj::mv(stream)) {} + + kj::Maybe> stream; +}; + +kj::Promise ExternalPusherImpl::pushByteStream(PushByteStreamContext context) { + kj::Maybe expectedLength; + auto lp1 = context.getParams().getLengthPlusOne(); + if (lp1 > 0) { + expectedLength = lp1 - 1; + } + + auto pipe = kj::newOneWayPipe(expectedLength); + + auto endedFlag = kj::refcounted>(false); + + auto out = kj::heap(kj::mv(pipe.out), 
kj::addRef(*endedFlag)); + auto in = + kj::heap(kj::mv(pipe.in), kj::mv(endedFlag), expectedLength); + + auto results = context.initResults(capnp::MessageSize{4, 2}); + + results.setSource(inputStreamSet.add(kj::heap(kj::mv(in)))); + results.setSink(byteStreamFactory.kjToCapnp(kj::mv(out))); + return kj::READY_NOW; +} + +kj::Own ExternalPusherImpl::unwrapStream( + ExternalPusher::InputStream::Client cap) { + auto& unwrapped = KJ_REQUIRE_NONNULL( + inputStreamSet.tryGetLocalServerSync(cap), "pushed external is not a byte stream"); + + return KJ_REQUIRE_NONNULL(kj::mv(kj::downcast(unwrapped).stream), + "pushed byte stream has already been consumed"); +} } // namespace workerd diff --git a/src/workerd/io/external-pusher.h b/src/workerd/io/external-pusher.h index c20a257a755..ff415296821 100644 --- a/src/workerd/io/external-pusher.h +++ b/src/workerd/io/external-pusher.h @@ -11,15 +11,29 @@ namespace workerd { +using kj::byte; + // Implements JsValue.ExternalPusher from worker-interface.capnp. // // ExternalPusher allows a remote peer to "push" certain kinds of objects into our address space // so that they can then be embedded in `JsValue` as `External` values. class ExternalPusherImpl: public rpc::JsValue::ExternalPusher::Server, public kj::Refcounted { public: - ExternalPusherImpl(capnp::ByteStreamFactory& byteStreamFactory) {} + ExternalPusherImpl(capnp::ByteStreamFactory& byteStreamFactory) + : byteStreamFactory(byteStreamFactory) {} + + using ExternalPusher = rpc::JsValue::ExternalPusher; + + kj::Own unwrapStream(ExternalPusher::InputStream::Client cap); + + kj::Promise pushByteStream(PushByteStreamContext context) override; + + private: + capnp::ByteStreamFactory& byteStreamFactory; + + capnp::CapabilityServerSet inputStreamSet; - // TODO(now): Implement methods. 
+ class InputStreamImpl; }; } // namespace workerd From 9148a4deb9f943cc1c4f7f1ff9c85e77e257c3f1 Mon Sep 17 00:00:00 2001 From: Kenton Varda Date: Sun, 21 Dec 2025 19:12:02 -0600 Subject: [PATCH 5/6] Implement AbortSignal serialization via ExternalPusher. --- src/workerd/api/basics.c++ | 56 +++++++++++++++----- src/workerd/api/basics.h | 5 +- src/workerd/io/external-pusher.c++ | 85 ++++++++++++++++++++++++++++++ src/workerd/io/external-pusher.h | 18 +++++++ 4 files changed, 147 insertions(+), 17 deletions(-) diff --git a/src/workerd/api/basics.c++ b/src/workerd/api/basics.c++ index 73bf6bca19f..ae2c57feb06 100644 --- a/src/workerd/api/basics.c++ +++ b/src/workerd/api/basics.c++ @@ -574,6 +574,10 @@ class AbortTriggerRpcClient final { namespace { // The jsrpc handler that receives aborts from the remote and triggers them locally +// +// TODO(cleanup): This class has been copied to external-pusher.c++. The copy here can be +// deleted as soon as we've switched from StreamSink to ExternalPusher and can delete all the +// StreamSink-related code. For now I'm not trying to avoid duplication. 
class AbortTriggerRpcServer final: public rpc::AbortTrigger::Server { public: AbortTriggerRpcServer(kj::Own> fulfiller, @@ -858,15 +862,28 @@ void AbortSignal::serialize(jsg::Lock& js, jsg::Serializer& serializer) { return; } - auto streamCap = externalHandler - ->writeStream([&](rpc::JsValue::External::Builder builder) mutable { - builder.setAbortTrigger(); - }).castAs(); + auto triggerCap = [&]() -> rpc::AbortTrigger::Client { + KJ_IF_SOME(pusher, externalHandler->getExternalPusher()) { + auto pipeline = pusher.pushAbortSignalRequest(capnp::MessageSize{2, 0}).sendForPipeline(); + + externalHandler->write( + [signal = pipeline.getSignal()](rpc::JsValue::External::Builder builder) mutable { + builder.setAbortSignal(kj::mv(signal)); + }); + + return pipeline.getTrigger(); + } else { + return externalHandler + ->writeStream([&](rpc::JsValue::External::Builder builder) mutable { + builder.setAbortTrigger(); + }).castAs(); + } + }(); auto& ioContext = IoContext::current(); // Keep track of every AbortSignal cloned from this one. // If this->triggerAbort(...) is called, each rpcClient will be informed. 
- rpcClients.add(ioContext.addObject(kj::heap(kj::mv(streamCap)))); + rpcClients.add(ioContext.addObject(kj::heap(kj::mv(triggerCap)))); } jsg::Ref AbortSignal::deserialize( @@ -890,20 +907,31 @@ jsg::Ref AbortSignal::deserialize( return js.alloc(/* exception */ kj::none, /* maybeReason */ kj::none, flag); } - auto reader = externalHandler->read(); - KJ_REQUIRE(reader.isAbortTrigger(), "external table slot type does't match serialization tag"); - // The AbortSignalImpl will receive any remote triggerAbort requests and fulfill the promise with the reason for abort auto signal = js.alloc(/* exception */ kj::none, /* maybeReason */ kj::none, flag); - auto paf = kj::newPromiseAndFulfiller(); - auto pendingReason = IoContext::current().addObject(kj::refcounted()); + auto& ioctx = IoContext::current(); + + auto reader = externalHandler->read(); + if (reader.isAbortTrigger()) { + // Old-style StreamSink. + // TODO(cleanup): Remove this once the ExternalPusher autogate has rolled out. + auto paf = kj::newPromiseAndFulfiller(); + auto pendingReason = ioctx.addObject(kj::refcounted()); + + externalHandler->setLastStream( + kj::heap(kj::mv(paf.fulfiller), kj::addRef(*pendingReason))); + signal->rpcAbortPromise = ioctx.addObject(kj::heap(kj::mv(paf.promise))); + signal->pendingReason = kj::mv(pendingReason); + } else { + KJ_REQUIRE(reader.isAbortSignal(), "external table slot type does't match serialization tag"); + + auto resolvedSignal = ioctx.getExternalPusher()->unwrapAbortSignal(reader.getAbortSignal()); - externalHandler->setLastStream( - kj::heap(kj::mv(paf.fulfiller), kj::addRef(*pendingReason))); - signal->rpcAbortPromise = IoContext::current().addObject(kj::heap(kj::mv(paf.promise))); - signal->pendingReason = kj::mv(pendingReason); + signal->rpcAbortPromise = ioctx.addObject(kj::heap(kj::mv(resolvedSignal.signal))); + signal->pendingReason = ioctx.addObject(kj::mv(resolvedSignal.reason)); + } return signal; } diff --git a/src/workerd/api/basics.h 
b/src/workerd/api/basics.h index 5aac73e3ea7..8e75294a051 100644 --- a/src/workerd/api/basics.h +++ b/src/workerd/api/basics.h @@ -8,6 +8,7 @@ // TODO(cleanup): Rename to events.h? #include +#include #include #include #include @@ -571,9 +572,7 @@ class AbortSignal final: public EventTarget { jsg::Optional> maybeReason = kj::none, Flag flag = Flag::NONE); - using PendingReason = kj::RefcountedWrapper< - kj::OneOf /* v8Serialized */, kj::Exception /* if capability is dropped */ - >>; + using PendingReason = ExternalPusherImpl::PendingAbortReason; // The AbortSignal explicitly does not expose a constructor(). It is // illegal for user code to create an AbortSignal directly. diff --git a/src/workerd/io/external-pusher.c++ b/src/workerd/io/external-pusher.c++ index c53ec2eefb9..382577d35a9 100644 --- a/src/workerd/io/external-pusher.c++ +++ b/src/workerd/io/external-pusher.c++ @@ -139,4 +139,89 @@ kj::Own ExternalPusherImpl::unwrapStream( "pushed byte stream has already been consumed"); } +// ======================================================================================= +// AbortSignal handling + +namespace { + +// The jsrpc handler that receives aborts from the remote and triggers them locally +// +// TODO(cleanup): This class has been copied from api/basics.c++. The copy there can be +// deleted as soon as we've switched from StreamSink to ExternalPusher and can delete all the +// StreamSink-related code. For now I'm not trying to avoid duplication.
+class AbortTriggerRpcServer final: public rpc::AbortTrigger::Server { + public: + AbortTriggerRpcServer(kj::Own> fulfiller, + kj::Own&& pendingReason) + : fulfiller(kj::mv(fulfiller)), + pendingReason(kj::mv(pendingReason)) {} + + kj::Promise abort(AbortContext abortCtx) override { + auto params = abortCtx.getParams(); + auto reason = params.getReason().getV8Serialized(); + + pendingReason->getWrapped() = kj::heapArray(reason.asBytes()); + fulfiller->fulfill(); + return kj::READY_NOW; + } + + kj::Promise release(ReleaseContext releaseCtx) override { + released = true; + return kj::READY_NOW; + } + + ~AbortTriggerRpcServer() noexcept(false) { + if (pendingReason->getWrapped() != nullptr) { + // Already triggered + return; + } + + if (!released) { + pendingReason->getWrapped() = JSG_KJ_EXCEPTION(FAILED, DOMAbortError, + "An AbortSignal received over RPC was implicitly aborted because the connection back to " + "its trigger was lost."); + } + + // Always fulfill the promise in case the AbortSignal was waiting + fulfiller->fulfill(); + } + + private: + kj::Own> fulfiller; + kj::Own pendingReason; + bool released = false; +}; + +} // namespace + +class ExternalPusherImpl::AbortSignalImpl final: public ExternalPusher::AbortSignal::Server { + public: + AbortSignalImpl(AbortSignal content): content(kj::mv(content)) {} + + kj::Maybe content; +}; + +kj::Promise ExternalPusherImpl::pushAbortSignal(PushAbortSignalContext context) { + auto paf = kj::newPromiseAndFulfiller(); + auto pendingReason = kj::refcounted(); + + auto results = context.initResults(capnp::MessageSize{4, 2}); + + results.setTrigger( + kj::heap(kj::mv(paf.fulfiller), kj::addRef(*pendingReason))); + results.setSignal(abortSignalSet.add( + kj::heap(AbortSignal{kj::mv(paf.promise), kj::mv(pendingReason)}))); + + return kj::READY_NOW; +} + +ExternalPusherImpl::AbortSignal ExternalPusherImpl::unwrapAbortSignal( + ExternalPusher::AbortSignal::Client cap) { + auto& unwrapped = KJ_REQUIRE_NONNULL( + 
abortSignalSet.tryGetLocalServerSync(cap), "pushed external is not an AbortSignal"); + + return KJ_REQUIRE_NONNULL(kj::mv(kj::downcast(unwrapped).content), + "pushed AbortSignal has already been consumed"); +} + } // namespace workerd diff --git a/src/workerd/io/external-pusher.h b/src/workerd/io/external-pusher.h index ff415296821..5d3139d34a7 100644 --- a/src/workerd/io/external-pusher.h +++ b/src/workerd/io/external-pusher.h @@ -26,14 +26,32 @@ class ExternalPusherImpl: public rpc::JsValue::ExternalPusher::Server, public kj kj::Own unwrapStream(ExternalPusher::InputStream::Client cap); + // Box which holds the reason why an AbortSignal was aborted. May be either: + // - A serialized V8 value if the signal was aborted from JavaScript. + // - A KJ exception if the connection from the trigger was lost. + using PendingAbortReason = kj::RefcountedWrapper, kj::Exception>>; + + struct AbortSignal { + // Resolves when `reason` has been filled in. + kj::Promise signal; + + // The abort reason box, will be uninitialized until `signal` resolves. + kj::Own reason; + }; + + AbortSignal unwrapAbortSignal(ExternalPusher::AbortSignal::Client cap); + kj::Promise pushByteStream(PushByteStreamContext context) override; + kj::Promise pushAbortSignal(PushAbortSignalContext context) override; private: capnp::ByteStreamFactory& byteStreamFactory; capnp::CapabilityServerSet inputStreamSet; + capnp::CapabilityServerSet abortSignalSet; class InputStreamImpl; + class AbortSignalImpl; }; } // namespace workerd From 07846c3835753b64c872fe5ac011a05be057f059 Mon Sep 17 00:00:00 2001 From: Kenton Varda Date: Sun, 21 Dec 2025 19:12:19 -0600 Subject: [PATCH 6/6] Test e-order in js-rpc-test when embedding complex externals. This was missing, and an earlier version of ExternalPusher broke e-order but I didn't notice initially. 
--- src/workerd/api/tests/js-rpc-test.js | 29 ++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/src/workerd/api/tests/js-rpc-test.js b/src/workerd/api/tests/js-rpc-test.js index cbe74d18e33..2f9e84ccabc 100644 --- a/src/workerd/api/tests/js-rpc-test.js +++ b/src/workerd/api/tests/js-rpc-test.js @@ -2066,3 +2066,32 @@ export let sendServiceStubOverRpc = { } }, }; + +// Make sure that calls are delivered in e-order, even in the presence of pushed externals. +export let eOrderTest = { + async test(controller, env, ctx) { + let abortController = new AbortController(); + let abortSignal = abortController.signal; + + let readableController; + let readableStream = new ReadableStream({ + start(c) { + readableController = c; + }, + }); + + let stub = await env.MyService.makeCounter(0); + + let promises = []; + promises.push(stub.increment(1)); + promises.push(stub.increment(1)); + promises.push(stub.increment(1, abortSignal)); + promises.push(stub.increment(1)); + promises.push(stub.increment(1, readableStream)); + promises.push(stub.increment(1)); + + let results = await Promise.all(promises); + + assert.deepEqual(results, [1, 2, 3, 4, 5, 6]); + }, +};