Skip to content

Commit 084712b

Browse files
troyjfarrellbaloo
authored andcommitted
Clear table_map if RotateEvent has timestamp of 0 (julien-duponchelle#197)
* Clear table_map if RotateEvent has timestamp of 0 MySQL generates two RotateEvents when flushing binary logs. The first is added to the end of the old binary log. This RotateEvent has a valid timestamp. The second is added to the beginning of the new binary log. This RotateEvent has a timestamp of 0. Under some circumstances, e.g., the MySQL server crashes, the first RotateEvent will not be written to the binary log. (See the MySQL Internals Manual 20.9.) Before this change, when instantiating BinLogStreamReader with a positive skip_to_timestamp argument, the RotateEvents with a timestamp of 0 were ignored. This could cause silent corruption of the table_map. This change ensures that RotateEvents always reset the table_map, log_pos and log_file attributes. * Update documentation on running tests * Run the test_no_trailing_rotate_event with sudo This change alters the Travis CI configuration to run the test_no_trailing_rotate_event with sudo and exclude it from the normal test run. * Fix sudo not finding the nosetests command
1 parent 48c45ab commit 084712b

File tree

5 files changed

+264
-11
lines changed

5 files changed

+264
-11
lines changed

.travis.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,7 @@ before_script:
2222
- env | grep DB
2323
- bash -c "if [ '$DB' = 'mysql57' ]; then sudo ./scripts/install_mysql57.sh; fi"
2424
- bash -c "if [ '$DB' = 'mysql56' ]; then sudo ./scripts/install_mysql56.sh; fi"
25+
- which nosetests > ~/where-is-nosetests
2526
script:
26-
- "nosetests"
27+
- "sudo $(cat ~/where-is-nosetests) pymysqlreplication.tests.test_abnormal:TestAbnormalBinLogStreamReader.test_no_trailing_rotate_event"
28+
- "nosetests -e test_no_trailing_rotate_event"

docs/developement.rst

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,14 @@ Make sure you have the following configuration set in your mysql config file (us
3737
enforce_gtid_consistency
3838

3939

40-
To run tests:
40+
Run tests with
4141

4242
::
4343

44-
python setup.py test
44+
py.test -k "not test_no_trailing_rotate_event"
45+
46+
This will skip the ``test_no_trailing_rotate_event`` which requires that the
47+
user running the test have permission to alter the binary log files.
4548

4649
Running mysql in docker (main):
4750

pymysqlreplication/binlogstream.py

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -431,14 +431,6 @@ def fetchone(self):
431431
self.__freeze_schema,
432432
self.__fail_on_table_metadata_unavailable)
433433

434-
if self.skip_to_timestamp and binlog_event.timestamp < self.skip_to_timestamp:
435-
continue
436-
437-
if binlog_event.event_type == TABLE_MAP_EVENT and \
438-
binlog_event.event is not None:
439-
self.table_map[binlog_event.event.table_id] = \
440-
binlog_event.event.get_table()
441-
442434
if binlog_event.event_type == ROTATE_EVENT:
443435
self.log_pos = binlog_event.event.position
444436
self.log_file = binlog_event.event.next_binlog
@@ -455,6 +447,38 @@ def fetchone(self):
455447
elif binlog_event.log_pos:
456448
self.log_pos = binlog_event.log_pos
457449

450+
# This check must not occur before clearing the ``table_map`` as a
451+
# result of a RotateEvent.
452+
#
453+
# The first RotateEvent in a binlog file has a timestamp of
454+
# zero. If the server has moved to a new log and not written a
455+
# timestamped RotateEvent at the end of the previous log, the
456+
# RotateEvent at the beginning of the new log will be ignored
457+
# if the caller provided a positive ``skip_to_timestamp``
458+
# value. This will result in the ``table_map`` becoming
459+
# corrupt.
460+
#
461+
# https://dev.mysql.com/doc/internals/en/event-data-for-specific-event-types.html
462+
# From the MySQL Internals Manual:
463+
#
464+
# ROTATE_EVENT is generated locally and written to the binary
465+
# log on the master. It is written to the relay log on the
466+
# slave when FLUSH LOGS occurs, and when receiving a
467+
# ROTATE_EVENT from the master. In the latter case, there
468+
# will be two rotate events in total originating on different
469+
# servers.
470+
#
471+
# There are conditions under which the terminating
472+
# log-rotation event does not occur. For example, the server
473+
# might crash.
474+
if self.skip_to_timestamp and binlog_event.timestamp < self.skip_to_timestamp:
475+
continue
476+
477+
if binlog_event.event_type == TABLE_MAP_EVENT and \
478+
binlog_event.event is not None:
479+
self.table_map[binlog_event.event.table_id] = \
480+
binlog_event.event.get_table()
481+
458482
# event is none if we have filter it on packet level
459483
# we filter also not allowed events
460484
if binlog_event.event is None or (binlog_event.event.__class__ not in self.__allowed_events):
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
'''Read binlog files'''
2+
import struct
3+
4+
from pymysql.util import byte2int
5+
from pymysqlreplication import constants
6+
from pymysqlreplication.event import FormatDescriptionEvent
7+
from pymysqlreplication.event import QueryEvent
8+
from pymysqlreplication.event import RotateEvent
9+
from pymysqlreplication.event import XidEvent
10+
from pymysqlreplication.row_event import TableMapEvent
11+
from pymysqlreplication.row_event import WriteRowsEvent
12+
13+
class SimpleBinLogFileReader(object):
14+
'''Read binlog files'''
15+
16+
_expected_magic = b'\xfebin'
17+
18+
def __init__(self, file_path, only_events=None):
19+
self._current_event = None
20+
self._file = None
21+
self._file_path = file_path
22+
self._only_events = only_events
23+
self._pos = None
24+
25+
def fetchone(self):
26+
'''Fetch one record from the binlog file'''
27+
if self._pos is None or self._pos < 4:
28+
self._read_magic()
29+
while True:
30+
event = self._read_event()
31+
self._current_event = event
32+
if event is None:
33+
return None
34+
if self._filter_events(event):
35+
return event
36+
37+
def truncatebinlog(self):
38+
'''Truncate the binlog file at the current event'''
39+
if self._current_event is not None:
40+
self._file.truncate(self._current_event.pos)
41+
42+
def _filter_events(self, event):
43+
'''Return True if an event can be returned'''
44+
# It would be good if we could reuse the __event_map in
45+
# packet.BinLogPacketWrapper.
46+
event_type = {
47+
constants.QUERY_EVENT: QueryEvent,
48+
constants.ROTATE_EVENT: RotateEvent,
49+
constants.FORMAT_DESCRIPTION_EVENT: FormatDescriptionEvent,
50+
constants.XID_EVENT: XidEvent,
51+
constants.TABLE_MAP_EVENT: TableMapEvent,
52+
constants.WRITE_ROWS_EVENT_V2: WriteRowsEvent,
53+
}.get(event.event_type)
54+
return event_type in self._only_events
55+
56+
def _open_file(self):
57+
'''Open the file at ``self._file_path``'''
58+
if self._file is None:
59+
self._file = open(self._file_path, 'rb+')
60+
self._pos = self._file.tell()
61+
assert self._pos == 0
62+
63+
def _read_event(self):
64+
'''Read an event from the binlog file'''
65+
# Assuming a binlog version > 1
66+
headerlength = 19
67+
header = self._file.read(headerlength)
68+
event_pos = self._pos
69+
self._pos += len(header)
70+
if len(header) == 0:
71+
return None
72+
event = SimpleBinLogEvent(header)
73+
event.set_pos(event_pos)
74+
if event.event_size < headerlength:
75+
messagefmt = 'Event size {0} is too small'
76+
message = messagefmt.format(event.event_size)
77+
raise EventSizeTooSmallError(message)
78+
else:
79+
body = self._file.read(event.event_size - headerlength)
80+
self._pos += len(body)
81+
event.set_body(body)
82+
return event
83+
84+
def _read_magic(self):
85+
'''Read the first four *magic* bytes of the binlog file'''
86+
self._open_file()
87+
if self._pos == 0:
88+
magic = self._file.read(4)
89+
if magic == self._expected_magic:
90+
self._pos += len(magic)
91+
else:
92+
messagefmt = 'Magic bytes {0!r} did not match expected {1!r}'
93+
message = messagefmt.format(magic, self._expected_magic)
94+
raise BadMagicBytesError(message)
95+
96+
def __iter__(self):
97+
return iter(self.fetchone, None)
98+
99+
def __repr__(self):
100+
cls = self.__class__
101+
mod = cls.__module__
102+
name = cls.__name__
103+
only = [type(x).__name__ for x in self._only_events]
104+
fmt = '<{mod}.{name}(file_path={fpath}, only_events={only})>'
105+
return fmt.format(mod=mod, name=name, fpath=self._file_path, only=only)
106+
107+
108+
# pylint: disable=too-many-instance-attributes
109+
class SimpleBinLogEvent(object):
110+
'''An event from a binlog file'''
111+
112+
def __init__(self, header):
113+
'''Initialize the Event with the event header'''
114+
unpacked = struct.unpack('<IcIIIH', header)
115+
self.timestamp = unpacked[0]
116+
self.event_type = byte2int(unpacked[1])
117+
self.server_id = unpacked[2]
118+
self.event_size = unpacked[3]
119+
self.log_pos = unpacked[4]
120+
self.flags = unpacked[5]
121+
122+
self.body = None
123+
self.pos = None
124+
125+
def set_body(self, body):
126+
'''Save the body bytes'''
127+
self.body = body
128+
129+
def set_pos(self, pos):
130+
'''Save the event position'''
131+
self.pos = pos
132+
133+
def __repr__(self):
134+
cls = self.__class__
135+
mod = cls.__module__
136+
name = cls.__name__
137+
fmt = '<{mod}.{name}(timestamp={ts}, event_type={et}, log_pos={pos})>'
138+
return fmt.format(
139+
mod=mod,
140+
name=name,
141+
ts=int(self.timestamp),
142+
et=self.event_type,
143+
pos=self.log_pos)
144+
145+
146+
class BadMagicBytesError(Exception):
147+
'''The binlog file magic bytes did not match the specification'''
148+
149+
class EventSizeTooSmallError(Exception):
150+
'''The event size was smaller than the length of the event header'''
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
# -*- coding: utf-8 -*-
2+
'''Test abnormal conditions, such as caused by a MySQL crash
3+
'''
4+
import os.path
5+
6+
from pymysqlreplication.tests import base
7+
from pymysqlreplication.tests.binlogfilereader import SimpleBinLogFileReader
8+
from pymysqlreplication import BinLogStreamReader
9+
from pymysqlreplication.event import GtidEvent
10+
from pymysqlreplication.event import RotateEvent
11+
12+
class TestAbnormalBinLogStreamReader(base.PyMySQLReplicationTestCase):
13+
'''Test abnormal condition handling in the BinLogStreamReader
14+
'''
15+
16+
@staticmethod
17+
def ignored_events():
18+
'''Events the BinLogStreamReader should ignore'''
19+
return [GtidEvent]
20+
21+
def test_no_trailing_rotate_event(self):
22+
'''A missing RotateEvent and skip_to_timestamp cause corruption
23+
24+
This test shows that a binlog file which lacks the trailing RotateEvent
25+
and the use of the ``skip_to_timestamp`` argument together can cause
26+
the table_map to become corrupt. The trailing RotateEvent has a
27+
timestamp, but may be lost if the server crashes. The leading
28+
RotateEvent in the next binlog file always has a timestamp of 0, thus
29+
is discarded when ``skip_to_timestamp`` is greater than zero.
30+
'''
31+
self.execute(
32+
'CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, '
33+
'data VARCHAR (50) NOT NULL, PRIMARY KEY(id))')
34+
self.execute('SET AUTOCOMMIT = 0')
35+
self.execute('INSERT INTO test(id, data) VALUES (1, "Hello")')
36+
self.execute('COMMIT')
37+
timestamp = self.execute('SELECT UNIX_TIMESTAMP()').fetchone()[0]
38+
self.execute('FLUSH BINARY LOGS')
39+
self.execute('INSERT INTO test(id, data) VALUES (2, "Hi")')
40+
self.stream.close()
41+
self._remove_trailing_rotate_event_from_first_binlog()
42+
43+
binlog = self.execute("SHOW BINARY LOGS").fetchone()[0]
44+
45+
self.stream = BinLogStreamReader(
46+
self.database,
47+
server_id=1024,
48+
log_pos=4,
49+
log_file=binlog,
50+
skip_to_timestamp=timestamp,
51+
ignored_events=self.ignored_events())
52+
for _ in self.stream:
53+
pass
54+
# The table_map should be empty because of the binlog being rotated.
55+
self.assertEqual({}, self.stream.table_map)
56+
57+
def _remove_trailing_rotate_event_from_first_binlog(self):
58+
'''Remove the trailing RotateEvent from the first binlog
59+
60+
According to the MySQL Internals Manual, a RotateEvent will be added to
61+
the end of a binlog when the binlog is rotated. This may not happen if
62+
the server crashes, for example.
63+
64+
This method removes the trailing RotateEvent to verify that the library
65+
properly handles this case.
66+
'''
67+
datadir = self.execute("SHOW VARIABLES LIKE 'datadir'").fetchone()[1]
68+
binlog = self.execute("SHOW BINARY LOGS").fetchone()[0]
69+
binlogpath = os.path.join(datadir, binlog)
70+
71+
reader = SimpleBinLogFileReader(binlogpath, only_events=[RotateEvent])
72+
for _ in reader:
73+
reader.truncatebinlog()
74+
break

0 commit comments

Comments
 (0)