diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ReconcileContainerTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ReconcileContainerTask.java index 5d949e90b194..f09258fdf6b5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ReconcileContainerTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ReconcileContainerTask.java @@ -70,12 +70,12 @@ protected Object getCommandForDebug() { } @Override - protected String getMetricName() { + public String getMetricName() { return "ContainerReconciliations"; } @Override - protected String getMetricDescriptionSegment() { + public String getMetricDescriptionSegment() { return "Container Reconciliations"; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconcileContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconcileContainerCommandHandler.java index 99185a7e10b3..8a290d674422 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconcileContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconcileContainerCommandHandler.java @@ -28,28 +28,28 @@ import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import java.util.concurrent.atomic.AtomicLong; - /** * Handles commands from SCM to reconcile a container replica on this datanode with the replicas on its peers. 
*/ public class ReconcileContainerCommandHandler implements CommandHandler { private final ReplicationSupervisor supervisor; - private final AtomicLong invocationCount; private final DNContainerOperationClient dnClient; + private volatile String metricsName; public ReconcileContainerCommandHandler(ReplicationSupervisor supervisor, DNContainerOperationClient dnClient) { this.supervisor = supervisor; this.dnClient = dnClient; - this.invocationCount = new AtomicLong(0); } @Override public void handle(SCMCommand command, OzoneContainer container, StateContext context, SCMConnectionManager connectionManager) { - invocationCount.incrementAndGet(); ReconcileContainerCommand reconcileCommand = (ReconcileContainerCommand) command; - supervisor.addTask(new ReconcileContainerTask(container.getController(), dnClient, reconcileCommand)); + ReconcileContainerTask task = new ReconcileContainerTask(container.getController(), dnClient, reconcileCommand); + if (metricsName == null) { + metricsName = task.getMetricName(); + } + supervisor.addTask(task); } @Override @@ -58,24 +58,30 @@ public SCMCommandProto.Type getCommandType() { } @Override - public int getInvocationCount() { - return (int)invocationCount.get(); + public int getQueuedCount() { + return this.metricsName == null ? 0 : (int) this.supervisor + .getReplicationQueuedCount(metricsName); } - // Uses ReplicationSupervisor for these metrics. + @Override + public int getInvocationCount() { + return this.metricsName == null ? 0 : (int) this.supervisor + .getReplicationRequestCount(metricsName); + } @Override public long getAverageRunTime() { - return 0; + return this.metricsName == null ? 0 : this.supervisor + .getReplicationRequestAvgTime(metricsName); } @Override public long getTotalRunTime() { - return 0; + return this.metricsName == null ? 
0 : this.supervisor + .getReplicationRequestTotalTime(metricsName); } - @Override - public int getQueuedCount() { - return 0; + public String getMetricsName() { + return this.metricsName; } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index 9513cac84efe..9a1621be32ff 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -537,11 +537,11 @@ public long getReplicationQueuedCount(String metricsName) { public long getReplicationRequestAvgTime(String metricsName) { MutableRate rate = opsLatencyMs.get(metricsName); - return rate != null ? (long) rate.lastStat().mean() : 0; + return rate != null ? (long) Math.ceil(rate.lastStat().mean()) : 0; } public long getReplicationRequestTotalTime(String metricsName) { MutableRate rate = opsLatencyMs.get(metricsName); - return rate != null ? (long) rate.lastStat().total() : 0; + return rate != null ? 
(long) Math.ceil(rate.lastStat().total()) : 0; } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java index f27ed097d2f7..fbc0f9714a59 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java @@ -74,6 +74,7 @@ public class TestReconcileContainerCommandHandler { private OzoneContainer ozoneContainer; private StateContext context; private ReconcileContainerCommandHandler subject; + private ReplicationSupervisor mockSupervisor; public void init(ContainerLayoutVersion layout, IncrementalReportSender icrSender) throws Exception { @@ -81,7 +82,7 @@ public void init(ContainerLayoutVersion layout, IncrementalReportSender { ((ReconcileContainerTask)invocation.getArguments()[0]).runTask(); return null; @@ -145,12 +146,21 @@ public void testReconcileContainerCommandMetrics(ContainerLayoutVersion layout) assertEquals(0, subject.getInvocationCount()); - // All commands submitted will be blocked until the latch is counted down. 
for (int id = 1; id <= NUM_CONTAINERS; id++) { ReconcileContainerCommand cmd = new ReconcileContainerCommand(id, Collections.emptySet()); subject.handle(cmd, ozoneContainer, context, null); } + + when(mockSupervisor.getReplicationRequestCount(subject.getMetricsName())).thenReturn(3L); + when(mockSupervisor.getReplicationRequestTotalTime(subject.getMetricsName())).thenReturn(10L); + when(mockSupervisor.getReplicationRequestAvgTime(subject.getMetricsName())).thenReturn(3L); + when(mockSupervisor.getReplicationQueuedCount(subject.getMetricsName())).thenReturn(1L); + + assertEquals("ContainerReconciliations", subject.getMetricsName()); assertEquals(NUM_CONTAINERS, subject.getInvocationCount()); + assertEquals(1, subject.getQueuedCount()); + assertEquals(10, subject.getTotalRunTime()); + assertEquals(3, subject.getAverageRunTime()); } private void verifyAllContainerReports(Map reportsSent) { diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index 315e0c0253b4..bfae06b56a69 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.time.Instant; import java.time.ZoneId; +import java.util.Collections; import java.util.List; import java.util.SortedMap; import java.util.UUID; @@ -50,6 +51,8 @@ import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient; import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient; import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; +import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient; +import 
org.apache.hadoop.ozone.container.checksum.ReconcileContainerTask; import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; @@ -66,6 +69,7 @@ import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; +import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand; import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand; import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; import org.apache.ozone.test.GenericTestUtils; @@ -93,6 +97,7 @@ import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyList; import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -125,6 +130,8 @@ public class TestReplicationSupervisor { private StateContext context; private TestClock clock; private DatanodeDetails datanode; + private DNContainerOperationClient mockClient; + private ContainerController mockController; @BeforeEach public void setUp() throws Exception { @@ -137,6 +144,8 @@ public void setUp() throws Exception { stateMachine, ""); context.setTermOfLeaderSCM(CURRENT_TERM); datanode = MockDatanodeDetails.randomDatanodeDetails(); + mockClient = mock(DNContainerOperationClient.class); + mockController = mock(ContainerController.class); when(stateMachine.getDatanodeDetails()).thenReturn(datanode); } @@ -513,6 +522,56 @@ public void testMultipleReplication(ContainerLayoutVersion layout, } } + @ContainerLayoutTestInfo.ContainerTest + public void testReconciliationTaskMetrics(ContainerLayoutVersion layout) throws IOException { + this.layoutVersion = layout; + // GIVEN + ReplicationSupervisor replicationSupervisor = + 
supervisorWithReplicator(FakeReplicator::new); + ReplicationSupervisorMetrics replicationMetrics = + ReplicationSupervisorMetrics.create(replicationSupervisor); + + try { + //WHEN + replicationSupervisor.addTask(createReconciliationTask(1L)); + replicationSupervisor.addTask(createReconciliationTask(2L)); + + ReconcileContainerTask reconciliationTask = createReconciliationTask(6L); + clock.fastForward(15000); + replicationSupervisor.addTask(reconciliationTask); + doThrow(IOException.class).when(mockController).reconcileContainer(any(), anyLong(), any()); + replicationSupervisor.addTask(createReconciliationTask(7L)); + + //THEN + assertEquals(2, replicationSupervisor.getReplicationSuccessCount()); + + assertEquals(2, replicationSupervisor.getReplicationSuccessCount( + reconciliationTask.getMetricName())); + assertEquals(1, replicationSupervisor.getReplicationFailureCount()); + assertEquals(1, replicationSupervisor.getReplicationFailureCount( + reconciliationTask.getMetricName())); + assertEquals(1, replicationSupervisor.getReplicationTimeoutCount()); + assertEquals(1, replicationSupervisor.getReplicationTimeoutCount( + reconciliationTask.getMetricName())); + assertEquals(4, replicationSupervisor.getReplicationRequestCount()); + assertEquals(4, replicationSupervisor.getReplicationRequestCount( + reconciliationTask.getMetricName())); + + + assertTrue(replicationSupervisor.getReplicationRequestTotalTime( + reconciliationTask.getMetricName()) > 0); + assertTrue(replicationSupervisor.getReplicationRequestAvgTime( + reconciliationTask.getMetricName()) > 0); + + MetricsCollectorImpl replicationMetricsCollector = new MetricsCollectorImpl(); + replicationMetrics.getMetrics(replicationMetricsCollector, true); + assertEquals(1, replicationMetricsCollector.getRecords().size()); + } finally { + replicationMetrics.unRegister(); + replicationSupervisor.stop(); + } + } + @ContainerLayoutTestInfo.ContainerTest public void testPriorityOrdering(ContainerLayoutVersion layout) throws 
InterruptedException { @@ -691,6 +750,15 @@ private ReplicationTask createTask(long containerId) { return new ReplicationTask(cmd, replicatorRef.get()); } + private ReconcileContainerTask createReconciliationTask(long containerId) { + ReconcileContainerCommand reconcileContainerCommand = + new ReconcileContainerCommand(containerId, Collections.singleton(datanode)); + reconcileContainerCommand.setTerm(CURRENT_TERM); + reconcileContainerCommand.setDeadline(clock.millis() + 10000); + return new ReconcileContainerTask(mockController, mockClient, + reconcileContainerCommand); + } + private ECReconstructionCoordinatorTask createECTask(long containerId) { return new ECReconstructionCoordinatorTask(null, createReconstructionCmdInfo(containerId));