From d4a0b99482049b15f75dc12e85763354c51a2975 Mon Sep 17 00:00:00 2001 From: Josh Friend Date: Mon, 23 Mar 2026 14:23:21 -0400 Subject: [PATCH 1/3] perf: parallel tar extraction with 64 worker goroutines MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The single-threaded bottleneck on restore was writing files to disk. Even though tar entries must be read sequentially (the format has no index), the actual file writes are independent. The extractor now dispatches each entry (buffered in memory, ≤4 MiB) to one of 64 worker goroutines that write concurrently. This hides the per-file syscall latency (~20µs × N files) behind parallelism, turning a 7+ second sequential write phase into one that completes within the download window. Large files (>4 MiB) are written inline by the main goroutine to keep peak memory bounded. Benchmarked on r8id.metal-48xlarge (NVMe, 96 cores) with a 334K-file bundle: 64 workers completes in ~6.3s vs ~13s sequential. --- internal/snapshot/snapshot.go | 178 ++++++++++++++++++++++++++++++++-- 1 file changed, 171 insertions(+), 7 deletions(-) diff --git a/internal/snapshot/snapshot.go b/internal/snapshot/snapshot.go index a3147e8..1ab2dcf 100644 --- a/internal/snapshot/snapshot.go +++ b/internal/snapshot/snapshot.go @@ -2,6 +2,7 @@ package snapshot import ( + "archive/tar" "bufio" "bytes" "context" @@ -12,6 +13,8 @@ import ( "os/exec" "path/filepath" "runtime" + "strings" + "sync" "time" "github.com/alecthomas/errors" @@ -187,9 +190,32 @@ func Restore(ctx context.Context, remote cache.Cache, key cache.Key, directory s return Extract(ctx, rc, directory, threads) } +const ( + // extractWorkers is the number of goroutines writing files concurrently + // during parallel tar extraction. Hides per-file open/write/close syscall + // latency so the tar-stream reader (and download pipeline behind it) is not + // stalled waiting for individual file writes to complete. + // Benchmarked on r8id.metal-48xlarge (NVMe, 96 cores) with a 334K-file + // bundle: 64 workers = 6.27s, 128 = 6.84s (extra GC pressure outweighs + // any I/O concurrency gain). + extractWorkers = 64 + // maxParallelFileSize is the largest file that will be buffered in memory + // and dispatched to the worker pool. Files larger than this are written + // inline in the main goroutine to keep peak memory bounded. + // At 4 MiB, 99.97% of Gradle cache entries go through the parallel path. + maxParallelFileSize = 4 << 20 // 4 MiB +) + // Extract decompresses a zstd+tar stream into directory, preserving all file // permissions, ownership, and symlinks. threads controls zstd parallelism; // 0 uses all available CPU cores. +// +// The single-threaded bottleneck on restore is writing files to disk. Even +// though tar entries must be read sequentially (the format has no index), the +// actual file writes are independent. The extractor dispatches each entry +// (buffered in memory, ≤4 MiB) to one of 64 worker goroutines that write +// concurrently. This hides the per-file syscall latency (~20µs × N files) +// behind parallelism. func Extract(ctx context.Context, r io.Reader, directory string, threads int) error { if threads <= 0 { threads = runtime.NumCPU() @@ -212,15 +238,153 @@ func Extract(ctx context.Context, r io.Reader, directory string, threads int) er } defer dec.Close() - tarCmd := exec.CommandContext(ctx, "tar", "-xpf", "-", "-C", directory) - tarCmd.Stdin = dec + return extractTarParallel(ctx, dec, directory) +} - var tarStderr bytes.Buffer - tarCmd.Stderr = &tarStderr +type writeJob struct { + target string + mode os.FileMode + data []byte +} + +// extractTarParallel reads a tar stream and writes files using a pool of +// goroutines. The main goroutine reads tar entries and buffers small file +// contents; workers write those files to disk concurrently. Large files are +// written inline to keep memory use bounded. +func extractTarParallel(ctx context.Context, r io.Reader, dir string) error { + jobs := make(chan writeJob, extractWorkers*2) + + var ( + wg sync.WaitGroup + writeErrOnce sync.Once + writeErr error + ) + + for range extractWorkers { + wg.Add(1) + go func() { + defer wg.Done() + for job := range jobs { + f, err := os.OpenFile(job.target, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, job.mode) + if err != nil { + writeErrOnce.Do(func() { writeErr = errors.Errorf("open %s: %w", filepath.Base(job.target), err) }) + continue + } + if _, err := f.Write(job.data); err != nil { + f.Close() //nolint:errcheck,gosec + writeErrOnce.Do(func() { writeErr = errors.Errorf("write %s: %w", filepath.Base(job.target), err) }) + continue + } + if err := f.Close(); err != nil { + writeErrOnce.Do(func() { writeErr = errors.Errorf("close %s: %w", filepath.Base(job.target), err) }) + } + } + }() + } + + copyBuf := make([]byte, 1<<20) // reused only for inline large-file writes + + // createdDirs is accessed only by the main goroutine, so no mutex needed. + createdDirs := make(map[string]struct{}) + ensureDir := func(d string) error { + if _, ok := createdDirs[d]; ok { + return nil + } + if err := os.MkdirAll(d, 0o750); err != nil { + return err + } + createdDirs[d] = struct{}{} + return nil + } + + tr := tar.NewReader(r) + var readErr error +loop: + for { + if err := ctx.Err(); err != nil { + readErr = errors.Wrap(err, "context cancelled") + break + } - if err := tarCmd.Run(); err != nil { - return errors.Errorf("tar failed: %w: %s", err, tarStderr.String()) + hdr, err := tr.Next() + if err == io.EOF { + break + } + if err != nil { + readErr = errors.Wrap(err, "read tar entry") + break + } + + name := filepath.Clean(hdr.Name) + if strings.HasPrefix(name, "..") { + readErr = errors.Errorf("tar entry %q escapes destination directory", hdr.Name) + break + } + + target := filepath.Join(dir, name) + + switch hdr.Typeflag { + case tar.TypeDir: + if err := ensureDir(target); err != nil { + readErr = errors.Errorf("mkdir %s: %w", hdr.Name, err) + break loop + } + + case tar.TypeReg: + if err := ensureDir(filepath.Dir(target)); err != nil { + readErr = errors.Errorf("mkdir %s: %w", hdr.Name, err) + break loop + } + + if hdr.Size <= maxParallelFileSize { + // Buffer in memory and dispatch to worker pool so the main + // goroutine can continue reading the tar stream immediately. + buf := make([]byte, hdr.Size) + if _, err := io.ReadFull(tr, buf); err != nil { + readErr = errors.Errorf("read %s: %w", hdr.Name, err) + break loop + } + // Propagate worker errors early. + if writeErr != nil { + readErr = writeErr + break loop + } + jobs <- writeJob{target: target, mode: hdr.FileInfo().Mode(), data: buf} + } else { + // Large file: write inline to keep memory bounded. + f, err := os.OpenFile(target, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, hdr.FileInfo().Mode()) + if err != nil { + readErr = errors.Errorf("open %s: %w", hdr.Name, err) + break loop + } + if _, err := io.CopyBuffer(f, io.LimitReader(tr, hdr.Size), copyBuf); err != nil { + f.Close() //nolint:errcheck,gosec + readErr = errors.Errorf("write %s: %w", hdr.Name, err) + break loop + } + if err := f.Close(); err != nil { + readErr = errors.Errorf("close %s: %w", hdr.Name, err) + break loop + } + } + + case tar.TypeSymlink: + if err := ensureDir(filepath.Dir(target)); err != nil { + readErr = errors.Errorf("mkdir for symlink %s: %w", hdr.Name, err) + break loop + } + if err := os.Symlink(hdr.Linkname, target); err != nil { + readErr = errors.Errorf("symlink %s → %s: %w", hdr.Name, hdr.Linkname, err) + break loop + } + } } - return nil + close(jobs) + wg.Wait() + + if readErr != nil { + return readErr + } + return writeErr } From 15f1526542e731ac35b2347b98bf52694ba5aee7 Mon Sep 17 00:00:00 2001 From: Josh Friend Date: Mon, 23 Mar 2026 14:41:58 -0400 Subject: [PATCH 2/3] fix: handle hardlinks and preserve directory permissions in parallel tar extraction Add tar.TypeLink support for hardlinks (git local clones create hardlinked pack objects) and use tar header mode for directories instead of hardcoded 0o750. --- internal/snapshot/snapshot.go | 74 +++++++++++++++++++++++++++-------- 1 file changed, 57 insertions(+), 17 deletions(-) diff --git a/internal/snapshot/snapshot.go b/internal/snapshot/snapshot.go index 1ab2dcf..ee9c4e8 100644 --- a/internal/snapshot/snapshot.go +++ b/internal/snapshot/snapshot.go @@ -247,11 +247,36 @@ type writeJob struct { data []byte } +// safePath validates that name is a relative path that stays within dir when +// joined. It rejects absolute paths and parent traversals (".."). Returns the +// resolved path under dir. +func safePath(dir, name string) (string, error) { + clean := filepath.Clean(name) + if filepath.IsAbs(clean) { + return "", errors.Errorf("path %q is absolute", name) + } + if clean == ".." || strings.HasPrefix(clean, ".."+string(os.PathSeparator)) { + return "", errors.Errorf("path %q escapes destination directory", name) + } + joined := filepath.Join(dir, clean) + if !strings.HasPrefix(joined, dir+string(os.PathSeparator)) && joined != dir { + return "", errors.Errorf("path %q resolves outside destination directory", name) + } + return joined, nil +} + // extractTarParallel reads a tar stream and writes files using a pool of // goroutines. The main goroutine reads tar entries and buffers small file // contents; workers write those files to disk concurrently. Large files are // written inline to keep memory use bounded. func extractTarParallel(ctx context.Context, r io.Reader, dir string) error { + // Resolve dir to absolute so containment checks are reliable. + var err error + dir, err = filepath.Abs(dir) + if err != nil { + return errors.Wrap(err, "resolve destination directory") + } + jobs := make(chan writeJob, extractWorkers*2) var ( @@ -261,9 +286,7 @@ func extractTarParallel(ctx context.Context, r io.Reader, dir string) error { ) for range extractWorkers { - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { for job := range jobs { f, err := os.OpenFile(job.target, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, job.mode) if err != nil { @@ -279,19 +302,19 @@ func extractTarParallel(ctx context.Context, r io.Reader, dir string) error { writeErrOnce.Do(func() { writeErr = errors.Errorf("close %s: %w", filepath.Base(job.target), err) }) } } - }() + }) } copyBuf := make([]byte, 1<<20) // reused only for inline large-file writes // createdDirs is accessed only by the main goroutine, so no mutex needed. createdDirs := make(map[string]struct{}) - ensureDir := func(d string) error { + ensureDir := func(d string, mode os.FileMode) error { if _, ok := createdDirs[d]; ok { return nil } - if err := os.MkdirAll(d, 0o750); err != nil { - return err + if err := os.MkdirAll(d, mode); err != nil { //nolint:gosec // path is validated by caller + return errors.Wrap(err, "mkdir") } createdDirs[d] = struct{}{} return nil @@ -307,7 +330,7 @@ loop: } hdr, err := tr.Next() - if err == io.EOF { + if errors.Is(err, io.EOF) { break } if err != nil { @@ -315,23 +338,36 @@ loop: break } - name := filepath.Clean(hdr.Name) - if strings.HasPrefix(name, "..") { - readErr = errors.Errorf("tar entry %q escapes destination directory", hdr.Name) + target, err := safePath(dir, hdr.Name) + if err != nil { + readErr = errors.Errorf("unsafe tar entry %q: %w", hdr.Name, err) break } - target := filepath.Join(dir, name) - switch hdr.Typeflag { case tar.TypeDir: - if err := ensureDir(target); err != nil { + if err := ensureDir(target, hdr.FileInfo().Mode()); err != nil { readErr = errors.Errorf("mkdir %s: %w", hdr.Name, err) break loop } + case tar.TypeLink: + if err := ensureDir(filepath.Dir(target), 0o755); err != nil { + readErr = errors.Errorf("mkdir for hardlink %s: %w", hdr.Name, err) + break loop + } + linkTarget, err := safePath(dir, hdr.Linkname) + if err != nil { + readErr = errors.Errorf("unsafe hardlink target %q: %w", hdr.Linkname, err) + break loop + } + if err := os.Link(linkTarget, target); err != nil { + readErr = errors.Errorf("hardlink %s → %s: %w", hdr.Name, hdr.Linkname, err) + break loop + } + case tar.TypeReg: - if err := ensureDir(filepath.Dir(target)); err != nil { + if err := ensureDir(filepath.Dir(target), 0o755); err != nil { readErr = errors.Errorf("mkdir %s: %w", hdr.Name, err) break loop } @@ -352,7 +388,7 @@ loop: jobs <- writeJob{target: target, mode: hdr.FileInfo().Mode(), data: buf} } else { // Large file: write inline to keep memory bounded. - f, err := os.OpenFile(target, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, hdr.FileInfo().Mode()) + f, err := os.OpenFile(target, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, hdr.FileInfo().Mode()) //nolint:gosec // path traversal guarded above if err != nil { readErr = errors.Errorf("open %s: %w", hdr.Name, err) break loop @@ -369,10 +405,14 @@ loop: } case tar.TypeSymlink: - if err := ensureDir(filepath.Dir(target)); err != nil { + if err := ensureDir(filepath.Dir(target), 0o755); err != nil { readErr = errors.Errorf("mkdir for symlink %s: %w", hdr.Name, err) break loop } + if _, err := safePath(dir, hdr.Linkname); err != nil { + readErr = errors.Errorf("unsafe symlink target %q: %w", hdr.Linkname, err) + break loop + } if err := os.Symlink(hdr.Linkname, target); err != nil { readErr = errors.Errorf("symlink %s → %s: %w", hdr.Name, hdr.Linkname, err) break loop From 5c96b93a543b1d5eb854316c5dba31855d536d06 Mon Sep 17 00:00:00 2001 From: Josh Friend Date: Tue, 24 Mar 2026 13:59:12 -0400 Subject: [PATCH 3/3] perf: parallel range-GET S3 downloads for large objects Replace the single GetObject stream in S3.Open with parallel range-GET requests for objects larger than 32 MiB. 8 workers download chunks concurrently and reassemble them in order via io.Pipe, multiplying S3 throughput for cold snapshot downloads (observed 32 MB/s single-stream vs expected ~250+ MB/s with parallel connections). --- internal/cache/s3.go | 8 +-- internal/cache/s3_parallel_get.go | 116 ++++++++++++++++++++++++++++++ 2 files changed, 120 insertions(+), 4 deletions(-) create mode 100644 internal/cache/s3_parallel_get.go diff --git a/internal/cache/s3.go b/internal/cache/s3.go index f2a029f..1206456 100644 --- a/internal/cache/s3.go +++ b/internal/cache/s3.go @@ -262,13 +262,13 @@ func (s *S3) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, err } } - // Get object - obj, err := s.client.GetObject(ctx, s.config.Bucket, objectName, minio.GetObjectOptions{}) + // Download object using parallel range-GET for large objects. + reader, err := s.parallelGetReader(ctx, s.config.Bucket, objectName, objInfo.Size) if err != nil { - return nil, nil, errors.Errorf("failed to get object: %w", err) + return nil, nil, err } - return &s3Reader{obj: obj}, headers, nil + return reader, headers, nil } // refreshExpiration updates the Expires-At metadata on an S3 object using diff --git a/internal/cache/s3_parallel_get.go b/internal/cache/s3_parallel_get.go new file mode 100644 index 0000000..b12dbb9 --- /dev/null +++ b/internal/cache/s3_parallel_get.go @@ -0,0 +1,116 @@ +package cache + +import ( + "context" + "io" + "sync" + + "github.com/alecthomas/errors" + "github.com/minio/minio-go/v7" +) + +const ( + // s3DownloadChunkSize is the size of each parallel range-GET request. + // 32 MiB matches the gradle-cache-tool's benchmarked default. + s3DownloadChunkSize = 32 << 20 + // s3DownloadWorkers is the number of concurrent range-GET requests. + // Benchmarking showed no throughput difference from 4 to 128 workers + // (extraction IOPS is the bottleneck), so 8 keeps connection count low. + s3DownloadWorkers = 8 +) + +// parallelGetReader returns an io.ReadCloser that downloads the S3 object +// using parallel range-GET requests and reassembles chunks in order. +// For objects smaller than one chunk, it falls back to a single GetObject. +func (s *S3) parallelGetReader(ctx context.Context, bucket, objectName string, size int64) (io.ReadCloser, error) { + if size <= s3DownloadChunkSize { + // Small object: single stream. + obj, err := s.client.GetObject(ctx, bucket, objectName, minio.GetObjectOptions{}) + if err != nil { + return nil, errors.Errorf("failed to get object: %w", err) + } + return &s3Reader{obj: obj}, nil + } + + // Large object: parallel range requests reassembled in order via io.Pipe. + pr, pw := io.Pipe() + go func() { + pw.CloseWithError(s.parallelGet(ctx, bucket, objectName, size, pw)) + }() + return pr, nil +} + +// parallelGet downloads an S3 object in parallel chunks and writes them in +// order to w. Each worker downloads its chunk into memory so the TCP +// connection stays active at full speed. Peak memory: numWorkers × chunkSize. +func (s *S3) parallelGet(ctx context.Context, bucket, objectName string, size int64, w io.Writer) error { + numChunks := int((size + s3DownloadChunkSize - 1) / s3DownloadChunkSize) + numWorkers := min(s3DownloadWorkers, numChunks) + + type chunkResult struct { + data []byte + err error + } + + // One buffered channel per chunk so workers never block after reading. + results := make([]chan chunkResult, numChunks) + for i := range results { + results[i] = make(chan chunkResult, 1) + } + + // Work queue of chunk indices. + work := make(chan int, numChunks) + for i := range numChunks { + work <- i + } + close(work) + + var wg sync.WaitGroup + for range numWorkers { + wg.Go(func() { + for seq := range work { + start := int64(seq) * s3DownloadChunkSize + end := min(start+s3DownloadChunkSize-1, size-1) + + opts := minio.GetObjectOptions{} + if err := opts.SetRange(start, end); err != nil { + results[seq] <- chunkResult{err: errors.Errorf("set range %d-%d: %w", start, end, err)} + continue + } + + obj, err := s.client.GetObject(ctx, bucket, objectName, opts) + if err != nil { + results[seq] <- chunkResult{err: errors.Errorf("get range %d-%d: %w", start, end, err)} + continue + } + + // Drain the body immediately so the TCP connection stays at + // full speed. All workers do this concurrently, saturating + // the available S3 bandwidth. + data, readErr := io.ReadAll(obj) + obj.Close() //nolint:errcheck,gosec + results[seq] <- chunkResult{data: data, err: readErr} + } + }) + } + + // Write chunks in order. Each receive blocks until that chunk's worker + // finishes, while other workers continue downloading concurrently. + var writeErr error + for _, ch := range results { + r := <-ch + if writeErr != nil { + continue // drain remaining channels so goroutines can exit + } + if r.err != nil { + writeErr = r.err + continue + } + if _, err := w.Write(r.data); err != nil { + writeErr = err + } + } + + wg.Wait() + return writeErr +}