Skip to content

Commit 29b955a

Browse files
committed
replication: decode heartbeat events
1 parent 8abbc87 commit 29b955a

File tree

5 files changed

+137
-3
lines changed

5 files changed

+137
-3
lines changed

replication/const.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,3 +261,19 @@ const (
261261
ENUM_EXTRA_ROW_INFO_TYPECODE_NDB byte = iota
262262
ENUM_EXTRA_ROW_INFO_TYPECODE_PARTITION
263263
)
264+
265+
// Binlog flags, from include/mysql.h
266+
const (
267+
USE_HEARTBEAT_EVENT_V2 = 1 << 1
268+
MYSQL_RPL_SKIP_TAGGED_GTIDS = 1 << 2
269+
MYSQL_RPL_GTID = 1 << 16
270+
MYSQL_RPL_SKIP_HEARTBEAT = 1 << 17
271+
)
272+
273+
// On-The-Wire HeartBeat fields
274+
// See enum mysql::binlog::event::codecs::binary::Heartbeat::fields in MySQL
275+
const (
276+
OTW_HB_HEADER_END_MARK = iota
277+
OTW_HB_LOG_FILENAME_FIELD
278+
OTW_HB_LOG_POSITION_FIELD
279+
)

replication/event.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -945,3 +945,67 @@ func (i *IntVarEvent) Dump(w io.Writer) {
945945
fmt.Fprintf(w, "Type: %d\n", i.Type)
946946
fmt.Fprintf(w, "Value: %d\n", i.Value)
947947
}
948+
949+
// HeartbeatEvent is a HEARTBEAT_EVENT or HEARTBEAT_LOG_EVENT_V2
950+
// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication_binlog_event.html#sect_protocol_replication_event_heartbeat
951+
type HeartbeatEvent struct {
952+
// Event version, either 1 for HEARTBEAT_EVENT or 2 for HEARTBEAT_LOG_EVENT_V2
953+
Version int
954+
955+
// Filename of the binary log
956+
Filename string
957+
958+
// Offset is the offset in the binlog file
959+
Offset uint64
960+
}
961+
962+
// Decode is decoding a heartbeat event payload (excluding event header and checksum)
963+
func (h *HeartbeatEvent) Decode(data []byte) error {
964+
switch h.Version {
965+
case 1:
966+
// Also known as HEARTBEAT_EVENT
967+
h.Filename = string(data)
968+
case 2:
969+
// Also known as HEARTBEAT_LOG_EVENT_V2
970+
//
971+
// The server sends this in the binlog stream if the following is set:
972+
// DumpCommandFlag: replication.USE_HEARTBEAT_EVENT_V2
973+
pos := 0
974+
for pos < len(data) {
975+
switch data[pos] {
976+
case OTW_HB_LOG_FILENAME_FIELD:
977+
pos++
978+
nameLength := int(data[pos])
979+
pos++
980+
h.Filename = string(data[pos : pos+nameLength])
981+
pos += nameLength
982+
case OTW_HB_LOG_POSITION_FIELD:
983+
pos++
984+
offsetLength := int(data[pos])
985+
pos++
986+
var n int
987+
h.Offset, _, n = mysql.LengthEncodedInt(data[pos : pos+offsetLength])
988+
if n != offsetLength {
989+
return errors.New("failed to read binary log offset")
990+
}
991+
pos += offsetLength
992+
case OTW_HB_HEADER_END_MARK:
993+
pos++
994+
default:
995+
return errors.New("unknown heartbeatv2 field")
996+
}
997+
}
998+
default:
999+
return errors.New("unknown heartbeat version")
1000+
}
1001+
1002+
return nil
1003+
}
1004+
1005+
func (h *HeartbeatEvent) Dump(w io.Writer) {
1006+
fmt.Fprintf(w,
1007+
"Heartbeat Event Version: %d\nBinlog File Name: %s\nBinlog Offset: %d\n",
1008+
h.Version, h.Filename, h.Offset,
1009+
)
1010+
fmt.Fprintln(w)
1011+
}

replication/event_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,3 +194,56 @@ func TestPreviousGTIDEvent(t *testing.T) {
194194
require.Equal(t, tc.GTIDSets, e.GTIDSets)
195195
}
196196
}
197+
198+
func TestHeartbeatEvent(t *testing.T) {
199+
testcases := []struct {
200+
err bool
201+
version int
202+
input []byte // make sure to strip the 4 byte checksum
203+
hbEvent HeartbeatEvent
204+
}{
205+
{
206+
false,
207+
2,
208+
[]byte{0x1, 0xd, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x2e, 0x30, 0x30, 0x30, 0x30, 0x30, 0x31, 0x2, 0x1, 0x9e, 0x0},
209+
HeartbeatEvent{Version: 2, Filename: "binlog.000001", Offset: 158},
210+
},
211+
{
212+
true,
213+
2,
214+
// 0x3 is not a valid field type
215+
[]byte{0x3, 0xd, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x2e, 0x30, 0x30, 0x30, 0x30, 0x30, 0x31, 0x2, 0x1, 0x9e, 0x0},
216+
HeartbeatEvent{},
217+
},
218+
{
219+
true,
220+
2,
221+
// 0x2, 0x9e is not a valid length encoded integer
222+
[]byte{0x1, 0xd, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x2e, 0x30, 0x30, 0x30, 0x30, 0x30, 0x31, 0x2, 0x2, 0x9e, 0x0},
223+
HeartbeatEvent{Version: 2, Filename: "binlog.000001", Offset: 158},
224+
},
225+
{
226+
false,
227+
1,
228+
[]byte{0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x2e, 0x30, 0x30, 0x30, 0x30, 0x30, 0x31},
229+
HeartbeatEvent{Version: 1, Filename: "binlog.000001"},
230+
},
231+
{
232+
true,
233+
3, // invalid heartbeat version
234+
[]byte{},
235+
HeartbeatEvent{},
236+
},
237+
}
238+
239+
for _, tc := range testcases {
240+
e := HeartbeatEvent{Version: tc.version}
241+
err := e.Decode(tc.input)
242+
if tc.err {
243+
require.Error(t, err)
244+
} else {
245+
require.NoError(t, err)
246+
require.Equal(t, tc.hbEvent, e)
247+
}
248+
}
249+
}

replication/generic_event.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,3 @@ func (e *GenericEvent) Decode(data []byte) error {
161161
// MessageLength uint8
162162
// Message []byte
163163
// }
164-
165-
// type HeartbeatEvent struct {
166-
// }

replication/parser.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,10 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) (
320320
e = &IntVarEvent{}
321321
case TRANSACTION_PAYLOAD_EVENT:
322322
e = p.newTransactionPayloadEvent()
323+
case HEARTBEAT_EVENT:
324+
e = &HeartbeatEvent{Version: 1}
325+
case HEARTBEAT_LOG_EVENT_V2:
326+
e = &HeartbeatEvent{Version: 2}
323327
default:
324328
e = &GenericEvent{}
325329
}

0 commit comments

Comments
 (0)