From f465264926c0e1780deace815caa3dcac7d7fafd Mon Sep 17 00:00:00 2001 From: sim Date: Thu, 31 Oct 2024 11:05:11 +0000 Subject: [PATCH] Add ability to timeout to SSE --- .../kotlin/okhttp3/sse/EventSourceListener.kt | 5 ++++ .../okhttp3/sse/internal/RealEventSource.kt | 29 +++++++++++++++++-- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/okhttp-sse/src/main/kotlin/okhttp3/sse/EventSourceListener.kt b/okhttp-sse/src/main/kotlin/okhttp3/sse/EventSourceListener.kt index 1693a3bfe94d..7908a426698e 100644 --- a/okhttp-sse/src/main/kotlin/okhttp3/sse/EventSourceListener.kt +++ b/okhttp-sse/src/main/kotlin/okhttp3/sse/EventSourceListener.kt @@ -18,6 +18,11 @@ package okhttp3.sse import okhttp3.Response abstract class EventSourceListener { + /** + * Milliseconds elapsed between 2 events until connection failed. Doesn't timeout if null + */ + open var idleTimeoutMillis: Long? = null + /** * Invoked when an event source has been accepted by the remote peer and may begin transmitting * events. diff --git a/okhttp-sse/src/main/kotlin/okhttp3/sse/internal/RealEventSource.kt b/okhttp-sse/src/main/kotlin/okhttp3/sse/internal/RealEventSource.kt index 3611fed83e7f..20ad8d41bc4e 100644 --- a/okhttp-sse/src/main/kotlin/okhttp3/sse/internal/RealEventSource.kt +++ b/okhttp-sse/src/main/kotlin/okhttp3/sse/internal/RealEventSource.kt @@ -16,6 +16,8 @@ package okhttp3.sse.internal import java.io.IOException +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds import okhttp3.Call import okhttp3.Callback import okhttp3.Request @@ -24,6 +26,8 @@ import okhttp3.ResponseBody import okhttp3.internal.stripBody import okhttp3.sse.EventSource import okhttp3.sse.EventSourceListener +import okio.AsyncTimeout +import okio.Timeout.Companion.timeout internal class RealEventSource private constructor( private val call: Call?, @@ -38,6 +42,20 @@ internal class RealEventSource private constructor( @Volatile private var canceled = false + private fun updateTimeout(call: Call?, duration: Duration) { + val timeout = call?.timeout() + if (timeout is AsyncTimeout) { + timeout.apply { + // If a timeout is in process, we exit it before entering again + if (this.timeoutNanos() > 0L) { + exit() + } + timeout(duration) + enter() + } + } + } + override fun onResponse( call: Call, response: Response, @@ -63,8 +81,11 @@ internal class RealEventSource private constructor( return } - // This is a long-lived response. Cancel full-call timeouts. - call?.timeout()?.cancel() + // This is a long-lived response. Cancel full-call timeouts if no timeout has been set + listener.idleTimeoutMillis?.let { + // We spend at most timeout seconds if set + updateTimeout(call, it.milliseconds) + } ?: call?.timeout()?.cancel() // Replace the body with a stripped one so the callbacks can't see real data. val response = response.stripBody() @@ -74,6 +95,10 @@ internal class RealEventSource private constructor( if (!canceled) { listener.onOpen(this, response) while (!canceled && reader.processNextEvent()) { + listener.idleTimeoutMillis?.let { + // We spend at most timeout seconds if set + updateTimeout(call, it.milliseconds) + } } } } catch (e: Exception) {