@@ -38,9 +38,13 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
@@ -142,8 +146,34 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
}

if (checksumBlockData != null) {
List<ChunkInfo> currentChunks = getContainerBlockData().getChunksList();

// For the same BlockGroupLength, we need to find the largest Block DataSize.
// This is because we do not send empty chunks to the DataNode, so the larger value is more accurate.
Map<Long, Optional<BlockData>> maxDataSizeByGroup = Arrays.stream(blockData)
.filter(Objects::nonNull)
.collect(Collectors.groupingBy(BlockData::getBlockGroupLength,
Collectors.maxBy(Comparator.comparingLong(BlockData::getSize))));
BlockData maxBlockData = maxDataSizeByGroup.get(blockGroupLength).get();
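// Illustrative (hypothetical values): if two replicas report the same
// BlockGroupLength of 4 MiB with block data sizes of 1 MiB and 2 MiB,
// the 2 MiB entry is the one retained for that group.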

// When calculating the checksum size,
// we need to consider both blockGroupLength and the actual size of blockData.
//
// We use the smaller value to determine the size of the ChunkList.
//
// 1. In most cases, blockGroupLength is equal to the size of blockData.
// 2. Occasionally, blockData is not fully filled; if a chunk is empty,
// it is not sent to the DN, so the blockData size is smaller than blockGroupLength.
// 3. In cases with 'dirty data',
// if an error occurs while writing to the EC stripe (e.g., the DN reports Container Closed)
// and the length confirmed with OM is smaller, blockGroupLength may be smaller than the blockData size.
long blockDataSize = Math.min(maxBlockData.getSize(), blockGroupLength);
int chunkSize = (int) Math.ceil(((double) blockDataSize / repConfig.getEcChunkSize()));
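// Illustrative (hypothetical values): with an EC chunk size of 1 MiB,
// maxBlockData.getSize() = 2 MiB and blockGroupLength = 4 MiB give
// blockDataSize = 2 MiB and chunkSize = ceil(2 MiB / 1 MiB) = 2.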
List<ChunkInfo> checksumBlockDataChunks = checksumBlockData.getChunks();
if (chunkSize > 0) {
checksumBlockDataChunks = checksumBlockData.getChunks().subList(0, chunkSize);
}

List<ChunkInfo> currentChunks = getContainerBlockData().getChunksList();

Preconditions.checkArgument(
currentChunks.size() == checksumBlockDataChunks.size(),
@@ -269,7 +299,7 @@ public CompletableFuture<PutBlockResult> executePutBlock(boolean close,
throw ce;
});
} catch (IOException | ExecutionException e) {
throw new IOException(EXCEPTION_MSG + e.toString(), e);
throw new IOException(EXCEPTION_MSG + e, e);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
handleInterruptedException(ex, false);
@@ -23,6 +23,7 @@
import org.apache.hadoop.hdds.utils.db.Codec;
import org.apache.hadoop.hdds.utils.db.DelegatedCodec;
import org.apache.hadoop.hdds.utils.db.Proto3Codec;
import org.apache.hadoop.ozone.OzoneConsts;

import java.io.IOException;
import java.util.Collections;
@@ -280,4 +281,14 @@ public void appendTo(StringBuilder sb) {
sb.append(", size=").append(size);
sb.append("]");
}

public long getBlockGroupLength() {
String lenStr = getMetadata()
.get(OzoneConsts.BLOCK_GROUP_LEN_KEY_IN_PUT_BLOCK);
// If we don't have the length, then it indicates a problem with the stripe.
// All replicas should carry the length, so if it is not there, we return 0,
// which will cause us to set the length of the block to zero and not
// attempt to reconstruct it.
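// Illustrative (hypothetical values): lenStr == "3145728" yields a block group
// length of 3145728; a missing entry yields 0 and the block is skipped during
// reconstruction.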
return (lenStr == null) ? 0 : Long.parseLong(lenStr);
}
}
@@ -61,6 +61,7 @@
import org.apache.hadoop.ozone.client.OzoneKeyDetails;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.SecretKeyTestClient;
import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;
import org.apache.hadoop.ozone.client.io.InsufficientLocationsException;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
@@ -83,6 +84,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.IOException;
@@ -99,6 +101,7 @@
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -117,6 +120,7 @@
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.params.provider.Arguments.arguments;

/**
* This class tests container commands on EC containers.
@@ -613,30 +617,33 @@ private static byte[] getBytesWith(int singleDigitNumber, int total) {

@ParameterizedTest
@MethodSource("recoverableMissingIndexes")
void testECReconstructionCoordinatorWith(List<Integer> missingIndexes)
void testECReconstructionCoordinatorWith(List<Integer> missingIndexes, boolean triggerRetry)
throws Exception {
testECReconstructionCoordinator(missingIndexes, 3);
testECReconstructionCoordinator(missingIndexes, 3, triggerRetry);
}

@ParameterizedTest
@MethodSource("recoverableMissingIndexes")
void testECReconstructionCoordinatorWithPartialStripe(List<Integer> missingIndexes)
throws Exception {
testECReconstructionCoordinator(missingIndexes, 1);
void testECReconstructionCoordinatorWithPartialStripe(List<Integer> missingIndexes,
boolean triggerRetry) throws Exception {
testECReconstructionCoordinator(missingIndexes, 1, triggerRetry);
}

@ParameterizedTest
@MethodSource("recoverableMissingIndexes")
void testECReconstructionCoordinatorWithFullAndPartialStripe(List<Integer> missingIndexes)
throws Exception {
testECReconstructionCoordinator(missingIndexes, 4);
void testECReconstructionCoordinatorWithFullAndPartialStripe(List<Integer> missingIndexes,
boolean triggerRetry) throws Exception {
testECReconstructionCoordinator(missingIndexes, 4, triggerRetry);
}

static Stream<List<Integer>> recoverableMissingIndexes() {
return Stream
.concat(IntStream.rangeClosed(1, 5).mapToObj(ImmutableList::of), Stream
.of(ImmutableList.of(2, 3), ImmutableList.of(2, 4),
ImmutableList.of(3, 5), ImmutableList.of(4, 5)));
static Stream<Arguments> recoverableMissingIndexes() {
Stream<Arguments> args = IntStream.rangeClosed(1, 5).mapToObj(i -> arguments(ImmutableList.of(i), true));
Stream<Arguments> args1 = IntStream.rangeClosed(1, 5).mapToObj(i -> arguments(ImmutableList.of(i), false));
Stream<Arguments> args2 = Stream.of(arguments(ImmutableList.of(2, 3), true),
arguments(ImmutableList.of(2, 4), true), arguments(ImmutableList.of(3, 5), true));
Stream<Arguments> args3 = Stream.of(arguments(ImmutableList.of(2, 3), false),
arguments(ImmutableList.of(2, 4), false), arguments(ImmutableList.of(3, 5), false));
return Stream.concat(Stream.concat(args, args1), Stream.concat(args2, args3));
}

/**
@@ -647,7 +654,7 @@ static Stream<List<Integer>> recoverableMissingIndexes() {
public void testECReconstructionCoordinatorWithMissingIndexes135() {
InsufficientLocationsException exception =
assertThrows(InsufficientLocationsException.class, () -> {
testECReconstructionCoordinator(ImmutableList.of(1, 3, 5), 3);
testECReconstructionCoordinator(ImmutableList.of(1, 3, 5), 3, false);
});

String expectedMessage =
@@ -658,7 +665,7 @@ public void testECReconstructionCoordinatorWithMissingIndexes135() {
}

private void testECReconstructionCoordinator(List<Integer> missingIndexes,
int numInputChunks) throws Exception {
int numInputChunks, boolean triggerRetry) throws Exception {
ObjectStore objectStore = rpcClient.getObjectStore();
String keyString = UUID.randomUUID().toString();
String volumeName = UUID.randomUUID().toString();
@@ -667,7 +674,7 @@ private void testECReconstructionCoordinator(List<Integer> missingIndexes,
objectStore.getVolume(volumeName).createBucket(bucketName);
OzoneVolume volume = objectStore.getVolume(volumeName);
OzoneBucket bucket = volume.getBucket(bucketName);
createKeyAndWriteData(keyString, bucket, numInputChunks);
createKeyAndWriteData(keyString, bucket, numInputChunks, triggerRetry);

try (
XceiverClientManager xceiverClientManager =
@@ -779,7 +786,7 @@ private void testECReconstructionCoordinator(List<Integer> missingIndexes,
.getReplicationConfig(), cToken);
assertEquals(blockDataArrList.get(i).length,
reconstructedBlockData.length);
checkBlockData(blockDataArrList.get(i), reconstructedBlockData);
checkBlockDataWithRetry(blockDataArrList.get(i), reconstructedBlockData, triggerRetry);
XceiverClientSpi client = xceiverClientManager.acquireClient(
newTargetPipeline);
try {
@@ -800,7 +807,7 @@ private void testECReconstructionCoordinator(List<Integer> missingIndexes,
}

private void createKeyAndWriteData(String keyString, OzoneBucket bucket,
int numChunks) throws IOException {
int numChunks, boolean triggerRetry) throws IOException {
for (int i = 0; i < numChunks; i++) {
inputChunks[i] = getBytesWith(i + 1, EC_CHUNK_SIZE);
}
@@ -809,11 +816,48 @@ private void createKeyAndWriteData(String keyString, OzoneBucket bucket,
new HashMap<>())) {
assertInstanceOf(KeyOutputStream.class, out.getOutputStream());
for (int i = 0; i < numChunks; i++) {
// We generally wait until the data for the last chunk is about to be written
// before attempting to trigger CloseContainer.
// We use an asynchronous approach for this trigger,
// so that closing the container races with the write rather than blocking it.
// However, this process often needs to be executed multiple times before it takes effect.
if (i == numChunks - 1 && triggerRetry) {
triggerRetryByCloseContainer(out);
}
out.write(inputChunks[i]);
}
}
}

private void triggerRetryByCloseContainer(OzoneOutputStream out) {
Contributor Author commented:

In the production environment, we encountered a situation where data was being written to a certain DN while the container on that DN was closed. I designed the following steps to replicate the scenario we see in production: write data and close the container concurrently. Since the entire process is asynchronous, we may need to execute it multiple times to reproduce the issue seen in production.

Example

2024-09-11 07:46:20,917 [FixedThreadPoolWithAffinityExecutor-1-0] INFO  container.IncrementalContainerReportHandler (AbstractContainerReportHandler.java:updateContainerState(312)) - Moving container #1 to CLOSED state, datanode 4366cc44-4875-4f4d-8afb-5ac7ed9ba40d(bogon/192.168.1.16) reported CLOSED replica with index 4.
07:46:20.914 [4366cc44-4875-4f4d-8afb-5ac7ed9ba40d-ChunkReader-0] ERROR DNAudit - user=null | ip=null | op=PUT_BLOCK {blockData=[blockId=conID: 1 locID: 113750153625600007 bcsId: 0 replicaIndex: 4, size=2097152]} | ret=FAILURE
java.lang.Exception: Requested operation not allowed as ContainerState is CLOSED
	at org.apache.hadoop.ozone.container.common.impl.HddsDispatcher.dispatchRequest(HddsDispatcher.java:431) ~[classes/:?]
	at org.apache.hadoop.ozone.container.common.impl.HddsDispatcher.lambda$dispatch$0(HddsDispatcher.java:197) ~[classes/:?]
	at org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher.processRequest(OzoneProtocolMessageDispatcher.java:89) [classes/:?]
	at org.apache.hadoop.ozone.container.common.impl.HddsDispatcher.dispatch(HddsDispatcher.java:196) [classes/:?]
	at org.apache.hadoop.ozone.container.common.transport.server.GrpcXceiverService$1.onNext(GrpcXceiverService.java:112) [classes/:?]
	at org.apache.hadoop.ozone.container.common.transport.server.GrpcXceiverService$1.onNext(GrpcXceiverService.java:105) [classes/:?]
	at org.apache.ratis.thirdparty.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:262) [ratis-thirdparty-misc-1.0.6.jar:1.0.6]
	at org.apache.ratis.thirdparty.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) [ratis-thirdparty-misc-1.0.6.jar:1.0.6]
	at org.apache.hadoop.hdds.tracing.GrpcServerInterceptor$1.onMessage(GrpcServerInterceptor.java:49) [classes/:?]
	at org.apache.ratis.thirdparty.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:329) [ratis-thirdparty-misc-1.0.6.jar:1.0.6]
	at org.apache.ratis.thirdparty.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:314) [ratis-thirdparty-misc-1.0.6.jar:1.0.6]
	at org.apache.ratis.thirdparty.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:833) [ratis-thirdparty-misc-1.0.6.jar:1.0.6]
	at org.apache.ratis.thirdparty.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) [ratis-thirdparty-misc-1.0.6.jar:1.0.6]
	at org.apache.ratis.thirdparty.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) [ratis-thirdparty-misc-1.0.6.jar:1.0.6]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_412]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_412]
	at java.lang.Thread.run(Thread.java:750) [?:1.8.0_412]
2024-09-11 07:46:20,929 [client-write-TID-0] WARN  io.KeyOutputStream (ECKeyOutputStream.java:logStreamError(200)) - Put block failed: S S S F S
2024-09-11 07:46:20,929 [client-write-TID-0] WARN  io.KeyOutputStream (ECKeyOutputStream.java:logStreamError(202)) - Failure for replica index: 4, DatanodeDetails: 4366cc44-4875-4f4d-8afb-5ac7ed9ba40d(bogon/192.168.1.16)
java.io.IOException: Unexpected Storage Container Exception: org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException: Requested operation not allowed as ContainerState is CLOSED
	at org.apache.hadoop.hdds.scm.storage.BlockOutputStream.setIoException(BlockOutputStream.java:815)
	at org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream.validateResponse(ECBlockOutputStream.java:351)
	at org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream.lambda$executePutBlock$1(ECBlockOutputStream.java:280)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException: Requested operation not allowed as ContainerState is CLOSED
	at org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.validateContainerResponse(ContainerProtocolCalls.java:787)
	at org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream.validateResponse(ECBlockOutputStream.java:349)
	... 7 more

Reconstruction Result

We can see that during the recovery process, the BlockGroupLength differs across the DNs.

2024-09-11 07:46:21,375 [main] INFO  reconstruction.ECReconstructionCoordinator (ECReconstructionCoordinator.java:logBlockGroupDetails(356)) - Block group details for conID: 1 locID: 113750153625600007 bcsId: 0 replicaIndex: null. Replication Config EC{rs-3-2-1024k}. Calculated safe length: 3145728. 
2024-09-11 07:46:21,375 [main] INFO  reconstruction.ECReconstructionCoordinator (ECReconstructionCoordinator.java:logBlockGroupDetails(387)) - Block Data for: conID: 1 locID: 113750153625600007 bcsId: 0 replicaIndex: 2 replica Index: 2 block length: 1048576 block group length: 4194304 chunk list: 
  chunkNum: 1 length: 1048576 offset: 0
2024-09-11 07:46:21,375 [main] INFO  reconstruction.ECReconstructionCoordinator (ECReconstructionCoordinator.java:logBlockGroupDetails(387)) - Block Data for: conID: 1 locID: 113750153625600007 bcsId: 0 replicaIndex: 3 replica Index: 3 block length: 1048576 block group length: 4194304 chunk list: 
  chunkNum: 1 length: 1048576 offset: 0
2024-09-11 07:46:21,375 [main] INFO  reconstruction.ECReconstructionCoordinator (ECReconstructionCoordinator.java:logBlockGroupDetails(387)) - Block Data for: conID: 1 locID: 113750153625600007 bcsId: 0 replicaIndex: 4 replica Index: 4 block length: 1048576 block group length: 3145728 chunk list: 
  chunkNum: 1 length: 1048576 offset: 0
2024-09-11 07:46:21,375 [main] INFO  reconstruction.ECReconstructionCoordinator (ECReconstructionCoordinator.java:logBlockGroupDetails(387)) - Block Data for: conID: 1 locID: 113750153625600007 bcsId: 0 replicaIndex: 5 replica Index: 5 block length: 2097152 block group length: 4194304 chunk list: 
  chunkNum: 1 length: 1048576 offset: 0
  chunkNum: 2 length: 1048576 offset: 1048576

CompletableFuture.runAsync(() -> {
BlockOutputStreamEntry blockOutputStreamEntry = out.getKeyOutputStream().getStreamEntries().get(0);
BlockID entryBlockID = blockOutputStreamEntry.getBlockID();
long entryContainerID = entryBlockID.getContainerID();
Pipeline entryPipeline = blockOutputStreamEntry.getPipeline();
Map<DatanodeDetails, Integer> replicaIndexes = entryPipeline.getReplicaIndexes();
try {
for (Map.Entry<DatanodeDetails, Integer> entry : replicaIndexes.entrySet()) {
DatanodeDetails key = entry.getKey();
Integer value = entry.getValue();
XceiverClientManager xceiverClientManager = new XceiverClientManager(config);
Token<ContainerTokenIdentifier> cToken = containerTokenGenerator
.generateToken(ANY_USER, ContainerID.valueOf(entryContainerID));
XceiverClientSpi client = xceiverClientManager.acquireClient(
createSingleNodePipeline(entryPipeline, key, value));
try {
ContainerProtocolCalls.closeContainer(client, entryContainerID, cToken.encodeToUrlString());
} finally {
xceiverClientManager.releaseClient(client, false);
}
break;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}

@Test
public void testECReconstructionCoordinatorShouldCleanupContainersOnFailure()
throws Exception {
@@ -826,7 +870,7 @@ public void testECReconstructionCoordinatorShouldCleanupContainersOnFailure()
objectStore.getVolume(volumeName).createBucket(bucketName);
OzoneVolume volume = objectStore.getVolume(volumeName);
OzoneBucket bucket = volume.getBucket(bucketName);
createKeyAndWriteData(keyString, bucket, 3);
createKeyAndWriteData(keyString, bucket, 3, false);

OzoneKeyDetails key = bucket.getKey(keyString);
long conID = key.getOzoneKeyLocations().get(0).getContainerID();
@@ -900,6 +944,25 @@ private void closeContainer(long conID)
HddsProtos.LifeCycleEvent.CLOSE);
}

private void checkBlockDataWithRetry(
org.apache.hadoop.ozone.container.common.helpers.BlockData[] blockData,
org.apache.hadoop.ozone.container.common.helpers.BlockData[]
reconstructedBlockData, boolean triggerRetry) {
if (triggerRetry) {
for (int i = 0; i < reconstructedBlockData.length; i++) {
assertEquals(blockData[i].getBlockID(), reconstructedBlockData[i].getBlockID());
List<ContainerProtos.ChunkInfo> oldBlockDataChunks = blockData[i].getChunks();
List<ContainerProtos.ChunkInfo> newBlockDataChunks = reconstructedBlockData[i].getChunks();
for (int j = 0; j < newBlockDataChunks.size(); j++) {
ContainerProtos.ChunkInfo chunkInfo = oldBlockDataChunks.get(j);
assertEquals(chunkInfo, newBlockDataChunks.get(j));
}
}
return;
}
checkBlockData(blockData, reconstructedBlockData);
}

private void checkBlockData(
org.apache.hadoop.ozone.container.common.helpers.BlockData[] blockData,
org.apache.hadoop.ozone.container.common.helpers.BlockData[]
@@ -967,8 +1030,7 @@ public static void prepareData(int[][] ranges) throws Exception {
out.write(values[i]);
}
}
// List<ContainerID> containerIDs =
// new ArrayList<>(scm.getContainerManager().getContainerIDs());

List<ContainerID> containerIDs =
scm.getContainerManager().getContainers()
.stream()