From e60c1c6e09aa54782e371042a05492dab52832f6 Mon Sep 17 00:00:00 2001 From: Yagiz Nizipli Date: Wed, 24 Dec 2025 13:29:56 -0500 Subject: [PATCH] fix streams byob request handling after close for spec compliance --- src/workerd/api/BUILD.bazel | 13 +++++++ src/workerd/api/streams/queue-test.c++ | 15 ++------ src/workerd/api/streams/queue.c++ | 36 ++++++++++++++++-- src/workerd/api/streams/queue.h | 6 +++ src/workerd/api/streams/standard-test.c++ | 15 ++------ src/workerd/api/streams/standard.c++ | 46 ++++++++++++++++++++++- src/workerd/api/streams/standard.h | 3 ++ src/wpt/streams-test.ts | 21 +---------- 8 files changed, 105 insertions(+), 50 deletions(-) diff --git a/src/workerd/api/BUILD.bazel b/src/workerd/api/BUILD.bazel index 04df1378abe..7f62373a742 100644 --- a/src/workerd/api/BUILD.bazel +++ b/src/workerd/api/BUILD.bazel @@ -531,6 +531,19 @@ wd_cc_library( "basics-test.c++", "crypto/aes-test.c++", "crypto/impl-test.c++", + ] +] + +[ + kj_test( + src = f, + deps = [ + "//src/workerd/io", + "//src/workerd/io:promise-wrapper", + "//src/workerd/tests:test-fixture", + ], + ) + for f in [ "streams/queue-test.c++", "streams/standard-test.c++", ] diff --git a/src/workerd/api/streams/queue-test.c++ b/src/workerd/api/streams/queue-test.c++ index 1ccf88222fb..5488fc548fb 100644 --- a/src/workerd/api/streams/queue-test.c++ +++ b/src/workerd/api/streams/queue-test.c++ @@ -6,23 +6,14 @@ #include #include +#include namespace workerd::api { namespace { -jsg::V8System v8System; - -struct QueueContext: public jsg::Object, public jsg::ContextGlobal { - JSG_RESOURCE_TYPE(QueueContext) {} -}; -JSG_DECLARE_ISOLATE_TYPE(QueueIsolate, QueueContext); - void preamble(auto callback) { - QueueIsolate isolate(v8System, kj::heap()); - isolate.runInLockScope([&](QueueIsolate::Lock& lock) { - JSG_WITHIN_CONTEXT_SCOPE(lock, lock.newContext().getHandle(lock), - [&](jsg::Lock& js) { callback(js); }); - }); + TestFixture fixture; + fixture.runInIoContext([&](const TestFixture::Environment& env) { callback(env.js); }); } using ReadContinuation = jsg::Promise(ReadResult&&); diff --git a/src/workerd/api/streams/queue.c++ b/src/workerd/api/streams/queue.c++ index 068a54d8e8f..0ce58b77057 100644 --- a/src/workerd/api/streams/queue.c++ +++ b/src/workerd/api/streams/queue.c++ @@ -4,6 +4,11 @@ #include "queue.h" +#include +#include + +#include + #include namespace workerd::api { @@ -533,15 +538,38 @@ v8::Local ByteQueue::ByobRequest::getView(jsg::Lock& js) { return v8::Local(); } +size_t ByteQueue::ByobRequest::getOriginalBufferByteLength(jsg::Lock& js) const { + KJ_IF_SOME(req, request) { + KJ_IF_SOME(size, req.pullInto.store.underlyingArrayBufferSize(js)) { + return size; + } + } + return 0; +} + +size_t ByteQueue::ByobRequest::getOriginalByteOffsetPlusBytesFilled() const { + KJ_IF_SOME(req, request) { + return req.pullInto.store.getOffset() + req.pullInto.filled; + } + return 0; +} + #pragma endregion ByteQueue::ByobRequest ByteQueue::ByteQueue(size_t highWaterMark): impl(highWaterMark) {} void ByteQueue::close(jsg::Lock& js) { - KJ_IF_SOME(ready, impl.state.tryGet()) { - while (!ready.pendingByobReadRequests.empty()) { - ready.pendingByobReadRequests.front()->invalidate(); - ready.pendingByobReadRequests.pop_front(); + // Note: We intentionally do NOT invalidate pending byob requests here. + // According to the spec, the byobRequest should remain accessible after close + // so that respondWithNewView() can be called on it (which should throw + // appropriate errors for invalid views). The byob request will be invalidated + // when respond() or respondWithNewView() is called. + if (!FeatureFlags::get(js).getPedanticWpt()) { + KJ_IF_SOME(ready, impl.state.tryGet()) { + while (!ready.pendingByobReadRequests.empty()) { + ready.pendingByobReadRequests.front()->invalidate(); + ready.pendingByobReadRequests.pop_front(); + } } } impl.close(js); diff --git a/src/workerd/api/streams/queue.h b/src/workerd/api/streams/queue.h index c5ba6922e47..2cad9cf02c7 100644 --- a/src/workerd/api/streams/queue.h +++ b/src/workerd/api/streams/queue.h @@ -859,6 +859,12 @@ class ByteQueue final { v8::Local getView(jsg::Lock& js); + // Returns the byte length of the original underlying ArrayBuffer. + size_t getOriginalBufferByteLength(jsg::Lock& js) const; + + // Returns the byte offset of the original view plus bytes filled. + size_t getOriginalByteOffsetPlusBytesFilled() const; + JSG_MEMORY_INFO(ByteQueue::ByobRequest) {} private: diff --git a/src/workerd/api/streams/standard-test.c++ b/src/workerd/api/streams/standard-test.c++ index bd55520b972..f45551767d6 100644 --- a/src/workerd/api/streams/standard-test.c++ +++ b/src/workerd/api/streams/standard-test.c++ @@ -4,23 +4,14 @@ #include #include #include +#include namespace workerd::api { namespace { -jsg::V8System v8System; - -struct RsContext: public jsg::Object, public jsg::ContextGlobal { - JSG_RESOURCE_TYPE(RsContext) {} -}; -JSG_DECLARE_ISOLATE_TYPE(RsIsolate, RsContext, ReadResult); - void preamble(auto callback) { - RsIsolate isolate(v8System, kj::heap()); - isolate.runInLockScope([&](RsIsolate::Lock& lock) { - JSG_WITHIN_CONTEXT_SCOPE( - lock, lock.newContext().getHandle(lock), [&](jsg::Lock& js) { callback(js); }); - }); + TestFixture fixture; + fixture.runInIoContext([&](const TestFixture::Environment& env) { callback(env.js); }); } v8::Local toBytes(jsg::Lock& js, kj::String str) { diff --git a/src/workerd/api/streams/standard.c++ b/src/workerd/api/streams/standard.c++ index 428ad03d9d8..0bc46d6e79d 100644 --- a/src/workerd/api/streams/standard.c++ +++ b/src/workerd/api/streams/standard.c++ @@ -1991,7 +1991,10 @@ ReadableStreamBYOBRequest::Impl::Impl(jsg::Lock& js, kj::Rc> controller) : readRequest(kj::mv(readRequest)), controller(kj::mv(controller)), - view(js.v8Ref(this->readRequest->getView(js))) {} + view(js.v8Ref(this->readRequest->getView(js))), + originalBufferByteLength(this->readRequest->getOriginalBufferByteLength(js)), + originalByteOffsetPlusBytesFilled(this->readRequest->getOriginalByteOffsetPlusBytesFilled()) { +} void ReadableStreamBYOBRequest::Impl::updateView(jsg::Lock& js) { jsg::check(view.getHandle(js)->Buffer()->Detach(v8::Local())); @@ -2085,7 +2088,35 @@ void ReadableStreamBYOBRequest::respondWithNewView(jsg::Lock& js, jsg::BufferSou if (!controller.canCloseOrEnqueue()) { JSG_REQUIRE(view.size() == 0, TypeError, "The view byte length must be zero after the stream is closed."); - KJ_ASSERT(impl.readRequest->isInvalidated()); + + if (FeatureFlags::get(js).getPedanticWpt()) { + // Per the spec, when the stream is closed: + // 1. The view byte length must be zero (TypeError if not) + // 2. The underlying buffer must not be detached (TypeError) + // 3. The buffer byte length must not be zero (RangeError) + // 4. The buffer byte length must match the original (RangeError) + auto handle = view.getHandle(js); + auto buffer = handle->IsArrayBuffer() ? handle.As() + : handle.As()->Buffer(); + JSG_REQUIRE( + !buffer->WasDetached(), TypeError, "The underlying ArrayBuffer has been detached."); + + JSG_REQUIRE(view.canDetach(js), TypeError, "Unable to use non-detachable ArrayBuffer."); + // Use the stored values since the ByobRequest may have been invalidated during close. + auto actualBufferByteLength = buffer->ByteLength(); + JSG_REQUIRE( + actualBufferByteLength != 0, RangeError, "The underlying ArrayBuffer is zero-length."); + JSG_REQUIRE(actualBufferByteLength == impl.originalBufferByteLength, RangeError, + "The underlying ArrayBuffer is not the correct length."); + // The view's byte offset must match the original byte offset plus bytes filled. + auto viewByteOffset = + handle->IsArrayBuffer() ? 0 : handle.As()->ByteOffset(); + JSG_REQUIRE(viewByteOffset == impl.originalByteOffsetPlusBytesFilled, RangeError, + "The view has an invalid byte offset."); + } else { + KJ_ASSERT(impl.readRequest->isInvalidated()); + } + invalidate(js); } else { bool shouldInvalidate = false; @@ -2170,6 +2201,17 @@ void ReadableByteStreamController::close(jsg::Lock& js) { KJ_IF_SOME(byobRequest, maybeByobRequest) { JSG_REQUIRE(!byobRequest->isPartiallyFulfilled(), TypeError, "This ReadableStream was closed with a partial read pending."); + } else if (FeatureFlags::get(js).getPedanticWpt()) { + // If maybeByobRequest is not set, check if there's a pending byob request. + // If so, materialize it before closing so it remains accessible after + // the state changes to Closed. This is required by the spec for proper + // respondWithNewView() error handling in the closed state. + // Only do this if the queue doesn't have a partially fulfilled read. + KJ_IF_SOME(queue, impl.state.tryGet()) { + if (!queue.hasPartiallyFulfilledRead()) { + getByobRequest(js); + } + } } impl.close(js); } diff --git a/src/workerd/api/streams/standard.h b/src/workerd/api/streams/standard.h index efaf80fd791..cedc65a2590 100644 --- a/src/workerd/api/streams/standard.h +++ b/src/workerd/api/streams/standard.h @@ -501,6 +501,9 @@ class ReadableStreamBYOBRequest: public jsg::Object { kj::Rc> controller; jsg::V8Ref view; + size_t originalBufferByteLength; + size_t originalByteOffsetPlusBytesFilled; + Impl(jsg::Lock& js, kj::Own readRequest, kj::Rc> controller); diff --git a/src/wpt/streams-test.ts b/src/wpt/streams-test.ts index c5de7d6370f..f4becf31a10 100644 --- a/src/wpt/streams-test.ts +++ b/src/wpt/streams-test.ts @@ -124,26 +124,7 @@ export default { ], }, - 'readable-byte-streams/bad-buffers-and-views.any.js': { - comment: 'See individual comments', - expectedFailures: [ - "ReadableStream with byte source: respond() throws if the BYOB request's buffer has been detached (in the closed state)", - // TODO(conform): The spec expects us to throw here because the supplied view - // has a different offset. Instead, we allow it because the view is zero length - // and the controller has been closed (we do the close and zero length check) - // first. - // assert_throws_js(RangeError, () => c.byobRequest.respondWithNewView(view)); - 'ReadableStream with byte source: respondWithNewView() throws if the supplied view has a different offset (in the closed state)', - // TODO(conform): The spec expects this to be a RangeError - "ReadableStream with byte source: respondWithNewView() throws if the supplied view's buffer is zero-length (in the closed state)", - // TODO(conform): The spec expects this to be a RangeError - "ReadableStream with byte source: respondWithNewView() throws if the supplied view's buffer has a different length (in the closed state)", - // TODO(conform): We currently do not throw here since reading causes the - // view here to be zero length, which is allowed when the stream is closed. - //assert_throws_js(TypeError, () => c.byobRequest.respondWithNewView(view)); - "ReadableStream with byte source: enqueue() throws if the BYOB request's buffer has been detached (in the closed state)", - ], - }, + 'readable-byte-streams/bad-buffers-and-views.any.js': {}, 'readable-byte-streams/construct-byob-request.any.js': {}, 'readable-byte-streams/enqueue-with-detached-buffer.any.js': {}, 'readable-byte-streams/general.any.js': {