Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for specifying an end log_pos #357

Merged
merged 8 commits into from
Oct 18, 2021
15 changes: 14 additions & 1 deletion pymysqlreplication/binlogstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ class BinLogStreamReader(object):
def __init__(self, connection_settings, server_id,
ctl_connection_settings=None, resume_stream=False,
blocking=False, only_events=None, log_file=None,
log_pos=None, filter_non_implemented_events=True,
log_pos=None, end_log_pos=None,
filter_non_implemented_events=True,
ignored_events=None, auto_position=None,
only_tables=None, ignored_tables=None,
only_schemas=None, ignored_schemas=None,
Expand All @@ -152,6 +153,7 @@ def __init__(self, connection_settings, server_id,
log_file: Set replication start log file
log_pos: Set replication start log pos (resume_stream should be
true)
end_log_pos: Set replication end log pos
auto_position: Use master_auto_position gtid to set position
only_tables: An array with the tables you want to watch (only works
in binlog_format ROW)
Expand Down Expand Up @@ -205,10 +207,14 @@ def __init__(self, connection_settings, server_id,
# Store table meta information
self.table_map = {}
self.log_pos = log_pos
self.end_log_pos = end_log_pos
self.log_file = log_file
self.auto_position = auto_position
self.skip_to_timestamp = skip_to_timestamp

if end_log_pos:
self.is_past_end_log_pos = False

if report_slave:
self.report_slave = ReportSlave(report_slave)
self.slave_uuid = slave_uuid
Expand Down Expand Up @@ -417,6 +423,9 @@ def __connect_to_stream(self):

def fetchone(self):
while True:
if self.end_log_pos and self.is_past_end_log_pos:
return None

if not self.__connected_stream:
self.__connect_to_stream()

Expand Down Expand Up @@ -470,6 +479,10 @@ def fetchone(self):
elif binlog_event.log_pos:
self.log_pos = binlog_event.log_pos

if self.end_log_pos and self.log_pos >= self.end_log_pos:
# We're currently at, or past, the specified end log position.
self.is_past_end_log_pos = True

# This check must not occur before clearing the ``table_map`` as a
# result of a RotateEvent.
#
Expand Down
5 changes: 0 additions & 5 deletions pymysqlreplication/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,6 @@ def _dump(self):
print("Execution time: %d" % (self.execution_time))
print("Query: %s" % (self.query))


# TODO: check if instance attribute with the same name already exists
# TODO: put all the instace attribute in separate class? called status_vars
# TODO: does length need to be remembered?
# TODO: ref(mysql doc. and mysql-server) for each hunk
def _read_status_vars_value_for_key(self, key):
"""parse status variable VALUE for given KEY

Expand Down
1 change: 1 addition & 0 deletions pymysqlreplication/tests/test_abnormal.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,4 @@ def _remove_trailing_rotate_event_from_first_binlog(self):
for _ in reader:
reader.truncatebinlog()
break

37 changes: 37 additions & 0 deletions pymysqlreplication/tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,43 @@ def test_skip_to_timestamp(self):
self.assertIsInstance(event, QueryEvent)
self.assertEqual(event.query, query2)

def test_end_log_pos(self):
"""Test end_log_pos parameter for BinLogStreamReader

MUST BE TESTED IN DEFAULT SYSTEM VARIABLES SETTING

Raises:
AssertionError: if null_bitmask isn't set as specified in 'bit_mask' variable
"""

self.execute('CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, PRIMARY KEY(id))')
self.execute('INSERT INTO test values (NULL)')
self.execute('INSERT INTO test values (NULL)')
self.execute('INSERT INTO test values (NULL)')
self.execute('INSERT INTO test values (NULL)')
self.execute('INSERT INTO test values (NULL)')
self.execute('COMMIT')
#import os
#os._exit(1)

binlog = self.execute("SHOW BINARY LOGS").fetchone()[0]

self.stream.close()
self.stream = BinLogStreamReader(
self.database,
server_id=1024,
log_pos=0,
log_file=binlog,
end_log_pos=888)

last_log_pos = 0
last_event_type = 0
for event in self.stream:
last_log_pos = self.stream.log_pos
last_event_type = event.event_type

self.assertEqual(last_log_pos, 888)
self.assertEqual(last_event_type, TABLE_MAP_EVENT)

class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase):
def ignoredEvents(self):
Expand Down