Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions pkg/restorer/restorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,18 +161,38 @@ func (restorer *WALRestorer) RestoreList(
go func(walIndex int) {
result := &resultList[walIndex]
result.WalName = fetchList[walIndex]

// Determine where to download the file
var downloadPath string
if walIndex == 0 {
// The WAL that PostgreSQL requested will go directly
// to the destination path
// to the destination path (no staging needed)
downloadPath = destinationPath
result.DestinationPath = destinationPath
} else {
// Prefetched WALs go to a temp file first to avoid race conditions
// where MoveOut could read a partially-written file
downloadPath = restorer.spool.TempFileName(result.WalName)
result.DestinationPath = restorer.spool.FileName(result.WalName)
}

result.StartTime = time.Now()
result.Err = restorer.Restore(fetchList[walIndex], result.DestinationPath, options)
result.Err = restorer.Restore(fetchList[walIndex], downloadPath, options)
result.EndTime = time.Now()

// For prefetched WALs, commit the temp file to make it visible,
// or clean up on failure
if walIndex != 0 {
if result.Err == nil {
if commitErr := restorer.spool.Commit(result.WalName); commitErr != nil {
result.Err = commitErr
}
} else {
// Clean up failed temp file
restorer.spool.CleanupTemp(result.WalName)
}
}

elapsedWalTime := result.EndTime.Sub(result.StartTime)
if result.Err == nil {
contextLog.Info(
Expand Down
35 changes: 35 additions & 0 deletions pkg/spool/spool.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ import (
// on a file which doesn't exist
var ErrorNonExistentFile = fs.ErrNotExist

const (
// tempSuffix is appended to files that are being downloaded.
// Files with this suffix are not yet complete and should not be moved.
tempSuffix = ".tmp"
)

// WALSpool is a way to keep track of which WAL files were processes from the parallel
// feature and not by PostgreSQL request.
// It works using a directory, under which we create an empty file carrying the name
Expand Down Expand Up @@ -101,3 +107,32 @@ func (spool *WALSpool) MoveOut(walName, destination string) (err error) {
func (spool *WALSpool) FileName(walName string) string {
return path.Join(spool.spoolDirectory, walName)
}

// TempFileName gets the temporary file path for a WAL being downloaded.
// Files should be written here first, then committed with Commit() to
// ensure atomic visibility to MoveOut and Contains.
func (spool *WALSpool) TempFileName(walName string) string {
return path.Join(spool.spoolDirectory, walName+tempSuffix)
}

// Commit atomically moves a completed download from its temp path to the final path.
// This should be called after a successful download to make the file visible to MoveOut.
// The rename is atomic on POSIX systems when both paths are on the same filesystem.
func (spool *WALSpool) Commit(walName string) error {
tempPath := path.Join(spool.spoolDirectory, walName+tempSuffix)
finalPath := path.Join(spool.spoolDirectory, walName)

if err := os.Rename(tempPath, finalPath); err != nil {
// Clean up the temp file on failure
_ = os.Remove(tempPath)
return fmt.Errorf("failed to commit WAL file %s: %w", walName, err)
}
return nil
}

// CleanupTemp removes a temporary file if it exists.
// This should be called when a download fails to avoid leaving partial files.
func (spool *WALSpool) CleanupTemp(walName string) {
tempPath := path.Join(spool.spoolDirectory, walName+tempSuffix)
_ = os.Remove(tempPath)
}
104 changes: 104 additions & 0 deletions pkg/spool/spool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,108 @@ var _ = Describe("Spool", func() {
const walFile = "000000020000068A00000004"
Expect(spool.FileName(walFile)).To(Equal(path.Join(tmpDir, walFile)))
})

It("returns temp file path with .tmp suffix", func() {
const walFile = "000000020000068A00000005"
tempPath := spool.TempFileName(walFile)
Expect(tempPath).To(Equal(path.Join(tmpDir, walFile+".tmp")))
})

It("commits a temp file to its final location", func() {
const walFile = "000000020000068A00000006"

// Create a temp file with some content
tempPath := spool.TempFileName(walFile)
err := os.WriteFile(tempPath, []byte("test content"), 0o600)
Expect(err).ToNot(HaveOccurred())

// Temp file exists, final file does not
Expect(fileutils.FileExists(tempPath)).To(BeTrue())
Expect(spool.Contains(walFile)).To(BeFalse())

// Commit the file
err = spool.Commit(walFile)
Expect(err).ToNot(HaveOccurred())

// Now final file exists, temp file does not
Expect(spool.Contains(walFile)).To(BeTrue())
tempExists, _ := fileutils.FileExists(tempPath)
Expect(tempExists).To(BeFalse())

// Verify content was preserved
content, err := os.ReadFile(spool.FileName(walFile))
Expect(err).ToNot(HaveOccurred())
Expect(string(content)).To(Equal("test content"))
})

It("returns error when committing non-existent temp file", func() {
const walFile = "000000020000068A00000007"

err := spool.Commit(walFile)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("failed to commit WAL file"))
})

It("cleans up temp files", func() {
const walFile = "000000020000068A00000008"

// Create a temp file
tempPath := spool.TempFileName(walFile)
err := os.WriteFile(tempPath, []byte("test content"), 0o600)
Expect(err).ToNot(HaveOccurred())
Expect(fileutils.FileExists(tempPath)).To(BeTrue())

// Clean it up
spool.CleanupTemp(walFile)

// Temp file should be gone
tempExists, _ := fileutils.FileExists(tempPath)
Expect(tempExists).To(BeFalse())
})

It("cleanup is safe on non-existent temp file", func() {
const walFile = "000000020000068A00000009"

// This should not panic or error
spool.CleanupTemp(walFile)
})

// These two tests verify the race condition fix:
// temp files (.tmp) must be invisible to Contains and MoveOut

It("Contains does NOT see temp files", func() {
const walFile = "000000020000068A0000000A"

// Create a temp file (simulating an in-progress download)
tempPath := spool.TempFileName(walFile)
err := os.WriteFile(tempPath, []byte("partial content"), 0o600)
Expect(err).ToNot(HaveOccurred())

// Contains should return false - temp files are invisible
Expect(spool.Contains(walFile)).To(BeFalse())

// Clean up
spool.CleanupTemp(walFile)
})

It("MoveOut does NOT see temp files", func() {
const walFile = "000000020000068A0000000B"

// Create a temp file (simulating an in-progress download)
tempPath := spool.TempFileName(walFile)
err := os.WriteFile(tempPath, []byte("partial content"), 0o600)
Expect(err).ToNot(HaveOccurred())

// MoveOut should fail with ErrorNonExistentFile - temp files are invisible
destinationPath := path.Join(tmpDir2, "testFile")
err = spool.MoveOut(walFile, destinationPath)
Expect(err).To(Equal(ErrorNonExistentFile))

// Destination should not exist
destExists, _ := fileutils.FileExists(destinationPath)
Expect(destExists).To(BeFalse())

// Clean up
spool.CleanupTemp(walFile)
})
})