From 2480864508e2515961870f6c0ce8c3a15bcdad37 Mon Sep 17 00:00:00 2001
From: "li.sun"
Date: Sun, 25 May 2025 22:41:21 +0800
Subject: [PATCH 1/2] #fix llm response is too long,the scanner.Scan can't read
---
internal/utils/http_requests/http_warpper.go | 14 +++++++++++---
1 file changed, 11 insertions(+), 3 deletions(-)
diff --git a/internal/utils/http_requests/http_warpper.go b/internal/utils/http_requests/http_warpper.go
index 99518e15c..8948aad1e 100644
--- a/internal/utils/http_requests/http_warpper.go
+++ b/internal/utils/http_requests/http_warpper.go
@@ -109,11 +109,19 @@ func RequestAndParseStream[T any](client *http.Client, url string, method string
"module": "http_requests",
"function": "RequestAndParseStream",
}, func() {
- scanner := bufio.NewScanner(resp.Body)
+ reader := bufio.NewReaderSize(resp.Body, 4*1024) // init with 4KB buffer
defer resp.Body.Close()
+ for {
+ // read line by line
+ data, err := reader.ReadBytes('\n')
+ if err != nil {
+ if err != io.EOF {
+ log.Error("read body err:", err)
+ }
+ break
+ }
- for scanner.Scan() {
- data := scanner.Bytes()
+ data = bytes.TrimSpace(data)
if len(data) == 0 {
continue
}
From 13855c74999908b9e2343a6ccaff8a8322af0257 Mon Sep 17 00:00:00 2001
From: Yeuoly
Date: Mon, 26 May 2025 19:57:49 +0800
Subject: [PATCH 2/2] feat(http): Add readBodyStream function for improved
response handling
- Introduced a new `readBodyStream` function to read and process HTTP response bodies line by line, enhancing the ability to handle streaming data.
- Refactored `RequestAndParseStream` to utilize `readBodyStream`, improving code clarity and maintainability.
- Added a new test file `http_wrapper_reader_test.go` with comprehensive unit tests for parsing JSON bodies from chunked responses, ensuring robust handling of various scenarios including invalid JSON and large data sets.
---
internal/utils/http_requests/http_warpper.go | 75 +++++----
.../http_requests/http_wrapper_reader_test.go | 150 ++++++++++++++++++
2 files changed, 192 insertions(+), 33 deletions(-)
create mode 100644 internal/utils/http_requests/http_wrapper_reader_test.go
diff --git a/internal/utils/http_requests/http_warpper.go b/internal/utils/http_requests/http_warpper.go
index 8948aad1e..0859c10c2 100644
--- a/internal/utils/http_requests/http_warpper.go
+++ b/internal/utils/http_requests/http_warpper.go
@@ -75,6 +75,44 @@ func PatchAndParse[T any](client *http.Client, url string, options ...HttpOption
return RequestAndParse[T](client, url, "PATCH", options...)
}
+func readBodyStream(resp io.ReadCloser, callback func(data []byte) error) error {
+ reader := bufio.NewReaderSize(resp, 4*1024) // init with 4KB buffer
+ defer resp.Close()
+ for {
+ // read line by line
+ data, err := reader.ReadBytes('\n')
+ if err != nil {
+ if err != io.EOF {
+ log.Error("read body err:", err)
+ }
+ break
+ }
+
+ data = bytes.TrimSpace(data)
+ if len(data) == 0 {
+ continue
+ }
+
+ if bytes.HasPrefix(data, []byte("data:")) {
+ // split
+ data = data[5:]
+ }
+
+ if bytes.HasPrefix(data, []byte("event:")) {
+ // TODO: handle event
+ continue
+ }
+
+ // trim space
+ data = bytes.TrimSpace(data)
+ if err := callback(data); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
func RequestAndParseStream[T any](client *http.Client, url string, method string, options ...HttpOptions) (*stream.Stream[T], error) {
resp, err := Request(client, url, method, options...)
if err != nil {
@@ -109,50 +147,21 @@ func RequestAndParseStream[T any](client *http.Client, url string, method string
"module": "http_requests",
"function": "RequestAndParseStream",
}, func() {
- reader := bufio.NewReaderSize(resp.Body, 4*1024) // init with 4KB buffer
- defer resp.Body.Close()
- for {
- // read line by line
- data, err := reader.ReadBytes('\n')
- if err != nil {
- if err != io.EOF {
- log.Error("read body err:", err)
- }
- break
- }
-
- data = bytes.TrimSpace(data)
- if len(data) == 0 {
- continue
- }
-
- if bytes.HasPrefix(data, []byte("data:")) {
- // split
- data = data[5:]
- }
-
- if bytes.HasPrefix(data, []byte("event:")) {
- // TODO: handle event
- continue
- }
-
- // trim space
- data = bytes.TrimSpace(data)
-
+ readBodyStream(resp.Body, func(data []byte) error {
// unmarshal
t, err := parser.UnmarshalJsonBytes[T](data)
if err != nil {
if raiseErrorWhenStreamDataNotMatch {
ch.WriteError(err)
- break
+ return err
} else {
log.Warn("stream data not match for %s, got %s", url, string(data))
}
- continue
}
ch.Write(t)
- }
+ return nil
+ })
ch.Close()
})
diff --git a/internal/utils/http_requests/http_wrapper_reader_test.go b/internal/utils/http_requests/http_wrapper_reader_test.go
new file mode 100644
index 000000000..cd08e3581
--- /dev/null
+++ b/internal/utils/http_requests/http_wrapper_reader_test.go
@@ -0,0 +1,150 @@
+package http_requests
+
+import (
+ "io"
+ "net/http"
+ "strings"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+type mockReader struct {
+ chunks []string
+ index int
+}
+
+func (m *mockReader) Read(p []byte) (n int, err error) {
+ if m.index >= len(m.chunks) {
+ return 0, io.EOF
+ }
+ n = copy(p, m.chunks[m.index])
+ m.index++
+ if m.index == len(m.chunks) {
+ return n, io.EOF
+ }
+ return n, nil
+}
+
+func TestParseJsonBody(t *testing.T) {
+ t.Run("multiple chunks with newlines", func(t *testing.T) {
+ chunks := []string{
+ `{"name": "John",`,
+ "\n",
+ `"age": 30}`,
+ "\n",
+ }
+ reader := &mockReader{chunks: chunks}
+ resp := &http.Response{Body: io.NopCloser(reader)}
+
+ var result map[string]interface{}
+ err := parseJsonBody(resp, &result)
+ assert.Nil(t, err)
+
+ assert.Equal(t, "John", result["name"])
+ assert.Equal(t, 30, int(result["age"].(float64)))
+ })
+
+ t.Run("chunks without newlines", func(t *testing.T) {
+ chunks := []string{
+ `{"name": "Alice",`,
+ `"age": 25}`,
+ }
+ reader := &mockReader{chunks: chunks}
+ resp := &http.Response{Body: io.NopCloser(reader)}
+
+ var result map[string]interface{}
+ err := parseJsonBody(resp, &result)
+ assert.Nil(t, err)
+ assert.Equal(t, "Alice", result["name"])
+ assert.Equal(t, 25, int(result["age"].(float64)))
+ })
+
+ t.Run("chunks with mixed newlines", func(t *testing.T) {
+ chunks := []string{
+ `{"name": "Bob",`,
+ "\n",
+ `"age": 35`,
+ `,"city": "New York"}`,
+ }
+ reader := &mockReader{chunks: chunks}
+ resp := &http.Response{Body: io.NopCloser(reader)}
+
+ var result map[string]interface{}
+ err := parseJsonBody(resp, &result)
+ assert.Nil(t, err)
+ assert.Equal(t, "Bob", result["name"])
+ assert.Equal(t, 35, int(result["age"].(float64)))
+ assert.Equal(t, "New York", result["city"])
+ })
+
+ t.Run("last chunk without newline", func(t *testing.T) {
+ chunks := []string{
+ `{"name": "Eve",`,
+ "\n",
+ `"age": 28}`,
+ }
+ reader := &mockReader{chunks: chunks}
+ resp := &http.Response{Body: io.NopCloser(reader)}
+
+ var result map[string]interface{}
+ err := parseJsonBody(resp, &result)
+ assert.Nil(t, err)
+ assert.Equal(t, "Eve", result["name"])
+ assert.Equal(t, 28, int(result["age"].(float64)))
+ })
+
+ t.Run("empty chunks", func(t *testing.T) {
+ chunks := []string{
+ "",
+ "\n",
+ "",
+ `{"name": "Charlie"}`,
+ "\n",
+ }
+ reader := &mockReader{chunks: chunks}
+ resp := &http.Response{Body: io.NopCloser(reader)}
+
+ var result map[string]interface{}
+ err := parseJsonBody(resp, &result)
+ assert.Nil(t, err)
+ assert.Equal(t, "Charlie", result["name"])
+ })
+
+ t.Run("invalid JSON", func(t *testing.T) {
+ chunks := []string{
+ `{"name": "Invalid`,
+ "\n",
+ `"age": }`,
+ }
+ reader := &mockReader{chunks: chunks}
+ resp := &http.Response{Body: io.NopCloser(reader)}
+
+ var result map[string]interface{}
+ err := parseJsonBody(resp, &result)
+ assert.NotNil(t, err)
+ })
+
+ t.Run("large JSON split across multiple chunks", func(t *testing.T) {
+ largeJSON := strings.Repeat(`{"key": "value"},`, 1000) // Create a large JSON array
+ largeJSON = "[" + largeJSON[:len(largeJSON)-1] + "]" // Remove last comma and wrap in array brackets
+
+ chunkSize := 100
+ chunks := make([]string, 0, len(largeJSON)/chunkSize+1)
+ for i := 0; i < len(largeJSON); i += chunkSize {
+ end := i + chunkSize
+ if end > len(largeJSON) {
+ end = len(largeJSON)
+ }
+ chunks = append(chunks, largeJSON[i:end])
+ }
+
+ reader := &mockReader{chunks: chunks}
+ resp := &http.Response{Body: io.NopCloser(reader)}
+
+ var result []map[string]string
+ err := parseJsonBody(resp, &result)
+ assert.Nil(t, err)
+ assert.Equal(t, 1000, len(result))
+ })
+}