Skip to content

Commit

Permalink
MINOR: Various cleanups in clients (apache#17895)
Browse files Browse the repository at this point in the history
Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
mimaison authored and chiacyu committed Nov 30, 2024
1 parent 7310372 commit 0a57e85
Show file tree
Hide file tree
Showing 94 changed files with 199 additions and 268 deletions.
6 changes: 3 additions & 3 deletions clients/src/main/java/org/apache/kafka/clients/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ public Map<String, Uuid> topicIds() {

public synchronized LeaderAndEpoch currentLeader(TopicPartition topicPartition) {
Optional<MetadataResponse.PartitionMetadata> maybeMetadata = partitionMetadataIfCurrent(topicPartition);
if (!maybeMetadata.isPresent())
if (maybeMetadata.isEmpty())
return new LeaderAndEpoch(Optional.empty(), Optional.ofNullable(lastSeenLeaderEpochs.get(topicPartition)));

MetadataResponse.PartitionMetadata partitionMetadata = maybeMetadata.get();
Expand Down Expand Up @@ -392,7 +392,7 @@ public synchronized Set<TopicPartition> updatePartitionLeadership(Map<TopicParti
TopicPartition partition = partitionLeader.getKey();
Metadata.LeaderAndEpoch currentLeader = currentLeader(partition);
Metadata.LeaderIdAndEpoch newLeader = partitionLeader.getValue();
if (!newLeader.epoch.isPresent() || !newLeader.leaderId.isPresent()) {
if (newLeader.epoch.isEmpty() || newLeader.leaderId.isEmpty()) {
log.debug("For {}, incoming leader information is incomplete {}", partition, newLeader);
continue;
}
Expand All @@ -404,7 +404,7 @@ public synchronized Set<TopicPartition> updatePartitionLeadership(Map<TopicParti
log.debug("For {}, incoming leader({}), the corresponding node information for node-id {} is missing, so ignoring.", partition, newLeader, newLeader.leaderId.get());
continue;
}
if (!this.metadataSnapshot.partitionMetadata(partition).isPresent()) {
if (this.metadataSnapshot.partitionMetadata(partition).isEmpty()) {
log.debug("For {}, incoming leader({}), partition metadata is no longer cached, ignoring.", partition, newLeader);
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public Cluster cluster() {
*/
public OptionalInt leaderEpochFor(TopicPartition tp) {
PartitionMetadata partitionMetadata = metadataByPartition.get(tp);
if (partitionMetadata == null || !partitionMetadata.leaderEpoch.isPresent()) {
if (partitionMetadata == null || partitionMetadata.leaderEpoch.isEmpty()) {
return OptionalInt.empty();
} else {
return OptionalInt.of(partitionMetadata.leaderEpoch.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.acl.AclOperation;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -84,8 +84,7 @@ public ConsumerGroupDescription(String groupId,
Set<AclOperation> authorizedOperations) {
this.groupId = groupId == null ? "" : groupId;
this.isSimpleConsumerGroup = isSimpleConsumerGroup;
this.members = members == null ? Collections.emptyList() :
Collections.unmodifiableList(new ArrayList<>(members));
this.members = members == null ? Collections.emptyList() : List.copyOf(members);
this.partitionAssignor = partitionAssignor == null ? "" : partitionAssignor;
this.type = type;
this.groupState = GroupState.parse(state.name());
Expand Down Expand Up @@ -122,8 +121,7 @@ public ConsumerGroupDescription(String groupId,
Set<AclOperation> authorizedOperations) {
this.groupId = groupId == null ? "" : groupId;
this.isSimpleConsumerGroup = isSimpleConsumerGroup;
this.members = members == null ? Collections.emptyList() :
Collections.unmodifiableList(new ArrayList<>(members));
this.members = members == null ? Collections.emptyList() : List.copyOf(members);
this.partitionAssignor = partitionAssignor == null ? "" : partitionAssignor;
this.type = type;
this.groupState = groupState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class DeleteConsumerGroupsResult {
*/
public Map<String, KafkaFuture<Void>> deletedGroups() {
Map<String, KafkaFuture<Void>> deletedGroups = new HashMap<>(futures.size());
futures.forEach((key, future) -> deletedGroups.put(key, future));
deletedGroups.putAll(futures);
return deletedGroups;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public KafkaFuture<UserScramCredentialsDescription> description(String userName)
// for users 1, 2, and 3 but this is looking for user 4), so explicitly take care of that case
Optional<DescribeUserScramCredentialsResponseData.DescribeUserScramCredentialsResult> optionalUserResult =
data.results().stream().filter(result -> result.user().equals(userName)).findFirst();
if (!optionalUserResult.isPresent()) {
if (optionalUserResult.isEmpty()) {
retval.completeExceptionally(new ResourceNotFoundException("No such user: " + userName));
} else {
DescribeUserScramCredentialsResponseData.DescribeUserScramCredentialsResult userResult = optionalUserResult.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -36,8 +35,7 @@ public class MemberAssignment {
* @param topicPartitions List of topic partitions
*/
public MemberAssignment(Set<TopicPartition> topicPartitions) {
this.topicPartitions = topicPartitions == null ? Collections.emptySet() :
Collections.unmodifiableSet(new HashSet<>(topicPartitions));
this.topicPartitions = topicPartitions == null ? Collections.emptySet() : Set.copyOf(topicPartitions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.kafka.clients.admin;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand All @@ -34,7 +32,7 @@ public class NewPartitionReassignment {
public NewPartitionReassignment(List<Integer> targetReplicas) {
if (targetReplicas == null || targetReplicas.isEmpty())
throw new IllegalArgumentException("Cannot create a new partition reassignment without any replicas");
this.targetReplicas = Collections.unmodifiableList(new ArrayList<>(targetReplicas));
this.targetReplicas = List.copyOf(targetReplicas);
}

public List<Integer> targetReplicas() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -53,8 +53,7 @@ public ShareGroupDescription(String groupId,
Node coordinator,
Set<AclOperation> authorizedOperations) {
this.groupId = groupId == null ? "" : groupId;
this.members = members == null ? Collections.emptyList() :
Collections.unmodifiableList(new ArrayList<>(members));
this.members = members == null ? Collections.emptyList() : List.copyOf(members);
this.groupState = groupState;
this.coordinator = coordinator;
this.authorizedOperations = authorizedOperations;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.kafka.clients.admin;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

Expand Down Expand Up @@ -61,7 +59,7 @@ public String toString() {
*/
public UserScramCredentialsDescription(String name, List<ScramCredentialInfo> credentialInfos) {
this.name = Objects.requireNonNull(name);
this.credentialInfos = Collections.unmodifiableList(new ArrayList<>(credentialInfos));
this.credentialInfos = List.copyOf(credentialInfos);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ private <T extends ApiRequestScope> void collectRequests(
}

// Copy the keys to avoid exposing the underlying mutable set
Set<K> copyKeys = Collections.unmodifiableSet(new HashSet<>(keys));
Set<K> copyKeys = Set.copyOf(keys);

Collection<AdminApiHandler.RequestAndKeys<K>> newRequests = buildRequest.apply(copyKeys, scope);
if (newRequests.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ public boolean needsRebootstrap(long now, long rebootstrapTriggerMs) {
public void transitionToUpdatePending(long now) {
this.state = State.UPDATE_PENDING;
this.lastMetadataFetchAttemptMs = now;
if (!metadataAttemptStartMs.isPresent())
if (metadataAttemptStartMs.isEmpty())
metadataAttemptStartMs = Optional.of(now);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public KafkaFutureImpl<Map<Integer, KafkaFutureImpl<V>>> all() {
}

private KafkaFutureImpl<V> futureOrThrow(BrokerKey key) {
if (!key.brokerId.isPresent()) {
if (key.brokerId.isEmpty()) {
throw new IllegalArgumentException("Attempt to complete with invalid key: " + key);
} else {
int brokerId = key.brokerId.getAsInt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ private AllBrokersStrategy.BrokerKey requireSingleton(
}

AllBrokersStrategy.BrokerKey key = keys.iterator().next();
if (!key.brokerId.isPresent() || key.brokerId.getAsInt() != brokerId) {
if (key.brokerId.isEmpty() || key.brokerId.getAsInt() != brokerId) {
throw new IllegalArgumentException("Unexpected broker key: " + key);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,7 @@ public class ConsumerConfig extends AbstractConfig {
// a list contains all the assignor names that only assign subscribed topics to consumer. Should be updated when new assignor added.
// This is to help optimize ConsumerCoordinator#performAssignment method
public static final List<String> ASSIGN_FROM_SUBSCRIBED_ASSIGNORS =
Collections.unmodifiableList(Arrays.asList(
RANGE_ASSIGNOR_NAME,
ROUNDROBIN_ASSIGNOR_NAME,
STICKY_ASSIGNOR_NAME,
COOPERATIVE_STICKY_ASSIGNOR_NAME
));
List.of(RANGE_ASSIGNOR_NAME, ROUNDROBIN_ASSIGNOR_NAME, STICKY_ASSIGNOR_NAME, COOPERATIVE_STICKY_ASSIGNOR_NAME);

/*
* NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS
Expand Down Expand Up @@ -709,7 +704,7 @@ private void maybeOverrideEnableAutoCommit(Map<String, Object> configs) {
Optional<String> groupId = Optional.ofNullable(getString(CommonClientConfigs.GROUP_ID_CONFIG));
Map<String, Object> originals = originals();
boolean enableAutoCommit = originals.containsKey(ENABLE_AUTO_COMMIT_CONFIG) ? getBoolean(ENABLE_AUTO_COMMIT_CONFIG) : false;
if (!groupId.isPresent()) { // overwrite in case of default group id where the config is not explicitly provided
if (groupId.isEmpty()) { // overwrite in case of default group id where the config is not explicitly provided
if (!originals.containsKey(ENABLE_AUTO_COMMIT_CONFIG)) {
configs.put(ENABLE_AUTO_COMMIT_CONFIG, false);
} else if (enableAutoCommit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public ConcatenatedIterable(Iterable<? extends Iterable<ConsumerRecord<K, V>>> i

@Override
public Iterator<ConsumerRecord<K, V>> iterator() {
return new AbstractIterator<ConsumerRecord<K, V>>() {
return new AbstractIterator<>() {
final Iterator<? extends Iterable<ConsumerRecord<K, V>>> iters = iterables.iterator();
Iterator<ConsumerRecord<K, V>> current;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ public synchronized void scheduleNopPollTask() {
}

public synchronized Set<TopicPartition> paused() {
return Collections.unmodifiableSet(new HashSet<>(paused));
return Set.copyOf(paused);
}

private void ensureNotClosed() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
Expand All @@ -40,7 +39,7 @@ public NoOffsetForPartitionException(TopicPartition partition) {

public NoOffsetForPartitionException(Collection<TopicPartition> partitions) {
super("Undefined offset with no reset policy for partitions: " + partitions);
this.partitions = Collections.unmodifiableSet(new HashSet<>(partitions));
this.partitions = Set.copyOf(partitions);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ public TopicAssignmentState(String topic, List<PartitionInfo> partitionInfos, Li
boolean racksMatch(String consumer, TopicPartition tp) {
Optional<String> consumerRack = consumers.get(consumer);
Set<String> replicaRacks = partitionRacks.get(tp);
return !consumerRack.isPresent() || (replicaRacks != null && replicaRacks.contains(consumerRack.get()));
return consumerRack.isEmpty() || (replicaRacks != null && replicaRacks.contains(consumerRack.get()));
}

int maxAssignable(String consumer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
if (lastRebalanceStartMs == -1L)
lastRebalanceStartMs = time.milliseconds();
joinFuture = sendJoinGroupRequest();
joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
joinFuture.addListener(new RequestFutureListener<>() {
@Override
public void onSuccess(ByteBuffer value) {
// do nothing since all the handler logic are in SyncGroupResponseHandler already
Expand Down Expand Up @@ -1188,7 +1188,7 @@ public synchronized RequestFuture<Void> maybeLeaveGroup(String leaveReason) {
}

protected boolean isDynamicMember() {
return !rebalanceConfig.groupInstanceId.isPresent();
return rebalanceConfig.groupInstanceId.isEmpty();
}

private class LeaveGroupResponseHandler extends CoordinatorResponseHandler<LeaveGroupResponse, Void> {
Expand Down Expand Up @@ -1528,7 +1528,7 @@ public void run() {
} else {
heartbeat.sentHeartbeat(now);
final RequestFuture<Void> heartbeatFuture = sendHeartbeatRequest();
heartbeatFuture.addListener(new RequestFutureListener<Void>() {
heartbeatFuture.addListener(new RequestFutureListener<>() {
@Override
public void onSuccess(Void value) {
synchronized (AbstractCoordinator.this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests()

Optional<Node> leaderOpt = position.currentLeader.leader;

if (!leaderOpt.isPresent()) {
if (leaderOpt.isEmpty()) {
log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position);
metadata.requestUpdate(false);
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -892,7 +892,7 @@ private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> sorted

List<TopicPartition> unassignedPartitions = new ArrayList<>(totalPartitionsCount - sortedAssignedPartitions.size());

Collections.sort(sortedAssignedPartitions, Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
sortedAssignedPartitions.sort(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));

boolean shouldAddDirectly = false;
Iterator<TopicPartition> sortedAssignedPartitionsIter = sortedAssignedPartitions.iterator();
Expand Down Expand Up @@ -991,7 +991,7 @@ private class GeneralAssignmentBuilder extends AbstractAssignmentBuilder {
currentPartitionConsumer.put(topicPartition, entry.getKey());

List<String> sortedAllTopics = new ArrayList<>(topic2AllPotentialConsumers.keySet());
Collections.sort(sortedAllTopics, new TopicComparator(topic2AllPotentialConsumers));
sortedAllTopics.sort(new TopicComparator(topic2AllPotentialConsumers));
sortedAllPartitions = getAllTopicPartitions(sortedAllTopics);

sortedCurrentSubscriptions = new TreeSet<>(new SubscriptionComparator(currentAssignment));
Expand Down Expand Up @@ -1084,7 +1084,7 @@ private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> sorted

List<TopicPartition> unassignedPartitions = new ArrayList<>();

Collections.sort(sortedAssignedPartitions, new PartitionComparator(topic2AllPotentialConsumers));
sortedAssignedPartitions.sort(new PartitionComparator(topic2AllPotentialConsumers));

boolean shouldAddDirectly = false;
Iterator<TopicPartition> sortedAssignedPartitionsIter = sortedAssignedPartitions.iterator();
Expand Down Expand Up @@ -1154,7 +1154,7 @@ private void prepopulateCurrentAssignments(Map<TopicPartition, ConsumerGeneratio
if (memberData.generation.isPresent() && memberData.generation.get() < maxGeneration) {
// if the current member's generation is lower than maxGeneration, put into prevAssignment if needed
updatePrevAssignment(prevAssignment, memberData.partitions, consumer, memberData.generation.get());
} else if (!memberData.generation.isPresent() && maxGeneration > DEFAULT_GENERATION) {
} else if (memberData.generation.isEmpty() && maxGeneration > DEFAULT_GENERATION) {
// if maxGeneration is larger than DEFAULT_GENERATION
// put all (no generation) partitions as DEFAULT_GENERATION into prevAssignment if needed
updatePrevAssignment(prevAssignment, memberData.partitions, consumer, DEFAULT_GENERATION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public abstract class AsyncClient<T1, Req extends AbstractRequest, Resp extends
public RequestFuture<T2> sendAsyncRequest(Node node, T1 requestData) {
AbstractRequest.Builder<Req> requestBuilder = prepareRequest(node, requestData);

return client.send(node, requestBuilder).compose(new RequestFutureAdapter<ClientResponse, T2>() {
return client.send(node, requestBuilder).compose(new RequestFutureAdapter<>() {
@Override
@SuppressWarnings("unchecked")
public void onSuccess(ClientResponse value, RequestFuture<T2> future) {
Expand Down
Loading

0 comments on commit 0a57e85

Please sign in to comment.