From 129abe5e27e198ca8b9660b1a49dbc432ba1a3ad Mon Sep 17 00:00:00 2001 From: Abdelrahman Hedia Date: Mon, 1 Jun 2026 22:24:32 +0300 Subject: [PATCH] MDEV-37602: Phase 1 & 2 - Session-scoped rgi and Master_info registration in index Phase 1: Persist rpl_group_info per connection instead of per BINLOG statement Phase 2: Create session Master_info and register in master_info_index with unique names This enables proper cross-event context and visibility in SHOW SLAVE STATUS. --- sql/sql_binlog.cc | 90 +++++++++++++++++++++++++---------------------- sql/sql_class.cc | 28 +++++++++++++-- sql/sql_class.h | 2 ++ 3 files changed, 74 insertions(+), 46 deletions(-) diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc index 36ad1a4ceba1b..b8c1b30ace271 100644 --- a/sql/sql_binlog.cc +++ b/sql/sql_binlog.cc @@ -24,6 +24,7 @@ #include "rpl_mi.h" #include "slave.h" #include "log_event.h" +#include /** @@ -163,14 +164,7 @@ int save_restore_context_apply_event(Log_event *ev, rpl_group_info *rgi) THD *thd= rgi->thd; Relay_log_info *rli= thd->rli_fake; - DBUG_ASSERT(!rli->mi); - LEX_CSTRING connection_name= { STRING_WITH_LEN("BINLOG_BASE64_EVENT") }; - - if (!(rli->mi= new Master_info(&connection_name, false))) - { - my_error(ER_OUT_OF_RESOURCES, MYF(0)); - return -1; - } + DBUG_ASSERT(rli->mi); sql_digest_state *m_digest= thd->m_digest; PSI_statement_locker *m_statement_psi= thd->m_statement_psi;; @@ -189,8 +183,6 @@ int save_restore_context_apply_event(Log_event *ev, rpl_group_info *rgi) thd->m_statement_psi= m_statement_psi; thd->variables.pseudo_thread_id= m_thread_id; thd->reset_db(&save_db); - delete rli->mi; - rli->mi= NULL; return err; } @@ -234,28 +226,55 @@ void mysql_client_binlog_statement(THD* thd) int err; Relay_log_info *rli; - rpl_group_info *rgi; + rpl_group_info *rgi= thd->rgi_fake; uchar *buf= NULL; size_t coded_len= 0, decoded_len= 0; - - rli= thd->rli_fake; - if (!rli && (rli= thd->rli_fake= new Relay_log_info(FALSE, "BINLOG_BASE64_EVENT"))) - rli->sql_driver_thd= thd; - if (!(rgi= thd->rgi_fake)) - rgi= thd->rgi_fake= new rpl_group_info(rli); - rgi->thd= thd; const char *error= 0; - Log_event *ev = 0; + Log_event *ev= 0; my_bool is_fragmented= FALSE; - my_bool keep_rgi= false; - /* - Out of memory check - */ - if (!(rli)) + + rli= thd->rli_fake; + if (!rli) { - my_error(ER_OUTOFMEMORY, MYF(ME_FATAL), 1); /* needed 1 bytes */ - goto end; + /* + Create a session-scoped Master_info. Its embedded Relay_log_info + serves as rli_fake for the connection, eliminating throwaway + Master_info creation per Query event in save_restore_context_apply_event(). + Register it in master_info_index for visibility in SHOW SLAVE STATUS. + */ + char conn_name_buf[64]; + int len= snprintf(conn_name_buf, sizeof(conn_name_buf), + "binlog_replay_%llu", + (unsigned long long)thd->thread_id); + LEX_CSTRING connection_name= { conn_name_buf, (size_t)len }; + + Master_info *mi= new Master_info(&connection_name, false); + if (!mi || mi->error()) + { + delete mi; + my_error(ER_OUTOFMEMORY, MYF(ME_FATAL), 1); + goto end; + } + mi->rli.mi= mi; + mi->rli.sql_driver_thd= thd; + + /* + Register in master_info_index so it appears in monitoring queries. + Pass write_to_file=FALSE since binlog_replay connections are ephemeral. + */ + if (master_info_index->add_master_info(mi, FALSE)) + { + delete mi; + my_error(ER_OUTOFMEMORY, MYF(ME_FATAL), 1); + goto end; + } + + thd->mi_fake= mi; + thd->rli_fake= rli= &mi->rli; } + if (!rgi) + rgi= thd->rgi_fake= new rpl_group_info(rli); + rgi->thd= thd; DBUG_ASSERT(rli->belongs_to_client()); @@ -412,16 +431,6 @@ void mysql_client_binlog_statement(THD* thd) */ LEX *backup_lex; - /* - If we are re-assembling a Rows_log_event from a group of - Partial_rows_log_events, the rgi houses the assembler, so we need - it around while we are re-constructing the event. - */ - if (ev->get_type_code() == PARTIAL_ROW_DATA_EVENT && - (((Partial_rows_log_event *) ev)->seq_no < - ((Partial_rows_log_event *) ev)->total_fragments)) - keep_rgi= true; - thd->backup_and_reset_current_lex(&backup_lex); err= save_restore_context_apply_event(ev, rgi); thd->restore_current_lex(backup_lex); @@ -462,13 +471,8 @@ void mysql_client_binlog_statement(THD* thd) if (unlikely(is_fragmented)) my_free(const_cast(thd->lex->comment.str)); thd->variables.option_bits= thd_options; - rgi->slave_close_thread_tables(thd); + if (rgi) + rgi->slave_close_thread_tables(thd); my_free(buf); - - if (!keep_rgi) - { - delete rgi; - rgi= thd->rgi_fake= NULL; - } DBUG_VOID_RETURN; } diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 3856a1d4ed736..1f0560658ce6c 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -36,6 +36,7 @@ #include "sql_base.h" #include "sql_handler.h" // mysql_ha_cleanup #include "rpl_rli.h" +#include "rpl_mi.h" #include "rpl_filter.h" #include "rpl_record.h" #include "slave.h" @@ -714,7 +715,7 @@ const char *thd_where(THD *thd) THD::THD(my_thread_id id, bool is_wsrep_applier) :Statement(&main_lex, &main_mem_root, STMT_CONVENTIONAL_EXECUTION, /* statement id */ 0), - rli_fake(0), rgi_fake(0), rgi_slave(NULL), + mi_fake(0), rli_fake(0), rgi_fake(0), rgi_slave(NULL), protocol_text(this), protocol_binary(this), initial_status_var(0), m_current_stage_key(0), m_psi(0), start_time(0), start_time_sec_part(0), in_sub_stmt(0), log_all_errors(0), @@ -1792,8 +1793,29 @@ void THD::free_connection() net_end(&net); delete(rgi_fake); rgi_fake= NULL; - delete(rli_fake); - rli_fake= NULL; + if (mi_fake) + { + /* + When mi_fake is set, it's a session-scoped Master_info for BINLOG replay. + Deregister it from master_info_index before deleting. + The rli_fake points to the embedded Relay_log_info inside Master_info. + Deleting mi_fake destroys the RLI along with it. + */ + if (master_info_index) + { + mysql_mutex_lock(&LOCK_active_mi); + master_info_index->remove_master_info(mi_fake, FALSE); + mysql_mutex_unlock(&LOCK_active_mi); + } + delete mi_fake; + mi_fake= NULL; + rli_fake= NULL; + } + else + { + delete(rli_fake); + rli_fake= NULL; + } #endif if (!cleanup_done) cleanup(); diff --git a/sql/sql_class.h b/sql/sql_class.h index 03394f46307c0..9c71d6341aad9 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -96,6 +96,7 @@ enum wsrep_consistency_check_mode { class Reprepare_observer; class Relay_log_info; +class Master_info; struct rpl_group_info; struct rpl_parallel_thread; class Rpl_filter; @@ -3195,6 +3196,7 @@ class THD: public THD_count, /* this must be first */ MDL_context mdl_context; /* Used to execute base64 coded binlog events in MySQL server */ + Master_info* mi_fake; Relay_log_info* rli_fake; rpl_group_info* rgi_fake; /* Slave applier execution context */