From c656612942f609788507c2b85aaf429df9462dd3 Mon Sep 17 00:00:00 2001 From: Yura Sorokin Date: Fri, 20 Feb 2026 01:44:05 +0100 Subject: [PATCH] PS-10479 fix: Fetch operation fails with gtid replication mode https://perconadev.atlassian.net/browse/PS-10479 Fixed problem with Binlog Server not being able to resume streaming to an existing partially downloaded binlog file in GTID replication mode. 'binsev::event::reader_context' class constructor now accepts two more parameters: 'binlog_name' / 'position'. We pass 'storage.get_current_binlog_name()' and 'storage.get_current_position()' when the object of this class is constructed inside the 'receive_binlog_events()' function. This helps to pass the information about the last used binlog file position in GTID-based replication mode, which wasn't possible otherwise. We also added additional validations to the 'process_event_in_rotate_artificial_expected_state()' internal method. Simplified 'expect_ignorable_preamble_events' logic inside the 'binsev::event::reader_context' class - threre is no need to set this value externally anymore (it is now simply calculated based on the 'binlog_name' / 'position' arguments passed to the constructor). Fixed problem with the 'binlog_streaming.resume_streaming' and 'binlog_streaming.search_by_timestamp' MTR test cases that were not able to properly detect GTID mode based on the value of the '@@global.gtid_mode' system variable. Because of that those tests were never running Binlog Server in GTID-based replication mode. --- mtr/binlog_streaming/t/resume_streaming.test | 2 +- .../t/search_by_timestamp.test | 2 +- src/app.cpp | 24 ++---- src/binsrv/event/reader_context.cpp | 83 +++++++++++++++---- src/binsrv/event/reader_context.hpp | 13 +-- 5 files changed, 78 insertions(+), 46 deletions(-) diff --git a/mtr/binlog_streaming/t/resume_streaming.test b/mtr/binlog_streaming/t/resume_streaming.test index 4b1ddb3..f59d60a 100644 --- a/mtr/binlog_streaming/t/resume_streaming.test +++ b/mtr/binlog_streaming/t/resume_streaming.test @@ -21,7 +21,7 @@ eval $stmt_reset_binary_logs_and_gtids; --let $binsrv_read_timeout = 60 --let $binsrv_idle_time = 10 --let $binsrv_verify_checksum = TRUE ---let $binsrv_replication_mode = `SELECT IF(@@global.gtid_mode, 'gtid', 'position')` +--let $binsrv_replication_mode = `SELECT IF(@@global.gtid_mode = 'ON', 'gtid', 'position')` if ($extracted_init_connect_variable_value == 'buffered') { --let $binsrv_checkpoint_size = 1G diff --git a/mtr/binlog_streaming/t/search_by_timestamp.test b/mtr/binlog_streaming/t/search_by_timestamp.test index 94dd25d..05cee27 100644 --- a/mtr/binlog_streaming/t/search_by_timestamp.test +++ b/mtr/binlog_streaming/t/search_by_timestamp.test @@ -21,7 +21,7 @@ eval $stmt_reset_binary_logs_and_gtids; --let $binsrv_read_timeout = 60 --let $binsrv_idle_time = 10 --let $binsrv_verify_checksum = TRUE ---let $binsrv_replication_mode = `SELECT IF(@@global.gtid_mode, 'gtid', 'position')` +--let $binsrv_replication_mode = `SELECT IF(@@global.gtid_mode = 'ON', 'gtid', 'position')` --let $binsrv_checkpoint_size = 1 --source ../include/set_up_binsrv_environment.inc diff --git a/src/app.cpp b/src/app.cpp index 5483e30..2eb0ed3 100644 --- a/src/app.cpp +++ b/src/app.cpp @@ -377,7 +377,6 @@ void log_span_dump(binsrv::basic_logger &logger, void process_artificial_rotate_event(const binsrv::event::event ¤t_event, binsrv::basic_logger &logger, - binsrv::event::reader_context &context, binsrv::storage &storage) { assert(current_event.get_common_header().get_type_code() == binsrv::event::code_type::rotate); @@ -414,10 +413,6 @@ void process_artificial_rotate_event(const binsrv::event::event ¤t_event, } binlog_opening_needed = false; - // after reusing the existing storage binlog file, we should instruct - // the reader context to mark the upcoming FDE and PREVIOUS_GTIDS_LOG - // events as info-only - context.set_expect_ignorable_preamble_events(); const std::string current_binlog_name{storage.get_current_binlog_name()}; logger.log(binsrv::log_severity::info, @@ -440,16 +435,6 @@ void process_artificial_rotate_event(const binsrv::event::event ¤t_event, const auto binlog_open_result{ storage.open_binlog(current_rotate_body.get_binlog())}; - // we also need to instruct the reader context that we opened an - // existing file (the one that was neither empty nor just had the - // magic payload writtent to it), so that it would mark the upcoming FDE - // and PREVIOUS_GTIDS_LOG events as info-only - - if (binlog_open_result == - binsrv::open_binlog_status::opened_with_data_present) { - context.set_expect_ignorable_preamble_events(); - } - std::string message{"storage: "}; if (binlog_open_result == binsrv::open_binlog_status::created) { message += "created a new"; @@ -489,7 +474,7 @@ void process_binlog_event(const binsrv::event::event ¤t_event, // processing the very first event in the sequence - artificial ROTATE event if (code == binsrv::event::code_type::rotate && is_artificial) { - process_artificial_rotate_event(current_event, logger, context, storage); + process_artificial_rotate_event(current_event, logger, storage); } // checking if the event needs to be written to the binlog @@ -588,9 +573,10 @@ void receive_binlog_events( util::const_byte_span portion; - binsrv::event::reader_context context{connection.get_server_version(), - verify_checksum, - storage.get_replication_mode()}; + binsrv::event::reader_context context{ + connection.get_server_version(), verify_checksum, + storage.get_replication_mode(), storage.get_current_binlog_name(), + static_cast(storage.get_current_position())}; bool fetch_result{}; diff --git a/src/binsrv/event/reader_context.cpp b/src/binsrv/event/reader_context.cpp index 018fda5..ba1db9b 100644 --- a/src/binsrv/event/reader_context.cpp +++ b/src/binsrv/event/reader_context.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include "binsrv/replication_mode_type.hpp" @@ -36,9 +37,14 @@ namespace binsrv::event { reader_context::reader_context(std::uint32_t encoded_server_version, bool verify_checksum, - replication_mode_type replication_mode) + replication_mode_type replication_mode, + std::string_view binlog_name, + std::uint32_t position) : encoded_server_version_{encoded_server_version}, verify_checksum_{verify_checksum}, replication_mode_{replication_mode}, + binlog_name_{binlog_name}, + position_{position == 0U ? static_cast(magic_binlog_offset) + : position}, post_header_lengths_{ get_hardcoded_post_header_lengths(encoded_server_version_)} {} @@ -107,16 +113,61 @@ reader_context::process_event_in_rotate_artificial_expected_state( "non-zero next event position found in an artificial rotate event"); } - // we expect an artificial rotate event to be the very first event in the - // newly-created binlog file - if (position_ != 0U) { - util::exception_location().raise( - "artificial rotate event is not the very first event in a binlog " - "file"); + const auto ¤t_post_header{ + current_event.get_post_header()}; + const auto ¤t_body{current_event.get_body()}; + if (replication_mode_ == replication_mode_type::position) { + if (current_body.get_binlog() == binlog_name_) { + // in position-based replication mode, when we continue streaming to the + // same binlog file, we expect the artificial rotate event to have the + // same position as the one supplied to the constructor + if (current_post_header.get_position_raw() != position_) { + util::exception_location().raise( + "unexpected position found in an artificial rotate event in " + "position-based replication mode"); + } + } else { + // in position-based replication mode, when we start streaming to a new + // binlog file, we expect the artificial rotate event + // to have the position equal to "magic_offset" (4) + if (current_post_header.get_position_raw() != magic_binlog_offset) { + util::exception_location().raise( + "unexpected position found in an artificial rotate event in " + "position-based replication mode"); + } + } + } else { + // in GTID-based replication mode we expect the artificial rotate event to + // always have position set to "magic_offset" (4) + if (current_post_header.get_position_raw() != magic_binlog_offset) { + util::exception_location().raise( + "unexpected position found in an artificial rotate event in " + "GTID-based replication mode"); + } } + if (current_body.get_binlog() != binlog_name_) { + // in the case when binlog name in the artificial rotate event does not + // match the one specified in the last saved one, we should update it here + // and reset the position to "magic_offset" (4) - position_ = static_cast( - current_event.get_post_header().get_position_raw()); + // please notice that the very first time the reader_context is created + // 'binlog_name' is initialized with the 'storage.get_current_binlog_name()' + // value + + // also, when the storage objects is created on an empty directory, + // 'storage.get_current_binlog_name()' returns an empty string which will + // never be equal to any real binlog name + binlog_name_ = current_body.get_binlog(); + reset_position(); + } + + // whether we should expect info-only FORMAT_DESCRIPTION and optional + // PREVIOUS_GTIDS_LOG events depends on whether we resumed streaming to an + // existing non-empty binlog file or not + if (current_body.get_binlog() == binlog_name_ && + position_ > magic_binlog_offset) { + expect_ignorable_preamble_events_ = true; + } info_only_event_ = true; // transition to the next state @@ -208,9 +259,9 @@ reader_context::process_event_in_format_description_expected_state( // ... // in other words, in GTID-based mode there is no way to distinguish whether - // the FDE / PREVIOUS_GTIDS_LOG is pseudo and should not be written, or not - - // that is why we rely only on externally supplied - // "start_from_new_binlog_file" constructor's argument + // the FDE / PREVIOUS_GTIDS_LOG should be written to the binlog file or + // not - that is why we rely only on the value of the + // "expect_ignorable_preamble_events" calculated previously info_only_event_ = expect_ignorable_preamble_events_; if (replication_mode_ == replication_mode_type::position && info_only_event_) { @@ -224,10 +275,10 @@ reader_context::process_event_in_format_description_expected_state( validate_position_and_advance(common_header); } - // transition to the next state: // the next expected event is PREVIOUS_GTIDS_LOG, unless we are in - // position-based replication mode and this we resumed streaming from the - // middle of a binlog file + // position-based replication mode and we resumed streaming to an + // existing non-empty binlog file, in which case we expect the next event + // to be one of the GTID_LOG events if (replication_mode_ == replication_mode_type::position && info_only_event_) { state_ = state_type::gtid_log_expected; @@ -448,7 +499,7 @@ void reader_context::validate_position_and_advance( position_ = common_header.get_next_event_position_raw(); } -void reader_context::reset_position() { position_ = 0U; } +void reader_context::reset_position() { position_ = magic_binlog_offset; } void reader_context::start_transaction(const event ¤t_event) { switch (current_event.get_common_header().get_type_code()) { diff --git a/src/binsrv/event/reader_context.hpp b/src/binsrv/event/reader_context.hpp index c1c5d91..002e186 100644 --- a/src/binsrv/event/reader_context.hpp +++ b/src/binsrv/event/reader_context.hpp @@ -35,7 +35,8 @@ class [[nodiscard]] reader_context { public: reader_context(std::uint32_t encoded_server_version, bool verify_checksum, - replication_mode_type replication_mode); + replication_mode_type replication_mode, + std::string_view binlog_name, std::uint32_t position); [[nodiscard]] std::uint32_t get_current_encoded_server_version() const noexcept { @@ -47,9 +48,6 @@ class [[nodiscard]] reader_context { [[nodiscard]] std::size_t get_current_post_header_length(code_type code) const noexcept; - [[nodiscard]] std::uint32_t get_current_position() const noexcept { - return position_; - } [[nodiscard]] bool has_transaction_gtid() const noexcept { return !transaction_gtid_.is_empty(); @@ -67,10 +65,6 @@ class [[nodiscard]] reader_context { return info_only_event_; } - void set_expect_ignorable_preamble_events() noexcept { - expect_ignorable_preamble_events_ = true; - } - private: // this class implements the logic of the following state machine // ( @@ -92,8 +86,9 @@ class [[nodiscard]] reader_context { std::uint32_t encoded_server_version_; bool verify_checksum_; replication_mode_type replication_mode_; + std::string binlog_name_; + std::uint32_t position_; post_header_length_container post_header_lengths_{}; - std::uint32_t position_{0U}; gtids::gtid transaction_gtid_{}; std::uint32_t expected_transaction_length_{0U};