diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java index 7042531f5767..3e18e16c40b7 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java @@ -60,10 +60,13 @@ public class ContainerChecksumTreeManager { public ContainerChecksumTreeManager(ConfigurationSource conf) { fileLock = SimpleStriped.readWriteLock( conf.getObject(DatanodeConfiguration.class).getContainerChecksumLockStripes(), true); - // TODO: TO unregister metrics on stop. metrics = ContainerMerkleTreeMetrics.create(); } + public void stop() { + ContainerMerkleTreeMetrics.unregister(); + } + /** * Writes the specified container merkle tree to the specified container's checksum file. * The data merkle tree within the file is replaced with the {@code tree} parameter, but all other content of the diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeMetrics.java index 3d76288616e1..c1bab5aa4856 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeMetrics.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeMetrics.java @@ -32,7 +32,6 @@ public class ContainerMerkleTreeMetrics { public static ContainerMerkleTreeMetrics create() { MetricsSystem ms = DefaultMetricsSystem.instance(); - // TODO: Remove when checksum manager is moved from KeyValueHandler. MetricsSource source = ms.getSource(METRICS_SOURCE_NAME); if (source != null) { ms.unregisterSource(METRICS_SOURCE_NAME); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ReconcileContainerTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ReconcileContainerTask.java new file mode 100644 index 000000000000..ac42efd45ad5 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ReconcileContainerTask.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.container.checksum; + +import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; +import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask; +import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Objects; + +/** + * Used to execute a container reconciliation task that has been queued from the ReplicationSupervisor. + */ +public class ReconcileContainerTask extends AbstractReplicationTask { + private final ReconcileContainerCommand command; + private final DNContainerOperationClient dnClient; + private final ContainerController controller; + + private static final Logger LOG = + LoggerFactory.getLogger(ReconcileContainerTask.class); + + public ReconcileContainerTask(ContainerController controller, + DNContainerOperationClient dnClient, ReconcileContainerCommand command) { + super(command.getContainerID(), command.getDeadline(), command.getTerm()); + this.command = command; + this.controller = controller; + this.dnClient = dnClient; + } + + @Override + public void runTask() { + long start = Time.monotonicNow(); + + LOG.info("{}", this); + + try { + controller.reconcileContainer(dnClient, command.getContainerID(), command.getPeerDatanodes()); + setStatus(Status.DONE); + long elapsed = Time.monotonicNow() - start; + LOG.info("{} completed in {} ms", this, elapsed); + } catch (Exception e) { + long elapsed = Time.monotonicNow() - start; + setStatus(Status.FAILED); + LOG.warn("{} failed in {} ms", this, elapsed, e); + } + } + + @Override + protected Object getCommandForDebug() { + return command.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ReconcileContainerTask that = (ReconcileContainerTask) o; + return Objects.equals(command, that.command); + } + + @Override + public int hashCode() { + return Objects.hash(getContainerId()); + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java index 01435d8002a3..1579f4af8eab 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java @@ -21,7 +21,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.List; +import java.util.Set; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -30,6 +30,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager; +import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerData; @@ -70,16 +72,17 @@ protected 
Handler(ConfigurationSource config, String datanodeId, this.icrSender = icrSender; } + @SuppressWarnings("checkstyle:ParameterNumber") public static Handler getHandlerForContainerType( final ContainerType containerType, final ConfigurationSource config, final String datanodeId, final ContainerSet contSet, final VolumeSet volumeSet, final ContainerMetrics metrics, - IncrementalReportSender<Container> icrSender) { + IncrementalReportSender<Container> icrSender, ContainerChecksumTreeManager checksumManager) { switch (containerType) { case KeyValueContainer: return new KeyValueHandler(config, datanodeId, contSet, volumeSet, metrics, - icrSender); + icrSender, checksumManager); default: throw new IllegalArgumentException("Handler for ContainerType: " + containerType + "doesn't exist."); @@ -199,7 +202,8 @@ public abstract void deleteContainer(Container container, boolean force) * @param container container to be reconciled. * @param peers The other datanodes with a copy of this container whose data should be checked. */ - public abstract void reconcileContainer(Container container, List<DatanodeDetails> peers) throws IOException; + public abstract void reconcileContainer(DNContainerOperationClient dnClient, Container container, + Set<DatanodeDetails> peers) throws IOException; /** * Deletes the given files associated with a block of the container. diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index a8deb9823d75..888d1fd0181d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.hdds.utils.NettyMetrics; import org.apache.hadoop.ozone.HddsDatanodeStopService; +import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient; import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage; import org.apache.hadoop.ozone.container.common.report.ReportManager; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CloseContainerCommandHandler; @@ -225,6 +226,10 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, new ReconstructECContainersCommandHandler(conf, supervisor, ecReconstructionCoordinator); + // TODO HDDS-11218 combine the clients used for reconstruction and reconciliation so they share the same cache of
+ DNContainerOperationClient dnClient = new DNContainerOperationClient(conf, certClient, secretKeyClient); + ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat(threadNamePrefix + "PipelineCommandHandlerThread-%d") .build(); @@ -253,7 +258,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, supervisor::nodeStateUpdated)) .addHandler(new FinalizeNewLayoutVersionCommandHandler()) .addHandler(new RefreshVolumeUsageCommandHandler()) - .addHandler(new ReconcileContainerCommandHandler(threadNamePrefix)) + .addHandler(new ReconcileContainerCommandHandler(supervisor, dnClient)) .setConnectionManager(connectionManager) .setContainer(container) .setContext(context) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconcileContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconcileContainerCommandHandler.java index 9a4110c7dfcb..99185a7e10b3 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconcileContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconcileContainerCommandHandler.java @@ -18,66 +18,38 @@ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient; +import org.apache.hadoop.ozone.container.checksum.ReconcileContainerTask; import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor; import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.apache.hadoop.util.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; /** * Handles commands from SCM to reconcile a container replica on this datanode with the replicas on its peers. */ public class ReconcileContainerCommandHandler implements CommandHandler { - private static final Logger LOG = - LoggerFactory.getLogger(ReconcileContainerCommandHandler.class); - + private final ReplicationSupervisor supervisor; private final AtomicLong invocationCount; - private final AtomicInteger queuedCount; - private final ExecutorService executor; - private long totalTime; + private final DNContainerOperationClient dnClient; - public ReconcileContainerCommandHandler(String threadNamePrefix) { - invocationCount = new AtomicLong(0); - queuedCount = new AtomicInteger(0); - // TODO Allow configurable thread pool size with a default value when the implementation is ready. 
- executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() - .setNameFormat(threadNamePrefix + "ReconcileContainerThread-%d") - .build()); - totalTime = 0; + public ReconcileContainerCommandHandler(ReplicationSupervisor supervisor, DNContainerOperationClient dnClient) { + this.supervisor = supervisor; + this.dnClient = dnClient; + this.invocationCount = new AtomicLong(0); } @Override public void handle(SCMCommand command, OzoneContainer container, StateContext context, SCMConnectionManager connectionManager) { - queuedCount.incrementAndGet(); - CompletableFuture.runAsync(() -> { - invocationCount.incrementAndGet(); - long startTime = Time.monotonicNow(); - ReconcileContainerCommand reconcileCommand = (ReconcileContainerCommand) command; - LOG.info("Processing reconcile container command for container {} with peers {}", - reconcileCommand.getContainerID(), reconcileCommand.getPeerDatanodes()); - try { - container.getController().reconcileContainer(reconcileCommand.getContainerID(), - reconcileCommand.getPeerDatanodes()); - } catch (IOException ex) { - LOG.error("Failed to reconcile container {}.", reconcileCommand.getContainerID(), ex); - } finally { - long endTime = Time.monotonicNow(); - totalTime += endTime - startTime; - } - }, executor).whenComplete((v, e) -> queuedCount.decrementAndGet()); + invocationCount.incrementAndGet(); + ReconcileContainerCommand reconcileCommand = (ReconcileContainerCommand) command; + supervisor.addTask(new ReconcileContainerTask(container.getController(), dnClient, reconcileCommand)); } @Override @@ -90,21 +62,20 @@ public int getInvocationCount() { return (int)invocationCount.get(); } + // Run time and queue metrics for reconcile tasks are tracked by the ReplicationSupervisor, so these handler-level metrics report zero. + @Override public long getAverageRunTime() { - if (invocationCount.get() > 0) { - return totalTime / invocationCount.get(); - } return 0; } @Override public long getTotalRunTime() { - return totalTime; + return 0; } @Override public int getQueuedCount() { - return queuedCount.get(); + return 0; } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 9b888a9c1efc..4b635194cdb3 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -32,6 +32,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.function.Function; @@ -65,6 +66,7 @@ import org.apache.hadoop.ozone.common.OzoneChecksumException; import org.apache.hadoop.ozone.common.utils.BufferUtils; import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager; +import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; @@ -159,14 +161,15 @@ public KeyValueHandler(ConfigurationSource config, ContainerSet contSet, VolumeSet volSet, ContainerMetrics metrics, - IncrementalReportSender<Container> icrSender) { + IncrementalReportSender<Container> icrSender, + ContainerChecksumTreeManager checksumManager) { super(config, datanodeId, contSet, volSet, metrics, icrSender); blockManager = new
BlockManagerImpl(config); validateChunkChecksumData = conf.getObject( DatanodeConfiguration.class).isChunkDataValidationCheck(); chunkManager = ChunkManagerFactory.createChunkManager(config, blockManager, volSet); - checksumManager = new ContainerChecksumTreeManager(config); + this.checksumManager = checksumManager; try { volumeChoosingPolicy = VolumeChoosingPolicyFactory.getPolicy(conf); } catch (Exception e) { @@ -1303,7 +1306,8 @@ public void deleteContainer(Container container, boolean force) } @Override - public void reconcileContainer(Container container, List<DatanodeDetails> peers) throws IOException { + public void reconcileContainer(DNContainerOperationClient dnClient, Container container, + Set<DatanodeDetails> peers) throws IOException { // TODO Just a deterministic placeholder hash for testing until actual implementation is finished. ContainerData data = container.getContainerData(); long id = data.getContainerID(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java index 47b503ee0542..6a1ceef0c037 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient; import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.interfaces.Container; @@ -39,7 +40,6 @@ import java.io.OutputStream; import java.time.Instant; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Set; @@ -215,12 +215,13 @@ public void deleteContainer(final long containerId, boolean force) } } - public void reconcileContainer(long containerID, List<DatanodeDetails> peers) throws IOException { + public void reconcileContainer(DNContainerOperationClient dnClient, long containerID, Set<DatanodeDetails> peers) + throws IOException { Container container = containerSet.getContainer(containerID); if (container == null) { LOG.warn("Container {} to reconcile not found on this datanode.", containerID); } else { - getHandler(container).reconcileContainer(container, peers); + getHandler(container).reconcileContainer(dnClient, container, peers); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index 6a6ac8bb35f1..f8034244c19b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hdds.security.token.TokenVerifier; import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient; import org.apache.hadoop.hdds.utils.HddsServerUtil; -import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeMetrics; import
org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.BlockDeletingService; @@ -188,12 +187,13 @@ public OzoneContainer( } }; + checksumTreeManager = new ContainerChecksumTreeManager(config); for (ContainerType containerType : ContainerType.values()) { handlers.put(containerType, Handler.getHandlerForContainerType( containerType, conf, context.getParent().getDatanodeDetails().getUuidString(), - containerSet, volumeSet, metrics, icrSender)); + containerSet, volumeSet, metrics, icrSender, checksumTreeManager)); } SecurityConfig secConf = new SecurityConfig(conf); @@ -226,8 +226,6 @@ public OzoneContainer( Duration blockDeletingSvcInterval = conf.getObject( DatanodeConfiguration.class).getBlockDeletionInterval(); - checksumTreeManager = new ContainerChecksumTreeManager(config); - long blockDeletingServiceTimeout = config .getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_TIMEOUT, OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, @@ -500,8 +498,7 @@ public void stop() { blockDeletingService.shutdown(); recoveringContainerScrubbingService.shutdown(); ContainerMetrics.remove(); - // TODO: To properly shut down ContainerMerkleTreeMetrics - ContainerMerkleTreeMetrics.unregister(); + checksumTreeManager.stop(); } public void handleVolumeFailures() { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReconcileContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReconcileContainerCommand.java index cdd4522cc691..3d24756a4085 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReconcileContainerCommand.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReconcileContainerCommand.java @@ -25,18 +25,20 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReconcileContainerCommandProto; import java.util.List; +import java.util.Objects; +import java.util.Set; import java.util.stream.Collectors; -import static java.util.Collections.emptyList; +import static java.util.Collections.emptySet; /** * Asks datanodes to reconcile the specified container with other container replicas. */ public class ReconcileContainerCommand extends SCMCommand<ReconcileContainerCommandProto> { - private final List<DatanodeDetails> peerDatanodes; + private final Set<DatanodeDetails> peerDatanodes; - public ReconcileContainerCommand(long containerID, List<DatanodeDetails> peerDatanodes) { + public ReconcileContainerCommand(long containerID, Set<DatanodeDetails> peerDatanodes) { // Container ID serves as command ID, since only one reconciliation should be in progress at a time. super(containerID); this.peerDatanodes = peerDatanodes; @@ -58,7 +60,7 @@ public ReconcileContainerCommandProto getProto() { return builder.build(); } - public List<DatanodeDetails> getPeerDatanodes() { + public Set<DatanodeDetails> getPeerDatanodes() { return peerDatanodes; } @@ -70,11 +72,11 @@ public static ReconcileContainerCommand getFromProtobuf(ReconcileContainerComman Preconditions.checkNotNull(protoMessage); List peers = protoMessage.getPeersList(); - List<DatanodeDetails> peerNodes = !peers.isEmpty() + Set<DatanodeDetails> peerNodes = !peers.isEmpty() ?
peers.stream() .map(DatanodeDetails::getFromProtoBuf) - .collect(Collectors.toList()) - : emptyList(); + .collect(Collectors.toSet()) + : emptySet(); return new ReconcileContainerCommand(protoMessage.getContainerID(), peerNodes); } @@ -85,4 +87,22 @@ public String toString() { ": containerId=" + getContainerID() + ", peerNodes=" + peerDatanodes; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ReconcileContainerCommand that = (ReconcileContainerCommand) o; + return getContainerID() == that.getContainerID() && + Objects.equals(peerDatanodes, that.peerDatanodes); + } + + @Override + public int hashCode() { + return Objects.hash(getContainerID(), peerDatanodes); + } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestReconcileContainerTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestReconcileContainerTask.java new file mode 100644 index 000000000000..04d08347ed4b --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestReconcileContainerTask.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.container.checksum; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; +import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask; +import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; + +class TestReconcileContainerTask { + private DNContainerOperationClient mockClient; + private ContainerController mockController; + + @BeforeEach + public void init() { + mockClient = mock(DNContainerOperationClient.class); + mockController = mock(ContainerController.class); + } + + @Test + public void testFailedTaskStatus() throws Exception { + doThrow(IOException.class).when(mockController).reconcileContainer(any(), anyLong(), any()); + ReconcileContainerTask task = new ReconcileContainerTask(mockController, mockClient, + new ReconcileContainerCommand(1, Collections.emptySet())); + + assertEquals(AbstractReplicationTask.Status.QUEUED, task.getStatus()); + task.runTask(); + assertEquals(AbstractReplicationTask.Status.FAILED, task.getStatus()); + } + + @Test + public void testSuccessfulTaskStatus() { + ReconcileContainerTask task = new ReconcileContainerTask(mockController, mockClient, + new ReconcileContainerCommand(1, Collections.emptySet())); + + assertEquals(AbstractReplicationTask.Status.QUEUED, task.getStatus()); + task.runTask(); + assertEquals(AbstractReplicationTask.Status.DONE, task.getStatus()); + } + + @Test + public void testEqualityWhenContainerIDsMatch() { + final long containerID = 1; + final UUID dnID1 = UUID.randomUUID(); + + Set peerSet1 = new HashSet<>(); + peerSet1.add(buildDn(dnID1)); + Set peerSet1Other = new HashSet<>(); + peerSet1Other.add(buildDn(dnID1)); + Set peerSet2 = new HashSet<>(); + peerSet2.add(buildDn()); + + ReconcileContainerTask peerSet1Task = new ReconcileContainerTask(mockController, mockClient, + new ReconcileContainerCommand(containerID, peerSet1)); + ReconcileContainerTask otherPeerSet1Task = new ReconcileContainerTask(mockController, mockClient, + new ReconcileContainerCommand(containerID, peerSet1Other)); + ReconcileContainerTask peerSet2Task = new ReconcileContainerTask(mockController, mockClient, + new ReconcileContainerCommand(containerID, peerSet2)); + + // Same container ID and peers. + assertEquals(peerSet1Task, otherPeerSet1Task); + // Same container ID, different peers. 
+ assertNotEquals(peerSet1Task, peerSet2Task); + } + + @Test + public void testEqualityWhenContainerIDsDifferent() { + Set peerSet = new HashSet<>(); + peerSet.add(buildDn()); + + ReconcileContainerTask id1Task = new ReconcileContainerTask(mockController, mockClient, + new ReconcileContainerCommand(1, peerSet)); + ReconcileContainerTask id2Task = new ReconcileContainerTask(mockController, mockClient, + new ReconcileContainerCommand(2, peerSet)); + ReconcileContainerTask id2NoPeersTask = new ReconcileContainerTask(mockController, mockClient, + new ReconcileContainerCommand(2, Collections.emptySet())); + + // Different container ID, same peers. + assertNotEquals(id1Task, id2Task); + // Different container ID, different peers. + assertNotEquals(id1Task, id2NoPeersTask); + } + + private DatanodeDetails buildDn(UUID id) { + return DatanodeDetails.newBuilder() + .setUuid(id) + .build(); + } + + private DatanodeDetails buildDn() { + return DatanodeDetails.newBuilder() + .setUuid(UUID.randomUUID()) + .build(); + } +} diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java index b5b578554b19..d4c00df38ba1 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java @@ -17,6 +17,7 @@ package org.apache.hadoop.ozone.container.common; +import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.conf.ConfigurationSource; @@ -27,6 +28,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType; +import org.apache.hadoop.hdds.security.token.TokenVerifier; import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource; import org.apache.hadoop.hdfs.util.Canceler; import org.apache.hadoop.hdfs.util.DataTransferThrottler; @@ -36,9 +38,12 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager; +import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; +import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; @@ -54,8 +59,10 @@ import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy; +import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import 
org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler; import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; @@ -73,6 +80,7 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -179,6 +187,46 @@ public static KeyValueContainer getContainer(long containerId, return new KeyValueContainer(kvData, new OzoneConfiguration()); } + /** + * Constructs an instance of KeyValueHandler that can be used for testing. + * This instance can be used for tests that do not need an ICR sender or {@link ContainerChecksumTreeManager}. + */ + public static KeyValueHandler getKeyValueHandler(ConfigurationSource config, + String datanodeId, ContainerSet contSet, VolumeSet volSet, ContainerMetrics metrics) { + return new KeyValueHandler(config, datanodeId, contSet, volSet, metrics, c -> { }, + new ContainerChecksumTreeManager(config)); + } + + /** + * Constructs an instance of KeyValueHandler that can be used for testing. + * This instance can be used for tests that do not need an ICR sender, metrics, or a + * {@link ContainerChecksumTreeManager}. + */ + public static KeyValueHandler getKeyValueHandler(ConfigurationSource config, + String datanodeId, ContainerSet contSet, VolumeSet volSet) { + return getKeyValueHandler(config, datanodeId, contSet, volSet, ContainerMetrics.create(config)); + } + + public static HddsDispatcher getHddsDispatcher(OzoneConfiguration conf, + ContainerSet contSet, + VolumeSet volSet, + StateContext context) { + return getHddsDispatcher(conf, contSet, volSet, context, null); + } + + public static HddsDispatcher getHddsDispatcher(OzoneConfiguration conf, + ContainerSet contSet, + VolumeSet volSet, + StateContext context, TokenVerifier verifier) { + ContainerMetrics metrics = ContainerMetrics.create(conf); + Map handlers = Maps.newHashMap(); + handlers.put(ContainerType.KeyValueContainer, ContainerTestUtils.getKeyValueHandler(conf, + context.getParent().getDatanodeDetails().getUuidString(), contSet, volSet, metrics)); + assertEquals(1, ContainerType.values().length, "Tests only cover KeyValueContainer type"); + return new HddsDispatcher( + conf, contSet, volSet, handlers, context, metrics, verifier); + } + public static void enableSchemaV3(OzoneConfiguration conf) { DatanodeConfiguration dc = conf.getObject(DatanodeConfiguration.class); dc.setContainerSchemaV3Enabled(true); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java index ab313d0ce66a..fd0c81fb023b 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java @@ -481,9 +481,7 @@ public void testPendingDeleteBlockReset(ContainerTestVersionInfo versionInfo) // runs so we can trigger it manually. 
ContainerMetrics metrics = ContainerMetrics.create(conf); KeyValueHandler keyValueHandler = - new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet, - metrics, c -> { - }); + ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid, containerSet, volumeSet, metrics); OzoneContainer ozoneContainer = mockDependencies(containerSet, keyValueHandler); BlockDeletingService svc = new BlockDeletingService(ozoneContainer, @@ -550,9 +548,7 @@ public void testBlockDeletion(ContainerTestVersionInfo versionInfo) createToDeleteBlocks(containerSet, 1, 3, 1); ContainerMetrics metrics = ContainerMetrics.create(conf); KeyValueHandler keyValueHandler = - new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet, - metrics, c -> { - }); + ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid, containerSet, volumeSet, metrics); BlockDeletingServiceTestImpl svc = getBlockDeletingService(containerSet, conf, keyValueHandler); svc.start(); @@ -684,9 +680,7 @@ public void testWithUnrecordedBlocks(ContainerTestVersionInfo versionInfo) ContainerMetrics metrics = ContainerMetrics.create(conf); KeyValueHandler keyValueHandler = - new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet, - metrics, c -> { - }); + ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid, containerSet, volumeSet, metrics); BlockDeletingServiceTestImpl svc = getBlockDeletingService(containerSet, conf, keyValueHandler); svc.start(); @@ -792,9 +786,7 @@ public void testShutdownService(ContainerTestVersionInfo versionInfo) createToDeleteBlocks(containerSet, 1, 100, 1); ContainerMetrics metrics = ContainerMetrics.create(conf); KeyValueHandler keyValueHandler = - new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet, - metrics, c -> { - }); + ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid, containerSet, volumeSet, metrics); BlockDeletingServiceTestImpl service = getBlockDeletingService(containerSet, conf, keyValueHandler); service.start(); @@ -822,9 +814,7 @@ public void testBlockDeletionTimeout(ContainerTestVersionInfo versionInfo) createToDeleteBlocks(containerSet, 1, 3, 1); ContainerMetrics metrics = ContainerMetrics.create(conf); KeyValueHandler keyValueHandler = - new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet, - metrics, c -> { - }); + ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid, containerSet, volumeSet, metrics); // set timeout value as 1ns to trigger timeout behavior long timeout = 1; OzoneContainer ozoneContainer = @@ -929,9 +919,7 @@ public void testContainerThrottle(ContainerTestVersionInfo versionInfo) chunksPerBlock); ContainerMetrics metrics = ContainerMetrics.create(conf); KeyValueHandler keyValueHandler = - new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet, - metrics, c -> { - }); + ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid, containerSet, volumeSet, metrics); BlockDeletingServiceTestImpl service = getBlockDeletingService(containerSet, conf, keyValueHandler); service.start(); @@ -988,9 +976,7 @@ public void testContainerMaxLockHoldingTime( createToDeleteBlocks(containerSet, containerCount, blocksPerContainer, chunksPerBlock); KeyValueHandler keyValueHandler = - new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet, - ContainerMetrics.create(conf), c -> { - }); + ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid, containerSet, volumeSet); BlockDeletingServiceTestImpl service = getBlockDeletingService(containerSet, conf, keyValueHandler); service.start(); @@ -1047,9 +1033,7 @@ public void 
testBlockThrottle(ContainerTestVersionInfo versionInfo) ContainerSet containerSet = new ContainerSet(1000); ContainerMetrics metrics = ContainerMetrics.create(conf); KeyValueHandler keyValueHandler = - new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet, - metrics, c -> { - }); + ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid, containerSet, volumeSet, metrics); int containerCount = 5; int blocksPerContainer = 3; createToDeleteBlocks(containerSet, containerCount, @@ -1119,7 +1103,7 @@ public void testChecksumFileUpdatedWhenDeleteRetried(ContainerTestVersionInfo ve ContainerSet containerSet = new ContainerSet(1000); KeyValueContainerData contData = createToDeleteBlocks(containerSet, numBlocks, 4); KeyValueHandler keyValueHandler = - new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet, ContainerMetrics.create(conf), c -> { }); + ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid, containerSet, volumeSet); BlockDeletingServiceTestImpl svc = getBlockDeletingService(containerSet, conf, keyValueHandler); svc.start(); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java index ad5ca482189b..77746041270f 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java @@ -28,7 +28,6 @@ import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList; -import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator; @@ -279,11 +278,8 @@ public void testDelete(String schemaVersion) throws Exception { ContainerSet containerSet = makeContainerSet(); VolumeSet volumeSet = new MutableVolumeSet(datanodeUuid, conf, null, StorageVolume.VolumeType.DATA_VOLUME, null); - ContainerMetrics metrics = ContainerMetrics.create(conf); KeyValueHandler keyValueHandler = - new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet, - metrics, c -> { - }); + ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid, containerSet, volumeSet); long initialTotalSpace = newKvData().getBytesUsed(); long blockSpace = initialTotalSpace / TestDB.KEY_COUNT; @@ -352,11 +348,8 @@ public void testReadDeletedBlockChunkInfo(String schemaVersion) ContainerSet containerSet = makeContainerSet(); VolumeSet volumeSet = new MutableVolumeSet(datanodeUuid, conf, null, StorageVolume.VolumeType.DATA_VOLUME, null); - ContainerMetrics metrics = ContainerMetrics.create(conf); KeyValueHandler keyValueHandler = - new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet, - metrics, c -> { - }); + ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid, containerSet, volumeSet); KeyValueContainerData cData = newKvData(); try (DBHandle refCountedDB = BlockUtils.getDB(cData, conf)) { // Read blocks that were already deleted before the upgrade. 
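The test refactors above all funnel KeyValueHandler construction through the new ContainerTestUtils.getKeyValueHandler helper instead of calling the widened constructor directly. As a rough usage sketch, assuming a test that prepares its own configuration, datanode UUID string, and volume set the same way the tests above do (exception handling omitted; this is illustrative, not part of the patch):

// The helper wires in a no-op ICR sender and a fresh ContainerChecksumTreeManager,
// so tests no longer have to build those collaborators by hand.
OzoneConfiguration conf = new OzoneConfiguration();
String datanodeUuid = UUID.randomUUID().toString();
ContainerSet containerSet = new ContainerSet(1000);
VolumeSet volumeSet = new MutableVolumeSet(datanodeUuid, conf, null,
    StorageVolume.VolumeType.DATA_VOLUME, null);
KeyValueHandler keyValueHandler =
    ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid, containerSet, volumeSet);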
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaTwoBackwardsCompatibility.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaTwoBackwardsCompatibility.java index 0c4612b79fa2..da0d2384ab61 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaTwoBackwardsCompatibility.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaTwoBackwardsCompatibility.java @@ -32,7 +32,6 @@ import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; -import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator; @@ -135,8 +134,7 @@ public void setup() throws Exception { chunkManager = new FilePerBlockStrategy(true, blockManager, volumeSet); containerSet = new ContainerSet(1000); - keyValueHandler = new KeyValueHandler(conf, datanodeUuid, - containerSet, volumeSet, ContainerMetrics.create(conf), c -> { }); + keyValueHandler = ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid, containerSet, volumeSet); ozoneContainer = mock(OzoneContainer.class); when(ozoneContainer.getContainerSet()).thenReturn(containerSet); when(ozoneContainer.getWriteChannel()).thenReturn(null); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java index 3ff8f9e625d6..993179d1b79c 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java @@ -49,6 +49,7 @@ import org.apache.hadoop.ozone.common.ChecksumData; import org.apache.hadoop.ozone.common.ChunkBuffer; import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; @@ -311,7 +312,7 @@ public void testDeleteNonEmptyContainer(ContainerTestVersionInfo versionInfo) KeyValueHandler kvHandler = new KeyValueHandler(conf, datanodeId, containerSet, volumeSet, metrics, - c -> icrReceived.incrementAndGet()); + c -> icrReceived.incrementAndGet(), new ContainerChecksumTreeManager(conf)); Exception exception = assertThrows( StorageContainerException.class, diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java index 1cbd6ee4706d..05bebdd1b902 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java +++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java @@ -18,7 +18,6 @@ package org.apache.hadoop.ozone.container.common.impl; -import com.google.common.collect.Maps; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.conf.StorageUnit; @@ -34,7 +33,6 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProtoOrBuilder; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; @@ -48,8 +46,6 @@ import org.apache.hadoop.ozone.container.common.ContainerTestUtils; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.interfaces.Container; -import org.apache.hadoop.ozone.container.common.interfaces.Handler; -import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext; @@ -78,7 +74,6 @@ import java.time.Duration; import java.util.Collections; import java.util.HashMap; -import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -113,10 +108,6 @@ public class TestHddsDispatcher { @TempDir private File testDir; - public static final IncrementalReportSender NO_OP_ICR_SENDER = - c -> { - }; - @ContainerLayoutTestInfo.ContainerTest public void testContainerCloseActionWhenFull( ContainerLayoutVersion layout) throws IOException { @@ -143,16 +134,7 @@ public void testContainerCloseActionWhenFull( container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(), scmId.toString()); containerSet.addContainer(container); - ContainerMetrics metrics = ContainerMetrics.create(conf); - Map handlers = Maps.newHashMap(); - for (ContainerType containerType : ContainerType.values()) { - handlers.put(containerType, - Handler.getHandlerForContainerType(containerType, conf, - context.getParent().getDatanodeDetails().getUuidString(), - containerSet, volumeSet, metrics, NO_OP_ICR_SENDER)); - } - HddsDispatcher hddsDispatcher = new HddsDispatcher( - conf, containerSet, volumeSet, handlers, context, metrics, null); + HddsDispatcher hddsDispatcher = ContainerTestUtils.getHddsDispatcher(conf, containerSet, volumeSet, context); hddsDispatcher.setClusterId(scmId.toString()); ContainerCommandResponseProto responseOne = hddsDispatcher .dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 1L), null); @@ -279,16 +261,7 @@ public void testContainerCloseActionWhenVolumeFull( container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(), scmId.toString()); containerSet.addContainer(container); - ContainerMetrics metrics = ContainerMetrics.create(conf); - Map handlers = Maps.newHashMap(); - for (ContainerType containerType : 
ContainerType.values()) { - handlers.put(containerType, - Handler.getHandlerForContainerType(containerType, conf, - context.getParent().getDatanodeDetails().getUuidString(), - containerSet, volumeSet, metrics, NO_OP_ICR_SENDER)); - } - HddsDispatcher hddsDispatcher = new HddsDispatcher( - conf, containerSet, volumeSet, handlers, context, metrics, null); + HddsDispatcher hddsDispatcher = ContainerTestUtils.getHddsDispatcher(conf, containerSet, volumeSet, context); hddsDispatcher.setClusterId(scmId.toString()); containerData.getVolume().getVolumeInfo() .ifPresent(volumeInfo -> volumeInfo.incrementUsedSpace(50)); @@ -528,17 +501,8 @@ static HddsDispatcher createDispatcher(DatanodeDetails dd, UUID scmId, } }); StateContext context = ContainerTestUtils.getMockContext(dd, conf); - ContainerMetrics metrics = ContainerMetrics.create(conf); - Map handlers = Maps.newHashMap(); - for (ContainerType containerType : ContainerType.values()) { - handlers.put(containerType, - Handler.getHandlerForContainerType(containerType, conf, - context.getParent().getDatanodeDetails().getUuidString(), - containerSet, volumeSet, metrics, NO_OP_ICR_SENDER)); - } - - final HddsDispatcher hddsDispatcher = new HddsDispatcher(conf, - containerSet, volumeSet, handlers, context, metrics, tokenVerifier); + final HddsDispatcher hddsDispatcher = + ContainerTestUtils.getHddsDispatcher(conf, containerSet, volumeSet, context, tokenVerifier); hddsDispatcher.setClusterId(scmId.toString()); return hddsDispatcher; } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java index 8f2ad307e823..27257d5a0e14 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java @@ -18,8 +18,6 @@ package org.apache.hadoop.ozone.container.common.interfaces; -import java.util.Map; - import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; @@ -27,13 +25,11 @@ import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher; -import org.apache.hadoop.ozone.container.common.impl.TestHddsDispatcher; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler; -import com.google.common.collect.Maps; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -62,19 +58,7 @@ public void setup() throws Exception { DatanodeDetails datanodeDetails = mock(DatanodeDetails.class); StateContext context = ContainerTestUtils.getMockContext( datanodeDetails, conf); - ContainerMetrics metrics = ContainerMetrics.create(conf); - Map handlers = Maps.newHashMap(); - for (ContainerProtos.ContainerType containerType : - ContainerProtos.ContainerType.values()) { - handlers.put(containerType, - Handler.getHandlerForContainerType( - containerType, conf, - 
context.getParent().getDatanodeDetails().getUuidString(), - containerSet, volumeSet, metrics, - TestHddsDispatcher.NO_OP_ICR_SENDER)); - } - this.dispatcher = new HddsDispatcher( - conf, containerSet, volumeSet, handlers, null, metrics, null); + this.dispatcher = ContainerTestUtils.getHddsDispatcher(conf, containerSet, volumeSet, context); } @AfterEach diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java index 6933400fba1d..fd4614335e3d 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java @@ -705,7 +705,7 @@ public void testCommandQueueSummary() throws IOException { ctx.addCommand(ReplicateContainerCommand.forTest(3)); ctx.addCommand(new ClosePipelineCommand(PipelineID.randomId())); ctx.addCommand(new CloseContainerCommand(1, PipelineID.randomId())); - ctx.addCommand(new ReconcileContainerCommand(4, Collections.emptyList())); + ctx.addCommand(new ReconcileContainerCommand(4, Collections.emptySet())); Map summary = ctx.getCommandQueueSummary(); assertEquals(3, diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java index d6be667f41bc..f27ed097d2f7 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java @@ -24,6 +24,9 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager; +import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient; +import org.apache.hadoop.ozone.container.checksum.ReconcileContainerTask; import org.apache.hadoop.ozone.container.common.ContainerTestUtils; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; @@ -39,16 +42,14 @@ import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor; import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand; -import org.apache.ozone.test.GenericTestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import static java.util.Collections.singletonMap; import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails; @@ -56,7 
+57,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -78,7 +80,14 @@ public void init(ContainerLayoutVersion layout, IncrementalReportSender<Containe + ReplicationSupervisor mockSupervisor = mock(ReplicationSupervisor.class); + doAnswer(invocation -> { + ((ReconcileContainerTask)invocation.getArguments()[0]).runTask(); + return null; + }).when(mockSupervisor).addTask(any()); + + subject = new ReconcileContainerCommandHandler(mockSupervisor, mock(DNContainerOperationClient.class)); context = ContainerTestUtils.getMockContext(dnDetails, conf); containerSet = new ContainerSet(1000); @@ -91,7 +100,7 @@ public void init(ContainerLayoutVersion layout, IncrementalReportSender<Containe { }); - IncrementalReportSender<Container> icrSender = c -> { - try { - // Block the caller until the latch is counted down. - // Caller can check queue metrics in the meantime. - LOG.info("ICR sender waiting for latch"); - assertTrue(icrLatch.await(30, TimeUnit.SECONDS)); - LOG.info("ICR sender proceeding after latch"); - - Thread.sleep(minExecTimeMillis); - } catch (Exception ex) { - LOG.error("ICR sender failed", ex); - } - }; - - init(layout, icrSender); + assertEquals(0, subject.getInvocationCount()); // The mocked supervisor runs each task inline, so every command completes before handle() returns. for (int id = 1; id <= NUM_CONTAINERS; id++) { - ReconcileContainerCommand cmd = new ReconcileContainerCommand(id, Collections.emptyList()); + ReconcileContainerCommand cmd = new ReconcileContainerCommand(id, Collections.emptySet()); subject.handle(cmd, ozoneContainer, context, null); } - assertEquals(NUM_CONTAINERS, subject.getQueuedCount()); - assertEquals(0, subject.getTotalRunTime()); - assertEquals(0, subject.getAverageRunTime()); - - // This will resume handling of the tasks. - icrLatch.countDown(); - waitForAllCommandsToFinish(); - assertEquals(NUM_CONTAINERS, subject.getInvocationCount()); - long totalRunTime = subject.getTotalRunTime(); - assertTrue(totalRunTime >= expectedTotalMinExecTimeMillis, - "Total run time " + totalRunTime + "ms was not larger than the minimum total exec time " + - expectedTotalMinExecTimeMillis + "ms"); - long avgRunTime = subject.getAverageRunTime(); - assertTrue(avgRunTime >= minExecTimeMillis, - "Average run time " + avgRunTime + "ms was not larger than the minimum per task exec time " + - minExecTimeMillis + "ms"); - } - - private void waitForAllCommandsToFinish() throws Exception { - // Queue count should be decremented only after the task completes, so the other metrics should be consistent when - // it reaches zero. - GenericTestUtils.waitFor(() -> { - int qCount = subject.getQueuedCount(); - LOG.info("Waiting for queued command count to reach 0.
-      LOG.info("Waiting for queued command count to reach 0. Currently at " + qCount);
-      return qCount == 0;
-    }, 500, 3000);
-  }
 
-  private void verifyAllContainerReports(Map<Long, ContainerReplicaProto> reportsSent) throws Exception {
+  private void verifyAllContainerReports(Map<Long, ContainerReplicaProto> reportsSent) {
     assertEquals(NUM_CONTAINERS, reportsSent.size());
 
     for (Map.Entry<Long, ContainerReplicaProto> entry: reportsSent.entrySet()) {
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
index 6245489f13b9..49b109b91396 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
@@ -33,9 +33,11 @@
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.OptionalLong;
+import java.util.Set;
 import java.util.UUID;
 
 import com.google.protobuf.Proto2Utils;
@@ -118,7 +120,7 @@ public void testHandlesReconcileContainerCommand() throws Exception {
     StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
         mock(StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
-    List<DatanodeDetails> peerDNs = new ArrayList<>();
+    Set<DatanodeDetails> peerDNs = new HashSet<>();
     peerDNs.add(MockDatanodeDetails.randomDatanodeDetails());
     peerDNs.add(MockDatanodeDetails.randomDatanodeDetails());
     ReconcileContainerCommand cmd = new ReconcileContainerCommand(1, peerDNs);
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index 4527ee6d5140..8e5f7f01e789 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -41,6 +41,8 @@
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.security.token.TokenVerifier;
+import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
+import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
 import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
@@ -48,6 +50,7 @@
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
@@ -275,16 +278,11 @@ public void testVolumeSetInKeyValueHandler() throws Exception {
         null, StorageVolume.VolumeType.DATA_VOLUME, null);
     try {
       ContainerSet cset = new ContainerSet(1000);
-      int[] interval = new int[1];
-      interval[0] = 2;
-      ContainerMetrics metrics = new ContainerMetrics(interval);
       DatanodeDetails datanodeDetails = mock(DatanodeDetails.class);
       StateContext context = ContainerTestUtils.getMockContext(
           datanodeDetails, conf);
-      KeyValueHandler keyValueHandler = new KeyValueHandler(conf,
-          context.getParent().getDatanodeDetails().getUuidString(), cset,
-          volumeSet, metrics, c -> {
-          });
+      KeyValueHandler keyValueHandler = ContainerTestUtils.getKeyValueHandler(conf,
+          context.getParent().getDatanodeDetails().getUuidString(), cset, volumeSet);
       assertEquals("org.apache.hadoop.ozone.container.common" +
           ".volume.CapacityVolumeChoosingPolicy",
           keyValueHandler.getVolumeChoosingPolicyForTesting()
@@ -294,8 +292,8 @@ public void testVolumeSetInKeyValueHandler() throws Exception {
       conf.set(HDDS_DATANODE_VOLUME_CHOOSING_POLICY,
           "org.apache.hadoop.ozone.container.common.impl.HddsDispatcher");
       RuntimeException exception = assertThrows(RuntimeException.class,
-          () -> new KeyValueHandler(conf, context.getParent().getDatanodeDetails().getUuidString(), cset, volumeSet,
-              metrics, c -> { }));
+          () -> ContainerTestUtils.getKeyValueHandler(conf, context.getParent().getDatanodeDetails().getUuidString(),
+              cset, volumeSet));
       assertThat(exception).hasMessageEndingWith(
           "class org.apache.hadoop.ozone.container.common.impl.HddsDispatcher " +
@@ -385,7 +383,7 @@ public void testDeleteContainer() throws IOException {
     final KeyValueHandler kvHandler = new KeyValueHandler(conf,
         datanodeId, containerSet, volumeSet, metrics,
-        c -> icrReceived.incrementAndGet());
+        c -> icrReceived.incrementAndGet(), new ContainerChecksumTreeManager(conf));
     kvHandler.setClusterID(clusterId);
 
     final ContainerCommandRequestProto createContainer =
@@ -459,8 +457,7 @@ public void testReconcileContainer(ContainerLayoutVersion layoutVersion) throws
     // Allows checking the invocation count of the lambda.
     AtomicInteger icrCount = new AtomicInteger(0);
-    KeyValueHandler keyValueHandler = new KeyValueHandler(conf, randomDatanodeDetails().getUuidString(), containerSet,
-        mock(MutableVolumeSet.class), mock(ContainerMetrics.class), c -> {
+    IncrementalReportSender<Container> icrSender = c -> {
       // Check that the ICR contains expected info about the container.
       ContainerReplicaProto report = c.getContainerReport();
       long reportedID = report.getContainerID();
@@ -470,11 +467,14 @@ public void testReconcileContainer(ContainerLayoutVersion layoutVersion) throws
       Assertions.assertNotEquals(0, reportDataChecksum,
           "Container report should have populated the checksum field with a non-zero value.");
       icrCount.incrementAndGet();
-    });
+    };
+
+    KeyValueHandler keyValueHandler = new KeyValueHandler(conf, randomDatanodeDetails().getUuidString(), containerSet,
+        mock(MutableVolumeSet.class), mock(ContainerMetrics.class), icrSender, new ContainerChecksumTreeManager(conf));
 
     Assertions.assertEquals(0, icrCount.get());
     // This should trigger container report validation in the ICR handler above.
-    keyValueHandler.reconcileContainer(container, Collections.emptyList());
+    keyValueHandler.reconcileContainer(mock(DNContainerOperationClient.class), container, Collections.emptySet());
     Assertions.assertEquals(1, icrCount.get());
   }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
index f0c8a2077eac..820799321859 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
 import org.apache.hadoop.ozone.ClientVersion;
+import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
 import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
@@ -254,7 +255,7 @@ private KeyValueHandler getDummyHandler() {
         stateMachine.getDatanodeDetails().getUuidString(),
         mock(ContainerSet.class),
         mock(MutableVolumeSet.class),
-        mock(ContainerMetrics.class), mockIcrSender);
+        mock(ContainerMetrics.class), mockIcrSender, mock(ContainerChecksumTreeManager.class));
   }
 
   private KeyValueContainer getMockUnhealthyContainer() {
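A sketch, not part of the patch: the hunks above change the reconcile entry point so that it also takes the client used to reach peer datanodes. Using only the constructor and call shapes visible in TestKeyValueHandler above, and treating conf, container, and the hypothetical datanode UUID string dnUuid as given, a minimal way to drive the new path looks like this:

    // Minimal sketch; mirrors the argument shapes shown in the test hunks above.
    ContainerSet containerSet = new ContainerSet(1000);
    IncrementalReportSender<Container> icrSender = c -> { };  // no-op ICR sender
    KeyValueHandler handler = new KeyValueHandler(conf, dnUuid, containerSet,
        mock(MutableVolumeSet.class), mock(ContainerMetrics.class), icrSender,
        new ContainerChecksumTreeManager(conf));
    // An empty peer set exercises local-only reconciliation; per the test above it still
    // publishes an incremental container report with a populated data checksum.
    handler.reconcileContainer(mock(DNContainerOperationClient.class), container, Collections.emptySet());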
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java
index 03901b99be3b..b8c43460ba38 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java
@@ -25,7 +25,7 @@
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.security.SecurityConfig;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
@@ -35,7 +35,6 @@
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
 import org.junit.jupiter.api.AfterEach;
@@ -129,11 +128,8 @@ public void init(boolean isZeroCopy) throws Exception {
     when(volumeSet.getVolumesList()).thenReturn(Collections.singletonList(
         new HddsVolume.Builder(testDir).conf(conf).build()));
-    ContainerMetrics metrics = ContainerMetrics.create(conf);
     Handler containerHandler =
-        new KeyValueHandler(conf, datanode.getUuidString(), containerSet,
-            volumeSet, metrics, c -> {
-            });
+        ContainerTestUtils.getKeyValueHandler(conf, datanode.getUuidString(), containerSet, volumeSet);
 
     containerController = new ContainerController(containerSet,
         Collections.singletonMap(
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/ReconcileContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/ReconcileContainerEventHandler.java
index f13b37f3ee23..8e8b5bf71c4d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/ReconcileContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/ReconcileContainerEventHandler.java
@@ -32,7 +32,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -78,9 +77,9 @@ public void onMessage(ContainerID containerID, EventPublisher publisher) {
     LOG.info("Reconcile container event triggered for container {} with peers {}", containerID, allReplicaNodes);
     for (DatanodeDetails replica : allReplicaNodes) {
-      List<DatanodeDetails> otherReplicas = allReplicaNodes.stream()
+      Set<DatanodeDetails> otherReplicas = allReplicaNodes.stream()
           .filter(other -> !other.equals(replica))
-          .collect(Collectors.toList());
+          .collect(Collectors.toSet());
       ReconcileContainerCommand command = new ReconcileContainerCommand(containerID.getId(), otherReplicas);
       command.setTerm(scmContext.getTermOfLeader());
       publisher.fireEvent(DATANODE_COMMAND, new CommandForDatanode<>(replica.getUuid(), command));
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
index 068cb01a9671..2d1b1a4bb09c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
@@ -21,7 +21,6 @@
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 
 import org.apache.commons.io.FileUtils;
@@ -48,7 +47,6 @@
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
@@ -148,18 +146,7 @@ private HddsDispatcher createDispatcher(DatanodeDetails dd, VolumeSet volumeSet) {
     ContainerSet containerSet = new ContainerSet(1000);
     StateContext context = ContainerTestUtils.getMockContext(
         dd, CONF);
-    ContainerMetrics metrics = ContainerMetrics.create(CONF);
-    Map<ContainerProtos.ContainerType, Handler> handlers = Maps.newHashMap();
-    for (ContainerProtos.ContainerType containerType :
-        ContainerProtos.ContainerType.values()) {
-      handlers.put(containerType,
-          Handler.getHandlerForContainerType(containerType, CONF,
-              context.getParent().getDatanodeDetails().getUuidString(),
-              containerSet, volumeSet, metrics,
-              c -> { }));
-    }
-    HddsDispatcher dispatcher = new HddsDispatcher(CONF, containerSet,
-        volumeSet, handlers, context, metrics, null);
+    HddsDispatcher dispatcher = ContainerTestUtils.getHddsDispatcher(CONF, containerSet, volumeSet, context);
     StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())
         .forEach(hddsVolume -> hddsVolume.setDbParentDir(tempDir.toFile()));
     dispatcher.setClusterId(UUID.randomUUID().toString());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
index 630c4d314959..ab95467a7196 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
@@ -48,7 +48,6 @@
 import org.apache.hadoop.ozone.RatisTestHelper;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@@ -200,19 +199,7 @@ private HddsDispatcher createDispatcher(DatanodeDetails dd, UUID scmId,
     StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())
         .forEach(hddsVolume -> hddsVolume.setDbParentDir(tempDir.toFile()));
     StateContext context = ContainerTestUtils.getMockContext(dd, conf);
-    ContainerMetrics metrics = ContainerMetrics.create(conf);
-    Map<ContainerProtos.ContainerType, Handler> handlers = Maps.newHashMap();
-    for (ContainerProtos.ContainerType containerType :
-        ContainerProtos.ContainerType.values()) {
-      handlers.put(containerType,
-          Handler.getHandlerForContainerType(containerType, conf,
-              dd.getUuid().toString(),
-              containerSet, volumeSet, metrics,
-              c -> {
-              }));
-    }
-    HddsDispatcher hddsDispatcher = new HddsDispatcher(
-        conf, containerSet, volumeSet, handlers, context, metrics, null);
+    HddsDispatcher hddsDispatcher = ContainerTestUtils.getHddsDispatcher(conf, containerSet, volumeSet, context);
     hddsDispatcher.setClusterId(scmId.toString());
     return hddsDispatcher;
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
index 8044685bb747..47be4daf90ae 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
@@ -25,7 +25,6 @@
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
@@ -57,11 +56,9 @@
 import org.apache.hadoop.ozone.RatisTestHelper;
 import org.apache.hadoop.ozone.client.SecretKeyTestClient;
 import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
@@ -183,18 +180,7 @@ private HddsDispatcher createDispatcher(DatanodeDetails dd, UUID scmId,
     StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())
         .forEach(hddsVolume -> hddsVolume.setDbParentDir(tempDir.toFile()));
     StateContext context = ContainerTestUtils.getMockContext(dd, conf);
-    ContainerMetrics metrics = ContainerMetrics.create(conf);
-    Map<ContainerProtos.ContainerType, Handler> handlers = Maps.newHashMap();
-    for (ContainerProtos.ContainerType containerType :
-        ContainerProtos.ContainerType.values()) {
-      handlers.put(containerType,
-          Handler.getHandlerForContainerType(containerType, conf,
-              dd.getUuid().toString(),
-              containerSet, volumeSet, metrics,
-              c -> { }));
-    }
-    HddsDispatcher hddsDispatcher = new HddsDispatcher(
-        conf, containerSet, volumeSet, handlers, context, metrics,
+    HddsDispatcher hddsDispatcher = ContainerTestUtils.getHddsDispatcher(conf, containerSet, volumeSet, context,
         TokenVerifier.create(new SecurityConfig(conf), secretKeyClient));
     hddsDispatcher.setClusterId(scmId.toString());
     return hddsDispatcher;
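A sketch, not part of the patch: the ContainerTestUtils helpers that the test hunks above now call are not shown in this diff. The shapes below are inferred purely from the call sites, including the TokenVerifier overload used by TestSecureContainerServer, and they assume Handler.getHandlerForContainerType gained the same trailing ContainerChecksumTreeManager parameter as the KeyValueHandler constructor. The real implementations live in ContainerTestUtils and may differ; imports are as in the touched test files.

    // Inferred sketch of the helpers, not the actual implementation.
    public static KeyValueHandler getKeyValueHandler(ConfigurationSource conf, String datanodeId,
        ContainerSet containerSet, VolumeSet volumeSet) {
      // Bundles the arguments every replaced call site passed by hand: fresh metrics,
      // a no-op ICR sender, and a standalone checksum tree manager.
      return new KeyValueHandler(conf, datanodeId, containerSet, volumeSet,
          ContainerMetrics.create(conf), c -> { }, new ContainerChecksumTreeManager(conf));
    }

    public static HddsDispatcher getHddsDispatcher(ConfigurationSource conf, ContainerSet containerSet,
        VolumeSet volumeSet, StateContext context) {
      return getHddsDispatcher(conf, containerSet, volumeSet, context, null);
    }

    public static HddsDispatcher getHddsDispatcher(ConfigurationSource conf, ContainerSet containerSet,
        VolumeSet volumeSet, StateContext context, TokenVerifier verifier) {
      ContainerMetrics metrics = ContainerMetrics.create(conf);
      Map<ContainerProtos.ContainerType, Handler> handlers = new HashMap<>();
      for (ContainerProtos.ContainerType containerType : ContainerProtos.ContainerType.values()) {
        // Same per-type handler wiring that the deleted test code duplicated at each call site.
        handlers.put(containerType,
            Handler.getHandlerForContainerType(containerType, conf,
                context.getParent().getDatanodeDetails().getUuidString(),
                containerSet, volumeSet, metrics, c -> { },
                new ContainerChecksumTreeManager(conf)));
      }
      return new HddsDispatcher(conf, containerSet, volumeSet, handlers, context, metrics, verifier);
    }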
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java
index 5592926bf883..a0aba2a1b156 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
+import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.helpers.DatanodeVersionFile;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
@@ -151,7 +152,9 @@ public void loadContainersFromVolumes() throws IOException {
           volumeSet,
           metrics,
           containerReplicaProto -> {
-          });
+          },
+          // Since this is an Ozone debug CLI, this instance is not part of a running datanode.
+          new ContainerChecksumTreeManager(conf));
       handler.setClusterID(clusterId);
       handlers.put(containerType, handler);
     }
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
index d471c13462f7..0c525457aac6 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
@@ -193,7 +194,9 @@ private void initializeReplicationSupervisor(
           volumeSet,
           metrics,
           containerReplicaProto -> {
-          });
+          },
+          // Since this is a Freon tool, this instance is not part of a running datanode.
+          new ContainerChecksumTreeManager(conf));
       handler.setClusterID(UUID.randomUUID().toString());
       handlers.put(containerType, handler);
     }
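A worked example, not part of the patch, of the SCM-side fan-out in ReconcileContainerEventHandler above, with hypothetical datanodes dnA, dnB, and dnC: every replica of a container receives a command whose peer set is all replicas except itself, and the List to Set change makes that peer set unordered and duplicate-free.

    // Hypothetical replicas; identifiers otherwise as in the ReconcileContainerEventHandler hunk.
    Set<DatanodeDetails> allReplicaNodes = new HashSet<>(Arrays.asList(dnA, dnB, dnC));
    for (DatanodeDetails replica : allReplicaNodes) {
      Set<DatanodeDetails> otherReplicas = allReplicaNodes.stream()
          .filter(other -> !other.equals(replica))
          .collect(Collectors.toSet());
      // dnA is sent peers {dnB, dnC}, dnB gets {dnA, dnC}, and dnC gets {dnA, dnB},
      // so reconciliation of container 7 is triggered symmetrically on all three replicas.
      ReconcileContainerCommand command = new ReconcileContainerCommand(7, otherReplicas);
    }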