diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java index 2b9735fe8534..f32f1debcdc5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java @@ -83,15 +83,10 @@ public void stop() { * The data merkle tree within the file is replaced with the {@code tree} parameter, but all other content of the * file remains unchanged. * Concurrent writes to the same file are coordinated internally. - * This method also updates the container's data checksum in the {@code data} parameter, which will be seen by SCM - * on container reports. */ public ContainerProtos.ContainerChecksumInfo writeContainerDataTree(ContainerData data, ContainerMerkleTreeWriter tree) throws IOException { long containerID = data.getContainerID(); - // If there is an error generating the tree and we cannot obtain a final checksum, use 0 to indicate a metadata - // failure. - long dataChecksum = 0; ContainerProtos.ContainerChecksumInfo checksumInfo = null; Lock writeLock = getLock(containerID); writeLock.lock(); @@ -113,13 +108,9 @@ public ContainerProtos.ContainerChecksumInfo writeContainerDataTree(ContainerDat .setContainerMerkleTree(treeProto); checksumInfo = checksumInfoBuilder.build(); write(data, checksumInfo); - // If write succeeds, update the checksum in memory. Otherwise 0 will be used to indicate the metadata failure. - dataChecksum = treeProto.getDataChecksum(); - LOG.debug("Merkle tree for container {} updated with container data checksum {}", containerID, - checksumToString(dataChecksum)); + LOG.debug("Data merkle tree for container {} updated with container checksum {}", containerID, + checksumToString(treeProto.getDataChecksum())); } finally { - // Even if persisting the tree fails, we should still update the data checksum in memory to report back to SCM. 
- data.setDataChecksum(dataChecksum); writeLock.unlock(); } return checksumInfo; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java index f7af6f322893..d6f7817e28ce 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java @@ -34,9 +34,6 @@ import java.nio.file.Paths; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; -import java.time.Duration; -import java.time.Instant; -import java.util.Optional; import java.util.UUID; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -54,7 +51,6 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; -import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -237,29 +233,6 @@ public static String getContainerFileChecksum(String containerDataYamlStr) } } - public static boolean recentlyScanned(Container container, - long minScanGap, Logger log) { - Optional lastScanTime = - container.getContainerData().lastDataScanTime(); - Instant now = Instant.now(); - // Container is considered recently scanned if it was scanned within the - // configured time frame. If the optional is empty, the container was - // never scanned. - boolean recentlyScanned = lastScanTime.map(scanInstant -> - Duration.between(now, scanInstant).abs() - .compareTo(Duration.ofMillis(minScanGap)) < 0) - .orElse(false); - - if (recentlyScanned && log.isDebugEnabled()) { - log.debug("Skipping scan for container {} which was last " + - "scanned at {}. Current time is {}.", - container.getContainerData().getContainerID(), lastScanTime.get(), - now); - } - - return recentlyScanned; - } - /** * Get the .container file from the containerBaseDir. * @param containerBaseDir container base directory. 
The name of this
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
index 7e95518e2cf0..e03e61605eb1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
@@ -37,7 +37,6 @@ import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
import java.util.function.ToLongFunction;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
@@ -49,6 +48,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.utils.ContainerLogger;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerDataScanner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,7 +69,7 @@ public class ContainerSet implements Iterable<Container<?>> {
private long recoveringTimeout;
private final Table containerIdsTable;
// Handler that will be invoked when a scan of a container in this set is requested.
- private Consumer<Container<?>> containerScanHandler;
+ private OnDemandContainerDataScanner containerScanner;
public static ContainerSet newReadOnlyContainerSet(long recoveringTimeout) {
return new ContainerSet(null, recoveringTimeout);
@@ -129,22 +129,38 @@ public void ensureContainerNotMissing(long containerId, State state) throws Stor
}
/**
- * @param scanner A callback that will be invoked when a scan of a container in this set is requested.
+ * @param scanner The scanner instance that will be invoked when a scan of a container in this set is requested.
*/
- public void registerContainerScanHandler(Consumer<Container<?>> scanner) {
- this.containerScanHandler = scanner;
+ public void registerOnDemandScanner(OnDemandContainerDataScanner scanner) {
+ this.containerScanner = scanner;
}
/**
- * Triggers a scan of a container in this set using the registered scan handler. This is a no-op if no scan handler
- * is registered or the container does not exist in the set.
+ * Triggers a scan of a container in this set. This is a no-op if no scanner is registered or the container does not
+ * exist in the set.
* @param containerID The container in this set to scan.
*/
public void scanContainer(long containerID) {
- if (containerScanHandler != null) {
+ if (containerScanner != null) {
Container container = getContainer(containerID);
if (container != null) {
- containerScanHandler.accept(container);
+ containerScanner.scanContainer(container);
+ } else {
+ LOG.warn("Request to scan container {} which was not found in the container set", containerID);
+ }
+ }
+ }
+
+ /**
+ * Triggers a scan of a container in this set regardless of whether it was recently scanned.
+ * This is a no-op if no scanner is registered or the container does not exist in the set.
+ * @param containerID The container in this set to scan.
+ */ + public void scanContainerWithoutGap(long containerID) { + if (containerScanner != null) { + Container container = getContainer(containerID); + if (container != null) { + containerScanner.scanContainerWithoutGap(container); } else { LOG.warn("Request to scan container {} which was not found in the container set", containerID); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java index 26ec6226fc35..d998d0f7bd91 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager; +import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeWriter; import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; @@ -154,6 +155,18 @@ public abstract void exportContainer( public abstract void markContainerForClose(Container container) throws IOException; + /** + * Updates the container checksum information on disk and in memory and sends an ICR if the container checksum was + * changed from its previous value. + * + * @param container The container to update + * @param treeWriter The container merkle tree with the updated information about the container + * @throws IOException For errors sending an ICR or updating the container checksum on disk. If the disk update + * fails, the checksum in memory will not be updated and an ICR will not be sent. + */ + public abstract void updateContainerChecksum(Container container, ContainerMerkleTreeWriter treeWriter) + throws IOException; + /** * Marks the container Unhealthy. Moves the container to UNHEALTHY state. * diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerLogger.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerLogger.java index 682348ea2019..2063bac80b89 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerLogger.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerLogger.java @@ -148,6 +148,18 @@ public static void logRecovered(ContainerData containerData) { LOG.info(getMessage(containerData)); } + /** + * Logged when a container's checksum is updated. + * + * @param containerData The container which has the updated data checksum. + * @param oldDataChecksum The old data checksum. + */ + public static void logChecksumUpdated(ContainerData containerData, long oldDataChecksum) { + LOG.warn(getMessage(containerData, + "Container data checksum updated from " + checksumToString(oldDataChecksum) + " to " + + checksumToString(containerData.getDataChecksum()))); + } + /** * Logged when a container is reconciled. 
* diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java index 9c754831aa29..e769781da4a4 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java @@ -189,7 +189,6 @@ public DataScanResult fullCheck(DataTransferThrottler throttler, Canceler cancel LOG.debug("Running data checks for container {}", containerID); try { - // TODO HDDS-10374 this tree will get updated with the container's contents as it is scanned. ContainerMerkleTreeWriter dataTree = new ContainerMerkleTreeWriter(); List dataErrors = scanData(dataTree, throttler, canceler); if (containerIsDeleted()) { @@ -376,12 +375,15 @@ private List scanBlock(DBHandle db, File dbFile, BlockData b // So, we need to make sure, chunk length > 0, before declaring // the missing chunk file. if (!block.getChunks().isEmpty() && block.getChunks().get(0).getLen() > 0) { - ContainerScanError error = new ContainerScanError(FailureType.MISSING_CHUNK_FILE, + ContainerScanError error = new ContainerScanError(FailureType.MISSING_DATA_FILE, new File(containerDataFromDisk.getChunksPath()), new IOException("Missing chunk file " + chunkFile.getAbsolutePath())); blockErrors.add(error); } } else if (chunk.getChecksumData().getType() != ContainerProtos.ChecksumType.NONE) { + // Before adding chunks, add a block entry to the tree to represent cases where the block exists but has no + // chunks. + currentTree.addBlock(block.getBlockID().getLocalID()); int bytesPerChecksum = chunk.getChecksumData().getBytesPerChecksum(); ByteBuffer buffer = BUFFER_POOL.getBuffer(bytesPerChecksum); // Keep scanning the block even if there are errors with individual chunks. @@ -419,6 +421,14 @@ private static List verifyChecksum(BlockData block, List scanErrors = new ArrayList<>(); + // Information used to populate the merkle tree. Chunk metadata will be the same, but we must fill in the + // checksums with what we actually observe. + ContainerProtos.ChunkInfo.Builder observedChunkBuilder = chunk.toBuilder(); + ContainerProtos.ChecksumData.Builder observedChecksumData = chunk.getChecksumData().toBuilder(); + observedChecksumData.clearChecksums(); + boolean chunkHealthy = true; + boolean chunkMissing = false; + ChecksumData checksumData = ChecksumData.getFromProtoBuf(chunk.getChecksumData()); int checksumCount = checksumData.getChecksums().size(); @@ -431,10 +441,7 @@ private static List verifyChecksum(BlockData block, if (layout == ContainerLayoutVersion.FILE_PER_BLOCK) { channel.position(chunk.getOffset()); } - // Only report one error per chunk. Reporting corruption at every "bytes per checksum" interval will lead to a - // large amount of errors when a full chunk is corrupted. 
- boolean chunkHealthy = true; - for (int i = 0; i < checksumCount && chunkHealthy; i++) { + for (int i = 0; i < checksumCount; i++) { // limit last read for FILE_PER_BLOCK, to avoid reading next chunk if (layout == ContainerLayoutVersion.FILE_PER_BLOCK && i == checksumCount - 1 && @@ -454,7 +461,11 @@ private static List verifyChecksum(BlockData block, ByteString expected = checksumData.getChecksums().get(i); ByteString actual = cal.computeChecksum(buffer) .getChecksums().get(0); - if (!expected.equals(actual)) { + observedChecksumData.addChecksums(actual); + // Only report one error per chunk. Reporting corruption at every "bytes per checksum" interval will lead to a + // large amount of errors when a full chunk is corrupted. + // Continue scanning the chunk even after the first error so the full merkle tree can be built. + if (chunkHealthy && !expected.equals(actual)) { String message = String .format("Inconsistent read for chunk=%s" + " checksum item %d" + @@ -466,26 +477,46 @@ private static List verifyChecksum(BlockData block, StringUtils.bytes2Hex(expected.asReadOnlyByteBuffer()), StringUtils.bytes2Hex(actual.asReadOnlyByteBuffer()), block.getBlockID()); + chunkHealthy = false; scanErrors.add(new ContainerScanError(FailureType.CORRUPT_CHUNK, chunkFile, new OzoneChecksumException(message))); - chunkHealthy = false; } } - // If all the checksums match, also check that the length stored in the metadata matches the number of bytes - // seen on the disk. + + observedChunkBuilder.setLen(bytesRead); + // If we haven't seen any errors after scanning the whole chunk, verify that the length stored in the metadata + // matches the number of bytes seen on the disk. if (chunkHealthy && bytesRead != chunk.getLen()) { - String message = String - .format("Inconsistent read for chunk=%s expected length=%d" - + " actual length=%d for block %s", - chunk.getChunkName(), - chunk.getLen(), bytesRead, block.getBlockID()); - scanErrors.add(new ContainerScanError(FailureType.INCONSISTENT_CHUNK_LENGTH, chunkFile, - new IOException(message))); + if (bytesRead == 0) { + // If we could not find any data for the chunk, report it as missing. + chunkMissing = true; + chunkHealthy = false; + String message = String.format("Missing chunk=%s with expected length=%d for block %s", + chunk.getChunkName(), chunk.getLen(), block.getBlockID()); + scanErrors.add(new ContainerScanError(FailureType.MISSING_CHUNK, chunkFile, new IOException(message))); + } else { + // We found data for the chunk, but it was shorter than expected. + String message = String + .format("Inconsistent read for chunk=%s expected length=%d" + + " actual length=%d for block %s", + chunk.getChunkName(), + chunk.getLen(), bytesRead, block.getBlockID()); + chunkHealthy = false; + scanErrors.add(new ContainerScanError(FailureType.INCONSISTENT_CHUNK_LENGTH, chunkFile, + new IOException(message))); + } } } catch (IOException ex) { - scanErrors.add(new ContainerScanError(FailureType.MISSING_CHUNK_FILE, chunkFile, ex)); + // An unknown error occurred trying to access the chunk. Report it as corrupted. + chunkHealthy = false; + scanErrors.add(new ContainerScanError(FailureType.CORRUPT_CHUNK, chunkFile, ex)); } + // Missing chunks should not be added to the merkle tree. 
+ if (!chunkMissing) { + observedChunkBuilder.setChecksumData(observedChecksumData); + currentTree.addChunks(block.getBlockID().getLocalID(), chunkHealthy, observedChunkBuilder.build()); + } return scanErrors; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 6ef87063a65a..b956e6f50081 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -301,8 +301,7 @@ public ContainerCommandResponseProto handle( @VisibleForTesting static ContainerCommandResponseProto dispatchRequest(KeyValueHandler handler, - ContainerCommandRequestProto request, KeyValueContainer kvContainer, - DispatcherContext dispatcherContext) { + ContainerCommandRequestProto request, KeyValueContainer kvContainer, DispatcherContext dispatcherContext) { Type cmdType = request.getCmdType(); // Validate the request has been made to the correct datanode with the node id matching. if (kvContainer != null) { @@ -621,31 +620,6 @@ ContainerCommandResponseProto handleCloseContainer( return getSuccessResponse(request); } - /** - * Write the merkle tree for this container using the existing checksum metadata only. The data is not read or - * validated by this method, so it is expected to run quickly. - * - * If a checksum file already exists on the disk, this method will do nothing. The existing file would have either - * been made from the metadata or data itself so there is no need to recreate it from the metadata. - * - * TODO: This method should be changed to private after HDDS-10374 is merged. - * - * @param container The container which will have a tree generated. - */ - public void createContainerMerkleTreeFromMetadata(Container container) { - if (ContainerChecksumTreeManager.checksumFileExist(container)) { - return; - } - - try { - KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData(); - updateAndGetContainerChecksum(containerData); - } catch (IOException ex) { - LOG.error("Cannot create container checksum for container {} , Exception: ", - container.getContainerData().getContainerID(), ex); - } - } - /** * Handle Put Block operation. Calls BlockManager to process the request. */ @@ -1097,7 +1071,6 @@ ContainerCommandResponseProto handleWriteChunk( /** * Handle Write Chunk operation for closed container. Calls ChunkManager to process the request. 
- * */ public void writeChunkForClosedContainer(ChunkInfo chunkInfo, BlockID blockID, ChunkBuffer data, KeyValueContainer kvContainer) @@ -1132,7 +1105,7 @@ public void writeChunkForClosedContainer(ChunkInfo chunkInfo, BlockID blockID, */ public void putBlockForClosedContainer(KeyValueContainer kvContainer, BlockData blockData, long blockCommitSequenceId, boolean overwriteBscId) - throws IOException { + throws IOException { Preconditions.checkNotNull(kvContainer); Preconditions.checkNotNull(blockData); long startTime = Time.monotonicNowNanos(); @@ -1346,7 +1319,8 @@ private boolean checkContainerClose(KeyValueContainer kvContainer) { @Override public Container importContainer(ContainerData originalContainerData, final InputStream rawContainerStream, - final TarContainerPacker packer) throws IOException { + final TarContainerPacker packer) + throws IOException { Preconditions.checkState(originalContainerData instanceof KeyValueContainerData, "Should be KeyValueContainerData instance"); @@ -1367,8 +1341,8 @@ public Container importContainer(ContainerData originalContainerData, @Override public void exportContainer(final Container container, - final OutputStream outputStream, - final TarContainerPacker packer) + final OutputStream outputStream, + final TarContainerPacker packer) throws IOException { final KeyValueContainer kvc = (KeyValueContainer) container; kvc.exportContainerData(outputStream, packer); @@ -1394,11 +1368,97 @@ public void markContainerForClose(Container container) } finally { container.writeUnlock(); } - createContainerMerkleTreeFromMetadata(container); + updateContainerChecksumFromMetadataIfNeeded(container); ContainerLogger.logClosing(container.getContainerData()); sendICR(container); } + @Override + public void updateContainerChecksum(Container container, ContainerMerkleTreeWriter treeWriter) + throws IOException { + updateAndGetContainerChecksum(container, treeWriter, true); + } + + /** + * Write the merkle tree for this container using the existing checksum metadata only. The data is not read or + * validated by this method, so it is expected to run quickly. + *
+ * If a checksum file already exists on the disk, this method will do nothing. The existing file would have either
+ * been made from the metadata or data itself so there is no need to recreate it from the metadata. This method
+ * does not send an ICR with the updated checksum info.
+ * + * @param container The container which will have a tree generated. + */ + private void updateContainerChecksumFromMetadataIfNeeded(Container container) { + if (ContainerChecksumTreeManager.checksumFileExist(container)) { + return; + } + + try { + KeyValueContainer keyValueContainer = (KeyValueContainer) container; + updateAndGetContainerChecksumFromMetadata(keyValueContainer); + } catch (IOException ex) { + LOG.error("Cannot create container checksum for container {} , Exception: ", + container.getContainerData().getContainerID(), ex); + } + } + + /** + * Updates the container merkle tree based on the RocksDb's block metadata and returns the updated checksum info. + * This method does not send an ICR with the updated checksum info. + * @param container - Container for which the container merkle tree needs to be updated. + */ + private ContainerProtos.ContainerChecksumInfo updateAndGetContainerChecksumFromMetadata( + KeyValueContainer container) throws IOException { + ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter(); + try (DBHandle dbHandle = BlockUtils.getDB(container.getContainerData(), conf); + BlockIterator blockIterator = dbHandle.getStore(). + getBlockIterator(container.getContainerData().getContainerID())) { + while (blockIterator.hasNext()) { + BlockData blockData = blockIterator.nextBlock(); + merkleTree.addBlock(blockData.getLocalID()); + // Assume all chunks are healthy when building the tree from metadata. Scanner will identify corruption when + // it runs after. + List chunkInfos = blockData.getChunks(); + merkleTree.addChunks(blockData.getLocalID(), true, chunkInfos); + } + } + return updateAndGetContainerChecksum(container, merkleTree, false); + } + + private ContainerProtos.ContainerChecksumInfo updateAndGetContainerChecksum(Container container, + ContainerMerkleTreeWriter treeWriter, boolean sendICR) throws IOException { + ContainerData containerData = container.getContainerData(); + + // Attempt to write the new data checksum to disk. If persisting this fails, keep using the original data + // checksum to prevent divergence from what SCM sees in the ICR vs what datanode peers will see when pulling the + // merkle tree. + long originalDataChecksum = containerData.getDataChecksum(); + ContainerProtos.ContainerChecksumInfo updateChecksumInfo = checksumManager.writeContainerDataTree(containerData, + treeWriter); + long updatedDataChecksum = updateChecksumInfo.getContainerMerkleTree().getDataChecksum(); + + if (updatedDataChecksum != originalDataChecksum) { + containerData.setDataChecksum(updatedDataChecksum); + String message = + "Container data checksum updated from " + checksumToString(originalDataChecksum) + " to " + + checksumToString(updatedDataChecksum); + if (sendICR) { + sendICR(container); + } + if (ContainerChecksumTreeManager.hasContainerChecksumFile(containerData)) { + LOG.warn(message); + ContainerLogger.logChecksumUpdated(containerData, originalDataChecksum); + } else { + // If this is the first time the scanner has run with the feature to generate a checksum file, don't + // log a warning for the checksum update. 
+ LOG.debug(message); + } + } + return updateChecksumInfo; + } + @Override public void markContainerUnhealthy(Container container, ScanResult reason) throws IOException { @@ -1427,7 +1487,7 @@ public void markContainerUnhealthy(Container container, ScanResult reason) } finally { container.writeUnlock(); } - createContainerMerkleTreeFromMetadata(container); + updateContainerChecksumFromMetadataIfNeeded(container); // Even if the container file is corrupted/missing and the unhealthy // update fails, the unhealthy state is kept in memory and sent to // SCM. Write a corresponding entry to the container log as well. @@ -1458,7 +1518,7 @@ public void quasiCloseContainer(Container container, String reason) } finally { container.writeUnlock(); } - createContainerMerkleTreeFromMetadata(container); + updateContainerChecksumFromMetadataIfNeeded(container); ContainerLogger.logQuasiClosed(container.getContainerData(), reason); sendICR(container); } @@ -1492,7 +1552,7 @@ public void closeContainer(Container container) } finally { container.writeUnlock(); } - createContainerMerkleTreeFromMetadata(container); + updateContainerChecksumFromMetadataIfNeeded(container); ContainerLogger.logClosed(container.getContainerData()); sendICR(container); } @@ -1518,7 +1578,7 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container originalChecksumInfo = optionalChecksumInfo.get(); } else { // Try creating the checksum info from RocksDB metadata if it is not present. - originalChecksumInfo = updateAndGetContainerChecksum(containerData); + originalChecksumInfo = updateAndGetContainerChecksumFromMetadata(kvContainer); } // This holds our current most up-to-date checksum info that we are using for the container. ContainerProtos.ContainerChecksumInfo latestChecksumInfo = originalChecksumInfo; @@ -1608,7 +1668,7 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container // Based on repaired done with this peer, write the updated merkle tree to the container. // This updated tree will be used when we reconcile with the next peer. ContainerProtos.ContainerChecksumInfo previousChecksumInfo = latestChecksumInfo; - latestChecksumInfo = checksumManager.writeContainerDataTree(containerData, updatedTreeWriter); + latestChecksumInfo = updateAndGetContainerChecksum(container, updatedTreeWriter, false); // Log the results of reconciliation with this peer. long duration = Instant.now().toEpochMilli() - start; @@ -1655,32 +1715,10 @@ containerID, peer, checksumToString(previousDataChecksum), checksumToString(late } // Trigger on demand scanner, which will build the merkle tree based on the newly ingested data. - containerSet.scanContainer(containerID); + containerSet.scanContainerWithoutGap(containerID); sendICR(container); } - /** - * Updates the container merkle tree based on the RocksDb's block metadata and returns the updated checksum info. - * @param containerData - Container data for which the container merkle tree needs to be updated. - */ - private ContainerProtos.ContainerChecksumInfo updateAndGetContainerChecksum(KeyValueContainerData containerData) - throws IOException { - ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter(); - try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf); - BlockIterator blockIterator = dbHandle.getStore(). 
- getBlockIterator(containerData.getContainerID())) { - while (blockIterator.hasNext()) { - BlockData blockData = blockIterator.nextBlock(); - List chunkInfos = blockData.getChunks(); - // TODO: Add empty blocks to the merkle tree. Done in HDDS-10374, needs to be backported. - // Assume all chunks are healthy when building the tree from metadata. Scanner will identify corruption when - // it runs after. - merkleTree.addChunks(blockData.getLocalID(), true, chunkInfos); - } - } - return checksumManager.writeContainerDataTree(containerData, merkleTree); - } - /** * Read chunks from a peer datanode and use them to repair our container. * diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/AbstractBackgroundContainerScanner.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/AbstractBackgroundContainerScanner.java index 729e3c7af95e..72662bd4571e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/AbstractBackgroundContainerScanner.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/AbstractBackgroundContainerScanner.java @@ -23,7 +23,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.ozone.container.common.interfaces.Container; -import org.apache.hadoop.ozone.container.common.interfaces.ScanResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -140,15 +139,6 @@ public final void handleRemainingSleep(long remainingSleep) { } } - public static void logUnhealthyScanResult(long containerID, ScanResult result, Logger log) { - LOG.error("Corruption detected in container [{}]. Marking it UNHEALTHY. {}", containerID, result); - if (log.isDebugEnabled()) { - StringBuilder allErrorString = new StringBuilder(); - result.getErrors().forEach(r -> allErrorString.append(r).append('\n')); - log.debug("Complete list of errors detected while scanning container {}:\n{}", containerID, allErrorString); - } - } - /** * Shutdown the current container scanning thread. 
* If the thread is already being shutdown, the call will block until the diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerDataScanner.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerDataScanner.java index 1b75c5b8e075..8b1da664159e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerDataScanner.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerDataScanner.java @@ -19,14 +19,9 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; -import java.time.Instant; import java.util.Iterator; -import java.util.Optional; import org.apache.hadoop.hdfs.util.Canceler; import org.apache.hadoop.hdfs.util.DataTransferThrottler; -import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager; -import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; -import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.slf4j.Logger; @@ -49,12 +44,11 @@ public class BackgroundContainerDataScanner extends private final Canceler canceler; private static final String NAME_FORMAT = "ContainerDataScanner(%s)"; private final ContainerDataScannerMetrics metrics; - private final long minScanGap; - private final ContainerChecksumTreeManager checksumManager; + private final ContainerScanHelper scanHelper; public BackgroundContainerDataScanner(ContainerScannerConfiguration conf, ContainerController controller, - HddsVolume volume, ContainerChecksumTreeManager checksumManager) { + HddsVolume volume) { super(String.format(NAME_FORMAT, volume), conf.getDataScanInterval()); this.controller = controller; this.volume = volume; @@ -62,13 +56,7 @@ public BackgroundContainerDataScanner(ContainerScannerConfiguration conf, canceler = new Canceler(); this.metrics = ContainerDataScannerMetrics.create(volume.toString()); this.metrics.setStorageDirectory(volume.toString()); - this.minScanGap = conf.getContainerScanMinGap(); - this.checksumManager = checksumManager; - } - - private boolean shouldScan(Container container) { - return container.shouldScanData() && - !ContainerUtils.recentlyScanned(container, minScanGap, LOG); + this.scanHelper = ContainerScanHelper.withScanGap(LOG, controller, metrics, conf); } @Override @@ -80,40 +68,7 @@ public void scanContainer(Container c) shutdown("The volume has failed."); return; } - - if (!shouldScan(c)) { - return; - } - ContainerData containerData = c.getContainerData(); - long containerId = containerData.getContainerID(); - logScanStart(containerData); - DataScanResult result = c.scanData(throttler, canceler); - - if (result.isDeleted()) { - LOG.debug("Container [{}] has been deleted during the data scan.", containerId); - } else { - if (!result.isHealthy()) { - logUnhealthyScanResult(containerId, result, LOG); - - // Only increment the number of unhealthy containers if the container was not already unhealthy. - // TODO HDDS-11593 (to be merged in to the feature branch from master): Scanner counters will start from zero - // at the beginning of each run, so this will need to be incremented for every unhealthy container seen - // regardless of its previous state. 
- if (controller.markContainerUnhealthy(containerId, result)) { - metrics.incNumUnHealthyContainers(); - } - } - checksumManager.writeContainerDataTree(containerData, result.getDataTree()); - metrics.incNumContainersScanned(); - } - - // Even if the container was deleted, mark the scan as completed since we already logged it as starting. - Instant now = Instant.now(); - logScanCompleted(containerData, now); - - if (!result.isDeleted()) { - controller.updateDataScanTimestamp(containerId, now); - } + scanHelper.scanData(c, throttler, canceler); } @Override @@ -121,23 +76,6 @@ public Iterator> getContainerIterator() { return controller.getContainers(volume); } - private static void logScanStart(ContainerData containerData) { - if (LOG.isDebugEnabled()) { - Optional scanTimestamp = containerData.lastDataScanTime(); - Object lastScanTime = scanTimestamp.map(ts -> "at " + ts).orElse("never"); - LOG.debug("Scanning container {}, last scanned {}", - containerData.getContainerID(), lastScanTime); - } - } - - private static void logScanCompleted( - ContainerData containerData, Instant timestamp) { - if (LOG.isDebugEnabled()) { - LOG.debug("Completed scan of container {} at {}", - containerData.getContainerID(), timestamp); - } - } - @Override public synchronized void shutdown() { shutdown(""); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerMetadataScanner.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerMetadataScanner.java index 6cdbf3c9d396..02c786fed749 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerMetadataScanner.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerMetadataScanner.java @@ -20,10 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.Iterator; -import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; -import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.interfaces.Container; -import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,17 +33,16 @@ public class BackgroundContainerMetadataScanner extends AbstractBackgroundContainerScanner { private static final Logger LOG = LoggerFactory.getLogger(BackgroundContainerMetadataScanner.class); - private final ContainerMetadataScannerMetrics metrics; private final ContainerController controller; - private final long minScanGap; + private final ContainerScanHelper scanHelper; public BackgroundContainerMetadataScanner(ContainerScannerConfiguration conf, ContainerController controller) { super("ContainerMetadataScanner", conf.getMetadataScanInterval()); this.controller = controller; this.metrics = ContainerMetadataScannerMetrics.create(); - this.minScanGap = conf.getContainerScanMinGap(); + this.scanHelper = ContainerScanHelper.withScanGap(LOG, controller, metrics, conf); } @Override @@ -58,21 +54,11 @@ public Iterator> getContainerIterator() { @Override public void scanContainer(Container container) throws IOException, InterruptedException { - // There is one background container metadata scanner per datanode. - // If this container's volume has failed, skip the container. - // The iterator returned by getContainerIterator may have stale results. 
- ContainerData data = container.getContainerData(); - long containerID = data.getContainerID(); - HddsVolume containerVolume = data.getVolume(); - if (containerVolume.isFailed()) { - LOG.debug("Skipping scan of container {}. Its volume {} has failed.", - containerID, containerVolume); + if (!scanHelper.shouldScanMetadata(container)) { return; } - if (!shouldScan(container)) { - return; - } + long containerID = container.getContainerData().getContainerID(); MetadataScanResult result = container.scanMetaData(); if (result.isDeleted()) { @@ -80,11 +66,7 @@ public void scanContainer(Container container) return; } if (!result.isHealthy()) { - logUnhealthyScanResult(containerID, result, LOG); - boolean containerMarkedUnhealthy = controller.markContainerUnhealthy(containerID, result); - if (containerMarkedUnhealthy) { - metrics.incNumUnHealthyContainers(); - } + scanHelper.handleUnhealthyScanResult(containerID, result); } // Do not update the scan timestamp after the scan since this was just a @@ -97,9 +79,4 @@ public ContainerMetadataScannerMetrics getMetrics() { return this.metrics; } - private boolean shouldScan(Container container) { - // Full data scan also does a metadata scan. If a full data scan was done - // recently, we can skip this metadata scan. - return !ContainerUtils.recentlyScanned(container, minScanGap, LOG); - } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java index 37e50953f050..9f328fee4de7 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeWriter; import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient; import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; @@ -124,6 +125,24 @@ public boolean markContainerUnhealthy(final long containerId, ScanResult reason) } } + /** + * Updates the container checksum information on disk and in memory. + * + * @param containerId The ID of the container to update + * @param treeWriter The container merkle tree with the updated information about the container + * @throws IOException For errors sending an ICR or updating the container checksum on disk. If the disk update + * fails, the checksum in memory will not be updated. + */ + public void updateContainerChecksum(long containerId, ContainerMerkleTreeWriter treeWriter) + throws IOException { + Container container = getContainer(containerId); + if (container == null) { + LOG.warn("Container {} not found, may be deleted, skip updating checksums", containerId); + } else { + getHandler(container).updateContainerChecksum(container, treeWriter); + } + } + /** * Returns the container report. 
* diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScanError.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScanError.java index 1b167dabd87c..a5dfe5bb8e21 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScanError.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScanError.java @@ -28,6 +28,24 @@ public class ContainerScanError { private final FailureType failureType; private final Throwable exception; + /** + * Represents the reason a container scan failed and a container should + * be marked unhealthy. + */ + public enum FailureType { + MISSING_CONTAINER_DIR, + MISSING_METADATA_DIR, + MISSING_CONTAINER_FILE, + MISSING_CHUNKS_DIR, + MISSING_DATA_FILE, + CORRUPT_CONTAINER_FILE, + CORRUPT_CHUNK, + MISSING_CHUNK, + INCONSISTENT_CHUNK_LENGTH, + INACCESSIBLE_DB, + WRITE_FAILURE, + } + public ContainerScanError(FailureType failure, File unhealthyFile, Exception exception) { this.unhealthyFile = unhealthyFile; this.failureType = failure; @@ -50,21 +68,4 @@ public Throwable getException() { public String toString() { return failureType + " for file " + unhealthyFile + " with exception: " + exception; } - - /** - * Represents the reason a container scan failed and a container should - * be marked unhealthy. - */ - public enum FailureType { - MISSING_CONTAINER_DIR, - MISSING_METADATA_DIR, - MISSING_CONTAINER_FILE, - MISSING_CHUNKS_DIR, - MISSING_CHUNK_FILE, - CORRUPT_CONTAINER_FILE, - CORRUPT_CHUNK, - INCONSISTENT_CHUNK_LENGTH, - INACCESSIBLE_DB, - WRITE_FAILURE, - } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScanHelper.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScanHelper.java new file mode 100644 index 000000000000..9e04b7df157d --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScanHelper.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.container.ozoneimpl; + +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; +import org.apache.hadoop.hdfs.util.Canceler; +import org.apache.hadoop.hdfs.util.DataTransferThrottler; +import org.apache.hadoop.ozone.container.common.impl.ContainerData; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.common.interfaces.ScanResult; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; +import org.slf4j.Logger; + +/** + * Mixin to handle common data and metadata scan operations among background and on-demand scanners. + */ +public final class ContainerScanHelper { + private final Logger log; + private final ContainerController controller; + private final AbstractContainerScannerMetrics metrics; + private final long minScanGap; + + public static ContainerScanHelper withoutScanGap(Logger log, ContainerController controller, + AbstractContainerScannerMetrics metrics) { + return new ContainerScanHelper(log, controller, metrics, 0); + } + + public static ContainerScanHelper withScanGap(Logger log, ContainerController controller, + AbstractContainerScannerMetrics metrics, ContainerScannerConfiguration conf) { + return new ContainerScanHelper(log, controller, metrics, conf.getContainerScanMinGap()); + } + + private ContainerScanHelper(Logger log, ContainerController controller, + AbstractContainerScannerMetrics metrics, long minScanGap) { + this.log = log; + this.controller = controller; + this.metrics = metrics; + this.minScanGap = minScanGap; + } + + public void scanData(Container container, DataTransferThrottler throttler, Canceler canceler) + throws IOException, InterruptedException { + if (!shouldScanData(container)) { + return; + } + ContainerData containerData = container.getContainerData(); + long containerId = containerData.getContainerID(); + logScanStart(containerData); + DataScanResult result = container.scanData(throttler, canceler); + + if (result.isDeleted()) { + log.debug("Container [{}] has been deleted during the data scan.", containerId); + } else { + try { + controller.updateContainerChecksum(containerId, result.getDataTree()); + } catch (IOException ex) { + log.warn("Failed to update container checksum after scan of container {}", containerId, ex); + } + if (!result.isHealthy()) { + handleUnhealthyScanResult(containerId, result); + } + metrics.incNumContainersScanned(); + } + + Instant now = Instant.now(); + if (!result.isDeleted()) { + controller.updateDataScanTimestamp(containerId, now); + } + // Even if the container was deleted, mark the scan as completed since we already logged it as starting. + logScanCompleted(containerData, now); + } + + public void handleUnhealthyScanResult(long containerID, ScanResult result) throws IOException { + + log.error("Corruption detected in container [{}]. Marking it UNHEALTHY. {}", containerID, result); + if (log.isDebugEnabled()) { + StringBuilder allErrorString = new StringBuilder(); + result.getErrors().forEach(r -> allErrorString.append(r).append('\n')); + log.debug("Complete list of errors detected while scanning container {}:\n{}", containerID, allErrorString); + } + + // Only increment the number of unhealthy containers if the container was not already unhealthy. + // TODO HDDS-11593: Scanner counters will start from zero + // at the beginning of each run, so this will need to be incremented for every unhealthy container seen + // regardless of its previous state. 
+ boolean containerMarkedUnhealthy = controller.markContainerUnhealthy(containerID, result); + if (containerMarkedUnhealthy) { + metrics.incNumUnHealthyContainers(); + } + } + + public boolean shouldScanMetadata(Container container) { + if (container == null) { + return false; + } + long containerID = container.getContainerData().getContainerID(); + + HddsVolume containerVolume = container.getContainerData().getVolume(); + if (containerVolume.isFailed()) { + log.debug("Skipping scan for container {} since its volume {} has failed.", containerID, containerVolume); + return false; + } + + return !recentlyScanned(container.getContainerData()); + } + + public boolean shouldScanData(Container container) { + return shouldScanMetadata(container) && container.shouldScanData(); + } + + private boolean recentlyScanned(ContainerData containerData) { + Optional lastScanTime = containerData.lastDataScanTime(); + Instant now = Instant.now(); + // Container is considered recently scanned if it was scanned within the + // configured time frame. If the optional is empty, the container was + // never scanned. + boolean recentlyScanned = lastScanTime.map(scanInstant -> + Duration.between(now, scanInstant).abs() + .compareTo(Duration.ofMillis(minScanGap)) < 0) + .orElse(false); + + if (recentlyScanned && log.isDebugEnabled()) { + log.debug("Skipping scan for container {} which was last " + + "scanned at {}. Current time is {}.", + containerData.getContainerID(), lastScanTime.get(), + now); + } + + return recentlyScanned; + } + + private void logScanStart(ContainerData containerData) { + if (log.isDebugEnabled()) { + Optional scanTimestamp = containerData.lastDataScanTime(); + Object lastScanTime = scanTimestamp.map(ts -> "at " + ts).orElse("never"); + log.debug("Scanning container {}, last scanned {}", + containerData.getContainerID(), lastScanTime); + } + } + + private void logScanCompleted( + ContainerData containerData, Instant timestamp) { + log.debug("Completed scan of container {} at {}", + containerData.getContainerID(), timestamp); + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScannerConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScannerConfiguration.java index 92ccd0d619e8..e8a9105131f1 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScannerConfiguration.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScannerConfiguration.java @@ -217,4 +217,8 @@ public long getOnDemandBandwidthPerVolume() { public long getContainerScanMinGap() { return containerScanMinGap; } + + public void setContainerScanMinGap(long scanGap) { + containerScanMinGap = scanGap; + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java index ad30aa0c4371..a85358406bd1 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java @@ -17,10 +17,7 @@ package org.apache.hadoop.ozone.container.ozoneimpl; -import static 
org.apache.hadoop.ozone.container.ozoneimpl.AbstractBackgroundContainerScanner.logUnhealthyScanResult; - import java.io.IOException; -import java.time.Instant; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -29,11 +26,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.hdfs.util.Canceler; import org.apache.hadoop.hdfs.util.DataTransferThrottler; -import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; -import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.interfaces.Container; -import org.apache.hadoop.ozone.container.common.interfaces.ScanResult; -import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,45 +38,46 @@ public final class OnDemandContainerDataScanner { LoggerFactory.getLogger(OnDemandContainerDataScanner.class); private final ExecutorService scanExecutor; - private final ContainerController containerController; private final DataTransferThrottler throttler; private final Canceler canceler; private final ConcurrentHashMap .KeySetView containerRescheduleCheckSet; private final OnDemandScannerMetrics metrics; - private final long minScanGap; + private final ContainerScanHelper scannerHelper; + private final ContainerScanHelper scannerHelperWithoutGap; public OnDemandContainerDataScanner( ContainerScannerConfiguration conf, ContainerController controller) { - containerController = controller; throttler = new DataTransferThrottler( conf.getOnDemandBandwidthPerVolume()); canceler = new Canceler(); metrics = OnDemandScannerMetrics.create(); scanExecutor = Executors.newSingleThreadExecutor(); containerRescheduleCheckSet = ConcurrentHashMap.newKeySet(); - minScanGap = conf.getContainerScanMinGap(); + this.scannerHelper = ContainerScanHelper.withScanGap(LOG, controller, metrics, conf); + this.scannerHelperWithoutGap = ContainerScanHelper.withoutScanGap(LOG, controller, metrics); } - private boolean shouldScan(Container container) { - if (container == null) { - return false; - } - long containerID = container.getContainerData().getContainerID(); - - HddsVolume containerVolume = container.getContainerData().getVolume(); - if (containerVolume.isFailed()) { - LOG.debug("Skipping on demand scan for container {} since its volume {}" + - " has failed.", containerID, containerVolume); - return false; - } + /** + * Triggers an on-demand scan of this container. + * @return An Optional containing a Future representing the pending scan task if the task is queued. + * The optional is empty if the task is not queued due to an ongoing scan. + */ + public Optional> scanContainer(Container container) { + return scanContainer(container, scannerHelper); + } - return !ContainerUtils.recentlyScanned(container, minScanGap, - LOG) && container.shouldScanData(); + /** + * Triggers an on-demand scan of this container regardless of whether it was recently scanned. + * @return An Optional containing a Future representing the pending scan task if the task is queued. + * The optional is empty if the task is not queued due to an ongoing scan. 
+ */ + public Optional> scanContainerWithoutGap(Container container) { + return scanContainer(container, scannerHelperWithoutGap); } - public Optional> scanContainer(Container container) { - if (!shouldScan(container)) { + private Optional> scanContainer(Container container, ContainerScanHelper helper) { + if (!helper.shouldScanData(container)) { return Optional.empty(); } @@ -91,7 +85,7 @@ public Optional> scanContainer(Container container) { long containerId = container.getContainerData().getContainerID(); if (addContainerToScheduledContainers(containerId)) { resultFuture = scanExecutor.submit(() -> { - performOnDemandScan(container); + performOnDemandScan(container, helper); removeContainerFromScheduledContainers(containerId); }); } @@ -107,43 +101,12 @@ private void removeContainerFromScheduledContainers( containerRescheduleCheckSet.remove(containerId); } - private void performOnDemandScan(Container container) { - if (!shouldScan(container)) { - return; - } - - long containerId = container.getContainerData().getContainerID(); + private void performOnDemandScan(Container container, ContainerScanHelper helper) { try { - ContainerData containerData = container.getContainerData(); - logScanStart(containerData); - - ScanResult result = container.scanData(throttler, canceler); - // Metrics for skipped containers should not be updated. - if (result.isDeleted()) { - LOG.debug("Container [{}] has been deleted during the data scan.", containerId); - } else { - if (!result.isHealthy()) { - logUnhealthyScanResult(containerId, result, LOG); - boolean containerMarkedUnhealthy = containerController - .markContainerUnhealthy(containerId, result); - if (containerMarkedUnhealthy) { - metrics.incNumUnHealthyContainers(); - } - } - // TODO HDDS-10374 will need to update the merkle tree here as well. - metrics.incNumContainersScanned(); - } - - // Even if the container was deleted, mark the scan as completed since we already logged it as starting. - Instant now = Instant.now(); - logScanCompleted(containerData, now); - - if (!result.isDeleted()) { - containerController.updateDataScanTimestamp(containerId, now); - } + helper.scanData(container, throttler, canceler); } catch (IOException e) { LOG.warn("Unexpected exception while scanning container " - + containerId, e); + + container.getContainerData().getContainerID(), e); } catch (InterruptedException ex) { // This should only happen as part of shutdown, which will stop the // ExecutorService. 
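Aside for reviewers: the Optional-returning scan methods above stay safe under concurrent callers because the scanner admits each container at most once into the single-threaded scan executor, with containerRescheduleCheckSet as the guard. Below is a minimal, self-contained sketch of that schedule-at-most-once pattern; the class and method names are illustrative only and are not part of the Ozone API.

import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch of the schedule-at-most-once pattern used by the on-demand scanner.
final class DedupScheduler {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  // IDs currently queued or running; guards against duplicate submissions.
  private final Set<Long> scheduled = ConcurrentHashMap.newKeySet();

  Optional<Future<?>> schedule(long id, Runnable task) {
    // Set.add returns false when the id is already present, so a second
    // request for the same container becomes a no-op instead of a new task.
    if (!scheduled.add(id)) {
      return Optional.empty();
    }
    return Optional.of(executor.submit(() -> {
      try {
        task.run();
      } finally {
        // Allow the container to be scheduled again once this scan finishes.
        scheduled.remove(id);
      }
    }));
  }
}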
@@ -151,21 +114,6 @@ private void performOnDemandScan(Container container) { } } - private void logScanStart(ContainerData containerData) { - if (LOG.isDebugEnabled()) { - Optional scanTimestamp = containerData.lastDataScanTime(); - Object lastScanTime = scanTimestamp.map(ts -> "at " + ts).orElse("never"); - LOG.debug("Scanning container {}, last scanned {}", - containerData.getContainerID(), lastScanTime); - } - } - - private void logScanCompleted( - ContainerData containerData, Instant timestamp) { - LOG.debug("Completed scan of container {} at {}", - containerData.getContainerID(), timestamp); - } - public OnDemandScannerMetrics getMetrics() { return metrics; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index 36cad1ba2081..e8a25aae1da6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -383,8 +383,6 @@ private void startContainerScrub() { return; } - initOnDemandContainerScanner(c); - backgroundScanners = new LinkedList<>(); // This config is for testing the scanners in isolation. if (c.isMetadataScanEnabled()) { @@ -394,6 +392,7 @@ private void startContainerScrub() { // This config is for testing the scanners in isolation. if (c.isDataScanEnabled()) { initContainerScanner(c); + initOnDemandContainerScanner(c); } } @@ -406,7 +405,7 @@ private void initContainerScanner(ContainerScannerConfiguration c) { dataScanners = new ArrayList<>(); for (StorageVolume v : volumeSet.getVolumesList()) { BackgroundContainerDataScanner s = - new BackgroundContainerDataScanner(c, controller, (HddsVolume) v, checksumTreeManager); + new BackgroundContainerDataScanner(c, controller, (HddsVolume) v); s.start(); dataScanners.add(s); backgroundScanners.add(s); @@ -441,7 +440,7 @@ private void initOnDemandContainerScanner(ContainerScannerConfiguration c) { return; } onDemandScanner = new OnDemandContainerDataScanner(c, controller); - containerSet.registerContainerScanHandler(onDemandScanner::scanContainer); + containerSet.registerOnDemandScanner(onDemandScanner); } /** diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeTestUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeTestUtils.java index 321e6a547b7c..981b0960f670 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeTestUtils.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeTestUtils.java @@ -28,7 +28,6 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.file.Files; -import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.HashMap; @@ -44,7 +43,6 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.ozone.HddsDatanodeService; import org.apache.hadoop.ozone.container.common.impl.ContainerData; import 
org.apache.hadoop.ozone.container.common.interfaces.Container; @@ -150,11 +148,10 @@ public static ContainerMerkleTreeWriter buildTestTree(ConfigurationSource conf, ContainerMerkleTreeWriter tree = new ContainerMerkleTreeWriter(); byte byteValue = 1; for (int blockIndex = 1; blockIndex <= numBlocks; blockIndex++) { - List chunks = new ArrayList<>(); for (int chunkIndex = 0; chunkIndex < 4; chunkIndex++) { - chunks.add(buildChunk(conf, chunkIndex, ByteBuffer.wrap(new byte[]{byteValue++, byteValue++, byteValue++}))); + tree.addChunks(blockIndex, true, + buildChunk(conf, chunkIndex, ByteBuffer.wrap(new byte[]{byteValue++, byteValue++, byteValue++}))); } - tree.addChunks(blockIndex, true, chunks); } return tree; } @@ -335,10 +332,9 @@ private static void assertEqualsChunkMerkleTree(List container = ozoneContainer.getController().getContainer(containerID); return ContainerChecksumTreeManager.checksumFileExist(container); } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerMerkleTreeWriter.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerMerkleTreeWriter.java index 8fbae2ee687d..3f42670c70c2 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerMerkleTreeWriter.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerMerkleTreeWriter.java @@ -68,9 +68,9 @@ public void testBuildOneChunkTree() { Collections.singletonList(chunkTree)); ContainerProtos.ContainerMerkleTree expectedTree = buildExpectedContainerTree(Collections.singletonList(blockTree)); - // Use the ContainerMerkleTree to build the same tree. + // Use the ContainerMerkleTreeWriter to build the same tree. ContainerMerkleTreeWriter actualTree = new ContainerMerkleTreeWriter(); - actualTree.addChunks(blockID, true, Collections.singletonList(chunk)); + actualTree.addChunks(blockID, true, chunk); // Ensure the trees match. ContainerProtos.ContainerMerkleTree actualTreeProto = actualTree.toProto(); @@ -106,7 +106,43 @@ public void testBuildTreeWithMissingChunks() { // Use the ContainerMerkleTree to build the same tree. ContainerMerkleTreeWriter actualTree = new ContainerMerkleTreeWriter(); - actualTree.addChunks(blockID, true, Arrays.asList(chunk1, chunk3)); + actualTree.addChunks(blockID, true, chunk1, chunk3); + + // Ensure the trees match. + ContainerProtos.ContainerMerkleTree actualTreeProto = actualTree.toProto(); + assertTreesSortedAndMatch(expectedTree, actualTreeProto); + } + + @Test + public void testBuildTreeWithEmptyBlock() { + final long blockID = 1; + ContainerProtos.BlockMerkleTree blockTree = buildExpectedBlockTree(blockID, Collections.emptyList()); + ContainerProtos.ContainerMerkleTree expectedTree = buildExpectedContainerTree(Collections.singletonList(blockTree)); + + // Use the ContainerMerkleTreeWriter to build the same tree. + ContainerMerkleTreeWriter actualTree = new ContainerMerkleTreeWriter(); + actualTree.addBlock(blockID); + + // Ensure the trees match. + ContainerProtos.ContainerMerkleTree actualTreeProto = actualTree.toProto(); + assertTreesSortedAndMatch(expectedTree, actualTreeProto); + } + + @Test + public void testAddBlockIdempotent() { + final long blockID = 1; + // Build the expected proto.
+ ContainerProtos.ChunkInfo chunk1 = buildChunk(config, 0, ByteBuffer.wrap(new byte[]{1, 2, 3})); + ContainerProtos.BlockMerkleTree blockTree = buildExpectedBlockTree(blockID, + Collections.singletonList(buildExpectedChunkTree(chunk1))); + ContainerProtos.ContainerMerkleTree expectedTree = buildExpectedContainerTree(Collections.singletonList(blockTree)); + + // Use the ContainerMerkleTreeWriter to build the same tree, calling addBlock in between adding chunks. + ContainerMerkleTreeWriter actualTree = new ContainerMerkleTreeWriter(); + actualTree.addBlock(blockID); + actualTree.addChunks(blockID, true, chunk1); + // This should not overwrite the chunk already added to the block. + actualTree.addBlock(blockID); // Ensure the trees match. ContainerProtos.ContainerMerkleTree actualTreeProto = actualTree.toProto(); @@ -137,8 +173,8 @@ public void testBuildTreeWithNonContiguousBlockIDs() { // Use the ContainerMerkleTree to build the same tree. // Add blocks and chunks out of order to test sorting. ContainerMerkleTreeWriter actualTree = new ContainerMerkleTreeWriter(); - actualTree.addChunks(blockID3, true, Arrays.asList(b3c2, b3c1)); - actualTree.addChunks(blockID1, true, Arrays.asList(b1c1, b1c2)); + actualTree.addChunks(blockID3, true, b3c2, b3c1); + actualTree.addChunks(blockID1, true, b1c1, b1c2); // Ensure the trees match. ContainerProtos.ContainerMerkleTree actualTreeProto = actualTree.toProto(); @@ -173,13 +209,13 @@ public void testAppendToBlocksWhileBuilding() throws Exception { // Test building by adding chunks to the blocks individually and out of order. ContainerMerkleTreeWriter actualTree = new ContainerMerkleTreeWriter(); // Add all of block 2 first. - actualTree.addChunks(blockID2, true, Arrays.asList(b2c1, b2c2)); + actualTree.addChunks(blockID2, true, b2c1, b2c2); // Then add block 1 in multiple steps with chunks out of order. - actualTree.addChunks(blockID1, true, Collections.singletonList(b1c2)); - actualTree.addChunks(blockID1, true, Arrays.asList(b1c3, b1c1)); + actualTree.addChunks(blockID1, true, b1c2); + actualTree.addChunks(blockID1, true, b1c3, b1c1); // Add a duplicate chunk to block 3. It should overwrite the existing one. - actualTree.addChunks(blockID3, true, Arrays.asList(b3c1, b3c2)); - actualTree.addChunks(blockID3, true, Collections.singletonList(b3c2)); + actualTree.addChunks(blockID3, true, b3c1, b3c2); + actualTree.addChunks(blockID3, true, b3c2); // Ensure the trees match.
ContainerProtos.ContainerMerkleTree actualTreeProto = actualTree.toProto(); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java index c5b8e41b69b4..10c897b1f54c 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java @@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -50,6 +51,7 @@ import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; +import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerDataScanner; /** * Class used to test ContainerSet operations. @@ -294,11 +296,15 @@ public void testContainerScanHandler(ContainerLayoutVersion layout) throws Excep containerSet.scanContainer(FIRST_ID); AtomicLong invocationCount = new AtomicLong(); - containerSet.registerContainerScanHandler(c -> { + OnDemandContainerDataScanner mockScanner = mock(OnDemandContainerDataScanner.class); + when(mockScanner.scanContainer(any())).then(inv -> { + KeyValueContainer c = inv.getArgument(0); // If the handler was incorrectly triggered for a non-existent container, this assert would fail. assertEquals(FIRST_ID, c.getContainerData().getContainerID()); invocationCount.getAndIncrement(); + return null; }); + containerSet.registerOnDemandScanner(mockScanner); // Scan of an existing container when a handler is registered should trigger a scan. containerSet.scanContainer(FIRST_ID); @@ -309,6 +315,33 @@ public void testContainerScanHandler(ContainerLayoutVersion layout) throws Excep assertEquals(1, invocationCount.get()); } + @ContainerLayoutTestInfo.ContainerTest + public void testContainerScanHandlerWithoutGap(ContainerLayoutVersion layout) throws Exception { + setLayoutVersion(layout); + ContainerSet containerSet = createContainerSet(); + // Scan when no handler is registered should not throw an exception. + containerSet.scanContainer(FIRST_ID); + + AtomicLong invocationCount = new AtomicLong(); + OnDemandContainerDataScanner mockScanner = mock(OnDemandContainerDataScanner.class); + when(mockScanner.scanContainerWithoutGap(any())).then(inv -> { + KeyValueContainer c = inv.getArgument(0); + // If the handler was incorrectly triggered for a non-existent container, this assert would fail. + assertEquals(FIRST_ID, c.getContainerData().getContainerID()); + invocationCount.getAndIncrement(); + return null; + }); + containerSet.registerOnDemandScanner(mockScanner); + + // Scan of an existing container when a handler is registered should trigger a scan. + containerSet.scanContainerWithoutGap(FIRST_ID); + assertEquals(1, invocationCount.get()); + + // Scan of non-existent container should not throw exception or trigger an additional invocation. 
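+ // (FIRST_ID - 1 is assumed absent from the container set built by createContainerSet.)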
+ containerSet.scanContainerWithoutGap(FIRST_ID - 1); + assertEquals(1, invocationCount.get()); + } + /** * Verify that {@code result} contains {@code count} containers * with IDs in increasing order starting at {@code startId}. diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerCorruptions.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerCorruptions.java index 20ad00676b23..28482088fc27 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerCorruptions.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerCorruptions.java @@ -91,7 +91,7 @@ public enum TestContainerCorruptions { MISSING_BLOCK((container, blockID) -> { File blockFile = getBlock(container, blockID); assertTrue(blockFile.delete()); - }, ContainerScanError.FailureType.MISSING_CHUNK_FILE), + }, ContainerScanError.FailureType.MISSING_DATA_FILE), CORRUPT_CONTAINER_FILE((container, blockID) -> { File containerFile = container.getContainerFile(); @@ -111,7 +111,10 @@ public enum TestContainerCorruptions { TRUNCATED_BLOCK((container, blockID) -> { File blockFile = getBlock(container, blockID); truncateFile(blockFile); - }, ContainerScanError.FailureType.INCONSISTENT_CHUNK_LENGTH); + }, + // This test completely removes all content from the block file. The scanner will see this as all the chunks in + // the block missing, hence MISSING_CHUNK instead of INCONSISTENT_CHUNK_LENGTH. + ContainerScanError.FailureType.MISSING_CHUNK); private final BiConsumer, Long> corruption; private final ContainerScanError.FailureType expectedResult; diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java index ced198eeccd7..e95fc3ab3ec3 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java @@ -89,6 +89,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.MockedStatic; @@ -184,8 +185,7 @@ public static void teardown() { } } - // TODO HDDS-10374 once on-demand scanner can build merkle trees this test should pass. - // @ParameterizedTest + @ParameterizedTest @MethodSource("corruptionValues") public void testContainerReconciliation(int numBlocksToDelete, int numChunksToCorrupt) throws Exception { LOG.info("Healthy data checksum for container {} in this test is {}", CONTAINER_ID, @@ -225,13 +225,13 @@ public void testContainerReconciliation(int numBlocksToDelete, int numChunksToCo * Uses the on-demand container scanner metrics to wait for the expected number of on-demand scans to complete on * every datanode. 
*/ - private void waitForExpectedScanCount(int expectedCount) throws Exception { + private void waitForExpectedScanCount(int expectedCountPerDatanode) throws Exception { for (MockDatanode datanode: datanodes) { try { - GenericTestUtils.waitFor(() -> datanode.getOnDemandScanCount() == expectedCount, 100, 10_000); + GenericTestUtils.waitFor(() -> datanode.getOnDemandScanCount() == expectedCountPerDatanode, 100, 10_000); } catch (TimeoutException ex) { LOG.error("Timed out waiting for on-demand scan count {} to reach expected count {} on datanode {}", - datanode.getOnDemandScanCount(), expectedCount, datanode); + datanode.getOnDemandScanCount(), expectedCountPerDatanode, datanode); throw ex; } } @@ -325,7 +325,7 @@ private static class MockDatanode { onDemandScanner = new OnDemandContainerDataScanner( conf.getObject(ContainerScannerConfiguration.class), controller); // Register the on-demand container scanner with the container set used by the KeyValueHandler. - containerSet.registerContainerScanHandler(onDemandScanner::scanContainer); + containerSet.registerOnDemandScanner(onDemandScanner); } public DatanodeDetails getDnDetails() { @@ -420,7 +420,7 @@ public KeyValueContainer getContainer(long containerID) { * Triggers a synchronous scan of the container. This method will block until the scan completes. */ public void scanContainer(long containerID) { - Optional> scanFuture = onDemandScanner.scanContainer(containerSet.getContainer(containerID)); + Optional> scanFuture = onDemandScanner.scanContainerWithoutGap(containerSet.getContainer(containerID)); assertTrue(scanFuture.isPresent()); try { diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java index e9353b63bee3..51a1d774ead6 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java @@ -37,6 +37,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.io.FileUtils; @@ -45,6 +47,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdfs.util.Canceler; import org.apache.hadoop.hdfs.util.DataTransferThrottler; +import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager; +import org.apache.hadoop.ozone.container.checksum.ContainerDiffReport; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator; @@ -154,13 +158,22 @@ public void testAllDataErrorsCollected(ContainerTestVersionInfo versionInfo) thr DataScanResult result = kvCheck.fullCheck(throttler, null); assertTrue(result.isHealthy()); + // The scanner would write the checksum file to disk. `KeyValueContainerCheck` does not, so we will create the + // result here. 
+ ContainerProtos.ContainerChecksumInfo healthyChecksumInfo = ContainerProtos.ContainerChecksumInfo.newBuilder() + .setContainerID(containerID) + .setContainerMerkleTree(result.getDataTree().toProto()) + .build(); // Put different types of block failures in the middle of the container. - CORRUPT_BLOCK.applyTo(container, 1); - MISSING_BLOCK.applyTo(container, 2); - TRUNCATED_BLOCK.applyTo(container, 4); + long corruptBlockID = 1; + long missingBlockID = 2; + long truncatedBlockID = 4; + CORRUPT_BLOCK.applyTo(container, corruptBlockID); + MISSING_BLOCK.applyTo(container, missingBlockID); + TRUNCATED_BLOCK.applyTo(container, truncatedBlockID); List expectedErrors = new ArrayList<>(); - // Corruption is applied to two different chunks within the block. + // Corruption is applied to two different chunks within the block, so the error will be raised twice. expectedErrors.add(CORRUPT_BLOCK.getExpectedResult()); expectedErrors.add(CORRUPT_BLOCK.getExpectedResult()); expectedErrors.add(MISSING_BLOCK.getExpectedResult()); @@ -175,12 +188,42 @@ public void testAllDataErrorsCollected(ContainerTestVersionInfo versionInfo) thr assertFalse(result.isHealthy()); // Check that all data errors were detected in order. - // TODO HDDS-10374 Use merkle tree to check the actual content affected by the errors. assertEquals(expectedErrors.size(), result.getErrors().size()); List actualErrors = result.getErrors().stream() .map(ContainerScanError::getFailureType) .collect(Collectors.toList()); assertEquals(expectedErrors, actualErrors); + + // Write the new tree into the container, as the scanner would do. + ContainerChecksumTreeManager checksumManager = new ContainerChecksumTreeManager(conf); + KeyValueContainerData containerData = container.getContainerData(); + checksumManager.writeContainerDataTree(containerData, result.getDataTree()); + // This will read the corrupted tree from the disk, which represents the current state of the container, and + // compare it against the original healthy tree. The diff we get back should match the failures we injected. + Optional generatedChecksumInfo = checksumManager.read(containerData); + assertTrue(generatedChecksumInfo.isPresent()); + ContainerDiffReport diffReport = checksumManager.diff(generatedChecksumInfo.get(), healthyChecksumInfo); + + LOG.info("Diff of healthy container with actual container {}", diffReport); + + // Check that the new tree identified all the expected errors by checking the diff. + Map> corruptChunks = diffReport.getCorruptChunks(); + // One block had corrupted chunks. + assertEquals(1, corruptChunks.size()); + List corruptChunksInBlock = corruptChunks.get(corruptBlockID); + assertEquals(2, corruptChunksInBlock.size()); + + // One block was truncated which resulted in all of its chunks being reported as missing. + Map> missingChunks = diffReport.getMissingChunks(); + assertEquals(1, missingChunks.size()); + List missingChunksInBlock = missingChunks.get(truncatedBlockID); + assertEquals(CHUNKS_PER_BLOCK, missingChunksInBlock.size()); + + // Check missing block was correctly identified in the tree diff. 
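+ // (The deleted block should be entirely absent from the regenerated tree, so it is expected to surface as a
+ // whole missing block, unlike the truncated block above whose chunks were reported individually.)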
+ List missingBlocks = diffReport.getMissingBlocks(); + assertEquals(1, missingBlocks.size()); + assertEquals(missingBlockID, missingBlocks.get(0).getBlockID()); + } /** diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java index 95ed16b2d965..503d8c0855d8 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java @@ -25,10 +25,13 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_LAYOUT_KEY; import static org.apache.hadoop.ozone.OzoneConsts.GB; +import static org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.assertTreesSortedAndMatch; +import static org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.buildTestTree; import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.createBlockMetaData; import static org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; @@ -75,9 +78,11 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.security.token.TokenVerifier; import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager; +import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeWriter; import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient; import org.apache.hadoop.ozone.container.common.ContainerTestUtils; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; +import org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils; import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher; @@ -641,6 +646,53 @@ public void testContainerChecksumInvocation(ContainerLayoutVersion layoutVersion Assertions.assertEquals(1, icrCount.get()); } + @ContainerLayoutTestInfo.ContainerTest + public void testUpdateContainerChecksum(ContainerLayoutVersion layoutVersion) throws Exception { + conf = new OzoneConfiguration(); + KeyValueContainerData data = new KeyValueContainerData(123L, layoutVersion, GB, + PipelineID.randomId().toString(), randomDatanodeDetails().getUuidString()); + data.setMetadataPath(tempDir.toString()); + data.setDbFile(dbFile.toFile()); + KeyValueContainer container = new KeyValueContainer(data, conf); + KeyValueContainerData containerData = container.getContainerData(); + ContainerSet containerSet = ContainerImplTestUtils.newContainerSet(); + containerSet.addContainer(container); + + // Allows checking the invocation count of the lambda. 
+ AtomicInteger icrCount = new AtomicInteger(0); + ContainerMerkleTreeWriter treeWriter = buildTestTree(conf); + final long updatedDataChecksum = treeWriter.toProto().getDataChecksum(); + IncrementalReportSender icrSender = c -> { + // Check that the ICR contains expected info about the container. + ContainerReplicaProto report = c.getContainerReport(); + long reportedID = report.getContainerID(); + Assertions.assertEquals(containerData.getContainerID(), reportedID); + + assertEquals(updatedDataChecksum, report.getDataChecksum()); + icrCount.incrementAndGet(); + }; + + ContainerChecksumTreeManager checksumManager = new ContainerChecksumTreeManager(conf); + KeyValueHandler keyValueHandler = new KeyValueHandler(conf, randomDatanodeDetails().getUuidString(), containerSet, + mock(MutableVolumeSet.class), mock(ContainerMetrics.class), icrSender, checksumManager); + + + // Initially, container should have no checksum information. + assertEquals(0, containerData.getDataChecksum()); + assertFalse(checksumManager.read(containerData).isPresent()); + assertEquals(0, icrCount.get()); + + // Update container with checksum information. + keyValueHandler.updateContainerChecksum(container, treeWriter); + // Check ICR sent. The ICR sender verifies that the expected checksum is present in the report. + assertEquals(1, icrCount.get()); + // Check checksum in memory. + assertEquals(updatedDataChecksum, containerData.getDataChecksum()); + // Check disk content. + ContainerProtos.ContainerChecksumInfo checksumInfo = checksumManager.read(containerData).get(); + assertTreesSortedAndMatch(treeWriter.toProto(), checksumInfo.getContainerMerkleTree()); + } + @Test public void testGetContainerChecksumInfoOnInvalidContainerStates() { when(handler.handleGetContainerChecksumInfo(any(), any())).thenCallRealMethod(); @@ -768,7 +820,7 @@ private KeyValueHandler createKeyValueHandler(Path path) throws IOException { Collections.singletonMap(ContainerType.KeyValueContainer, kvHandler)); OnDemandContainerDataScanner onDemandScanner = new OnDemandContainerDataScanner( conf.getObject(ContainerScannerConfiguration.class), controller); - containerSet.registerContainerScanHandler(onDemandScanner::scanContainer); + containerSet.registerOnDemandScanner(onDemandScanner); return kvHandler; } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java index 6265ac7d3be9..243fe218c5e8 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java @@ -18,6 +18,8 @@ package org.apache.hadoop.ozone.container.keyvalue.impl; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY; import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk; import static org.apache.hadoop.ozone.container.ContainerTestHelper.setDataChecksum; import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.WRITE_STAGE; @@ -25,8 +27,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import java.io.File; import 
java.io.IOException; import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.nio.file.Paths; import java.security.MessageDigest; import java.util.ArrayList; import java.util.List; @@ -57,6 +62,7 @@ import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -66,6 +72,9 @@ */ public class TestFilePerBlockStrategy extends CommonChunkManagerTestCases { + @TempDir + private File tempDir; + @Test public void testDeletePartialChunkWithOffsetUnsupportedRequest() { // GIVEN @@ -313,6 +322,10 @@ public KeyValueHandler createKeyValueHandler(ContainerSet containerSet) throws IOException { OzoneConfiguration conf = new OzoneConfiguration(); String dnUuid = UUID.randomUUID().toString(); + Path dataVolume = Paths.get(tempDir.toString(), "data"); + Path metadataVolume = Paths.get(tempDir.toString(), "metadata"); + conf.set(HDDS_DATANODE_DIR_KEY, dataVolume.toString()); + conf.set(OZONE_METADATA_DIRS, metadataVolume.toString()); MutableVolumeSet volumeSet = new MutableVolumeSet(dnUuid, conf, null, StorageVolume.VolumeType.DATA_VOLUME, null); return ContainerTestUtils.getKeyValueHandler(conf, dnUuid, containerSet, volumeSet); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerDataScanner.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerDataScanner.java index 8c46b8d8f621..93ef66680d86 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerDataScanner.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerDataScanner.java @@ -25,24 +25,28 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.any; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.atMostOnce; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.IOException; import java.time.Duration; +import java.util.Arrays; import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdfs.util.Canceler; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; -import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager; +import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.ozone.test.GenericTestUtils; import org.junit.jupiter.api.BeforeEach; @@ -63,8 +67,7 @@ public class TestBackgroundContainerDataScanner extends @BeforeEach public void setup() { super.setup(); - scanner = new 
BackgroundContainerDataScanner(conf, controller, vol, - new ContainerChecksumTreeManager(new OzoneConfiguration())); + scanner = new BackgroundContainerDataScanner(conf, controller, vol); } @Test @@ -194,6 +197,16 @@ public void testUnhealthyContainerRescanned() throws Exception { assertEquals(1, metrics.getNumUnHealthyContainers()); } + @Test + @Override + public void testChecksumUpdateFailure() throws Exception { + doThrow(new IOException("Checksum update error for testing")).when(controller) + .updateContainerChecksum(anyLong(), any()); + scanner.runIteration(); + verifyContainerMarkedUnhealthy(corruptData, atMostOnce()); + verify(corruptData.getContainerData(), atMostOnce()).setState(UNHEALTHY); + } + /** * A datanode will have one background data scanner per volume. When the * volume fails, the scanner thread should be terminated. @@ -242,6 +255,22 @@ public void testShutdownDuringScan() throws Exception { scanner.shutdown(); // The container should remain healthy. verifyContainerMarkedUnhealthy(healthy, never()); + } + + @Test + public void testMerkleTreeWritten() throws Exception { + scanner.runIteration(); + + // Merkle trees should not be written for open or deleted containers + for (Container container : Arrays.asList(openContainer, openCorruptMetadata, deletedContainer)) { + verify(controller, times(0)) + .updateContainerChecksum(eq(container.getContainerData().getContainerID()), any()); + } + // Merkle trees should be written for all other containers. + for (Container container : Arrays.asList(healthy, corruptData)) { + verify(controller, times(1)) + .updateContainerChecksum(eq(container.getContainerData().getContainerID()), any()); + } } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerMetadataScanner.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerMetadataScanner.java index 9e684736f967..abc3126f7629 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerMetadataScanner.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerMetadataScanner.java @@ -25,15 +25,18 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.any; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.atMostOnce; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.IOException; import java.time.Duration; import java.util.Optional; import java.util.concurrent.CountDownLatch; @@ -165,6 +168,20 @@ public void testUnhealthyContainerRescanned() throws Exception { assertEquals(1, metrics.getNumUnHealthyContainers()); } + /** + * Metadata scanner should not update container checksum, so any errors that may be injected here should have no + * effect. 
+ */ + @Test + @Override + public void testChecksumUpdateFailure() throws Exception { + doThrow(new IOException("Checksum update error for testing")).when(controller) + .updateContainerChecksum(anyLong(), any()); + scanner.runIteration(); + verifyContainerMarkedUnhealthy(openCorruptMetadata, atMostOnce()); + verify(openCorruptMetadata.getContainerData(), atMostOnce()).setState(UNHEALTHY); + } + /** * A datanode will have one metadata scanner thread for the whole process. * When a volume fails, any the containers queued for scanning in that volume diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannersAbstract.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannersAbstract.java index 4a8c5ef6dd71..f35374809706 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannersAbstract.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannersAbstract.java @@ -123,6 +123,13 @@ public abstract void testPreviouslyScannedContainerIsScanned() @Test public abstract void testUnhealthyContainerRescanned() throws Exception; + /** + * When the container checksum cannot be updated, the scan should still complete and move the container state without + * throwing an exception. + */ + @Test + public abstract void testChecksumUpdateFailure() throws Exception; + // HELPER METHODS protected void setScannedTimestampOld(Container container) { diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java index 4045b959d850..42b3da8050d6 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java @@ -25,18 +25,23 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.any; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.atMostOnce; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; +import java.io.IOException; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -45,6 +50,7 @@ import org.apache.hadoop.hdfs.util.Canceler; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.interfaces.ScanResult; import org.junit.jupiter.api.AfterEach; @@ 
-78,6 +84,17 @@ public void testRecentlyScannedContainerIsSkipped() throws Exception { verify(healthy, never()).scanData(any(), any()); } + @Test + public void testBypassScanGap() throws Exception { + setScannedTimestampRecent(healthy); + + Optional> scanFutureOptional = onDemandScanner.scanContainerWithoutGap(healthy); + assertTrue(scanFutureOptional.isPresent()); + Future scanFuture = scanFutureOptional.get(); + scanFuture.get(); + verify(healthy, times(1)).scanData(any(), any()); + } + @Test @Override public void testPreviouslyScannedContainerIsScanned() throws Exception { @@ -281,6 +298,33 @@ public void testUnhealthyContainerRescanned() throws Exception { assertEquals(1, metrics.getNumUnHealthyContainers()); } + @Test + @Override + public void testChecksumUpdateFailure() throws Exception { + doThrow(new IOException("Checksum update error for testing")).when(controller) + .updateContainerChecksum(anyLong(), any()); + scanContainer(corruptData); + verifyContainerMarkedUnhealthy(corruptData, atMostOnce()); + verify(corruptData.getContainerData(), atMostOnce()).setState(UNHEALTHY); + } + + @Test + public void testMerkleTreeWritten() throws Exception { + // Merkle trees should not be written for open or deleted containers + for (Container container : Arrays.asList(openContainer, openCorruptMetadata, deletedContainer)) { + scanContainer(container); + verify(controller, times(0)) + .updateContainerChecksum(eq(container.getContainerData().getContainerID()), any()); + } + + // Merkle trees should be written for all other containers. + for (Container container : Arrays.asList(healthy, corruptData)) { + scanContainer(container); + verify(controller, times(1)) + .updateContainerChecksum(eq(container.getContainerData().getContainerID()), any()); + } + } + private void scanContainer(Container container) throws Exception { Optional> scanFuture = onDemandScanner.scanContainer(container); if (scanFuture.isPresent()) { diff --git a/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot b/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot index 425b5df63661..f784f45645e0 100644 --- a/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot +++ b/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot @@ -34,11 +34,14 @@ Container is closed ${output} = Execute ozone admin container info "${container}" Should contain ${output} CLOSED -Reconciliation complete - [arguments] ${container} - ${data_checksum} = Execute ozone admin container info "${container}" --json | jq -r '.replicas[].dataChecksum' | head -n1 - Should not be empty ${data_checksum} - Should not be equal as strings 0 ${data_checksum} +Container checksums should match + [arguments] ${container} ${expected_checksum} + ${data_checksum1} = Execute ozone admin container info "${container}" --json | jq -r '.replicas[0].dataChecksum' | head -n1 + ${data_checksum2} = Execute ozone admin container info "${container}" --json | jq -r '.replicas[1].dataChecksum' | head -n1 + ${data_checksum3} = Execute ozone admin container info "${container}" --json | jq -r '.replicas[2].dataChecksum' | head -n1 + Should be equal as strings ${data_checksum1} ${expected_checksum} + Should be equal as strings ${data_checksum2} ${expected_checksum} + Should be equal as strings ${data_checksum3} ${expected_checksum} *** Test Cases *** Create container @@ -179,11 +182,9 @@ Cannot reconcile open container # At this point we should have an open Ratis Three container. 
${container} = Execute ozone admin container list --state OPEN | jq -r '.[] | select(.replicationConfig.replicationFactor == "THREE") | .containerID' | head -n1 Execute and check rc ozone admin container reconcile "${container}" 255 - # The container should not yet have any replica checksums. - # TODO When the scanner is computing checksums automatically, this test may need to be updated. - ${data_checksum} = Execute ozone admin container info "${container}" --json | jq -r '.replicas[].dataChecksum' | head -n1 + # The container should not yet have any replica checksums since it is still open. # 0 is the hex value of an empty checksum. - Should Be Equal As Strings 0 ${data_checksum} + Container checksums should match ${container} 0 Close container ${container} = Execute ozone admin container list --state OPEN | jq -r '.[] | select(.replicationConfig.replicationFactor == "THREE") | .containerID' | head -1 @@ -198,8 +199,9 @@ Reconcile closed container # Check that info does not show replica checksums, since manual reconciliation has not yet been triggered. ${container} = Execute ozone admin container list --state CLOSED | jq -r '.[] | select(.replicationConfig.replicationFactor == "THREE") | .containerID' | head -1 ${data_checksum} = Execute ozone admin container info "${container}" --json | jq -r '.replicas[].dataChecksum' | head -n1 - # 0 is the hex value of an empty checksum. After container close the data checksum should not be 0. + # Once the container is closed, the data checksum should be populated. Should Not Be Equal As Strings 0 ${data_checksum} + Container checksums should match ${container} ${data_checksum} + # Check that reconcile CLI returns success. Without fault injection, there is no change expected to the + # container's checksums to indicate it made a difference. Execute ozone admin container reconcile ${container} - Wait until keyword succeeds 1min 5sec Reconciliation complete ${container} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestCloseContainer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestCloseContainer.java index 3062b6c07c28..99cbde901c75 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestCloseContainer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestCloseContainer.java @@ -133,14 +133,14 @@ public void testReplicasAreReportedForClosedContainerAfterRestart() // Checksum file doesn't exist before container close List hddsDatanodes = cluster.getHddsDatanodes(); for (HddsDatanodeService hddsDatanode: hddsDatanodes) { -
assertFalse(containerChecksumFileExists(hddsDatanode, container)); + assertFalse(containerChecksumFileExists(hddsDatanode, container.getContainerID())); } // Close container OzoneTestUtils.closeContainer(scm, container); @@ -197,7 +197,7 @@ public void testCloseClosedContainer() // Checksum file exists after container close for (HddsDatanodeService hddsDatanode: hddsDatanodes) { GenericTestUtils.waitFor(() -> checkContainerCloseInDatanode(hddsDatanode, container), 100, 5000); - assertTrue(containerChecksumFileExists(hddsDatanode, container)); + assertTrue(containerChecksumFileExists(hddsDatanode, container.getContainerID())); } for (ContainerReplica replica : getContainerReplicas(container)) { @@ -221,7 +221,7 @@ public void testContainerChecksumForClosedContainer() throws Exception { // Checksum file doesn't exist before container close List hddsDatanodes = cluster.getHddsDatanodes(); for (HddsDatanodeService hddsDatanode : hddsDatanodes) { - assertFalse(containerChecksumFileExists(hddsDatanode, containerInfo1)); + assertFalse(containerChecksumFileExists(hddsDatanode, containerInfo1.getContainerID())); } // Close container. OzoneTestUtils.closeContainer(scm, containerInfo1); @@ -230,7 +230,7 @@ public void testContainerChecksumForClosedContainer() throws Exception { // merkle tree for all the datanodes for (HddsDatanodeService hddsDatanode : hddsDatanodes) { GenericTestUtils.waitFor(() -> checkContainerCloseInDatanode(hddsDatanode, containerInfo1), 100, 5000); - assertTrue(containerChecksumFileExists(hddsDatanode, containerInfo1)); + assertTrue(containerChecksumFileExists(hddsDatanode, containerInfo1.getContainerID())); OzoneContainer ozoneContainer = hddsDatanode.getDatanodeStateMachine().getContainer(); Container container1 = ozoneContainer.getController().getContainer(containerInfo1.getContainerID()); ContainerProtos.ContainerChecksumInfo containerChecksumInfo = ContainerMerkleTreeTestUtils.readChecksumFile( @@ -247,7 +247,7 @@ public void testContainerChecksumForClosedContainer() throws Exception { TestDataUtil.createKey(bucket, "key2", repConfig, "this is the different content".getBytes(UTF_8)); ContainerInfo containerInfo2 = scm.getContainerManager().getContainers().get(1); for (HddsDatanodeService hddsDatanode : hddsDatanodes) { - assertFalse(containerChecksumFileExists(hddsDatanode, containerInfo2)); + assertFalse(containerChecksumFileExists(hddsDatanode, containerInfo2.getContainerID())); } // Close container. 
@@ -257,7 +257,7 @@ public void testContainerChecksumForClosedContainer() throws Exception { // merkle tree for all the datanodes for (HddsDatanodeService hddsDatanode : hddsDatanodes) { GenericTestUtils.waitFor(() -> checkContainerCloseInDatanode(hddsDatanode, containerInfo2), 100, 5000); - assertTrue(containerChecksumFileExists(hddsDatanode, containerInfo2)); + assertTrue(containerChecksumFileExists(hddsDatanode, containerInfo2.getContainerID())); OzoneContainer ozoneContainer = hddsDatanode.getDatanodeStateMachine().getContainer(); Container container2 = ozoneContainer.getController().getContainer(containerInfo2.getContainerID()); ContainerProtos.ContainerChecksumInfo containerChecksumInfo = ContainerMerkleTreeTestUtils.readChecksumFile( diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java index 213f6dfb6e06..fa4c66a2e028 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java @@ -21,9 +21,12 @@ import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_TOKEN_ENABLED; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_KERBEROS_KEYTAB_FILE_KEY; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_KERBEROS_PRINCIPAL_KEY; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECRET_KEY_EXPIRY_DURATION; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECRET_KEY_ROTATE_CHECK_DURATION; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECRET_KEY_ROTATE_DURATION; @@ -34,6 +37,9 @@ import static org.apache.hadoop.hdds.scm.ScmConfig.ConfigStrings.HDDS_SCM_KERBEROS_PRINCIPAL_KEY; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; import static org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig.ConfigStrings.HDDS_SCM_HTTP_KERBEROS_KEYTAB_FILE_KEY; import static org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig.ConfigStrings.HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS; @@ -43,7 +49,6 @@ import static org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.assertTreesSortedAndMatch; import static org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.buildTestTree; import static org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.readChecksumFile; -import static 
org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.writeContainerDataTreeProto; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_KERBEROS_KEYTAB_FILE; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_KERBEROS_PRINCIPAL_KEY; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_KEYTAB_FILE_KEY; @@ -62,7 +67,6 @@ import java.nio.file.StandardOpenOption; import java.util.List; import java.util.Properties; -import java.util.Random; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; @@ -104,19 +108,19 @@ import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler; +import org.apache.hadoop.ozone.container.keyvalue.TestContainerCorruptions; import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager; -import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration; -import org.apache.hadoop.ozone.container.ozoneimpl.MetadataScanResult; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.security.UserGroupInformation; import org.apache.ozone.test.GenericTestUtils; -import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class tests container commands for reconciliation. @@ -129,6 +133,7 @@ public class TestContainerCommandReconciliation { private static OzoneConfiguration conf; private static DNContainerOperationClient dnClient; private static final String KEY_NAME = "testkey"; + private static final Logger LOG = LoggerFactory.getLogger(TestContainerCommandReconciliation.class); @TempDir private static File testDir; @@ -148,10 +153,15 @@ public static void init() throws Exception { conf.set(OZONE_METADATA_DIRS, testDir.getAbsolutePath()); conf.setStorageSize(OZONE_SCM_CHUNK_SIZE_KEY, 128 * 1024, StorageUnit.BYTES); conf.setStorageSize(OZONE_SCM_BLOCK_SIZE, 512 * 1024, StorageUnit.BYTES); - // Disable the container scanner so it does not create merkle tree files that interfere with this test. - // TODO: Currently container scrub sets the checksum to 0, Revert this after HDDS-10374 is merged. - conf.getObject(ContainerScannerConfiguration.class).setEnabled(false); - conf.setBoolean("hdds.container.scrub.enabled", false); + // Support restarting datanodes and SCM in a rolling fashion to test checksum reporting after restart. + // Datanodes need to heartbeat more frequently, because they will not know that SCM was restarted until they + // heartbeat and SCM indicates they need to re-register. 
+ conf.set(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, "200ms"); + conf.set(HDDS_HEARTBEAT_INTERVAL, "1s"); + conf.set(OZONE_SCM_STALENODE_INTERVAL, "3s"); + conf.set(OZONE_SCM_DEADNODE_INTERVAL, "6s"); + conf.set(HDDS_NODE_REPORT_INTERVAL, "5s"); + conf.set(HDDS_CONTAINER_REPORT_INTERVAL, "5s"); startMiniKdc(); setSecureConfig(); @@ -299,7 +309,6 @@ public void testGetEmptyChecksumInfo() throws Exception { Container container = targetDN.getDatanodeStateMachine().getContainer() .getContainerSet().getContainer(containerID); File treeFile = getContainerChecksumFile(container.getContainerData()); - // TODO After HDDS-10379 the file will already exist and need to be overwritten. assertTrue(treeFile.exists()); Files.write(treeFile.toPath(), new byte[]{}, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.SYNC); @@ -376,21 +385,14 @@ public void testContainerChecksumWithBlockMissing() throws Exception { db.getStore().flushDB(); } - // TODO: Use On-demand container scanner to build the new container merkle tree. (HDDS-10374) - Files.deleteIfExists(getContainerChecksumFile(container.getContainerData()).toPath()); - kvHandler.createContainerMerkleTreeFromMetadata(container); + datanodeStateMachine.getContainer().getContainerSet().scanContainerWithoutGap(containerID); + waitForDataChecksumsAtSCM(containerID, 2); ContainerProtos.ContainerChecksumInfo containerChecksumAfterBlockDelete = readChecksumFile(container.getContainerData()); long dataChecksumAfterBlockDelete = containerChecksumAfterBlockDelete.getContainerMerkleTree().getDataChecksum(); // Checksum should have changed after block delete. assertNotEquals(oldDataChecksum, dataChecksumAfterBlockDelete); - // Since the container is already closed, we have manually updated the container checksum file. - // This doesn't update the checksum reported to SCM, and we need to trigger an ICR. - // Marking a container unhealthy will send an ICR. - kvHandler.markContainerUnhealthy(container, MetadataScanResult.deleted()); - waitForDataChecksumsAtSCM(containerID, 2); - // 3. Reconcile the container. cluster.getStorageContainerLocationClient().reconcileContainer(containerID); // Compare and check if dataChecksum is same on all replicas. @@ -418,7 +420,6 @@ public void testContainerChecksumChunkCorruption() throws Exception { HddsDatanodeService hddsDatanodeService = cluster.getHddsDatanode(dataNodeDetails.get(0)); DatanodeStateMachine datanodeStateMachine = hddsDatanodeService.getDatanodeStateMachine(); Container container = datanodeStateMachine.getContainer().getContainerSet().getContainer(containerID); - KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData(); ContainerProtos.ContainerChecksumInfo oldContainerChecksumInfo = readChecksumFile(container.getContainerData()); KeyValueHandler kvHandler = (KeyValueHandler) datanodeStateMachine.getContainer().getDispatcher() .getHandler(ContainerProtos.ContainerType.KeyValueContainer); @@ -427,42 +428,14 @@ public void testContainerChecksumChunkCorruption() throws Exception { List blockDatas = blockManager.listBlock(container, -1, 100); long oldDataChecksum = oldContainerChecksumInfo.getContainerMerkleTree().getDataChecksum(); - // 2. Corrupt first chunk for all the blocks - try (DBHandle db = BlockUtils.getDB(containerData, conf); - BatchOperation op = db.getStore().getBatchHandler().initBatchOperation()) { - for (BlockData blockData : blockDatas) { - // Modify the block metadata to simulate chunk corruption. 
-        ContainerProtos.BlockData.Builder blockDataBuilder = blockData.getProtoBufMessage().toBuilder();
-        blockDataBuilder.clearChunks();
-
-        ContainerProtos.ChunkInfo chunkInfo = blockData.getChunks().get(0);
-        ContainerProtos.ChecksumData.Builder checksumDataBuilder = ContainerProtos.ChecksumData.newBuilder()
-            .setBytesPerChecksum(chunkInfo.getChecksumData().getBytesPerChecksum())
-            .setType(chunkInfo.getChecksumData().getType());
-
-        for (ByteString checksum : chunkInfo.getChecksumData().getChecksumsList()) {
-          byte[] checksumBytes = checksum.toByteArray();
-          // Modify the checksum bytes to simulate corruption.
-          checksumBytes[0] = (byte) (checksumBytes[0] - 1);
-          checksumDataBuilder.addChecksums(ByteString.copyFrom(checksumBytes)).build();
-        }
-        chunkInfo = chunkInfo.toBuilder().setChecksumData(checksumDataBuilder.build()).build();
-        blockDataBuilder.addChunks(chunkInfo);
-        for (int i = 1; i < blockData.getChunks().size(); i++) {
-          blockDataBuilder.addChunks(blockData.getChunks().get(i));
-        }
-
-        // Modify the block metadata from the container db to simulate chunk corruption.
-        db.getStore().getBlockDataTable().putWithBatch(op, containerData.getBlockKey(blockData.getLocalID()),
-            BlockData.getFromProtoBuf(blockDataBuilder.build()));
-      }
-      db.getStore().getBatchHandler().commitBatchOperation(op);
-      db.getStore().flushDB();
+    // 2. Corrupt every block in one replica.
+    for (BlockData blockData : blockDatas) {
+      long blockID = blockData.getLocalID();
+      TestContainerCorruptions.CORRUPT_BLOCK.applyTo(container, blockID);
     }
-    Files.deleteIfExists(getContainerChecksumFile(container.getContainerData()).toPath());
-    kvHandler.createContainerMerkleTreeFromMetadata(container);
-    // To set unhealthy for chunks that are corrupted.
+    datanodeStateMachine.getContainer().getContainerSet().scanContainerWithoutGap(containerID);
+    waitForDataChecksumsAtSCM(containerID, 2);
     ContainerProtos.ContainerChecksumInfo containerChecksumAfterChunkCorruption =
         readChecksumFile(container.getContainerData());
     long dataChecksumAfterAfterChunkCorruption = containerChecksumAfterChunkCorruption
@@ -470,32 +443,6 @@
         .getContainerMerkleTree().getDataChecksum();
     // Checksum should have changed after chunk corruption.
     assertNotEquals(oldDataChecksum, dataChecksumAfterAfterChunkCorruption);

-    // 3. Set Unhealthy for first chunk of all blocks. This should be done by the scanner, Until then this is a
-    // manual step.
-    // TODO: Use On-demand container scanner to build the new container merkle tree (HDDS-10374)
-    Random random = new Random();
-    ContainerProtos.ContainerChecksumInfo.Builder builder = containerChecksumAfterChunkCorruption.toBuilder();
-    List<ContainerProtos.BlockMerkleTree> blockMerkleTreeList = builder.getContainerMerkleTree()
-        .getBlockMerkleTreeList();
-    builder.getContainerMerkleTreeBuilder().clearBlockMerkleTree();
-    for (ContainerProtos.BlockMerkleTree blockMerkleTree : blockMerkleTreeList) {
-      ContainerProtos.BlockMerkleTree.Builder blockMerkleTreeBuilder = blockMerkleTree.toBuilder();
-      List<ContainerProtos.ChunkMerkleTree.Builder> chunkMerkleTreeBuilderList =
-          blockMerkleTreeBuilder.getChunkMerkleTreeBuilderList();
-      chunkMerkleTreeBuilderList.get(0).setIsHealthy(false).setDataChecksum(random.nextLong());
-      blockMerkleTreeBuilder.setDataChecksum(random.nextLong());
-      builder.getContainerMerkleTreeBuilder().addBlockMerkleTree(blockMerkleTreeBuilder.build());
-    }
-    builder.getContainerMerkleTreeBuilder().setDataChecksum(random.nextLong());
-    Files.deleteIfExists(getContainerChecksumFile(container.getContainerData()).toPath());
-    writeContainerDataTreeProto(container.getContainerData(), builder.getContainerMerkleTree());
-
-    // Since the container is already closed, we have manually updated the container checksum file.
-    // This doesn't update the checksum reported to SCM, and we need to trigger an ICR.
-    // Marking a container unhealthy will send an ICR.
-    kvHandler.markContainerUnhealthy(container, MetadataScanResult.deleted());
-    waitForDataChecksumsAtSCM(containerID, 2);
-
     // 4. Reconcile the container.
     cluster.getStorageContainerLocationClient().reconcileContainer(containerID);
     // Compare and check if dataChecksum is same on all replicas.
@@ -557,22 +504,15 @@ public void testDataChecksumReportedAtSCM() throws Exception {
       db.getStore().flushDB();
     }

-    // TODO: Use On-demand container scanner to build the new container merkle tree. (HDDS-10374)
-    Files.deleteIfExists(getContainerChecksumFile(container.getContainerData()).toPath());
-    kvHandler.createContainerMerkleTreeFromMetadata(container);
+    datanodeStateMachine.getContainer().getContainerSet().scanContainerWithoutGap(containerID);
+    waitForDataChecksumsAtSCM(containerID, 2);
     ContainerProtos.ContainerChecksumInfo containerChecksumAfterBlockDelete =
         readChecksumFile(container.getContainerData());
     long dataChecksumAfterBlockDelete = containerChecksumAfterBlockDelete.getContainerMerkleTree().getDataChecksum();
     // Checksum should have changed after block delete.
     assertNotEquals(oldDataChecksum, dataChecksumAfterBlockDelete);
-    // Since the container is already closed, we have manually updated the container checksum file.
-    // This doesn't update the checksum reported to SCM, and we need to trigger an ICR.
-    // Marking a container unhealthy will send an ICR.
-    kvHandler.markContainerUnhealthy(container, MetadataScanResult.deleted());
-    waitForDataChecksumsAtSCM(containerID, 2);
     scmClient.reconcileContainer(containerID);
-    waitForDataChecksumsAtSCM(containerID, 1);

     // Check non-zero checksum after container reconciliation
     containerReplicas = scmClient.getContainerReplicas(containerID,
         ClientVersion.CURRENT_VERSION);
@@ -604,11 +544,13 @@ private void waitForDataChecksumsAtSCM(long containerID, int expectedSize) throws Exception {
             ClientVersion.CURRENT_VERSION).stream()
             .map(HddsProtos.SCMContainerReplicaProto::getDataChecksum)
             .collect(Collectors.toSet());
+        LOG.info("Waiting for {} total unique checksums from container {} to be reported to SCM. " +
+            "Currently {} unique checksums are reported.", expectedSize, containerID, dataChecksums.size());
         return dataChecksums.size() == expectedSize;
       } catch (Exception ex) {
         return false;
       }
-    }, 500, 20000);
+    }, 1000, 20000);
   }

   private Pair getDataAndContainer(boolean close, int dataLen, String volumeName, String bucketName)
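
The rewritten reconciliation tests above all follow the same corrupt, scan, report, reconcile sequence. The following is a condensed sketch of that idiom, assuming the cluster fixture and the helpers of the test class above (illustration only, not patch content; the method name corruptScanAndReconcile is invented):

```java
// Sketch: the scan-then-verify flow used by the updated reconciliation tests.
private void corruptScanAndReconcile(DatanodeStateMachine dsm, long containerID) throws Exception {
  Container container = dsm.getContainer().getContainerSet().getContainer(containerID);
  long before = readChecksumFile(container.getContainerData())
      .getContainerMerkleTree().getDataChecksum();

  // 1. Damage one replica first (block delete, chunk corruption, ...), then
  // 2. let the on-demand scanner rebuild the merkle tree; no manual tree writes or forced ICRs.
  dsm.getContainer().getContainerSet().scanContainerWithoutGap(containerID);
  // 3. The scanner's report carries the new checksum, so SCM now sees two distinct values.
  waitForDataChecksumsAtSCM(containerID, 2);
  long after = readChecksumFile(container.getContainerData())
      .getContainerMerkleTree().getDataChecksum();
  assertNotEquals(before, after);

  // 4. Reconciliation repairs the damaged replica; the tests then compare
  // the per-replica checksums reported to SCM directly.
  cluster.getStorageContainerLocationClient().reconcileContainer(containerID);
}
```
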
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestBackgroundContainerDataScannerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestBackgroundContainerDataScannerIntegration.java
index db9bbfbbd732..2306af221c43 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestBackgroundContainerDataScannerIntegration.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestBackgroundContainerDataScannerIntegration.java
@@ -17,10 +17,17 @@
 package org.apache.hadoop.ozone.dn.scanner;

+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.UNHEALTHY;
+import static org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.readChecksumFile;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;

 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.utils.ContainerLogger;
@@ -75,6 +82,10 @@ void testCorruptionDetected(TestContainerCorruptions corruption)
     // Container corruption has not yet been introduced.
     Container container = getDnContainer(containerID);
     assertEquals(State.CLOSED, container.getContainerState());
+    assertTrue(containerChecksumFileExists(containerID));
+
+    waitForScmToSeeReplicaState(containerID, CLOSED);
+    long initialReportedDataChecksum = getContainerReplica(containerID).getDataChecksum();

     corruption.applyTo(container);
@@ -85,8 +96,22 @@
         () -> container.getContainerState() == State.UNHEALTHY,
         500, 15_000);

-    // Wait for SCM to get a report of the unhealthy replica.
-    waitForScmToSeeUnhealthyReplica(containerID);
+    // Wait for SCM to get a report of the unhealthy replica with a different checksum than before.
+    waitForScmToSeeReplicaState(containerID, UNHEALTHY);
+    long newReportedDataChecksum = getContainerReplica(containerID).getDataChecksum();
+    if (corruption == TestContainerCorruptions.MISSING_METADATA_DIR ||
+        corruption == TestContainerCorruptions.MISSING_CONTAINER_DIR) {
+      // In these cases the new tree cannot be written, because the checksum file lives in the now-missing
+      // metadata directory. When the tree write fails, the in-memory checksum should remain at its original value.
+      assertEquals(initialReportedDataChecksum, newReportedDataChecksum);
+      assertFalse(containerChecksumFileExists(containerID));
+    } else {
+      assertNotEquals(initialReportedDataChecksum, newReportedDataChecksum);
+      // Test that the scanner wrote updated checksum info to the disk.
+      assertTrue(containerChecksumFileExists(containerID));
+      ContainerProtos.ContainerChecksumInfo updatedChecksumInfo = readChecksumFile(container.getContainerData());
+      assertEquals(newReportedDataChecksum, updatedChecksumInfo.getContainerMerkleTree().getDataChecksum());
+    }

     if (corruption == TestContainerCorruptions.TRUNCATED_BLOCK ||
         corruption == TestContainerCorruptions.CORRUPT_BLOCK) {
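
The assertion block above reappears essentially verbatim in TestOnDemandContainerDataScannerIntegration at the end of this patch. If a later revision wanted to consolidate the duplication, a shared helper on TestContainerScannerIntegrationAbstract could look roughly like this (a hypothetical sketch, not part of this patch; the helper name is invented, while the assertions and helpers it calls are the ones added above):

```java
// Sketch: shared post-corruption checksum assertions for the scanner integration tests.
protected void assertChecksumAfterCorruption(TestContainerCorruptions corruption, long containerID,
    Container container, long initialReportedDataChecksum) throws Exception {
  long newReportedDataChecksum = getContainerReplica(containerID).getDataChecksum();
  if (corruption == TestContainerCorruptions.MISSING_METADATA_DIR ||
      corruption == TestContainerCorruptions.MISSING_CONTAINER_DIR) {
    // The tree write fails because the checksum file's metadata directory is gone;
    // the replica keeps reporting the pre-corruption checksum.
    assertEquals(initialReportedDataChecksum, newReportedDataChecksum);
    assertFalse(containerChecksumFileExists(containerID));
  } else {
    // The scanner rebuilt and persisted the tree; the on-disk tree and the reported value must agree.
    assertNotEquals(initialReportedDataChecksum, newReportedDataChecksum);
    assertTrue(containerChecksumFileExists(containerID));
    ContainerProtos.ContainerChecksumInfo updated = readChecksumFile(container.getContainerData());
    assertEquals(newReportedDataChecksum, updated.getContainerMerkleTree().getDataChecksum());
  }
}
```
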
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestBackgroundContainerMetadataScannerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestBackgroundContainerMetadataScannerIntegration.java
index 45fb7288bc59..b25df7e11369 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestBackgroundContainerMetadataScannerIntegration.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestBackgroundContainerMetadataScannerIntegration.java
@@ -17,7 +17,14 @@
 package org.apache.hadoop.ozone.dn.scanner;

+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.OPEN;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.UNHEALTHY;
+import static org.apache.hadoop.ozone.container.keyvalue.TestContainerCorruptions.MISSING_CONTAINER_DIR;
+import static org.apache.hadoop.ozone.container.keyvalue.TestContainerCorruptions.MISSING_METADATA_DIR;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;

 import java.time.Duration;
 import java.util.Collection;
@@ -89,10 +96,17 @@ void testCorruptionDetected(TestContainerCorruptions corruption)
     long closedContainerID = writeDataThenCloseContainer();
     Container closedContainer = getDnContainer(closedContainerID);
     assertEquals(State.CLOSED, closedContainer.getContainerState());
+    assertTrue(containerChecksumFileExists(closedContainerID));
+    waitForScmToSeeReplicaState(closedContainerID, CLOSED);
+    long initialClosedChecksum = getContainerReplica(closedContainerID).getDataChecksum();
+    assertNotEquals(0, initialClosedChecksum);

     long openContainerID = writeDataToOpenContainer();
     Container openContainer = getDnContainer(openContainerID);
     assertEquals(State.OPEN, openContainer.getContainerState());
+    waitForScmToSeeReplicaState(openContainerID, OPEN);
+    // Open containers should not yet have a checksum generated.
+    assertEquals(0, getContainerReplica(openContainerID).getDataChecksum());

     // Corrupt both containers.
     corruption.applyTo(closedContainer);
@@ -107,8 +121,21 @@
         500, 5000);

     // Wait for SCM to get reports of the unhealthy replicas.
-    waitForScmToSeeUnhealthyReplica(closedContainerID);
-    waitForScmToSeeUnhealthyReplica(openContainerID);
+    // The metadata scanner does not generate data checksums, and the other scanners have been turned off for this
+    // test, so the data checksums should not change.
+    waitForScmToSeeReplicaState(closedContainerID, UNHEALTHY);
+    assertEquals(initialClosedChecksum, getContainerReplica(closedContainerID).getDataChecksum());
+    waitForScmToSeeReplicaState(openContainerID, UNHEALTHY);
+    if (corruption == MISSING_METADATA_DIR || corruption == MISSING_CONTAINER_DIR) {
+      // In these cases the tree cannot be generated when the container is marked unhealthy, so the checksum
+      // should remain at 0. The tree is built from metadata when the container transitions to unhealthy,
+      // not by the metadata scanner itself.
+      assertEquals(0, getContainerReplica(openContainerID).getDataChecksum());
+    } else {
+      // The checksum will be generated for the first time when the container is marked unhealthy.
+      // The tree is built from metadata by that state transition, not by the metadata scanner itself.
+      assertNotEquals(0, getContainerReplica(openContainerID).getDataChecksum());
+    }

     // Once the unhealthy replica is reported, the open container's lifecycle
     // state in SCM should move to closed.
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestContainerScannerIntegrationAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestContainerScannerIntegrationAbstract.java
index 4363cb8830dc..2584138c126b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestContainerScannerIntegrationAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestContainerScannerIntegrationAbstract.java
@@ -48,6 +48,7 @@
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.TestHelper;
+import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
@@ -112,13 +113,10 @@ static void shutdown() throws IOException {
     }
   }

-  protected void waitForScmToSeeUnhealthyReplica(long containerID)
+  protected void waitForScmToSeeReplicaState(long containerID, State state)
       throws Exception {
-    ContainerManager scmContainerManager = cluster.getStorageContainerManager()
-        .getContainerManager();
     LambdaTestUtils.await(5000, 500,
-        () -> getContainerReplica(scmContainerManager, containerID)
-            .getState() == State.UNHEALTHY);
+        () -> getContainerReplica(containerID).getState() == state);
   }

   protected void waitForScmToCloseContainer(long containerID) throws Exception {
@@ -139,6 +137,12 @@ protected Container getDnContainer(long containerID) {
     return getOzoneContainer().getContainerSet().getContainer(containerID);
   }

+  protected boolean containerChecksumFileExists(long containerID) {
+    assertEquals(1, cluster.getHddsDatanodes().size());
+    HddsDatanodeService dn = cluster.getHddsDatanodes().get(0);
+    return ContainerMerkleTreeTestUtils.containerChecksumFileExists(dn, containerID);
+  }
+
   protected long writeDataThenCloseContainer() throws Exception {
     return writeDataThenCloseContainer("keyName");
   }
@@ -181,11 +185,9 @@ protected byte[] getTestData() {
         .getBytes(UTF_8);
   }

-  protected ContainerReplica getContainerReplica(
-      ContainerManager cm, long containerId) throws ContainerNotFoundException {
-    Set<ContainerReplica> containerReplicas = cm.getContainerReplicas(
-        ContainerID.valueOf(
-            containerId));
+  protected ContainerReplica getContainerReplica(long containerId) throws ContainerNotFoundException {
+    ContainerManager cm = cluster.getStorageContainerManager().getContainerManager();
+    Set<ContainerReplica> containerReplicas = cm.getContainerReplicas(ContainerID.valueOf(containerId));
     // Only using a single datanode cluster.
     assertEquals(1, containerReplicas.size());
     return containerReplicas.iterator().next();
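
For reference, a hypothetical subclass test showing how the refactored helpers compose (not part of the patch; the flow mirrors the real tests in this change, and CLOSED is the statically imported ContainerReplicaProto.State.CLOSED):

```java
@Test
void exampleChecksumVisibleAtScm() throws Exception {
  long containerID = writeDataThenCloseContainer();
  // Poll SCM until the replica reaches any target state; this generalizes the
  // old waitForScmToSeeUnhealthyReplica, which could only wait for UNHEALTHY.
  waitForScmToSeeReplicaState(containerID, CLOSED);
  // getContainerReplica no longer needs a ContainerManager argument; it looks one up from the cluster.
  long reported = getContainerReplica(containerID).getDataChecksum();
  // A closed container should have a non-zero checksum and a tree file on the lone datanode.
  assertNotEquals(0, reported);
  assertTrue(containerChecksumFileExists(containerID));
}
```
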
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestOnDemandContainerDataScannerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestOnDemandContainerDataScannerIntegration.java
index 5553b3703c8e..d3b3ed46fdeb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestOnDemandContainerDataScannerIntegration.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestOnDemandContainerDataScannerIntegration.java
@@ -17,10 +17,17 @@
 package org.apache.hadoop.ozone.dn.scanner;

+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.UNHEALTHY;
+import static org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.readChecksumFile;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;

 import java.util.Collection;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.utils.ContainerLogger;
@@ -95,6 +102,11 @@ void testCorruptionDetected(TestContainerCorruptions corruption)
     // Container corruption has not yet been introduced.
     Container container = getDnContainer(containerID);
     assertEquals(State.CLOSED, container.getContainerState());
+    assertTrue(containerChecksumFileExists(containerID));
+
+    waitForScmToSeeReplicaState(containerID, CLOSED);
+    long initialReportedDataChecksum = getContainerReplica(containerID).getDataChecksum();
+
     // Corrupt the container.
     corruption.applyTo(container);
     // This method will check that reading from the corrupted key returns an
@@ -107,7 +119,22 @@
         500, 5000);

     // Wait for SCM to get a report of the unhealthy replica.
-    waitForScmToSeeUnhealthyReplica(containerID);
+    waitForScmToSeeReplicaState(containerID, UNHEALTHY);
     corruption.assertLogged(containerID, 1, logCapturer);
+    long newReportedDataChecksum = getContainerReplica(containerID).getDataChecksum();
+
+    if (corruption == TestContainerCorruptions.MISSING_METADATA_DIR ||
+        corruption == TestContainerCorruptions.MISSING_CONTAINER_DIR) {
+      // In these cases the new tree cannot be written, because the checksum file lives in the now-missing
+      // metadata directory. When the tree write fails, the in-memory checksum should remain at its original value.
+      assertEquals(initialReportedDataChecksum, newReportedDataChecksum);
+      assertFalse(containerChecksumFileExists(containerID));
+    } else {
+      assertNotEquals(initialReportedDataChecksum, newReportedDataChecksum);
+      // Test that the scanner wrote updated checksum info to the disk.
+      assertTrue(containerChecksumFileExists(containerID));
+      ContainerProtos.ContainerChecksumInfo updatedChecksumInfo = readChecksumFile(container.getContainerData());
+      assertEquals(newReportedDataChecksum, updatedChecksumInfo.getContainerMerkleTree().getDataChecksum());
+    }
   }
 }
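
The faster heartbeat and report intervals configured in TestContainerCommandReconciliation.init() exist to make restart scenarios practical within test timeouts. A sketch of the kind of rolling-restart check they enable, assuming MiniOzoneCluster's restart helpers and reusing the waitForDataChecksumsAtSCM helper shown earlier (hypothetical, not part of this patch):

```java
// Sketch: verify data checksums are re-reported after SCM and datanode restarts.
private void restartClusterAndVerifyChecksums(long containerID) throws Exception {
  // Restart SCM; datanodes only notice on their next heartbeat, when SCM asks them to re-register.
  cluster.restartStorageContainerManager(true);
  // Rolling datanode restarts; container reports resend the persisted data checksums.
  for (int i = 0; i < cluster.getHddsDatanodes().size(); i++) {
    cluster.restartHddsDatanode(i, true);
  }
  // After re-registration, healthy replicas should converge back to a single reported checksum.
  waitForDataChecksumsAtSCM(containerID, 1);
}
```
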