-
Notifications
You must be signed in to change notification settings - Fork 1.5k
fix(client): apply timeout to response body reads #1908
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -949,7 +949,28 @@ export class OpenAI { | |
| const abort = this._makeAbort(controller); | ||
| if (signal) signal.addEventListener('abort', abort, { once: true }); | ||
|
|
||
| const timeout = setTimeout(abort, ms); | ||
| let timeout: ReturnType<typeof setTimeout> | undefined = setTimeout(abort, ms); | ||
| let timeoutCleared = false; | ||
| // Re-arm the timer so that the timeout applies to inactivity during the | ||
| // body-read phase as well, not just to header arrival. A stalled body read | ||
| // (headers received, then the server stops sending) is aborted after `ms` | ||
| // of inactivity instead of hanging forever, while a steadily-streaming | ||
| // response keeps resetting the timer and is never cut off. | ||
| // See https://github.com/openai/openai-node/issues/1825 | ||
| const rearmTimeout = () => { | ||
| if (timeoutCleared) return; | ||
| if (timeout) clearTimeout(timeout); | ||
| timeout = setTimeout(abort, ms); | ||
| }; | ||
| const clearCurrentTimeout = () => { | ||
| if (timeout) clearTimeout(timeout); | ||
| timeout = undefined; | ||
| }; | ||
| const clearTimeoutOnce = () => { | ||
| if (timeoutCleared) return; | ||
| timeoutCleared = true; | ||
| clearCurrentTimeout(); | ||
| }; | ||
|
|
||
| const isReadableBody = | ||
| ((globalThis as any).ReadableStream && options.body instanceof (globalThis as any).ReadableStream) || | ||
|
|
@@ -967,12 +988,65 @@ export class OpenAI { | |
| fetchOptions.method = method.toUpperCase(); | ||
| } | ||
|
|
||
| let response: Response; | ||
| try { | ||
| // use undefined this binding; fetch errors if bound to something else in browser/cloudflare | ||
| return await this.fetch.call(undefined, url, fetchOptions); | ||
| } finally { | ||
| clearTimeout(timeout); | ||
| response = await this.fetch.call(undefined, url, fetchOptions); | ||
| } catch (err) { | ||
| clearTimeoutOnce(); | ||
| throw err; | ||
| } | ||
|
|
||
| const ReadableStreamCtor = (globalThis as any).ReadableStream; | ||
| if (!response.body || !ReadableStreamCtor) { | ||
| clearTimeoutOnce(); | ||
| return response; | ||
| } | ||
|
|
||
| // Headers have arrived; wait until the body is actually read before starting | ||
| // the body-read timeout, so callers that only inspect the Response don't | ||
| // leave an active timer behind. | ||
| clearCurrentTimeout(); | ||
|
|
||
| const reader = response.body.getReader(); | ||
| const releaseReader = () => { | ||
| try { | ||
| reader.releaseLock(); | ||
| } catch {} | ||
| }; | ||
| const monitoredBody = new ReadableStreamCtor({ | ||
| async pull(streamController: ReadableStreamDefaultController) { | ||
| try { | ||
| rearmTimeout(); | ||
| const { done, value } = await reader.read(); | ||
|
Comment on lines
+1020
to
+1021
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When the server sends headers and then stalls the body, the new timer now fires from this Useful? React with 👍 / 👎. |
||
| if (done) { | ||
| clearTimeoutOnce(); | ||
| releaseReader(); | ||
| streamController.close(); | ||
| return; | ||
| } | ||
| clearCurrentTimeout(); | ||
| streamController.enqueue(value); | ||
| } catch (err) { | ||
| clearTimeoutOnce(); | ||
| releaseReader(); | ||
| streamController.error(err); | ||
| } | ||
| }, | ||
| cancel(reason: any) { | ||
| clearTimeoutOnce(); | ||
| return reader.cancel(reason).finally(releaseReader); | ||
| }, | ||
| }); | ||
|
|
||
| const monitoredResponse = new Response(monitoredBody as any, { | ||
| status: response.status, | ||
| statusText: response.statusText, | ||
| headers: response.headers, | ||
| }); | ||
| // `Response` does not let us pass `url` through the constructor, so preserve it. | ||
| Object.defineProperty(monitoredResponse, 'url', { value: response.url, configurable: true }); | ||
| return monitoredResponse; | ||
| } | ||
|
|
||
| private async shouldRetry(response: Response): Promise<boolean> { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In Node 20+
globalThis.ReadableStreamexists, so any custom fetch that returns a node-fetch-style response with a NodeReadablebody reaches this path, but those bodies do not implementgetReader(). Previously such responses could still be parsed via their ownResponse.text()/json()methods; now the request throwsTypeError: response.body.getReader is not a functionbefore returning, so custom fetch implementations with non-WHATWG bodies need to be left unwrapped or checked forgetReaderfirst.Useful? React with 👍 / 👎.