From 7547e084c90b0d863df1dd53992ff80bd965fa1b Mon Sep 17 00:00:00 2001 From: Brian Lamb Date: Fri, 27 Feb 2026 08:32:36 -0700 Subject: [PATCH] fix: use atomic writes for parallel WAL restore spool Fixes a race condition where MoveOut could read partially-written WAL files during parallel restore, causing "invalid checkpoint record" errors. Downloads now write to .tmp files and atomically rename on completion. Co-Authored-By: Claude --- pkg/restorer/restorer.go | 24 ++++++++- pkg/spool/spool.go | 35 +++++++++++++ pkg/spool/spool_test.go | 104 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 161 insertions(+), 2 deletions(-) diff --git a/pkg/restorer/restorer.go b/pkg/restorer/restorer.go index 478a2a24..1ca5b5e2 100644 --- a/pkg/restorer/restorer.go +++ b/pkg/restorer/restorer.go @@ -161,18 +161,38 @@ func (restorer *WALRestorer) RestoreList( go func(walIndex int) { result := &resultList[walIndex] result.WalName = fetchList[walIndex] + + // Determine where to download the file + var downloadPath string if walIndex == 0 { // The WAL that PostgreSQL requested will go directly - // to the destination path + // to the destination path (no staging needed) + downloadPath = destinationPath result.DestinationPath = destinationPath } else { + // Prefetched WALs go to a temp file first to avoid race conditions + // where MoveOut could read a partially-written file + downloadPath = restorer.spool.TempFileName(result.WalName) result.DestinationPath = restorer.spool.FileName(result.WalName) } result.StartTime = time.Now() - result.Err = restorer.Restore(fetchList[walIndex], result.DestinationPath, options) + result.Err = restorer.Restore(fetchList[walIndex], downloadPath, options) result.EndTime = time.Now() + // For prefetched WALs, commit the temp file to make it visible, + // or clean up on failure + if walIndex != 0 { + if result.Err == nil { + if commitErr := restorer.spool.Commit(result.WalName); commitErr != nil { + result.Err = commitErr + } + } else { + // Clean up failed temp file + restorer.spool.CleanupTemp(result.WalName) + } + } + elapsedWalTime := result.EndTime.Sub(result.StartTime) if result.Err == nil { contextLog.Info( diff --git a/pkg/spool/spool.go b/pkg/spool/spool.go index 313eb7de..082b5f0c 100644 --- a/pkg/spool/spool.go +++ b/pkg/spool/spool.go @@ -31,6 +31,12 @@ import ( // on a file which doesn't exist var ErrorNonExistentFile = fs.ErrNotExist +const ( + // tempSuffix is appended to files that are being downloaded. + // Files with this suffix are not yet complete and should not be moved. + tempSuffix = ".tmp" +) + // WALSpool is a way to keep track of which WAL files were processes from the parallel // feature and not by PostgreSQL request. // It works using a directory, under which we create an empty file carrying the name @@ -101,3 +107,32 @@ func (spool *WALSpool) MoveOut(walName, destination string) (err error) { func (spool *WALSpool) FileName(walName string) string { return path.Join(spool.spoolDirectory, walName) } + +// TempFileName gets the temporary file path for a WAL being downloaded. +// Files should be written here first, then committed with Commit() to +// ensure atomic visibility to MoveOut and Contains. +func (spool *WALSpool) TempFileName(walName string) string { + return path.Join(spool.spoolDirectory, walName+tempSuffix) +} + +// Commit atomically moves a completed download from its temp path to the final path. +// This should be called after a successful download to make the file visible to MoveOut. +// The rename is atomic on POSIX systems when both paths are on the same filesystem. +func (spool *WALSpool) Commit(walName string) error { + tempPath := path.Join(spool.spoolDirectory, walName+tempSuffix) + finalPath := path.Join(spool.spoolDirectory, walName) + + if err := os.Rename(tempPath, finalPath); err != nil { + // Clean up the temp file on failure + _ = os.Remove(tempPath) + return fmt.Errorf("failed to commit WAL file %s: %w", walName, err) + } + return nil +} + +// CleanupTemp removes a temporary file if it exists. +// This should be called when a download fails to avoid leaving partial files. +func (spool *WALSpool) CleanupTemp(walName string) { + tempPath := path.Join(spool.spoolDirectory, walName+tempSuffix) + _ = os.Remove(tempPath) +} diff --git a/pkg/spool/spool_test.go b/pkg/spool/spool_test.go index e5abcac8..1d1d2c2e 100644 --- a/pkg/spool/spool_test.go +++ b/pkg/spool/spool_test.go @@ -93,4 +93,108 @@ var _ = Describe("Spool", func() { const walFile = "000000020000068A00000004" Expect(spool.FileName(walFile)).To(Equal(path.Join(tmpDir, walFile))) }) + + It("returns temp file path with .tmp suffix", func() { + const walFile = "000000020000068A00000005" + tempPath := spool.TempFileName(walFile) + Expect(tempPath).To(Equal(path.Join(tmpDir, walFile+".tmp"))) + }) + + It("commits a temp file to its final location", func() { + const walFile = "000000020000068A00000006" + + // Create a temp file with some content + tempPath := spool.TempFileName(walFile) + err := os.WriteFile(tempPath, []byte("test content"), 0o600) + Expect(err).ToNot(HaveOccurred()) + + // Temp file exists, final file does not + Expect(fileutils.FileExists(tempPath)).To(BeTrue()) + Expect(spool.Contains(walFile)).To(BeFalse()) + + // Commit the file + err = spool.Commit(walFile) + Expect(err).ToNot(HaveOccurred()) + + // Now final file exists, temp file does not + Expect(spool.Contains(walFile)).To(BeTrue()) + tempExists, _ := fileutils.FileExists(tempPath) + Expect(tempExists).To(BeFalse()) + + // Verify content was preserved + content, err := os.ReadFile(spool.FileName(walFile)) + Expect(err).ToNot(HaveOccurred()) + Expect(string(content)).To(Equal("test content")) + }) + + It("returns error when committing non-existent temp file", func() { + const walFile = "000000020000068A00000007" + + err := spool.Commit(walFile) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("failed to commit WAL file")) + }) + + It("cleans up temp files", func() { + const walFile = "000000020000068A00000008" + + // Create a temp file + tempPath := spool.TempFileName(walFile) + err := os.WriteFile(tempPath, []byte("test content"), 0o600) + Expect(err).ToNot(HaveOccurred()) + Expect(fileutils.FileExists(tempPath)).To(BeTrue()) + + // Clean it up + spool.CleanupTemp(walFile) + + // Temp file should be gone + tempExists, _ := fileutils.FileExists(tempPath) + Expect(tempExists).To(BeFalse()) + }) + + It("cleanup is safe on non-existent temp file", func() { + const walFile = "000000020000068A00000009" + + // This should not panic or error + spool.CleanupTemp(walFile) + }) + + // These two tests verify the race condition fix: + // temp files (.tmp) must be invisible to Contains and MoveOut + + It("Contains does NOT see temp files", func() { + const walFile = "000000020000068A0000000A" + + // Create a temp file (simulating an in-progress download) + tempPath := spool.TempFileName(walFile) + err := os.WriteFile(tempPath, []byte("partial content"), 0o600) + Expect(err).ToNot(HaveOccurred()) + + // Contains should return false - temp files are invisible + Expect(spool.Contains(walFile)).To(BeFalse()) + + // Clean up + spool.CleanupTemp(walFile) + }) + + It("MoveOut does NOT see temp files", func() { + const walFile = "000000020000068A0000000B" + + // Create a temp file (simulating an in-progress download) + tempPath := spool.TempFileName(walFile) + err := os.WriteFile(tempPath, []byte("partial content"), 0o600) + Expect(err).ToNot(HaveOccurred()) + + // MoveOut should fail with ErrorNonExistentFile - temp files are invisible + destinationPath := path.Join(tmpDir2, "testFile") + err = spool.MoveOut(walFile, destinationPath) + Expect(err).To(Equal(ErrorNonExistentFile)) + + // Destination should not exist + destExists, _ := fileutils.FileExists(destinationPath) + Expect(destExists).To(BeFalse()) + + // Clean up + spool.CleanupTemp(walFile) + }) })