diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 5c44f31c0d67..3769f35e5301 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto;
@@ -232,7 +233,6 @@ protected List<ChunkInfo> getChunkInfos() throws IOException {
           .build();
     }
     acquireClient();
-    List<ChunkInfo> chunks;
     try {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Initializing BlockInputStream for get key to access {}",
@@ -249,14 +249,32 @@ protected List<ChunkInfo> getChunkInfos() throws IOException {
         blkIDBuilder.setReplicaIndex(replicaIndex);
       }
       GetBlockResponseProto response = ContainerProtocolCalls
-          .getBlock(xceiverClient, blkIDBuilder.build(), token);
+          .getBlock(xceiverClient, VALIDATORS, blkIDBuilder.build(), token);
-      chunks = response.getBlockData().getChunksList();
+      return response.getBlockData().getChunksList();
     } finally {
       releaseClient();
     }
+  }
+
+  private static final List<CheckedBiFunction> VALIDATORS
+      = ContainerProtocolCalls.toValidatorList(
+          (request, response) -> validate(response));
-
-    return chunks;
+  static void validate(ContainerCommandResponseProto response)
+      throws IOException {
+    if (!response.hasGetBlock()) {
+      throw new IllegalArgumentException("Not GetBlock: response=" + response);
+    }
+    final GetBlockResponseProto b = response.getGetBlock();
+    final List<ChunkInfo> chunks = b.getBlockData().getChunksList();
+    for (int i = 0; i < chunks.size(); i++) {
+      final ChunkInfo c = chunks.get(i);
+      if (c.getLen() <= 0) {
+        throw new IOException("Failed to get chunkInfo["
+            + i + "]: len == " + c.getLen());
+      }
+    }
   }
 
   protected void acquireClient() throws IOException {
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
index e30df34b85d9..97ff33915ae7 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
@@ -425,8 +425,7 @@ protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo)
     ReadChunkResponseProto readChunkResponse;
 
     List<CheckedBiFunction> validators =
-        ContainerProtocolCalls.getValidatorList();
-    validators.add(validator);
+        ContainerProtocolCalls.toValidatorList(validator);
 
     readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient,
         readChunkInfo, blockID, validators, token);
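The new validate method above turns a structurally bad GetBlock reply into an immediate IOException on the client instead of a later short read. A minimal JUnit 5 sketch of that behavior (the test class name is hypothetical; it must live in org.apache.hadoop.hdds.scm.storage because validate is package-private, and buildPartial() is used so the sketch can skip required proto fields):

    package org.apache.hadoop.hdds.scm.storage;

    import java.io.IOException;

    import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
    import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
    import org.junit.jupiter.api.Assertions;
    import org.junit.jupiter.api.Test;

    public class TestBlockInputStreamValidator {
      @Test
      public void testZeroLengthChunkIsRejected() {
        // A GetBlock response whose only chunk advertises len == 0.
        final ContainerCommandResponseProto response =
            ContainerCommandResponseProto.newBuilder()
                .setCmdType(ContainerProtos.Type.GetBlock)
                .setGetBlock(ContainerProtos.GetBlockResponseProto.newBuilder()
                    .setBlockData(ContainerProtos.BlockData.newBuilder()
                        .addChunks(ContainerProtos.ChunkInfo.newBuilder()
                            .setChunkName("chunk-1").setOffset(0).setLen(0)
                            .buildPartial())
                        .buildPartial())
                    .buildPartial())
                .buildPartial();

        // validate(...) rejects any chunk whose length is not positive.
        Assertions.assertThrows(IOException.class,
            () -> BlockInputStream.validate(response));
      }
    }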
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index bfe3ebceaa75..d3c4de22f359 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -20,12 +20,14 @@
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
 
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -67,6 +69,7 @@
 import org.apache.hadoop.security.token.TokenIdentifier;
 
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.function.CheckedFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -128,36 +131,78 @@ public static ListBlockResponseProto listBlock(XceiverClientSpi xceiverClient,
     return response.getListBlock();
   }
 
+  static <T> T tryEachDatanode(Pipeline pipeline,
+      CheckedFunction<DatanodeDetails, T, IOException> op,
+      Function<DatanodeDetails, String> toErrorMessage)
+      throws IOException {
+    final Set<DatanodeDetails> excluded = new HashSet<>();
+    for (; ;) {
+      final DatanodeDetails d = pipeline.getClosestNode(excluded);
+
+      try {
+        return op.apply(d);
+      } catch (IOException e) {
+        excluded.add(d);
+        if (excluded.size() < pipeline.size()) {
+          LOG.warn(toErrorMessage.apply(d)
+              + "; will try another datanode.", e);
+        } else {
+          throw e;
+        }
+      }
+    }
+  }
+
   /**
    * Calls the container protocol to get a container block.
    *
    * @param xceiverClient client to perform call
+   * @param validators functions to validate the response
    * @param datanodeBlockID blockID to identify container
    * @param token a token for this block (may be null)
    * @return container protocol get block response
    * @throws IOException if there is an I/O error while performing the call
    */
   public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient,
+      List<CheckedBiFunction> validators,
       DatanodeBlockID datanodeBlockID,
       Token<? extends TokenIdentifier> token) throws IOException {
     GetBlockRequestProto.Builder readBlockRequest = GetBlockRequestProto
         .newBuilder()
         .setBlockID(datanodeBlockID);
-    String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
-
     ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto
         .newBuilder()
         .setCmdType(Type.GetBlock)
         .setContainerID(datanodeBlockID.getContainerID())
-        .setDatanodeUuid(id)
         .setGetBlock(readBlockRequest);
     if (token != null) {
       builder.setEncodedToken(token.encodeToUrlString());
     }
-    ContainerCommandRequestProto request = builder.build();
+    return tryEachDatanode(xceiverClient.getPipeline(),
+        d -> getBlock(xceiverClient, validators, builder, d),
+        d -> toErrorMessage(datanodeBlockID, d));
+  }
+
+  static String toErrorMessage(DatanodeBlockID blockId, DatanodeDetails d) {
+    return String.format("Failed to get block #%s in container #%s from %s",
+        blockId.getLocalID(), blockId.getContainerID(), d);
+  }
+
+  public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient,
+      DatanodeBlockID datanodeBlockID,
+      Token<? extends TokenIdentifier> token) throws IOException {
+    return getBlock(xceiverClient, getValidatorList(), datanodeBlockID, token);
+  }
+
+  private static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient,
+      List<CheckedBiFunction> validators,
+      ContainerCommandRequestProto.Builder builder,
+      DatanodeDetails datanode) throws IOException {
+    final ContainerCommandRequestProto request = builder
+        .setDatanodeUuid(datanode.getUuidString()).build();
     ContainerCommandResponseProto response =
-        xceiverClient.sendCommand(request, getValidatorList());
+        xceiverClient.sendCommand(request, validators);
     return response.getGetBlock();
   }
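tryEachDatanode factors out the failover loop that readChunk previously carried inline: try the closest not-yet-excluded datanode, exclude it on IOException, and rethrow only once the whole pipeline has been tried. The sketch below mirrors the getBlock call site above, assuming xceiverClient, validators, builder and datanodeBlockID are in scope:

    // Each attempt is a CheckedFunction<DatanodeDetails, T, IOException>;
    // the second lambda only formats the warning logged for a failed node.
    GetBlockResponseProto reply = tryEachDatanode(
        xceiverClient.getPipeline(),
        d -> getBlock(xceiverClient, validators, builder, d),
        d -> toErrorMessage(datanodeBlockID, d));

Stamping the datanode UUID into the shared request builder only inside the per-datanode overload is what makes a single builder reusable across retries.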
@@ -285,40 +330,27 @@ public static ContainerProtos.ReadChunkResponseProto readChunk(
         .setBlockID(blockID.getDatanodeBlockIDProtobuf())
         .setChunkData(chunk)
         .setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1);
-    final Pipeline pipeline = xceiverClient.getPipeline();
-    final Set<DatanodeDetails> excluded = new HashSet<>();
-    for (; ;) {
-      final DatanodeDetails d = pipeline.getClosestNode(excluded);
-
-      try {
-        return readChunk(xceiverClient, chunk, blockID,
-            validators, token, readChunkRequest, d);
-      } catch (IOException e) {
-        excluded.add(d);
-        if (excluded.size() < pipeline.size()) {
-          LOG.warn(toErrorMessage(chunk, blockID, d), e);
-        } else {
-          throw e;
-        }
-      }
+    ContainerCommandRequestProto.Builder builder =
+        ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadChunk)
+            .setContainerID(blockID.getContainerID())
+            .setReadChunk(readChunkRequest);
+    if (token != null) {
+      builder.setEncodedToken(token.encodeToUrlString());
     }
+
+    return tryEachDatanode(xceiverClient.getPipeline(),
+        d -> readChunk(xceiverClient, chunk, blockID,
+            validators, builder, d),
+        d -> toErrorMessage(chunk, blockID, d));
   }
 
   private static ContainerProtos.ReadChunkResponseProto readChunk(
       XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID,
       List<CheckedBiFunction> validators,
-      Token<? extends TokenIdentifier> token,
-      ReadChunkRequestProto.Builder readChunkRequest,
+      ContainerCommandRequestProto.Builder builder,
       DatanodeDetails d) throws IOException {
-    final String id = d.getUuidString();
-    ContainerCommandRequestProto.Builder builder =
-        ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadChunk)
-            .setContainerID(blockID.getContainerID())
-            .setDatanodeUuid(id).setReadChunk(readChunkRequest);
-    if (token != null) {
-      builder.setEncodedToken(token.encodeToUrlString());
-    }
-    ContainerCommandRequestProto request = builder.build();
+    final ContainerCommandRequestProto request = builder
+        .setDatanodeUuid(d.getUuidString()).build();
     ContainerCommandResponseProto reply =
         xceiverClient.sendCommand(request, validators);
     final ReadChunkResponseProto response = reply.getReadChunk();
@@ -687,12 +719,28 @@ public static void validateContainerResponse(
   }
 
   public static List<CheckedBiFunction> getValidatorList() {
-    List<CheckedBiFunction> validators = new ArrayList<>(1);
+    return VALIDATORS;
+  }
+
+  private static final List<CheckedBiFunction> VALIDATORS = createValidators();
+
+  private static List<CheckedBiFunction> createValidators() {
     CheckedBiFunction<ContainerCommandRequestProto,
         ContainerCommandResponseProto, IOException> validator =
             (request, response) -> validateContainerResponse(response);
+    return Collections.singletonList(validator);
+  }
+
+  public static List<CheckedBiFunction> toValidatorList(
+      CheckedBiFunction validator) {
+    final List<CheckedBiFunction> defaults
+        = ContainerProtocolCalls.getValidatorList();
+    final List<CheckedBiFunction> validators
+        = new ArrayList<>(defaults.size() + 1);
+    validators.addAll(defaults);
     validators.add(validator);
-    return validators;
+    return Collections.unmodifiableList(validators);
   }
 
   public static HashMap<DatanodeDetails, GetBlockResponseProto>
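One behavioral consequence of the last hunk is easy to miss: getValidatorList() now hands out a shared immutable singleton, so the old pattern of appending to it (as ChunkInputStream did before this patch) would fail at runtime. A small sketch of the difference, assuming a CheckedBiFunction named validator is in scope:

    // Old pattern: relied on getValidatorList() returning a fresh ArrayList.
    final List<CheckedBiFunction> defaults =
        ContainerProtocolCalls.getValidatorList();
    // defaults.add(validator);  // would now throw
    //                           // UnsupportedOperationException:
    //                           // the default list is a shared singleton

    // New pattern: compose default + extra into an unmodifiable list.
    final List<CheckedBiFunction> composed =
        ContainerProtocolCalls.toValidatorList(validator);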
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index 11fe92e5ed42..3751cf665fb5 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -117,7 +117,7 @@ public long getFlushCheckpoint() {
   }
 
   private ECKeyOutputStream(Builder builder) {
-    super(builder.getClientMetrics());
+    super(builder.getReplicationConfig(), builder.getClientMetrics());
     this.config = builder.getClientConfig();
     this.bufferPool = builder.getByteBufferPool();
     // For EC, cell/chunk size and buffer size can be same for now.
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index e94074e827d2..1425ab837bd0 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.fs.Syncable;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
@@ -66,6 +67,7 @@ public class KeyOutputStream extends OutputStream implements Syncable {
 
   private OzoneClientConfig config;
+  private final ReplicationConfig replication;
 
   /**
    * Defines stream action while calling handleFlushOrClose.
@@ -93,7 +95,9 @@ enum StreamAction {
 
   private OzoneManagerProtocol omClient;
 
-  public KeyOutputStream(ContainerClientMetrics clientMetrics) {
+  public KeyOutputStream(ReplicationConfig replicationConfig,
+      ContainerClientMetrics clientMetrics) {
+    this.replication = replicationConfig;
     closed = false;
     this.retryPolicyMap = HddsClientUtils.getExceptionList()
         .stream()
@@ -141,6 +145,7 @@ public KeyOutputStream(
       ContainerClientMetrics clientMetrics
   ) {
     this.config = config;
+    this.replication = replicationConfig;
     blockOutputStreamEntryPool =
         new BlockOutputStreamEntryPool(
             config,
@@ -451,6 +456,14 @@ public void hflush() throws IOException {
 
   @Override
   public void hsync() throws IOException {
+    if (replication.getReplicationType() != ReplicationType.RATIS) {
+      throw new UnsupportedOperationException(
+          "Replication type is not " + ReplicationType.RATIS);
+    }
+    if (replication.getRequiredNodes() <= 1) {
+      throw new UnsupportedOperationException("The replication factor = "
+          + replication.getRequiredNodes() + " <= 1");
+    }
     checkNotClosed();
     final long hsyncPos = writeOffset;
     handleFlushOrClose(StreamAction.HSYNC);
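The hsync() guard depends only on the key's ReplicationConfig, so it can be exercised without a cluster. A standalone sketch using the real fromProtoTypeAndFactor factory (class and variable names invented for illustration):

    import org.apache.hadoop.hdds.client.ReplicationConfig;
    import org.apache.hadoop.hdds.protocol.proto.HddsProtos;

    public final class HsyncGuardSketch {
      public static void main(String[] args) {
        // RATIS/THREE passes both checks, so hsync() would proceed.
        ReplicationConfig ratisThree = ReplicationConfig.fromProtoTypeAndFactor(
            HddsProtos.ReplicationType.RATIS,
            HddsProtos.ReplicationFactor.THREE);
        assert ratisThree.getReplicationType()
            == HddsProtos.ReplicationType.RATIS;
        assert ratisThree.getRequiredNodes() > 1;

        // RATIS/ONE fails the factor check; an EC config fails the type
        // check. Either way hsync() now throws UnsupportedOperationException
        // up front rather than report a sync that cannot be durable.
        ReplicationConfig ratisOne = ReplicationConfig.fromProtoTypeAndFactor(
            HddsProtos.ReplicationType.RATIS,
            HddsProtos.ReplicationFactor.ONE);
        assert ratisOne.getRequiredNodes() <= 1;
      }
    }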
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientGrpc.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientGrpc.java
index ca4bb9e20e23..baa7a49e2904 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientGrpc.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientGrpc.java
@@ -92,7 +92,7 @@ public void testRandomFirstNodeIsCommandTarget() throws IOException {
     // the DNs on each call with a new client. This test will timeout if this
     // is not happening.
     while (allDNs.size() > 0) {
-      XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) {
+      try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) {
         @Override
         public XceiverClientReply sendCommandAsync(
             ContainerProtos.ContainerCommandRequestProto request,
@@ -100,9 +100,52 @@ public XceiverClientReply sendCommandAsync(
           allDNs.remove(dn);
           return buildValidResponse();
         }
-      };
+      }) {
+        invokeXceiverClientGetBlock(client);
+      }
+    }
+  }
+
+  @Test
+  @Timeout(5)
+  public void testGetBlockRetryAllNodes() {
+    final ArrayList<DatanodeDetails> allDNs = new ArrayList<>(dns);
+    Assertions.assertTrue(allDNs.size() > 1);
+    try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) {
+      @Override
+      public XceiverClientReply sendCommandAsync(
+          ContainerProtos.ContainerCommandRequestProto request,
+          DatanodeDetails dn) throws IOException {
+        allDNs.remove(dn);
+        throw new IOException("Failed " + dn);
+      }
+    }) {
       invokeXceiverClientGetBlock(client);
+    } catch (IOException e) {
+      e.printStackTrace();
     }
+    Assertions.assertEquals(0, allDNs.size());
+  }
+
+  @Test
+  @Timeout(5)
+  public void testReadChunkRetryAllNodes() {
+    final ArrayList<DatanodeDetails> allDNs = new ArrayList<>(dns);
+    Assertions.assertTrue(allDNs.size() > 1);
+    try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) {
+      @Override
+      public XceiverClientReply sendCommandAsync(
+          ContainerProtos.ContainerCommandRequestProto request,
+          DatanodeDetails dn) throws IOException {
+        allDNs.remove(dn);
+        throw new IOException("Failed " + dn);
+      }
+    }) {
+      invokeXceiverClientReadChunk(client);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    Assertions.assertEquals(0, allDNs.size());
   }
 
   @Test
@@ -115,7 +158,7 @@ public void testFirstNodeIsCorrectWithTopologyForCommandTarget()
     // each time. The logic should always use the sorted node, so we can check
     // only a single DN is ever seen after 100 calls.
     for (int i = 0; i < 100; i++) {
-      XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) {
+      try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) {
         @Override
         public XceiverClientReply sendCommandAsync(
             ContainerProtos.ContainerCommandRequestProto request,
@@ -123,8 +166,9 @@ public XceiverClientReply sendCommandAsync(
           seenDNs.add(dn);
           return buildValidResponse();
         }
-      };
-      invokeXceiverClientGetBlock(client);
+      }) {
+        invokeXceiverClientGetBlock(client);
+      }
     }
     Assertions.assertEquals(1, seenDNs.size());
   }
@@ -135,7 +179,7 @@ public void testConnectionReusedAfterGetBlock() throws IOException {
     // DN is seen, indicating the same DN connection is reused.
     for (int i = 0; i < 100; i++) {
       final Set<DatanodeDetails> seenDNs = new HashSet<>();
-      XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) {
+      try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) {
         @Override
         public XceiverClientReply sendCommandAsync(
             ContainerProtos.ContainerCommandRequestProto request,
@@ -143,11 +187,12 @@ public XceiverClientReply sendCommandAsync(
           seenDNs.add(dn);
           return buildValidResponse();
         }
-      };
-      invokeXceiverClientGetBlock(client);
-      invokeXceiverClientGetBlock(client);
-      invokeXceiverClientReadChunk(client);
-      invokeXceiverClientReadSmallFile(client);
+      }) {
+        invokeXceiverClientGetBlock(client);
+        invokeXceiverClientGetBlock(client);
+        invokeXceiverClientReadChunk(client);
+        invokeXceiverClientReadSmallFile(client);
+      }
       Assertions.assertEquals(1, seenDNs.size());
     }
   }
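Since exhausting every datanode is the expected outcome in the two retry tests, their catch blocks only log the final IOException. A tighter, hypothetical variant would assert it explicitly instead:

    // Hypothetical tightening (not part of the patch): assert the expected
    // exhaustion of the pipeline instead of printing the stack trace.
    Assertions.assertThrows(IOException.class,
        () -> invokeXceiverClientGetBlock(client));
    Assertions.assertEquals(0, allDNs.size());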