Skip to content

Commit a89924d

Browse files
authored
feat: add snapshot cached manifests (#444)
1 parent bbdb227 commit a89924d

File tree

4 files changed

+118
-2
lines changed

4 files changed

+118
-2
lines changed

src/iceberg/manifest/manifest_list.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
#include "iceberg/partition_spec.h"
3232
#include "iceberg/result.h"
3333
#include "iceberg/schema_field.h"
34-
#include "iceberg/snapshot.h"
3534
#include "iceberg/table_metadata.h"
3635
#include "iceberg/type.h"
3736

@@ -107,7 +106,7 @@ struct ICEBERG_EXPORT ManifestFile {
107106
int64_t min_sequence_number = TableMetadata::kInitialSequenceNumber;
108107
/// Field id: 503
109108
/// ID of the snapshot where the manifest file was added
110-
int64_t added_snapshot_id = Snapshot::kInvalidSnapshotId;
109+
int64_t added_snapshot_id = -1; // Snapshot::kInvalidSnapshotId
111110
/// Field id: 504
112111
/// Number of entries in the manifest that have status ADDED (1), when null this is
113112
/// assumed to be non-zero

src/iceberg/manifest/manifest_writer.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "iceberg/partition_summary_internal.h"
3030
#include "iceberg/result.h"
3131
#include "iceberg/schema.h"
32+
#include "iceberg/snapshot.h"
3233
#include "iceberg/table_metadata.h"
3334
#include "iceberg/util/macros.h"
3435

src/iceberg/snapshot.cc

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@
1919

2020
#include "iceberg/snapshot.h"
2121

22+
#include "iceberg/file_io.h"
23+
#include "iceberg/manifest/manifest_list.h"
24+
#include "iceberg/manifest/manifest_reader.h"
25+
#include "iceberg/util/macros.h"
26+
2227
namespace iceberg {
2328

2429
bool SnapshotRef::Branch::Equals(const SnapshotRef::Branch& other) const {
@@ -80,4 +85,60 @@ bool Snapshot::Equals(const Snapshot& other) const {
8085
schema_id == other.schema_id;
8186
}
8287

88+
Result<CachedSnapshot::ManifestsCache> CachedSnapshot::InitManifestsCache(
89+
const Snapshot& snapshot, std::shared_ptr<FileIO> file_io) {
90+
if (file_io == nullptr) {
91+
return InvalidArgument("Cannot cache manifests: FileIO is null");
92+
}
93+
94+
// Read manifest list
95+
ICEBERG_ASSIGN_OR_RAISE(auto reader,
96+
ManifestListReader::Make(snapshot.manifest_list, file_io));
97+
ICEBERG_ASSIGN_OR_RAISE(auto manifest_files, reader->Files());
98+
99+
std::vector<ManifestFile> manifests;
100+
manifests.reserve(manifest_files.size());
101+
102+
// Partition manifests: data manifests first, then delete manifests
103+
// First pass: collect data manifests
104+
for (const auto& manifest_file : manifest_files) {
105+
if (manifest_file.content == ManifestContent::kData) {
106+
manifests.push_back(manifest_file);
107+
}
108+
}
109+
size_t data_manifests_count = manifests.size();
110+
111+
// Second pass: append delete manifests
112+
for (const auto& manifest_file : manifest_files) {
113+
if (manifest_file.content == ManifestContent::kDeletes) {
114+
manifests.push_back(manifest_file);
115+
}
116+
}
117+
118+
return std::make_pair(std::move(manifests), data_manifests_count);
119+
}
120+
121+
Result<std::span<ManifestFile>> CachedSnapshot::Manifests(
122+
std::shared_ptr<FileIO> file_io) const {
123+
ICEBERG_ASSIGN_OR_RAISE(auto cache_ref, manifests_cache_.Get(snapshot_, file_io));
124+
auto& cache = cache_ref.get();
125+
return std::span<ManifestFile>(cache.first.data(), cache.first.size());
126+
}
127+
128+
Result<std::span<ManifestFile>> CachedSnapshot::DataManifests(
129+
std::shared_ptr<FileIO> file_io) const {
130+
ICEBERG_ASSIGN_OR_RAISE(auto cache_ref, manifests_cache_.Get(snapshot_, file_io));
131+
auto& cache = cache_ref.get();
132+
return std::span<ManifestFile>(cache.first.data(), cache.second);
133+
}
134+
135+
Result<std::span<ManifestFile>> CachedSnapshot::DeleteManifests(
136+
std::shared_ptr<FileIO> file_io) const {
137+
ICEBERG_ASSIGN_OR_RAISE(auto cache_ref, manifests_cache_.Get(snapshot_, file_io));
138+
auto& cache = cache_ref.get();
139+
const size_t delete_start = cache.second;
140+
const size_t delete_count = cache.first.size() - delete_start;
141+
return std::span<ManifestFile>(cache.first.data() + delete_start, delete_count);
142+
}
143+
83144
} // namespace iceberg

src/iceberg/snapshot.h

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,20 @@
1919

2020
#pragma once
2121

22+
#include <memory>
2223
#include <optional>
24+
#include <span>
2325
#include <string>
2426
#include <string_view>
2527
#include <unordered_map>
2628
#include <utility>
2729
#include <variant>
2830

2931
#include "iceberg/iceberg_export.h"
32+
#include "iceberg/manifest/manifest_list.h"
3033
#include "iceberg/result.h"
34+
#include "iceberg/type_fwd.h"
35+
#include "iceberg/util/lazy.h"
3136
#include "iceberg/util/timepoint.h"
3237

3338
namespace iceberg {
@@ -260,4 +265,54 @@ struct ICEBERG_EXPORT Snapshot {
260265
bool Equals(const Snapshot& other) const;
261266
};
262267

268+
/// \brief A snapshot with cached manifest loading capabilities.
269+
///
270+
/// This class wraps a Snapshot reference and provides lazy-loading of manifests.
271+
class ICEBERG_EXPORT CachedSnapshot {
272+
public:
273+
explicit CachedSnapshot(const Snapshot& snapshot) : snapshot_(snapshot) {}
274+
275+
/// \brief Get the underlying Snapshot reference
276+
const Snapshot& snapshot() const { return snapshot_; }
277+
278+
/// \brief Returns all ManifestFile instances for either data or delete manifests
279+
/// in this snapshot.
280+
///
281+
/// \param file_io The FileIO instance to use for reading the manifest list
282+
/// \return A span of ManifestFile instances, or an error
283+
Result<std::span<ManifestFile>> Manifests(std::shared_ptr<FileIO> file_io) const;
284+
285+
/// \brief Returns a ManifestFile for each data manifest in this snapshot.
286+
///
287+
/// \param file_io The FileIO instance to use for reading the manifest list
288+
/// \return A span of ManifestFile instances, or an error
289+
Result<std::span<ManifestFile>> DataManifests(std::shared_ptr<FileIO> file_io) const;
290+
291+
/// \brief Returns a ManifestFile for each delete manifest in this snapshot.
292+
///
293+
/// \param file_io The FileIO instance to use for reading the manifest list
294+
/// \return A span of ManifestFile instances, or an error
295+
Result<std::span<ManifestFile>> DeleteManifests(std::shared_ptr<FileIO> file_io) const;
296+
297+
private:
298+
/// \brief Cache structure for storing loaded manifests
299+
///
300+
/// \note Manifests are stored in a single vector with data manifests at the head
301+
/// and delete manifests at the tail, separated by the number of data manifests.
302+
using ManifestsCache = std::pair<std::vector<ManifestFile>, size_t>;
303+
304+
/// \brief Initialize manifests cache by loading them from the manifest list file.
305+
/// \param snapshot The snapshot to initialize the manifests cache for
306+
/// \param file_io The FileIO instance to use for reading the manifest list
307+
/// \return A result containing the manifests cache
308+
static Result<ManifestsCache> InitManifestsCache(const Snapshot& snapshot,
309+
std::shared_ptr<FileIO> file_io);
310+
311+
/// The underlying snapshot data
312+
const Snapshot& snapshot_;
313+
314+
/// Lazy-loaded manifests cache
315+
Lazy<InitManifestsCache> manifests_cache_;
316+
};
317+
263318
} // namespace iceberg

0 commit comments

Comments
 (0)