Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/workerd/api/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,19 @@ wd_cc_library(
"basics-test.c++",
"crypto/aes-test.c++",
"crypto/impl-test.c++",
]
]

[
kj_test(
src = f,
deps = [
"//src/workerd/io",
"//src/workerd/io:promise-wrapper",
"//src/workerd/tests:test-fixture",
],
)
for f in [
"streams/queue-test.c++",
"streams/standard-test.c++",
]
Expand Down
15 changes: 3 additions & 12 deletions src/workerd/api/streams/queue-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,14 @@

#include <workerd/jsg/jsg-test.h>
#include <workerd/jsg/jsg.h>
#include <workerd/tests/test-fixture.h>

namespace workerd::api {
namespace {

jsg::V8System v8System;

struct QueueContext: public jsg::Object, public jsg::ContextGlobal {
JSG_RESOURCE_TYPE(QueueContext) {}
};
JSG_DECLARE_ISOLATE_TYPE(QueueIsolate, QueueContext);

void preamble(auto callback) {
QueueIsolate isolate(v8System, kj::heap<jsg::IsolateObserver>());
isolate.runInLockScope([&](QueueIsolate::Lock& lock) {
JSG_WITHIN_CONTEXT_SCOPE(lock, lock.newContext<QueueContext>().getHandle(lock),
[&](jsg::Lock& js) { callback(js); });
});
TestFixture fixture;
fixture.runInIoContext([&](const TestFixture::Environment& env) { callback(env.js); });
}

using ReadContinuation = jsg::Promise<ReadResult>(ReadResult&&);
Expand Down
36 changes: 32 additions & 4 deletions src/workerd/api/streams/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@

#include "queue.h"

#include <workerd/io/features.h>
#include <workerd/jsg/jsg.h>

#include <kj/common.h>

#include <algorithm>

namespace workerd::api {
Expand Down Expand Up @@ -533,15 +538,38 @@ v8::Local<v8::Uint8Array> ByteQueue::ByobRequest::getView(jsg::Lock& js) {
return v8::Local<v8::Uint8Array>();
}

size_t ByteQueue::ByobRequest::getOriginalBufferByteLength(jsg::Lock& js) const {
KJ_IF_SOME(req, request) {
KJ_IF_SOME(size, req.pullInto.store.underlyingArrayBufferSize(js)) {
return size;
}
}
return 0;
}

size_t ByteQueue::ByobRequest::getOriginalByteOffsetPlusBytesFilled() const {
KJ_IF_SOME(req, request) {
return req.pullInto.store.getOffset() + req.pullInto.filled;
}
return 0;
}

#pragma endregion ByteQueue::ByobRequest

ByteQueue::ByteQueue(size_t highWaterMark): impl(highWaterMark) {}

void ByteQueue::close(jsg::Lock& js) {
KJ_IF_SOME(ready, impl.state.tryGet<ByteQueue::QueueImpl::Ready>()) {
while (!ready.pendingByobReadRequests.empty()) {
ready.pendingByobReadRequests.front()->invalidate();
ready.pendingByobReadRequests.pop_front();
// Note: We intentionally do NOT invalidate pending byob requests here.
// According to the spec, the byobRequest should remain accessible after close
// so that respondWithNewView() can be called on it (which should throw
// appropriate errors for invalid views). The byob request will be invalidated
// when respond() or respondWithNewView() is called.
if (!FeatureFlags::get(js).getPedanticWpt()) {
KJ_IF_SOME(ready, impl.state.tryGet<ByteQueue::QueueImpl::Ready>()) {
while (!ready.pendingByobReadRequests.empty()) {
ready.pendingByobReadRequests.front()->invalidate();
ready.pendingByobReadRequests.pop_front();
}
}
}
impl.close(js);
Expand Down
6 changes: 6 additions & 0 deletions src/workerd/api/streams/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,12 @@ class ByteQueue final {

v8::Local<v8::Uint8Array> getView(jsg::Lock& js);

// Returns the byte length of the original underlying ArrayBuffer.
size_t getOriginalBufferByteLength(jsg::Lock& js) const;

// Returns the byte offset of the original view plus bytes filled.
size_t getOriginalByteOffsetPlusBytesFilled() const;

JSG_MEMORY_INFO(ByteQueue::ByobRequest) {}

private:
Expand Down
15 changes: 3 additions & 12 deletions src/workerd/api/streams/standard-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,14 @@
#include <workerd/jsg/jsg-test.h>
#include <workerd/jsg/jsg.h>
#include <workerd/jsg/observer.h>
#include <workerd/tests/test-fixture.h>

namespace workerd::api {
namespace {

jsg::V8System v8System;

struct RsContext: public jsg::Object, public jsg::ContextGlobal {
JSG_RESOURCE_TYPE(RsContext) {}
};
JSG_DECLARE_ISOLATE_TYPE(RsIsolate, RsContext, ReadResult);

void preamble(auto callback) {
RsIsolate isolate(v8System, kj::heap<jsg::IsolateObserver>());
isolate.runInLockScope([&](RsIsolate::Lock& lock) {
JSG_WITHIN_CONTEXT_SCOPE(
lock, lock.newContext<RsContext>().getHandle(lock), [&](jsg::Lock& js) { callback(js); });
});
TestFixture fixture;
fixture.runInIoContext([&](const TestFixture::Environment& env) { callback(env.js); });
}

v8::Local<v8::Value> toBytes(jsg::Lock& js, kj::String str) {
Expand Down
46 changes: 44 additions & 2 deletions src/workerd/api/streams/standard.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1991,7 +1991,10 @@ ReadableStreamBYOBRequest::Impl::Impl(jsg::Lock& js,
kj::Rc<WeakRef<ReadableByteStreamController>> controller)
: readRequest(kj::mv(readRequest)),
controller(kj::mv(controller)),
view(js.v8Ref(this->readRequest->getView(js))) {}
view(js.v8Ref(this->readRequest->getView(js))),
originalBufferByteLength(this->readRequest->getOriginalBufferByteLength(js)),
originalByteOffsetPlusBytesFilled(this->readRequest->getOriginalByteOffsetPlusBytesFilled()) {
}

void ReadableStreamBYOBRequest::Impl::updateView(jsg::Lock& js) {
jsg::check(view.getHandle(js)->Buffer()->Detach(v8::Local<v8::Value>()));
Expand Down Expand Up @@ -2085,7 +2088,35 @@ void ReadableStreamBYOBRequest::respondWithNewView(jsg::Lock& js, jsg::BufferSou
if (!controller.canCloseOrEnqueue()) {
JSG_REQUIRE(view.size() == 0, TypeError,
"The view byte length must be zero after the stream is closed.");
KJ_ASSERT(impl.readRequest->isInvalidated());

if (FeatureFlags::get(js).getPedanticWpt()) {
// Per the spec, when the stream is closed:
// 1. The view byte length must be zero (TypeError if not)
// 2. The underlying buffer must not be detached (TypeError)
// 3. The buffer byte length must not be zero (RangeError)
// 4. The buffer byte length must match the original (RangeError)
auto handle = view.getHandle(js);
auto buffer = handle->IsArrayBuffer() ? handle.As<v8::ArrayBuffer>()
: handle.As<v8::ArrayBufferView>()->Buffer();
JSG_REQUIRE(
!buffer->WasDetached(), TypeError, "The underlying ArrayBuffer has been detached.");

JSG_REQUIRE(view.canDetach(js), TypeError, "Unable to use non-detachable ArrayBuffer.");
// Use the stored values since the ByobRequest may have been invalidated during close.
auto actualBufferByteLength = buffer->ByteLength();
JSG_REQUIRE(
actualBufferByteLength != 0, RangeError, "The underlying ArrayBuffer is zero-length.");
JSG_REQUIRE(actualBufferByteLength == impl.originalBufferByteLength, RangeError,
"The underlying ArrayBuffer is not the correct length.");
// The view's byte offset must match the original byte offset plus bytes filled.
auto viewByteOffset =
handle->IsArrayBuffer() ? 0 : handle.As<v8::ArrayBufferView>()->ByteOffset();
JSG_REQUIRE(viewByteOffset == impl.originalByteOffsetPlusBytesFilled, RangeError,
"The view has an invalid byte offset.");
} else {
KJ_ASSERT(impl.readRequest->isInvalidated());
}

invalidate(js);
} else {
bool shouldInvalidate = false;
Expand Down Expand Up @@ -2170,6 +2201,17 @@ void ReadableByteStreamController::close(jsg::Lock& js) {
KJ_IF_SOME(byobRequest, maybeByobRequest) {
JSG_REQUIRE(!byobRequest->isPartiallyFulfilled(), TypeError,
"This ReadableStream was closed with a partial read pending.");
} else if (FeatureFlags::get(js).getPedanticWpt()) {
// If maybeByobRequest is not set, check if there's a pending byob request.
// If so, materialize it before closing so it remains accessible after
// the state changes to Closed. This is required by the spec for proper
// respondWithNewView() error handling in the closed state.
// Only do this if the queue doesn't have a partially fulfilled read.
KJ_IF_SOME(queue, impl.state.tryGet<ByteQueue>()) {
if (!queue.hasPartiallyFulfilledRead()) {
getByobRequest(js);
}
}
}
impl.close(js);
}
Expand Down
3 changes: 3 additions & 0 deletions src/workerd/api/streams/standard.h
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,9 @@ class ReadableStreamBYOBRequest: public jsg::Object {
kj::Rc<WeakRef<ReadableByteStreamController>> controller;
jsg::V8Ref<v8::Uint8Array> view;

size_t originalBufferByteLength;
size_t originalByteOffsetPlusBytesFilled;

Impl(jsg::Lock& js,
kj::Own<ByteQueue::ByobRequest> readRequest,
kj::Rc<WeakRef<ReadableByteStreamController>> controller);
Expand Down
21 changes: 1 addition & 20 deletions src/wpt/streams-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,26 +124,7 @@ export default {
],
},

'readable-byte-streams/bad-buffers-and-views.any.js': {
comment: 'See individual comments',
expectedFailures: [
"ReadableStream with byte source: respond() throws if the BYOB request's buffer has been detached (in the closed state)",
// TODO(conform): The spec expects us to throw here because the supplied view
// has a different offset. Instead, we allow it because the view is zero length
// and the controller has been closed (we do the close and zero length check)
// first.
// assert_throws_js(RangeError, () => c.byobRequest.respondWithNewView(view));
'ReadableStream with byte source: respondWithNewView() throws if the supplied view has a different offset (in the closed state)',
// TODO(conform): The spec expects this to be a RangeError
"ReadableStream with byte source: respondWithNewView() throws if the supplied view's buffer is zero-length (in the closed state)",
// TODO(conform): The spec expects this to be a RangeError
"ReadableStream with byte source: respondWithNewView() throws if the supplied view's buffer has a different length (in the closed state)",
// TODO(conform): We currently do not throw here since reading causes the
// view here to be zero length, which is allowed when the stream is closed.
//assert_throws_js(TypeError, () => c.byobRequest.respondWithNewView(view));
"ReadableStream with byte source: enqueue() throws if the BYOB request's buffer has been detached (in the closed state)",
],
},
'readable-byte-streams/bad-buffers-and-views.any.js': {},
'readable-byte-streams/construct-byob-request.any.js': {},
'readable-byte-streams/enqueue-with-detached-buffer.any.js': {},
'readable-byte-streams/general.any.js': {
Expand Down