Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve KafkaConsumer group rejoin / disable heartbeats during join group #1695

Merged
merged 1 commit into from
Jan 15, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,18 +331,13 @@ def _handle_join_success(self, member_assignment_bytes):
with self._lock:
log.info("Successfully joined group %s with generation %s",
self.group_id, self._generation.generation_id)
self.join_future = None
self.state = MemberState.STABLE
self.rejoining = False
self._heartbeat_thread.enable()
self._on_join_complete(self._generation.generation_id,
self._generation.member_id,
self._generation.protocol,
member_assignment_bytes)
self.rejoin_needed = False
if self._heartbeat_thread:
self._heartbeat_thread.enable()

def _handle_join_failure(self, _):
with self._lock:
self.join_future = None
self.state = MemberState.UNJOINED

def ensure_active_group(self):
Expand All @@ -351,7 +346,7 @@ def ensure_active_group(self):
if self._heartbeat_thread is None:
self._start_heartbeat_thread()

while self.need_rejoin():
while self.need_rejoin() or self._rejoin_incomplete():
self.ensure_coordinator_ready()

# call on_join_prepare if needed. We set a flag
Expand Down Expand Up @@ -382,6 +377,12 @@ def ensure_active_group(self):
# This ensures that we do not mistakenly attempt to rejoin
# before the pending rebalance has completed.
if self.join_future is None:
# Fence off the heartbeat thread explicitly so that it cannot
# interfere with the join group. Note that this must come after
# the call to _on_join_prepare since we must be able to continue
# sending heartbeats if that callback takes some time.
self._heartbeat_thread.disable()

self.state = MemberState.REBALANCING
future = self._send_join_group_request()

Expand All @@ -402,7 +403,16 @@ def ensure_active_group(self):

self._client.poll(future=future)

if future.failed():
if future.succeeded():
self._on_join_complete(self._generation.generation_id,
self._generation.member_id,
self._generation.protocol,
future.value)
self.join_future = None
self.rejoining = False

else:
self.join_future = None
exception = future.exception
if isinstance(exception, (Errors.UnknownMemberIdError,
Errors.RebalanceInProgressError,
Expand All @@ -412,6 +422,9 @@ def ensure_active_group(self):
raise exception # pylint: disable-msg=raising-bad-type
time.sleep(self.config['retry_backoff_ms'] / 1000)

def _rejoin_incomplete(self):
return self.join_future is not None

def _send_join_group_request(self):
"""Join the group and return the assignment for the next generation.

Expand Down Expand Up @@ -497,7 +510,6 @@ def _handle_join_group_response(self, future, send_time, response):
self._generation = Generation(response.generation_id,
response.member_id,
response.group_protocol)
self.rejoin_needed = False

if response.leader_id == response.member_id:
log.info("Elected group leader -- performing partition"
Expand Down