From 7af9fb8424fdd41f8d7bbf5b2d15506dedffbd2e Mon Sep 17 00:00:00 2001 From: Paul Vickers Date: Tue, 14 Sep 2021 19:55:32 +0100 Subject: [PATCH 1/7] Add support for specifying an end_log_position --- pymysqlreplication/binlogstream.py | 53 ++++++++++++++++++++---------- 1 file changed, 35 insertions(+), 18 deletions(-) diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index 1c2154f6..856f688e 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -130,7 +130,8 @@ class BinLogStreamReader(object): def __init__(self, connection_settings, server_id, ctl_connection_settings=None, resume_stream=False, blocking=False, only_events=None, log_file=None, - log_pos=None, filter_non_implemented_events=True, + log_pos=None, end_log_pos=None, + filter_non_implemented_events=True, ignored_events=None, auto_position=None, only_tables=None, ignored_tables=None, only_schemas=None, ignored_schemas=None, @@ -152,6 +153,7 @@ def __init__(self, connection_settings, server_id, log_file: Set replication start log file log_pos: Set replication start log pos (resume_stream should be true) + end_log_pos: Set replication end log pos auto_position: Use master_auto_position gtid to set position only_tables: An array with the tables you want to watch (only works in binlog_format ROW) @@ -205,6 +207,7 @@ def __init__(self, connection_settings, server_id, # Store table meta information self.table_map = {} self.log_pos = log_pos + self.end_log_pos = end_log_pos self.log_file = log_file self.auto_position = auto_position self.skip_to_timestamp = skip_to_timestamp @@ -235,7 +238,8 @@ def __connect_to_ctl(self): self._ctl_connection_settings = dict(self.__connection_settings) self._ctl_connection_settings["db"] = "information_schema" self._ctl_connection_settings["cursorclass"] = DictCursor - self._ctl_connection = self.pymysql_wrapper(**self._ctl_connection_settings) + self._ctl_connection = self.pymysql_wrapper( + 
**self._ctl_connection_settings) self._ctl_connection._get_table_information = self.__get_table_information self.__connected_ctl = True @@ -273,7 +277,8 @@ def __connect_to_stream(self): # flags (2) BINLOG_DUMP_NON_BLOCK (0 or 1) # server_id (4) -- server id of this slave # log_file (string.EOF) -- filename of the binlog on the master - self._stream_connection = self.pymysql_wrapper(**self.__connection_settings) + self._stream_connection = self.pymysql_wrapper( + **self.__connection_settings) self.__use_checksum = self.__checksum_enabled() @@ -281,7 +286,8 @@ def __connect_to_stream(self): # we support it if self.__use_checksum: cur = self._stream_connection.cursor() - cur.execute("set @master_binlog_checksum= @@global.binlog_checksum") + cur.execute( + "set @master_binlog_checksum= @@global.binlog_checksum") cur.close() if self.slave_uuid: @@ -295,7 +301,7 @@ def __connect_to_stream(self): 4294967)) # If heartbeat is too low, the connection will disconnect before, # this is also the behavior in mysql - heartbeat = float(min(net_timeout/2., self.slave_heartbeat)) + heartbeat = float(min(net_timeout / 2., self.slave_heartbeat)) if heartbeat > 4294967: heartbeat = 4294967 @@ -416,7 +422,11 @@ def __connect_to_stream(self): self.__connected_stream = True def fetchone(self): - while True: + should_continue = True + while should_continue: + if past_end_position: + return None + if not self.__connected_stream: self.__connect_to_stream() @@ -443,16 +453,18 @@ def fetchone(self): if not pkt.is_ok_packet(): continue - binlog_event = BinLogPacketWrapper(pkt, self.table_map, - self._ctl_connection, - self.__use_checksum, - self.__allowed_events_in_packet, - self.__only_tables, - self.__ignored_tables, - self.__only_schemas, - self.__ignored_schemas, - self.__freeze_schema, - self.__fail_on_table_metadata_unavailable) + binlog_event = BinLogPacketWrapper( + pkt, + self.table_map, + self._ctl_connection, + self.__use_checksum, + self.__allowed_events_in_packet, + 
self.__only_tables, + self.__ignored_tables, + self.__only_schemas, + self.__ignored_schemas, + self.__freeze_schema, + self.__fail_on_table_metadata_unavailable) if binlog_event.event_type == ROTATE_EVENT: self.log_pos = binlog_event.event.position @@ -470,6 +482,10 @@ def fetchone(self): elif binlog_event.log_pos: self.log_pos = binlog_event.log_pos + if self.end_log_pos and self.log_pos >= self.end_log_pos: + # We're currently at, or past, the specified end log position. + should_continue = False + # This check must not occur before clearing the ``table_map`` as a # result of a RotateEvent. # @@ -504,7 +520,8 @@ def fetchone(self): # event is none if we have filter it on packet level # we filter also not allowed events - if binlog_event.event is None or (binlog_event.event.__class__ not in self.__allowed_events): + if binlog_event.event is None or ( + binlog_event.event.__class__ not in self.__allowed_events): continue return binlog_event.event @@ -529,7 +546,7 @@ def _allowed_event_list(self, only_events, ignored_events, TableMapEvent, HeartbeatLogEvent, NotImplementedEvent, - )) + )) if ignored_events is not None: for e in ignored_events: events.remove(e) From 7b48367ecc9f3d8e954b062f26ae61027c05f5f4 Mon Sep 17 00:00:00 2001 From: Paul Vickers Date: Tue, 14 Sep 2021 21:12:46 +0100 Subject: [PATCH 2/7] Undo unrelated formatting changes --- pymysqlreplication/binlogstream.py | 38 +++++++++++++----------------- 1 file changed, 16 insertions(+), 22 deletions(-) diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index 856f688e..f299e84d 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -238,8 +238,7 @@ def __connect_to_ctl(self): self._ctl_connection_settings = dict(self.__connection_settings) self._ctl_connection_settings["db"] = "information_schema" self._ctl_connection_settings["cursorclass"] = DictCursor - self._ctl_connection = self.pymysql_wrapper( - **self._ctl_connection_settings) + 
self._ctl_connection = self.pymysql_wrapper(**self._ctl_connection_settings) self._ctl_connection._get_table_information = self.__get_table_information self.__connected_ctl = True @@ -277,8 +276,7 @@ def __connect_to_stream(self): # flags (2) BINLOG_DUMP_NON_BLOCK (0 or 1) # server_id (4) -- server id of this slave # log_file (string.EOF) -- filename of the binlog on the master - self._stream_connection = self.pymysql_wrapper( - **self.__connection_settings) + self._stream_connection = self.pymysql_wrapper(**self.__connection_settings) self.__use_checksum = self.__checksum_enabled() @@ -286,8 +284,7 @@ def __connect_to_stream(self): # we support it if self.__use_checksum: cur = self._stream_connection.cursor() - cur.execute( - "set @master_binlog_checksum= @@global.binlog_checksum") + cur.execute("set @master_binlog_checksum= @@global.binlog_checksum") cur.close() if self.slave_uuid: @@ -301,7 +298,7 @@ def __connect_to_stream(self): 4294967)) # If heartbeat is too low, the connection will disconnect before, # this is also the behavior in mysql - heartbeat = float(min(net_timeout / 2., self.slave_heartbeat)) + heartbeat = float(min(net_timeout/2., self.slave_heartbeat)) if heartbeat > 4294967: heartbeat = 4294967 @@ -453,18 +450,16 @@ def fetchone(self): if not pkt.is_ok_packet(): continue - binlog_event = BinLogPacketWrapper( - pkt, - self.table_map, - self._ctl_connection, - self.__use_checksum, - self.__allowed_events_in_packet, - self.__only_tables, - self.__ignored_tables, - self.__only_schemas, - self.__ignored_schemas, - self.__freeze_schema, - self.__fail_on_table_metadata_unavailable) + binlog_event = BinLogPacketWrapper(pkt, self.table_map, + self._ctl_connection, + self.__use_checksum, + self.__allowed_events_in_packet, + self.__only_tables, + self.__ignored_tables, + self.__only_schemas, + self.__ignored_schemas, + self.__freeze_schema, + self.__fail_on_table_metadata_unavailable) if binlog_event.event_type == ROTATE_EVENT: self.log_pos = 
binlog_event.event.position @@ -520,8 +515,7 @@ def fetchone(self): # event is none if we have filter it on packet level # we filter also not allowed events - if binlog_event.event is None or ( - binlog_event.event.__class__ not in self.__allowed_events): + if binlog_event.event is None or (binlog_event.event.__class__ not in self.__allowed_events): continue return binlog_event.event @@ -546,7 +540,7 @@ def _allowed_event_list(self, only_events, ignored_events, TableMapEvent, HeartbeatLogEvent, NotImplementedEvent, - )) + )) if ignored_events is not None: for e in ignored_events: events.remove(e) From 565dbdd61a67b083ee3b71d0cea4d6be9fdae029 Mon Sep 17 00:00:00 2001 From: Paul Vickers Date: Tue, 14 Sep 2021 21:14:33 +0100 Subject: [PATCH 3/7] Remove unnecessary if condition --- pymysqlreplication/binlogstream.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index f299e84d..4d0c2b53 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -421,9 +421,6 @@ def __connect_to_stream(self): def fetchone(self): should_continue = True while should_continue: - if past_end_position: - return None - if not self.__connected_stream: self.__connect_to_stream() From 2ff716aaa019c22ce61485cbe9a47080f176b9c9 Mon Sep 17 00:00:00 2001 From: dongwook-chan Date: Sun, 17 Oct 2021 09:11:50 +0900 Subject: [PATCH 4/7] Fix iteration condition for end_pos * Binlog event at end_pos is the last event to be read (end_pos is inclusive) * iter() ends when fetchone() returns None --- pymysqlreplication/event.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index d5ee1060..f5bedc1a 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -188,11 +188,6 @@ def _dump(self): print("Execution time: %d" % (self.execution_time)) print("Query: %s" % (self.query)) - - # TODO: check if instance attribute with 
the same name already exists - # TODO: put all the instace attribute in separate class? called status_vars - # TODO: does length need to be remembered? - # TODO: ref(mysql doc. and mysql-server) for each hunk def _read_status_vars_value_for_key(self, key): """parse status variable VALUE for given KEY From 48fe98c38638e67a4932eeadf9945e2e3471c079 Mon Sep 17 00:00:00 2001 From: dongwook-chan Date: Sun, 17 Oct 2021 09:12:48 +0900 Subject: [PATCH 5/7] Remove unnecessary comments --- pymysqlreplication/binlogstream.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index 4d0c2b53..24145a78 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -419,8 +419,7 @@ def __connect_to_stream(self): self.__connected_stream = True def fetchone(self): - should_continue = True - while should_continue: + while True: if not self.__connected_stream: self.__connect_to_stream() @@ -474,9 +473,9 @@ def fetchone(self): elif binlog_event.log_pos: self.log_pos = binlog_event.log_pos - if self.end_log_pos and self.log_pos >= self.end_log_pos: + if self.end_log_pos and self.log_pos > self.end_log_pos: # We're currently at, or past, the specified end log position. - should_continue = False + return None # This check must not occur before clearing the ``table_map`` as a # result of a RotateEvent. 
From 904f6f7523e84ca4de892d5a91093495a20f2040 Mon Sep 17 00:00:00 2001 From: dongwook-chan Date: Sun, 17 Oct 2021 09:13:13 +0900 Subject: [PATCH 6/7] Add test for end_pos --- pymysqlreplication/tests/test_abnormal.py | 1 + pymysqlreplication/tests/test_basic.py | 37 +++++++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/pymysqlreplication/tests/test_abnormal.py b/pymysqlreplication/tests/test_abnormal.py index 9b0f6818..a3e75d3a 100644 --- a/pymysqlreplication/tests/test_abnormal.py +++ b/pymysqlreplication/tests/test_abnormal.py @@ -72,3 +72,4 @@ def _remove_trailing_rotate_event_from_first_binlog(self): for _ in reader: reader.truncatebinlog() break + diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 42e3b340..a2ea52fa 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -482,6 +482,43 @@ def test_skip_to_timestamp(self): self.assertIsInstance(event, QueryEvent) self.assertEqual(event.query, query2) + def test_end_log_pos(self): + """Test end_log_pos parameter for BinLogStreamReader + + MUST BE TESTED IN DEFAULT SYSTEM VARIABLES SETTING + + Raises: + AssertionError: if null_bitmask isn't set as specified in 'bit_mask' variable + """ + + self.execute('CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, PRIMARY KEY(id))') + self.execute('INSERT INTO test values (NULL)') + self.execute('INSERT INTO test values (NULL)') + self.execute('INSERT INTO test values (NULL)') + self.execute('INSERT INTO test values (NULL)') + self.execute('INSERT INTO test values (NULL)') + self.execute('COMMIT') + #import os + #os._exit(1) + + binlog = self.execute("SHOW BINARY LOGS").fetchone()[0] + + self.stream.close() + self.stream = BinLogStreamReader( + self.database, + server_id=1024, + log_pos=0, + log_file=binlog, + end_log_pos=888) + + last_log_pos = 0 + last_event_type = 0 + for event in self.stream: + last_log_pos = self.stream.log_pos + last_event_type = 
event.event_type + + self.assertEqual(last_log_pos, 888) + self.assertEqual(last_event_type, TABLE_MAP_EVENT) class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase): def ignoredEvents(self): From 090fa733b400bc3f8b9f892f48b17a9cbc4a0a66 Mon Sep 17 00:00:00 2001 From: dongwook-chan Date: Sun, 17 Oct 2021 15:18:51 +0900 Subject: [PATCH 7/7] Fix iteration termination condition As paulvic pointed out in [the comment](https://github.com/dongwook-chan/python-mysql-replication/commit/67d3a71), iteration will stall indefinitely if there are no events to read past end_log_pos. An `is_past_end_log_pos` attribute is added to BinLogStreamReader to stop iteration even if the event at end_log_pos is the last event. --- pymysqlreplication/binlogstream.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index 24145a78..f2f29e65 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -212,6 +212,9 @@ def __init__(self, connection_settings, server_id, self.auto_position = auto_position self.skip_to_timestamp = skip_to_timestamp + if end_log_pos: + self.is_past_end_log_pos = False + if report_slave: self.report_slave = ReportSlave(report_slave) self.slave_uuid = slave_uuid @@ -420,6 +423,9 @@ def __connect_to_stream(self): def fetchone(self): while True: + if self.end_log_pos and self.is_past_end_log_pos: + return None + if not self.__connected_stream: self.__connect_to_stream() @@ -473,9 +479,9 @@ def fetchone(self): elif binlog_event.log_pos: self.log_pos = binlog_event.log_pos - if self.end_log_pos and self.log_pos > self.end_log_pos: + if self.end_log_pos and self.log_pos >= self.end_log_pos: # We're currently at, or past, the specified end log position. - return None + self.is_past_end_log_pos = True # This check must not occur before clearing the ``table_map`` as a # result of a RotateEvent.