Skip to content

Commit 39bd868

Browse files
committed
feat: add snapshot cached manifests
1 parent 61a7de5 commit 39bd868

9 files changed

Lines changed: 228 additions & 101 deletions

src/iceberg/json_internal.cc

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -611,10 +611,16 @@ Result<std::unique_ptr<Snapshot>> SnapshotFromJson(const nlohmann::json& json) {
611611

612612
ICEBERG_ASSIGN_OR_RAISE(auto schema_id, GetJsonValueOptional<int32_t>(json, kSchemaId));
613613

614-
return std::make_unique<Snapshot>(
615-
snapshot_id, parent_snapshot_id,
616-
sequence_number.value_or(TableMetadata::kInitialSequenceNumber), timestamp_ms,
617-
manifest_list, std::move(summary), schema_id);
614+
auto snapshot = std::make_unique<Snapshot>();
615+
snapshot->snapshot_id = snapshot_id;
616+
snapshot->parent_snapshot_id = parent_snapshot_id;
617+
snapshot->sequence_number =
618+
sequence_number.value_or(TableMetadata::kInitialSequenceNumber);
619+
snapshot->timestamp_ms = timestamp_ms;
620+
snapshot->manifest_list = manifest_list;
621+
snapshot->summary = std::move(summary);
622+
snapshot->schema_id = schema_id;
623+
return snapshot;
618624
}
619625

620626
nlohmann::json ToJson(const BlobMetadata& blob_metadata) {

src/iceberg/snapshot.cc

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@
1919

2020
#include "iceberg/snapshot.h"
2121

22+
#include "iceberg/file_io.h"
23+
#include "iceberg/manifest/manifest_list.h"
24+
#include "iceberg/manifest/manifest_reader.h"
25+
#include "iceberg/util/macros.h"
26+
2227
namespace iceberg {
2328

2429
bool SnapshotRef::Branch::Equals(const SnapshotRef::Branch& other) const {
@@ -70,6 +75,10 @@ std::optional<std::string_view> Snapshot::operation() const {
7075
return std::nullopt;
7176
}
7277

78+
Snapshot::Snapshot() = default;
79+
80+
Snapshot::~Snapshot() = default;
81+
7382
bool Snapshot::Equals(const Snapshot& other) const {
7483
if (this == &other) {
7584
return true;
@@ -80,4 +89,51 @@ bool Snapshot::Equals(const Snapshot& other) const {
8089
schema_id == other.schema_id;
8190
}
8291

92+
Result<Snapshot::ManifestsCache> Snapshot::InitManifestsCache(
93+
const Snapshot& self, std::shared_ptr<FileIO> file_io) {
94+
if (file_io == nullptr) {
95+
return InvalidArgument("Cannot cache manifests: FileIO is null");
96+
}
97+
98+
// Read manifest list
99+
ICEBERG_ASSIGN_OR_RAISE(auto reader,
100+
ManifestListReader::Make(self.manifest_list, file_io));
101+
ICEBERG_ASSIGN_OR_RAISE(auto manifest_files, reader->Files());
102+
103+
ManifestsCache cache;
104+
cache.all_manifests.reserve(manifest_files.size());
105+
for (const auto& manifest_file : manifest_files) {
106+
cache.all_manifests.push_back(manifest_file);
107+
}
108+
109+
// Filter by content type
110+
for (const auto& manifest : cache.all_manifests) {
111+
if (manifest.content == ManifestContent::kData) {
112+
cache.data_manifests.push_back(manifest);
113+
} else if (manifest.content == ManifestContent::kDeletes) {
114+
cache.delete_manifests.push_back(manifest);
115+
}
116+
}
117+
118+
return cache;
119+
}
120+
121+
Result<std::span<ManifestFile>> Snapshot::Manifests(
122+
std::shared_ptr<FileIO> file_io) const {
123+
ICEBERG_ASSIGN_OR_RAISE(auto cache_ref, manifests_cache_.Get(*this, file_io));
124+
return std::span<ManifestFile>(cache_ref.get().all_manifests);
125+
}
126+
127+
Result<std::span<ManifestFile>> Snapshot::DataManifests(
128+
std::shared_ptr<FileIO> file_io) const {
129+
ICEBERG_ASSIGN_OR_RAISE(auto cache_ref, manifests_cache_.Get(*this, file_io));
130+
return std::span<ManifestFile>(cache_ref.get().data_manifests);
131+
}
132+
133+
Result<std::span<ManifestFile>> Snapshot::DeleteManifests(
134+
std::shared_ptr<FileIO> file_io) const {
135+
ICEBERG_ASSIGN_OR_RAISE(auto cache_ref, manifests_cache_.Get(*this, file_io));
136+
return std::span<ManifestFile>(cache_ref.get().delete_manifests);
137+
}
138+
83139
} // namespace iceberg

src/iceberg/snapshot.h

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919

2020
#pragma once
2121

22+
#include <memory>
2223
#include <optional>
24+
#include <span>
2325
#include <string>
2426
#include <string_view>
2527
#include <unordered_map>
@@ -28,6 +30,8 @@
2830

2931
#include "iceberg/iceberg_export.h"
3032
#include "iceberg/result.h"
33+
#include "iceberg/type_fwd.h"
34+
#include "iceberg/util/lazy.h"
3135
#include "iceberg/util/timepoint.h"
3236

3337
namespace iceberg {
@@ -250,14 +254,57 @@ struct ICEBERG_EXPORT Snapshot {
250254
/// unknown.
251255
std::optional<std::string_view> operation() const;
252256

257+
/// \brief Returns all ManifestFile instances for either data or delete manifests
258+
/// in this snapshot.
259+
///
260+
/// \param file_io The FileIO instance to use for reading the manifest list
261+
/// \param content The content type of the manifests to return
262+
/// \return A span of ManifestFile instances, or an error
263+
Result<std::span<ManifestFile>> Manifests(std::shared_ptr<FileIO> file_io) const;
264+
265+
/// \brief Returns a ManifestFile for each data manifest in this snapshot.
266+
///
267+
/// \param file_io The FileIO instance to use for reading the manifest list
268+
/// \return A span of ManifestFile instances, or an error
269+
Result<std::span<ManifestFile>> DataManifests(std::shared_ptr<FileIO> file_io) const;
270+
271+
/// \brief Returns a ManifestFile for each delete manifest in this snapshot.
272+
///
273+
/// \param file_io The FileIO instance to use for reading the manifest list
274+
/// \return A span of ManifestFile instances, or an error
275+
Result<std::span<ManifestFile>> DeleteManifests(std::shared_ptr<FileIO> file_io) const;
276+
253277
/// \brief Compare two snapshots for equality.
254278
friend bool operator==(const Snapshot& lhs, const Snapshot& rhs) {
255279
return lhs.Equals(rhs);
256280
}
257281

282+
/// \brief Default constructor
283+
Snapshot();
284+
285+
/// \brief Destructor
286+
~Snapshot();
287+
258288
private:
259289
/// \brief Compare two snapshots for equality.
260290
bool Equals(const Snapshot& other) const;
291+
292+
/// \brief Cache structure for storing loaded manifests
293+
struct ManifestsCache {
294+
std::vector<ManifestFile> all_manifests;
295+
std::vector<ManifestFile> data_manifests;
296+
std::vector<ManifestFile> delete_manifests;
297+
};
298+
299+
/// \brief Initialize manifests cache by loading them from the manifest list file.
300+
/// \param self The snapshot instance
301+
/// \param file_io The FileIO instance to use for reading the manifest list
302+
/// \return A result containing the manifests cache
303+
static Result<ManifestsCache> InitManifestsCache(const Snapshot& self,
304+
std::shared_ptr<FileIO> file_io);
305+
306+
/// Lazy-loaded manifests cache
307+
mutable Lazy<InitManifestsCache> manifests_cache_;
261308
};
262309

263310
} // namespace iceberg

src/iceberg/test/in_memory_catalog_test.cc

Lines changed: 24 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -132,40 +132,38 @@ TEST_F(InMemoryCatalogTest, RefreshTable) {
132132
auto io = std::make_shared<MockFileIO>();
133133
auto catalog = std::make_shared<MockCatalog>();
134134
// Mock 1st call to LoadTable
135+
auto snapshot1 = std::make_shared<Snapshot>();
136+
snapshot1->snapshot_id = 1;
137+
snapshot1->sequence_number = 1;
135138
EXPECT_CALL(*catalog, LoadTable(::testing::_))
136-
.WillOnce(::testing::Return(
137-
Table::Make(table_ident,
138-
std::make_shared<TableMetadata>(
139-
TableMetadata{.schemas = {schema},
140-
.current_schema_id = 1,
141-
.current_snapshot_id = 1,
142-
.snapshots = {std::make_shared<Snapshot>(Snapshot{
143-
.snapshot_id = 1,
144-
.sequence_number = 1,
145-
})}}),
146-
"s3://location/1.json", io, catalog)));
139+
.WillOnce(::testing::Return(Table::Make(
140+
table_ident,
141+
std::make_shared<TableMetadata>(TableMetadata{.schemas = {schema},
142+
.current_schema_id = 1,
143+
.current_snapshot_id = 1,
144+
.snapshots = {snapshot1}}),
145+
"s3://location/1.json", io, catalog)));
147146
auto load_table_result = catalog->LoadTable(table_ident);
148147
ASSERT_THAT(load_table_result, IsOk());
149148
auto loaded_table = std::move(load_table_result.value());
150149
ASSERT_EQ(loaded_table->current_snapshot().value()->snapshot_id, 1);
151150

152151
// Mock 2nd call to LoadTable
152+
auto snapshot2a = std::make_shared<Snapshot>();
153+
snapshot2a->snapshot_id = 1;
154+
snapshot2a->sequence_number = 1;
155+
auto snapshot2b = std::make_shared<Snapshot>();
156+
snapshot2b->snapshot_id = 2;
157+
snapshot2b->sequence_number = 2;
153158
EXPECT_CALL(*catalog, LoadTable(::testing::_))
154-
.WillOnce(::testing::Return(
155-
Table::Make(table_ident,
156-
std::make_shared<TableMetadata>(
157-
TableMetadata{.schemas = {schema},
158-
.current_schema_id = 1,
159-
.current_snapshot_id = 2,
160-
.snapshots = {std::make_shared<Snapshot>(Snapshot{
161-
.snapshot_id = 1,
162-
.sequence_number = 1,
163-
}),
164-
std::make_shared<Snapshot>(Snapshot{
165-
.snapshot_id = 2,
166-
.sequence_number = 2,
167-
})}}),
168-
"s3://location/2.json", io, catalog)));
159+
.WillOnce(
160+
::testing::Return(Table::Make(table_ident,
161+
std::make_shared<TableMetadata>(TableMetadata{
162+
.schemas = {schema},
163+
.current_schema_id = 1,
164+
.current_snapshot_id = 2,
165+
.snapshots = {snapshot2a, snapshot2b}}),
166+
"s3://location/2.json", io, catalog)));
169167
auto refreshed_result = loaded_table->Refresh();
170168
ASSERT_THAT(refreshed_result, IsOk());
171169
// check table is refreshed

src/iceberg/test/json_internal_test.cc

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -206,13 +206,14 @@ TEST(JsonInternalTest, Snapshot) {
206206
{SnapshotSummaryFields::kOperation, DataOperation::kAppend},
207207
{SnapshotSummaryFields::kAddedDataFiles, "50"}};
208208

209-
Snapshot snapshot{.snapshot_id = 1234567890,
210-
.parent_snapshot_id = 9876543210,
211-
.sequence_number = 99,
212-
.timestamp_ms = TimePointMsFromUnixMs(1234567890123).value(),
213-
.manifest_list = "/path/to/manifest_list",
214-
.summary = summary,
215-
.schema_id = 42};
209+
Snapshot snapshot;
210+
snapshot.snapshot_id = 1234567890;
211+
snapshot.parent_snapshot_id = 9876543210;
212+
snapshot.sequence_number = 99;
213+
snapshot.timestamp_ms = TimePointMsFromUnixMs(1234567890123).value();
214+
snapshot.manifest_list = "/path/to/manifest_list";
215+
snapshot.summary = summary;
216+
snapshot.schema_id = 42;
216217

217218
// Create a JSON object with the expected values
218219
nlohmann::json expected_json =

src/iceberg/test/metadata_io_test.cc

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,13 @@ class MetadataIOTest : public TempFileTestBase {
6060
/*optional=*/false);
6161
auto schema = std::make_shared<Schema>(std::move(schema_fields), /*schema_id=*/1);
6262

63+
auto snapshot = std::make_shared<Snapshot>();
64+
snapshot->snapshot_id = 3051729675574597004;
65+
snapshot->sequence_number = 0;
66+
snapshot->timestamp_ms = TimePointMsFromUnixMs(1515100955770).value();
67+
snapshot->manifest_list = "s3://a/b/1.avro";
68+
snapshot->summary = {{"operation", "append"}};
69+
6370
return TableMetadata{.format_version = 1,
6471
.table_uuid = "1234567890",
6572
.location = location_,
@@ -71,13 +78,7 @@ class MetadataIOTest : public TempFileTestBase {
7178
.last_partition_id = 0,
7279
.properties = TableProperties::FromMap({{"key", "value"}}),
7380
.current_snapshot_id = 3051729675574597004,
74-
.snapshots = {std::make_shared<Snapshot>(Snapshot{
75-
.snapshot_id = 3051729675574597004,
76-
.sequence_number = 0,
77-
.timestamp_ms = TimePointMsFromUnixMs(1515100955770).value(),
78-
.manifest_list = "s3://a/b/1.avro",
79-
.summary = {{"operation", "append"}},
80-
})},
81+
.snapshots = {snapshot},
8182
.sort_orders = {SortOrder::Unsorted()},
8283
.default_sort_order_id = 0,
8384
.next_row_id = 0};

src/iceberg/test/metadata_serde_test.cc

Lines changed: 31 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -166,23 +166,21 @@ TEST(MetadataSerdeTest, DeserializeV2Valid) {
166166

167167
auto expected_sort_order = std::shared_ptr<SortOrder>(std::move(sort_order));
168168

169-
auto expected_snapshot_1 = std::make_shared<Snapshot>(Snapshot{
170-
.snapshot_id = 3051729675574597004,
171-
.sequence_number = 0,
172-
.timestamp_ms = TimePointMsFromUnixMs(1515100955770).value(),
173-
.manifest_list = "s3://a/b/1.avro",
174-
.summary = {{"operation", "append"}},
175-
});
176-
177-
auto expected_snapshot_2 = std::make_shared<Snapshot>(Snapshot{
178-
.snapshot_id = 3055729675574597004,
179-
.parent_snapshot_id = 3051729675574597004,
180-
.sequence_number = 1,
181-
.timestamp_ms = TimePointMsFromUnixMs(1555100955770).value(),
182-
.manifest_list = "s3://a/b/2.avro",
183-
.summary = {{"operation", "append"}},
184-
.schema_id = 1,
185-
});
169+
auto expected_snapshot_1 = std::make_shared<Snapshot>();
170+
expected_snapshot_1->snapshot_id = 3051729675574597004;
171+
expected_snapshot_1->sequence_number = 0;
172+
expected_snapshot_1->timestamp_ms = TimePointMsFromUnixMs(1515100955770).value();
173+
expected_snapshot_1->manifest_list = "s3://a/b/1.avro";
174+
expected_snapshot_1->summary = {{"operation", "append"}};
175+
176+
auto expected_snapshot_2 = std::make_shared<Snapshot>();
177+
expected_snapshot_2->snapshot_id = 3055729675574597004;
178+
expected_snapshot_2->parent_snapshot_id = 3051729675574597004;
179+
expected_snapshot_2->sequence_number = 1;
180+
expected_snapshot_2->timestamp_ms = TimePointMsFromUnixMs(1555100955770).value();
181+
expected_snapshot_2->manifest_list = "s3://a/b/2.avro";
182+
expected_snapshot_2->summary = {{"operation", "append"}};
183+
expected_snapshot_2->schema_id = 1;
186184

187185
TableMetadata expected{
188186
.format_version = 2,
@@ -294,14 +292,13 @@ TEST(MetadataSerdeTest, DeserializeStatisticsFiles) {
294292
auto expected_spec =
295293
std::shared_ptr<PartitionSpec>(std::move(expected_spec_result.value()));
296294

297-
auto expected_snapshot = std::make_shared<Snapshot>(Snapshot{
298-
.snapshot_id = 3055729675574597004,
299-
.sequence_number = 1,
300-
.timestamp_ms = TimePointMsFromUnixMs(1555100955770).value(),
301-
.manifest_list = "s3://a/b/2.avro",
302-
.summary = {{"operation", "append"}},
303-
.schema_id = 0,
304-
});
295+
auto expected_snapshot = std::make_shared<Snapshot>();
296+
expected_snapshot->snapshot_id = 3055729675574597004;
297+
expected_snapshot->sequence_number = 1;
298+
expected_snapshot->timestamp_ms = TimePointMsFromUnixMs(1555100955770).value();
299+
expected_snapshot->manifest_list = "s3://a/b/2.avro";
300+
expected_snapshot->summary = {{"operation", "append"}};
301+
expected_snapshot->schema_id = 0;
305302

306303
auto expected_stats_file = std::make_shared<StatisticsFile>(StatisticsFile{
307304
.snapshot_id = 3055729675574597004,
@@ -355,6 +352,14 @@ TEST(MetadataSerdeTest, DeserializePartitionStatisticsFiles) {
355352
auto metadata,
356353
ReadTableMetadataFromResource("TableMetadataPartitionStatisticsFiles.json"));
357354

355+
auto snapshot = std::make_shared<Snapshot>();
356+
snapshot->snapshot_id = 3055729675574597004;
357+
snapshot->sequence_number = 1;
358+
snapshot->timestamp_ms = TimePointMsFromUnixMs(1555100955770).value();
359+
snapshot->manifest_list = "s3://a/b/2.avro";
360+
snapshot->summary = {{"operation", "append"}};
361+
snapshot->schema_id = 0;
362+
358363
TableMetadata expected{
359364
.format_version = 2,
360365
.table_uuid = "9c12d441-03fe-4693-9a96-a0705ddf69c1",
@@ -372,14 +377,7 @@ TEST(MetadataSerdeTest, DeserializePartitionStatisticsFiles) {
372377
.last_partition_id = 1000,
373378
.properties = {},
374379
.current_snapshot_id = 3055729675574597004,
375-
.snapshots = {std::make_shared<Snapshot>(Snapshot{
376-
.snapshot_id = 3055729675574597004,
377-
.sequence_number = 1,
378-
.timestamp_ms = TimePointMsFromUnixMs(1555100955770).value(),
379-
.manifest_list = "s3://a/b/2.avro",
380-
.summary = {{"operation", "append"}},
381-
.schema_id = 0,
382-
})},
380+
.snapshots = {snapshot},
383381
.snapshot_log = {},
384382
.metadata_log = {},
385383
.sort_orders = {SortOrder::Unsorted()},

0 commit comments

Comments
 (0)