Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Cleanups in raft module #17877

Merged
merged 1 commit into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions raft/src/main/java/org/apache/kafka/raft/ElectionState.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ public boolean isLeader(int nodeId) {
public boolean isVotedCandidate(ReplicaKey nodeKey) {
if (nodeKey.id() < 0) {
throw new IllegalArgumentException("Invalid node key " + nodeKey);
} else if (!votedKey.isPresent()) {
} else if (votedKey.isEmpty()) {
return false;
} else if (votedKey.get().id() != nodeKey.id()) {
return false;
} else if (!votedKey.get().directoryId().isPresent()) {
} else if (votedKey.get().directoryId().isEmpty()) {
// when the persisted voted directory id is not present assume that we voted for this candidate;
// this happens when the kraft version is 0.
return true;
Expand All @@ -87,7 +87,7 @@ public boolean isVotedCandidate(ReplicaKey nodeKey) {
}

public int leaderId() {
if (!leaderId.isPresent())
if (leaderId.isEmpty())
throw new IllegalStateException("Attempt to access nil leaderId");
return leaderId.getAsInt();
}
Expand All @@ -101,7 +101,7 @@ public OptionalInt optionalLeaderId() {
}

public ReplicaKey votedKey() {
if (!votedKey.isPresent()) {
if (votedKey.isEmpty()) {
throw new IllegalStateException("Attempt to access nil votedId");
}

Expand Down
4 changes: 2 additions & 2 deletions raft/src/main/java/org/apache/kafka/raft/FollowerState.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void overrideFetchTimeout(long currentTimeMs, long timeoutMs) {
private long updateVoterPeriodMs() {
// Allow for a few rounds of fetch request before attempting to update
// the voter state
return fetchTimeoutMs * 3;
return fetchTimeoutMs * 3L;
}

public boolean hasUpdateVoterPeriodExpired(long currentTimeMs) {
Expand All @@ -150,7 +150,7 @@ public void resetUpdateVoterPeriod(long currentTimeMs) {
}

public boolean updateHighWatermark(OptionalLong newHighWatermark) {
if (!newHighWatermark.isPresent() && highWatermark.isPresent()) {
if (newHighWatermark.isEmpty() && highWatermark.isPresent()) {
throw new IllegalArgumentException(
String.format("Attempt to overwrite current high watermark %s with unknown value", highWatermark)
);
Expand Down
28 changes: 14 additions & 14 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1210,7 +1210,7 @@ private long endEpochElectionBackoff(Collection<ReplicaKey> preferredCandidates)
int position = 0;
for (ReplicaKey candidate : preferredCandidates) {
if (candidate.id() == quorum.localIdOrThrow()) {
if (!candidate.directoryId().isPresent() ||
if (candidate.directoryId().isEmpty() ||
candidate.directoryId().get().equals(quorum.localDirectoryId())
) {
// Found ourselves in the preferred candidate list
Expand Down Expand Up @@ -1788,7 +1788,7 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest(

Optional<FetchSnapshotRequestData.PartitionSnapshot> partitionSnapshotOpt = FetchSnapshotRequest
.forTopicPartition(data, log.topicPartition());
if (!partitionSnapshotOpt.isPresent()) {
if (partitionSnapshotOpt.isEmpty()) {
// The Raft client assumes that there is only one topic partition.
TopicPartition unknownTopicPartition = new TopicPartition(
data.topics().get(0).name(),
Expand Down Expand Up @@ -1828,7 +1828,7 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest(
);

Optional<RawSnapshotReader> snapshotOpt = log.readSnapshot(snapshotId);
if (!snapshotOpt.isPresent() || snapshotId.equals(BOOTSTRAP_SNAPSHOT_ID)) {
if (snapshotOpt.isEmpty() || snapshotId.equals(BOOTSTRAP_SNAPSHOT_ID)) {
// The bootstrap checkpoint should not be replicated. The first leader will
// make sure that the content of the bootstrap checkpoint is included in the
// partition log
Expand Down Expand Up @@ -1944,7 +1944,7 @@ private boolean handleFetchSnapshotResponse(

Optional<FetchSnapshotResponseData.PartitionSnapshot> partitionSnapshotOpt = FetchSnapshotResponse
.forTopicPartition(data, log.topicPartition());
if (!partitionSnapshotOpt.isPresent()) {
if (partitionSnapshotOpt.isEmpty()) {
return false;
}

Expand Down Expand Up @@ -2098,7 +2098,7 @@ private CompletableFuture<AddRaftVoterResponseData> handleAddVoterRequest(
}

Optional<ReplicaKey> newVoter = RaftUtil.addVoterRequestVoterKey(data);
if (!newVoter.isPresent() || !newVoter.get().directoryId().isPresent()) {
if (newVoter.isEmpty() || newVoter.get().directoryId().isEmpty()) {
return completedFuture(
new AddRaftVoterResponseData()
.setErrorCode(Errors.INVALID_REQUEST.code())
Expand All @@ -2107,7 +2107,7 @@ private CompletableFuture<AddRaftVoterResponseData> handleAddVoterRequest(
}

Endpoints newVoterEndpoints = Endpoints.fromAddVoterRequest(data.listeners());
if (!newVoterEndpoints.address(channel.listenerName()).isPresent()) {
if (newVoterEndpoints.address(channel.listenerName()).isEmpty()) {
return completedFuture(
new AddRaftVoterResponseData()
.setErrorCode(Errors.INVALID_REQUEST.code())
Expand Down Expand Up @@ -2181,7 +2181,7 @@ private CompletableFuture<RemoveRaftVoterResponseData> handleRemoveVoterRequest(
}

Optional<ReplicaKey> oldVoter = RaftUtil.removeVoterRequestVoterKey(data);
if (!oldVoter.isPresent() || !oldVoter.get().directoryId().isPresent()) {
if (oldVoter.isEmpty() || oldVoter.get().directoryId().isEmpty()) {
return completedFuture(
new RemoveRaftVoterResponseData()
.setErrorCode(Errors.INVALID_REQUEST.code())
Expand Down Expand Up @@ -2226,7 +2226,7 @@ private CompletableFuture<UpdateRaftVoterResponseData> handleUpdateVoterRequest(
}

Optional<ReplicaKey> voter = RaftUtil.updateVoterRequestVoterKey(data);
if (!voter.isPresent() || !voter.get().directoryId().isPresent()) {
if (voter.isEmpty() || voter.get().directoryId().isEmpty()) {
return completedFuture(
RaftUtil.updateVoterResponse(
Errors.INVALID_REQUEST,
Expand All @@ -2238,7 +2238,7 @@ private CompletableFuture<UpdateRaftVoterResponseData> handleUpdateVoterRequest(
}

Endpoints voterEndpoints = Endpoints.fromUpdateVoterRequest(data.listeners());
if (!voterEndpoints.address(channel.listenerName()).isPresent()) {
if (voterEndpoints.address(channel.listenerName()).isEmpty()) {
return completedFuture(
RaftUtil.updateVoterResponse(
Errors.INVALID_REQUEST,
Expand Down Expand Up @@ -2319,8 +2319,8 @@ private boolean hasConsistentLeader(int epoch, OptionalInt leaderId) {
return quorum.isLeader();
} else {
return epoch != quorum.epoch()
|| !leaderId.isPresent()
|| !quorum.leaderId().isPresent()
|| leaderId.isEmpty()
|| quorum.leaderId().isEmpty()
|| leaderId.equals(quorum.leaderId());
}
}
Expand Down Expand Up @@ -2516,7 +2516,7 @@ private boolean isValidVoterKey(Optional<ReplicaKey> voterKey) {
return voterKey
.map(key -> {
if (!OptionalInt.of(key.id()).equals(nodeId)) return false;
if (!key.directoryId().isPresent()) return true;
if (key.directoryId().isEmpty()) return true;

return key.directoryId().get().equals(nodeDirectoryId);
})
Expand Down Expand Up @@ -3399,7 +3399,7 @@ public void resign(int epoch) {
// Note that if we transition to another state before we have a chance to
// request resignation, then we consider the call fulfilled.
Optional<LeaderState<Object>> leaderStateOpt = quorum.maybeLeaderState();
if (!leaderStateOpt.isPresent()) {
if (leaderStateOpt.isEmpty()) {
logger.debug("Ignoring call to resign from epoch {} since this node is " +
"no longer the leader", epoch);
return;
Expand Down Expand Up @@ -3702,7 +3702,7 @@ private boolean shouldFireLeaderChange(LeaderAndEpoch leaderAndEpoch) {
return true;
} else {
return leaderAndEpoch.leaderId().isPresent() &&
!lastFiredLeaderChange.leaderId().isPresent();
lastFiredLeaderChange.leaderId().isEmpty();
}
}

Expand Down
7 changes: 3 additions & 4 deletions raft/src/main/java/org/apache/kafka/raft/LeaderState.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -112,7 +111,7 @@ protected LeaderState(
new ReplicaState(voterNode.voterKey(), hasAcknowledgedLeader, voterNode.listeners())
);
}
this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters));
this.grantingVoters = Set.copyOf(grantingVoters);
this.log = logContext.logger(LeaderState.class);
this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
// use the 1.5x of fetch timeout to tolerate some network transition time or other IO time.
Expand Down Expand Up @@ -809,9 +808,9 @@ void updateFollowerState(
public int compareTo(ReplicaState that) {
if (this.endOffset.equals(that.endOffset))
return this.replicaKey.compareTo(that.replicaKey);
else if (!this.endOffset.isPresent())
else if (this.endOffset.isEmpty())
return 1;
else if (!that.endOffset.isPresent())
else if (that.endOffset.isEmpty())
return -1;
else
return Long.compare(that.endOffset.get().offset(), this.endOffset.get().offset());
Expand Down
8 changes: 4 additions & 4 deletions raft/src/main/java/org/apache/kafka/raft/QuorumState.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IllegalStateE
ElectionState election = readElectionState();

final EpochState initialState;
if (election.hasVoted() && !localId.isPresent()) {
if (election.hasVoted() && localId.isEmpty()) {
throw new IllegalStateException(
String.format(
"Initialized quorum state (%s) with a voted candidate but without a local id",
Expand Down Expand Up @@ -332,7 +332,7 @@ public Endpoints leaderEndpoints() {
}

public boolean isVoter() {
if (!localId.isPresent()) {
if (localId.isEmpty()) {
return false;
}

Expand Down Expand Up @@ -425,7 +425,7 @@ public void transitionToUnattachedVotedState(
epoch
)
);
} else if (!localId.isPresent()) {
} else if (localId.isEmpty()) {
throw new IllegalStateException("Cannot transition to voted without a replica id");
} else if (epoch < currentEpoch) {
throw new IllegalStateException(
Expand Down Expand Up @@ -707,7 +707,7 @@ public boolean isUnattached() {
}

public boolean isUnattachedNotVoted() {
return maybeUnattachedState().filter(unattached -> !unattached.votedKey().isPresent()).isPresent();
return maybeUnattachedState().filter(unattached -> unattached.votedKey().isEmpty()).isPresent();
}

public boolean isUnattachedAndVoted() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public boolean canGrantVote(ReplicaKey candidateKey, boolean isLogUpToDate) {
if (votedKey.isPresent()) {
ReplicaKey votedReplicaKey = votedKey.get();
if (votedReplicaKey.id() == candidateKey.id()) {
return !votedReplicaKey.directoryId().isPresent() || votedReplicaKey.directoryId().equals(candidateKey.directoryId());
return votedReplicaKey.directoryId().isEmpty() || votedReplicaKey.directoryId().equals(candidateKey.directoryId());
}
log.debug(
"Rejecting vote request from candidate ({}), already have voted for another " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public CompletableFuture<AddRaftVoterResponseData> handleAddVoterRequest(

// Check that the leader has established a HWM and committed the current epoch
Optional<Long> highWatermark = leaderState.highWatermark().map(LogOffsetMetadata::offset);
if (!highWatermark.isPresent()) {
if (highWatermark.isEmpty()) {
return CompletableFuture.completedFuture(
RaftUtil.addVoterResponse(
Errors.REQUEST_TIMED_OUT,
Expand All @@ -127,7 +127,7 @@ public CompletableFuture<AddRaftVoterResponseData> handleAddVoterRequest(

// Check that there are no uncommitted VotersRecord
Optional<LogHistory.Entry<VoterSet>> votersEntry = partitionState.lastVoterSetEntry();
if (!votersEntry.isPresent() || votersEntry.get().offset() >= highWatermark.get()) {
if (votersEntry.isEmpty() || votersEntry.get().offset() >= highWatermark.get()) {
return CompletableFuture.completedFuture(
RaftUtil.addVoterResponse(
Errors.REQUEST_TIMED_OUT,
Expand Down Expand Up @@ -172,7 +172,7 @@ public CompletableFuture<AddRaftVoterResponseData> handleAddVoterRequest(
this::buildApiVersionsRequest,
currentTimeMs
);
if (!timeout.isPresent()) {
if (timeout.isEmpty()) {
return CompletableFuture.completedFuture(
RaftUtil.addVoterResponse(
Errors.REQUEST_TIMED_OUT,
Expand Down Expand Up @@ -203,7 +203,7 @@ public boolean handleApiVersionsResponse(
long currentTimeMs
) {
Optional<AddVoterHandlerState> handlerState = leaderState.addVoterHandlerState();
if (!handlerState.isPresent()) {
if (handlerState.isEmpty()) {
// There are no pending add operation just ignore the api response
return true;
}
Expand Down Expand Up @@ -242,7 +242,7 @@ public boolean handleApiVersionsResponse(
return false;
}

// Check that the new voter supports the kraft.verion for reconfiguration
// Check that the new voter supports the kraft.version for reconfiguration
KRaftVersion kraftVersion = partitionState.lastKraftVersion();
if (!validVersionRange(kraftVersion, supportedKraftVersions)) {
logger.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public long timeUntilOperationExpiration(long currentTimeMs) {
}

public boolean expectingApiResponse(int replicaId) {
return !lastOffset.isPresent() && replicaId == voterKey.id();
return lastOffset.isEmpty() && replicaId == voterKey.id();
}

public void setLastOffset(long lastOffset) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ private RecordsBatchReader(
public boolean hasNext() {
ensureOpen();

if (!nextBatch.isPresent()) {
if (nextBatch.isEmpty()) {
nextBatch = nextBatch();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public RecordsIterator(
public boolean hasNext() {
ensureOpen();

if (!nextBatch.isPresent()) {
if (nextBatch.isEmpty()) {
nextBatch = nextBatch();
}

Expand Down Expand Up @@ -334,7 +334,7 @@ private T decodeDataRecord(Optional<ByteBuffer> key, Optional<ByteBuffer> value)
throw new IllegalArgumentException("Got key in the record when no key was expected");
}

if (!value.isPresent()) {
if (value.isEmpty()) {
throw new IllegalArgumentException("Missing value in the record when a value was expected");
} else if (value.get().remaining() == 0) {
throw new IllegalArgumentException("Got an unexpected empty value in the record");
Expand All @@ -346,13 +346,13 @@ private T decodeDataRecord(Optional<ByteBuffer> key, Optional<ByteBuffer> value)
}

private static ControlRecord decodeControlRecord(Optional<ByteBuffer> key, Optional<ByteBuffer> value) {
if (!key.isPresent()) {
if (key.isEmpty()) {
throw new IllegalArgumentException("Missing key in the record when a key was expected");
} else if (key.get().remaining() == 0) {
throw new IllegalArgumentException("Got an unexpected empty key in the record");
}

if (!value.isPresent()) {
if (value.isEmpty()) {
throw new IllegalArgumentException("Missing value in the record when a value was expected");
} else if (value.get().remaining() == 0) {
throw new IllegalArgumentException("Got an unexpected empty value in the record");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public CompletableFuture<RemoveRaftVoterResponseData> handleRemoveVoterRequest(

// Check that the leader has established a HWM and committed the current epoch
Optional<Long> highWatermark = leaderState.highWatermark().map(LogOffsetMetadata::offset);
if (!highWatermark.isPresent()) {
if (highWatermark.isEmpty()) {
return CompletableFuture.completedFuture(
RaftUtil.removeVoterResponse(
Errors.REQUEST_TIMED_OUT,
Expand All @@ -117,7 +117,7 @@ public CompletableFuture<RemoveRaftVoterResponseData> handleRemoveVoterRequest(

// Check that there are no uncommitted VotersRecord
Optional<LogHistory.Entry<VoterSet>> votersEntry = partitionState.lastVoterSetEntry();
if (!votersEntry.isPresent() || votersEntry.get().offset() >= highWatermark.get()) {
if (votersEntry.isEmpty() || votersEntry.get().offset() >= highWatermark.get()) {
return CompletableFuture.completedFuture(
RaftUtil.removeVoterResponse(
Errors.REQUEST_TIMED_OUT,
Expand All @@ -132,7 +132,7 @@ public CompletableFuture<RemoveRaftVoterResponseData> handleRemoveVoterRequest(

// Remove the voter from the set of voters
Optional<VoterSet> newVoters = votersEntry.get().value().removeVoter(voterKey);
if (!newVoters.isPresent()) {
if (newVoters.isEmpty()) {
return CompletableFuture.completedFuture(
RaftUtil.removeVoterResponse(
Errors.VOTER_NOT_FOUND,
Expand Down
Loading