Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.ratis.protocol.RaftGroupId;

/**
Expand All @@ -43,14 +46,28 @@ public class CSMMetrics {
private @Metric MutableCounterLong numBytesWrittenCount;
private @Metric MutableCounterLong numBytesCommittedCount;

// Latency rate fed on every incPipelineLatency call, whatever the command
// type is.
private @Metric MutableRate transactionLatency;
// One latency rate per ContainerProtos.Type; incPipelineLatency looks a rate
// up by the ordinal of the command type.
private MutableRate[] opsLatency;
// Registry used by the constructor to create the per-command-type rates.
private MetricsRegistry registry = null;

// Failure Metrics
private @Metric MutableCounterLong numWriteStateMachineFails;
private @Metric MutableCounterLong numQueryStateMachineFails;
private @Metric MutableCounterLong numApplyTransactionFails;
private @Metric MutableCounterLong numReadStateMachineFails;
private @Metric MutableCounterLong numReadStateMachineMissCount;
private @Metric MutableCounterLong numStartTransactionVerifyFailures;
private @Metric MutableCounterLong numContainerNotOpenVerifyFailures;

public CSMMetrics() {
int numCmdTypes = ContainerProtos.Type.values().length;
this.opsLatency = new MutableRate[numCmdTypes];
this.registry = new MetricsRegistry(CSMMetrics.class.getName());
for (int i = 0; i < numCmdTypes; i++) {
opsLatency[i] = registry.newRate(
ContainerProtos.Type.forNumber(i + 1).toString(),
ContainerProtos.Type.forNumber(i + 1) + " op");
}
}

public static CSMMetrics create(RaftGroupId gid) {
Expand Down Expand Up @@ -154,6 +171,19 @@ public long getNumBytesCommittedCount() {
return numBytesCommittedCount.value();
}

/**
 * Records one latency sample for a command.
 *
 * <p>The sample is added first to the rate of the given command type and
 * then to the overall transaction latency rate.
 *
 * @param type command type whose per-type rate receives the sample
 * @param latencyNanos latency value to record
 */
public void incPipelineLatency(ContainerProtos.Type type, long latencyNanos) {
  final MutableRate perTypeRate = opsLatency[type.ordinal()];
  perTypeRate.add(latencyNanos);
  transactionLatency.add(latencyNanos);
}

/**
 * Increments the numStartTransactionVerifyFailures counter by one.
 */
public void incNumStartTransactionVerifyFailures() {
numStartTransactionVerifyFailures.incr();
}

/**
 * Increments the numContainerNotOpenVerifyFailures counter by one.
 */
public void incNumContainerNotOpenVerifyFailures() {
numContainerNotOpenVerifyFailures.incr();
}


public void unRegister() {
MetricsSystem ms = DefaultMetricsSystem.instance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;

import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.util.Time;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
Expand Down Expand Up @@ -262,12 +264,19 @@ public long takeSnapshot() throws IOException {
@Override
public TransactionContext startTransaction(RaftClientRequest request)
throws IOException {
long startTime = Time.monotonicNowNanos();
final ContainerCommandRequestProto proto =
getContainerCommandRequestProto(request.getMessage().getContent());
Preconditions.checkArgument(request.getRaftGroupId().equals(gid));
try {
dispatcher.validateContainerCommand(proto);
} catch (IOException ioe) {
if (ioe instanceof ContainerNotOpenException) {
metrics.incNumContainerNotOpenVerifyFailures();
} else {
metrics.incNumStartTransactionVerifyFailures();
LOG.error("startTransaction validation failed on leader", ioe);
}
TransactionContext ctxt = TransactionContext.newBuilder()
.setClientRequest(request)
.setStateMachine(this)
Expand Down Expand Up @@ -297,6 +306,7 @@ public TransactionContext startTransaction(RaftClientRequest request)
.setClientRequest(request)
.setStateMachine(this)
.setServerRole(RaftPeerRole.LEADER)
.setStateMachineContext(startTime)
.setStateMachineData(write.getData())
.setLogData(commitContainerCommandProto.toByteString())
.build();
Expand All @@ -305,6 +315,7 @@ public TransactionContext startTransaction(RaftClientRequest request)
.setClientRequest(request)
.setStateMachine(this)
.setServerRole(RaftPeerRole.LEADER)
.setStateMachineContext(startTime)
.setLogData(request.getMessage().getContent())
.build();
}
Expand Down Expand Up @@ -451,8 +462,10 @@ public CompletableFuture<Message> query(Message request) {
}

private ByteString readStateMachineData(
ContainerCommandRequestProto requestProto, long term, long index)
throws IOException {
ContainerCommandRequestProto requestProto, long term, long index) {
// the stateMachine data is not present in the stateMachine cache,
// increment the stateMachine cache miss count
metrics.incNumReadStateMachineMissCount();
WriteChunkRequestProto writeChunkRequestProto =
requestProto.getWriteChunk();
ContainerProtos.ChunkInfo chunkInfo = writeChunkRequestProto.getChunkData();
Expand Down Expand Up @@ -527,9 +540,6 @@ public CompletableFuture<ByteString> readStateMachineData(
return CompletableFuture.completedFuture(ByteString.EMPTY);
}
try {
// the stateMachine data is not present in the stateMachine cache,
// increment the stateMachine cache miss count
metrics.incNumReadStateMachineMissCount();
final ContainerCommandRequestProto requestProto =
getContainerCommandRequestProto(
entry.getStateMachineLogEntry().getLogData());
Expand Down Expand Up @@ -622,6 +632,12 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
getCommandExecutor(requestProto));

future.thenAccept(m -> {
if (trx.getServerRole() == RaftPeerRole.LEADER) {
long startTime = (long) trx.getStateMachineContext();
metrics.incPipelineLatency(cmdType,
Time.monotonicNowNanos() - startTime);
}

final Long previous =
applyTransactionCompletionMap
.put(index, trx.getLogEntry().getTerm());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ static void runContainerStateMachineMetrics(
assertCounter("NumApplyTransactionOps", 0L, metric);
assertCounter("NumBytesWrittenCount", 0L, metric);
assertCounter("NumBytesCommittedCount", 0L, metric);
assertCounter("NumStartTransactionVerifyFailures", 0L, metric);
assertCounter("NumContainerNotOpenVerifyFailures", 0L, metric);
assertCounter("WriteChunkNumOps", 0L, metric);

// Write Chunk
BlockID blockID = ContainerTestHelper.getTestBlockID(ContainerTestHelper.
Expand All @@ -133,6 +136,9 @@ static void runContainerStateMachineMetrics(
assertCounter("NumBytesWrittenCount", 1024L, metric);
assertCounter("NumApplyTransactionOps", 1L, metric);
assertCounter("NumBytesCommittedCount", 1024L, metric);
assertCounter("NumStartTransactionVerifyFailures", 0L, metric);
assertCounter("NumContainerNotOpenVerifyFailures", 0L, metric);
assertCounter("WriteChunkNumOps", 1L, metric);

//Read Chunk
ContainerProtos.ContainerCommandRequestProto readChunkRequest =
Expand Down