Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions mtr/binlog_streaming/r/data_directory_8_0_to_8_4_upgrade.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@

*** Stopping current instance of 8.4 MySQL Server
include/stop_mysqld.inc [server 1]

*** Setting up the 8.0 data directory

*** Running 8.4 binaries on a 8.0 data directory
# restart

*** Checking if we can access data after upgrade
SELECT * FROM t1 ORDER BY id;
id
101
102
103
CHECKSUM TABLE t1;
Table Checksum
test.t1 1719212378

*** Adding more changed from the 8.4 server
INSERT INTO t1 VALUES(201);
START TRANSACTION;
INSERT INTO t1 VALUES(202);
INSERT INTO t1 VALUES(203);
COMMIT;
UPDATE t1 SET id = id + 100 WHERE id > 200;
DELETE FROM t1 WHERE id < 200;
DROP TABLE t1;

*** Setting up BINSRV environment

*** Generating a configuration file in JSON format for the Binlog
*** Server utility.

*** Determining binlog file directory from the server.

*** Creating a temporary directory <BINSRV_STORAGE_PATH> for storing
*** binlog files downloaded via the Binlog Server utility.

*** Executing the Binlog Server utility to download all binlog data
*** from both 8.0 and 8.4 binlog files

*** Stopping current instance of 8.4 MySQL Server
include/stop_mysqld.inc [server 1]

*** Cleaning up upgrade data directory
# restart

*** Cleaning up BINSRV environment

*** Removing the Binlog Server utility storage directory.

*** Removing the Binlog Server utility log file.

*** Removing the Binlog Server utility configuration file.
Binary file added mtr/binlog_streaming/std_data/data_80045_gtid.zip
Binary file not shown.
Binary file not shown.
2 changes: 1 addition & 1 deletion mtr/binlog_streaming/t/binsrv.test
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ INSERT INTO t1 VALUES(DEFAULT);
# identifying backend storage type ('file' or 's3')
--source ../include/identify_storage_backend.inc

# identifying utility checksum mode from the conbination
# identifying utility checksum mode from the combination
--let $extracted_init_connect_variable_name = binsrv_checksum
--source ../include/extract_init_connect_variable_value.inc

Expand Down
2 changes: 1 addition & 1 deletion mtr/binlog_streaming/t/checkpointing.test
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ DROP TABLE t1;
# identifying backend storage type ('file' or 's3')
--source ../include/identify_storage_backend.inc

# identifying interval checkpointing mode from the conbination
# identifying interval checkpointing mode from the combination
--let $extracted_init_connect_variable_name = binsrv_checkpointing
--source ../include/extract_init_connect_variable_value.inc

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[position]

[gtid]
gtid-mode=on
enforce-gtid-consistency
102 changes: 102 additions & 0 deletions mtr/binlog_streaming/t/data_directory_8_0_to_8_4_upgrade.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
--source ../include/have_binsrv.inc

--source ../include/v80_v84_compatibility_defines.inc
if ($lts_series != v84)
{
--skip This test must be run on a MySQL Server 8.4
}

--source include/mysql_upgrade_preparation.inc

--let $binsrv_replication_mode = `SELECT IF(@@global.gtid_mode = 'ON', 'gtid', 'position')`

--let $CUSTOM_MYSQLD_DATADIR = $MYSQL_TMP_DIR/data_80045_$binsrv_replication_mode
--let $ZIP_FILE = $MYSQL_TEST_DIR/suite/binlog_streaming/std_data/data_80045_$binsrv_replication_mode.zip

--echo
--echo *** Stopping current instance of 8.4 MySQL Server
--source include/stop_mysqld.inc

--echo
--echo *** Setting up the 8.0 data directory
# Data directory was created with the following script
# CREATE TABLE t1(id SERIAL, PRIMARY KEY(id));
# INSERT INTO t1 VALUES();
# INSERT INTO t1 VALUES();
#
# START TRANSACTION;
# INSERT INTO t1 VALUES();
# INSERT INTO t1 VALUES();
# INSERT INTO t1 VALUES();
# INSERT INTO t1 VALUES();
# COMMIT;
#
# FLUSH BINARY LOGS;
#
# UPDATE t1 SET id = id + 100 WHERE id <= 3;
# DELETE FROM t1 WHERE id < 100;

--exec unzip -qo $ZIP_FILE -d $MYSQL_TMP_DIR

--echo
--echo *** Running 8.4 binaries on a 8.0 data directory
--let $MYSQLD_LOG = $MYSQLTEST_VARDIR/log/upgrade.log
--let $do_not_echo_parameters = 1
--let $restart_parameters = restart: --datadir=$CUSTOM_MYSQLD_DATADIR --log-error=$MYSQLD_LOG
--source include/start_mysqld.inc

--echo
--echo *** Checking if we can access data after upgrade
SELECT * FROM t1 ORDER BY id;
CHECKSUM TABLE t1;


--echo
--echo *** Adding more changed from the 8.4 server
INSERT INTO t1 VALUES(201);

START TRANSACTION;
INSERT INTO t1 VALUES(202);
INSERT INTO t1 VALUES(203);
COMMIT;

UPDATE t1 SET id = id + 100 WHERE id > 200;
DELETE FROM t1 WHERE id < 200;

DROP TABLE t1;

--echo
--echo *** Setting up BINSRV environment
# identifying backend storage type ('file' or 's3')
--source ../include/identify_storage_backend.inc

# creating data directory, configuration file, etc.
--let $binsrv_connect_timeout = 20
--let $binsrv_read_timeout = 60
--let $binsrv_idle_time = 10
--let $binsrv_verify_checksum = TRUE
# $binsrv_replication_mode already defined earlier
--let $binsrv_checkpoint_size = 1
--source ../include/set_up_binsrv_environment.inc

--echo
--echo *** Executing the Binlog Server utility to download all binlog data
--echo *** from both 8.0 and 8.4 binlog files
--exec $BINSRV fetch $binsrv_config_file_path > /dev/null

--echo
--echo *** Stopping current instance of 8.4 MySQL Server
--source include/stop_mysqld.inc

--echo
--echo *** Cleaning up upgrade data directory
--force-rmdir $CUSTOM_MYSQLD_DATADIR
--remove_file $MYSQLD_LOG

--let $restart_parameters =
--source include/start_mysqld.inc
--source include/mysql_upgrade_cleanup.inc

--echo
--echo *** Cleaning up BINSRV environment
--source ../include/tear_down_binsrv_environment.inc
2 changes: 1 addition & 1 deletion src/app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ void process_binlog_event(const binsrv::events::event_view &current_event_v,
// here we additionally check for log level because event materialization
// is not a trivial operation
if (binsrv::log_severity::debug >= logger.get_min_level()) {
const binsrv::events::event current_event{context, current_event_v};
const binsrv::events::event current_event{current_event_v};
logger.log(binsrv::log_severity::debug,
"event : [parsed] " +
boost::lexical_cast<std::string>(current_event));
Expand Down
27 changes: 10 additions & 17 deletions src/binsrv/events/event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include <cassert>
#include <concepts>
#include <cstddef>
#include <cstdint>
#include <ostream>
#include <stdexcept>
#include <utility>
Expand All @@ -39,22 +38,20 @@

namespace binsrv::events {

event::event(const reader_context &context, const event_view &view)
event::event(const event_view &view)
: common_header_{view.get_common_header_raw()} {
const auto encoded_server_version{
context.get_current_encoded_server_version()};
const auto code{common_header_.get_type_code()};

emplace_post_header(encoded_server_version, code, view.get_post_header_raw());
emplace_body(encoded_server_version, code, view.get_body_raw());
emplace_post_header(code, view.get_post_header_raw());
emplace_body(code, view.get_body_raw());

if (view.has_footer()) {
footer_.emplace(view.get_footer_view());
};
}

event::event(const reader_context &context, util::const_byte_span portion)
: event{context, event_view{context, portion}} {}
: event{event_view{context, portion}} {}

template <typename T>
concept encodable = requires(const T &obj, util::byte_span &destination) {
Expand Down Expand Up @@ -99,17 +96,15 @@ void event::encode_to(util::byte_span &destination) const {
}
}

void event::emplace_post_header(std::uint32_t encoded_server_version,
code_type code, util::const_byte_span portion) {
void event::emplace_post_header(code_type code, util::const_byte_span portion) {
// our goal here is to initialize (emplace) a specific class inside
// 'post_header_' variant (determined via runtime argument 'code') with the
// 'portion' byte range

// we start with defining an alias for a member function pointer that
// accepts a byte range and performs some modification on an object of this
// class (on 'post_header_' member to be precise)
using emplace_function =
void (event::*)(std::uint32_t, util::const_byte_span);
using emplace_function = void (event::*)(util::const_byte_span);
// then, we define an alias for a container that can store
// '<number_of_events>' such member function pointers
using emplace_function_container =
Expand Down Expand Up @@ -148,14 +143,12 @@ void event::emplace_post_header(std::uint32_t encoded_server_version,
// this will initialize the 'post_header_' member with expected variant

// NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-constant-array-index)
(this->*emplace_functions[function_index])(encoded_server_version, portion);
(this->*emplace_functions[function_index])(portion);
}

void event::emplace_body(std::uint32_t encoded_server_version, code_type code,
util::const_byte_span portion) {
void event::emplace_body(code_type code, util::const_byte_span portion) {
// here we use the same technique as in 'emplace_post_header()'
using emplace_function =
void (event::*)(std::uint32_t, util::const_byte_span);
using emplace_function = void (event::*)(util::const_byte_span);
using emplace_function_container =
std::array<emplace_function, max_number_of_events>;
static constexpr emplace_function_container emplace_functions{
Expand All @@ -168,7 +161,7 @@ void event::emplace_body(std::uint32_t encoded_server_version, code_type code,
const auto function_index = util::enum_to_index(code);
assert(function_index < util::enum_to_index(code_type::delimiter));
// NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-constant-array-index)
(this->*emplace_functions[function_index])(encoded_server_version, portion);
(this->*emplace_functions[function_index])(portion);
}

void event::encode_and_checksum(event_storage &buffer, bool include_checksum) {
Expand Down
26 changes: 7 additions & 19 deletions src/binsrv/events/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class [[nodiscard]] event {
post_header, body, include_checksum, nullptr};
}

event(const reader_context &context, const event_view &view);
explicit event(const event_view &view);
event(const reader_context &context, util::const_byte_span portion);

[[nodiscard]] const common_header &get_common_header() const noexcept {
Expand Down Expand Up @@ -192,27 +192,15 @@ class [[nodiscard]] event {
}

template <typename T>
void generic_emplace_post_header(std::uint32_t encoded_server_version,
util::const_byte_span portion) {
if constexpr (std::is_constructible_v<T, util::const_byte_span>) {
post_header_.emplace<T>(portion);
} else {
post_header_.emplace<T>(encoded_server_version, portion);
}
void generic_emplace_post_header(util::const_byte_span portion) {
post_header_.emplace<T>(portion);
}
void emplace_post_header(std::uint32_t encoded_server_version, code_type code,
util::const_byte_span portion);
void emplace_post_header(code_type code, util::const_byte_span portion);
template <typename T>
void generic_emplace_body(std::uint32_t encoded_server_version,
util::const_byte_span portion) {
if constexpr (std::is_constructible_v<T, util::const_byte_span>) {
body_.emplace<T>(portion);
} else {
body_.emplace<T>(encoded_server_version, portion);
}
void generic_emplace_body(util::const_byte_span portion) {
body_.emplace<T>(portion);
}
void emplace_body(std::uint32_t encoded_server_version, code_type code,
util::const_byte_span portion);
void emplace_body(code_type code, util::const_byte_span portion);
void encode_and_checksum(event_storage &buffer, bool include_checksum);
};

Expand Down
57 changes: 43 additions & 14 deletions src/binsrv/events/event_view.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,21 +70,50 @@ event_view_base::event_view_base(const reader_context &context,
"header");
}

post_header_size_ = context.get_current_post_header_length(code);
if (post_header_size_ == unspecified_post_header_length) {
util::exception_location().raise<std::invalid_argument>(
"received event of type " + std::to_string(util::enum_to_index(code)) +
" \"" + std::string{to_string_view(code)} +
"\" "
"is not known in server version " +
std::to_string(context.get_current_encoded_server_version()));
}
if (code == code_type::format_description) {
// special case for FORMAT_DESCRIPTION event

// in a scenario when 8.0 MySQL Server was upgraded to 8.4, it is possible
// for us to receive both 8.0 FDEs (coming from binlog files created before
// the upgrade) and 8.4 FDEs (coming from binlog files created after the
// upgrade)

// as 8.0 and 8.4 FDEs have different sizes of their post header section,
// the standard mechanism of extracting expected post header length from
// the context does not work here

// instead, we rely on the fact that FDE have body of a fixed size (1 byte
// for 'checksum_algorithm') and we can calculate the size of the
// post header based on that
const std::size_t group_size =
get_common_header_size() +
generic_body<code_type::format_description>::size_in_bytes +
get_footer_size();
if (get_total_size() < group_size) {
util::exception_location().raise<std::logic_error>(
"not enough data for format description event common header + body + "
"footer");
}
post_header_size_ = get_total_size() - group_size;
} else {
// for every other event, we proceed the standard way
post_header_size_ = context.get_current_post_header_length(code);
if (post_header_size_ == unspecified_post_header_length) {
util::exception_location().raise<std::invalid_argument>(
"received event of type " +
std::to_string(util::enum_to_index(code)) + " \"" +
std::string{to_string_view(code)} +
"\" "
"is not known in server version " +
std::to_string(context.get_current_encoded_server_version()));
}

const std::size_t group_size =
get_common_header_size() + get_post_header_size() + get_footer_size();
if (get_total_size() < group_size) {
util::exception_location().raise<std::logic_error>(
"not enough data for event post header + body + footer");
const std::size_t group_size =
get_common_header_size() + get_post_header_size() + get_footer_size();
if (get_total_size() < group_size) {
util::exception_location().raise<std::logic_error>(
"not enough data for event common header + post header + footer");
}
}

// optional checksum verification
Expand Down
Loading