From 7b8b1900283a21ee8429f0fe4081ec79863f20c7 Mon Sep 17 00:00:00 2001 From: Matt Holloway Date: Mon, 26 Jan 2026 17:16:04 +0000 Subject: [PATCH 1/3] add line length truncate buffer --- pkg/buffer/buffer.go | 86 ++++++++++++++++++++++++++++++++++++++- pkg/buffer/buffer_test.go | 79 +++++++++++++++++++++++++++++++++++ 2 files changed, 164 insertions(+), 1 deletion(-) create mode 100644 pkg/buffer/buffer_test.go diff --git a/pkg/buffer/buffer.go b/pkg/buffer/buffer.go index 14bcf9582..6260252fc 100644 --- a/pkg/buffer/buffer.go +++ b/pkg/buffer/buffer.go @@ -3,10 +3,15 @@ package buffer import ( "bufio" "fmt" + "io" "net/http" "strings" ) +// maxLineSize is the maximum size for a single log line (10MB). +// GitHub Actions logs can contain extremely long lines (base64 content, minified JS, etc.) +const maxLineSize = 10 * 1024 * 1024 + // ProcessResponseAsRingBufferToEnd reads the body of an HTTP response line by line, // storing only the last maxJobLogLines lines using a ring buffer (sliding window). // This efficiently retains the most recent lines, overwriting older ones as needed. @@ -25,6 +30,7 @@ import ( // // The function uses a ring buffer to efficiently store only the last maxJobLogLines lines. // If the response contains more lines than maxJobLogLines, only the most recent lines are kept. +// Lines exceeding maxLineSize are truncated with a marker. func ProcessResponseAsRingBufferToEnd(httpResp *http.Response, maxJobLogLines int) (string, int, *http.Response, error) { if maxJobLogLines > 100000 { maxJobLogLines = 100000 @@ -36,7 +42,8 @@ func ProcessResponseAsRingBufferToEnd(httpResp *http.Response, maxJobLogLines in writeIndex := 0 scanner := bufio.NewScanner(httpResp.Body) - scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) + // Set initial buffer to 64KB and max token size to 10MB to handle very long lines + scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize) for scanner.Scan() { line := scanner.Text() @@ -48,6 +55,11 @@ func ProcessResponseAsRingBufferToEnd(httpResp *http.Response, maxJobLogLines in } if err := scanner.Err(); err != nil { + // If we hit a token too long error, fall back to byte-by-byte reading + // with line truncation to handle extremely long lines gracefully + if err == bufio.ErrTooLong { + return processWithLongLineHandling(httpResp.Body, lines, validLines, totalLines, writeIndex, maxJobLogLines) + } return "", 0, httpResp, fmt.Errorf("failed to read log content: %w", err) } @@ -71,3 +83,75 @@ func ProcessResponseAsRingBufferToEnd(httpResp *http.Response, maxJobLogLines in return strings.Join(result, "\n"), totalLines, httpResp, nil } + +// processWithLongLineHandling continues processing after encountering a line +// that exceeds the scanner's max token size. It reads byte-by-byte and +// truncates extremely long lines instead of failing. +func processWithLongLineHandling(body io.Reader, lines []string, validLines []bool, totalLines, writeIndex, maxJobLogLines int) (string, int, *http.Response, error) { + // Add a marker that we encountered truncated content + truncatedMarker := "[LINE TRUNCATED - exceeded maximum line length of 10MB]" + lines[writeIndex] = truncatedMarker + validLines[writeIndex] = true + totalLines++ + writeIndex = (writeIndex + 1) % maxJobLogLines + + // Continue reading with a buffered reader, truncating long lines + reader := bufio.NewReader(body) + var currentLine strings.Builder + const maxDisplayLength = 1000 // Keep first 1000 chars of truncated lines + + for { + b, err := reader.ReadByte() + if err == io.EOF { + // Handle final line without newline + if currentLine.Len() > 0 { + line := currentLine.String() + if len(line) > maxLineSize { + line = line[:maxDisplayLength] + "... [TRUNCATED]" + } + lines[writeIndex] = line + validLines[writeIndex] = true + totalLines++ + } + break + } + if err != nil { + return "", 0, nil, fmt.Errorf("failed to read log content: %w", err) + } + + if b == '\n' { + line := currentLine.String() + if len(line) > maxLineSize { + line = line[:maxDisplayLength] + "... [TRUNCATED]" + } + lines[writeIndex] = line + validLines[writeIndex] = true + totalLines++ + writeIndex = (writeIndex + 1) % maxJobLogLines + currentLine.Reset() + } else if currentLine.Len() < maxLineSize+maxDisplayLength { + // Stop accumulating bytes once we exceed the limit (plus buffer for truncation message) + currentLine.WriteByte(b) + } + } + + var result []string + linesInBuffer := totalLines + if linesInBuffer > maxJobLogLines { + linesInBuffer = maxJobLogLines + } + + startIndex := 0 + if totalLines > maxJobLogLines { + startIndex = writeIndex + } + + for i := 0; i < linesInBuffer; i++ { + idx := (startIndex + i) % maxJobLogLines + if validLines[idx] { + result = append(result, lines[idx]) + } + } + + return strings.Join(result, "\n"), totalLines, nil, nil +} diff --git a/pkg/buffer/buffer_test.go b/pkg/buffer/buffer_test.go new file mode 100644 index 000000000..bef597c94 --- /dev/null +++ b/pkg/buffer/buffer_test.go @@ -0,0 +1,79 @@ +package buffer + +import ( + "io" + "net/http" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestProcessResponseAsRingBufferToEnd(t *testing.T) { + t.Run("normal lines", func(t *testing.T) { + body := "line1\nline2\nline3\n" + resp := &http.Response{ + Body: io.NopCloser(strings.NewReader(body)), + } + + result, totalLines, respOut, err := ProcessResponseAsRingBufferToEnd(resp, 10) + if respOut != nil && respOut.Body != nil { + defer respOut.Body.Close() + } + require.NoError(t, err) + assert.Equal(t, 3, totalLines) + assert.Equal(t, "line1\nline2\nline3", result) + }) + + t.Run("ring buffer keeps last N lines", func(t *testing.T) { + body := "line1\nline2\nline3\nline4\nline5\n" + resp := &http.Response{ + Body: io.NopCloser(strings.NewReader(body)), + } + + result, totalLines, respOut, err := ProcessResponseAsRingBufferToEnd(resp, 3) + if respOut != nil && respOut.Body != nil { + defer respOut.Body.Close() + } + require.NoError(t, err) + assert.Equal(t, 5, totalLines) + assert.Equal(t, "line3\nline4\nline5", result) + }) + + t.Run("handles very long line exceeding 10MB", func(t *testing.T) { + // Create a line that exceeds maxLineSize (10MB) + longLine := strings.Repeat("x", 11*1024*1024) // 11MB + body := "line1\n" + longLine + "\nline3\n" + resp := &http.Response{ + Body: io.NopCloser(strings.NewReader(body)), + } + + result, totalLines, respOut, err := ProcessResponseAsRingBufferToEnd(resp, 100) + if respOut != nil && respOut.Body != nil { + defer respOut.Body.Close() + } + require.NoError(t, err) + // Should have processed lines with truncation marker + assert.Greater(t, totalLines, 0) + assert.Contains(t, result, "TRUNCATED") + }) + + t.Run("handles line at exactly max size", func(t *testing.T) { + // Create a line just under maxLineSize + longLine := strings.Repeat("a", 1024*1024) // 1MB - should work fine + body := "start\n" + longLine + "\nend\n" + resp := &http.Response{ + Body: io.NopCloser(strings.NewReader(body)), + } + + result, totalLines, respOut, err := ProcessResponseAsRingBufferToEnd(resp, 100) + if respOut != nil && respOut.Body != nil { + defer respOut.Body.Close() + } + require.NoError(t, err) + assert.Equal(t, 3, totalLines) + assert.Contains(t, result, "start") + assert.Contains(t, result, "end") + }) +} From 17a45772ccdb023c297ed687515b5023d73f3dac Mon Sep 17 00:00:00 2001 From: Matt Holloway Date: Mon, 26 Jan 2026 17:46:47 +0000 Subject: [PATCH 2/3] consolidate buffer logic --- pkg/buffer/buffer.go | 148 ++++++++++++++++++-------------------- pkg/buffer/buffer_test.go | 64 +++++++++++++++++ 2 files changed, 135 insertions(+), 77 deletions(-) diff --git a/pkg/buffer/buffer.go b/pkg/buffer/buffer.go index 6260252fc..5f95c67c3 100644 --- a/pkg/buffer/buffer.go +++ b/pkg/buffer/buffer.go @@ -1,7 +1,7 @@ package buffer import ( - "bufio" + "bytes" "fmt" "io" "net/http" @@ -41,73 +41,82 @@ func ProcessResponseAsRingBufferToEnd(httpResp *http.Response, maxJobLogLines in totalLines := 0 writeIndex := 0 - scanner := bufio.NewScanner(httpResp.Body) - // Set initial buffer to 64KB and max token size to 10MB to handle very long lines - scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize) + const readBufferSize = 64 * 1024 // 64KB read buffer + const maxDisplayLength = 1000 // Keep first 1000 chars of truncated lines - for scanner.Scan() { - line := scanner.Text() - totalLines++ - - lines[writeIndex] = line - validLines[writeIndex] = true - writeIndex = (writeIndex + 1) % maxJobLogLines - } - - if err := scanner.Err(); err != nil { - // If we hit a token too long error, fall back to byte-by-byte reading - // with line truncation to handle extremely long lines gracefully - if err == bufio.ErrTooLong { - return processWithLongLineHandling(httpResp.Body, lines, validLines, totalLines, writeIndex, maxJobLogLines) - } - return "", 0, httpResp, fmt.Errorf("failed to read log content: %w", err) - } - - var result []string - linesInBuffer := totalLines - if linesInBuffer > maxJobLogLines { - linesInBuffer = maxJobLogLines - } - - startIndex := 0 - if totalLines > maxJobLogLines { - startIndex = writeIndex - } - - for i := 0; i < linesInBuffer; i++ { - idx := (startIndex + i) % maxJobLogLines - if validLines[idx] { - result = append(result, lines[idx]) - } - } - - return strings.Join(result, "\n"), totalLines, httpResp, nil -} - -// processWithLongLineHandling continues processing after encountering a line -// that exceeds the scanner's max token size. It reads byte-by-byte and -// truncates extremely long lines instead of failing. -func processWithLongLineHandling(body io.Reader, lines []string, validLines []bool, totalLines, writeIndex, maxJobLogLines int) (string, int, *http.Response, error) { - // Add a marker that we encountered truncated content - truncatedMarker := "[LINE TRUNCATED - exceeded maximum line length of 10MB]" - lines[writeIndex] = truncatedMarker - validLines[writeIndex] = true - totalLines++ - writeIndex = (writeIndex + 1) % maxJobLogLines - - // Continue reading with a buffered reader, truncating long lines - reader := bufio.NewReader(body) + readBuf := make([]byte, readBufferSize) var currentLine strings.Builder - const maxDisplayLength = 1000 // Keep first 1000 chars of truncated lines + lineTruncated := false for { - b, err := reader.ReadByte() + n, err := httpResp.Body.Read(readBuf) + if n > 0 { + chunk := readBuf[:n] + for len(chunk) > 0 { + // Find the next newline in the chunk + newlineIdx := bytes.IndexByte(chunk, '\n') + + if newlineIdx >= 0 { + // Found a newline - complete the current line + if !lineTruncated { + remaining := maxLineSize - currentLine.Len() + if remaining > newlineIdx { + remaining = newlineIdx + } + if remaining > 0 { + currentLine.Write(chunk[:remaining]) + } + if currentLine.Len() >= maxLineSize { + lineTruncated = true + } + } + + // Store the completed line + line := currentLine.String() + if lineTruncated { + // Only keep first maxDisplayLength chars for truncated lines + if len(line) > maxDisplayLength { + line = line[:maxDisplayLength] + } + line += "... [TRUNCATED]" + } + lines[writeIndex] = line + validLines[writeIndex] = true + totalLines++ + writeIndex = (writeIndex + 1) % maxJobLogLines + + // Reset for next line + currentLine.Reset() + lineTruncated = false + chunk = chunk[newlineIdx+1:] + } else { + // No newline in remaining chunk - accumulate if not truncated + if !lineTruncated { + remaining := maxLineSize - currentLine.Len() + if remaining > len(chunk) { + remaining = len(chunk) + } + if remaining > 0 { + currentLine.Write(chunk[:remaining]) + } + if currentLine.Len() >= maxLineSize { + lineTruncated = true + } + } + break + } + } + } + if err == io.EOF { // Handle final line without newline if currentLine.Len() > 0 { line := currentLine.String() - if len(line) > maxLineSize { - line = line[:maxDisplayLength] + "... [TRUNCATED]" + if lineTruncated { + if len(line) > maxDisplayLength { + line = line[:maxDisplayLength] + } + line += "... [TRUNCATED]" } lines[writeIndex] = line validLines[writeIndex] = true @@ -116,22 +125,7 @@ func processWithLongLineHandling(body io.Reader, lines []string, validLines []bo break } if err != nil { - return "", 0, nil, fmt.Errorf("failed to read log content: %w", err) - } - - if b == '\n' { - line := currentLine.String() - if len(line) > maxLineSize { - line = line[:maxDisplayLength] + "... [TRUNCATED]" - } - lines[writeIndex] = line - validLines[writeIndex] = true - totalLines++ - writeIndex = (writeIndex + 1) % maxJobLogLines - currentLine.Reset() - } else if currentLine.Len() < maxLineSize+maxDisplayLength { - // Stop accumulating bytes once we exceed the limit (plus buffer for truncation message) - currentLine.WriteByte(b) + return "", 0, httpResp, fmt.Errorf("failed to read log content: %w", err) } } @@ -153,5 +147,5 @@ func processWithLongLineHandling(body io.Reader, lines []string, validLines []bo } } - return strings.Join(result, "\n"), totalLines, nil, nil + return strings.Join(result, "\n"), totalLines, httpResp, nil } diff --git a/pkg/buffer/buffer_test.go b/pkg/buffer/buffer_test.go index bef597c94..1e5c08b60 100644 --- a/pkg/buffer/buffer_test.go +++ b/pkg/buffer/buffer_test.go @@ -1,6 +1,7 @@ package buffer import ( + "fmt" "io" "net/http" "strings" @@ -76,4 +77,67 @@ func TestProcessResponseAsRingBufferToEnd(t *testing.T) { assert.Contains(t, result, "start") assert.Contains(t, result, "end") }) + + t.Run("ring buffer with long line in middle of many lines", func(t *testing.T) { + // Create many lines with a long line in the middle + // Ring buffer size is 5, so we should only keep the last 5 lines + var sb strings.Builder + for i := 1; i <= 10; i++ { + sb.WriteString(fmt.Sprintf("line%d\n", i)) + } + // Insert an 11MB line (exceeds maxLineSize of 10MB) + longLine := strings.Repeat("x", 11*1024*1024) + sb.WriteString(longLine) + sb.WriteString("\n") + for i := 11; i <= 20; i++ { + sb.WriteString(fmt.Sprintf("line%d\n", i)) + } + + resp := &http.Response{ + Body: io.NopCloser(strings.NewReader(sb.String())), + } + + result, totalLines, respOut, err := ProcessResponseAsRingBufferToEnd(resp, 5) + if respOut != nil && respOut.Body != nil { + defer respOut.Body.Close() + } + require.NoError(t, err) + // 10 lines before + 1 long line + 10 lines after = 21 total + assert.Equal(t, 21, totalLines) + // Should only have the last 5 lines (line16 through line20) + assert.Contains(t, result, "line16") + assert.Contains(t, result, "line17") + assert.Contains(t, result, "line18") + assert.Contains(t, result, "line19") + assert.Contains(t, result, "line20") + // Should NOT contain earlier lines + assert.NotContains(t, result, "line1\n") + assert.NotContains(t, result, "line10\n") + // The truncated line should not be in the last 5 + assert.NotContains(t, result, "TRUNCATED") + }) + + t.Run("ring buffer keeps truncated line when in last N", func(t *testing.T) { + // Long line followed by only 2 more lines, with ring buffer size 5 + longLine := strings.Repeat("y", 11*1024*1024) + body := "line1\nline2\nline3\n" + longLine + "\nlineA\nlineB\n" + resp := &http.Response{ + Body: io.NopCloser(strings.NewReader(body)), + } + + result, totalLines, respOut, err := ProcessResponseAsRingBufferToEnd(resp, 5) + if respOut != nil && respOut.Body != nil { + defer respOut.Body.Close() + } + require.NoError(t, err) + assert.Equal(t, 6, totalLines) + // Last 5: line2, line3, truncated, lineA, lineB + assert.Contains(t, result, "line2") + assert.Contains(t, result, "line3") + assert.Contains(t, result, "TRUNCATED") + assert.Contains(t, result, "lineA") + assert.Contains(t, result, "lineB") + // line1 should be rotated out + assert.NotContains(t, result, "line1") + }) } From b4f1e979b6f0cddff346c335ec74baeac2481b17 Mon Sep 17 00:00:00 2001 From: Sam Morrow Date: Tue, 27 Jan 2026 12:12:11 +0100 Subject: [PATCH 3/3] Refactor buffer processing for clarity and add edge case tests - Extract storeLine() and accumulate() helper closures to eliminate duplicated line processing and truncation logic - Simplify main loop by using early return pattern (newlineIdx < 0 -> break) - Add test for empty response body edge case - Add test for exact maxLineSize boundary condition (10MB) The refactored code reduces nesting and makes the flow clearer: accumulate handles byte collection with truncation detection, storeLine handles ring buffer storage with truncation markers. --- pkg/buffer/buffer.go | 102 ++++++++++++++++---------------------- pkg/buffer/buffer_test.go | 33 ++++++++++++ 2 files changed, 75 insertions(+), 60 deletions(-) diff --git a/pkg/buffer/buffer.go b/pkg/buffer/buffer.go index 5f95c67c3..54ed0be4d 100644 --- a/pkg/buffer/buffer.go +++ b/pkg/buffer/buffer.go @@ -48,79 +48,61 @@ func ProcessResponseAsRingBufferToEnd(httpResp *http.Response, maxJobLogLines in var currentLine strings.Builder lineTruncated := false + // storeLine saves the current line to the ring buffer and resets state + storeLine := func() { + line := currentLine.String() + if lineTruncated && len(line) > maxDisplayLength { + line = line[:maxDisplayLength] + } + if lineTruncated { + line += "... [TRUNCATED]" + } + lines[writeIndex] = line + validLines[writeIndex] = true + totalLines++ + writeIndex = (writeIndex + 1) % maxJobLogLines + currentLine.Reset() + lineTruncated = false + } + + // accumulate adds bytes to currentLine up to maxLineSize, sets lineTruncated if exceeded + accumulate := func(data []byte) { + if lineTruncated { + return + } + remaining := maxLineSize - currentLine.Len() + if remaining <= 0 { + lineTruncated = true + return + } + if remaining > len(data) { + remaining = len(data) + } + currentLine.Write(data[:remaining]) + if currentLine.Len() >= maxLineSize { + lineTruncated = true + } + } + for { n, err := httpResp.Body.Read(readBuf) if n > 0 { chunk := readBuf[:n] for len(chunk) > 0 { - // Find the next newline in the chunk newlineIdx := bytes.IndexByte(chunk, '\n') - - if newlineIdx >= 0 { - // Found a newline - complete the current line - if !lineTruncated { - remaining := maxLineSize - currentLine.Len() - if remaining > newlineIdx { - remaining = newlineIdx - } - if remaining > 0 { - currentLine.Write(chunk[:remaining]) - } - if currentLine.Len() >= maxLineSize { - lineTruncated = true - } - } - - // Store the completed line - line := currentLine.String() - if lineTruncated { - // Only keep first maxDisplayLength chars for truncated lines - if len(line) > maxDisplayLength { - line = line[:maxDisplayLength] - } - line += "... [TRUNCATED]" - } - lines[writeIndex] = line - validLines[writeIndex] = true - totalLines++ - writeIndex = (writeIndex + 1) % maxJobLogLines - - // Reset for next line - currentLine.Reset() - lineTruncated = false - chunk = chunk[newlineIdx+1:] - } else { - // No newline in remaining chunk - accumulate if not truncated - if !lineTruncated { - remaining := maxLineSize - currentLine.Len() - if remaining > len(chunk) { - remaining = len(chunk) - } - if remaining > 0 { - currentLine.Write(chunk[:remaining]) - } - if currentLine.Len() >= maxLineSize { - lineTruncated = true - } - } + if newlineIdx < 0 { + accumulate(chunk) break } + accumulate(chunk[:newlineIdx]) + storeLine() + chunk = chunk[newlineIdx+1:] } } if err == io.EOF { - // Handle final line without newline if currentLine.Len() > 0 { - line := currentLine.String() - if lineTruncated { - if len(line) > maxDisplayLength { - line = line[:maxDisplayLength] - } - line += "... [TRUNCATED]" - } - lines[writeIndex] = line - validLines[writeIndex] = true - totalLines++ + storeLine() } break } diff --git a/pkg/buffer/buffer_test.go b/pkg/buffer/buffer_test.go index 1e5c08b60..86308ec5e 100644 --- a/pkg/buffer/buffer_test.go +++ b/pkg/buffer/buffer_test.go @@ -117,6 +117,39 @@ func TestProcessResponseAsRingBufferToEnd(t *testing.T) { assert.NotContains(t, result, "TRUNCATED") }) + t.Run("empty response body", func(t *testing.T) { + resp := &http.Response{ + Body: io.NopCloser(strings.NewReader("")), + } + + result, totalLines, respOut, err := ProcessResponseAsRingBufferToEnd(resp, 10) + if respOut != nil && respOut.Body != nil { + defer respOut.Body.Close() + } + require.NoError(t, err) + assert.Equal(t, 0, totalLines) + assert.Equal(t, "", result) + }) + + t.Run("line at exactly maxLineSize boundary", func(t *testing.T) { + // Create a line at exactly maxLineSize (10MB) - should be truncated + exactLine := strings.Repeat("z", 10*1024*1024) + body := "before\n" + exactLine + "\nafter\n" + resp := &http.Response{ + Body: io.NopCloser(strings.NewReader(body)), + } + + result, totalLines, respOut, err := ProcessResponseAsRingBufferToEnd(resp, 10) + if respOut != nil && respOut.Body != nil { + defer respOut.Body.Close() + } + require.NoError(t, err) + assert.Equal(t, 3, totalLines) + assert.Contains(t, result, "before") + assert.Contains(t, result, "TRUNCATED") + assert.Contains(t, result, "after") + }) + t.Run("ring buffer keeps truncated line when in last N", func(t *testing.T) { // Long line followed by only 2 more lines, with ring buffer size 5 longLine := strings.Repeat("y", 11*1024*1024)