Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,8 @@ bool handle_search_by_timestamp(std::string_view config_file_path,
break;
}
response.add_record(record.name, record.size,
storage.get_binlog_uri(record.name), record.gtids,
storage.get_binlog_uri(record.name),
record.previous_gtids, record.added_gtids,
record.timestamps.get_min_timestamp().get_value(),
record.timestamps.get_max_timestamp().get_value());
}
Expand Down
2 changes: 1 addition & 1 deletion src/binsrv/binlog_file_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
namespace binsrv {

binlog_file_metadata::binlog_file_metadata()
: impl_{{expected_binlog_file_metadata_version}, {}, {}, {}, {}} {}
: impl_{{expected_binlog_file_metadata_version}, {}, {}, {}, {}, {}} {}

binlog_file_metadata::binlog_file_metadata(std::string_view data) : impl_{} {
auto json_value = boost::json::parse(data);
Expand Down
7 changes: 2 additions & 5 deletions src/binsrv/binlog_file_metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ class [[nodiscard]] binlog_file_metadata {
// clang-format off
util::nv<"version", std::uint32_t>,
util::nv<"size", std::uint64_t>,
util::nv<"gtids", gtids::optional_gtid_set>,
util::nv<"previous_gtids", gtids::optional_gtid_set>,
util::nv<"added_gtids", gtids::optional_gtid_set>,
util::nv<"min_timestamp", ctime_timestamp>,
util::nv<"max_timestamp", ctime_timestamp>
// clang-format on
Expand All @@ -52,10 +53,6 @@ class [[nodiscard]] binlog_file_metadata {
[[nodiscard]] auto &root() noexcept { return impl_; }
[[nodiscard]] const auto &root() const noexcept { return impl_; }

[[nodiscard]] bool has_gtids() const noexcept {
return impl_.get<"gtids">().has_value();
}

private:
impl_type impl_;

Expand Down
4 changes: 3 additions & 1 deletion src/binsrv/gtids/gtid_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,9 @@ std::ostream &operator<<(std::ostream &output, const gtid_set &obj) {

std::istream &operator>>(std::istream &input, gtid_set &obj) {
std::string gtids_str;
input >> gtids_str;
if (input.peek() != std::istream::traits_type::eof()) {
std::getline(input, gtids_str);
}
if (!input) {
return input;
}
Expand Down
3 changes: 2 additions & 1 deletion src/binsrv/models/binlog_file_record.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ struct [[nodiscard]] binlog_file_record
util::nv<"name", std::string>,
util::nv<"size", std::uint64_t>,
util::nv<"uri", std::string>,
util::nv<"gtids", gtids::optional_gtid_set>,
util::nv<"previous_gtids", gtids::optional_gtid_set>,
util::nv<"added_gtids", gtids::optional_gtid_set>,
util::nv<"min_timestamp", ctime_timestamp>,
util::nv<"max_timestamp", ctime_timestamp>
// clang-format on
Expand Down
6 changes: 4 additions & 2 deletions src/binsrv/models/search_by_timestamp_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,14 @@ search_by_timestamp_response::~search_by_timestamp_response() = default;

void search_by_timestamp_response::add_record(
std::string_view name, std::uint64_t size, std::string_view uri,
const gtids::optional_gtid_set &gtids, std::time_t min_timestamp,
gtids::optional_gtid_set previous_gtids,
gtids::optional_gtid_set added_gtids, std::time_t min_timestamp,
std::time_t max_timestamp) {
binlog_file_record record{{{std::string{name}},
{size},
{std::string{uri}},
{gtids},
{std::move(previous_gtids)},
{std::move(added_gtids)},
{ctime_timestamp{min_timestamp}},
{ctime_timestamp{max_timestamp}}}};
impl_.template get<"result">().emplace_back(std::move(record));
Expand Down
5 changes: 4 additions & 1 deletion src/binsrv/models/search_by_timestamp_response.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,11 @@ class [[nodiscard]] search_by_timestamp_response {

[[nodiscard]] auto &root() noexcept { return impl_; }

// 'previous_gtids' and 'added_gtids' are deliberately taken by value as we
// are going to move from them
void add_record(std::string_view name, std::uint64_t size,
std::string_view uri, const gtids::optional_gtid_set &gtids,
std::string_view uri, gtids::optional_gtid_set previous_gtids,
gtids::optional_gtid_set added_gtids,
std::time_t min_timestamp, std::time_t max_timestamp);

private:
Expand Down
49 changes: 37 additions & 12 deletions src/binsrv/storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,16 @@ storage::open_binlog(std::string_view binlog_name) {
backend_->write_data_to_stream(event::magic_binlog_payload);

gtids::optional_gtid_set previous_binlog_gtids{};
gtids::optional_gtid_set added_binlog_gtids{};
if (is_in_gtid_replication_mode()) {
previous_binlog_gtids = get_gtids();
added_binlog_gtids = gtids::gtid_set{};
}

binlog_records_.emplace_back(
std::string{binlog_name}, event::magic_binlog_offset,
std::move(previous_binlog_gtids), ctime_timestamp_range{});
std::move(previous_binlog_gtids), std::move(added_binlog_gtids),
ctime_timestamp_range{});
save_binlog_metadata(get_current_binlog_record());
save_binlog_index();
result = open_binlog_status::created;
Expand Down Expand Up @@ -339,9 +342,9 @@ void storage::flush_event_buffer_internal() {
get_current_binlog_record().size +=
last_transaction_boundary_position_in_event_buffer_;
if (is_in_gtid_replication_mode()) {
auto &optional_gtids{get_current_binlog_record().gtids};
if (optional_gtids.has_value()) {
optional_gtids.value() += gtids_in_event_buffer_;
auto &optional_added_gtids{get_current_binlog_record().added_gtids};
if (optional_added_gtids.has_value()) {
optional_added_gtids.value() += gtids_in_event_buffer_;
}
}
get_current_binlog_record().timestamps.add_range(ready_to_flush_timestamps_);
Expand Down Expand Up @@ -392,8 +395,15 @@ void storage::load_binlog_index() {
util::exception_location().raise<std::logic_error>(
"binlog index contains a duplicate entry");
}
binlog_records_.emplace_back(current_binlog_name, 0ULL, gtids::gtid_set{},
ctime_timestamp_range{});
gtids::optional_gtid_set previous_binlog_gtids{};
gtids::optional_gtid_set added_binlog_gtids{};
if (is_in_gtid_replication_mode()) {
previous_binlog_gtids = gtids::gtid_set{};
added_binlog_gtids = gtids::gtid_set{};
}
binlog_records_.emplace_back(
current_binlog_name, 0ULL, std::move(previous_binlog_gtids),
std::move(added_binlog_gtids), ctime_timestamp_range{});
}
}

Expand Down Expand Up @@ -464,22 +474,36 @@ storage::load_binlog_metadata(std::string_view binlog_name) const {

return binlog_record{.name = std::string(binlog_name),
.size = metadata.root().get<"size">(),
.gtids = metadata.root().get<"gtids">(),
.previous_gtids =
metadata.root().get<"previous_gtids">(),
.added_gtids = metadata.root().get<"added_gtids">(),
.timestamps = {metadata.root().get<"min_timestamp">(),
metadata.root().get<"max_timestamp">()}};
}

void storage::validate_binlog_metadata(const binlog_record &record) const {
if (is_in_gtid_replication_mode()) {
if (!record.gtids.has_value()) {
if (!record.previous_gtids.has_value()) {
util::exception_location().raise<std::logic_error>(
"missing previous GTID set in the binlog metadata while in GTID "
"replication "
"mode");
}
if (!record.added_gtids.has_value()) {
util::exception_location().raise<std::logic_error>(
"missing GTID set in the binlog metadata while in GTID replication "
"missing added GTID set in the binlog metadata while in GTID "
"replication "
"mode");
}
} else {
if (record.gtids.has_value()) {
if (record.previous_gtids.has_value()) {
util::exception_location().raise<std::logic_error>(
"found previous GTID set in the binlog metadata while in position "
"replication mode");
}
if (record.added_gtids.has_value()) {
util::exception_location().raise<std::logic_error>(
"found GTID set in the binlog metadata while in position "
"found added GTID set in the binlog metadata while in position "
"replication mode");
}
}
Expand All @@ -488,7 +512,8 @@ void storage::validate_binlog_metadata(const binlog_record &record) const {
void storage::save_binlog_metadata(const binlog_record &record) const {
binlog_file_metadata metadata{};
metadata.root().get<"size">() = record.size;
metadata.root().get<"gtids">() = record.gtids;
metadata.root().get<"previous_gtids">() = record.previous_gtids;
metadata.root().get<"added_gtids">() = record.added_gtids;
metadata.root().get<"min_timestamp">() =
ctime_timestamp{record.timestamps.get_min_timestamp()};
metadata.root().get<"max_timestamp">() =
Expand Down
24 changes: 15 additions & 9 deletions src/binsrv/storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ class [[nodiscard]] storage {
struct binlog_record {
std::string name;
std::uint64_t size{0ULL};
gtids::optional_gtid_set gtids{};
gtids::optional_gtid_set previous_gtids{};
gtids::optional_gtid_set added_gtids{};
ctime_timestamp_range timestamps{};
};
using binlog_record_container = std::vector<binlog_record>;
Expand Down Expand Up @@ -88,15 +89,22 @@ class [[nodiscard]] storage {
return get_flushed_position() + std::size(event_buffer_);
}

[[nodiscard]] const gtids::gtid_set &get_gtids() const noexcept {
[[nodiscard]] gtids::gtid_set get_gtids() const {
gtids::gtid_set result{};
if (is_empty()) {
return empty_gtids_;
return result;
}
const auto &optional_gtids{get_current_binlog_record().gtids};
if (!optional_gtids.has_value()) {
return empty_gtids_;
const auto &optional_previous_gtids{
get_current_binlog_record().previous_gtids};
if (!optional_previous_gtids.has_value()) {
return result;
}
return *optional_gtids;
result = *optional_previous_gtids;
const auto &optional_added_gtids{get_current_binlog_record().added_gtids};
if (optional_added_gtids.has_value()) {
result.add(*optional_added_gtids);
}
return result;
}

[[nodiscard]] static bool
Expand Down Expand Up @@ -129,8 +137,6 @@ class [[nodiscard]] storage {
std::chrono::steady_clock::duration checkpoint_interval_seconds_{};
std::chrono::steady_clock::time_point last_checkpoint_timestamp_{};

gtids::gtid_set empty_gtids_{};

using event_buffer_type = std::vector<std::byte>;
event_buffer_type event_buffer_{};
std::size_t last_transaction_boundary_position_in_event_buffer_{};
Expand Down
12 changes: 8 additions & 4 deletions tests/gtid_set_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,8 @@ BOOST_AUTO_TEST_CASE(GtidSetStreamOperatorEmpty) {

const auto gtids_str{boost::lexical_cast<std::string>(gtids)};
BOOST_CHECK_EQUAL(gtids_str, "");
const auto restored_gtids{binsrv::gtids::gtid_set{gtids_str}};
const auto restored_gtids{
boost::lexical_cast<binsrv::gtids::gtid_set>(gtids_str)};
BOOST_CHECK_EQUAL(gtids, restored_gtids);
}

Expand All @@ -371,7 +372,8 @@ BOOST_AUTO_TEST_CASE(GtidSetStreamOperatorUntagged) {
const auto gtids_str{boost::lexical_cast<std::string>(gtids)};
BOOST_CHECK_EQUAL(gtids_str, "11111111-aaaa-1111-aaaa-111111111111:1-3:5, "
"22222222-bbbb-2222-bbbb-222222222222:11-13:15");
const auto restored_gtids{binsrv::gtids::gtid_set{gtids_str}};
const auto restored_gtids{
boost::lexical_cast<binsrv::gtids::gtid_set>(gtids_str)};
BOOST_CHECK_EQUAL(gtids, restored_gtids);
}

Expand Down Expand Up @@ -411,7 +413,8 @@ BOOST_AUTO_TEST_CASE(GtidSetStreamOperatorTagged) {
"beta:121-123:125, "
"22222222-bbbb-2222-bbbb-222222222222:alpha:211-213:215:"
"beta:221-223:225");
const auto restored_gtids{binsrv::gtids::gtid_set{gtids_str}};
const auto restored_gtids{
boost::lexical_cast<binsrv::gtids::gtid_set>(gtids_str)};
BOOST_CHECK_EQUAL(gtids, restored_gtids);
}

Expand Down Expand Up @@ -461,6 +464,7 @@ BOOST_AUTO_TEST_CASE(GtidSetStreamOperatorMixed) {
"111-113:115:beta:121-123:125, "
"22222222-bbbb-2222-bbbb-222222222222:201-203:205:alpha:"
"211-213:215:beta:221-223:225");
const auto restored_gtids{binsrv::gtids::gtid_set{gtids_str}};
const auto restored_gtids{
boost::lexical_cast<binsrv::gtids::gtid_set>(gtids_str)};
BOOST_CHECK_EQUAL(gtids, restored_gtids);
}