diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java
index 6d32f3a3f3ea..a50a125f6d44 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java
@@ -46,6 +46,16 @@ public ECReconstructionCoordinatorTask(
     debugString = reconstructionCommandInfo.toString();
   }
 
+  @Override
+  public String getMetricName() {
+    return "ECReconstructions";
+  }
+
+  @Override
+  public String getMetricDescriptionSegment() {
+    return "EC reconstructions";
+  }
+
   @Override
   public void runTask() {
     // Implement the coordinator logic to handle a container group
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java
index 72fa88b35d94..f4bf54a3d827 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java
@@ -70,6 +70,10 @@ protected AbstractReplicationTask(long containerID,
     this.term = term;
     queued = Instant.now(clock);
   }
+
+  protected abstract String getMetricName();
+
+  protected abstract String getMetricDescriptionSegment();
 
   public long getContainerId() {
     return containerId;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
index 5ceea125e814..92ff4b6d8d61 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
@@ -26,6 +26,7 @@
 import java.util.Objects;
 import java.util.OptionalLong;
 import java.util.Set;
+import java.util.Collections;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -71,11 +72,17 @@ public final class ReplicationSupervisor {
   private final StateContext context;
   private final Clock clock;
 
-  private final AtomicLong requestCounter = new AtomicLong();
-  private final AtomicLong successCounter = new AtomicLong();
-  private final AtomicLong failureCounter = new AtomicLong();
-  private final AtomicLong timeoutCounter = new AtomicLong();
-  private final AtomicLong skippedCounter = new AtomicLong();
+  private final Map<String, AtomicLong> requestCounter = new ConcurrentHashMap<>();
+  private final Map<String, AtomicLong> successCounter = new ConcurrentHashMap<>();
+  private final Map<String, AtomicLong> failureCounter = new ConcurrentHashMap<>();
+  private final Map<String, AtomicLong> timeoutCounter = new ConcurrentHashMap<>();
+  private final Map<String, AtomicLong> skippedCounter = new ConcurrentHashMap<>();
+
+  private static final Map<String, String> METRICS_MAP;
+
+  static {
+    METRICS_MAP = new HashMap<>();
+  }
 
   /**
    * A set of container IDs that are currently being downloaded
@@ -188,6 +195,10 @@ public static Builder newBuilder() {
     return new Builder();
   }
 
+  public static Map<String, String> getMetricsMap() {
+    return Collections.unmodifiableMap(METRICS_MAP);
+  }
+
   private ReplicationSupervisor(StateContext context, ExecutorService executor,
       ReplicationConfig replicationConfig, DatanodeConfiguration datanodeConfig,
       Clock clock, IntConsumer executorThreadUpdater) {
@@ -221,6 +232,19 @@ public void addTask(AbstractReplicationTask task) {
       return;
     }
 
+    if (requestCounter.get(task.getMetricName()) == null) {
+      synchronized (this) {
+        if (requestCounter.get(task.getMetricName()) == null) {
+          requestCounter.put(task.getMetricName(), new AtomicLong(0));
+          successCounter.put(task.getMetricName(), new AtomicLong(0));
+          failureCounter.put(task.getMetricName(), new AtomicLong(0));
+          timeoutCounter.put(task.getMetricName(), new AtomicLong(0));
+          skippedCounter.put(task.getMetricName(), new AtomicLong(0));
+          METRICS_MAP.put(task.getMetricName(), task.getMetricDescriptionSegment());
+        }
+      }
+    }
+
     if (inFlight.add(task)) {
       if (task.getPriority() != ReplicationCommandPriority.LOW) {
         // Low priority tasks are not included in the replication queue sizes
@@ -330,14 +354,14 @@ public TaskRunner(AbstractReplicationTask task) {
     @Override
     public void run() {
       try {
-        requestCounter.incrementAndGet();
+        requestCounter.get(task.getMetricName()).incrementAndGet();
 
         final long now = clock.millis();
         final long deadline = task.getDeadline();
         if (deadline > 0 && now > deadline) {
           LOG.info("Ignoring {} since the deadline has passed ({} < {})",
               this, Instant.ofEpochMilli(deadline), Instant.ofEpochMilli(now));
-          timeoutCounter.incrementAndGet();
+          timeoutCounter.get(task.getMetricName()).incrementAndGet();
           return;
         }
 
@@ -364,18 +388,18 @@ public void run() {
         task.runTask();
         if (task.getStatus() == Status.FAILED) {
           LOG.warn("Failed {}", this);
-          failureCounter.incrementAndGet();
+          failureCounter.get(task.getMetricName()).incrementAndGet();
         } else if (task.getStatus() == Status.DONE) {
           LOG.info("Successful {}", this);
-          successCounter.incrementAndGet();
+          successCounter.get(task.getMetricName()).incrementAndGet();
         } else if (task.getStatus() == Status.SKIPPED) {
           LOG.info("Skipped {}", this);
-          skippedCounter.incrementAndGet();
+          skippedCounter.get(task.getMetricName()).incrementAndGet();
         }
       } catch (Exception e) {
         task.setStatus(Status.FAILED);
         LOG.warn("Failed {}", this, e);
-        failureCounter.incrementAndGet();
+        failureCounter.get(task.getMetricName()).incrementAndGet();
       } finally {
         inFlight.remove(task);
         decrementTaskCounter(task);
@@ -419,7 +443,12 @@ public boolean equals(Object o) {
   }
 
   public long getReplicationRequestCount() {
-    return requestCounter.get();
+    return getCount(requestCounter);
+  }
+
+  public long getReplicationRequestCount(String metricsName) {
+    AtomicLong counter = requestCounter.get(metricsName);
+    return counter != null ? counter.get() : 0;
   }
 
   public long getQueueSize() {
@@ -438,20 +467,48 @@ public long getMaxReplicationStreams() {
     }
   }
 
+  private long getCount(Map<String, AtomicLong> counter) {
+    long total = 0;
+    for (Map.Entry<String, AtomicLong> entry : counter.entrySet()) {
+      total += entry.getValue().get();
+    }
+    return total;
+  }
+
   public long getReplicationSuccessCount() {
-    return successCounter.get();
+    return getCount(successCounter);
+  }
+
+  public long getReplicationSuccessCount(String metricsName) {
+    AtomicLong counter = successCounter.get(metricsName);
+    return counter != null ? counter.get() : 0;
   }
 
   public long getReplicationFailureCount() {
-    return failureCounter.get();
+    return getCount(failureCounter);
+  }
+
+  public long getReplicationFailureCount(String metricsName) {
+    AtomicLong counter = failureCounter.get(metricsName);
+    return counter != null ? counter.get() : 0;
   }
 
   public long getReplicationTimeoutCount() {
-    return timeoutCounter.get();
+    return getCount(timeoutCounter);
+  }
+
+  public long getReplicationTimeoutCount(String metricsName) {
+    AtomicLong counter = timeoutCounter.get(metricsName);
+    return counter != null ? counter.get() : 0;
   }
 
   public long getReplicationSkippedCount() {
-    return skippedCounter.get();
+    return getCount(skippedCounter);
+  }
+
+  public long getReplicationSkippedCount(String metricsName) {
+    AtomicLong counter = skippedCounter.get(metricsName);
+    return counter != null ? counter.get() : 0;
   }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java
index 671e985d7adc..a1763976af99 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java
@@ -71,16 +71,47 @@ public void getMetrics(MetricsCollector collector, boolean all) {
         .addGauge(Interns.info("numRequestedReplications",
             "Number of requested replications"),
             supervisor.getReplicationRequestCount())
+        .addGauge(Interns.info("numSuccessReplications",
+            "Number of successful replications"),
+            supervisor.getReplicationSuccessCount())
+        .addGauge(Interns.info("numFailureReplications",
+            "Number of failure replications"),
+            supervisor.getReplicationFailureCount())
         .addGauge(Interns.info("numTimeoutReplications",
             "Number of replication requests timed out before being processed"),
             supervisor.getReplicationTimeoutCount())
         .addGauge(Interns.info("numSkippedReplications",
             "Number of replication requests skipped as the container is "
-            + "already present"), supervisor.getReplicationSkippedCount())
+            + "already present"),
+            supervisor.getReplicationSkippedCount())
         .addGauge(Interns.info("maxReplicationStreams", "Maximum number of "
             + "concurrent replication tasks which can run simultaneously"),
             supervisor.getMaxReplicationStreams());
+    Map<String, String> metricsMap = ReplicationSupervisor.getMetricsMap();
+    if (!metricsMap.isEmpty()) {
+      metricsMap.forEach((metricsName, descriptionSegment) -> {
+        if (!metricsName.equals("")) {
+          builder.addGauge(Interns.info("numRequested" + metricsName,
+              "Number of requested " + descriptionSegment),
+              supervisor.getReplicationRequestCount(metricsName))
+              .addGauge(Interns.info("numSuccess" + metricsName,
+              "Number of successful " + descriptionSegment),
+              supervisor.getReplicationSuccessCount(metricsName))
+              .addGauge(Interns.info("numFailure" + metricsName,
+              "Number of failure " + descriptionSegment),
+              supervisor.getReplicationFailureCount(metricsName))
+              .addGauge(Interns.info("numTimeout" + metricsName,
+              "Number of " + descriptionSegment + " timed out before being processed"),
+              supervisor.getReplicationTimeoutCount(metricsName))
+              .addGauge(Interns.info("numSkipped" + metricsName,
+              "Number of " + descriptionSegment + " skipped as the container is "
+              + "already present"),
+              supervisor.getReplicationSkippedCount(metricsName));
+        }
+      });
+    }
+
     Map<String, Integer> tasks =
         supervisor.getInFlightReplicationSummary();
     for (Map.Entry<String, Integer> entry : tasks.entrySet()) {
       builder.addGauge(Interns.info("numInflight" + entry.getKey(),
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java
index ca0ca98906c0..2168f324c24d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java
@@ -65,6 +65,16 @@ protected ReplicationTask(
         replicator);
   }
 
+  @Override
+  public String getMetricName() {
+    return "ContainerReplications";
+  }
+
+  @Override
+  public String getMetricDescriptionSegment() {
+    return "container replications";
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
index 1f69db78d625..ef37c226653a 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
@@ -27,6 +27,7 @@
 import java.time.Instant;
 import java.time.ZoneId;
 import java.util.List;
+import java.util.SortedMap;
 import java.util.UUID;
 import java.util.concurrent.AbstractExecutorService;
 import java.util.concurrent.CountDownLatch;
@@ -46,6 +47,8 @@
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
@@ -55,7 +58,9 @@
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCommandInfo;
+import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator;
 import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinatorTask;
+import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionMetrics;
 import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
@@ -109,6 +114,8 @@ public class TestReplicationSupervisor {
   };
 
   private final AtomicReference<ContainerReplicator> replicatorRef =
       new AtomicReference<>();
+  private final AtomicReference<ECReconstructionCoordinator> ecReplicatorRef =
+      new AtomicReference<>();
 
   private ContainerSet set;
@@ -135,6 +142,7 @@ public void setUp() throws Exception {
   @AfterEach
   public void cleanup() {
     replicatorRef.set(null);
+    ecReplicatorRef.set(null);
   }
 
   @ContainerLayoutTestInfo.ContainerTest
@@ -394,6 +402,107 @@ public void taskWithObsoleteTermIsDropped(ContainerLayoutVersion layout) {
     assertEquals(0, supervisor.getReplicationSuccessCount());
   }
 
+  @ContainerLayoutTestInfo.ContainerTest
+  public void testMultipleReplication(ContainerLayoutVersion layout,
+      @TempDir File tempFile) throws IOException {
+    this.layoutVersion = layout;
+    OzoneConfiguration conf = new OzoneConfiguration();
+    // GIVEN
+    ReplicationSupervisor replicationSupervisor =
+        supervisorWithReplicator(FakeReplicator::new);
+    ReplicationSupervisor ecReconstructionSupervisor = supervisorWithECReconstruction();
+    ReplicationSupervisorMetrics replicationMetrics =
+        ReplicationSupervisorMetrics.create(replicationSupervisor);
+    ReplicationSupervisorMetrics ecReconstructionMetrics =
+        ReplicationSupervisorMetrics.create(ecReconstructionSupervisor);
+    try {
+      //WHEN
+      replicationSupervisor.addTask(createTask(1L));
+      ecReconstructionSupervisor.addTask(createECTaskWithCoordinator(2L));
+      replicationSupervisor.addTask(createTask(1L));
+      replicationSupervisor.addTask(createTask(3L));
+      ecReconstructionSupervisor.addTask(createECTaskWithCoordinator(4L));
+
+      SimpleContainerDownloader moc = mock(SimpleContainerDownloader.class);
+      Path res = Paths.get("file:/tmp/no-such-file");
+      when(moc.getContainerDataFromReplicas(anyLong(), anyList(),
+          any(Path.class), any())).thenReturn(res);
+
+      final String testDir = tempFile.getPath();
+      MutableVolumeSet volumeSet = mock(MutableVolumeSet.class);
+      when(volumeSet.getVolumesList()).thenReturn(singletonList(
+          new HddsVolume.Builder(testDir).conf(conf).build()));
+      ContainerController mockedCC = mock(ContainerController.class);
+      ContainerImporter importer = new ContainerImporter(conf, set, mockedCC, volumeSet);
+      ContainerReplicator replicator = new DownloadAndImportReplicator(
+          conf, set, importer, moc);
+      replicatorRef.set(replicator);
+      replicationSupervisor.addTask(createTask(5L));
+
+      ReplicateContainerCommand cmd1 = createCommand(6L);
+      cmd1.setDeadline(clock.millis() + 10000);
+      ReplicationTask task1 = new ReplicationTask(cmd1, replicatorRef.get());
+      clock.fastForward(15000);
+      replicationSupervisor.addTask(task1);
+
+      ReconstructECContainersCommand cmd2 = createReconstructionCmd(7L);
+      cmd2.setDeadline(clock.millis() + 10000);
+      ECReconstructionCoordinatorTask task2 = new ECReconstructionCoordinatorTask(
+          ecReplicatorRef.get(), new ECReconstructionCommandInfo(cmd2));
+      clock.fastForward(15000);
+      ecReconstructionSupervisor.addTask(task2);
+      ecReconstructionSupervisor.addTask(createECTask(8L));
+      ecReconstructionSupervisor.addTask(createECTask(9L));
+
+      //THEN
+      assertEquals(2, replicationSupervisor.getReplicationSuccessCount());
+      assertEquals(2, replicationSupervisor.getReplicationSuccessCount(
+          task1.getMetricName()));
+      assertEquals(1, replicationSupervisor.getReplicationFailureCount());
+      assertEquals(1, replicationSupervisor.getReplicationFailureCount(
+          task1.getMetricName()));
+      assertEquals(1, replicationSupervisor.getReplicationSkippedCount());
+      assertEquals(1, replicationSupervisor.getReplicationSkippedCount(
+          task1.getMetricName()));
+      assertEquals(1, replicationSupervisor.getReplicationTimeoutCount());
+      assertEquals(1, replicationSupervisor.getReplicationTimeoutCount(
+          task1.getMetricName()));
+      assertEquals(5, replicationSupervisor.getReplicationRequestCount());
+      assertEquals(5, replicationSupervisor.getReplicationRequestCount(
+          task1.getMetricName()));
+      assertEquals(0, replicationSupervisor.getReplicationRequestCount(
+          task2.getMetricName()));
+
+      assertEquals(2, ecReconstructionSupervisor.getReplicationSuccessCount());
+      assertEquals(2, ecReconstructionSupervisor.getReplicationSuccessCount(
+          task2.getMetricName()));
+      assertEquals(1, ecReconstructionSupervisor.getReplicationTimeoutCount());
+      assertEquals(1, ecReconstructionSupervisor.getReplicationTimeoutCount(
+          task2.getMetricName()));
+      assertEquals(2, ecReconstructionSupervisor.getReplicationFailureCount());
+      assertEquals(2, ecReconstructionSupervisor.getReplicationFailureCount(
+          task2.getMetricName()));
+      assertEquals(5, ecReconstructionSupervisor.getReplicationRequestCount());
+      assertEquals(5, ecReconstructionSupervisor.getReplicationRequestCount(
+          task2.getMetricName()));
+      assertEquals(0, ecReconstructionSupervisor.getReplicationRequestCount(
+          task1.getMetricName()));
+
+      MetricsCollectorImpl replicationMetricsCollector = new MetricsCollectorImpl();
+      replicationMetrics.getMetrics(replicationMetricsCollector, true);
+      assertEquals(1, replicationMetricsCollector.getRecords().size());
+
+      MetricsCollectorImpl ecReconstructionMetricsCollector = new MetricsCollectorImpl();
+      ecReconstructionMetrics.getMetrics(ecReconstructionMetricsCollector, true);
+      assertEquals(1, ecReconstructionMetricsCollector.getRecords().size());
+    } finally {
+      replicationMetrics.unRegister();
+      ecReconstructionMetrics.unRegister();
+      replicationSupervisor.stop();
+      ecReconstructionSupervisor.stop();
+    }
+  }
+
   @ContainerLayoutTestInfo.ContainerTest
   public void testPriorityOrdering(ContainerLayoutVersion layout)
       throws InterruptedException {
@@ -476,6 +585,16 @@ private static class BlockingTask extends AbstractReplicationTask {
       this.waitForCompleteLatch = waitForCompletion;
     }
 
+    @Override
+    protected String getMetricName() {
+      return "Blockings";
+    }
+
+    @Override
+    protected String getMetricDescriptionSegment() {
+      return "blockings";
+    }
+
     @Override
     public void runTask() {
       runningLatch.countDown();
@@ -502,6 +621,16 @@ private static class OrderedTask extends AbstractReplicationTask {
       setPriority(priority);
     }
 
+    @Override
+    protected String getMetricName() {
+      return "Ordereds";
+    }
+
+    @Override
+    protected String getMetricDescriptionSegment() {
+      return "ordereds";
+    }
+
     @Override
     public void runTask() {
       completeList.add(name);
@@ -531,6 +660,22 @@ private ReplicationSupervisor supervisorWith(
     return supervisor;
   }
 
+  private ReplicationSupervisor supervisorWithECReconstruction() throws IOException {
+    ConfigurationSource conf = new OzoneConfiguration();
+    ExecutorService executor = newDirectExecutorService();
+    ReplicationServer.ReplicationConfig repConf =
+        conf.getObject(ReplicationServer.ReplicationConfig.class);
+    ReplicationSupervisor supervisor = ReplicationSupervisor.newBuilder()
+        .stateContext(context).replicationConfig(repConf).executor(executor)
+        .clock(clock).build();
+
+    FakeECReconstructionCoordinator coordinator = new FakeECReconstructionCoordinator(
+        new OzoneConfiguration(), null, null, context,
+        ECReconstructionMetrics.create(), "", supervisor);
+    ecReplicatorRef.set(coordinator);
+    return supervisor;
+  }
+
   private ReplicationTask createTask(long containerId) {
     ReplicateContainerCommand cmd = createCommand(containerId);
     return new ReplicationTask(cmd, replicatorRef.get());
@@ -538,7 +683,13 @@ private ReplicationTask createTask(long containerId) {
 
   private ECReconstructionCoordinatorTask createECTask(long containerId) {
     return new ECReconstructionCoordinatorTask(null,
-        createReconstructionCmd(containerId));
+        createReconstructionCmdInfo(containerId));
+  }
+
+  private ECReconstructionCoordinatorTask createECTaskWithCoordinator(long containerId) {
+    ECReconstructionCommandInfo ecReconstructionCommandInfo = createReconstructionCmdInfo(containerId);
+    return new ECReconstructionCoordinatorTask(ecReplicatorRef.get(),
+        ecReconstructionCommandInfo);
   }
 
   private static ReplicateContainerCommand createCommand(long containerId) {
@@ -548,18 +699,20 @@ private static ReplicateContainerCommand createCommand(long containerId) {
     return cmd;
   }
 
-  private static ECReconstructionCommandInfo createReconstructionCmd(
+  private static ECReconstructionCommandInfo createReconstructionCmdInfo(
       long containerId) {
-    List<ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex> sources
-        = new ArrayList<>();
-    sources.add(new ReconstructECContainersCommand
-        .DatanodeDetailsAndReplicaIndex(
-            MockDatanodeDetails.randomDatanodeDetails(), 1));
-    sources.add(new ReconstructECContainersCommand
-        .DatanodeDetailsAndReplicaIndex(
+    return new ECReconstructionCommandInfo(createReconstructionCmd(containerId));
+  }
+
+  private static ReconstructECContainersCommand createReconstructionCmd(
+      long containerId) {
+    List<ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex> sources =
+        new ArrayList<>();
+    sources.add(new ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex(
+        MockDatanodeDetails.randomDatanodeDetails(), 1));
+    sources.add(new ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex(
         MockDatanodeDetails.randomDatanodeDetails(), 2));
-    sources.add(new ReconstructECContainersCommand
-        .DatanodeDetailsAndReplicaIndex(
+    sources.add(new ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex(
         MockDatanodeDetails.randomDatanodeDetails(), 3));
 
     byte[] missingIndexes = new byte[1];
@@ -567,14 +720,44 @@ private static ECReconstructionCommandInfo createReconstructionCmd(
     List<DatanodeDetails> target = singletonList(
         MockDatanodeDetails.randomDatanodeDetails());
 
-    ReconstructECContainersCommand cmd =
-        new ReconstructECContainersCommand(containerId,
-            sources,
-            target,
-            Proto2Utils.unsafeByteString(missingIndexes),
-            new ECReplicationConfig(3, 2));
-
-    return new ECReconstructionCommandInfo(cmd);
+    ReconstructECContainersCommand cmd = new ReconstructECContainersCommand(containerId, sources, target,
+        Proto2Utils.unsafeByteString(missingIndexes),
+        new ECReplicationConfig(3, 2));
+    cmd.setTerm(CURRENT_TERM);
+    return cmd;
+  }
+
+  /**
+   * A fake coordinator that simulates successful reconstruction of ec containers.
+   */
+  private class FakeECReconstructionCoordinator extends ECReconstructionCoordinator {
+
+    private final OzoneConfiguration conf = new OzoneConfiguration();
+    private final ReplicationSupervisor supervisor;
+
+    FakeECReconstructionCoordinator(ConfigurationSource conf,
+        CertificateClient certificateClient, SecretKeySignerClient secretKeyClient,
+        StateContext context, ECReconstructionMetrics metrics, String threadNamePrefix,
+        ReplicationSupervisor supervisor)
+        throws IOException {
+      super(conf, certificateClient, secretKeyClient, context, metrics, threadNamePrefix);
+      this.supervisor = supervisor;
+    }
+
+    @Override
+    public void reconstructECContainerGroup(long containerID,
+        ECReplicationConfig repConfig, SortedMap<Integer, DatanodeDetails> sourceNodeMap,
+        SortedMap<Integer, DatanodeDetails> targetNodeMap) {
+      assertEquals(1, supervisor.getTotalInFlightReplications());
+
+      KeyValueContainerData kvcd = new KeyValueContainerData(
+          containerID, layoutVersion, 100L,
+          UUID.randomUUID().toString(), UUID.randomUUID().toString());
+      KeyValueContainer kvc = new KeyValueContainer(kvcd, conf);
+      assertDoesNotThrow(() -> {
+        set.addContainer(kvc);
+      });
+    }
   }
 
   /**