Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/iceberg/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,17 @@ Status Transaction::Apply(std::vector<std::unique_ptr<TableUpdate>> updates) {
}

Result<std::shared_ptr<Table>> Transaction::Commit() {
if (committed_) {
return Invalid("Transaction already committed");
}
if (!last_update_committed_) {
return InvalidArgument(
"Cannot commit transaction when previous update is not committed");
}

const auto& updates = metadata_builder_->changes();
if (updates.empty()) {
committed_ = true;
return table_;
}

Expand All @@ -98,7 +102,14 @@ Result<std::shared_ptr<Table>> Transaction::Commit() {
}

// XXX: we should handle commit failure and retry here.
return table_->catalog()->UpdateTable(table_->name(), requirements, updates);
ICEBERG_ASSIGN_OR_RAISE(auto updated_table, table_->catalog()->UpdateTable(
table_->name(), requirements, updates));

// Mark as committed and update table reference
committed_ = true;
table_ = std::move(updated_table);

return table_;
}

Result<std::shared_ptr<UpdateProperties>> Transaction::NewUpdateProperties() {
Expand Down
2 changes: 2 additions & 0 deletions src/iceberg/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
const bool auto_commit_;
// To make the state simple, we require updates are added and committed in order.
bool last_update_committed_ = true;
// Tracks if transaction has been committed to prevent double-commit
bool committed_ = false;
// Keep track of all created pending updates. Use weak_ptr to avoid circular references.
// This is useful to retry failed updates.
std::vector<std::weak_ptr<PendingUpdate>> pending_updates_;
Expand Down
Loading