diff --git a/src/iceberg/expire_snapshots.h b/src/iceberg/expire_snapshots.h new file mode 100644 index 000000000..c80561133 --- /dev/null +++ b/src/iceberg/expire_snapshots.h @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/expire_snapshots.h +/// API for removing old snapshots from a table + +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/pending_update.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Cleanup level for snapshot expiration +/// +/// Controls which files are deleted during snapshot expiration. +enum class CleanupLevel { + /// Skip all file cleanup, only remove snapshot metadata + kNone, + /// Clean up only metadata files (manifests, manifest lists, statistics), + /// retain data files + kMetadataOnly, + /// Clean up both metadata and data files (default) + kAll, +}; + +/// \brief API for removing old snapshots from a table +/// +/// ExpireSnapshots accumulates snapshot deletions and commits the new snapshot +/// list to the table. This API does not allow deleting the current snapshot. +/// +/// When committing, changes are applied to the latest table metadata. Commit +/// conflicts are resolved by applying the changes to the new latest metadata +/// and reattempting the commit. +/// +/// Manifest files that are no longer used by valid snapshots will be deleted. +/// Data files that were deleted by snapshots that are expired will be deleted. +/// DeleteWith() can be used to pass an alternative deletion method. +/// +/// Apply() returns a list of the snapshots that will be removed (preview mode). +/// +/// Example usage: +/// \code +/// table.ExpireSnapshots() +/// .ExpireOlderThan(timestampMillis) +/// .RetainLast(5) +/// .Commit(); +/// \endcode +class ICEBERG_EXPORT ExpireSnapshots + : public PendingUpdateTyped>> { + public: + ~ExpireSnapshots() override = default; + + /// \brief Expire a specific snapshot identified by id + /// + /// Marks a specific snapshot for removal. This method can be called multiple + /// times to expire multiple snapshots. Snapshots marked by this method will + /// be expired even if they would be retained by RetainLast(). + /// + /// \param snapshot_id ID of the snapshot to expire + /// \return Reference to this for method chaining + virtual ExpireSnapshots& ExpireSnapshotId(int64_t snapshot_id) = 0; + + /// \brief Expire all snapshots older than the given timestamp + /// + /// Sets a timestamp threshold - all snapshots created before this time will + /// be expired (unless retained by RetainLast()). + /// + /// \param timestamp_millis Timestamp in milliseconds since epoch + /// \return Reference to this for method chaining + virtual ExpireSnapshots& ExpireOlderThan(int64_t timestamp_millis) = 0; + + /// \brief Retain the most recent ancestors of the current snapshot + /// + /// If a snapshot would be expired because it is older than the expiration + /// timestamp, but is one of the num_snapshots most recent ancestors of the + /// current state, it will be retained. This will not prevent snapshots + /// explicitly identified by ExpireSnapshotId() from expiring. + /// + /// This may keep more than num_snapshots ancestors if snapshots are added + /// concurrently. This may keep less than num_snapshots ancestors if the + /// current table state does not have that many. + /// + /// \param num_snapshots The number of snapshots to retain + /// \return Reference to this for method chaining + virtual ExpireSnapshots& RetainLast(int num_snapshots) = 0; + + /// \brief Set a custom file deletion callback + /// + /// Passes an alternative delete implementation that will be used for + /// manifests and data files. If this method is not called, unnecessary + /// manifests and data files will still be deleted using the default method. + /// + /// Manifest files that are no longer used by valid snapshots will be deleted. + /// Data files that were deleted by snapshots that are expired will be deleted. + /// + /// \param delete_func Callback function that will be called for each file to delete + /// \return Reference to this for method chaining + virtual ExpireSnapshots& DeleteWith( + std::function delete_func) = 0; + + /// \brief Configure the cleanup level for expired files + /// + /// This method provides fine-grained control over which files are cleaned up + /// during snapshot expiration. + /// + /// Use CleanupLevel::kMetadataOnly when data files are shared across tables or + /// when using procedures like add-files that may reference the same data files. + /// + /// Use CleanupLevel::kNone when data and metadata files may be more efficiently + /// removed using a distributed framework through the actions API. + /// + /// \param level The cleanup level to use for expired snapshots + /// \return Reference to this for method chaining + virtual ExpireSnapshots& CleanupLevel(enum CleanupLevel level) = 0; + + // Non-copyable, movable (inherited from PendingUpdate) + ExpireSnapshots(const ExpireSnapshots&) = delete; + ExpireSnapshots& operator=(const ExpireSnapshots&) = delete; + + protected: + ExpireSnapshots() = default; +}; + +} // namespace iceberg diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 25a03932d..e98b20ac1 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -79,6 +79,7 @@ add_iceberg_test(schema_test add_iceberg_test(table_test SOURCES + expire_snapshots_test.cc json_internal_test.cc pending_update_test.cc schema_json_test.cc diff --git a/src/iceberg/test/expire_snapshots_test.cc b/src/iceberg/test/expire_snapshots_test.cc new file mode 100644 index 000000000..0df1954a5 --- /dev/null +++ b/src/iceberg/test/expire_snapshots_test.cc @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/expire_snapshots.h" + +#include +#include + +#include + +#include "iceberg/result.h" +#include "iceberg/snapshot.h" +#include "iceberg/test/matchers.h" + +namespace iceberg { + +// Mock implementation of ExpireSnapshots for testing +// This mock tracks which methods were called to verify behavior +class MockExpireSnapshots : public ExpireSnapshots { + public: + MockExpireSnapshots() = default; + + Result>> Apply() override { + if (should_fail_) { + return ValidationFailed("Mock validation failed"); + } + apply_called_ = true; + + // Return a vector of snapshots that reflects the configuration + // In a real implementation, this would analyze the table and return + // snapshots that match the expiration criteria + std::vector> expired_snapshots; + + // Create mock snapshots for snapshots that would be expired + for (int64_t id : snapshot_ids_to_expire_) { + expired_snapshots.push_back(std::make_shared(Snapshot{ + .snapshot_id = id, + .parent_snapshot_id = std::nullopt, + .sequence_number = 1, + .timestamp_ms = TimePointMs{std::chrono::milliseconds{1000}}, + .manifest_list = "s3://bucket/metadata/snap-manifest-list.avro", + .summary = {}, + .schema_id = std::nullopt, + })); + } + + return expired_snapshots; + } + + Status Commit() override { + if (should_fail_commit_) { + return CommitFailed("Mock commit failed"); + } + commit_called_ = true; + + // Simulate file deletion if callback is set + if (delete_func_) { + // In a real implementation, this would delete manifest and data files + // For testing, just call the callback with test files + (*delete_func_)("manifest-1.avro"); + (*delete_func_)("data-1.parquet"); + } + + return {}; + } + + ExpireSnapshots& ExpireSnapshotId(int64_t snapshot_id) override { + snapshot_ids_to_expire_.push_back(snapshot_id); + return *this; + } + + ExpireSnapshots& ExpireOlderThan(int64_t timestamp_millis) override { + expire_older_than_ms_ = timestamp_millis; + return *this; + } + + ExpireSnapshots& RetainLast(int num_snapshots) override { + retain_last_ = num_snapshots; + return *this; + } + + ExpireSnapshots& DeleteWith( + std::function delete_func) override { + delete_func_ = std::move(delete_func); + return *this; + } + + ExpireSnapshots& CleanupLevel(enum CleanupLevel level) override { + cleanup_level_ = level; + return *this; + } + + void SetShouldFail(bool fail) { should_fail_ = fail; } + void SetShouldFailCommit(bool fail) { should_fail_commit_ = fail; } + bool ApplyCalled() const { return apply_called_; } + bool CommitCalled() const { return commit_called_; } + + private: + bool should_fail_ = false; + bool should_fail_commit_ = false; + bool apply_called_ = false; + bool commit_called_ = false; + + std::vector snapshot_ids_to_expire_; + std::optional expire_older_than_ms_; + std::optional retain_last_; + std::optional> delete_func_; + std::optional cleanup_level_; +}; + +TEST(ExpireSnapshotsTest, ExpireSnapshotId) { + MockExpireSnapshots expire; + expire.ExpireSnapshotId(123); + + // Verify through public API: Apply() should return snapshots to expire + auto result = expire.Apply(); + ASSERT_THAT(result, IsOk()); + + const auto& snapshots = result.value(); + EXPECT_EQ(snapshots.size(), 1); + EXPECT_EQ(snapshots[0]->snapshot_id, 123); +} + +TEST(ExpireSnapshotsTest, ExpireMultipleSnapshotIds) { + MockExpireSnapshots expire; + expire.ExpireSnapshotId(100).ExpireSnapshotId(200).ExpireSnapshotId(300); + + // Verify through public API + auto result = expire.Apply(); + ASSERT_THAT(result, IsOk()); + + const auto& snapshots = result.value(); + EXPECT_EQ(snapshots.size(), 3); + EXPECT_EQ(snapshots[0]->snapshot_id, 100); + EXPECT_EQ(snapshots[1]->snapshot_id, 200); + EXPECT_EQ(snapshots[2]->snapshot_id, 300); +} + +TEST(ExpireSnapshotsTest, ExpireOlderThan) { + MockExpireSnapshots expire; + int64_t timestamp_millis = 1609459200000; // 2021-01-01 00:00:00 + expire.ExpireOlderThan(timestamp_millis); + + // Just verify it doesn't error - timestamp filtering is implementation detail + auto result = expire.Apply(); + EXPECT_THAT(result, IsOk()); +} + +TEST(ExpireSnapshotsTest, RetainLast) { + MockExpireSnapshots expire; + expire.RetainLast(5); + + // Verify it doesn't error + auto result = expire.Apply(); + EXPECT_THAT(result, IsOk()); +} + +TEST(ExpireSnapshotsTest, DeleteWith) { + MockExpireSnapshots expire; + std::vector deleted_files; + + // Set up callback to track deleted files + expire.DeleteWith( + [&deleted_files](std::string_view path) { deleted_files.emplace_back(path); }); + + // Verify through public API: calling Commit() should invoke the callback + auto status = expire.Commit(); + EXPECT_THAT(status, IsOk()); + + // The mock implementation calls the delete callback with test files + EXPECT_EQ(deleted_files.size(), 2); + EXPECT_EQ(deleted_files[0], "manifest-1.avro"); + EXPECT_EQ(deleted_files[1], "data-1.parquet"); +} + +TEST(ExpireSnapshotsTest, CleanupLevelNone) { + MockExpireSnapshots expire; + expire.CleanupLevel(CleanupLevel::kNone); + + // Just verify it doesn't error + auto status = expire.Commit(); + EXPECT_THAT(status, IsOk()); +} + +TEST(ExpireSnapshotsTest, CleanupLevelMetadataOnly) { + MockExpireSnapshots expire; + expire.CleanupLevel(CleanupLevel::kMetadataOnly); + + auto status = expire.Commit(); + EXPECT_THAT(status, IsOk()); +} + +TEST(ExpireSnapshotsTest, CleanupLevelAll) { + MockExpireSnapshots expire; + expire.CleanupLevel(CleanupLevel::kAll); + + auto status = expire.Commit(); + EXPECT_THAT(status, IsOk()); +} + +TEST(ExpireSnapshotsTest, MethodChaining) { + MockExpireSnapshots expire; + + // Test that all methods return the reference for chaining + expire.ExpireSnapshotId(100) + .ExpireSnapshotId(200) + .ExpireOlderThan(1609459200000) + .RetainLast(5) + .CleanupLevel(CleanupLevel::kMetadataOnly); + + // Verify through public API + auto result = expire.Apply(); + ASSERT_THAT(result, IsOk()); + + const auto& snapshots = result.value(); + EXPECT_EQ(snapshots.size(), 2); +} + +TEST(ExpireSnapshotsTest, MethodChainingWithAllMethods) { + MockExpireSnapshots expire; + std::vector deleted_files; + + // Chain all builder methods together + expire.ExpireSnapshotId(100) + .ExpireOlderThan(1609459200000) + .RetainLast(5) + .DeleteWith( + [&deleted_files](std::string_view path) { deleted_files.emplace_back(path); }) + .CleanupLevel(CleanupLevel::kAll); + + // Verify through Apply() + auto result = expire.Apply(); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result.value().size(), 1); + + // Verify through Commit() + auto status = expire.Commit(); + EXPECT_THAT(status, IsOk()); + EXPECT_EQ(deleted_files.size(), 2); +} + +TEST(ExpireSnapshotsTest, ApplySuccess) { + MockExpireSnapshots expire; + auto result = expire.Apply(); + EXPECT_THAT(result, IsOk()); + EXPECT_TRUE(expire.ApplyCalled()); +} + +TEST(ExpireSnapshotsTest, ApplyValidationFailed) { + MockExpireSnapshots expire; + expire.SetShouldFail(true); + auto result = expire.Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Mock validation failed")); +} + +TEST(ExpireSnapshotsTest, CommitSuccess) { + MockExpireSnapshots expire; + auto status = expire.Commit(); + EXPECT_THAT(status, IsOk()); + EXPECT_TRUE(expire.CommitCalled()); +} + +TEST(ExpireSnapshotsTest, CommitFailed) { + MockExpireSnapshots expire; + expire.SetShouldFailCommit(true); + auto status = expire.Commit(); + EXPECT_THAT(status, IsError(ErrorKind::kCommitFailed)); + EXPECT_THAT(status, HasErrorMessage("Mock commit failed")); +} + +TEST(ExpireSnapshotsTest, InheritanceFromPendingUpdate) { + std::unique_ptr base_ptr = std::make_unique(); + auto status = base_ptr->Commit(); + EXPECT_THAT(status, IsOk()); +} + +TEST(ExpireSnapshotsTest, InheritanceFromPendingUpdateTyped) { + std::unique_ptr>>> typed_ptr = + std::make_unique(); + auto status = typed_ptr->Commit(); + EXPECT_THAT(status, IsOk()); + + auto result = typed_ptr->Apply(); + EXPECT_THAT(result, IsOk()); +} + +} // namespace iceberg diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 81681ebcd..e07d2d82b 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -160,6 +160,9 @@ class PendingUpdate; template class PendingUpdateTyped; +enum class CleanupLevel; +class ExpireSnapshots; + /// ---------------------------------------------------------------------------- /// TODO: Forward declarations below are not added yet. /// ----------------------------------------------------------------------------