Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
public class CSMMetrics {
public static final String SOURCE_NAME =
CSMMetrics.class.getSimpleName();
private RaftGroupId gid;

// ratis op metrics
private @Metric MutableCounterLong numWriteStateMachineOps;
Expand All @@ -50,6 +51,7 @@ public class CSMMetrics {

private @Metric MutableRate transactionLatencyMs;
private final EnumMap<Type, MutableRate> opsLatencyMs;
private final EnumMap<Type, MutableRate> opsQueueingDelay;
private MetricsRegistry registry = null;

// Failure Metrics
Expand All @@ -64,23 +66,35 @@ public class CSMMetrics {
private @Metric MutableCounterLong numDataCacheMiss;
private @Metric MutableCounterLong numDataCacheHit;
private @Metric MutableCounterLong numEvictedCacheCount;
private @Metric MutableCounterLong pendingApplyTransactions;

private @Metric MutableRate applyTransactionNs;
private @Metric MutableRate writeStateMachineDataNs;
private @Metric MutableRate writeStateMachineQueueingLatencyNs;
private @Metric MutableRate untilApplyTransactionNs;
private @Metric MutableRate startTransactionCompleteNs;

public CSMMetrics() {
public CSMMetrics(RaftGroupId gid) {
this.gid = gid;
this.opsLatencyMs = new EnumMap<>(ContainerProtos.Type.class);
this.opsQueueingDelay = new EnumMap<>(ContainerProtos.Type.class);
this.registry = new MetricsRegistry(CSMMetrics.class.getSimpleName());
for (ContainerProtos.Type type : ContainerProtos.Type.values()) {
opsLatencyMs.put(type, registry.newRate(type.toString() + "Ms", type + " op"));
opsQueueingDelay.put(type, registry.newRate("queueingDelay" + type.toString() + "Ns", type + " op"));
}
}

public static CSMMetrics create(RaftGroupId gid) {
MetricsSystem ms = DefaultMetricsSystem.instance();
return ms.register(SOURCE_NAME + gid.toString(),
"Container State Machine",
new CSMMetrics());
new CSMMetrics(gid));
}

/**
 * Returns the Raft group id held by this metrics instance, exposed through
 * the {@code @Metric} annotation.
 *
 * @return the string form of the group id
 */
@Metric
public String getRaftGroupId() {
  return this.gid.toString();
}

public void incNumWriteStateMachineOps() {
Expand Down Expand Up @@ -189,6 +203,11 @@ public void incPipelineLatencyMs(ContainerProtos.Type type,
transactionLatencyMs.add(latencyMillis);
}

/**
 * Adds one queueing-delay sample to the rate kept for the given command type.
 *
 * @param type command type whose queueing-delay rate receives the sample
 * @param latencyNanos the sample value to add
 */
public void recordQueueingDelay(ContainerProtos.Type type,
    long latencyNanos) {
  final MutableRate delayForType = opsQueueingDelay.get(type);
  delayForType.add(latencyNanos);
}

/** Increments the counter of start-transaction verification failures by one. */
public void incNumStartTransactionVerifyFailures() {
  this.numStartTransactionVerifyFailures.incr();
}
Expand All @@ -205,6 +224,19 @@ public void recordWriteStateMachineCompletionNs(long latencyNanos) {
writeStateMachineDataNs.add(latencyNanos);
}


/**
 * Adds one sample to the write-state-machine queueing latency rate.
 *
 * @param latencyNanos the sample value to add
 */
public void recordWriteStateMachineQueueingLatencyNs(long latencyNanos) {
  this.writeStateMachineQueueingLatencyNs.add(latencyNanos);
}

/**
 * Adds one sample to the until-apply-transaction rate.
 *
 * @param latencyNanos the sample value to add
 */
public void recordUntilApplyTransactionNs(long latencyNanos) {
  this.untilApplyTransactionNs.add(latencyNanos);
}

/**
 * Adds one sample to the start-transaction completion rate.
 *
 * @param latencyNanos the sample value to add
 */
public void recordStartTransactionCompleteNs(long latencyNanos) {
  this.startTransactionCompleteNs.add(latencyNanos);
}

/** Increments the data-cache miss counter by one. */
public void incNumDataCacheMiss() {
  this.numDataCacheMiss.incr();
}
Expand All @@ -216,8 +248,16 @@ public void incNumEvictedCacheCount() {
numEvictedCacheCount.incr();
}

/** Raises the pending-apply-transactions value by one. */
public void incPendingApplyTransactions() {
  this.pendingApplyTransactions.incr();
}

/**
 * Lowers the pending-apply-transactions value by one by adding a delta of -1.
 *
 * NOTE(review): the backing field is declared as a MutableCounterLong, yet it
 * is decremented here; a gauge type may be the better fit - confirm.
 */
public void decPendingApplyTransactions() {
  this.pendingApplyTransactions.incr(-1L);
}

public void unRegister() {
MetricsSystem ms = DefaultMetricsSystem.instance();
ms.unregisterSource(SOURCE_NAME);
ms.unregisterSource(SOURCE_NAME + gid.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ public TransactionContext startTransaction(LogEntryProto entry, RaftPeerRole rol
@Override
public TransactionContext startTransaction(RaftClientRequest request)
throws IOException {
long startTime = Time.monotonicNowNanos();
final ContainerCommandRequestProto proto =
message2ContainerCommandRequestProto(request.getMessage());
Preconditions.checkArgument(request.getRaftGroupId().equals(gid));
Expand All @@ -411,6 +412,8 @@ public TransactionContext startTransaction(RaftClientRequest request)
.setStateMachine(this)
.setServerRole(RaftPeerRole.LEADER);

metrics.incPendingApplyTransactions();

try {
dispatcher.validateContainerCommand(proto);
} catch (IOException ioe) {
Expand Down Expand Up @@ -440,9 +443,11 @@ public TransactionContext startTransaction(RaftClientRequest request)
builder.setStateMachineData(write.getData());
}
final ContainerCommandRequestProto containerCommandRequestProto = protoBuilder.build();
return builder.setStateMachineContext(new Context(proto, containerCommandRequestProto))
TransactionContext txnContext = builder.setStateMachineContext(new Context(proto, containerCommandRequestProto))
.setLogData(containerCommandRequestProto.toByteString())
.build();
metrics.recordStartTransactionCompleteNs(Time.monotonicNowNanos() - startTime);
return txnContext;
}

private static ContainerCommandRequestProto getContainerCommandRequestProto(
Expand Down Expand Up @@ -521,6 +526,8 @@ private CompletableFuture<Message> writeStateMachineData(
CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
CompletableFuture.supplyAsync(() -> {
try {
metrics.recordWriteStateMachineQueueingLatencyNs(
Time.monotonicNowNanos() - startTime);
return dispatchCommand(requestProto, context);
} catch (Exception e) {
LOG.error("{}: writeChunk writeStateMachineData failed: blockId" +
Expand Down Expand Up @@ -884,6 +891,11 @@ private CompletableFuture<ContainerCommandResponseProto> applyTransaction(
final CheckedSupplier<ContainerCommandResponseProto, Exception> task
= () -> {
try {
long timeNow = Time.monotonicNowNanos();
long queueingDelay = timeNow - context.getStartTime();
metrics.recordQueueingDelay(request.getCmdType(), queueingDelay);
// TODO: add a counter to track number of executing applyTransaction
// and queue size
return dispatchCommand(request, context);
} catch (Exception e) {
exceptionHandler.accept(e);
Expand Down Expand Up @@ -932,11 +944,13 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
.setTerm(trx.getLogEntry().getTerm())
.setLogIndex(index);

final Context context = (Context) trx.getStateMachineContext();
long applyTxnStartTime = Time.monotonicNowNanos();
metrics.recordUntilApplyTransactionNs(applyTxnStartTime - context.getStartTime());
applyTransactionSemaphore.acquire();
metrics.incNumApplyTransactionsOps();

final Context context = (Context) trx.getStateMachineContext();

Objects.requireNonNull(context, "context == null");
final ContainerCommandRequestProto requestProto = context.getLogProto();
final Type cmdType = requestProto.getCmdType();
Expand Down Expand Up @@ -1021,6 +1035,9 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
applyTransactionSemaphore.release();
metrics.recordApplyTransactionCompletionNs(
Time.monotonicNowNanos() - applyTxnStartTime);
if (trx.getServerRole() == RaftPeerRole.LEADER) {
metrics.decPendingApplyTransactions();
}
});
return applyTransactionFuture;
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
import org.apache.hadoop.util.Time;
import org.apache.ratis.server.protocol.TermIndex;

import java.util.Map;
Expand Down Expand Up @@ -118,12 +119,15 @@ public static Op op(DispatcherContext context) {

private final Map<Long, Long> container2BCSIDMap;

private long startTime;

/**
 * Copies the builder's values into a new context and records the
 * monotonic time at which the context was created.
 *
 * @param b builder supplying op, term, log index, stage and the
 *          container-to-BCSID map
 * @throws NullPointerException if the builder's op is null
 */
private DispatcherContext(Builder b) {
  op = Objects.requireNonNull(b.op, "op == null");
  term = b.term;
  logIndex = b.logIndex;
  stage = b.stage;
  container2BCSIDMap = b.container2BCSIDMap;
  // Taken last, once all builder values have been copied.
  startTime = Time.monotonicNowNanos();
}

/** Use {@link DispatcherContext#op(DispatcherContext)} for handling null. */
Expand All @@ -147,6 +151,10 @@ public Map<Long, Long> getContainer2BCSIDMap() {
return container2BCSIDMap;
}

/**
 * Returns the monotonic nanosecond timestamp captured when this context
 * was constructed.
 *
 * @return the creation timestamp in nanoseconds
 */
public long getStartTime() {
  return this.startTime;
}

@Override
public String toString() {
return op + "-" + stage + TermIndex.valueOf(term, logIndex);
Expand Down