diff --git a/internal/rows/arrowbased/batchloader.go b/internal/rows/arrowbased/batchloader.go
index 2d86478..aafff8f 100644
--- a/internal/rows/arrowbased/batchloader.go
+++ b/internal/rows/arrowbased/batchloader.go
@@ -297,7 +297,7 @@ func (cft *cloudFetchDownloadTask) Run() {
 		downloadStart := time.Now()
 		data, err := fetchBatchBytes(cft.ctx, cft.link, cft.minTimeToExpiry, cft.speedThresholdMbps, cft.httpClient)
 		if err != nil {
-			cft.resultChan <- cloudFetchDownloadTaskResult{data: nil, err: err}
+			cft.sendResult(cloudFetchDownloadTaskResult{data: nil, err: err})
 			return
 		}
 
@@ -306,7 +306,7 @@ func (cft *cloudFetchDownloadTask) Run() {
 		data.Close() //nolint:errcheck,gosec // G104: close after reading data
 		downloadMs := time.Since(downloadStart).Milliseconds()
 		if err != nil {
-			cft.resultChan <- cloudFetchDownloadTaskResult{data: nil, err: err}
+			cft.sendResult(cloudFetchDownloadTaskResult{data: nil, err: err})
 			return
 		}
 
@@ -316,10 +316,21 @@ func (cft *cloudFetchDownloadTask) Run() {
 			cft.link.RowCount,
 		)
 
-		cft.resultChan <- cloudFetchDownloadTaskResult{data: bytes.NewReader(buf), err: nil, downloadMs: downloadMs}
+		cft.sendResult(cloudFetchDownloadTaskResult{data: bytes.NewReader(buf), err: nil, downloadMs: downloadMs})
 	}()
 }
 
+// sendResult delivers the download result to the consumer, but drops it if the
+// task's context has already been cancelled. Without this guard, a goroutine
+// that finishes its work after the iterator is closed blocks forever on the
+// unbuffered resultChan and pins the downloaded buffer in the heap (issue #356).
+func (cft *cloudFetchDownloadTask) sendResult(result cloudFetchDownloadTaskResult) {
+	select {
+	case cft.resultChan <- result:
+	case <-cft.ctx.Done():
+	}
+}
+
 // logCloudFetchSpeed calculates and logs download speed metrics
 func logCloudFetchSpeed(fullURL string, contentLength int64, duration time.Duration, speedThresholdMbps float64) {
 	if contentLength > 0 && duration.Seconds() > 0 {
diff --git a/internal/rows/arrowbased/batchloader_test.go b/internal/rows/arrowbased/batchloader_test.go
index 59947ff..52e7dc4 100644
--- a/internal/rows/arrowbased/batchloader_test.go
+++ b/internal/rows/arrowbased/batchloader_test.go
@@ -6,7 +6,10 @@ import (
 	"fmt"
 	"net/http"
 	"net/http/httptest"
+	"runtime"
+	"strings"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -604,3 +607,97 @@ func generateMockArrowBytes(record arrow.Record) []byte {
 	}
 	return buf.Bytes()
 }
+
+// TestCloudFetchIterator_CloseReleasesInFlightDownloads reproduces issue #356:
+// when the consumer closes the iterator while downloads are still in flight,
+// goroutines that completed their HTTP fetch get permanently blocked sending
+// to the unbuffered resultChan. They retain the downloaded buffers (Arrow
+// allocations in earlier versions, raw bytes in current code) until process
+// exit, producing a heap plateau that only releases on restart.
+//
+// The test schedules many concurrent downloads, lets them complete, and then
+// closes the iterator without consuming the queued results. After Close
+// returns, no cloudFetchDownloadTask goroutines must remain.
+func TestCloudFetchIterator_CloseReleasesInFlightDownloads(t *testing.T) {
+	arrowBytes := generateMockArrowBytes(generateArrowRecord())
+
+	// Track in-flight downloads. The server signals each request as it starts
+	// and waits on a release channel so the test can hold downloads in the
+	// queued-but-not-yet-consumed state before closing the iterator.
+	var inFlight atomic.Int64
+	release := make(chan struct{})
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		inFlight.Add(1)
+		<-release
+		w.WriteHeader(http.StatusOK)
+		_, _ = w.Write(arrowBytes)
+	}))
+	defer server.Close()
+
+	const nLinks = 20
+	links := make([]*cli_service.TSparkArrowResultLink, nLinks)
+	for i := range links {
+		links[i] = &cli_service.TSparkArrowResultLink{
+			FileLink:       server.URL,
+			ExpiryTime:     time.Now().Add(10 * time.Minute).Unix(),
+			StartRowOffset: int64(i),
+			RowCount:       1,
+		}
+	}
+
+	cfg := config.WithDefaults()
+	cfg.UseLz4Compression = false
+	cfg.MaxDownloadThreads = 10
+
+	bi, err := NewCloudBatchIterator(context.Background(), links, 0, nil, cfg, nil)
+	assert.Nil(t, err)
+
+	// Kick off the first batch download. The iterator schedules
+	// MaxDownloadThreads concurrent fetches behind the scenes.
+	go func() { _, _ = bi.Next() }()
+
+	// Wait for all MaxDownloadThreads goroutines to be blocked inside the
+	// server handler (they've issued the GET and are waiting for the body).
+	assert.Eventually(t, func() bool {
+		return inFlight.Load() == int64(cfg.MaxDownloadThreads)
+	}, 5*time.Second, 10*time.Millisecond, "expected %d in-flight downloads", cfg.MaxDownloadThreads)
+
+	// Release the downloads so each goroutine finishes its HTTP read and
+	// attempts to send its result on the unbuffered resultChan. Only the
+	// first task's result will be read (by the Next() call above); the rest
+	// will be queued, blocked on the send.
+	close(release)
+
+	// Give the goroutines time to finish their HTTP work and reach the
+	// channel send.
+	time.Sleep(200 * time.Millisecond)
+
+	// Close the iterator without consuming the remaining batches.
+	bi.Close()
+
+	// After Close, every cloudFetchDownloadTask goroutine must exit. We don't
+	// compare against the total goroutine count because httptest keeps
+	// persistent server/transport goroutines around — we look only for our
+	// own download goroutines.
+	assert.Eventually(t, func() bool {
+		return countDownloadTaskGoroutines() == 0
+	}, 5*time.Second, 50*time.Millisecond,
+		"cloudFetchDownloadTask goroutines leaked after Close: have %d",
+		countDownloadTaskGoroutines())
+}
+
+// countDownloadTaskGoroutines returns the number of live goroutines whose
+// stack includes cloudFetchDownloadTask.Run. Used to detect the leak in
+// issue #356.
+func countDownloadTaskGoroutines() int {
+	buf := make([]byte, 64*1024)
+	for {
+		n := runtime.Stack(buf, true)
+		if n < len(buf) {
+			buf = buf[:n]
+			break
+		}
+		buf = make([]byte, 2*len(buf))
+	}
+	return strings.Count(string(buf), "cloudFetchDownloadTask).Run")
+}