Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic support for end_log_pos #1

Merged
merged 1 commit into from
Sep 28, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions pymysqlreplication/binlogstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ class BinLogStreamReader(object):
def __init__(self, connection_settings, server_id,
ctl_connection_settings=None, resume_stream=False,
blocking=False, only_events=None, log_file=None,
log_pos=None, filter_non_implemented_events=True,
log_pos=None, end_log_pos=None,
filter_non_implemented_events=True,
ignored_events=None, auto_position=None,
only_tables=None, ignored_tables=None,
only_schemas=None, ignored_schemas=None,
Expand All @@ -152,6 +153,7 @@ def __init__(self, connection_settings, server_id,
log_file: Set replication start log file
log_pos: Set replication start log pos (resume_stream should be
true)
end_log_pos: Set replication end log pos
auto_position: Use master_auto_position gtid to set position
only_tables: An array with the tables you want to watch (only works
in binlog_format ROW)
Expand Down Expand Up @@ -205,6 +207,7 @@ def __init__(self, connection_settings, server_id,
# Store table meta information
self.table_map = {}
self.log_pos = log_pos
self.end_log_pos = end_log_pos
self.log_file = log_file
self.auto_position = auto_position
self.skip_to_timestamp = skip_to_timestamp
Expand Down Expand Up @@ -416,7 +419,8 @@ def __connect_to_stream(self):
self.__connected_stream = True

def fetchone(self):
while True:
should_continue = True
while should_continue:
if not self.__connected_stream:
self.__connect_to_stream()

Expand Down Expand Up @@ -470,6 +474,10 @@ def fetchone(self):
elif binlog_event.log_pos:
self.log_pos = binlog_event.log_pos

if self.end_log_pos and self.log_pos >= self.end_log_pos:
# We're currently at, or past, the specified end log position.
should_continue = False

# This check must not occur before clearing the ``table_map`` as a
# result of a RotateEvent.
#
Expand Down