Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mtr/binlog_streaming/t/resume_streaming.test
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ eval $stmt_reset_binary_logs_and_gtids;
--let $binsrv_read_timeout = 60
--let $binsrv_idle_time = 10
--let $binsrv_verify_checksum = TRUE
--let $binsrv_replication_mode = `SELECT IF(@@global.gtid_mode, 'gtid', 'position')`
--let $binsrv_replication_mode = `SELECT IF(@@global.gtid_mode = 'ON', 'gtid', 'position')`
if ($extracted_init_connect_variable_value == 'buffered')
{
--let $binsrv_checkpoint_size = 1G
Expand Down
2 changes: 1 addition & 1 deletion mtr/binlog_streaming/t/search_by_timestamp.test
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ eval $stmt_reset_binary_logs_and_gtids;
--let $binsrv_read_timeout = 60
--let $binsrv_idle_time = 10
--let $binsrv_verify_checksum = TRUE
--let $binsrv_replication_mode = `SELECT IF(@@global.gtid_mode, 'gtid', 'position')`
--let $binsrv_replication_mode = `SELECT IF(@@global.gtid_mode = 'ON', 'gtid', 'position')`
--let $binsrv_checkpoint_size = 1
--source ../include/set_up_binsrv_environment.inc

Expand Down
24 changes: 5 additions & 19 deletions src/app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,6 @@ void log_span_dump(binsrv::basic_logger &logger,

void process_artificial_rotate_event(const binsrv::event::event &current_event,
binsrv::basic_logger &logger,
binsrv::event::reader_context &context,
binsrv::storage &storage) {
assert(current_event.get_common_header().get_type_code() ==
binsrv::event::code_type::rotate);
Expand Down Expand Up @@ -414,10 +413,6 @@ void process_artificial_rotate_event(const binsrv::event::event &current_event,
}

binlog_opening_needed = false;
// after reusing the existing storage binlog file, we should instruct
// the reader context to mark the upcoming FDE and PREVIOUS_GTIDS_LOG
// events as info-only
context.set_expect_ignorable_preamble_events();

const std::string current_binlog_name{storage.get_current_binlog_name()};
logger.log(binsrv::log_severity::info,
Expand All @@ -440,16 +435,6 @@ void process_artificial_rotate_event(const binsrv::event::event &current_event,
const auto binlog_open_result{
storage.open_binlog(current_rotate_body.get_binlog())};

// we also need to instruct the reader context that we opened an
// existing file (the one that was neither empty nor just had the
// magic payload writtent to it), so that it would mark the upcoming FDE
// and PREVIOUS_GTIDS_LOG events as info-only

if (binlog_open_result ==
binsrv::open_binlog_status::opened_with_data_present) {
context.set_expect_ignorable_preamble_events();
}

std::string message{"storage: "};
if (binlog_open_result == binsrv::open_binlog_status::created) {
message += "created a new";
Expand Down Expand Up @@ -489,7 +474,7 @@ void process_binlog_event(const binsrv::event::event &current_event,

// processing the very first event in the sequence - artificial ROTATE event
if (code == binsrv::event::code_type::rotate && is_artificial) {
process_artificial_rotate_event(current_event, logger, context, storage);
process_artificial_rotate_event(current_event, logger, storage);
}

// checking if the event needs to be written to the binlog
Expand Down Expand Up @@ -588,9 +573,10 @@ void receive_binlog_events(

util::const_byte_span portion;

binsrv::event::reader_context context{connection.get_server_version(),
verify_checksum,
storage.get_replication_mode()};
binsrv::event::reader_context context{
connection.get_server_version(), verify_checksum,
storage.get_replication_mode(), storage.get_current_binlog_name(),
static_cast<std::uint32_t>(storage.get_current_position())};

bool fetch_result{};

Expand Down
83 changes: 67 additions & 16 deletions src/binsrv/event/reader_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string_view>
#include <utility>

#include "binsrv/replication_mode_type.hpp"
Expand All @@ -36,9 +37,14 @@ namespace binsrv::event {

reader_context::reader_context(std::uint32_t encoded_server_version,
bool verify_checksum,
replication_mode_type replication_mode)
replication_mode_type replication_mode,
std::string_view binlog_name,
std::uint32_t position)
: encoded_server_version_{encoded_server_version},
verify_checksum_{verify_checksum}, replication_mode_{replication_mode},
binlog_name_{binlog_name},
position_{position == 0U ? static_cast<std::uint32_t>(magic_binlog_offset)
: position},
post_header_lengths_{
get_hardcoded_post_header_lengths(encoded_server_version_)} {}

Expand Down Expand Up @@ -107,16 +113,61 @@ reader_context::process_event_in_rotate_artificial_expected_state(
"non-zero next event position found in an artificial rotate event");
}

// we expect an artificial rotate event to be the very first event in the
// newly-created binlog file
if (position_ != 0U) {
util::exception_location().raise<std::logic_error>(
"artificial rotate event is not the very first event in a binlog "
"file");
const auto &current_post_header{
current_event.get_post_header<code_type::rotate>()};
const auto &current_body{current_event.get_body<code_type::rotate>()};
if (replication_mode_ == replication_mode_type::position) {
if (current_body.get_binlog() == binlog_name_) {
// in position-based replication mode, when we continue streaming to the
// same binlog file, we expect the artificial rotate event to have the
// same position as the one supplied to the constructor
if (current_post_header.get_position_raw() != position_) {
util::exception_location().raise<std::logic_error>(
"unexpected position found in an artificial rotate event in "
"position-based replication mode");
}
} else {
// in position-based replication mode, when we start streaming to a new
// binlog file, we expect the artificial rotate event
// to have the position equal to "magic_offset" (4)
if (current_post_header.get_position_raw() != magic_binlog_offset) {
util::exception_location().raise<std::logic_error>(
"unexpected position found in an artificial rotate event in "
"position-based replication mode");
}
}
} else {
// in GTID-based replication mode we expect the artificial rotate event to
// always have position set to "magic_offset" (4)
if (current_post_header.get_position_raw() != magic_binlog_offset) {
util::exception_location().raise<std::logic_error>(
"unexpected position found in an artificial rotate event in "
"GTID-based replication mode");
}
}
if (current_body.get_binlog() != binlog_name_) {
// in the case when binlog name in the artificial rotate event does not
// match the one specified in the last saved one, we should update it here
// and reset the position to "magic_offset" (4)

position_ = static_cast<std::uint32_t>(
current_event.get_post_header<code_type::rotate>().get_position_raw());
// please notice that the very first time the reader_context is created
// 'binlog_name' is initialized with the 'storage.get_current_binlog_name()'
// value

// also, when the storage objects is created on an empty directory,
// 'storage.get_current_binlog_name()' returns an empty string which will
// never be equal to any real binlog name
binlog_name_ = current_body.get_binlog();
reset_position();
}

// whether we should expect info-only FORMAT_DESCRIPTION and optional
// PREVIOUS_GTIDS_LOG events depends on whether we resumed streaming to an
// existing non-empty binlog file or not
if (current_body.get_binlog() == binlog_name_ &&
position_ > magic_binlog_offset) {
expect_ignorable_preamble_events_ = true;
}

info_only_event_ = true;
// transition to the next state
Expand Down Expand Up @@ -208,9 +259,9 @@ reader_context::process_event_in_format_description_expected_state(
// ...

// in other words, in GTID-based mode there is no way to distinguish whether
// the FDE / PREVIOUS_GTIDS_LOG is pseudo and should not be written, or not -
// that is why we rely only on externally supplied
// "start_from_new_binlog_file" constructor's argument
// the FDE / PREVIOUS_GTIDS_LOG should be written to the binlog file or
// not - that is why we rely only on the value of the
// "expect_ignorable_preamble_events" calculated previously
info_only_event_ = expect_ignorable_preamble_events_;
if (replication_mode_ == replication_mode_type::position &&
info_only_event_) {
Expand All @@ -224,10 +275,10 @@ reader_context::process_event_in_format_description_expected_state(
validate_position_and_advance(common_header);
}

// transition to the next state:
// the next expected event is PREVIOUS_GTIDS_LOG, unless we are in
// position-based replication mode and this we resumed streaming from the
// middle of a binlog file
// position-based replication mode and we resumed streaming to an
// existing non-empty binlog file, in which case we expect the next event
// to be one of the GTID_LOG events
if (replication_mode_ == replication_mode_type::position &&
info_only_event_) {
state_ = state_type::gtid_log_expected;
Expand Down Expand Up @@ -448,7 +499,7 @@ void reader_context::validate_position_and_advance(
position_ = common_header.get_next_event_position_raw();
}

void reader_context::reset_position() { position_ = 0U; }
void reader_context::reset_position() { position_ = magic_binlog_offset; }

void reader_context::start_transaction(const event &current_event) {
switch (current_event.get_common_header().get_type_code()) {
Expand Down
13 changes: 4 additions & 9 deletions src/binsrv/event/reader_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ class [[nodiscard]] reader_context {

public:
reader_context(std::uint32_t encoded_server_version, bool verify_checksum,
replication_mode_type replication_mode);
replication_mode_type replication_mode,
std::string_view binlog_name, std::uint32_t position);

[[nodiscard]] std::uint32_t
get_current_encoded_server_version() const noexcept {
Expand All @@ -47,9 +48,6 @@ class [[nodiscard]] reader_context {

[[nodiscard]] std::size_t
get_current_post_header_length(code_type code) const noexcept;
[[nodiscard]] std::uint32_t get_current_position() const noexcept {
return position_;
}

[[nodiscard]] bool has_transaction_gtid() const noexcept {
return !transaction_gtid_.is_empty();
Expand All @@ -67,10 +65,6 @@ class [[nodiscard]] reader_context {
return info_only_event_;
}

void set_expect_ignorable_preamble_events() noexcept {
expect_ignorable_preamble_events_ = true;
}

private:
// this class implements the logic of the following state machine
// (
Expand All @@ -92,8 +86,9 @@ class [[nodiscard]] reader_context {
std::uint32_t encoded_server_version_;
bool verify_checksum_;
replication_mode_type replication_mode_;
std::string binlog_name_;
std::uint32_t position_;
post_header_length_container post_header_lengths_{};
std::uint32_t position_{0U};

gtids::gtid transaction_gtid_{};
std::uint32_t expected_transaction_length_{0U};
Expand Down