Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 132 additions & 17 deletions src/workerd/api/streams/standard.c++
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,10 @@ class WritableStreamJsController final: public WritableStreamController {

void doError(jsg::Lock& js, v8::Local<v8::Value> reason);

// Error through the underlying controller if available, going through the proper
// error transition (Erroring -> Errored).
void errorIfNeeded(jsg::Lock& js, v8::Local<v8::Value> reason);

kj::Maybe<int> getDesiredSize() override;

kj::Maybe<v8::Local<v8::Value>> isErroring(jsg::Lock& js) override;
Expand Down Expand Up @@ -3394,6 +3398,16 @@ void WritableStreamJsController::doError(jsg::Lock& js, v8::Local<v8::Value> rea
}
}

void WritableStreamJsController::errorIfNeeded(jsg::Lock& js, v8::Local<v8::Value> reason) {
// Error through the underlying controller if available, which goes through the proper
// error transition (Erroring -> Errored). This allows close() to be called while the
// stream is "erroring" and reject with the stored error.
KJ_IF_SOME(controller, state.tryGet<Controller>()) {
controller->error(js, reason);
}
// If state is not Controller (already Closed or Errored), this is a no-op.
}

kj::Maybe<int> WritableStreamJsController::getDesiredSize() {
KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
Expand Down Expand Up @@ -3790,23 +3804,41 @@ jsg::Promise<void> TransformStreamDefaultController::write(

jsg::Promise<void> TransformStreamDefaultController::abort(
jsg::Lock& js, v8::Local<v8::Value> reason) {
KJ_IF_SOME(finish, algorithms.maybeFinish) {
return finish.whenResolved(js);
if (FeatureFlags::get(js).getPedanticWpt()) {
// If a finish operation is already in progress, return the existing promise
// or handle the case where we're being called synchronously from within another
// finish operation.
if (algorithms.finishStarted) {
KJ_IF_SOME(finish, algorithms.maybeFinish) {
return finish.whenResolved(js);
}
// finishStarted is true but maybeFinish is not set yet - this means we're being
// called synchronously from within another finish operation (like cancel).
// We need to error the stream with the abort reason so that both the current
// operation and this abort reject with the abort reason.
error(js, reason);
return js.rejectedPromise<void>(js.v8Ref(reason));
}

// Mark that we're starting a finish operation before running the algorithm.
algorithms.finishStarted = true;
} else {
KJ_IF_SOME(finish, algorithms.maybeFinish) {
return finish.whenResolved(js);
}
}

return algorithms.maybeFinish
.emplace(maybeRunAlgorithm(js, algorithms.cancel,
JSG_VISITABLE_LAMBDA(
(this, ref = JSG_THIS, reason = jsg::JsRef(js, jsg::JsValue(reason))), (ref, reason),
(jsg::Lock & js)->jsg::Promise<void> {
// If the readable side is errored, return a rejected promise with the stored error
KJ_IF_SOME(controller, tryGetReadableController()) {
KJ_IF_SOME(error, controller.getMaybeErrorState(js)) {
return js.rejectedPromise<void>(kj::mv(error));
} else {
} // Else block to avert dangling else compiler warning.
} else {
} // Else block to avert dangling else compiler warning.

{
KJ_IF_SOME(err, getReadableErrorState(js)) {
return js.rejectedPromise<void>(kj::mv(err));
}
}
// Otherwise... error with the given reason and resolve the abort promise
error(js, reason.getHandle(js));
return js.resolvedPromise();
Expand All @@ -3821,15 +3853,53 @@ jsg::Promise<void> TransformStreamDefaultController::abort(
}

jsg::Promise<void> TransformStreamDefaultController::close(jsg::Lock& js) {
auto flags = FeatureFlags::get(js);
if (flags.getPedanticWpt()) {
// If a finish operation is already in progress (e.g., from cancel or abort),
// we should not run flush. Per the WHATWG streams spec, close/flush should
// coordinate with cancel to avoid calling both.
if (algorithms.finishStarted) {
KJ_IF_SOME(finish, algorithms.maybeFinish) {
return finish.whenResolved(js);
}
// finishStarted is true but maybeFinish is not set yet - this means we're being
// called synchronously from within another finish operation. If the stream was
// errored during that operation, return a rejected promise with the error.
KJ_IF_SOME(writableController, tryGetWritableController()) {
KJ_IF_SOME(err, writableController.isErroredOrErroring(js)) {
return js.rejectedPromise<void>(err);
}
}
KJ_IF_SOME(err, getReadableErrorState(js)) {
return js.rejectedPromise<void>(kj::mv(err));
}
return js.resolvedPromise();
}

// Mark that we're starting a finish operation before running the algorithm,
// since the algorithm may synchronously call other finish operations.
algorithms.finishStarted = true;
}

auto onSuccess =
JSG_VISITABLE_LAMBDA((ref = JSG_THIS), (ref), (jsg::Lock & js)->jsg::Promise<void> {
KJ_IF_SOME(readableController, ref->tryGetReadableController()) {
// If the stream was errored during the flush algorithm (e.g., by controller.error()
// or by a parallel cancel() calling abort()), we should reject with that error.
if (FeatureFlags::get(js).getPedanticWpt()) {
KJ_IF_SOME(err, ref->getReadableErrorState(js)) {
return js.rejectedPromise<void>(kj::mv(err));
}
}
// Allows for a graceful close of the readable side. Close will
// complete once all of the queued data is read or the stream
// errors.
// errors. Only close if the stream can still be closed (e.g.,
// it wasn't closed by a cancel operation from within flush).
{
KJ_IF_SOME(readableController, ref->tryGetReadableController()) {
if (readableController.canCloseOrEnqueue()) {
readableController.close(js);
} else {
// Else block to avert dangling else compiler warning.
}
}
}
return js.resolvedPromise();
});
Expand All @@ -3840,6 +3910,13 @@ jsg::Promise<void> TransformStreamDefaultController::close(jsg::Lock& js) {
return js.rejectedPromise<void>(kj::mv(reason));
});

if (flags.getPedanticWpt()) {
return algorithms.maybeFinish
.emplace(
maybeRunAlgorithm(js, algorithms.flush, kj::mv(onSuccess), kj::mv(onFailure), JSG_THIS))
.whenResolved(js);
}

return maybeRunAlgorithm(js, algorithms.flush, kj::mv(onSuccess), kj::mv(onFailure), JSG_THIS);
}

Expand All @@ -3851,14 +3928,40 @@ jsg::Promise<void> TransformStreamDefaultController::pull(jsg::Lock& js) {

jsg::Promise<void> TransformStreamDefaultController::cancel(
jsg::Lock& js, v8::Local<v8::Value> reason) {
KJ_IF_SOME(finish, algorithms.maybeFinish) {
return finish.whenResolved(js);
if (FeatureFlags::get(js).getPedanticWpt()) {
// If a finish operation is already in progress, return the existing promise
// or check for errors if we're being called synchronously from within another
// finish operation.
if (algorithms.finishStarted) {
KJ_IF_SOME(finish, algorithms.maybeFinish) {
return finish.whenResolved(js);
}
// finishStarted is true but maybeFinish is not set yet - check if the stream
// was errored during that operation.
KJ_IF_SOME(err, getReadableErrorState(js)) {
return js.rejectedPromise<void>(kj::mv(err));
}
return js.resolvedPromise();
}

// Mark that we're starting a finish operation before running the algorithm.
algorithms.finishStarted = true;
}

return algorithms.maybeFinish
.emplace(maybeRunAlgorithm(js, algorithms.cancel,
JSG_VISITABLE_LAMBDA(
(this, ref = JSG_THIS, reason = jsg::JsRef(js, jsg::JsValue(reason))), (ref, reason),
(jsg::Lock & js)->jsg::Promise<void> {
// If the stream was errored during the cancel algorithm (e.g., by controller.error()
// or by a parallel abort()), we should reject with that error.
if (FeatureFlags::get(js).getPedanticWpt()) {
KJ_IF_SOME(err, getReadableErrorState(js)) {
readable = kj::none;
errorWritableAndUnblockWrite(js, reason.getHandle(js));
return js.rejectedPromise<void>(kj::mv(err));
}
}
readable = kj::none;
errorWritableAndUnblockWrite(js, reason.getHandle(js));
return js.resolvedPromise();
Expand Down Expand Up @@ -3907,7 +4010,12 @@ void TransformStreamDefaultController::errorWritableAndUnblockWrite(
jsg::Lock& js, v8::Local<v8::Value> reason) {
algorithms.clear();
KJ_IF_SOME(writableController, tryGetWritableController()) {
if (writableController.isWritable()) {
if (FeatureFlags::get(js).getPedanticWpt()) {
// Use errorIfNeeded which goes through the proper error transition (Erroring -> Errored).
// This allows close() to be called while the stream is "erroring" and reject with the
// stored error, which is the expected behavior per the WHATWG streams spec.
writableController.errorIfNeeded(js, reason);
} else if (writableController.isWritable()) {
writableController.doError(js, reason);
}
writable = kj::none;
Expand Down Expand Up @@ -3991,6 +4099,13 @@ kj::Maybe<WritableStreamJsController&> TransformStreamDefaultController::
return kj::none;
}

kj::Maybe<jsg::Value> TransformStreamDefaultController::getReadableErrorState(jsg::Lock& js) {
KJ_IF_SOME(controller, tryGetReadableController()) {
return controller.getMaybeErrorState(js);
}
return kj::none;
}

template <class Self>
kj::StringPtr WritableImpl<Self>::jsgGetMemoryName() const {
return "WritableImpl"_kjc;
Expand Down
7 changes: 7 additions & 0 deletions src/workerd/api/streams/standard.h
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,11 @@ class TransformStreamDefaultController: public jsg::Object {
kj::Maybe<jsg::Function<Transformer::CancelAlgorithm>> cancel;

kj::Maybe<jsg::Promise<void>> maybeFinish = kj::none;
// This flag is set to true at the start of a finish operation (close/cancel/abort)
// before the algorithm runs. This is needed because emplace() evaluates its argument
// before setting maybeFinish, so if the algorithm calls another finish operation
// synchronously, maybeFinish wouldn't be set yet.
bool finishStarted = false;

Algorithms() {};
Algorithms(Algorithms&& other) = default;
Expand All @@ -716,6 +721,8 @@ class TransformStreamDefaultController: public jsg::Object {
kj::Maybe<ReadableStreamDefaultController&> tryGetReadableController();
kj::Maybe<WritableStreamJsController&> tryGetWritableController();

kj::Maybe<jsg::Value> getReadableErrorState(jsg::Lock& js);

// Currently, JS-backed transform streams only support value-oriented streams.
// In the future, that may change and this will need to become a kj::OneOf
// that includes a ReadableByteStreamController.
Expand Down
11 changes: 0 additions & 11 deletions src/wpt/streams-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -596,12 +596,8 @@ export default {
comment: 'To be investigated',
expectedFailures: [
'readable.cancel() and a parallel writable.close() should reject if a transformer.cancel() calls controller.error()',
'closing the writable side should reject if a parallel transformer.cancel() throws',
'writable.abort() and readable.cancel() should reject if a transformer.cancel() calls controller.error()',
'readable.cancel() should not call cancel() again when already called from writable.abort()',
'writable.close() should not call flush() when cancel() is already called from readable.cancel()',
'writable.abort() should not call cancel() again when already called from readable.cancel()',
'readable.cancel() should not call cancel() when flush() is already called from writable.close()',
],
},
'transform-streams/errors.any.js': {
Expand All @@ -611,16 +607,11 @@ export default {
'an exception from transform() should error the stream if terminate has been requested but not completed',
],
expectedFailures: [
'when controller.error is followed by a rejection, the error reason should come from controller.error',
'TransformStream constructor should throw when start does',
'when strategy.size throws inside start(), the constructor should throw the same error',
'when strategy.size calls controller.error() then throws, the constructor should throw the first error',
'it should be possible to error the readable between close requested and complete',
'controller.error() should do nothing after a transformer method has thrown an exception',
'controller.error() should do nothing the second time it is called',
'abort should set the close reason for the writable when it happens before cancel during start, and cancel should reject',
'controller.error() should close writable immediately after readable.cancel()',
'abort should set the close reason for the writable when it happens before cancel during underlying sink write, but cancel should still succeed',
'erroring during write with backpressure should result in the write failing',
],
},
Expand Down Expand Up @@ -691,8 +682,6 @@ export default {
comment: 'To be investigated',
expectedFailures: [
'controller.error() after controller.terminate() with queued chunk should error the readable',
'controller.error() after controller.terminate() without queued chunk should do nothing',
'controller.terminate() inside flush() should not prevent writer.close() from succeeding',
],
},

Expand Down
Loading