Skip to content

Commit

Permalink
[improve][broker] Add logging to leader election (#22645)
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari authored May 3, 2024
1 parent d12f623 commit 7a8c454
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1181,7 +1181,7 @@ protected void startLeaderElectionService() {
new LeaderElectionService(coordinationService, getBrokerId(), getSafeWebServiceAddress(),
state -> {
if (state == LeaderElectionState.Leading) {
LOG.info("This broker was elected leader");
LOG.info("This broker {} was elected leader", getBrokerId());
if (getConfiguration().isLoadBalancerEnabled()) {
long resourceQuotaUpdateInterval = TimeUnit.MINUTES
.toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
Expand All @@ -1202,10 +1202,10 @@ protected void startLeaderElectionService() {
if (leaderElectionService != null) {
final Optional<LeaderBroker> currentLeader = leaderElectionService.getCurrentLeader();
if (currentLeader.isPresent()) {
LOG.info("This broker is a follower. Current leader is {}",
LOG.info("This broker {} is a follower. Current leader is {}", getBrokerId(),
currentLeader);
} else {
LOG.info("This broker is a follower. No leader has been elected yet");
LOG.info("This broker {} is a follower. No leader has been elected yet", getBrokerId());
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,19 +129,26 @@ private synchronized CompletableFuture<LeaderElectionState> handleExistingLeader
return FutureUtils.exception(t);
}

if (existingValue.equals(proposedValue.orElse(null))) {
T value = proposedValue.orElse(null);
if (existingValue.equals(value)) {
// If the value is the same as our proposed value, it means this instance was the leader at some
// point before. The existing value can either be for this same session or for a previous one.
if (res.getStat().isCreatedBySelf()) {
// The value is still valid because it was created in the same session
changeState(LeaderElectionState.Leading);
} else {
log.info("Conditionally deleting existing equals value {} for {} because it's not created in the "
+ "current session. stat={}", existingValue, path, res.getStat());
// Since the value was created in a different session, it might be expiring. We need to delete it
// and try the election again.
return store.delete(path, Optional.of(res.getStat().getVersion()))
.thenCompose(__ -> tryToBecomeLeader());
}
} else if (res.getStat().isCreatedBySelf()) {
log.warn("Conditionally deleting existing value {} for {} because it's different from the proposed value "
+ "({}). This is unexpected since it was created within the same session. "
+ "In tests this could happen because of an invalid shared session id when using mocks.",
existingValue, path, value);
// The existing value is different but was created from the same session
return store.delete(path, Optional.of(res.getStat().getVersion()))
.thenCompose(__ -> tryToBecomeLeader());
Expand All @@ -165,9 +172,10 @@ private synchronized void changeState(LeaderElectionState les) {
}

private synchronized CompletableFuture<LeaderElectionState> tryToBecomeLeader() {
T value = proposedValue.get();
byte[] payload;
try {
payload = serde.serialize(path, proposedValue.get());
payload = serde.serialize(path, value);
} catch (Throwable t) {
return FutureUtils.exception(t);
}
Expand All @@ -181,7 +189,7 @@ private synchronized CompletableFuture<LeaderElectionState> tryToBecomeLeader()
cache.get(path)
.thenRun(() -> {
synchronized (LeaderElectionImpl.this) {
log.info("Acquired leadership on {}", path);
log.info("Acquired leadership on {} with {}", path, value);
internalState = InternalState.LeaderIsPresent;
if (leaderElectionState != LeaderElectionState.Leading) {
leaderElectionState = LeaderElectionState.Leading;
Expand All @@ -196,6 +204,8 @@ private synchronized CompletableFuture<LeaderElectionState> tryToBecomeLeader()
}).exceptionally(ex -> {
// We failed to do the get(), so clean up the leader election and fail the whole
// operation
log.warn("Failed to get the current state after acquiring leadership on {}. "
+ " Conditionally deleting current entry.", path, ex);
store.delete(path, Optional.of(stat.getVersion()))
.thenRun(() -> result.completeExceptionally(ex))
.exceptionally(ex2 -> {
Expand All @@ -205,6 +215,8 @@ private synchronized CompletableFuture<LeaderElectionState> tryToBecomeLeader()
return null;
});
} else {
log.info("Leadership on {} with value {} was lost. "
+ "Conditionally deleting entry with stat={}.", path, value, stat);
// LeaderElection was closed in between. Release the lock asynchronously
store.delete(path, Optional.of(stat.getVersion()))
.thenRun(() -> result.completeExceptionally(
Expand All @@ -219,7 +231,9 @@ private synchronized CompletableFuture<LeaderElectionState> tryToBecomeLeader()
if (ex.getCause() instanceof BadVersionException) {
// There was a conflict between 2 participants trying to become leaders at same time. Retry
// to fetch info on new leader.

log.info("There was a conflict between 2 participants trying to become leaders at the same "
+ "time on {}. Attempted with value {}. Retrying.",
path, value);
elect()
.thenAccept(lse -> result.complete(lse))
.exceptionally(ex2 -> {
Expand Down

0 comments on commit 7a8c454

Please sign in to comment.