From 29b955a545fb86a7e2a185c2936b18eccbbfca0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Tue, 16 Dec 2025 09:15:59 +0100 Subject: [PATCH 1/2] replication: decode heartbeat events --- replication/const.go | 16 +++++++++ replication/event.go | 64 ++++++++++++++++++++++++++++++++++++ replication/event_test.go | 53 +++++++++++++++++++++++++++++ replication/generic_event.go | 3 -- replication/parser.go | 4 +++ 5 files changed, 137 insertions(+), 3 deletions(-) diff --git a/replication/const.go b/replication/const.go index e4cfd2bd3..7894df2a6 100644 --- a/replication/const.go +++ b/replication/const.go @@ -261,3 +261,19 @@ const ( ENUM_EXTRA_ROW_INFO_TYPECODE_NDB byte = iota ENUM_EXTRA_ROW_INFO_TYPECODE_PARTITION ) + +// Binlog flags, from include/mysql.h +const ( + USE_HEARTBEAT_EVENT_V2 = 1 << 1 + MYSQL_RPL_SKIP_TAGGED_GTIDS = 1 << 2 + MYSQL_RPL_GTID = 1 << 16 + MYSQL_RPL_SKIP_HEARTBEAT = 1 << 17 +) + +// On-The-Wire HeartBeat fields +// See enum mysql::binlog::event::codecs::binary::Heartbeat::fields in MySQL +const ( + OTW_HB_HEADER_END_MARK = iota + OTW_HB_LOG_FILENAME_FIELD + OTW_HB_LOG_POSITION_FIELD +) diff --git a/replication/event.go b/replication/event.go index f1a6f4ca4..86e882b5a 100644 --- a/replication/event.go +++ b/replication/event.go @@ -945,3 +945,67 @@ func (i *IntVarEvent) Dump(w io.Writer) { fmt.Fprintf(w, "Type: %d\n", i.Type) fmt.Fprintf(w, "Value: %d\n", i.Value) } + +// HeartbeatEvent is a HEARTBEAT_EVENT or HEARTBEAT_LOG_EVENT_V2 +// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication_binlog_event.html#sect_protocol_replication_event_heartbeat +type HeartbeatEvent struct { + // Event version, either 1 for HEARTBEAT_EVENT or 2 for HEARTBEAT_LOG_EVENT_V2 + Version int + + // Filename of the binary log + Filename string + + // Offset is the offset in the binlog file + Offset uint64 +} + +// Decode is decoding a heartbeat event payload (excluding event header and checksum) +func (h *HeartbeatEvent) Decode(data []byte) error { + switch h.Version { + case 1: + // Also known as HEARTBEAT_EVENT + h.Filename = string(data) + case 2: + // Also known as HEARTBEAT_LOG_EVENT_V2 + // + // The server sends this in the binlog stream if the following is set: + // DumpCommandFlag: replication.USE_HEARTBEAT_EVENT_V2 + pos := 0 + for pos < len(data) { + switch data[pos] { + case OTW_HB_LOG_FILENAME_FIELD: + pos++ + nameLength := int(data[pos]) + pos++ + h.Filename = string(data[pos : pos+nameLength]) + pos += nameLength + case OTW_HB_LOG_POSITION_FIELD: + pos++ + offsetLength := int(data[pos]) + pos++ + var n int + h.Offset, _, n = mysql.LengthEncodedInt(data[pos : pos+offsetLength]) + if n != offsetLength { + return errors.New("failed to read binary log offset") + } + pos += offsetLength + case OTW_HB_HEADER_END_MARK: + pos++ + default: + return errors.New("unknown heartbeatv2 field") + } + } + default: + return errors.New("unknown heartbeat version") + } + + return nil +} + +func (h *HeartbeatEvent) Dump(w io.Writer) { + fmt.Fprintf(w, + "Heartbeat Event Version: %d\nBinlog File Name: %s\nBinlog Offset: %d\n", + h.Version, h.Filename, h.Offset, + ) + fmt.Fprintln(w) +} diff --git a/replication/event_test.go b/replication/event_test.go index d7fa43927..2fbdb8a3c 100644 --- a/replication/event_test.go +++ b/replication/event_test.go @@ -194,3 +194,56 @@ func TestPreviousGTIDEvent(t *testing.T) { require.Equal(t, tc.GTIDSets, e.GTIDSets) } } + +func TestHeartbeatEvent(t *testing.T) { + testcases := []struct { + err bool + version int + input []byte // make sure to strip the 4 byte checksum + hbEvent HeartbeatEvent + }{ + { + false, + 2, + []byte{0x1, 0xd, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x2e, 0x30, 0x30, 0x30, 0x30, 0x30, 0x31, 0x2, 0x1, 0x9e, 0x0}, + HeartbeatEvent{Version: 2, Filename: "binlog.000001", Offset: 158}, + }, + { + true, + 2, + // 0x3 is not a valid field type + []byte{0x3, 0xd, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x2e, 0x30, 0x30, 0x30, 0x30, 0x30, 0x31, 0x2, 0x1, 0x9e, 0x0}, + HeartbeatEvent{}, + }, + { + true, + 2, + // 0x2, 0x9e is not a valid length encoded integer + []byte{0x1, 0xd, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x2e, 0x30, 0x30, 0x30, 0x30, 0x30, 0x31, 0x2, 0x2, 0x9e, 0x0}, + HeartbeatEvent{Version: 2, Filename: "binlog.000001", Offset: 158}, + }, + { + false, + 1, + []byte{0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x2e, 0x30, 0x30, 0x30, 0x30, 0x30, 0x31}, + HeartbeatEvent{Version: 1, Filename: "binlog.000001"}, + }, + { + true, + 3, // invalid heartbeat version + []byte{}, + HeartbeatEvent{}, + }, + } + + for _, tc := range testcases { + e := HeartbeatEvent{Version: tc.version} + err := e.Decode(tc.input) + if tc.err { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, tc.hbEvent, e) + } + } +} diff --git a/replication/generic_event.go b/replication/generic_event.go index ce13444a0..78da9c0d6 100644 --- a/replication/generic_event.go +++ b/replication/generic_event.go @@ -161,6 +161,3 @@ func (e *GenericEvent) Decode(data []byte) error { // MessageLength uint8 // Message []byte // } - -// type HeartbeatEvent struct { -// } diff --git a/replication/parser.go b/replication/parser.go index ebed120b3..58a43360c 100644 --- a/replication/parser.go +++ b/replication/parser.go @@ -320,6 +320,10 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) ( e = &IntVarEvent{} case TRANSACTION_PAYLOAD_EVENT: e = p.newTransactionPayloadEvent() + case HEARTBEAT_EVENT: + e = &HeartbeatEvent{Version: 1} + case HEARTBEAT_LOG_EVENT_V2: + e = &HeartbeatEvent{Version: 2} default: e = &GenericEvent{} } From 8e7728095c6be4a9f4b7d2a781b2a023ce7b3cdf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Tue, 16 Dec 2025 09:26:35 +0100 Subject: [PATCH 2/2] Add a testcase with a binlog postion of >1 byte --- replication/event_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/replication/event_test.go b/replication/event_test.go index 2fbdb8a3c..27e7469f3 100644 --- a/replication/event_test.go +++ b/replication/event_test.go @@ -234,6 +234,12 @@ func TestHeartbeatEvent(t *testing.T) { []byte{}, HeartbeatEvent{}, }, + { + false, + 2, + []byte{0x1, 0xd, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x2e, 0x30, 0x30, 0x30, 0x30, 0x30, 0x31, 0x2, 0x4, 0xfd, 0xbb, 0xaf, 0x27, 0x0}, + HeartbeatEvent{Version: 2, Filename: "binlog.000001", Offset: 2600891}, + }, } for _, tc := range testcases {