Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
a8b8dbc
Merge branch 'HDDS-10239-container-reconciliation' into HDDS-10374-sc…
errose28 Nov 22, 2024
999a913
Add code to build and write the tree from the data scanners
errose28 Nov 22, 2024
b0d1ba9
Update todo in acceptance test
errose28 Nov 25, 2024
382bce2
Add unit tests for tree generation by scanners based on container state
errose28 Nov 25, 2024
28b1889
Add initial (failing) unit test for KeyValueContaienrCheck
errose28 Nov 26, 2024
dc182e8
Update container data checksum when building the tree
errose28 Nov 26, 2024
a3401a9
Fix handling of fully truncated block of 0 size
errose28 Jan 7, 2025
a25d44d
Add unit tests for new addBlock method in tree
errose28 Jan 7, 2025
7550a3c
Test that SCM gets a checksum with the container report
errose28 Jan 7, 2025
847f8d8
Add (failing) tests that SCM sees updated checksums
errose28 Jan 7, 2025
452c294
Update acceptance test
errose28 Jan 8, 2025
dc45eca
Add javadoc for tree generation from metadata
errose28 Jan 8, 2025
1cb291f
Data integration tests passing
errose28 Jan 8, 2025
d6b21d2
Don't generate tree from metadata for unhealthy container
errose28 Jan 9, 2025
2a2dbbd
Checkstyle
errose28 Jan 9, 2025
c9a077c
Marking container unhealthy should not write a merkle tree (test fix)
errose28 Jan 9, 2025
0bbbdc5
Checkstyle
errose28 Jan 9, 2025
7b971a9
Address review comments
errose28 Jan 13, 2025
15d6848
Merge branch 'HDDS-10239-container-reconciliation' into HDDS-10374-sc…
errose28 Apr 11, 2025
0989881
Initial use of on demand scan in TestKeyValueHandler
errose28 Apr 11, 2025
834be96
Make on-demand scanner a normal instance
errose28 Apr 15, 2025
e73757e
Register on-demand scan callback in ContainerSet
errose28 Apr 15, 2025
f0d8efe
Migrate scanContainer usage in prod code
errose28 Apr 15, 2025
4cb054c
Switch terminology from error to scan. Add existence checks
errose28 Apr 15, 2025
8abedb6
Update tests
errose28 Apr 15, 2025
577a075
Add unit test for ContainerSet
errose28 Apr 16, 2025
4c8d843
Checkstyle
errose28 Apr 16, 2025
0bd4127
Improve comments and test
errose28 Apr 16, 2025
61fae12
Merge branch 'non-static-on-demand-scan' into HDDS-10374-scanner-buil…
errose28 Apr 16, 2025
61f30f3
WIP migrate reconciliation unit tests
errose28 Apr 17, 2025
192eb7b
Most tests passing
errose28 Apr 23, 2025
0cf79f6
Improve logging in test and prod code
errose28 Apr 28, 2025
8b30f54
Fix tree tracking during reconcile process
errose28 Apr 28, 2025
9c74f4b
Use mixin to standardize scanner operations, log checksum changes in …
errose28 Apr 29, 2025
d550669
Logging improvements
errose28 Apr 29, 2025
97e02ea
Add checksum validation, generate readable data
errose28 Apr 30, 2025
22b41b8
Use tree writer between peer updates. All tests pass
errose28 May 5, 2025
f49a9dd
Wait for on-demand scans to complete in test
errose28 May 5, 2025
f5d4dbf
Improve char data generation, reset scan metrics
errose28 May 5, 2025
1140c90
Update test name
errose28 May 5, 2025
e0aa7cb
Checkstyle
errose28 May 5, 2025
62d7794
Merge branch 'HDDS-10239-container-reconciliation' into HDDS-10374-sc…
errose28 May 6, 2025
9c3b87c
Merge branch 'reconcile-unit-test-framework' into HDDS-10374-scanner-…
errose28 May 6, 2025
9322b4a
Fix TODOs dependent on this patch
errose28 May 13, 2025
9b75957
Rename container scan helper
errose28 May 13, 2025
f615275
Add comment on failure type
errose28 May 13, 2025
dadc829
Fix checkstyle unique to this PR
errose28 May 13, 2025
076a82e
Merge branch 'HDDS-10239-container-reconciliation' into HDDS-10374-sc…
errose28 May 14, 2025
cc55527
Fix sending ICR when only checksum changes (pending test)
errose28 May 14, 2025
35879b4
Updates after reviewing diff
errose28 May 14, 2025
1ab8c14
Add unit test for KeyValueHandler#updateContainerChecksum
errose28 May 14, 2025
6c8be07
Improve and update scanner integration tests
errose28 May 14, 2025
60a1a6e
Add unit tests that checksum update failure does not stop container s…
errose28 May 14, 2025
d035c17
Checkstyle
errose28 May 14, 2025
53336ae
Fix scan gap for unit test
errose28 May 15, 2025
56e7ed4
Merge branch 'HDDS-10239-container-reconciliation' into HDDS-10374-sc…
errose28 May 16, 2025
2504638
Fix metadata scan test
errose28 May 16, 2025
4be9992
Update based on review
errose28 May 19, 2025
c0b89dd
pmd
errose28 May 19, 2025
e24a24e
Update ContainerData checksum info after reconcile with each peer
errose28 May 22, 2025
dc27f74
Support bypassing scan gap (tests are failing)
errose28 May 22, 2025
e2974b4
Checkstyle
errose28 May 27, 2025
34b4b9a
Fix scan gap bug. All tests expected to pass
errose28 May 27, 2025
5fda700
Fix scan gap call
errose28 Jun 2, 2025
de6a757
Use temp dir for test, fix space overflow in CI
errose28 Jun 3, 2025
1574cdd
Add configs in test to support restarting DN and SCM quickly
errose28 Jun 3, 2025
7ff972c
Use standard corruption injection in failing test
errose28 Jun 3, 2025
02a3ac6
Checkstyle
errose28 Jun 3, 2025
54cbf92
Findbugs
errose28 Jun 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,10 @@ public void stop() {
* The data merkle tree within the file is replaced with the {@code tree} parameter, but all other content of the
* file remains unchanged.
* Concurrent writes to the same file are coordinated internally.
* This method also updates the container's data checksum in the {@code data} parameter, which will be seen by SCM
* on container reports.
*/
public ContainerProtos.ContainerChecksumInfo writeContainerDataTree(ContainerData data,
ContainerMerkleTreeWriter tree) throws IOException {
long containerID = data.getContainerID();
// If there is an error generating the tree and we cannot obtain a final checksum, use 0 to indicate a metadata
// failure.
long dataChecksum = 0;
ContainerProtos.ContainerChecksumInfo checksumInfo = null;
Lock writeLock = getLock(containerID);
writeLock.lock();
Expand All @@ -113,13 +108,9 @@ public ContainerProtos.ContainerChecksumInfo writeContainerDataTree(ContainerDat
.setContainerMerkleTree(treeProto);
checksumInfo = checksumInfoBuilder.build();
write(data, checksumInfo);
// If write succeeds, update the checksum in memory. Otherwise 0 will be used to indicate the metadata failure.
dataChecksum = treeProto.getDataChecksum();
LOG.debug("Merkle tree for container {} updated with container data checksum {}", containerID,
checksumToString(dataChecksum));
LOG.debug("Data merkle tree for container {} updated with container checksum {}", containerID,
checksumToString(treeProto.getDataChecksum()));
} finally {
// Even if persisting the tree fails, we should still update the data checksum in memory to report back to SCM.
data.setDataChecksum(dataChecksum);
writeLock.unlock();
}
return checksumInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand All @@ -54,7 +51,6 @@
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -237,29 +233,6 @@ public static String getContainerFileChecksum(String containerDataYamlStr)
}
}

public static boolean recentlyScanned(Container<?> container,
long minScanGap, Logger log) {
Optional<Instant> lastScanTime =
container.getContainerData().lastDataScanTime();
Instant now = Instant.now();
// Container is considered recently scanned if it was scanned within the
// configured time frame. If the optional is empty, the container was
// never scanned.
boolean recentlyScanned = lastScanTime.map(scanInstant ->
Duration.between(now, scanInstant).abs()
.compareTo(Duration.ofMillis(minScanGap)) < 0)
.orElse(false);

if (recentlyScanned && log.isDebugEnabled()) {
log.debug("Skipping scan for container {} which was last " +
"scanned at {}. Current time is {}.",
container.getContainerData().getContainerID(), lastScanTime.get(),
now);
}

return recentlyScanned;
}

/**
* Get the .container file from the containerBaseDir.
* @param containerBaseDir container base directory. The name of this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.ToLongFunction;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
Expand All @@ -49,6 +48,7 @@
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.utils.ContainerLogger;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerDataScanner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -69,7 +69,7 @@ public class ContainerSet implements Iterable<Container<?>> {
private long recoveringTimeout;
private final Table<ContainerID, String> containerIdsTable;
// Handler that will be invoked when a scan of a container in this set is requested.
private Consumer<Container<?>> containerScanHandler;
private OnDemandContainerDataScanner containerScanner;

public static ContainerSet newReadOnlyContainerSet(long recoveringTimeout) {
return new ContainerSet(null, recoveringTimeout);
Expand Down Expand Up @@ -129,22 +129,38 @@ public void ensureContainerNotMissing(long containerId, State state) throws Stor
}

/**
* @param scanner A callback that will be invoked when a scan of a container in this set is requested.
* @param scanner The scanner instance will be invoked when a scan of a container in this set is requested.
*/
public void registerContainerScanHandler(Consumer<Container<?>> scanner) {
this.containerScanHandler = scanner;
public void registerOnDemandScanner(OnDemandContainerDataScanner scanner) {
this.containerScanner = scanner;
}

/**
* Triggers a scan of a container in this set using the registered scan handler. This is a no-op if no scan handler
* is registered or the container does not exist in the set.
* Triggers a scan of a container in this set. This is a no-op if no scanner is registered or the container does not
* exist in the set.
* @param containerID The container in this set to scan.
*/
public void scanContainer(long containerID) {
if (containerScanHandler != null) {
if (containerScanner != null) {
Container<?> container = getContainer(containerID);
if (container != null) {
containerScanHandler.accept(container);
containerScanner.scanContainer(container);
} else {
LOG.warn("Request to scan container {} which was not found in the container set", containerID);
}
}
}

/**
* Triggers a scan of a container in this set regardless of whether it was recently scanned.
* This is a no-op if no scanner is registered or the container does not exist in the set.
* @param containerID The container in this set to scan.
*/
public void scanContainerWithoutGap(long containerID) {
if (containerScanner != null) {
Container<?> container = getContainer(containerID);
if (container != null) {
containerScanner.scanContainerWithoutGap(container);
} else {
LOG.warn("Request to scan container {} which was not found in the container set", containerID);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeWriter;
import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
Expand Down Expand Up @@ -154,6 +155,18 @@ public abstract void exportContainer(
public abstract void markContainerForClose(Container container)
throws IOException;

/**
* Updates the container checksum information on disk and in memory and sends an ICR if the container checksum was
* changed from its previous value.
*
* @param container The container to update
* @param treeWriter The container merkle tree with the updated information about the container
* @throws IOException For errors sending an ICR or updating the container checksum on disk. If the disk update
* fails, the checksum in memory will not be updated and an ICR will not be sent.
*/
public abstract void updateContainerChecksum(Container container, ContainerMerkleTreeWriter treeWriter)
throws IOException;

/**
* Marks the container Unhealthy. Moves the container to UNHEALTHY state.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,18 @@ public static void logRecovered(ContainerData containerData) {
LOG.info(getMessage(containerData));
}

/**
* Logged when a container's checksum is updated.
*
* @param containerData The container which has the updated data checksum.
* @param oldDataChecksum The old data checksum.
*/
public static void logChecksumUpdated(ContainerData containerData, long oldDataChecksum) {
LOG.warn(getMessage(containerData,
"Container data checksum updated from " + checksumToString(oldDataChecksum) + " to "
+ checksumToString(containerData.getDataChecksum())));
}

/**
* Logged when a container is reconciled.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ public DataScanResult fullCheck(DataTransferThrottler throttler, Canceler cancel

LOG.debug("Running data checks for container {}", containerID);
try {
// TODO HDDS-10374 this tree will get updated with the container's contents as it is scanned.
ContainerMerkleTreeWriter dataTree = new ContainerMerkleTreeWriter();
List<ContainerScanError> dataErrors = scanData(dataTree, throttler, canceler);
if (containerIsDeleted()) {
Expand Down Expand Up @@ -376,12 +375,15 @@ private List<ContainerScanError> scanBlock(DBHandle db, File dbFile, BlockData b
// So, we need to make sure, chunk length > 0, before declaring
// the missing chunk file.
if (!block.getChunks().isEmpty() && block.getChunks().get(0).getLen() > 0) {
ContainerScanError error = new ContainerScanError(FailureType.MISSING_CHUNK_FILE,
ContainerScanError error = new ContainerScanError(FailureType.MISSING_DATA_FILE,
new File(containerDataFromDisk.getChunksPath()), new IOException("Missing chunk file " +
chunkFile.getAbsolutePath()));
blockErrors.add(error);
}
} else if (chunk.getChecksumData().getType() != ContainerProtos.ChecksumType.NONE) {
// Before adding chunks, add a block entry to the tree to represent cases where the block exists but has no
// chunks.
currentTree.addBlock(block.getBlockID().getLocalID());
int bytesPerChecksum = chunk.getChecksumData().getBytesPerChecksum();
ByteBuffer buffer = BUFFER_POOL.getBuffer(bytesPerChecksum);
// Keep scanning the block even if there are errors with individual chunks.
Expand Down Expand Up @@ -419,6 +421,14 @@ private static List<ContainerScanError> verifyChecksum(BlockData block,

List<ContainerScanError> scanErrors = new ArrayList<>();

// Information used to populate the merkle tree. Chunk metadata will be the same, but we must fill in the
// checksums with what we actually observe.
ContainerProtos.ChunkInfo.Builder observedChunkBuilder = chunk.toBuilder();
ContainerProtos.ChecksumData.Builder observedChecksumData = chunk.getChecksumData().toBuilder();
observedChecksumData.clearChecksums();
boolean chunkHealthy = true;
boolean chunkMissing = false;

ChecksumData checksumData =
ChecksumData.getFromProtoBuf(chunk.getChecksumData());
int checksumCount = checksumData.getChecksums().size();
Expand All @@ -431,10 +441,7 @@ private static List<ContainerScanError> verifyChecksum(BlockData block,
if (layout == ContainerLayoutVersion.FILE_PER_BLOCK) {
channel.position(chunk.getOffset());
}
// Only report one error per chunk. Reporting corruption at every "bytes per checksum" interval will lead to a
// large amount of errors when a full chunk is corrupted.
boolean chunkHealthy = true;
for (int i = 0; i < checksumCount && chunkHealthy; i++) {
for (int i = 0; i < checksumCount; i++) {
// limit last read for FILE_PER_BLOCK, to avoid reading next chunk
if (layout == ContainerLayoutVersion.FILE_PER_BLOCK &&
i == checksumCount - 1 &&
Expand All @@ -454,7 +461,11 @@ private static List<ContainerScanError> verifyChecksum(BlockData block,
ByteString expected = checksumData.getChecksums().get(i);
ByteString actual = cal.computeChecksum(buffer)
.getChecksums().get(0);
if (!expected.equals(actual)) {
observedChecksumData.addChecksums(actual);
// Only report one error per chunk. Reporting corruption at every "bytes per checksum" interval will lead to a
// large amount of errors when a full chunk is corrupted.
// Continue scanning the chunk even after the first error so the full merkle tree can be built.
if (chunkHealthy && !expected.equals(actual)) {
String message = String
.format("Inconsistent read for chunk=%s" +
" checksum item %d" +
Expand All @@ -466,26 +477,46 @@ private static List<ContainerScanError> verifyChecksum(BlockData block,
StringUtils.bytes2Hex(expected.asReadOnlyByteBuffer()),
StringUtils.bytes2Hex(actual.asReadOnlyByteBuffer()),
block.getBlockID());
chunkHealthy = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can break the loop here because smallest unit is chunk.

scanErrors.add(new ContainerScanError(FailureType.CORRUPT_CHUNK, chunkFile,
new OzoneChecksumException(message)));
chunkHealthy = false;
}
}
// If all the checksums match, also check that the length stored in the metadata matches the number of bytes
// seen on the disk.

observedChunkBuilder.setLen(bytesRead);
// If we haven't seen any errors after scanning the whole chunk, verify that the length stored in the metadata
// matches the number of bytes seen on the disk.
if (chunkHealthy && bytesRead != chunk.getLen()) {
String message = String
.format("Inconsistent read for chunk=%s expected length=%d"
+ " actual length=%d for block %s",
chunk.getChunkName(),
chunk.getLen(), bytesRead, block.getBlockID());
scanErrors.add(new ContainerScanError(FailureType.INCONSISTENT_CHUNK_LENGTH, chunkFile,
new IOException(message)));
if (bytesRead == 0) {
// If we could not find any data for the chunk, report it as missing.
chunkMissing = true;
chunkHealthy = false;
String message = String.format("Missing chunk=%s with expected length=%d for block %s",
chunk.getChunkName(), chunk.getLen(), block.getBlockID());
scanErrors.add(new ContainerScanError(FailureType.MISSING_CHUNK, chunkFile, new IOException(message)));
} else {
// We found data for the chunk, but it was shorter than expected.
String message = String
.format("Inconsistent read for chunk=%s expected length=%d"
+ " actual length=%d for block %s",
chunk.getChunkName(),
chunk.getLen(), bytesRead, block.getBlockID());
chunkHealthy = false;
scanErrors.add(new ContainerScanError(FailureType.INCONSISTENT_CHUNK_LENGTH, chunkFile,
new IOException(message)));
}
}
} catch (IOException ex) {
scanErrors.add(new ContainerScanError(FailureType.MISSING_CHUNK_FILE, chunkFile, ex));
// An unknown error occurred trying to access the chunk. Report it as corrupted.
chunkHealthy = false;
scanErrors.add(new ContainerScanError(FailureType.CORRUPT_CHUNK, chunkFile, ex));
}

// Missing chunks should not be added to the merkle tree.
if (!chunkMissing) {
observedChunkBuilder.setChecksumData(observedChecksumData);
currentTree.addChunks(block.getBlockID().getLocalID(), chunkHealthy, observedChunkBuilder.build());
}
return scanErrors;
}

Expand Down
Loading