From 6eda5e532ad1f8fcfa7757d0c3a8144f20abc6d8 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Mon, 29 Dec 2025 09:54:06 -0800 Subject: [PATCH] Fixes to the new streams adapters --- .../api/streams/readable-source-adapter.c++ | 44 ++++++++++++++++--- 1 file changed, 39 insertions(+), 5 deletions(-) diff --git a/src/workerd/api/streams/readable-source-adapter.c++ b/src/workerd/api/streams/readable-source-adapter.c++ index cb5f8abd767..c23b739dd6b 100644 --- a/src/workerd/api/streams/readable-source-adapter.c++ +++ b/src/workerd/api/streams/readable-source-adapter.c++ @@ -755,8 +755,10 @@ jsg::Promise> ReadableSourceKjAdap // At this point, we should have no left over data. KJ_DASSERT(context->maybeLeftOver == kj::none); - // We should also have some space left in our destination buffer. - KJ_DASSERT(context->buffer.size() > 0); + // If the buffer is exactly full (the chunk filled it perfectly), we're done. + if (context->buffer.size() == 0) { + return js.resolvedPromise(kj::mv(context)); + } // We might continue reading only if the adapter is still alive and // in an active state... @@ -1111,13 +1113,28 @@ kj::Promise ReadableSourceKjAdapter::pumpToImpl( .adapterRef = kj::none, // no need to track adapter liveness during pump }); - return active.ioContext.run([context = kj::mv(context), minReadPolicy](jsg::Lock& js) mutable { + // We need to pass a pointer to active into the promise continuation so we can + // save any leftover data. The caller (pumpToImpl) owns the Active and keeps it + // alive for the duration of the pump, so this is safe. + Active* activePtr = &active; + + return active.ioContext.run( + [activePtr, context = kj::mv(context), minReadPolicy](jsg::Lock& js) mutable { auto& ioContext = IoContext::current(); // The readInternal method (and the underlying read on the stream) should optimize // itself based on the bytes available in the stream itself and the minBytes requested. return ioContext .awaitJs(js, ReadableSourceKjAdapter::readInternal(js, kj::mv(context), minReadPolicy)) - .then([](kj::Own context) mutable -> kj::Promise { + .then([activePtr](kj::Own context) mutable -> kj::Promise { + // If there's leftover data from reading a chunk larger than the buffer, + // save it to active.state so it can be used on the next read iteration. + // Only do this if we're still in Idle state - if the state has transitioned + // to something else (e.g. Done, Canceling, Canceled), we discard the leftover. + KJ_IF_SOME(leftOver, context->maybeLeftOver) { + if (activePtr->state.is()) { + activePtr->state = kj::mv(leftOver); + } + } return context->totalRead; }); }); @@ -1238,12 +1255,29 @@ kj::Promise ReadableSourceKjAdapter::pumpToImpl( consecutiveFastReads = 0; } - // Start working on the next read. + // If there's leftover data from the previous read (happens when a JS chunk + // is larger than the buffer), extract it before starting the next read. + // We must do this BEFORE starting the next read so that active->state is Idle + // when the next read's promise continuation tries to save its leftover. + kj::Maybe maybeLeftover; + KJ_IF_SOME(readable, active->state.tryGet()) { + maybeLeftover = kj::mv(readable); + } + + // Start working on the next read. At this point, if there was leftover, we've + // moved it to maybeLeftover, so the next read can safely set its leftover + // to active->state when it completes. + active->state.init(); readPromise = pumpReadImpl(*active, buffers[currentReadBuf], minBytes, minReadPolicy); { KJ_ON_SCOPE_FAILURE(writeFailed = true); co_await output.write(writeBuf); + + // Write any leftover from the previous read. + KJ_IF_SOME(leftover, maybeLeftover) { + co_await output.write(leftover.view); + } } } } catch (...) {