diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
index 12ca9978c685..7776e245be05 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -38,9 +38,13 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;

+import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
@@ -142,8 +146,34 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
     }

     if (checksumBlockData != null) {
-      List<ChunkInfo> currentChunks = getContainerBlockData().getChunksList();
+
+      // For the same BlockGroupLength, we need to find the largest BlockData size.
+      // This is because we do not send empty chunks to the DataNode, so the largest value is the most accurate.
+      Map<Long, Optional<BlockData>> maxDataSizeByGroup = Arrays.stream(blockData)
+          .filter(Objects::nonNull)
+          .collect(Collectors.groupingBy(BlockData::getBlockGroupLength,
+              Collectors.maxBy(Comparator.comparingLong(BlockData::getSize))));
+      BlockData maxBlockData = maxDataSizeByGroup.get(blockGroupLength).get();
+
+      // When calculating the checksum size,
+      // we need to consider both blockGroupLength and the actual size of blockData.
+      //
+      // We use the smaller value to determine the size of the ChunkList.
+      //
+      // 1. In most cases, blockGroupLength is equal to the size of blockData.
+      // 2. Occasionally, blockData is not fully filled; if a chunk is empty,
+      //    it is not sent to the DN, so the blockData size ends up smaller than blockGroupLength.
+      // 3. In cases with 'dirty data',
+      //    if an error occurs while writing to the EC stripe (e.g., the DN reports Container Closed)
+      //    and the length confirmed with OM is smaller, blockGroupLength may be smaller than the blockData size.
+      long blockDataSize = Math.min(maxBlockData.getSize(), blockGroupLength);
+      int chunkSize = (int) Math.ceil(((double) blockDataSize / repConfig.getEcChunkSize()));
       List<ChunkInfo> checksumBlockDataChunks = checksumBlockData.getChunks();
+      if (chunkSize > 0) {
+        checksumBlockDataChunks = checksumBlockData.getChunks().subList(0, chunkSize);
+      }
+
+      List<ChunkInfo> currentChunks = getContainerBlockData().getChunksList();

       Preconditions.checkArgument(
           currentChunks.size() == checksumBlockDataChunks.size(),
@@ -269,7 +299,7 @@ public CompletableFuture<ContainerProtos.ContainerCommandResponseProto> executePutBlock(boolean close,
           throw ce;
         });
     } catch (IOException | ExecutionException e) {
-      throw new IOException(EXCEPTION_MSG + e.toString(), e);
+      throw new IOException(EXCEPTION_MSG + e, e);
     } catch (InterruptedException ex) {
       Thread.currentThread().interrupt();
       handleInterruptedException(ex, false);
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockData.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockData.java
index 4bd170df8e82..ea5c5453f3f0 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockData.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockData.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.hdds.utils.db.Codec;
 import org.apache.hadoop.hdds.utils.db.DelegatedCodec;
 import org.apache.hadoop.hdds.utils.db.Proto3Codec;
+import org.apache.hadoop.ozone.OzoneConsts;

 import java.io.IOException;
 import java.util.Collections;
@@ -280,4 +281,14 @@ public void appendTo(StringBuilder sb) {
     sb.append(", size=").append(size);
     sb.append("]");
   }
+
+  public long getBlockGroupLength() {
+    String lenStr = getMetadata()
+        .get(OzoneConsts.BLOCK_GROUP_LEN_KEY_IN_PUT_BLOCK);
+    // If we don't have the length, then it indicates a problem with the stripe.
+    // All replicas should carry the length, so if it is not there, we return 0,
+    // which will cause us to set the length of the block to zero and not
+    // attempt to reconstruct it.
+    return (lenStr == null) ? 0 : Long.parseLong(lenStr);
+  }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
index c274d8fea301..6f79839cd021 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
@@ -61,6 +61,7 @@
 import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.SecretKeyTestClient;
+import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;
 import org.apache.hadoop.ozone.client.io.InsufficientLocationsException;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
@@ -83,6 +84,7 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;

 import java.io.IOException;
@@ -99,6 +101,7 @@
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -117,6 +120,7 @@
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.params.provider.Arguments.arguments;

 /**
  * This class tests container commands on EC containers.
@@ -613,30 +617,33 @@ private static byte[] getBytesWith(int singleDigitNumber, int total) {

   @ParameterizedTest
   @MethodSource("recoverableMissingIndexes")
-  void testECReconstructionCoordinatorWith(List<Integer> missingIndexes)
+  void testECReconstructionCoordinatorWith(List<Integer> missingIndexes, boolean triggerRetry)
       throws Exception {
-    testECReconstructionCoordinator(missingIndexes, 3);
+    testECReconstructionCoordinator(missingIndexes, 3, triggerRetry);
   }

   @ParameterizedTest
   @MethodSource("recoverableMissingIndexes")
-  void testECReconstructionCoordinatorWithPartialStripe(List<Integer> missingIndexes)
-      throws Exception {
-    testECReconstructionCoordinator(missingIndexes, 1);
+  void testECReconstructionCoordinatorWithPartialStripe(List<Integer> missingIndexes,
+      boolean triggerRetry) throws Exception {
+    testECReconstructionCoordinator(missingIndexes, 1, triggerRetry);
   }

   @ParameterizedTest
   @MethodSource("recoverableMissingIndexes")
-  void testECReconstructionCoordinatorWithFullAndPartialStripe(List<Integer> missingIndexes)
-      throws Exception {
-    testECReconstructionCoordinator(missingIndexes, 4);
+  void testECReconstructionCoordinatorWithFullAndPartialStripe(List<Integer> missingIndexes,
+      boolean triggerRetry) throws Exception {
+    testECReconstructionCoordinator(missingIndexes, 4, triggerRetry);
   }

-  static Stream<List<Integer>> recoverableMissingIndexes() {
-    return Stream
-        .concat(IntStream.rangeClosed(1, 5).mapToObj(ImmutableList::of), Stream
-            .of(ImmutableList.of(2, 3), ImmutableList.of(2, 4),
-                ImmutableList.of(3, 5), ImmutableList.of(4, 5)));
+  static Stream<Arguments> recoverableMissingIndexes() {
+    Stream<Arguments> args = IntStream.rangeClosed(1, 5).mapToObj(i -> arguments(ImmutableList.of(i), true));
+    Stream<Arguments> args1 = IntStream.rangeClosed(1, 5).mapToObj(i -> arguments(ImmutableList.of(i), false));
+    Stream<Arguments> args2 = Stream.of(arguments(ImmutableList.of(2, 3), true),
+        arguments(ImmutableList.of(2, 4), true), arguments(ImmutableList.of(3, 5), true));
+    Stream<Arguments> args3 = Stream.of(arguments(ImmutableList.of(2, 3), false),
+        arguments(ImmutableList.of(2, 4), false), arguments(ImmutableList.of(3, 5), false));
+    return Stream.concat(Stream.concat(args, args1), Stream.concat(args2, args3));
   }

   /**
@@ -647,7 +654,7 @@ static Stream<List<Integer>> recoverableMissingIndexes() {
   public void testECReconstructionCoordinatorWithMissingIndexes135() {
     InsufficientLocationsException exception =
         assertThrows(InsufficientLocationsException.class, () -> {
-          testECReconstructionCoordinator(ImmutableList.of(1, 3, 5), 3);
+          testECReconstructionCoordinator(ImmutableList.of(1, 3, 5), 3, false);
         });

     String expectedMessage =
@@ -658,7 +665,7 @@ public void testECReconstructionCoordinatorWithMissingIndexes135() {
   }

   private void testECReconstructionCoordinator(List<Integer> missingIndexes,
-      int numInputChunks) throws Exception {
+      int numInputChunks, boolean triggerRetry) throws Exception {
     ObjectStore objectStore = rpcClient.getObjectStore();
     String keyString = UUID.randomUUID().toString();
     String volumeName = UUID.randomUUID().toString();
@@ -667,7 +674,7 @@ private void testECReconstructionCoordinator(List<Integer> missingIndexes,
     objectStore.getVolume(volumeName).createBucket(bucketName);
     OzoneVolume volume = objectStore.getVolume(volumeName);
     OzoneBucket bucket = volume.getBucket(bucketName);
-    createKeyAndWriteData(keyString, bucket, numInputChunks);
+    createKeyAndWriteData(keyString, bucket, numInputChunks, triggerRetry);

     try (
         XceiverClientManager xceiverClientManager =
@@ -779,7 +786,7 @@ private void testECReconstructionCoordinator(List<Integer> missingIndexes,
           .getReplicationConfig(), cToken);
       assertEquals(blockDataArrList.get(i).length,
           reconstructedBlockData.length);
-      checkBlockData(blockDataArrList.get(i), reconstructedBlockData);
+      checkBlockDataWithRetry(blockDataArrList.get(i), reconstructedBlockData, triggerRetry);
       XceiverClientSpi client = xceiverClientManager.acquireClient(
           newTargetPipeline);
       try {
@@ -800,7 +807,7 @@ private void testECReconstructionCoordinator(List<Integer> missingIndexes,
   }

   private void createKeyAndWriteData(String keyString, OzoneBucket bucket,
-      int numChunks) throws IOException {
+      int numChunks, boolean triggerRetry) throws IOException {
     for (int i = 0; i < numChunks; i++) {
       inputChunks[i] = getBytesWith(i + 1, EC_CHUNK_SIZE);
     }
@@ -809,11 +816,48 @@ private void createKeyAndWriteData(String keyString, OzoneBucket bucket,
         new HashMap<>())) {
       assertInstanceOf(KeyOutputStream.class, out.getOutputStream());
       for (int i = 0; i < numChunks; i++) {
+        // Just before the last chunk is written, trigger CloseContainer so the
+        // write runs into a closed container and the client has to retry.
+        // The trigger is asynchronous, so that closing the container does not
+        // interfere with the write operation. However, it often needs to be
+        // executed multiple times before it takes effect.
+        if (i == numChunks - 1 && triggerRetry) {
+          triggerRetryByCloseContainer(out);
+        }
         out.write(inputChunks[i]);
       }
     }
   }

+  private void triggerRetryByCloseContainer(OzoneOutputStream out) {
+    CompletableFuture.runAsync(() -> {
+      BlockOutputStreamEntry blockOutputStreamEntry = out.getKeyOutputStream().getStreamEntries().get(0);
+      BlockID entryBlockID = blockOutputStreamEntry.getBlockID();
+      long entryContainerID = entryBlockID.getContainerID();
+      Pipeline entryPipeline = blockOutputStreamEntry.getPipeline();
+      Map<DatanodeDetails, Integer> replicaIndexes = entryPipeline.getReplicaIndexes();
+      try {
+        for (Map.Entry<DatanodeDetails, Integer> entry : replicaIndexes.entrySet()) {
+          DatanodeDetails key = entry.getKey();
+          Integer value = entry.getValue();
+          XceiverClientManager xceiverClientManager = new XceiverClientManager(config);
+          Token<ContainerTokenIdentifier> cToken = containerTokenGenerator
+              .generateToken(ANY_USER, ContainerID.valueOf(entryContainerID));
+          XceiverClientSpi client = xceiverClientManager.acquireClient(
+              createSingleNodePipeline(entryPipeline, key, value));
+          try {
+            ContainerProtocolCalls.closeContainer(client, entryContainerID, cToken.encodeToUrlString());
+          } finally {
+            xceiverClientManager.releaseClient(client, false);
+          }
+          break;
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
   @Test
   public void testECReconstructionCoordinatorShouldCleanupContainersOnFailure()
       throws Exception {
@@ -826,7 +870,7 @@ public void testECReconstructionCoordinatorShouldCleanupContainersOnFailure()
     objectStore.getVolume(volumeName).createBucket(bucketName);
     OzoneVolume volume = objectStore.getVolume(volumeName);
     OzoneBucket bucket = volume.getBucket(bucketName);
-    createKeyAndWriteData(keyString, bucket, 3);
+    createKeyAndWriteData(keyString, bucket, 3, false);

     OzoneKeyDetails key = bucket.getKey(keyString);
     long conID = key.getOzoneKeyLocations().get(0).getContainerID();
@@ -900,6 +944,25 @@ private void closeContainer(long conID)
         HddsProtos.LifeCycleEvent.CLOSE);
   }

+  private void checkBlockDataWithRetry(
+      org.apache.hadoop.ozone.container.common.helpers.BlockData[] blockData,
+      org.apache.hadoop.ozone.container.common.helpers.BlockData[]
+          reconstructedBlockData, boolean triggerRetry) {
+    if (triggerRetry) {
+      for (int i = 0; i < reconstructedBlockData.length; i++) {
+        assertEquals(blockData[i].getBlockID(), reconstructedBlockData[i].getBlockID());
+        List<ContainerProtos.ChunkInfo> oldBlockDataChunks = blockData[i].getChunks();
+        List<ContainerProtos.ChunkInfo> newBlockDataChunks = reconstructedBlockData[i].getChunks();
+        for (int j = 0; j < newBlockDataChunks.size(); j++) {
+          ContainerProtos.ChunkInfo chunkInfo = oldBlockDataChunks.get(j);
+          assertEquals(chunkInfo, newBlockDataChunks.get(j));
+        }
+      }
+      return;
+    }
+    checkBlockData(blockData, reconstructedBlockData);
+  }
+
   private void checkBlockData(
       org.apache.hadoop.ozone.container.common.helpers.BlockData[] blockData,
       org.apache.hadoop.ozone.container.common.helpers.BlockData[]
           reconstructedBlockData,
@@ -967,8 +1030,7 @@ public static void prepareData(int[][] ranges) throws Exception {
         out.write(values[i]);
       }
     }
-//    List<ContainerID> containerIDs =
-//        new ArrayList<>(scm.getContainerManager().getContainerIDs());
+
     List<ContainerID> containerIDs =
         scm.getContainerManager().getContainers()
             .stream()
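
Reviewer note: the sizing rule added to ECBlockOutputStream above can be read in isolation as "take the largest BlockData size seen for the block group, cap it at the blockGroupLength confirmed with OM, and ceil-divide by the EC chunk size to get how many checksum chunks actually carry data". The stand-alone sketch below only illustrates that arithmetic; the class name, the replicaSizes array and the 1 MiB ecChunkSize are hypothetical stand-ins, not the real ECBlockOutputStream state.

    import java.util.Arrays;

    // Illustrative only: mirrors the Math.min / Math.ceil sizing rule from the
    // ECBlockOutputStream change, with hypothetical inputs instead of BlockData.
    public final class ChunkCountSketch {

      // Number of checksum chunks that really carry data for one block group.
      static int expectedChunkCount(long[] replicaSizes, long blockGroupLength, int ecChunkSize) {
        // The largest reported replica size wins, because empty chunks are never sent to the DN.
        long maxBlockDataSize = Arrays.stream(replicaSizes).max().orElse(0);
        // Cap at the length confirmed with OM; a failed stripe write can leave
        // "dirty data" that makes a replica look larger than blockGroupLength.
        long blockDataSize = Math.min(maxBlockDataSize, blockGroupLength);
        return (int) Math.ceil((double) blockDataSize / ecChunkSize);
      }

      public static void main(String[] args) {
        int ecChunkSize = 1024 * 1024;                        // hypothetical 1 MiB EC chunk
        long[] replicaSizes = {3 * 1024 * 1024, 2 * 1024 * 1024};
        long blockGroupLength = 3 * 1024 * 1024;
        // 3 MiB of data in 1 MiB chunks -> 3 checksum chunks are compared.
        System.out.println(expectedChunkCount(replicaSizes, blockGroupLength, ecChunkSize));
      }
    }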
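A second note on the groupingBy/maxBy collector used to pick the largest replica per block group length: every key that ends up in the resulting map is backed by a non-empty Optional, which is why the patch can call .get() on maxDataSizeByGroup.get(blockGroupLength) as long as the queried group length occurred in blockData. The sketch below demonstrates the idiom with a hypothetical SizedBlock class standing in for BlockData.

    import java.util.Comparator;
    import java.util.Map;
    import java.util.Optional;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public final class MaxSizePerGroupSketch {

      // Hypothetical stand-in for BlockData: only the two fields the collector needs.
      static final class SizedBlock {
        final long blockGroupLength;
        final long size;
        SizedBlock(long blockGroupLength, long size) {
          this.blockGroupLength = blockGroupLength;
          this.size = size;
        }
      }

      public static void main(String[] args) {
        // Group by block group length and keep the largest block per group.
        Map<Long, Optional<SizedBlock>> maxSizeByGroup = Stream.of(
                new SizedBlock(6, 4), new SizedBlock(6, 6), new SizedBlock(3, 3))
            .collect(Collectors.groupingBy(b -> b.blockGroupLength,
                Collectors.maxBy(Comparator.comparingLong((SizedBlock b) -> b.size))));
        // groupingBy never maps a key to Optional.empty(), so .get() is safe for
        // any group length that actually occurred in the input.
        System.out.println(maxSizeByGroup.get(6L).get().size);  // prints 6
      }
    }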