diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index 4867a2aa69a7..c9fe4ec67971 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -112,6 +112,8 @@ public class BlockInputStream extends BlockExtendedInputStream { private final Function refreshFunction; + private BlockData blockData; + public BlockInputStream( BlockLocationInfo blockInfo, Pipeline pipeline, @@ -153,7 +155,6 @@ public synchronized void initialize() throws IOException { return; } - BlockData blockData = null; List chunks = null; IOException catchEx = null; do { @@ -554,8 +555,7 @@ public long getLength() { return length; } - @VisibleForTesting - synchronized int getChunkIndex() { + public synchronized int getChunkIndex() { return chunkIndex; } @@ -618,9 +618,12 @@ private void handleReadError(IOException cause) throws IOException { refreshBlockInfo(cause); } - @VisibleForTesting public synchronized List getChunkStreams() { return chunkStreams; } + public BlockData getStreamBlockData() { + return blockData; + } + } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java index 23c96fc7d6a7..efad9ff76cec 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java @@ -747,4 +747,8 @@ public synchronized void unbuffer() { public ByteBuffer[] getCachedBuffers() { return BufferUtils.getReadOnlyByteBuffers(buffers); } + + public ChunkInfo getChunkInfo() { + return chunkInfo; + } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java index 0b2928410c99..cce329f91e5b 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java @@ -882,6 +882,11 @@ public static HddsProtos.UUID toProtobuf(UUID uuid) { : null; } + /** @return Hex string representation of {@code value} */ + public static String checksumToString(long value) { + return Long.toHexString(value); + } + /** * Logs a warning to report that the class is not closed properly. 
*/ diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java index 4bd470971633..20ddf555bcd6 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationConfig.java @@ -112,6 +112,7 @@ static HddsProtos.ReplicationFactor getLegacyFactor( return ((ReplicatedReplicationConfig) replicationConfig) .getReplicationFactor(); } + throw new UnsupportedOperationException( "Replication configuration of type " + replicationConfig.getReplicationType() diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaInfo.java index b2884e3fb033..24bc4d6d32cb 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaInfo.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaInfo.java @@ -17,6 +17,8 @@ package org.apache.hadoop.hdds.scm.container; +import static org.apache.hadoop.hdds.HddsUtils.checksumToString; + import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.JsonSerializer; import com.fasterxml.jackson.databind.SerializerProvider; @@ -26,7 +28,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; - /** * Class which stores ContainerReplica details on the client. */ @@ -102,7 +103,7 @@ public long getDataChecksum() { private static class LongToHexJsonSerializer extends JsonSerializer { @Override public void serialize(Long value, JsonGenerator gen, SerializerProvider provider) throws IOException { - gen.writeString(Long.toHexString(value)); + gen.writeString(checksumToString(value)); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java index c247380fbe3f..99b5800c450c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java @@ -82,7 +82,9 @@ public void stop() { * file remains unchanged. * Concurrent writes to the same file are coordinated internally. 
*/ - public void writeContainerDataTree(ContainerData data, ContainerMerkleTreeWriter tree) throws IOException { + public ContainerProtos.ContainerChecksumInfo writeContainerDataTree(ContainerData data, + ContainerMerkleTreeWriter tree) + throws IOException { long containerID = data.getContainerID(); Lock writeLock = getLock(containerID); writeLock.lock(); @@ -98,11 +100,13 @@ public void writeContainerDataTree(ContainerData data, ContainerMerkleTreeWriter checksumInfoBuilder = ContainerProtos.ContainerChecksumInfo.newBuilder(); } - checksumInfoBuilder + ContainerProtos.ContainerChecksumInfo checksumInfo = checksumInfoBuilder .setContainerID(containerID) - .setContainerMerkleTree(captureLatencyNs(metrics.getCreateMerkleTreeLatencyNS(), tree::toProto)); - write(data, checksumInfoBuilder.build()); + .setContainerMerkleTree(captureLatencyNs(metrics.getCreateMerkleTreeLatencyNS(), tree::toProto)) + .build(); + write(data, checksumInfo); LOG.debug("Data merkle tree for container {} updated", containerID); + return checksumInfo; } finally { writeLock.unlock(); } @@ -146,33 +150,32 @@ public void markBlocksAsDeleted(KeyValueContainerData data, Collection del } } - public ContainerDiffReport diff(KeyValueContainerData thisContainer, + /** + * Compares the checksum info of the container with the peer's checksum info and returns a report of the differences. + * @param thisChecksumInfo The checksum info of the container on this datanode. + * @param peerChecksumInfo The checksum info of the container on the peer datanode. + */ + public ContainerDiffReport diff(ContainerProtos.ContainerChecksumInfo thisChecksumInfo, ContainerProtos.ContainerChecksumInfo peerChecksumInfo) throws StorageContainerException { ContainerDiffReport report = new ContainerDiffReport(); try { captureLatencyNs(metrics.getMerkleTreeDiffLatencyNS(), () -> { - Preconditions.assertNotNull(thisContainer, "Container data is null"); - Preconditions.assertNotNull(peerChecksumInfo, "Peer checksum info is null"); - Optional thisContainerChecksumInfo = read(thisContainer); - if (!thisContainerChecksumInfo.isPresent()) { - throw new StorageContainerException("The container #" + thisContainer.getContainerID() + - " doesn't have container checksum", ContainerProtos.Result.IO_EXCEPTION); - } - - if (thisContainer.getContainerID() != peerChecksumInfo.getContainerID()) { - throw new StorageContainerException("Container Id does not match for container " - + thisContainer.getContainerID(), ContainerProtos.Result.CONTAINER_ID_MISMATCH); + Preconditions.assertNotNull(thisChecksumInfo, "Datanode's checksum info is null."); + Preconditions.assertNotNull(peerChecksumInfo, "Peer checksum info is null."); + if (thisChecksumInfo.getContainerID() != peerChecksumInfo.getContainerID()) { + throw new StorageContainerException("Container ID does not match. 
Local container ID " + + thisChecksumInfo.getContainerID() + " , Peer container ID " + peerChecksumInfo.getContainerID(), + ContainerProtos.Result.CONTAINER_ID_MISMATCH); } - ContainerProtos.ContainerChecksumInfo thisChecksumInfo = thisContainerChecksumInfo.get(); compareContainerMerkleTree(thisChecksumInfo, peerChecksumInfo, report); }); } catch (IOException ex) { metrics.incrementMerkleTreeDiffFailures(); - throw new StorageContainerException("Container Diff failed for container #" + thisContainer.getContainerID(), ex, - ContainerProtos.Result.IO_EXCEPTION); + throw new StorageContainerException("Container Diff failed for container #" + thisChecksumInfo.getContainerID(), + ex, ContainerProtos.Result.IO_EXCEPTION); } // Update Container Diff metrics based on the diff report. @@ -314,7 +317,7 @@ private Lock getLock(long containerID) { * Callers are not required to hold a lock while calling this since writes are done to a tmp file and atomically * swapped into place. */ - private Optional read(ContainerData data) throws IOException { + public Optional read(ContainerData data) throws IOException { long containerID = data.getContainerID(); File checksumFile = getContainerChecksumFile(data); try { @@ -361,6 +364,8 @@ private void write(ContainerData data, ContainerProtos.ContainerChecksumInfo che throw new IOException("Error occurred when writing container merkle tree for containerID " + data.getContainerID(), ex); } + // Set in-memory data checksum. + data.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum()); } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/DNContainerOperationClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/DNContainerOperationClient.java index 31c397ce0931..d5ba243dd12a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/DNContainerOperationClient.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/DNContainerOperationClient.java @@ -84,6 +84,10 @@ public XceiverClientManager getXceiverClientManager() { return xceiverClientManager; } + public TokenHelper getTokenHelper() { + return tokenHelper; + } + /** * Reads {@link ContainerProtos.ContainerChecksumInfo} for a specified container for the specified datanode. 
* diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java index af7662a131e3..652a90e9783c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java @@ -56,6 +56,8 @@ public class ContainerMetrics implements Closeable { @Metric private MutableCounterLong containerForceDelete; @Metric private MutableCounterLong numReadStateMachine; @Metric private MutableCounterLong bytesReadStateMachine; + @Metric private MutableCounterLong numContainerReconciledWithoutChanges; + @Metric private MutableCounterLong numContainerReconciledWithChanges; private final EnumMap numOpsArray; @@ -172,4 +174,12 @@ public void incBytesReadStateMachine(long bytes) { public long getBytesReadStateMachine() { return bytesReadStateMachine.value(); } + + public void incContainerReconciledWithoutChanges() { + numContainerReconciledWithoutChanges.incr(); + } + + public void incContainerReconciledWithChanges() { + numContainerReconciledWithChanges.incr(); + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerLogger.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerLogger.java index 4aacc7c2de06..6f20f22a8bb3 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerLogger.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerLogger.java @@ -17,7 +17,10 @@ package org.apache.hadoop.ozone.container.common.utils; +import static org.apache.hadoop.hdds.HddsUtils.checksumToString; + import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.interfaces.ScanResult; import org.apache.logging.log4j.LogManager; @@ -145,6 +148,23 @@ public static void logRecovered(ContainerData containerData) { LOG.info(getMessage(containerData)); } + /** + * Logged when a container is reconciled. + * + * @param containerData The container that was reconciled on this datanode. + * @param oldDataChecksum The old data checksum. + */ + public static void logReconciled(ContainerData containerData, long oldDataChecksum, DatanodeDetails peer) { + if (containerData.getDataChecksum() == oldDataChecksum) { + LOG.info(getMessage(containerData, "Container reconciled with peer " + peer.toString() + + ". No change in checksum.")); + } else { + LOG.warn(getMessage(containerData, "Container reconciled with peer " + peer.toString() + + ". 
Checksum updated from " + checksumToString(oldDataChecksum) + " to " + + checksumToString(containerData.getDataChecksum()))); + } + } + private static String getMessage(ContainerData containerData, String message) { return String.join(FIELD_SEPARATOR, getMessage(containerData), message); @@ -155,6 +175,7 @@ private static String getMessage(ContainerData containerData) { "ID=" + containerData.getContainerID(), "Index=" + containerData.getReplicaIndex(), "BCSID=" + containerData.getBlockCommitSequenceId(), - "State=" + containerData.getState()); + "State=" + containerData.getState(), + "DataChecksum=" + checksumToString(containerData.getDataChecksum())); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index d4bfa79142e2..43926ca5e282 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -17,10 +17,12 @@ package org.apache.hadoop.ozone.container.keyvalue; +import static org.apache.hadoop.hdds.HddsUtils.checksumToString; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.QUASI_CLOSED; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.RECOVERING; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CLOSED_CONTAINER_IO; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_ALREADY_EXISTS; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR; @@ -34,6 +36,8 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNCLOSED_CONTAINER_IO; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockDataResponse; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockLengthResponse; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getEchoResponse; @@ -52,6 +56,7 @@ import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest; import static org.apache.hadoop.hdds.scm.utils.ClientCommandsUtils.getReadChunkVersion; import static org.apache.hadoop.ozone.OzoneConsts.INCREMENTAL_CHUNK_LIST; +import static org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient.createSingleNodePipeline; import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.DEFAULT_LAYOUT; import 
com.google.common.annotations.VisibleForTesting; @@ -67,17 +72,23 @@ import java.nio.file.Files; import java.nio.file.Path; import java.time.Clock; +import java.time.Instant; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.concurrent.locks.Lock; import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.StorageUnit; @@ -91,21 +102,29 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.ByteStringConversion; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.storage.BlockInputStream; +import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo; +import org.apache.hadoop.hdds.scm.storage.ChunkInputStream; +import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature; import org.apache.hadoop.hdds.utils.FaultInjector; import org.apache.hadoop.hdds.utils.HddsServerUtil; import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl; import org.apache.hadoop.ozone.common.Checksum; -import org.apache.hadoop.ozone.common.ChecksumByteBuffer; -import org.apache.hadoop.ozone.common.ChecksumByteBufferFactory; import org.apache.hadoop.ozone.common.ChunkBuffer; import org.apache.hadoop.ozone.common.ChunkBufferToByteString; import org.apache.hadoop.ozone.common.OzoneChecksumException; import org.apache.hadoop.ozone.common.utils.BufferUtils; import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager; +import org.apache.hadoop.ozone.container.checksum.ContainerDiffReport; import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeWriter; import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient; import org.apache.hadoop.ozone.container.common.helpers.BlockData; @@ -136,7 +155,9 @@ import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerFactory; import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager; import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager; +import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerDataScanner; import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Time; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; @@ -158,11 +179,13 @@ public class KeyValueHandler extends Handler { 
private final long maxDeleteLockWaitMs; private final Function byteBufferToByteString; private final boolean validateChunkChecksumData; + private final int chunkSize; // A striped lock that is held during container creation. private final Striped containerCreationLocks; private final ContainerChecksumTreeManager checksumManager; private static FaultInjector injector; private final Clock clock; + private final BlockInputStreamFactoryImpl blockInputStreamFactory; public KeyValueHandler(ConfigurationSource config, String datanodeId, @@ -224,6 +247,9 @@ public KeyValueHandler(ConfigurationSource config, ByteStringConversion .createByteBufferConversion(isUnsafeByteBufferConversionEnabled); + blockInputStreamFactory = new BlockInputStreamFactoryImpl(); + chunkSize = (int) conf.getStorageSize(OZONE_SCM_CHUNK_SIZE_KEY, OZONE_SCM_CHUNK_SIZE_DEFAULT, StorageUnit.BYTES); + if (ContainerLayoutVersion.getConfiguredVersion(conf) == ContainerLayoutVersion.FILE_PER_CHUNK) { LOG.warn("FILE_PER_CHUNK layout is not supported. Falling back to default : {}.", @@ -600,24 +626,20 @@ ContainerCommandResponseProto handleCloseContainer( return getSuccessResponse(request); } - private void createContainerMerkleTree(Container container) { + + /** + * Create a Merkle tree for the container if it does not exist. + * TODO: This method should be changed to private after HDDS-10374 is merged. + */ + @VisibleForTesting + public void createContainerMerkleTree(Container container) { if (ContainerChecksumTreeManager.checksumFileExist(container)) { return; } try { KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData(); - ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter(); - try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf); - BlockIterator blockIterator = dbHandle.getStore(). - getBlockIterator(containerData.getContainerID())) { - while (blockIterator.hasNext()) { - BlockData blockData = blockIterator.nextBlock(); - List chunkInfos = blockData.getChunks(); - merkleTree.addChunks(blockData.getLocalID(), chunkInfos); - } - } - checksumManager.writeContainerDataTree(containerData, merkleTree); + updateAndGetContainerChecksum(containerData); } catch (IOException ex) { LOG.error("Cannot create container checksum for container {} , Exception: ", container.getContainerData().getContainerID(), ex); @@ -1483,21 +1505,297 @@ public void deleteContainer(Container container, boolean force) @Override public void reconcileContainer(DNContainerOperationClient dnClient, Container container, Set peers) throws IOException { - // TODO Just a deterministic placeholder hash for testing until actual implementation is finished. 
- ContainerData data = container.getContainerData(); - long id = data.getContainerID(); - ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES) - .putLong(id) - .asReadOnlyBuffer(); - byteBuffer.rewind(); - ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl(); - checksumImpl.update(byteBuffer); - long dataChecksum = checksumImpl.getValue(); - LOG.info("Generated data checksum of container {} for testing: {}", id, dataChecksum); - data.setDataChecksum(dataChecksum); + KeyValueContainer kvContainer = (KeyValueContainer) container; + KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData(); + Optional optionalChecksumInfo = checksumManager.read(containerData); + ContainerProtos.ContainerChecksumInfo checksumInfo; + + if (optionalChecksumInfo.isPresent()) { + checksumInfo = optionalChecksumInfo.get(); + } else { + // Try creating the checksum info from RocksDB metadata if it is not present. + checksumInfo = updateAndGetContainerChecksum(containerData); + } + long oldDataChecksum = checksumInfo.getContainerMerkleTree().getDataChecksum(); + + for (DatanodeDetails peer : peers) { + long start = Instant.now().toEpochMilli(); + ContainerProtos.ContainerChecksumInfo peerChecksumInfo = dnClient.getContainerChecksumInfo( + containerData.getContainerID(), peer); + if (peerChecksumInfo == null) { + LOG.warn("Cannot reconcile container {} with peer {} which has not yet generated a checksum", + containerData.getContainerID(), peer); + continue; + } + + ContainerDiffReport diffReport = checksumManager.diff(checksumInfo, peerChecksumInfo); + Pipeline pipeline = createSingleNodePipeline(peer); + ByteBuffer chunkByteBuffer = ByteBuffer.allocate(chunkSize); + + // Handle missing blocks + for (ContainerProtos.BlockMerkleTree missingBlock : diffReport.getMissingBlocks()) { + try { + handleMissingBlock(kvContainer, pipeline, dnClient, missingBlock, chunkByteBuffer); + } catch (IOException e) { + LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(), + containerData.getContainerID(), e); + } + } + + // Handle missing chunks + for (Map.Entry> entry : diffReport.getMissingChunks().entrySet()) { + try { + reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(), entry.getValue(), chunkByteBuffer); + } catch (IOException e) { + LOG.error("Error while reconciling missing chunk for block {} in container {}", entry.getKey(), + containerData.getContainerID(), e); + } + } + + // Handle corrupt chunks + for (Map.Entry> entry : diffReport.getCorruptChunks().entrySet()) { + try { + reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(), entry.getValue(), chunkByteBuffer); + } catch (IOException e) { + LOG.error("Error while reconciling corrupt chunk for block {} in container {}", entry.getKey(), + containerData.getContainerID(), e); + } + } + // Update checksum based on RocksDB metadata. The read chunk validates the checksum of the data + // we read. So we can update the checksum only based on the RocksDB metadata. + ContainerProtos.ContainerChecksumInfo updatedChecksumInfo = updateAndGetContainerChecksum(containerData); + long dataChecksum = updatedChecksumInfo.getContainerMerkleTree().getDataChecksum(); + + long duration = Instant.now().toEpochMilli() - start; + if (dataChecksum == oldDataChecksum) { + metrics.incContainerReconciledWithoutChanges(); + LOG.info("Container {} reconciled with peer {}. No change in checksum. Current checksum {}. 
Time taken {} ms", + containerData.getContainerID(), peer.toString(), checksumToString(dataChecksum), duration); + } else { + metrics.incContainerReconciledWithChanges(); + LOG.warn("Container {} reconciled with peer {}. Checksum updated from {} to {}. Time taken {} ms", + containerData.getContainerID(), peer.toString(), checksumToString(oldDataChecksum), + checksumToString(dataChecksum), duration); + } + ContainerLogger.logReconciled(container.getContainerData(), oldDataChecksum, peer); + } + + // Trigger manual on demand scanner + OnDemandContainerDataScanner.scanContainer(container); sendICR(container); } + /** + * Updates the container merkle tree based on the RocksDb's block metadata and returns the updated checksum info. + * @param containerData - Container data for which the container merkle tree needs to be updated. + */ + private ContainerProtos.ContainerChecksumInfo updateAndGetContainerChecksum(KeyValueContainerData containerData) + throws IOException { + ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter(); + try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf); + BlockIterator blockIterator = dbHandle.getStore(). + getBlockIterator(containerData.getContainerID())) { + while (blockIterator.hasNext()) { + BlockData blockData = blockIterator.nextBlock(); + List chunkInfos = blockData.getChunks(); + // TODO: Add empty blocks to the merkle tree. Done in HDDS-10374, needs to be backported. + merkleTree.addChunks(blockData.getLocalID(), chunkInfos); + } + } + ContainerProtos.ContainerChecksumInfo checksumInfo = checksumManager + .writeContainerDataTree(containerData, merkleTree); + return checksumInfo; + } + + /** + * Handle missing block. It reads the missing block data from the peer datanode and writes it to the local container. + * If the block write fails, the block commit sequence id of the container and the block are not updated. + */ + private void handleMissingBlock(KeyValueContainer container, Pipeline pipeline, DNContainerOperationClient dnClient, + ContainerProtos.BlockMerkleTree missingBlock, ByteBuffer chunkByteBuffer) + throws IOException { + ContainerData containerData = container.getContainerData(); + BlockID blockID = new BlockID(containerData.getContainerID(), missingBlock.getBlockID()); + // The length of the block is not known, so instead of passing the default block length we pass 0. As the length + // is not used to validate the token for getBlock call. + Token blockToken = dnClient.getTokenHelper().getBlockToken(blockID, 0L); + if (getBlockManager().blockExists(container, blockID)) { + LOG.warn("Block {} already exists in container {}. The block should not exist and our container merkle tree" + + " is stale. Skipping reconciliation for this block.", blockID, containerData.getContainerID()); + return; + } + + List successfulChunksList = new ArrayList<>(); + boolean overwriteBcsId = true; + + BlockLocationInfo blkInfo = new BlockLocationInfo.Builder() + .setBlockID(blockID) + .setPipeline(pipeline) + .setToken(blockToken) + .build(); + // Under construction is set here, during BlockInputStream#initialize() it is used to update the block length. + blkInfo.setUnderConstruction(true); + try (BlockInputStream blockInputStream = (BlockInputStream) blockInputStreamFactory.create( + RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE), + blkInfo, pipeline, blockToken, dnClient.getXceiverClientManager(), + null, conf.getObject(OzoneClientConfig.class))) { + // Initialize the BlockInputStream. 
Gets the blockData from the peer, sets the block length and + // initializes ChunkInputStream for each chunk. + blockInputStream.initialize(); + ContainerProtos.BlockData peerBlockData = blockInputStream.getStreamBlockData(); + // The maxBcsId is the peer's bcsId as there is no block for this blockID in the local container. + long maxBcsId = peerBlockData.getBlockID().getBlockCommitSequenceId(); + List peerChunksList = peerBlockData.getChunksList(); + + // Don't update bcsId if chunk read fails + for (ContainerProtos.ChunkInfo chunkInfoProto : peerChunksList) { + try { + // Seek to the offset of the chunk. Seek updates the chunkIndex in the BlockInputStream. + blockInputStream.seek(chunkInfoProto.getOffset()); + + // Read the chunk data from the BlockInputStream and write it to the container. + int chunkLength = (int) chunkInfoProto.getLen(); + if (chunkByteBuffer.capacity() < chunkLength) { + chunkByteBuffer = ByteBuffer.allocate(chunkLength); + } + + chunkByteBuffer.clear(); + chunkByteBuffer.limit(chunkLength); + int bytesRead = blockInputStream.read(chunkByteBuffer); + if (bytesRead != chunkLength) { + throw new IOException("Error while reading chunk data from block input stream. Expected length: " + + chunkLength + ", Actual length: " + bytesRead); + } + + chunkByteBuffer.flip(); + ChunkBuffer chunkBuffer = ChunkBuffer.wrap(chunkByteBuffer); + ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto); + chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true"); + writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer, container); + // If the chunk read/write fails, we are expected to have holes in the blockData's chunk list. + // But that is okay, if the read fails it means there might be a hole in the peer datanode as well. + // If the chunk write fails then we don't want to add the metadata without the actual data as there is + // no data to verify the chunk checksum. + successfulChunksList.add(chunkInfoProto); + } catch (IOException ex) { + overwriteBcsId = false; + LOG.error("Error while reconciling missing block {} for offset {} in container {}", + blockID, chunkInfoProto.getOffset(), containerData.getContainerID(), ex); + } + } + + BlockData putBlockData = BlockData.getFromProtoBuf(peerBlockData); + putBlockData.setChunks(successfulChunksList); + putBlockForClosedContainer(container, putBlockData, maxBcsId, overwriteBcsId); + chunkManager.finishWriteChunks(container, putBlockData); + } + } + + /** + * This method reconciles chunks per block. It reads the missing/corrupt chunk data from the peer + * datanode and writes it to the local container. If the chunk write fails, the block commit sequence + * id is not updated. + */ + private void reconcileChunksPerBlock(KeyValueContainer container, Pipeline pipeline, + DNContainerOperationClient dnClient, long blockId, + List chunkList, ByteBuffer chunkByteBuffer) + throws IOException { + + ContainerData containerData = container.getContainerData(); + BlockID blockID = new BlockID(containerData.getContainerID(), blockId); + // The length of the block is not known, so instead of passing the default block length we pass 0. As the length + // is not used to validate the token for getBlock call. 
+ Token blockToken = dnClient.getTokenHelper().getBlockToken(blockID, 0L); + BlockData localBlockData = getBlockManager().getBlock(container, blockID); + + SortedMap localChunksMap = localBlockData.getChunks().stream() + .collect(Collectors.toMap(ContainerProtos.ChunkInfo::getOffset, + Function.identity(), (chunk1, chunk2) -> chunk1, TreeMap::new)); + boolean overwriteBcsId = true; + + BlockLocationInfo blkInfo = new BlockLocationInfo.Builder() + .setBlockID(blockID) + .setPipeline(pipeline) + .setToken(blockToken) + .build(); + // Under construction is set here, during BlockInputStream#initialize() it is used to update the block length. + blkInfo.setUnderConstruction(true); + try (BlockInputStream blockInputStream = (BlockInputStream) blockInputStreamFactory.create( + RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE), + blkInfo, pipeline, blockToken, dnClient.getXceiverClientManager(), + null, conf.getObject(OzoneClientConfig.class))) { + // Initialize the BlockInputStream. Gets the blockData from the peer, sets the block length and + // initializes ChunkInputStream for each chunk. + blockInputStream.initialize(); + ContainerProtos.BlockData peerBlockData = blockInputStream.getStreamBlockData(); + // Use the larger of the local bcsId and the bcsId reported by the peer datanode. + long maxBcsId = Math.max(peerBlockData.getBlockID().getBlockCommitSequenceId(), + localBlockData.getBlockCommitSequenceId()); + + for (ContainerProtos.ChunkMerkleTree chunkMerkleTree : chunkList) { + long chunkOffset = chunkMerkleTree.getOffset(); + try { + // Seek to the offset of the chunk. Seek updates the chunkIndex in the BlockInputStream. + blockInputStream.seek(chunkOffset); + ChunkInputStream currentChunkStream = blockInputStream.getChunkStreams().get( + blockInputStream.getChunkIndex()); + ContainerProtos.ChunkInfo chunkInfoProto = currentChunkStream.getChunkInfo(); + ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto); + chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true"); + verifyChunksLength(chunkInfoProto, localChunksMap.get(chunkOffset)); + + // Read the chunk data from the BlockInputStream and write it to the container. + int chunkLength = (int) chunkInfoProto.getLen(); + if (chunkByteBuffer.capacity() < chunkLength) { + chunkByteBuffer = ByteBuffer.allocate(chunkLength); + } + + chunkByteBuffer.clear(); + chunkByteBuffer.limit(chunkLength); + int bytesRead = blockInputStream.read(chunkByteBuffer); + if (bytesRead != chunkLength) { + throw new IOException("Error while reading chunk data from block input stream. Expected length: " + + chunkLength + ", Actual length: " + bytesRead); + } + + chunkByteBuffer.flip(); + ChunkBuffer chunkBuffer = ChunkBuffer.wrap(chunkByteBuffer); + writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer, container); + // When reconciling missing chunks, which occur at the end of the block, the blockData's chunk list may end + // up with holes because we continue reconciling even if individual chunks fail. This is fine because we do + // not update the bcsId in that case.
+ localChunksMap.put(chunkInfo.getOffset(), chunkInfoProto); + } catch (IOException ex) { + overwriteBcsId = false; + LOG.error("Error while reconciling chunk {} for block {} in container {}", + chunkOffset, blockID, containerData.getContainerID(), ex); + } + } + + List localChunkList = new ArrayList<>(localChunksMap.values()); + localBlockData.setChunks(localChunkList); + putBlockForClosedContainer(container, localBlockData, maxBcsId, overwriteBcsId); + chunkManager.finishWriteChunks(container, localBlockData); + } + } + + private void verifyChunksLength(ContainerProtos.ChunkInfo peerChunkInfo, ContainerProtos.ChunkInfo localChunkInfo) + throws StorageContainerException { + if (localChunkInfo == null || peerChunkInfo == null) { + return; + } + + if (peerChunkInfo.getOffset() != localChunkInfo.getOffset()) { + throw new StorageContainerException("Offset mismatch for chunk. Expected: " + localChunkInfo.getOffset() + + ", Actual: " + peerChunkInfo.getOffset(), CHUNK_FILE_INCONSISTENCY); + } + + if (peerChunkInfo.getLen() != localChunkInfo.getLen()) { + throw new StorageContainerException("Length mismatch for chunk at offset " + localChunkInfo.getOffset() + + ". Expected: " + localChunkInfo.getLen() + ", Actual: " + peerChunkInfo.getLen(), CHUNK_FILE_INCONSISTENCY); + } + } + /** * Called by BlockDeletingService to delete all the chunks in a block * before proceeding to delete the block info from DB. diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java index ba3d40489777..20ab37cf1c99 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java @@ -112,8 +112,6 @@ public long putBlockForClosedContainer(Container container, BlockData data, bool // We are not locking the key manager since RocksDB serializes all actions // against a single DB. We rely on DB level locking to avoid conflicts. try (DBHandle db = BlockUtils.getDB(containerData, config)) { - // This is a post condition that acts as a hint to the user. - // Should never fail. Preconditions.checkNotNull(db, DB_NULL_ERR_MSG); long blockBcsID = data.getBlockCommitSequenceId(); @@ -441,6 +439,19 @@ public List listBlock(Container container, long startLocalID, int } } + @Override + public boolean blockExists(Container container, BlockID blockID) throws IOException { + KeyValueContainerData containerData = (KeyValueContainerData) container + .getContainerData(); + try (DBHandle db = BlockUtils.getDB(containerData, config)) { + // This is a post condition that acts as a hint to the user. + // Should never fail. + Preconditions.checkNotNull(db, DB_NULL_ERR_MSG); + String blockKey = containerData.getBlockKey(blockID.getLocalID()); + return db.getStore().getBlockDataTable().isExist(blockKey); + } + } + /** * Shutdown KeyValueContainerManager. 
*/ diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java index 10562f450b19..cf65bb819d65 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java @@ -94,6 +94,15 @@ long putBlockForClosedContainer(Container container, BlockData data, boolean ove List listBlock(Container container, long startLocalID, int count) throws IOException; + /** + * Check if a block exists in the container. + * + * @param container - Container to check for the block. + * @param blockID - BlockID of the block. + * @return True if the block exists, false otherwise. + */ + boolean blockExists(Container container, BlockID blockID) throws IOException; + /** * Returns last committed length of the block. * diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeTestUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeTestUtils.java index 22559aa37813..811e4b483a25 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeTestUtils.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeTestUtils.java @@ -353,5 +353,6 @@ public static void writeContainerDataTreeProto(ContainerData data, ContainerProt throw new IOException("Error occurred when writing container merkle tree for containerID " + data.getContainerID(), ex); } + data.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum()); } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java index e0e8930c9466..987ff7cf81f2 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java @@ -39,6 +39,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.stream.Stream; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.conf.ConfigurationSource; @@ -339,7 +340,8 @@ public void testContainerWithNoDiff() throws Exception { ContainerProtos.ContainerChecksumInfo peerChecksumInfo = ContainerProtos.ContainerChecksumInfo.newBuilder() .setContainerID(container.getContainerID()) .setContainerMerkleTree(peerMerkleTree.toProto()).build(); - ContainerDiffReport diff = checksumManager.diff(container, peerChecksumInfo); + Optional checksumInfo = checksumManager.read(container); + ContainerDiffReport diff = checksumManager.diff(checksumInfo.get(), peerChecksumInfo); assertTrue(checksumManager.getMetrics().getMerkleTreeDiffLatencyNS().lastStat().total() > 0); assertFalse(diff.needsRepair()); assertEquals(checksumManager.getMetrics().getNoRepairContainerDiffs(), 1); @@ -362,7 +364,8 @@ public void testContainerDiffWithMismatches(int
numMissingBlock, int numMissingC ContainerProtos.ContainerChecksumInfo peerChecksumInfo = ContainerProtos.ContainerChecksumInfo.newBuilder() .setContainerID(container.getContainerID()) .setContainerMerkleTree(peerMerkleTree.toProto()).build(); - ContainerDiffReport diff = checksumManager.diff(container, peerChecksumInfo); + Optional checksumInfo = checksumManager.read(container); + ContainerDiffReport diff = checksumManager.diff(checksumInfo.get(), peerChecksumInfo); assertTrue(metrics.getMerkleTreeDiffLatencyNS().lastStat().total() > 0); assertContainerDiffMatch(expectedDiff, diff); assertEquals(checksumManager.getMetrics().getRepairContainerDiffs(), 1); @@ -385,15 +388,19 @@ public void testPeerWithMismatchesHasNoDiff(int numMissingBlock, int numMissingC ContainerProtos.ContainerChecksumInfo peerChecksumInfo = ContainerProtos.ContainerChecksumInfo.newBuilder() .setContainerID(container.getContainerID()) .setContainerMerkleTree(peerMerkleTree).build(); - ContainerDiffReport diff = checksumManager.diff(container, peerChecksumInfo); + Optional checksumInfo = checksumManager.read(container); + ContainerDiffReport diff = checksumManager.diff(checksumInfo.get(), peerChecksumInfo); assertFalse(diff.needsRepair()); assertEquals(checksumManager.getMetrics().getNoRepairContainerDiffs(), 1); } @Test - public void testFailureContainerMerkleTreeMetric() { + public void testFailureContainerMerkleTreeMetric() throws IOException { ContainerProtos.ContainerChecksumInfo peerChecksum = ContainerProtos.ContainerChecksumInfo.newBuilder().build(); - assertThrows(StorageContainerException.class, () -> checksumManager.diff(container, peerChecksum)); + ContainerMerkleTreeWriter ourMerkleTree = buildTestTree(config); + checksumManager.writeContainerDataTree(container, ourMerkleTree); + Optional checksumInfo = checksumManager.read(container); + assertThrows(StorageContainerException.class, () -> checksumManager.diff(checksumInfo.get(), peerChecksum)); assertEquals(checksumManager.getMetrics().getMerkleTreeDiffFailure(), 1); } @@ -413,7 +420,8 @@ void testDeletedBlocksInPeerAndBoth() throws Exception { .addAllDeletedBlocks(deletedBlockList).build(); writeContainerDataTreeProto(container, ourMerkleTree); - ContainerDiffReport containerDiff = checksumManager.diff(container, peerChecksumInfo); + Optional checksumInfo = checksumManager.read(container); + ContainerDiffReport containerDiff = checksumManager.diff(checksumInfo.get(), peerChecksumInfo); // The diff should not have any missing block/missing chunk/corrupt chunks as the blocks are deleted // in peer merkle tree. @@ -423,7 +431,8 @@ void testDeletedBlocksInPeerAndBoth() throws Exception { // Delete blocks in our merkle tree as well. checksumManager.markBlocksAsDeleted(container, deletedBlockList); - containerDiff = checksumManager.diff(container, peerChecksumInfo); + checksumInfo = checksumManager.read(container); + containerDiff = checksumManager.diff(checksumInfo.get(), peerChecksumInfo); // The diff should not have any missing block/missing chunk/corrupt chunks as the blocks are deleted // in both merkle tree. 
@@ -449,7 +458,8 @@ void testDeletedBlocksInOurContainerOnly() throws Exception { writeContainerDataTreeProto(container, ourMerkleTree); checksumManager.markBlocksAsDeleted(container, deletedBlockList); - ContainerDiffReport containerDiff = checksumManager.diff(container, peerChecksumInfo); + Optional checksumInfo = checksumManager.read(container); + ContainerDiffReport containerDiff = checksumManager.diff(checksumInfo.get(), peerChecksumInfo); // The diff should not have any missing block/missing chunk/corrupt chunks as the blocks are deleted // in our merkle tree. @@ -475,7 +485,8 @@ void testCorruptionInOurMerkleTreeAndDeletedBlocksInPeer() throws Exception { writeContainerDataTreeProto(container, ourMerkleTree); - ContainerDiffReport containerDiff = checksumManager.diff(container, peerChecksumInfo); + Optional checksumInfo = checksumManager.read(container); + ContainerDiffReport containerDiff = checksumManager.diff(checksumInfo.get(), peerChecksumInfo); // The diff should not have any missing block/missing chunk/corrupt chunks as the blocks are deleted // in peer merkle tree. @@ -499,8 +510,8 @@ void testContainerDiffWithBlockDeletionInPeer() throws Exception { writeContainerDataTreeProto(container, ourMerkleTree); ContainerProtos.ContainerChecksumInfo peerChecksumInfo = peerChecksumInfoBuilder.build(); - - ContainerDiffReport containerDiff = checksumManager.diff(container, peerChecksumInfo); + Optional checksumInfo = checksumManager.read(container); + ContainerDiffReport containerDiff = checksumManager.diff(checksumInfo.get(), peerChecksumInfo); // The diff should not have any missing block/missing chunk/corrupt chunks as the blocks are deleted // in peer merkle tree. assertFalse(containerDiff.getMissingBlocks().isEmpty()); @@ -512,7 +523,8 @@ void testContainerDiffWithBlockDeletionInPeer() throws Exception { // Clear deleted blocks to add them in missing blocks. 
peerChecksumInfo = peerChecksumInfoBuilder.clearDeletedBlocks().build(); - containerDiff = checksumManager.diff(container, peerChecksumInfo); + checksumInfo = checksumManager.read(container); + containerDiff = checksumManager.diff(checksumInfo.get(), peerChecksumInfo); assertFalse(containerDiff.getMissingBlocks().isEmpty()); // Missing block does not contain the deleted blocks 6L to 10L diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java index aa771cb6c365..ab8929c18c8a 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java @@ -22,21 +22,25 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.StorageUnit; +import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.ReconfigurationHandler; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto; @@ -53,6 +57,8 @@ import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager; import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeWriter; +import org.apache.hadoop.ozone.container.common.helpers.BlockData; +import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; @@ -60,6 +66,7 @@ import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; +import org.apache.hadoop.ozone.container.common.interfaces.DBHandle; import org.apache.hadoop.ozone.container.common.interfaces.Handler; import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; @@ -76,6 +83,7 @@ import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler; +import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; import 
org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScanError; @@ -419,4 +427,27 @@ public static XceiverServerRatis newXceiverServerRatis( getNoopContainerDispatcher(), getEmptyContainerController(), null, null); } + + /** + * Creates block metadata for the given container with the specified number of blocks and chunks per block. + */ + public static void createBlockMetaData(KeyValueContainerData data, int numOfBlocksPerContainer, + int numOfChunksPerBlock) throws IOException { + try (DBHandle metadata = BlockUtils.getDB(data, new OzoneConfiguration())) { + for (int j = 0; j < numOfBlocksPerContainer; j++) { + BlockID blockID = new BlockID(data.getContainerID(), j); + String blockKey = data.getBlockKey(blockID.getLocalID()); + BlockData kd = new BlockData(blockID); + List chunks = Lists.newArrayList(); + for (int k = 0; k < numOfChunksPerBlock; k++) { + long dataLen = 10L; + ChunkInfo chunkInfo = ContainerTestHelper.getChunk(blockID.getLocalID(), k, k * dataLen, dataLen); + ContainerTestHelper.setDataChecksum(chunkInfo, ContainerTestHelper.getData((int) dataLen)); + chunks.add(chunkInfo.getProtoBufMessage()); + } + kd.setChunks(chunks); + metadata.getStore().getBlockDataTable().put(blockKey, kd); + } + } + } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java index 6dab87cbac2c..d7d01f9349af 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java @@ -20,6 +20,7 @@ import static java.util.Collections.singletonMap; import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails; import static org.apache.hadoop.ozone.OzoneConsts.GB; +import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.createBlockMetaData; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -28,6 +29,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.nio.file.Path; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -57,6 +59,7 @@ import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor; import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand; +import org.junit.jupiter.api.io.TempDir; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,6 +76,10 @@ public class TestReconcileContainerCommandHandler { private StateContext context; private ReconcileContainerCommandHandler subject; private ReplicationSupervisor mockSupervisor; + @TempDir + private Path tempDir; + @TempDir + private Path dbFile; public void init(ContainerLayoutVersion layout, IncrementalReportSender icrSender) throws Exception { @@ -93,6 +100,9 @@ public void init(ContainerLayoutVersion layout, 
IncrementalReportSender container, long blockID) { + public static File getBlock(Container container, long blockID) { File blockFile; File chunksDir = new File(container.getContainerData().getContainerPath(), "chunks"); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java index 5e39d331922b..33f4faefb6b8 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java @@ -17,6 +17,7 @@ package org.apache.hadoop.ozone.container.keyvalue; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_VOLUME_CHOOSING_POLICY; import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS; import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails; @@ -26,53 +27,88 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_LAYOUT_KEY; import static org.apache.hadoop.ozone.OzoneConsts.GB; +import static org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager.getContainerChecksumFile; +import static org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.writeContainerDataTreeProto; +import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.WRITE_STAGE; +import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.createBlockMetaData; +import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.createDbInstancesForTestIfNeeded; +import static org.apache.hadoop.ozone.container.keyvalue.TestContainerCorruptions.getBlock; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.atMostOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.common.collect.ImmutableList; import java.io.File; import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import java.time.Clock; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; import org.apache.commons.io.FileUtils; 
+import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; +import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; import org.apache.hadoop.hdds.security.token.TokenVerifier; +import org.apache.hadoop.hdds.utils.db.BatchOperation; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.common.Checksum; +import org.apache.hadoop.ozone.common.ChecksumData; import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager; import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient; import org.apache.hadoop.ozone.container.common.ContainerTestUtils; +import org.apache.hadoop.ozone.container.common.helpers.BlockData; +import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher; import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.common.interfaces.DBHandle; import org.apache.hadoop.ozone.container.common.interfaces.Handler; import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; @@ -80,15 +116,24 @@ import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; +import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy; import org.apache.hadoop.ozone.container.common.volume.StorageVolume; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; +import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; +import org.apache.hadoop.util.Sets; import org.apache.ozone.test.GenericTestUtils; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import 
org.mockito.MockedStatic; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; /** * Unit tests for {@link KeyValueHandler}. @@ -101,17 +146,43 @@ public class TestKeyValueHandler { @TempDir private Path dbFile; - private static final String DATANODE_UUID = UUID.randomUUID().toString(); - private static final long DUMMY_CONTAINER_ID = 9999; private static final String DUMMY_PATH = "dummy/dir/doesnt/exist"; + private static final int CHUNK_LEN = 3 * (int) OzoneConsts.KB; + private static final int CHUNKS_PER_BLOCK = 4; + private static final String DATANODE_UUID = UUID.randomUUID().toString(); + private static final String CLUSTER_ID = UUID.randomUUID().toString(); private HddsDispatcher dispatcher; private KeyValueHandler handler; + private OzoneConfiguration conf; + + /** + * Number of corrupt blocks and chunks. + */ + public static Stream corruptionValues() { + return Stream.of( + Arguments.of(5, 0), + Arguments.of(0, 5), + Arguments.of(0, 10), + Arguments.of(10, 0), + Arguments.of(5, 10), + Arguments.of(10, 5), + Arguments.of(2, 3), + Arguments.of(3, 2), + Arguments.of(4, 6), + Arguments.of(6, 4), + Arguments.of(6, 9), + Arguments.of(9, 6) + ); + } @BeforeEach - public void setup() throws StorageContainerException { + public void setup() throws IOException { // Create mock HddsDispatcher and KeyValueHandler. + conf = new OzoneConfiguration(); + conf.set(HDDS_DATANODE_DIR_KEY, tempDir.toString()); + conf.set(OZONE_METADATA_DIRS, tempDir.toString()); handler = mock(KeyValueHandler.class); HashMap handlers = new HashMap<>(); @@ -283,7 +354,7 @@ public void testVolumeSetInKeyValueHandler() throws Exception { File metadataDir = Files.createDirectory(tempDir.resolve("metadataDir")).toFile(); - OzoneConfiguration conf = new OzoneConfiguration(); + conf = new OzoneConfiguration(); conf.set(HDDS_DATANODE_DIR_KEY, datanodeDir.getAbsolutePath()); conf.set(OZONE_METADATA_DIRS, metadataDir.getAbsolutePath()); MutableVolumeSet @@ -337,8 +408,8 @@ private ContainerCommandRequestProto getDummyCommandRequestProto( @ContainerLayoutTestInfo.ContainerTest public void testCloseInvalidContainer(ContainerLayoutVersion layoutVersion) throws IOException { - KeyValueHandler keyValueHandler = createKeyValueHandler(); - OzoneConfiguration conf = new OzoneConfiguration(); + KeyValueHandler keyValueHandler = createKeyValueHandler(tempDir); + conf = new OzoneConfiguration(); KeyValueContainerData kvData = new KeyValueContainerData(DUMMY_CONTAINER_ID, layoutVersion, (long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(), @@ -382,7 +453,7 @@ public void testDeleteContainer() throws IOException { final long containerID = 1L; final String clusterId = UUID.randomUUID().toString(); final String datanodeId = UUID.randomUUID().toString(); - final ConfigurationSource conf = new OzoneConfiguration(); + conf = new OzoneConfiguration(); final ContainerSet containerSet = new ContainerSet(1000); final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class); @@ -470,13 +541,16 @@ public void testDeleteContainer() throws IOException { } @ContainerLayoutTestInfo.ContainerTest - public void testReconcileContainer(ContainerLayoutVersion layoutVersion) throws Exception { - OzoneConfiguration conf = new OzoneConfiguration(); + public void testContainerChecksumInvocation(ContainerLayoutVersion layoutVersion) throws Exception { + conf = new OzoneConfiguration(); KeyValueContainerData data = new KeyValueContainerData(123L, layoutVersion, GB, PipelineID.randomId().toString(), 
randomDatanodeDetails().getUuidString()); + data.setMetadataPath(tempDir.toString()); + data.setDbFile(dbFile.toFile()); Container container = new KeyValueContainer(data, conf); + createBlockMetaData(data, 5, 3); ContainerSet containerSet = new ContainerSet(1000); containerSet.addContainer(container); @@ -489,7 +563,7 @@ public void testReconcileContainer(ContainerLayoutVersion layoutVersion) throws Assertions.assertEquals(container.getContainerData().getContainerID(), reportedID); long reportDataChecksum = report.getDataChecksum(); - Assertions.assertNotEquals(0, reportDataChecksum, + assertNotEquals(0, reportDataChecksum, "Container report should have populated the checksum field with a non-zero value."); icrCount.incrementAndGet(); }; @@ -499,10 +573,140 @@ public void testReconcileContainer(ContainerLayoutVersion layoutVersion) throws Assertions.assertEquals(0, icrCount.get()); // This should trigger container report validation in the ICR handler above. - keyValueHandler.reconcileContainer(mock(DNContainerOperationClient.class), container, Collections.emptySet()); + DNContainerOperationClient mockDnClient = mock(DNContainerOperationClient.class); + DatanodeDetails peer1 = MockDatanodeDetails.randomDatanodeDetails(); + DatanodeDetails peer2 = MockDatanodeDetails.randomDatanodeDetails(); + DatanodeDetails peer3 = MockDatanodeDetails.randomDatanodeDetails(); + when(mockDnClient.getContainerChecksumInfo(anyLong(), any())).thenReturn(null); + keyValueHandler.reconcileContainer(mockDnClient, container, Sets.newHashSet(peer1, peer2, peer3)); + // Make sure all the replicas are used for reconciliation. + Mockito.verify(mockDnClient, atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer1)); + Mockito.verify(mockDnClient, atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer2)); + Mockito.verify(mockDnClient, atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer3)); Assertions.assertEquals(1, icrCount.get()); } + @ParameterizedTest + @MethodSource("corruptionValues") + public void testFullContainerReconciliation(int numBlocks, int numChunks) throws Exception { + KeyValueHandler kvHandler = createKeyValueHandler(tempDir); + ContainerChecksumTreeManager checksumManager = kvHandler.getChecksumManager(); + DNContainerOperationClient dnClient = new DNContainerOperationClient(conf, null, null); + final long containerID = 100L; + // Create 3 containers with 15 blocks each and 3 replicas. + List containers = createContainerWithBlocks(kvHandler, containerID, 15, 3); + assertEquals(3, containers.size()); + + // Introduce corruption in each container on different replicas. + introduceCorruption(kvHandler, containers.get(1), numBlocks, numChunks, false); + introduceCorruption(kvHandler, containers.get(2), numBlocks, numChunks, true); + + // Without reconciliation, checksums should be different because of the corruption. + Set checksumsBeforeReconciliation = new HashSet<>(); + for (KeyValueContainer kvContainer : containers) { + Optional containerChecksumInfo = + checksumManager.read(kvContainer.getContainerData()); + assertTrue(containerChecksumInfo.isPresent()); + long dataChecksum = containerChecksumInfo.get().getContainerMerkleTree().getDataChecksum(); + assertEquals(kvContainer.getContainerData().getDataChecksum(), dataChecksum); + checksumsBeforeReconciliation.add(dataChecksum); + } + // There should be more than 1 checksum because of the corruption. 
+ assertTrue(checksumsBeforeReconciliation.size() > 1); + + List datanodes = ImmutableList.of(randomDatanodeDetails(), randomDatanodeDetails(), + randomDatanodeDetails()); + Map dnToContainerMap = new HashMap<>(); + dnToContainerMap.put(datanodes.get(0).getUuidString(), containers.get(0)); + dnToContainerMap.put(datanodes.get(1).getUuidString(), containers.get(1)); + dnToContainerMap.put(datanodes.get(2).getUuidString(), containers.get(2)); + + // Setup mock for each datanode network calls needed for reconciliation. + try (MockedStatic containerProtocolMock = + Mockito.mockStatic(ContainerProtocolCalls.class)) { + mockContainerProtocolCalls(containerProtocolMock, dnToContainerMap, checksumManager, kvHandler, containerID); + + kvHandler.reconcileContainer(dnClient, containers.get(0), Sets.newHashSet(datanodes)); + kvHandler.reconcileContainer(dnClient, containers.get(1), Sets.newHashSet(datanodes)); + kvHandler.reconcileContainer(dnClient, containers.get(2), Sets.newHashSet(datanodes)); + + // After reconciliation, checksums should be the same for all containers. + ContainerProtos.ContainerChecksumInfo prevContainerChecksumInfo = null; + for (KeyValueContainer kvContainer : containers) { + kvHandler.createContainerMerkleTree(kvContainer); + Optional containerChecksumInfo = + checksumManager.read(kvContainer.getContainerData()); + assertTrue(containerChecksumInfo.isPresent()); + long dataChecksum = containerChecksumInfo.get().getContainerMerkleTree().getDataChecksum(); + assertEquals(kvContainer.getContainerData().getDataChecksum(), dataChecksum); + if (prevContainerChecksumInfo != null) { + assertEquals(prevContainerChecksumInfo.getContainerMerkleTree().getDataChecksum(), dataChecksum); + } + prevContainerChecksumInfo = containerChecksumInfo.get(); + } + } + } + private void mockContainerProtocolCalls(MockedStatic containerProtocolMock, + Map dnToContainerMap, + ContainerChecksumTreeManager checksumManager, + KeyValueHandler kvHandler, + long containerID) { + // Mock getContainerChecksumInfo + containerProtocolMock.when(() -> ContainerProtocolCalls.getContainerChecksumInfo(any(), anyLong(), any())) + .thenAnswer(inv -> { + XceiverClientSpi xceiverClientSpi = inv.getArgument(0); + Pipeline pipeline = xceiverClientSpi.getPipeline(); + assertEquals(1, pipeline.size()); + DatanodeDetails dn = pipeline.getFirstNode(); + KeyValueContainer container = dnToContainerMap.get(dn.getUuidString()); + ByteString checksumInfo = checksumManager.getContainerChecksumInfo(container.getContainerData()); + return ContainerProtos.GetContainerChecksumInfoResponseProto.newBuilder() + .setContainerID(containerID) + .setContainerChecksumInfo(checksumInfo) + .build(); + }); + + // Mock getBlock + containerProtocolMock.when(() -> ContainerProtocolCalls.getBlock(any(), any(), any(), any(), anyMap())) + .thenAnswer(inv -> { + XceiverClientSpi xceiverClientSpi = inv.getArgument(0); + Pipeline pipeline = xceiverClientSpi.getPipeline(); + assertEquals(1, pipeline.size()); + DatanodeDetails dn = pipeline.getFirstNode(); + KeyValueContainer container = dnToContainerMap.get(dn.getUuidString()); + ContainerProtos.BlockData blockData = kvHandler.getBlockManager().getBlock(container, inv.getArgument(2)) + .getProtoBufMessage(); + return ContainerProtos.GetBlockResponseProto.newBuilder() + .setBlockData(blockData) + .build(); + }); + + // Mock readChunk + containerProtocolMock.when(() -> ContainerProtocolCalls.readChunk(any(), any(), any(), any(), any())) + .thenAnswer(inv -> { + XceiverClientSpi xceiverClientSpi = 
inv.getArgument(0); + Pipeline pipeline = xceiverClientSpi.getPipeline(); + assertEquals(1, pipeline.size()); + DatanodeDetails dn = pipeline.getFirstNode(); + KeyValueContainer container = dnToContainerMap.get(dn.getUuidString()); + return createReadChunkResponse(inv, container, kvHandler); + }); + } + + // Helper method to create readChunk responses + private ContainerProtos.ReadChunkResponseProto createReadChunkResponse(InvocationOnMock inv, + KeyValueContainer container, + KeyValueHandler kvHandler) throws IOException { + ContainerProtos.DatanodeBlockID blockId = inv.getArgument(2); + ContainerProtos.ChunkInfo chunkInfo = inv.getArgument(1); + return ContainerProtos.ReadChunkResponseProto.newBuilder() + .setBlockID(blockId) + .setChunkData(chunkInfo) + .setData(kvHandler.getChunkManager().readChunk(container, BlockID.getFromProtobuf(blockId), + ChunkInfo.getFromProtoBuf(chunkInfo), null).toByteString()) + .build(); + } + @Test public void testGetContainerChecksumInfoOnInvalidContainerStates() { when(handler.handleGetContainerChecksumInfo(any(), any())).thenCallRealMethod(); @@ -537,7 +741,7 @@ public void testDeleteContainerTimeout() throws IOException { final long containerID = 1L; final String clusterId = UUID.randomUUID().toString(); final String datanodeId = UUID.randomUUID().toString(); - final ConfigurationSource conf = new OzoneConfiguration(); + conf = new OzoneConfiguration(); final ContainerSet containerSet = new ContainerSet(1000); final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class); final Clock clock = mock(Clock.class); @@ -605,24 +809,184 @@ private static ContainerCommandRequestProto createContainerRequest( .build(); } - private KeyValueHandler createKeyValueHandler() throws IOException { - final String clusterId = UUID.randomUUID().toString(); - final String datanodeId = UUID.randomUUID().toString(); - final ConfigurationSource conf = new OzoneConfiguration(); + private KeyValueHandler createKeyValueHandler(Path path) throws IOException { final ContainerSet containerSet = new ContainerSet(1000); final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class); - HddsVolume hddsVolume = new HddsVolume.Builder(tempDir.toString()).conf(conf) - .clusterID(clusterId).datanodeUuid(datanodeId) + HddsVolume hddsVolume = new HddsVolume.Builder(path.toString()).conf(conf) + .clusterID(CLUSTER_ID).datanodeUuid(DATANODE_UUID) .volumeSet(volumeSet) .build(); - hddsVolume.format(clusterId); - hddsVolume.createWorkingDir(clusterId, null); - hddsVolume.createTmpDirs(clusterId); + hddsVolume.format(CLUSTER_ID); + hddsVolume.createWorkingDir(CLUSTER_ID, null); + hddsVolume.createTmpDirs(CLUSTER_ID); when(volumeSet.getVolumesList()).thenReturn(Collections.singletonList(hddsVolume)); final KeyValueHandler kvHandler = ContainerTestUtils.getKeyValueHandler(conf, - datanodeId, containerSet, volumeSet); - kvHandler.setClusterID(clusterId); + DATANODE_UUID, containerSet, volumeSet); + kvHandler.setClusterID(CLUSTER_ID); + // Clean up metrics for next tests. + hddsVolume.getVolumeInfoStats().unregister(); + hddsVolume.getVolumeIOStats().unregister(); + ContainerMetrics.remove(); return kvHandler; } + + /** + * Creates a container with normal and deleted blocks. + * First it will insert normal blocks, and then it will insert + * deleted blocks. 
+ */ + protected List<KeyValueContainer> createContainerWithBlocks(KeyValueHandler kvHandler, long containerId, + int blocks, int numContainerCopy) + throws Exception { + String strBlock = "block"; + String strChunk = "chunkFile"; + List<KeyValueContainer> containers = new ArrayList<>(); + MutableVolumeSet volumeSet = new MutableVolumeSet(DATANODE_UUID, conf, null, + StorageVolume.VolumeType.DATA_VOLUME, null); + createDbInstancesForTestIfNeeded(volumeSet, CLUSTER_ID, CLUSTER_ID, conf); + int bytesPerChecksum = 2 * (int) OzoneConsts.KB; + Checksum checksum = new Checksum(ContainerProtos.ChecksumType.SHA256, + bytesPerChecksum); + byte[] chunkData = RandomStringUtils.randomAscii(CHUNK_LEN).getBytes(UTF_8); + ChecksumData checksumData = checksum.computeChecksum(chunkData); + + for (int j = 0; j < numContainerCopy; j++) { + KeyValueContainerData containerData = new KeyValueContainerData(containerId, + ContainerLayoutVersion.FILE_PER_BLOCK, (long) CHUNKS_PER_BLOCK * CHUNK_LEN * blocks, + UUID.randomUUID().toString(), UUID.randomUUID().toString()); + Path kvContainerPath = Files.createDirectory(tempDir.resolve(containerId + "-" + j)); + containerData.setMetadataPath(kvContainerPath.toString()); + containerData.setDbFile(kvContainerPath.toFile()); + + KeyValueContainer container = new KeyValueContainer(containerData, conf); + StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()) + .forEach(hddsVolume -> hddsVolume.setDbParentDir(kvContainerPath.toFile())); + container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(), UUID.randomUUID().toString()); + assertNotNull(containerData.getChunksPath()); + File chunksPath = new File(containerData.getChunksPath()); + ContainerLayoutTestInfo.FILE_PER_BLOCK.validateFileCount(chunksPath, 0, 0); + + List<ContainerProtos.ChunkInfo> chunkList = new ArrayList<>(); + for (int i = 0; i < blocks; i++) { + BlockID blockID = new BlockID(containerId, i); + BlockData blockData = new BlockData(blockID); + + chunkList.clear(); + for (long chunkCount = 0; chunkCount < CHUNKS_PER_BLOCK; chunkCount++) { + String chunkName = strBlock + i + strChunk + chunkCount; + long offset = chunkCount * CHUNK_LEN; + ChunkInfo info = new ChunkInfo(chunkName, offset, CHUNK_LEN); + info.setChecksumData(checksumData); + chunkList.add(info.getProtoBufMessage()); + kvHandler.getChunkManager().writeChunk(container, blockID, info, + ByteBuffer.wrap(chunkData), WRITE_STAGE); + } + kvHandler.getChunkManager().finishWriteChunks(container, blockData); + blockData.setChunks(chunkList); + blockData.setBlockCommitSequenceId(i); + kvHandler.getBlockManager().putBlock(container, blockData); + } + + ContainerLayoutTestInfo.FILE_PER_BLOCK.validateFileCount(chunksPath, blocks, (long) blocks * CHUNKS_PER_BLOCK); + container.markContainerForClose(); + kvHandler.closeContainer(container); + containers.add(container); + } + + return containers; + } + + /** + * Introduce corruption in the container. + * 1. Delete blocks from the container. + * 2. Corrupt chunks at an offset. + * If reverse is true, the blocks are deleted and the chunks are corrupted in reverse order.
+ */ + private void introduceCorruption(KeyValueHandler kvHandler, KeyValueContainer keyValueContainer, int numBlocks, + int numChunks, boolean reverse) throws IOException { + Random random = new Random(); + KeyValueContainerData containerData = keyValueContainer.getContainerData(); + // Simulate missing blocks + try (DBHandle handle = BlockUtils.getDB(containerData, conf); + BatchOperation batch = handle.getStore().getBatchHandler().initBatchOperation()) { + List blockDataList = kvHandler.getBlockManager().listBlock(keyValueContainer, -1, 100); + int size = blockDataList.size(); + for (int i = 0; i < numBlocks; i++) { + BlockData blockData = reverse ? blockDataList.get(size - 1 - i) : blockDataList.get(i); + File blockFile = getBlock(keyValueContainer, blockData.getBlockID().getLocalID()); + Assertions.assertTrue(blockFile.delete()); + handle.getStore().getBlockDataTable().deleteWithBatch(batch, containerData.getBlockKey(blockData.getLocalID())); + } + handle.getStore().getBatchHandler().commitBatchOperation(batch); + } + Files.deleteIfExists(getContainerChecksumFile(keyValueContainer.getContainerData()).toPath()); + kvHandler.createContainerMerkleTree(keyValueContainer); + + // Corrupt chunks at an offset. + List blockDataList = kvHandler.getBlockManager().listBlock(keyValueContainer, -1, 100); + int size = blockDataList.size(); + for (int i = 0; i < numChunks; i++) { + int blockIndex = reverse ? size - 1 - (i % size) : i % size; + BlockData blockData = blockDataList.get(blockIndex); + int chunkIndex = i / size; + File blockFile = getBlock(keyValueContainer, blockData.getBlockID().getLocalID()); + List chunks = new ArrayList<>(blockData.getChunks()); + ContainerProtos.ChunkInfo chunkInfo = chunks.remove(chunkIndex); + corruptFileAtOffset(blockFile, (int) chunkInfo.getOffset(), (int) chunkInfo.getLen()); + + // TODO: On-demand scanner (HDDS-10374) should detect this corruption and generate container merkle tree. + ContainerProtos.ContainerChecksumInfo.Builder builder = kvHandler.getChecksumManager() + .read(containerData).get().toBuilder(); + List blockMerkleTreeList = builder.getContainerMerkleTree() + .getBlockMerkleTreeList(); + assertEquals(size, blockMerkleTreeList.size()); + + builder.getContainerMerkleTreeBuilder().clearBlockMerkleTree(); + for (int j = 0; j < blockMerkleTreeList.size(); j++) { + ContainerProtos.BlockMerkleTree.Builder blockMerkleTreeBuilder = blockMerkleTreeList.get(j).toBuilder(); + if (j == blockIndex) { + List chunkMerkleTreeBuilderList = + blockMerkleTreeBuilder.getChunkMerkleTreeBuilderList(); + chunkMerkleTreeBuilderList.get(chunkIndex).setIsHealthy(false).setDataChecksum(random.nextLong()); + blockMerkleTreeBuilder.setDataChecksum(random.nextLong()); + } + builder.getContainerMerkleTreeBuilder().addBlockMerkleTree(blockMerkleTreeBuilder.build()); + } + builder.getContainerMerkleTreeBuilder().setDataChecksum(random.nextLong()); + Files.deleteIfExists(getContainerChecksumFile(keyValueContainer.getContainerData()).toPath()); + writeContainerDataTreeProto(keyValueContainer.getContainerData(), builder.getContainerMerkleTree()); + } + } + + /** + * Overwrite the file with random bytes at an offset within the given length. 
+ */ + public static void corruptFileAtOffset(File file, int offset, int chunkLength) { + try { + final int fileLength = (int) file.length(); + assertTrue(fileLength >= offset + chunkLength); + final int chunkEnd = offset + chunkLength; + + Path path = file.toPath(); + final byte[] original = IOUtils.readFully(Files.newInputStream(path), fileLength); + + // Corrupt the last byte and a middle byte of the given chunk range. + final byte[] corruptedBytes = Arrays.copyOf(original, fileLength); + corruptedBytes[chunkEnd - 1] = (byte) (original[chunkEnd - 1] << 1); + final int chunkMid = offset + chunkLength / 2; + corruptedBytes[chunkMid] = (byte) (original[chunkMid] << 1); + + Files.write(path, corruptedBytes, + StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.SYNC); + + assertThat(IOUtils.readFully(Files.newInputStream(path), fileLength)) + .isEqualTo(corruptedBytes) + .isNotEqualTo(original); + } catch (IOException ex) { + // Fail the test. + throw new UncheckedIOException(ex); + } + } } diff --git a/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot b/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot index b17973e1f364..55132123cde9 100644 --- a/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot +++ b/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot @@ -150,11 +150,10 @@ Close container Reconcile closed container # Check that info does not show replica checksums, since manual reconciliation has not yet been triggered. - # TODO When the scanner is computing checksums automatically, this test may need to be updated. ${container} = Execute ozone admin container list --state CLOSED | jq -r 'select(.replicationConfig.replicationFactor == "THREE") | .containerID' | head -1 ${data_checksum} = Execute ozone admin container info "${container}" --json | jq -r '.replicas[].dataChecksum' | head -n1 - # 0 is the hex value of an empty checksum. - Should Be Equal As Strings 0 ${data_checksum} + # 0 is the hex value of an empty checksum. After the container is closed, the data checksum should no longer be 0. + Should Not Be Equal As Strings 0 ${data_checksum} # When reconciliation finishes, replica checksums should be shown.
Execute ozone admin container reconcile ${container} Wait until keyword succeeds 1min 5sec Reconciliation complete ${container} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java index d5e13921d7ae..f835340ac3d1 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java @@ -377,6 +377,20 @@ public static void waitForContainerClose(MiniOzoneCluster cluster, } } + public static void waitForScmContainerState(MiniOzoneCluster cluster, long containerID, + HddsProtos.LifeCycleState lifeCycleState) + throws InterruptedException, TimeoutException { + GenericTestUtils.waitFor(() -> { + try { + HddsProtos.LifeCycleState state = cluster.getStorageContainerManager().getContainerManager() + .getContainer(ContainerID.valueOf(containerID)).getState(); + return state == lifeCycleState; + } catch (ContainerNotFoundException e) { + return false; + } + }, 500, 100 * 1000); + } + public static StateMachine getStateMachine(MiniOzoneCluster cluster) throws Exception { return getStateMachine(cluster.getHddsDatanodes().get(0), null); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java index 2211aaf2b069..f51dbfed43af 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java @@ -18,29 +18,70 @@ package org.apache.hadoop.ozone.dn.checksum; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; +import static org.apache.hadoop.hdds.DFSConfigKeysLegacy.DFS_DATANODE_KERBEROS_KEYTAB_FILE_KEY; +import static org.apache.hadoop.hdds.DFSConfigKeysLegacy.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_TOKEN_ENABLED; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECRET_KEY_EXPIRY_DURATION; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECRET_KEY_ROTATE_CHECK_DURATION; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECRET_KEY_ROTATE_DURATION; import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS; import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE; import static org.apache.hadoop.hdds.client.ReplicationType.RATIS; +import static org.apache.hadoop.hdds.scm.ScmConfig.ConfigStrings.HDDS_SCM_KERBEROS_KEYTAB_FILE_KEY; +import static org.apache.hadoop.hdds.scm.ScmConfig.ConfigStrings.HDDS_SCM_KERBEROS_PRINCIPAL_KEY; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY; +import static org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig.ConfigStrings.HDDS_SCM_HTTP_KERBEROS_KEYTAB_FILE_KEY; +import static 
org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig.ConfigStrings.HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY; +import static org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager.getContainerChecksumFile; import static org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.assertTreesSortedAndMatch; import static org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.buildTestTree; +import static org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.readChecksumFile; +import static org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.writeContainerDataTreeProto; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_KERBEROS_KEYTAB_FILE; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_KERBEROS_PRINCIPAL_KEY; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_KEYTAB_FILE_KEY; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY; +import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.File; import java.io.IOException; +import java.net.InetAddress; import java.nio.file.Files; +import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.util.List; +import java.util.Properties; +import java.util.Random; +import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.ScmConfig; import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig; +import org.apache.hadoop.hdds.security.symmetric.SecretKeyClient; +import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient; +import org.apache.hadoop.hdds.utils.db.BatchOperation; +import org.apache.hadoop.minikdc.MiniKdc; +import org.apache.hadoop.ozone.ClientVersion; import org.apache.hadoop.ozone.HddsDatanodeService; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.client.ObjectStore; @@ -50,15 +91,26 @@ import org.apache.hadoop.ozone.client.OzoneVolume; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.container.TestHelper; -import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager; import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeWriter; import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient; +import org.apache.hadoop.ozone.container.common.helpers.BlockData; import 
org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.common.interfaces.DBHandle; +import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler; +import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; +import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration; +import org.apache.hadoop.ozone.container.ozoneimpl.MetadataScanResult; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.ozone.test.GenericTestUtils; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -73,24 +125,34 @@ public class TestContainerCommandReconciliation { private static ObjectStore store; private static OzoneConfiguration conf; private static DNContainerOperationClient dnClient; + private static final String KEY_NAME = "testkey"; @TempDir private static File testDir; + @TempDir + private static File workDir; + private static MiniKdc miniKdc; + private static File ozoneKeytab; + private static File spnegoKeytab; + private static File testUserKeytab; + private static String testUserPrincipal; + private static String host; @BeforeAll public static void init() throws Exception { conf = new OzoneConfiguration(); - conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1); + conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "localhost"); conf.set(OZONE_METADATA_DIRS, testDir.getAbsolutePath()); + conf.setStorageSize(OZONE_SCM_CHUNK_SIZE_KEY, 128 * 1024, StorageUnit.BYTES); + conf.setStorageSize(OZONE_SCM_BLOCK_SIZE, 512 * 1024, StorageUnit.BYTES); // Disable the container scanner so it does not create merkle tree files that interfere with this test. 
conf.getObject(ContainerScannerConfiguration.class).setEnabled(false); - cluster = MiniOzoneCluster.newBuilder(conf) - .setNumDatanodes(3) - .build(); - cluster.waitForClusterToBeReady(); - rpcClient = OzoneClientFactory.getRpcClient(conf); - store = rpcClient.getObjectStore(); - dnClient = new DNContainerOperationClient(conf, null, null); + + startMiniKdc(); + setSecureConfig(); + createCredentialsInKDC(); + setSecretKeysConfig(); + startCluster(); } @AfterAll @@ -99,8 +161,16 @@ public static void stop() throws IOException { rpcClient.close(); } + if (dnClient != null) { + dnClient.close(); + } + + if (miniKdc != null) { + miniKdc.stop(); + } + if (cluster != null) { - cluster.shutdown(); + cluster.stop(); } } @@ -110,7 +180,9 @@ public static void stop() throws IOException { */ @Test public void testGetChecksumInfoOpenReplica() throws Exception { - long containerID = writeDataAndGetContainer(false); + String volume = UUID.randomUUID().toString(); + String bucket = UUID.randomUUID().toString(); + long containerID = writeDataAndGetContainer(false, volume, bucket); HddsDatanodeService targetDN = cluster.getHddsDatanodes().get(0); StorageContainerException ex = assertThrows(StorageContainerException.class, () -> dnClient.getContainerChecksumInfo(containerID, targetDN.getDatanodeDetails())); @@ -145,12 +217,14 @@ public void testGetChecksumInfoNonexistentReplica() { */ @Test public void testGetChecksumInfoNonexistentFile() throws Exception { - long containerID = writeDataAndGetContainer(true); + String volume = UUID.randomUUID().toString(); + String bucket = UUID.randomUUID().toString(); + long containerID = writeDataAndGetContainer(true, volume, bucket); // Pick a datanode and remove its checksum file. HddsDatanodeService targetDN = cluster.getHddsDatanodes().get(0); Container container = targetDN.getDatanodeStateMachine().getContainer() .getContainerSet().getContainer(containerID); - File treeFile = ContainerChecksumTreeManager.getContainerChecksumFile(container.getContainerData()); + File treeFile = getContainerChecksumFile(container.getContainerData()); // Closing the container should have generated the tree file. assertTrue(treeFile.exists()); assertTrue(treeFile.delete()); @@ -168,12 +242,14 @@ public void testGetChecksumInfoNonexistentFile() throws Exception { */ @Test public void testGetChecksumInfoServerIOError() throws Exception { - long containerID = writeDataAndGetContainer(true); + String volume = UUID.randomUUID().toString(); + String bucket = UUID.randomUUID().toString(); + long containerID = writeDataAndGetContainer(true, volume, bucket); // Pick a datanode and remove its checksum file. HddsDatanodeService targetDN = cluster.getHddsDatanodes().get(0); Container container = targetDN.getDatanodeStateMachine().getContainer() .getContainerSet().getContainer(containerID); - File treeFile = ContainerChecksumTreeManager.getContainerChecksumFile(container.getContainerData()); + File treeFile = getContainerChecksumFile(container.getContainerData()); assertTrue(treeFile.exists()); // Make the server unable to read the file. assertTrue(treeFile.setReadable(false)); @@ -190,13 +266,15 @@ public void testGetChecksumInfoServerIOError() throws Exception { */ @Test public void testGetCorruptChecksumInfo() throws Exception { - long containerID = writeDataAndGetContainer(true); + String volume = UUID.randomUUID().toString(); + String bucket = UUID.randomUUID().toString(); + long containerID = writeDataAndGetContainer(true, volume, bucket); // Pick a datanode and corrupt its checksum file. 
HddsDatanodeService targetDN = cluster.getHddsDatanodes().get(0); Container container = targetDN.getDatanodeStateMachine().getContainer() .getContainerSet().getContainer(containerID); - File treeFile = ContainerChecksumTreeManager.getContainerChecksumFile(container.getContainerData()); + File treeFile = getContainerChecksumFile(container.getContainerData()); Files.write(treeFile.toPath(), new byte[]{1, 2, 3}, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.SYNC); @@ -207,13 +285,15 @@ public void testGetCorruptChecksumInfo() throws Exception { @Test public void testGetEmptyChecksumInfo() throws Exception { - long containerID = writeDataAndGetContainer(true); + String volume = UUID.randomUUID().toString(); + String bucket = UUID.randomUUID().toString(); + long containerID = writeDataAndGetContainer(true, volume, bucket); // Pick a datanode and truncate its checksum file to zero length. HddsDatanodeService targetDN = cluster.getHddsDatanodes().get(0); Container container = targetDN.getDatanodeStateMachine().getContainer() .getContainerSet().getContainer(containerID); - File treeFile = ContainerChecksumTreeManager.getContainerChecksumFile(container.getContainerData()); + File treeFile = getContainerChecksumFile(container.getContainerData()); // TODO After HDDS-10379 the file will already exist and need to be overwritten. assertTrue(treeFile.exists()); Files.write(treeFile.toPath(), new byte[]{}, @@ -229,7 +309,9 @@ public void testGetEmptyChecksumInfo() throws Exception { @Test public void testGetChecksumInfoSuccess() throws Exception { - long containerID = writeDataAndGetContainer(true); + String volume = UUID.randomUUID().toString(); + String bucket = UUID.randomUUID().toString(); + long containerID = writeDataAndGetContainer(true, volume, bucket); // Overwrite the existing tree with a custom one for testing. We will check that it is returned properly from the // API. ContainerMerkleTreeWriter tree = buildTestTree(conf); @@ -247,26 +329,217 @@ public void testGetChecksumInfoSuccess() throws Exception { } } - private long writeDataAndGetContainer(boolean close) throws Exception { - String volumeName = UUID.randomUUID().toString(); - String bucketName = UUID.randomUUID().toString(); + @Test + public void testContainerChecksumWithBlockMissing() throws Exception { + // 1. Write data to a container. + // Read the key back and check its hash. + String volume = UUID.randomUUID().toString(); + String bucket = UUID.randomUUID().toString(); + Pair containerAndData = getDataAndContainer(true, 20 * 1024 * 1024, volume, bucket); + long containerID = containerAndData.getLeft(); + byte[] data = containerAndData.getRight(); + // Get the datanodes where the container replicas are stored. 
+ List dataNodeDetails = cluster.getStorageContainerManager().getContainerManager() + .getContainerReplicas(ContainerID.valueOf(containerID)) + .stream().map(ContainerReplica::getDatanodeDetails) + .collect(Collectors.toList()); + Assertions.assertEquals(3, dataNodeDetails.size()); + HddsDatanodeService hddsDatanodeService = cluster.getHddsDatanode(dataNodeDetails.get(0)); + DatanodeStateMachine datanodeStateMachine = hddsDatanodeService.getDatanodeStateMachine(); + Container container = datanodeStateMachine.getContainer().getContainerSet().getContainer(containerID); + KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData(); + ContainerProtos.ContainerChecksumInfo oldContainerChecksumInfo = readChecksumFile(container.getContainerData()); + KeyValueHandler kvHandler = (KeyValueHandler) datanodeStateMachine.getContainer().getDispatcher() + .getHandler(ContainerProtos.ContainerType.KeyValueContainer); + + BlockManager blockManager = kvHandler.getBlockManager(); + List blockDataList = blockManager.listBlock(container, -1, 100); + String chunksPath = container.getContainerData().getChunksPath(); + long oldDataChecksum = oldContainerChecksumInfo.getContainerMerkleTree().getDataChecksum(); + + // 2. Delete some blocks to simulate missing blocks. + try (DBHandle db = BlockUtils.getDB(containerData, conf); + BatchOperation op = db.getStore().getBatchHandler().initBatchOperation()) { + for (int i = 0; i < blockDataList.size(); i += 2) { + BlockData blockData = blockDataList.get(i); + // Delete the block metadata from the container db + db.getStore().getBlockDataTable().deleteWithBatch(op, containerData.getBlockKey(blockData.getLocalID())); + // Delete the block file. + Files.deleteIfExists(Paths.get(chunksPath + "/" + blockData.getBlockID().getLocalID() + ".block")); + } + db.getStore().getBatchHandler().commitBatchOperation(op); + db.getStore().flushDB(); + } + + // TODO: Use On-demand container scanner to build the new container merkle tree. (HDDS-10374) + Files.deleteIfExists(getContainerChecksumFile(container.getContainerData()).toPath()); + kvHandler.createContainerMerkleTree(container); + ContainerProtos.ContainerChecksumInfo containerChecksumAfterBlockDelete = + readChecksumFile(container.getContainerData()); + long dataChecksumAfterBlockDelete = containerChecksumAfterBlockDelete.getContainerMerkleTree().getDataChecksum(); + // Checksum should have changed after block delete. + Assertions.assertNotEquals(oldDataChecksum, dataChecksumAfterBlockDelete); + + // Since the container is already closed, we have manually updated the container checksum file. + // This doesn't update the checksum reported to SCM, and we need to trigger an ICR. + // Marking a container unhealthy will send an ICR. + kvHandler.markContainerUnhealthy(container, MetadataScanResult.deleted()); + waitForDataChecksumsAtSCM(containerID, 2); + + // 3. Reconcile the container. + cluster.getStorageContainerLocationClient().reconcileContainer(containerID); + // Compare and check if dataChecksum is same on all replicas. + waitForDataChecksumsAtSCM(containerID, 1); + ContainerProtos.ContainerChecksumInfo newContainerChecksumInfo = readChecksumFile(container.getContainerData()); + assertTreesSortedAndMatch(oldContainerChecksumInfo.getContainerMerkleTree(), + newContainerChecksumInfo.getContainerMerkleTree()); + TestHelper.validateData(KEY_NAME, data, store, volume, bucket); + } + + @Test + public void testContainerChecksumChunkCorruption() throws Exception { + // 1. Write data to a container. 
+ String volume = UUID.randomUUID().toString(); + String bucket = UUID.randomUUID().toString(); + Pair containerAndData = getDataAndContainer(true, 20 * 1024 * 1024, volume, bucket); + long containerID = containerAndData.getLeft(); + byte[] data = containerAndData.getRight(); + // Get the datanodes where the container replicas are stored. + List dataNodeDetails = cluster.getStorageContainerManager().getContainerManager() + .getContainerReplicas(ContainerID.valueOf(containerID)) + .stream().map(ContainerReplica::getDatanodeDetails) + .collect(Collectors.toList()); + Assertions.assertEquals(3, dataNodeDetails.size()); + HddsDatanodeService hddsDatanodeService = cluster.getHddsDatanode(dataNodeDetails.get(0)); + DatanodeStateMachine datanodeStateMachine = hddsDatanodeService.getDatanodeStateMachine(); + Container container = datanodeStateMachine.getContainer().getContainerSet().getContainer(containerID); + KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData(); + ContainerProtos.ContainerChecksumInfo oldContainerChecksumInfo = readChecksumFile(container.getContainerData()); + KeyValueHandler kvHandler = (KeyValueHandler) datanodeStateMachine.getContainer().getDispatcher() + .getHandler(ContainerProtos.ContainerType.KeyValueContainer); + + BlockManager blockManager = kvHandler.getBlockManager(); + List blockDatas = blockManager.listBlock(container, -1, 100); + long oldDataChecksum = oldContainerChecksumInfo.getContainerMerkleTree().getDataChecksum(); + + // 2. Corrupt first chunk for all the blocks + try (DBHandle db = BlockUtils.getDB(containerData, conf); + BatchOperation op = db.getStore().getBatchHandler().initBatchOperation()) { + for (BlockData blockData : blockDatas) { + // Modify the block metadata to simulate chunk corruption. + ContainerProtos.BlockData.Builder blockDataBuilder = blockData.getProtoBufMessage().toBuilder(); + blockDataBuilder.clearChunks(); + + ContainerProtos.ChunkInfo chunkInfo = blockData.getChunks().get(0); + ContainerProtos.ChecksumData.Builder checksumDataBuilder = ContainerProtos.ChecksumData.newBuilder() + .setBytesPerChecksum(chunkInfo.getChecksumData().getBytesPerChecksum()) + .setType(chunkInfo.getChecksumData().getType()); + + for (ByteString checksum : chunkInfo.getChecksumData().getChecksumsList()) { + byte[] checksumBytes = checksum.toByteArray(); + // Modify the checksum bytes to simulate corruption. + checksumBytes[0] = (byte) (checksumBytes[0] - 1); + checksumDataBuilder.addChecksums(ByteString.copyFrom(checksumBytes)).build(); + } + chunkInfo = chunkInfo.toBuilder().setChecksumData(checksumDataBuilder.build()).build(); + blockDataBuilder.addChunks(chunkInfo); + for (int i = 1; i < blockData.getChunks().size(); i++) { + blockDataBuilder.addChunks(blockData.getChunks().get(i)); + } + + // Modify the block metadata from the container db to simulate chunk corruption. + db.getStore().getBlockDataTable().putWithBatch(op, containerData.getBlockKey(blockData.getLocalID()), + BlockData.getFromProtoBuf(blockDataBuilder.build())); + } + db.getStore().getBatchHandler().commitBatchOperation(op); + db.getStore().flushDB(); + } + + Files.deleteIfExists(getContainerChecksumFile(container.getContainerData()).toPath()); + kvHandler.createContainerMerkleTree(container); + // To set unhealthy for chunks that are corrupted. 
+ ContainerProtos.ContainerChecksumInfo containerChecksumAfterChunkCorruption = + readChecksumFile(container.getContainerData()); + long dataChecksumAfterChunkCorruption = containerChecksumAfterChunkCorruption + .getContainerMerkleTree().getDataChecksum(); + // Checksum should have changed after chunk corruption. + Assertions.assertNotEquals(oldDataChecksum, dataChecksumAfterChunkCorruption); + + // 3. Set unhealthy for the first chunk of all blocks. This should be done by the scanner; until then this is a + // manual step. + // TODO: Use On-demand container scanner to build the new container merkle tree (HDDS-10374) + Random random = new Random(); + ContainerProtos.ContainerChecksumInfo.Builder builder = containerChecksumAfterChunkCorruption.toBuilder(); + List<ContainerProtos.BlockMerkleTree> blockMerkleTreeList = builder.getContainerMerkleTree() + .getBlockMerkleTreeList(); + builder.getContainerMerkleTreeBuilder().clearBlockMerkleTree(); + for (ContainerProtos.BlockMerkleTree blockMerkleTree : blockMerkleTreeList) { + ContainerProtos.BlockMerkleTree.Builder blockMerkleTreeBuilder = blockMerkleTree.toBuilder(); + List<ContainerProtos.ChunkMerkleTree.Builder> chunkMerkleTreeBuilderList = + blockMerkleTreeBuilder.getChunkMerkleTreeBuilderList(); + chunkMerkleTreeBuilderList.get(0).setIsHealthy(false).setDataChecksum(random.nextLong()); + blockMerkleTreeBuilder.setDataChecksum(random.nextLong()); + builder.getContainerMerkleTreeBuilder().addBlockMerkleTree(blockMerkleTreeBuilder.build()); + } + builder.getContainerMerkleTreeBuilder().setDataChecksum(random.nextLong()); + Files.deleteIfExists(getContainerChecksumFile(container.getContainerData()).toPath()); + writeContainerDataTreeProto(container.getContainerData(), builder.getContainerMerkleTree()); + + // Since the container is already closed, we have manually updated the container checksum file. + // This doesn't update the checksum reported to SCM, and we need to trigger an ICR. + // Marking a container unhealthy will send an ICR. + kvHandler.markContainerUnhealthy(container, MetadataScanResult.deleted()); + waitForDataChecksumsAtSCM(containerID, 2); + + // 4. Reconcile the container. + cluster.getStorageContainerLocationClient().reconcileContainer(containerID); + // Compare and check if dataChecksum is same on all replicas.
+ waitForDataChecksumsAtSCM(containerID, 1); + ContainerProtos.ContainerChecksumInfo newContainerChecksumInfo = readChecksumFile(container.getContainerData()); + assertTreesSortedAndMatch(oldContainerChecksumInfo.getContainerMerkleTree(), + newContainerChecksumInfo.getContainerMerkleTree()); + Assertions.assertEquals(oldDataChecksum, newContainerChecksumInfo.getContainerMerkleTree().getDataChecksum()); + TestHelper.validateData(KEY_NAME, data, store, volume, bucket); + } + + private void waitForDataChecksumsAtSCM(long containerID, int expectedSize) throws Exception { + GenericTestUtils.waitFor(() -> { + try { + Set dataChecksums = cluster.getStorageContainerLocationClient().getContainerReplicas(containerID, + ClientVersion.CURRENT_VERSION).stream() + .map(HddsProtos.SCMContainerReplicaProto::getDataChecksum) + .collect(Collectors.toSet()); + return dataChecksums.size() == expectedSize; + } catch (Exception ex) { + return false; + } + }, 500, 20000); + } + + private Pair getDataAndContainer(boolean close, int dataLen, String volumeName, String bucketName) + throws Exception { store.createVolume(volumeName); OzoneVolume volume = store.getVolume(volumeName); volume.createBucket(bucketName); OzoneBucket bucket = volume.getBucket(bucketName); - byte[] data = "Test content".getBytes(UTF_8); + byte[] data = randomAlphabetic(dataLen).getBytes(UTF_8); // Write Key - try (OzoneOutputStream os = TestHelper.createKey("testkey", RATIS, THREE, 0, store, volumeName, bucketName)) { + try (OzoneOutputStream os = TestHelper.createKey(KEY_NAME, RATIS, THREE, dataLen, store, volumeName, bucketName)) { IOUtils.write(data, os); } - long containerID = bucket.getKey("testkey").getOzoneKeyLocations().stream() + long containerID = bucket.getKey(KEY_NAME).getOzoneKeyLocations().stream() .findFirst().get().getContainerID(); if (close) { TestHelper.waitForContainerClose(cluster, containerID); + TestHelper.waitForScmContainerState(cluster, containerID, HddsProtos.LifeCycleState.CLOSED); } - return containerID; + return Pair.of(containerID, data); + } + + private long writeDataAndGetContainer(boolean close, String volume, String bucket) throws Exception { + return getDataAndContainer(close, 5, volume, bucket).getLeft(); } public static void writeChecksumFileToDatanodes(long containerID, ContainerMerkleTreeWriter tree) throws Exception { @@ -278,8 +551,84 @@ public static void writeChecksumFileToDatanodes(long containerID, ContainerMerkl KeyValueContainer keyValueContainer = (KeyValueContainer) dn.getDatanodeStateMachine().getContainer().getController() .getContainer(containerID); - keyValueHandler.getChecksumManager().writeContainerDataTree( - keyValueContainer.getContainerData(), tree); + if (keyValueContainer != null) { + keyValueHandler.getChecksumManager().writeContainerDataTree( + keyValueContainer.getContainerData(), tree); + } } } + + private static void setSecretKeysConfig() { + // Secret key lifecycle configs. 
+ conf.set(HDDS_SECRET_KEY_ROTATE_CHECK_DURATION, "500s"); + conf.set(HDDS_SECRET_KEY_ROTATE_DURATION, "500s"); + conf.set(HDDS_SECRET_KEY_EXPIRY_DURATION, "500s"); + + // enable tokens + conf.setBoolean(HDDS_BLOCK_TOKEN_ENABLED, true); + conf.setBoolean(HDDS_CONTAINER_TOKEN_ENABLED, true); + } + + private static void createCredentialsInKDC() throws Exception { + ScmConfig scmConfig = conf.getObject(ScmConfig.class); + SCMHTTPServerConfig httpServerConfig = + conf.getObject(SCMHTTPServerConfig.class); + createPrincipal(ozoneKeytab, scmConfig.getKerberosPrincipal()); + createPrincipal(spnegoKeytab, httpServerConfig.getKerberosPrincipal()); + createPrincipal(testUserKeytab, testUserPrincipal); + } + + private static void createPrincipal(File keytab, String... principal) + throws Exception { + miniKdc.createPrincipal(keytab, principal); + } + + private static void startMiniKdc() throws Exception { + Properties securityProperties = MiniKdc.createConf(); + miniKdc = new MiniKdc(securityProperties, workDir); + miniKdc.start(); + } + + private static void setSecureConfig() throws IOException { + conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true); + host = InetAddress.getLocalHost().getCanonicalHostName() + .toLowerCase(); + conf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS.name()); + String curUser = UserGroupInformation.getCurrentUser().getUserName(); + conf.set(OZONE_ADMINISTRATORS, curUser); + String realm = miniKdc.getRealm(); + String hostAndRealm = host + "@" + realm; + conf.set(HDDS_SCM_KERBEROS_PRINCIPAL_KEY, "scm/" + hostAndRealm); + conf.set(HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY, "HTTP_SCM/" + hostAndRealm); + conf.set(OZONE_OM_KERBEROS_PRINCIPAL_KEY, "scm/" + hostAndRealm); + conf.set(OZONE_OM_HTTP_KERBEROS_PRINCIPAL_KEY, "HTTP_OM/" + hostAndRealm); + conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, "scm/" + hostAndRealm); + + ozoneKeytab = new File(workDir, "scm.keytab"); + spnegoKeytab = new File(workDir, "http.keytab"); + testUserKeytab = new File(workDir, "testuser.keytab"); + testUserPrincipal = "test@" + realm; + + conf.set(HDDS_SCM_KERBEROS_KEYTAB_FILE_KEY, ozoneKeytab.getAbsolutePath()); + conf.set(HDDS_SCM_HTTP_KERBEROS_KEYTAB_FILE_KEY, spnegoKeytab.getAbsolutePath()); + conf.set(OZONE_OM_KERBEROS_KEYTAB_FILE_KEY, ozoneKeytab.getAbsolutePath()); + conf.set(OZONE_OM_HTTP_KERBEROS_KEYTAB_FILE, spnegoKeytab.getAbsolutePath()); + conf.set(DFS_DATANODE_KERBEROS_KEYTAB_FILE_KEY, ozoneKeytab.getAbsolutePath()); + } + + private static void startCluster() throws Exception { + OzoneManager.setTestSecureOmFlag(true); + cluster = MiniOzoneCluster.newHABuilder(conf) + .setSCMServiceId("SecureSCM") + .setNumOfStorageContainerManagers(3) + .setNumOfOzoneManagers(1) + .setNumDatanodes(3) + .build(); + cluster.waitForClusterToBeReady(); + rpcClient = OzoneClientFactory.getRpcClient(conf); + store = rpcClient.getObjectStore(); + SecretKeyClient secretKeyClient = cluster.getStorageContainerManager().getSecretKeyManager(); + CertificateClient certClient = cluster.getStorageContainerManager().getScmCertificateClient(); + dnClient = new DNContainerOperationClient(conf, certClient, secretKeyClient); + } }