Skip to content

Commit

Permalink
RATIS-1967. Do not store CommitInfoProto in CommitInfoCache. (#988)
Browse files Browse the repository at this point in the history
  • Loading branch information
szetszwo authored Dec 13, 2023
1 parent 3febe36 commit cedcd2a
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,30 +24,45 @@
import org.apache.ratis.util.ProtoUtils;

import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Caching the commit information. */
class CommitInfoCache {
private final ConcurrentMap<RaftPeerId, CommitInfoProto> map = new ConcurrentHashMap<>();
private final ConcurrentMap<RaftPeerId, Long> map = new ConcurrentHashMap<>();

CommitInfoProto get(RaftPeerId id) {
return map.get(id);
Optional<Long> get(RaftPeerId id) {
return Optional.ofNullable(map.get(id));
}

CommitInfoProto update(RaftPeer peer, long newCommitIndex) {
Objects.requireNonNull(peer, "peer == null");
return map.compute(peer.getId(), (id, old) ->
old == null || newCommitIndex > old.getCommitIndex()? ProtoUtils.toCommitInfoProto(peer, newCommitIndex): old);
final long updated = update(peer.getId(), newCommitIndex);
return ProtoUtils.toCommitInfoProto(peer, updated);
}

CommitInfoProto update(CommitInfoProto newInfo) {
return map.compute(RaftPeerId.valueOf(newInfo.getServer().getId()),
(id, old) -> old == null || newInfo.getCommitIndex() > old.getCommitIndex()? newInfo: old);
long update(RaftPeerId peerId, long newCommitIndex) {
Objects.requireNonNull(peerId, "peerId == null");
return map.compute(peerId, (id, oldCommitIndex) -> {
if (oldCommitIndex != null) {
// get around BX_UNBOXING_IMMEDIATELY_REBOXED
final long old = oldCommitIndex;
if (old >= newCommitIndex) {
return old;
}
}
return newCommitIndex;
});
}

void update(CommitInfoProto newInfo) {
final RaftPeerId id = RaftPeerId.valueOf(newInfo.getServer().getId());
update(id, newInfo.getCommitIndex());
}

@Override
public String toString() {
return JavaUtils.getClassSimpleName(getClass()) + ":" + map.values();
return JavaUtils.getClassSimpleName(getClass()) + ":" + map;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.ratis.server.impl;

import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
Expand All @@ -31,17 +34,29 @@
* entries.
*/
public class ConfigurationManager {
private final RaftPeerId id;
private final RaftConfigurationImpl initialConf;
private final NavigableMap<Long, RaftConfigurationImpl> configurations = new TreeMap<>();
/**
* The current raft configuration. If configurations is not empty, should be
* the last entry of the map. Otherwise is initialConf.
*/
private volatile RaftConfigurationImpl currentConf;
/** Cache the peer corresponding to {@link #id}. */
private volatile RaftPeer currentPeer;

ConfigurationManager(RaftConfigurationImpl initialConf) {
ConfigurationManager(RaftPeerId id, RaftConfigurationImpl initialConf) {
this.id = id;
this.initialConf = initialConf;
this.currentConf = initialConf;
setCurrentConf(initialConf);
}

private void setCurrentConf(RaftConfigurationImpl currentConf) {
this.currentConf = currentConf;
final RaftPeer peer = currentConf.getPeer(id, RaftPeerRole.FOLLOWER, RaftPeerRole.LISTENER);
if (peer != null) {
this.currentPeer = peer;
}
}

synchronized void addConfiguration(RaftConfiguration conf) {
Expand All @@ -57,29 +72,32 @@ synchronized void addConfiguration(RaftConfiguration conf) {
private void addRaftConfigurationImpl(long logIndex, RaftConfigurationImpl conf) {
configurations.put(logIndex, conf);
if (logIndex == configurations.lastEntry().getKey()) {
currentConf = conf;
setCurrentConf(conf);
}
}

RaftConfigurationImpl getCurrent() {
return currentConf;
}

RaftPeer getCurrentPeer() {
return currentPeer;
}

/**
* Remove all the configurations whose log index is >= the given index.
*
* @param index The given index. All the configurations whose log index is >=
* this value will be removed.
* @return The configuration with largest log index < the given index.
*/
synchronized RaftConfiguration removeConfigurations(long index) {
synchronized void removeConfigurations(long index) {
// remove all configurations starting at the index
for(final Iterator<?> iter = configurations.tailMap(index).entrySet().iterator(); iter.hasNext();) {
iter.next();
iter.remove();
final SortedMap<Long, RaftConfigurationImpl> tail = configurations.tailMap(index);
if (tail.isEmpty()) {
return;
}
currentConf = configurations.isEmpty() ? initialConf :
configurations.lastEntry().getValue();
return currentConf;
tail.clear();
setCurrentConf(configurations.isEmpty() ? initialConf : configurations.lastEntry().getValue());
}

synchronized int numOfConf() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ public long[] getFollowerNextIndices() {
this.leaderElectionMetrics = LeaderElectionMetrics.getLeaderElectionMetrics(
getMemberId(), state::getLastLeaderElapsedTimeMs);
this.raftServerMetrics = RaftServerMetricsImpl.computeIfAbsentRaftServerMetrics(
getMemberId(), () -> commitInfoCache::get, retryCache::getStatistics);
getMemberId(), this::getCommitIndex, retryCache::getStatistics);

this.startComplete = new AtomicBoolean(false);
this.threadGroup = new ThreadGroup(proxy.getThreadGroup(), getMemberId().toString());
Expand All @@ -294,6 +294,10 @@ public long[] getFollowerNextIndices() {
id + "-client");
}

private long getCommitIndex(RaftPeerId id) {
return commitInfoCache.get(id).orElse(0L);
}

@Override
public DivisionProperties properties() {
return divisionProperties;
Expand Down Expand Up @@ -452,6 +456,12 @@ public RaftGroupMemberId getMemberId() {
return getState().getMemberId();
}

@Override
public RaftPeer getPeer() {
return Optional.ofNullable(getState().getCurrentPeer())
.orElseGet(() -> getRaftServer().getPeer());
}

@Override
public DivisionInfo getInfo() {
return info;
Expand Down Expand Up @@ -622,7 +632,8 @@ synchronized void changeToLeader() {
public Collection<CommitInfoProto> getCommitInfos() {
final List<CommitInfoProto> infos = new ArrayList<>();
// add the commit info of this server
infos.add(updateCommitInfoCache());
final long commitIndex = updateCommitInfoCache();
infos.add(ProtoUtils.toCommitInfoProto(getPeer(), commitIndex));

// add the commit infos of other servers
if (getInfo().isLeader()) {
Expand All @@ -633,9 +644,10 @@ public Collection<CommitInfoProto> getCommitInfos() {
Stream.concat(
raftConf.getAllPeers(RaftPeerRole.FOLLOWER).stream(),
raftConf.getAllPeers(RaftPeerRole.LISTENER).stream())
.map(RaftPeer::getId)
.filter(id -> !id.equals(getId()))
.map(commitInfoCache::get)
.filter(peer -> !peer.getId().equals(getId()))
.map(peer -> commitInfoCache.get(peer.getId())
.map(index -> ProtoUtils.toCommitInfoProto(peer, index))
.orElse(null))
.filter(Objects::nonNull)
.forEach(infos::add);
}
Expand Down Expand Up @@ -1534,8 +1546,8 @@ private void preAppendEntriesAsync(RaftPeerId leaderId, RaftGroupId leaderGroupI
}
}

private CommitInfoProto updateCommitInfoCache() {
return commitInfoCache.update(getPeer(), state.getLog().getLastCommittedIndex());
private long updateCommitInfoCache() {
return commitInfoCache.update(getId(), state.getLog().getLastCommittedIndex());
}

ExecutorService getServerExecutor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class ServerState {
final RaftConfigurationImpl initialConf = RaftConfigurationImpl.newBuilder()
.setConf(followerPeers, listenerPeers)
.build();
configurationManager = new ConfigurationManager(initialConf);
configurationManager = new ConfigurationManager(id, initialConf);
LOG.info("{}: {}", getMemberId(), configurationManager);

final String storageDirName = group.getGroupId().getUuid().toString();
Expand Down Expand Up @@ -196,6 +196,10 @@ RaftConfigurationImpl getRaftConf() {
return configurationManager.getCurrent();
}

RaftPeer getCurrentPeer() {
return configurationManager.getCurrentPeer();
}

long getCurrentTerm() {
return currentTerm.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,18 @@

package org.apache.ratis.server.metrics;

import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;

import org.apache.ratis.metrics.LongCounter;
import org.apache.ratis.metrics.Timekeeper;

import org.apache.ratis.metrics.MetricRegistryInfo;
import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
import org.apache.ratis.protocol.RaftClientRequest.Type;
import org.apache.ratis.protocol.RaftGroupMemberId;
Expand Down Expand Up @@ -98,7 +97,7 @@ public final class RaftServerMetricsImpl extends RatisMetrics implements RaftSer

/** Follower Id -> heartbeat elapsed */
private final Map<RaftPeerId, Long> followerLastHeartbeatElapsedTimeMap = new ConcurrentHashMap<>();
private final Supplier<Function<RaftPeerId, CommitInfoProto>> commitInfoCache;
private final ToLongFunction<RaftPeerId> commitInfoCache;

/** id -> metric */
private static final Map<RaftGroupMemberId, RaftServerMetricsImpl> METRICS = new ConcurrentHashMap<>();
Expand All @@ -111,7 +110,7 @@ static String getPeerCommitIndexGaugeKey(RaftPeerId serverId) {
}

public static RaftServerMetricsImpl computeIfAbsentRaftServerMetrics(RaftGroupMemberId serverId,
Supplier<Function<RaftPeerId, CommitInfoProto>> commitInfoCache,
ToLongFunction<RaftPeerId> commitInfoCache,
Supplier<RetryCache.Statistics> retryCacheStatistics) {
return METRICS.computeIfAbsent(serverId,
key -> new RaftServerMetricsImpl(serverId, commitInfoCache, retryCacheStatistics));
Expand All @@ -122,7 +121,7 @@ public static void removeRaftServerMetrics(RaftGroupMemberId serverId) {
}

public RaftServerMetricsImpl(RaftGroupMemberId serverId,
Supplier<Function<RaftPeerId, CommitInfoProto>> commitInfoCache,
ToLongFunction<RaftPeerId> commitInfoCache,
Supplier<RetryCache.Statistics> retryCacheStatistics) {
super(createRegistry(serverId.toString()));
this.commitInfoCache = commitInfoCache;
Expand Down Expand Up @@ -183,10 +182,8 @@ public void addFollower(RaftPeerId followerName) {
* Register a commit index tracker for the peer in cluster.
*/
private void addPeerCommitIndexGauge(RaftPeerId peerId) {
getRegistry().gauge(getPeerCommitIndexGaugeKey(peerId), () -> () -> Optional.ofNullable(commitInfoCache.get())
.map(cache -> cache.apply(peerId))
.map(CommitInfoProto::getCommitIndex)
.orElse(0L));
getRegistry().gauge(getPeerCommitIndexGaugeKey(peerId),
() -> () -> commitInfoCache.applyAsLong(peerId));
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ public static void setUp() {
retryCache = new RetryCacheImpl(RaftServerConfigKeys.RetryCache.EXPIRY_TIME_DEFAULT, null);

final RaftServerMetricsImpl raftServerMetrics = RaftServerMetricsImpl.computeIfAbsentRaftServerMetrics(
raftGroupMemberId, () -> null, retryCache::getStatistics);
raftGroupMemberId, id -> 0L, retryCache::getStatistics);
ratisMetricRegistry = (RatisMetricRegistryImpl) raftServerMetrics.getRegistry();
}

@After
public void tearDown() {
retryCache.close();
Expand Down Expand Up @@ -92,23 +92,23 @@ public void testRetryCacheHitMissCount() {
}

private static void checkHit(long count, double rate) {
Long hitCount = (Long) ratisMetricRegistry.getGauges((s, metric) ->
final long hitCount = (Long) ratisMetricRegistry.getGauges((s, metric) ->
s.contains(RETRY_CACHE_HIT_COUNT_METRIC)).values().iterator().next().getValue();
assertEquals(hitCount.longValue(), count);
assertEquals(hitCount, count);

Double hitRate = (Double) ratisMetricRegistry.getGauges((s, metric) ->
final double hitRate = (Double) ratisMetricRegistry.getGauges((s, metric) ->
s.contains(RETRY_CACHE_HIT_RATE_METRIC)).values().iterator().next().getValue();
assertEquals(hitRate.doubleValue(), rate, 0.0);
assertEquals(hitRate, rate, 0.0);
}

private static void checkMiss(long count, double rate) {
Long missCount = (Long) ratisMetricRegistry.getGauges((s, metric) ->
final long missCount = (Long) ratisMetricRegistry.getGauges((s, metric) ->
s.contains(RETRY_CACHE_MISS_COUNT_METRIC)).values().iterator().next().getValue();
assertEquals(missCount.longValue(), count);
assertEquals(missCount, count);

Double missRate = (Double) ratisMetricRegistry.getGauges((s, metric) ->
final double missRate = (Double) ratisMetricRegistry.getGauges((s, metric) ->
s.contains(RETRY_CACHE_MISS_RATE_METRIC)).values().iterator().next().getValue();
assertEquals(missRate.doubleValue(), rate, 0.0);
assertEquals(missRate, rate, 0.0);
}

private static void checkEntryCount(long expected) {
Expand Down

0 comments on commit cedcd2a

Please sign in to comment.