diff --git a/build/deps/gen/deps/dep_capnp_cpp.MODULE.bazel b/build/deps/gen/deps/dep_capnp_cpp.MODULE.bazel index 6d1cb726fbf..2163b753ae8 100644 --- a/build/deps/gen/deps/dep_capnp_cpp.MODULE.bazel +++ b/build/deps/gen/deps/dep_capnp_cpp.MODULE.bazel @@ -3,9 +3,9 @@ http = use_extension("@//:build/exts/http.bzl", "http") http.archive( name = "capnp-cpp", - sha256 = "0884582731fb8b9a6ef7a0d58056535d0480533256958d3eaae189d706bc43aa", - strip_prefix = "capnproto-capnproto-0776402/c++", + sha256 = "cbf7dcef02deb3a3addcfefacb76672c46a48c953024860bf80fceabc255d41d", + strip_prefix = "capnproto-capnproto-c1bce20/c++", type = "tgz", - url = "https://github.com/capnproto/capnproto/tarball/07764022ba75c6924a250d5be0bf2e83250602a0", + url = "https://github.com/capnproto/capnproto/tarball/c1bce2095a8dd76851fe3c1c61550f79b69d671d", ) use_repo(http, "capnp-cpp") diff --git a/src/workerd/api/basics.c++ b/src/workerd/api/basics.c++ index 73bf6bca19f..ae2c57feb06 100644 --- a/src/workerd/api/basics.c++ +++ b/src/workerd/api/basics.c++ @@ -574,6 +574,10 @@ class AbortTriggerRpcClient final { namespace { // The jsrpc handler that receives aborts from the remote and triggers them locally +// +// TODO(cleanup): This class has been copied to external-pusher.c++. The copy here can be +// deleted as soon as we've switched from StreamSink to ExternalPusher and can delete all the +// StreamSink-related code. For now I'm not trying to avoid duplication. class AbortTriggerRpcServer final: public rpc::AbortTrigger::Server { public: AbortTriggerRpcServer(kj::Own> fulfiller, @@ -858,15 +862,28 @@ void AbortSignal::serialize(jsg::Lock& js, jsg::Serializer& serializer) { return; } - auto streamCap = externalHandler - ->writeStream([&](rpc::JsValue::External::Builder builder) mutable { - builder.setAbortTrigger(); - }).castAs(); + auto triggerCap = [&]() -> rpc::AbortTrigger::Client { + KJ_IF_SOME(pusher, externalHandler->getExternalPusher()) { + auto pipeline = pusher.pushAbortSignalRequest(capnp::MessageSize{2, 0}).sendForPipeline(); + + externalHandler->write( + [signal = pipeline.getSignal()](rpc::JsValue::External::Builder builder) mutable { + builder.setAbortSignal(kj::mv(signal)); + }); + + return pipeline.getTrigger(); + } else { + return externalHandler + ->writeStream([&](rpc::JsValue::External::Builder builder) mutable { + builder.setAbortTrigger(); + }).castAs(); + } + }(); auto& ioContext = IoContext::current(); // Keep track of every AbortSignal cloned from this one. // If this->triggerAbort(...) is called, each rpcClient will be informed. - rpcClients.add(ioContext.addObject(kj::heap(kj::mv(streamCap)))); + rpcClients.add(ioContext.addObject(kj::heap(kj::mv(triggerCap)))); } jsg::Ref AbortSignal::deserialize( @@ -890,20 +907,31 @@ jsg::Ref AbortSignal::deserialize( return js.alloc(/* exception */ kj::none, /* maybeReason */ kj::none, flag); } - auto reader = externalHandler->read(); - KJ_REQUIRE(reader.isAbortTrigger(), "external table slot type does't match serialization tag"); - // The AbortSignalImpl will receive any remote triggerAbort requests and fulfill the promise with the reason for abort auto signal = js.alloc(/* exception */ kj::none, /* maybeReason */ kj::none, flag); - auto paf = kj::newPromiseAndFulfiller(); - auto pendingReason = IoContext::current().addObject(kj::refcounted()); + auto& ioctx = IoContext::current(); + + auto reader = externalHandler->read(); + if (reader.isAbortTrigger()) { + // Old-style StreamSink. + // TODO(cleanup): Remove this once the ExternalPusher autogate has rolled out. + auto paf = kj::newPromiseAndFulfiller(); + auto pendingReason = ioctx.addObject(kj::refcounted()); + + externalHandler->setLastStream( + kj::heap(kj::mv(paf.fulfiller), kj::addRef(*pendingReason))); + signal->rpcAbortPromise = ioctx.addObject(kj::heap(kj::mv(paf.promise))); + signal->pendingReason = kj::mv(pendingReason); + } else { + KJ_REQUIRE(reader.isAbortSignal(), "external table slot type does't match serialization tag"); + + auto resolvedSignal = ioctx.getExternalPusher()->unwrapAbortSignal(reader.getAbortSignal()); - externalHandler->setLastStream( - kj::heap(kj::mv(paf.fulfiller), kj::addRef(*pendingReason))); - signal->rpcAbortPromise = IoContext::current().addObject(kj::heap(kj::mv(paf.promise))); - signal->pendingReason = kj::mv(pendingReason); + signal->rpcAbortPromise = ioctx.addObject(kj::heap(kj::mv(resolvedSignal.signal))); + signal->pendingReason = ioctx.addObject(kj::mv(resolvedSignal.reason)); + } return signal; } diff --git a/src/workerd/api/basics.h b/src/workerd/api/basics.h index 5aac73e3ea7..8e75294a051 100644 --- a/src/workerd/api/basics.h +++ b/src/workerd/api/basics.h @@ -8,6 +8,7 @@ // TODO(cleanup): Rename to events.h? #include +#include #include #include #include @@ -571,9 +572,7 @@ class AbortSignal final: public EventTarget { jsg::Optional> maybeReason = kj::none, Flag flag = Flag::NONE); - using PendingReason = kj::RefcountedWrapper< - kj::OneOf /* v8Serialized */, kj::Exception /* if capability is dropped */ - >>; + using PendingReason = ExternalPusherImpl::PendingAbortReason; // The AbortSignal explicitly does not expose a constructor(). It is // illegal for user code to create an AbortSignal directly. diff --git a/src/workerd/api/streams/readable.c++ b/src/workerd/api/streams/readable.c++ index 01407efa993..b2016b17af6 100644 --- a/src/workerd/api/streams/readable.c++ +++ b/src/workerd/api/streams/readable.c++ @@ -519,6 +519,10 @@ jsg::Optional ByteLengthQueuingStrategy::size( namespace { +// TODO(cleanup): These classes have been copied to external-pusher.c++. The copies here can be +// deleted as soon as we've switched from StreamSink to ExternalPusher and can delete all the +// StreamSink-related code. For now I'm not trying to avoid duplication. + // HACK: We need as async pipe, like kj::newOneWayPipe(), except supporting explicit end(). So we // wrap the two ends of the pipe in special adapters that track whether end() was called. class ExplicitEndOutputPipeAdapter final: public capnp::ExplicitEndOutputStream { @@ -676,18 +680,37 @@ void ReadableStream::serialize(jsg::Lock& js, jsg::Serializer& serializer) { StreamEncoding encoding = controller.getPreferredEncoding(); auto expectedLength = controller.tryGetLength(encoding); - auto streamCap = externalHandler->writeStream( - [encoding, expectedLength](rpc::JsValue::External::Builder builder) mutable { - auto rs = builder.initReadableStream(); - rs.setEncoding(encoding); - KJ_IF_SOME(l, expectedLength) { - rs.getExpectedLength().setKnown(l); + capnp::ByteStream::Client streamCap = [&]() { + KJ_IF_SOME(pusher, externalHandler->getExternalPusher()) { + auto req = pusher.pushByteStreamRequest(capnp::MessageSize{2, 0}); + KJ_IF_SOME(el, expectedLength) { + req.setLengthPlusOne(el + 1); + } + auto pipeline = req.sendForPipeline(); + + externalHandler->write([encoding, expectedLength, source = pipeline.getSource()]( + rpc::JsValue::External::Builder builder) mutable { + auto rs = builder.initReadableStream(); + rs.setStream(kj::mv(source)); + rs.setEncoding(encoding); + }); + + return pipeline.getSink(); + } else { + return externalHandler + ->writeStream( + [encoding, expectedLength](rpc::JsValue::External::Builder builder) mutable { + auto rs = builder.initReadableStream(); + rs.setEncoding(encoding); + KJ_IF_SOME(l, expectedLength) { + rs.getExpectedLength().setKnown(l); + } + }).castAs(); } - }); + }(); kj::Own kjStream = - ioctx.getByteStreamFactory().capnpToKjExplicitEnd( - kj::mv(streamCap).castAs()); + ioctx.getByteStreamFactory().capnpToKjExplicitEnd(kj::mv(streamCap)); auto sink = newSystemStream(kj::mv(kjStream), encoding, ioctx); @@ -718,21 +741,25 @@ jsg::Ref ReadableStream::deserialize( auto& ioctx = IoContext::current(); - kj::Maybe expectedLength; - auto el = rs.getExpectedLength(); - if (el.isKnown()) { - expectedLength = el.getKnown(); - } + kj::Own in; + if (rs.hasStream()) { + in = ioctx.getExternalPusher()->unwrapStream(rs.getStream()); + } else { + kj::Maybe expectedLength; + auto el = rs.getExpectedLength(); + if (el.isKnown()) { + expectedLength = el.getKnown(); + } - auto pipe = kj::newOneWayPipe(expectedLength); + auto pipe = kj::newOneWayPipe(expectedLength); - auto endedFlag = kj::refcounted>(false); + auto endedFlag = kj::refcounted>(false); - auto out = kj::heap(kj::mv(pipe.out), kj::addRef(*endedFlag)); - auto in = - kj::heap(kj::mv(pipe.in), kj::mv(endedFlag), expectedLength); + auto out = kj::heap(kj::mv(pipe.out), kj::addRef(*endedFlag)); + in = kj::heap(kj::mv(pipe.in), kj::mv(endedFlag), expectedLength); - externalHandler->setLastStream(ioctx.getByteStreamFactory().kjToCapnp(kj::mv(out))); + externalHandler->setLastStream(ioctx.getByteStreamFactory().kjToCapnp(kj::mv(out))); + } return js.alloc(ioctx, kj::heap(newSystemStream(kj::mv(in), encoding, ioctx), ioctx)); diff --git a/src/workerd/api/tests/js-rpc-test.js b/src/workerd/api/tests/js-rpc-test.js index cbe74d18e33..2f9e84ccabc 100644 --- a/src/workerd/api/tests/js-rpc-test.js +++ b/src/workerd/api/tests/js-rpc-test.js @@ -2066,3 +2066,32 @@ export let sendServiceStubOverRpc = { } }, }; + +// Make sure that calls are delivered in e-order, even in the presence of pushed externals. +export let eOrderTest = { + async test(controller, env, ctx) { + let abortController = new AbortController(); + let abortSignal = abortController.signal; + + let readableController; + let readableStream = new ReadableStream({ + start(c) { + readableController = c; + }, + }); + + let stub = await env.MyService.makeCounter(0); + + let promises = []; + promises.push(stub.increment(1)); + promises.push(stub.increment(1)); + promises.push(stub.increment(1, abortSignal)); + promises.push(stub.increment(1)); + promises.push(stub.increment(1, readableStream)); + promises.push(stub.increment(1)); + + let results = await Promise.all(promises); + + assert.deepEqual(results, [1, 2, 3, 4, 5, 6]); + }, +}; diff --git a/src/workerd/api/worker-rpc.c++ b/src/workerd/api/worker-rpc.c++ index aa856d2504a..1b82db92909 100644 --- a/src/workerd/api/worker-rpc.c++ +++ b/src/workerd/api/worker-rpc.c++ @@ -100,13 +100,27 @@ class StreamSinkImpl final: public rpc::JsValue::StreamSink::Server, public kj:: kj::Vector table; }; +kj::Maybe RpcSerializerExternalHandler::getExternalPusher() { + KJ_IF_SOME(ep, externalPusher) { + return ep; + } else KJ_IF_SOME(func, getStreamHandlerFunc.tryGet()) { + // First call, set up ExternalPusher. + return externalPusher.emplace(func()); + } else { + // Using StreamSink. + return kj::none; + } +} + capnp::Capability::Client RpcSerializerExternalHandler::writeStream(BuilderCallback callback) { rpc::JsValue::StreamSink::Client* streamSinkPtr; KJ_IF_SOME(ss, streamSink) { streamSinkPtr = &ss; } else { // First stream written, set up the StreamSink. - streamSinkPtr = &streamSink.emplace(getStreamSinkFunc()); + auto& func = KJ_REQUIRE_NONNULL(getStreamHandlerFunc.tryGet(), + "this serialization is not using StreamSink; use getExternalPusher() instead"); + streamSinkPtr = &streamSink.emplace(func()); } auto result = ({ @@ -233,9 +247,15 @@ DeserializeResult deserializeJsValue( // Does deserializeJsValue() and then adds a `dispose()` method to the returned object (if it is // an object) which disposes all stubs therein. -jsg::JsValue deserializeRpcReturnValue( - jsg::Lock& js, rpc::JsRpcTarget::CallResults::Reader callResults, StreamSinkImpl& streamSink) { - auto [value, disposalGroup, _] = deserializeJsValue(js, callResults.getResult(), streamSink); +jsg::JsValue deserializeRpcReturnValue(jsg::Lock& js, + rpc::JsRpcTarget::CallResults::Reader callResults, + kj::Maybe streamSink) { + auto [value, disposalGroup, ss] = deserializeJsValue(js, callResults.getResult(), streamSink); + + if (streamSink == kj::none) { + KJ_REQUIRE(ss == kj::none, + "RPC returned result using StreamSink even though ExternalPusher was provided"); + } // If the object had a disposer on the callee side, it will run when we discard the callPipeline, // so attach that to the disposal group on the caller side. If the returned object did NOT have @@ -480,6 +500,9 @@ JsRpcPromiseAndPipeline callImpl(jsg::Lock& js, kj::Maybe paramsStreamSinkFulfiller; + bool useExternalPusher = + util::Autogate::isEnabled(util::AutogateKey::RPC_USE_EXTERNAL_PUSHER); + KJ_IF_SOME(args, maybeArgs) { // If we have arguments, serialize them. // Note that we may fail to serialize some element, in which case this will throw back to @@ -496,15 +519,22 @@ JsRpcPromiseAndPipeline callImpl(jsg::Lock& js, ? RpcSerializerExternalHandler::DUPLICATE : RpcSerializerExternalHandler::TRANSFER; - RpcSerializerExternalHandler externalHandler( - stubOwnership, [&]() -> rpc::JsValue::StreamSink::Client { - // A stream was encountered in the params, so we must expect the response to contain - // paramsStreamSink. But we don't have the response yet. So, we need to set up a - // temporary promise client, which we hook to the response a little bit later. - auto paf = kj::newPromiseAndFulfiller(); - paramsStreamSinkFulfiller = kj::mv(paf.fulfiller); - return kj::mv(paf.promise); - }); + RpcSerializerExternalHandler::GetStreamHandlerFunc getStreamHandlerFunc; + if (useExternalPusher) { + getStreamHandlerFunc.init( + [&]() -> rpc::JsValue::ExternalPusher::Client { return client; }); + } else { + getStreamHandlerFunc.init([&]() { + // A stream was encountered in the params, so we must expect the response to contain + // paramsStreamSink. But we don't have the response yet. So, we need to set up a + // temporary promise client, which we hook to the response a little bit later. + auto paf = kj::newPromiseAndFulfiller(); + paramsStreamSinkFulfiller = kj::mv(paf.fulfiller); + return kj::mv(paf.promise); + }); + } + + RpcSerializerExternalHandler externalHandler(stubOwnership, kj::mv(getStreamHandlerFunc)); serializeJsValue(js, jsg::JsValue(arr), externalHandler, [&](capnp::MessageSize hint) { // TODO(perf): Actually use the size hint. return builder.getOperation().initCallWithArgs(); @@ -515,11 +545,20 @@ JsRpcPromiseAndPipeline callImpl(jsg::Lock& js, builder.getOperation().setGetProperty(); } - // Unfortunately, we always have to send a `resultsStreamSink` because we don't know until - // after the call completes whether or not it will return any streams. If it's unused, - // though, it should only be a couple allocations. - auto resultStreamSink = kj::refcounted(); - builder.setResultsStreamSink(kj::addRef(*resultStreamSink)); + kj::Maybe> resultStreamSink; + if (useExternalPusher) { + // Unfortunately, we always have to send the ExternalPusher since we don't know whether the + // call will return any streams (or other pushed externals). Luckily, it's a + // one-per-IoContext object, not a big deal. (It'll take a slot on the capnp export table + // though.) + builder.getResultsStreamHandler().setExternalPusher(ioContext.getExternalPusher()); + } else { + // Unfortunately, we always have to send a `resultsStreamSink` because we don't know until + // after the call completes whether or not it will return any streams. If it's unused, + // though, it should only be a couple allocations. + builder.getResultsStreamHandler().setStreamSink( + kj::addRef(*resultStreamSink.emplace(kj::refcounted()))); + } auto callResult = builder.send(); @@ -533,13 +572,24 @@ JsRpcPromiseAndPipeline callImpl(jsg::Lock& js, // here, which is filled in later on to point at the JsRpcPromise, if and when one is created. auto weakRef = kj::atomicRefcounted(); + // HACK: Make sure that any calls to the ExternalPusher get to us before we try to + // deserialize the result. A weird quirk of Cap'n Proto is that return values arrive faster + // than calls by 1 turn of the event loop, so if we just insert a turn here we should be OK. + // + // Note that key to this working is the fact that the continuation returns a Promise, even + // though it is initialized with an immediate value. This forces the extra turn. + auto promise = callResult.then( + [](auto resp) -> kj::Promise> { + return kj::mv(resp); + }); + // RemotePromise lets us consume its pipeline and promise portions independently; we consume // the promise here and we consume the pipeline below, both via kj::mv(). - auto jsPromise = ioContext.awaitIo(js, kj::mv(callResult), + auto jsPromise = ioContext.awaitIo(js, kj::mv(promise), [weakRef = kj::atomicAddRef(*weakRef), resultStreamSink = kj::mv(resultStreamSink)]( jsg::Lock& js, capnp::Response response) mutable -> jsg::Value { - auto jsResult = deserializeRpcReturnValue(js, response, *resultStreamSink); + auto jsResult = deserializeRpcReturnValue(js, response, resultStreamSink); if (weakRef->disposed) { // The promise was explicitly disposed before it even resolved. This means we must dispose @@ -914,7 +964,7 @@ template MakeCallPipeline::Result serializeJsValueWithPipeline(jsg::Lock& js, jsg::JsValue value, Func makeBuilder, - RpcSerializerExternalHandler::GetStreamSinkFunc getStreamSinkFunc); + RpcSerializerExternalHandler::GetStreamHandlerFunc getStreamSinkFunc); // Callee-side implementation of JsRpcTarget. // @@ -933,7 +983,8 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { : enterIsolateAndCall(ctx.makeReentryCallback( [this, &ctx](Worker::Lock& lock, CallContext callContext) { return callImpl(lock, ctx, callContext); - })) {} + })), + externalPusher(ctx.getExternalPusher()) {} // Constructor use by EntrypointJsRpcTarget, which is revoked and destroyed before the IoContext // can possibly be canceled. It can just use ctx.run(). @@ -944,7 +995,8 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { return ctx.run([this, &ctx, callContext](Worker::Lock& lock) mutable { return callImpl(lock, ctx, callContext); }); - }) {} + }), + externalPusher(ctx.getExternalPusher()) {} struct EnvCtx { v8::Local env; @@ -968,8 +1020,10 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { // Handles the delivery of JS RPC method calls. kj::Promise call(CallContext callContext) override { + co_await kj::yield(); + // Try to execute the requested method. - return enterIsolateAndCall(callContext).catch_([](kj::Exception&& e) { + co_return co_await enterIsolateAndCall(callContext).catch_([](kj::Exception&& e) { if (jsg::isTunneledException(e.getDescription())) { // Annotate exceptions in RPC worker calls as remote exceptions. auto description = jsg::stripRemoteExceptionPrefix(e.getDescription()); @@ -983,6 +1037,18 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { }); } + // Implements ExternalPusher by forwarding to the shared implementation. + // + // Note JsRpcTarget has to implement `ExternalPusher` directly rather than providing a method + // like `getExternalPusher()` because it's important that the pushes arrive before the call, and + // the ordering can only be guaranteed if they're on the same object. + kj::Promise pushByteStream(PushByteStreamContext context) override { + return externalPusher->pushByteStream(context); + } + kj::Promise pushAbortSignal(PushAbortSignalContext context) override { + return externalPusher->pushAbortSignal(context); + } + KJ_DISALLOW_COPY_AND_MOVE(JsRpcTargetBase); private: @@ -992,6 +1058,8 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { // using IoContext::makeReentryCallback(). kj::Function(CallContext callContext)> enterIsolateAndCall; + kj::Rc externalPusher; + // Returns true if the given name cannot be used as a method on this type. virtual bool isReservedName(kj::StringPtr name) = 0; @@ -1036,6 +1104,19 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { // Given a handle for the result, if it's a promise, await the promise, then serialize the // final result for return. + RpcSerializerExternalHandler::GetStreamHandlerFunc getResultsStreamHandlerFunc; + auto resultStreamHandler = params.getResultsStreamHandler(); + switch (resultStreamHandler.which()) { + case rpc::JsRpcTarget::CallParams::ResultsStreamHandler::EXTERNAL_PUSHER: + getResultsStreamHandlerFunc.init( + [cap = resultStreamHandler.getExternalPusher()]() mutable { return kj::mv(cap); }); + break; + case rpc::JsRpcTarget::CallParams::ResultsStreamHandler::STREAM_SINK: + getResultsStreamHandlerFunc.init( + [cap = resultStreamHandler.getStreamSink()]() mutable { return kj::mv(cap); }); + break; + } + kj::Maybe>> callPipelineFulfiller; // We need another ref to this fulfiller for the error callback. It can rely on being @@ -1047,6 +1128,12 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { // immediately. Annoyingly, that also means we need to hook up a pipeline for // callPipeline, which we don't actually have yet, so we need to promise-ify it. + // If the caller requested using ExternalPusher for the results, then it should also use + // ExternalPusher for the params. (Theoretically we could support mix-and-match but... + // let's keep it simple.) + KJ_REQUIRE(resultStreamHandler.isStreamSink(), + "RPC params used StreamSink when result is supposed to use ExternalPusher"); + auto paf = kj::newPromiseAndFulfiller(); callPipelineFulfillerRef = *paf.fulfiller; callPipelineFulfiller = kj::mv(paf.fulfiller); @@ -1078,7 +1165,7 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { [callContext, ownCallContext = kj::mv(ownCallContext), paramDisposalGroup = kj::mv(invocationResult.paramDisposalGroup), paramsStreamSink = kj::mv(invocationResult.streamSink), - resultStreamSink = params.getResultsStreamSink(), + getResultsStreamHandlerFunc = kj::mv(getResultsStreamHandlerFunc), callPipelineFulfiller = kj::mv(callPipelineFulfiller)]( jsg::Lock& js, jsg::Value value) mutable { jsg::JsValue resultValue(value.getHandle(js)); @@ -1090,10 +1177,7 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server { hint.capCount += 1; // for callPipeline results = callContext.initResults(hint); return results.initResult(); - }, [&]() -> rpc::JsValue::StreamSink::Client { - // The results contain streams. We return the resultsStreamSink passed in the request. - return kj::mv(resultStreamSink); - }); + }, kj::mv(getResultsStreamHandlerFunc)); KJ_SWITCH_ONEOF(maybePipeline) { KJ_CASE_ONEOF(obj, MakeCallPipeline::Object) { @@ -1575,7 +1659,7 @@ template MakeCallPipeline::Result serializeJsValueWithPipeline(jsg::Lock& js, jsg::JsValue value, Func makeBuilder, - RpcSerializerExternalHandler::GetStreamSinkFunc getStreamSinkFunc) { + RpcSerializerExternalHandler::GetStreamHandlerFunc getStreamHandlerFunc) { auto maybeDispose = js.withinHandleScope([&]() -> kj::Maybe> { jsg::JsObject obj = KJ_UNWRAP_OR(value.tryCast(), { return kj::none; }); @@ -1599,7 +1683,7 @@ MakeCallPipeline::Result serializeJsValueWithPipeline(jsg::Lock& js, // Now that we've extracted our dispose function, we can serialize our value. RpcSerializerExternalHandler externalHandler( - RpcSerializerExternalHandler::TRANSFER, kj::mv(getStreamSinkFunc)); + RpcSerializerExternalHandler::TRANSFER, kj::mv(getStreamHandlerFunc)); serializeJsValue(js, value, externalHandler, kj::mv(makeBuilder)); auto stubDisposers = externalHandler.releaseStubDisposers(); @@ -2021,20 +2105,35 @@ class JsRpcSessionCustomEvent::ServerTopLevelMembrane final: public capnp::Membr public kj::Refcounted { public: explicit ServerTopLevelMembrane(kj::Own> doneFulfiller) - : doneFulfiller(kj::mv(doneFulfiller)) {} + : completionMembrane(kj::refcounted(kj::mv(doneFulfiller))) {} + ~ServerTopLevelMembrane() noexcept(false) { - KJ_IF_SOME(f, doneFulfiller) { - f->reject( + KJ_IF_SOME(cm, completionMembrane) { + cm->reject( KJ_EXCEPTION(DISCONNECTED, "JS RPC session canceled without calling an RPC method.")); } } kj::Maybe inboundCall( uint64_t interfaceId, uint16_t methodId, capnp::Capability::Client target) override { - auto f = kj::mv(JSG_REQUIRE_NONNULL( - doneFulfiller, Error, "Only one RPC method call is allowed on this object.")); - doneFulfiller = kj::none; - return capnp::membrane(kj::mv(target), kj::refcounted(kj::mv(f))); + if (interfaceId == capnp::typeId()) { + // JsRpcTarget::call() + auto cm = kj::mv(JSG_REQUIRE_NONNULL( + completionMembrane, Error, "Only one RPC method call is allowed on this object.")); + completionMembrane = kj::none; + return capnp::membrane(kj::mv(target), kj::mv(cm)); + } else if (interfaceId == capnp::typeId()) { + // ExternalPusher methods + // + // It's important that we use the same membrane that we'll use for call(), so that + // capabilities returned by the ExternalPusher will be wrapped in the membrane, hence they + // will be unwrapped when passed back through the membrane again to call(). + auto& cm = *JSG_REQUIRE_NONNULL( + completionMembrane, Error, "getExternalPusher() must be called before call()"); + return capnp::membrane(kj::mv(target), kj::addRef(cm)); + } else { + KJ_FAIL_ASSERT("unkown interface ID for JsRpcTarget"); + } } kj::Maybe outboundCall( @@ -2047,7 +2146,7 @@ class JsRpcSessionCustomEvent::ServerTopLevelMembrane final: public capnp::Membr } private: - kj::Maybe>> doneFulfiller; + kj::Maybe> completionMembrane; }; kj::Promise JsRpcSessionCustomEvent::run( diff --git a/src/workerd/api/worker-rpc.h b/src/workerd/api/worker-rpc.h index 8e5ab8398ae..a1d42818b2e 100644 --- a/src/workerd/api/worker-rpc.h +++ b/src/workerd/api/worker-rpc.h @@ -37,14 +37,17 @@ constexpr size_t MAX_JS_RPC_MESSAGE_SIZE = 1u << 25; class RpcSerializerExternalHandler final: public jsg::Serializer::ExternalHandler { public: using GetStreamSinkFunc = kj::Function; + using GetExternalPusherFunc = kj::Function; + using GetStreamHandlerFunc = kj::OneOf; enum StubOwnership { TRANSFER, DUPLICATE }; // `getStreamSinkFunc` will be called at most once, the first time a stream is encountered in // serialization, to get the StreamSink that should be used. - RpcSerializerExternalHandler(StubOwnership stubOwnership, GetStreamSinkFunc getStreamSinkFunc) + RpcSerializerExternalHandler( + StubOwnership stubOwnership, GetStreamHandlerFunc getStreamHandlerFunc) : stubOwnership(stubOwnership), - getStreamSinkFunc(kj::mv(getStreamSinkFunc)) {} + getStreamHandlerFunc(kj::mv(getStreamHandlerFunc)) {} inline StubOwnership getStubOwnership() { return stubOwnership; @@ -52,6 +55,10 @@ class RpcSerializerExternalHandler final: public jsg::Serializer::ExternalHandle using BuilderCallback = kj::Function; + // Returns the ExternalPusher for the remote side. Returns kj::none if this serialization is + // using the older StreamSink approach, in which case you need to call `writeStream()` instead. + kj::Maybe getExternalPusher(); + // Add an external. The value is a callback which will be invoked later to fill in the // JsValue::External in the Cap'n Proto structure. The external array cannot be allocated until // the number of externals are known, which is only after all calls to `add()` have completed, @@ -62,6 +69,9 @@ class RpcSerializerExternalHandler final: public jsg::Serializer::ExternalHandle // Like write(), but use this when there is also a stream associated with the external, i.e. // using StreamSink. This returns a capability which will eventually resolve to the stream. + // + // StreamSink is being replaced by ExternalPusher. You should only call writeStream() if + // getExternalPusher() returns kj::none. If ExternalPusher is available, this method will throw. capnp::Capability::Client writeStream(BuilderCallback callback); // Build the final list. @@ -98,12 +108,13 @@ class RpcSerializerExternalHandler final: public jsg::Serializer::ExternalHandle private: StubOwnership stubOwnership; - GetStreamSinkFunc getStreamSinkFunc; + GetStreamHandlerFunc getStreamHandlerFunc; kj::Vector externals; kj::Vector> stubDisposers; kj::Maybe streamSink; + kj::Maybe externalPusher; }; class RpcStubDisposalGroup; diff --git a/src/workerd/io/BUILD.bazel b/src/workerd/io/BUILD.bazel index 715156fb7dd..73b432077eb 100644 --- a/src/workerd/io/BUILD.bazel +++ b/src/workerd/io/BUILD.bazel @@ -34,6 +34,7 @@ wd_cc_library( # TODO(cleanup): Fix this. srcs = [ "compatibility-date.c++", + "external-pusher.c++", "features.c++", "hibernation-manager.c++", "io-channels.c++", @@ -47,6 +48,7 @@ wd_cc_library( ] + ["//src/workerd/api:srcs"], hdrs = [ "compatibility-date.h", + "external-pusher.h", "hibernation-manager.h", "io-channels.h", "io-context.h", diff --git a/src/workerd/io/external-pusher.c++ b/src/workerd/io/external-pusher.c++ new file mode 100644 index 00000000000..382577d35a9 --- /dev/null +++ b/src/workerd/io/external-pusher.c++ @@ -0,0 +1,227 @@ +// Copyright (c) 2025 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#include +#include + +namespace workerd { + +// ======================================================================================= +// ReadableStream handling + +namespace { + +// TODO(cleanup): These classes have been copied from streams/readable.c++. The copies there can be +// deleted as soon as we've switched from StreamSink to ExternalPusher and can delete all the +// StreamSink-related code. For now I'm not trying to avoid duplication. + +// HACK: We need as async pipe, like kj::newOneWayPipe(), except supporting explicit end(). So we +// wrap the two ends of the pipe in special adapters that track whether end() was called. +class ExplicitEndOutputPipeAdapter final: public capnp::ExplicitEndOutputStream { + public: + ExplicitEndOutputPipeAdapter( + kj::Own inner, kj::Own> ended) + : inner(kj::mv(inner)), + ended(kj::mv(ended)) {} + + kj::Promise write(kj::ArrayPtr buffer) override { + return KJ_REQUIRE_NONNULL(inner)->write(buffer); + } + kj::Promise write(kj::ArrayPtr> pieces) override { + return KJ_REQUIRE_NONNULL(inner)->write(pieces); + } + + kj::Maybe> tryPumpFrom( + kj::AsyncInputStream& input, uint64_t amount) override { + return KJ_REQUIRE_NONNULL(inner)->tryPumpFrom(input, amount); + } + + kj::Promise whenWriteDisconnected() override { + return KJ_REQUIRE_NONNULL(inner)->whenWriteDisconnected(); + } + + kj::Promise end() override { + // Signal to the other side that end() was actually called. + ended->getWrapped() = true; + inner = kj::none; + return kj::READY_NOW; + } + + private: + kj::Maybe> inner; + kj::Own> ended; +}; + +class ExplicitEndInputPipeAdapter final: public kj::AsyncInputStream { + public: + ExplicitEndInputPipeAdapter(kj::Own inner, + kj::Own> ended, + kj::Maybe expectedLength) + : inner(kj::mv(inner)), + ended(kj::mv(ended)), + expectedLength(expectedLength) {} + + kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { + size_t result = co_await inner->tryRead(buffer, minBytes, maxBytes); + + KJ_IF_SOME(l, expectedLength) { + KJ_ASSERT(result <= l); + l -= result; + if (l == 0) { + // If we got all the bytes we expected, we treat this as a successful end, because the + // underlying KJ pipe is not actually going to wait for the other side to drop. This is + // consistent with the behavior of Content-Length in HTTP anyway. + ended->getWrapped() = true; + } + } + + if (result < minBytes) { + // Verify that end() was called. + if (!ended->getWrapped()) { + JSG_FAIL_REQUIRE(Error, "ReadableStream received over RPC disconnected prematurely."); + } + } + co_return result; + } + + kj::Maybe tryGetLength() override { + return inner->tryGetLength(); + } + + kj::Promise pumpTo(kj::AsyncOutputStream& output, uint64_t amount) override { + return inner->pumpTo(output, amount); + } + + private: + kj::Own inner; + kj::Own> ended; + kj::Maybe expectedLength; +}; + +} // namespace + +class ExternalPusherImpl::InputStreamImpl final: public ExternalPusher::InputStream::Server { + public: + InputStreamImpl(kj::Own stream): stream(kj::mv(stream)) {} + + kj::Maybe> stream; +}; + +kj::Promise ExternalPusherImpl::pushByteStream(PushByteStreamContext context) { + kj::Maybe expectedLength; + auto lp1 = context.getParams().getLengthPlusOne(); + if (lp1 > 0) { + expectedLength = lp1 - 1; + } + + auto pipe = kj::newOneWayPipe(expectedLength); + + auto endedFlag = kj::refcounted>(false); + + auto out = kj::heap(kj::mv(pipe.out), kj::addRef(*endedFlag)); + auto in = + kj::heap(kj::mv(pipe.in), kj::mv(endedFlag), expectedLength); + + auto results = context.initResults(capnp::MessageSize{4, 2}); + + results.setSource(inputStreamSet.add(kj::heap(kj::mv(in)))); + results.setSink(byteStreamFactory.kjToCapnp(kj::mv(out))); + return kj::READY_NOW; +} + +kj::Own ExternalPusherImpl::unwrapStream( + ExternalPusher::InputStream::Client cap) { + auto& unwrapped = KJ_REQUIRE_NONNULL( + inputStreamSet.tryGetLocalServerSync(cap), "pushed external is not a byte stream"); + + return KJ_REQUIRE_NONNULL(kj::mv(kj::downcast(unwrapped).stream), + "pushed byte stream has already been consumed"); +} + +// ======================================================================================= +// AbortSignal handling + +namespace { + +// The jsrpc handler that receives aborts from the remote and triggers them locally +// +// TODO(cleanup): This class has been copied from external-pusher.c++. The copy there can be +// deleted as soon as we've switched from StreamSink to ExternalPusher and can delete all the +// StreamSink-related code. For now I'm not trying to avoid duplication. +class AbortTriggerRpcServer final: public rpc::AbortTrigger::Server { + public: + AbortTriggerRpcServer(kj::Own> fulfiller, + kj::Own&& pendingReason) + : fulfiller(kj::mv(fulfiller)), + pendingReason(kj::mv(pendingReason)) {} + + kj::Promise abort(AbortContext abortCtx) override { + auto params = abortCtx.getParams(); + auto reason = params.getReason().getV8Serialized(); + + pendingReason->getWrapped() = kj::heapArray(reason.asBytes()); + fulfiller->fulfill(); + return kj::READY_NOW; + } + + kj::Promise release(ReleaseContext releaseCtx) override { + released = true; + return kj::READY_NOW; + } + + ~AbortTriggerRpcServer() noexcept(false) { + if (pendingReason->getWrapped() != nullptr) { + // Already triggered + return; + } + + if (!released) { + pendingReason->getWrapped() = JSG_KJ_EXCEPTION(FAILED, DOMAbortError, + "An AbortSignal received over RPC was implicitly aborted because the connection back to " + "its trigger was lost."); + } + + // Always fulfill the promise in case the AbortSignal was waiting + fulfiller->fulfill(); + } + + private: + kj::Own> fulfiller; + kj::Own pendingReason; + bool released = false; +}; + +} // namespace + +class ExternalPusherImpl::AbortSignalImpl final: public ExternalPusher::AbortSignal::Server { + public: + AbortSignalImpl(AbortSignal content): content(kj::mv(content)) {} + + kj::Maybe content; +}; + +kj::Promise ExternalPusherImpl::pushAbortSignal(PushAbortSignalContext context) { + auto paf = kj::newPromiseAndFulfiller(); + auto pendingReason = kj::refcounted(); + + auto results = context.initResults(capnp::MessageSize{4, 2}); + + results.setTrigger( + kj::heap(kj::mv(paf.fulfiller), kj::addRef(*pendingReason))); + results.setSignal(abortSignalSet.add( + kj::heap(AbortSignal{kj::mv(paf.promise), kj::mv(pendingReason)}))); + + return kj::READY_NOW; +} + +ExternalPusherImpl::AbortSignal ExternalPusherImpl::unwrapAbortSignal( + ExternalPusher::AbortSignal::Client cap) { + auto& unwrapped = KJ_REQUIRE_NONNULL( + abortSignalSet.tryGetLocalServerSync(cap), "pushed external is not an AbortSignal"); + + return KJ_REQUIRE_NONNULL(kj::mv(kj::downcast(unwrapped).content), + "pushed AbortSignal has already been consumed"); +} + +} // namespace workerd diff --git a/src/workerd/io/external-pusher.h b/src/workerd/io/external-pusher.h new file mode 100644 index 00000000000..5d3139d34a7 --- /dev/null +++ b/src/workerd/io/external-pusher.h @@ -0,0 +1,57 @@ +// Copyright (c) 2025 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#pragma once + +#include + +#include +#include + +namespace workerd { + +using kj::byte; + +// Implements JsValue.ExternalPusher from worker-interface.capnp. +// +// ExternalPusher allows a remote peer to "push" certain kinds of objects into our address space +// so that they can then be embedded in `JsValue` as `External` values. +class ExternalPusherImpl: public rpc::JsValue::ExternalPusher::Server, public kj::Refcounted { + public: + ExternalPusherImpl(capnp::ByteStreamFactory& byteStreamFactory) + : byteStreamFactory(byteStreamFactory) {} + + using ExternalPusher = rpc::JsValue::ExternalPusher; + + kj::Own unwrapStream(ExternalPusher::InputStream::Client cap); + + // Box which holds the reason why an AbortSignal was aborted. May be either: + // - A serialized V8 value if the signal was aborted from JavaScript. + // - A KJ exception if the connection from the trigger was lost. + using PendingAbortReason = kj::RefcountedWrapper, kj::Exception>>; + + struct AbortSignal { + // Resolves when `reason` has been filled in. + kj::Promise signal; + + // The abort reason box, will be uninitialized until `signal` resolves. + kj::Own reason; + }; + + AbortSignal unwrapAbortSignal(ExternalPusher::AbortSignal::Client cap); + + kj::Promise pushByteStream(PushByteStreamContext context) override; + kj::Promise pushAbortSignal(PushAbortSignalContext context) override; + + private: + capnp::ByteStreamFactory& byteStreamFactory; + + capnp::CapabilityServerSet inputStreamSet; + capnp::CapabilityServerSet abortSignalSet; + + class InputStreamImpl; + class AbortSignalImpl; +}; + +} // namespace workerd diff --git a/src/workerd/io/io-context.c++ b/src/workerd/io/io-context.c++ index 78dd3a1da6b..d5a009c54a7 100644 --- a/src/workerd/io/io-context.c++ +++ b/src/workerd/io/io-context.c++ @@ -903,6 +903,14 @@ kj::Date IoContext::now() { return now(getCurrentIncomingRequest()); } +kj::Rc IoContext::getExternalPusher() { + KJ_IF_SOME(ep, externalPusher) { + return ep.addRef(); + } else { + return externalPusher.emplace(kj::rc(getByteStreamFactory())).addRef(); + } +} + kj::Own IoContext::getSubrequestNoChecks( kj::FunctionParam(TraceContext&, IoChannelFactory&)> func, SubrequestOptions options) { diff --git a/src/workerd/io/io-context.h b/src/workerd/io/io-context.h index ff8d6f6ae0c..774f6996a6b 100644 --- a/src/workerd/io/io-context.h +++ b/src/workerd/io/io-context.h @@ -9,6 +9,7 @@ #include #include +#include #include #include #include @@ -772,6 +773,8 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler return thread.getHeaderIds(); } + kj::Rc getExternalPusher(); + // Subrequest channel numbers for the two special channels. // NULL = The channel used by global fetch() when the Request has no fetcher attached. // NEXT = DEPRECATED: The fetcher attached to Requests delivered by a FetchEvent, so that we can @@ -1012,6 +1015,8 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler // objects which reference `deleteQueue` in their destructors. OwnedObjectList ownedObjects; + kj::Maybe> externalPusher; + // Implementation detail of makeCachePutStream(). // TODO: Used for Cache PUT serialization. diff --git a/src/workerd/io/worker-interface.capnp b/src/workerd/io/worker-interface.capnp index ab1c8ef4da0..f41e4fb9aaf 100644 --- a/src/workerd/io/worker-interface.capnp +++ b/src/workerd/io/worker-interface.capnp @@ -474,10 +474,19 @@ struct JsValue { # A ReadableStream. The sender of the JsValue will use the associated StreamSink to open a # stream of type `ByteStream`. + stream @10 :ExternalPusher.InputStream; + # If present, a stream pushed using the destination isolate's ExternalPusher. + # + # If null (deprecated), then the sender will use the associated StreamSink to open a stream + # of type `ByteStream`. StreamSink is in the process of being replaced by ExternalPusher. + encoding @4 :StreamEncoding; # Bytes read from the stream have this encoding. expectedLength :union { + # NOTE: This is obsolete when `stream` is set. Instead, the length is passed to + # ExternalPusher.pushByteStream(). + unknown @5 :Void; known @6 :UInt64; } @@ -488,6 +497,12 @@ struct JsValue { # mechanism used to trigger the abort later. This is modeled as a stream, since the sender is # the one that will later on send the abort signal. This external will have an associated # stream in the corresponding `StreamSink` with type `AbortTrigger`. + # + # TODO(soon): This will be obsolete when we stop using `StreamSink`; `abortSignal` will + # replace it. (The name is wrong anyway -- this is the signal end, not the trigger end.) + + abortSignal @11 :ExternalPusher.AbortSignal; + # Indicates that an `AbortSignal` is being passed. subrequestChannelToken @8 :Data; actorClassChannelToken @9 :Data; @@ -507,12 +522,60 @@ struct JsValue { # # Similarly, the caller passes a `resultsStreamSink` to the callee. If the response contains # any streams, it can start pushing to this immediately after responding. + # + # TODO(soon): This design is overcomplicated since it requires allocating StreamSinks for every + # request, even when not used, and requires a lot of weird promise magic. The newer + # ExternalPusher design is simpler, and only incurs overhead when used. Once all of + # production has been updated to understand ExternalPusher, then we can flip an autogate to + # use it by default. Once that has rolled out globally, we can remove StreamSink. startStream @0 (externalIndex :UInt32) -> (stream :Capability); # Opens a stream corresponding to the given index in the JsValue's `externals` array. The type # of capability returned depends on the type of external. E.g. for `readableStream`, it is a # `ByteStream`. } + + interface ExternalPusher { + # This object allows "pushing" external objects to a target isolate, so that they can + # sublequently be referenced by a `JsValue.External`. This allows implementing externals where + # the sender might need to send subsequent information to the receiver *before* the receiver + # has had a chance to call back to request it. For example, when a ReadableStream is sent over + # RPC, the sender will immediately start sending body bytes without waiting for a round trip. + # + # The key to ExternalPusher is that it constructs and returns capabilities pointing at objects + # living directly in the target isolate's runtime. These capabilities have empty interfaces, + # but can be passed back to the target in the `External` table of a `JsValue`. Since the + # capabilities point to objects directly in the recipient's memory space, they can then be + # unwrapped to obtain the underlying local object, which the recipient then uses to back the + # external value delivered to the application. + # + # Note that externals must be pushed BEFORE the JsValue that uses them is sent, so that they + # can be unwrapped immediately when deserializing the value. + + pushByteStream @0 (lengthPlusOne :UInt64 = 0) -> (source :InputStream, sink :ByteStream); + # Creates a readable stream within the remote's memory space. `source` should be placed in a + # sublequent `External` of type `readableStream`. The caller should write bytes to `sink`. + # + # `lengthPlusOne` is the expected length of the stream, plus 1, with zero indicating no + # expectation. This is used e.g. when the `ReadableStream` was created with `FixedLengthStream`. + # (The weird "plus one" encoding is used because Cap'n Proto doesn't have a Maybe. Perhaps we + # can fix this eventually.) + + interface InputStream { + # No methods. This will be unwrapped by the recipient to obtain the underlying local value. + } + + pushAbortSignal @1 () -> (signal :AbortSignal, trigger :AbortTrigger); + + interface AbortSignal { + # No methods. This can be unwrapped by the recipient to obtain a Promise which + # rejects when the signal is aborted. + } + + # TODO(soon): + # - AbortTrigger + # - Promises + } } interface AbortTrigger $Cxx.allowCancellation { @@ -532,7 +595,15 @@ interface AbortTrigger $Cxx.allowCancellation { # be triggered. Otherwise, the cloned signal will treat a dropped cabability as an abort. } -interface JsRpcTarget $Cxx.allowCancellation { +interface JsRpcTarget extends(JsValue.ExternalPusher) $Cxx.allowCancellation { + # Target on which RPC methods may be invoked. + # + # This is the backing capnp type for a JsRpcStub, as well as used to represent top-level RPC + # events. + # + # JsRpcTarget must implement `JsValue.ExternalPusher` to allow externals to be pushed to the + # target in advance of a call that uses them. + struct CallParams { union { methodName @0 :Text; @@ -578,8 +649,19 @@ interface JsRpcTarget $Cxx.allowCancellation { # "bar". } - resultsStreamSink @4 :JsValue.StreamSink; - # StreamSink used for ReadableStreams found in the results. + resultsStreamHandler :union { + # We're in the process of switching from `StreamSink` to `ExternalPusher`. A caller will only + # offer one or the other, and expect the callee to use that. (Initially, callers will still + # send StreamSink for backwards-compatibility, but once all recipients are able to understand + # ExternalPusher, we'll flip an autogate to make callers send it.) + + streamSink @4 :JsValue.StreamSink; + # StreamSink used for ReadableStreams found in the results. + + externalPusher @5 :JsValue.ExternalPusher; + # ExternalPusher object which will push into the caller's isolate. Use this to push externals + # that will be included in the results. + } } struct CallResults { diff --git a/src/workerd/util/autogate.c++ b/src/workerd/util/autogate.c++ index 5a8b5a8d9dd..25bedf3e996 100644 --- a/src/workerd/util/autogate.c++ +++ b/src/workerd/util/autogate.c++ @@ -35,6 +35,8 @@ kj::StringPtr KJ_STRINGIFY(AutogateKey key) { return "compression-stream-use-state-machine"_kj; case AutogateKey::IDENTITY_TRANSFORM_STREAM_USE_STATE_MACHINE: return "identity-transform-stream-use-state-machine"_kj; + case AutogateKey::RPC_USE_EXTERNAL_PUSHER: + return "rpc-use-external-pusher"_kj; case AutogateKey::NumOfKeys: KJ_FAIL_ASSERT("NumOfKeys should not be used in getName"); } diff --git a/src/workerd/util/autogate.h b/src/workerd/util/autogate.h index 82a4af828e8..a23921a3d90 100644 --- a/src/workerd/util/autogate.h +++ b/src/workerd/util/autogate.h @@ -30,6 +30,8 @@ enum class AutogateKey { COMPRESSION_STREAM_USE_STATE_MACHINE, // Switch the IdentityTransformStream to use the new state machine-based impl IDENTITY_TRANSFORM_STREAM_USE_STATE_MACHINE, + // Use ExternalPusher instead of StreamSink to handle streams in RPC. + RPC_USE_EXTERNAL_PUSHER, NumOfKeys // Reserved for iteration. }; diff --git a/src/workerd/util/completion-membrane.h b/src/workerd/util/completion-membrane.h index fe63afcf1e8..50276935c57 100644 --- a/src/workerd/util/completion-membrane.h +++ b/src/workerd/util/completion-membrane.h @@ -30,6 +30,10 @@ class CompletionMembrane final: public capnp::MembranePolicy, public kj::Refcoun return kj::addRef(*this); } + void reject(kj::Exception&& e) { + doneFulfiller->reject(kj::mv(e)); + } + private: kj::Own> doneFulfiller; };