diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 116e7676673..b91e7f37175 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -153,6 +153,11 @@ void setLeaderId(UUID leaderId) {
     this.leaderId = leaderId;
   }
 
+  /** @return the number of datanodes in this pipeline. */
+  public int size() {
+    return nodeStatus.size();
+  }
+
   /**
    * Returns the list of nodes which form this pipeline.
    *
@@ -216,18 +221,46 @@ public DatanodeDetails getLeaderNode() throws IOException {
   }
 
   public DatanodeDetails getFirstNode() throws IOException {
+    return getFirstNode(null);
+  }
+
+  public DatanodeDetails getFirstNode(Set<DatanodeDetails> excluded)
+      throws IOException {
+    if (excluded == null) {
+      excluded = Collections.emptySet();
+    }
     if (nodeStatus.isEmpty()) {
       throw new IOException(String.format("Pipeline=%s is empty", id));
     }
-    return nodeStatus.keySet().iterator().next();
+    for (DatanodeDetails d : nodeStatus.keySet()) {
+      if (!excluded.contains(d)) {
+        return d;
+      }
+    }
+    throw new IOException(String.format(
+        "All nodes are excluded: Pipeline=%s, excluded=%s", id, excluded));
   }
 
   public DatanodeDetails getClosestNode() throws IOException {
+    return getClosestNode(null);
+  }
+
+  public DatanodeDetails getClosestNode(Set<DatanodeDetails> excluded)
+      throws IOException {
+    if (excluded == null) {
+      excluded = Collections.emptySet();
+    }
     if (nodesInOrder.get() == null || nodesInOrder.get().isEmpty()) {
       LOG.debug("Nodes in order is empty, delegate to getFirstNode");
-      return getFirstNode();
+      return getFirstNode(excluded);
+    }
+    for (DatanodeDetails d : nodesInOrder.get()) {
+      if (!excluded.contains(d)) {
+        return d;
+      }
     }
-    return nodesInOrder.get().get(0);
+    throw new IOException(String.format(
+        "All nodes are excluded: Pipeline=%s, excluded=%s", id, excluded));
   }
 
   public boolean isClosed() {
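Usage sketch (commentary, not part of the patch): the excluded-set overloads above are the hook for read failover. A caller keeps a running set of datanodes that have already failed and repeatedly asks the pipeline for the closest node outside that set. A minimal sketch, with attemptRead() as a hypothetical placeholder for the actual RPC:

    // Hypothetical caller of the new Pipeline API.
    final Set<DatanodeDetails> failed = new HashSet<>();
    for (;;) {
      // Throws IOException("All nodes are excluded: ...") once every
      // node in the pipeline has failed.
      final DatanodeDetails node = pipeline.getClosestNode(failed);
      try {
        return attemptRead(node);
      } catch (IOException e) {
        failed.add(node);  // exclude this node, retry with the next one
      }
    }

ContainerProtocolCalls.readChunk below follows this pattern, except that it bounds the loop with pipeline.size() and rethrows the last failure itself rather than relying on getClosestNode to throw.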
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index b921d4c8976..bfe3ebceaa7 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -21,8 +21,10 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
@@ -47,6 +49,7 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
@@ -64,12 +67,16 @@
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Implementation of all container protocol calls performed by Container
 * clients.
  */
 public final class ContainerProtocolCalls {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerProtocolCalls.class);
 
   /**
    * There is no need to instantiate this class.
@@ -278,7 +285,32 @@ public static ContainerProtos.ReadChunkResponseProto readChunk(
         .setBlockID(blockID.getDatanodeBlockIDProtobuf())
         .setChunkData(chunk)
         .setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1);
-    String id = xceiverClient.getPipeline().getClosestNode().getUuidString();
+    final Pipeline pipeline = xceiverClient.getPipeline();
+    final Set<DatanodeDetails> excluded = new HashSet<>();
+    for (;;) {
+      final DatanodeDetails d = pipeline.getClosestNode(excluded);
+
+      try {
+        return readChunk(xceiverClient, chunk, blockID,
+            validators, token, readChunkRequest, d);
+      } catch (IOException e) {
+        excluded.add(d);
+        if (excluded.size() < pipeline.size()) {
+          LOG.warn(toErrorMessage(chunk, blockID, d), e);
+        } else {
+          throw e;
+        }
+      }
+    }
+  }
+
+  private static ContainerProtos.ReadChunkResponseProto readChunk(
+      XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID,
+      List<Validator> validators,
+      Token<? extends TokenIdentifier> token,
+      ReadChunkRequestProto.Builder readChunkRequest,
+      DatanodeDetails d) throws IOException {
+    final String id = d.getUuidString();
     ContainerCommandRequestProto.Builder builder =
         ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadChunk)
             .setContainerID(blockID.getContainerID())
@@ -289,7 +321,30 @@ public static ContainerProtos.ReadChunkResponseProto readChunk(
     ContainerCommandRequestProto request = builder.build();
     ContainerCommandResponseProto reply =
         xceiverClient.sendCommand(request, validators);
-    return reply.getReadChunk();
+    final ReadChunkResponseProto response = reply.getReadChunk();
+    final long readLen = getLen(response);
+    if (readLen != chunk.getLen()) {
+      throw new IOException(toErrorMessage(chunk, blockID, d)
+          + ": readLen=" + readLen);
+    }
+    return response;
+  }
+
+  static String toErrorMessage(ChunkInfo chunk, BlockID blockId,
+      DatanodeDetails d) {
+    return String.format("Failed to read chunk %s (len=%s) %s from %s",
+        chunk.getChunkName(), chunk.getLen(), blockId, d);
+  }
+
+  static long getLen(ReadChunkResponseProto response) {
+    if (response.hasData()) {
+      return response.getData().size();
+    } else if (response.hasDataBuffers()) {
+      return response.getDataBuffers().getBuffersList().stream()
+          .mapToLong(ByteString::size).sum();
+    } else {
+      return -1;
+    }
   }
 
   /**
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 00970788eaf..e94074e827d 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -452,8 +452,11 @@ public void hflush() throws IOException {
   @Override
   public void hsync() throws IOException {
     checkNotClosed();
+    final long hsyncPos = writeOffset;
     handleFlushOrClose(StreamAction.HSYNC);
-    blockOutputStreamEntryPool.hsyncKey(offset);
+    Preconditions.checkState(offset >= hsyncPos,
+        "offset = %s < hsyncPos = %s", offset, hsyncPos);
+    blockOutputStreamEntryPool.hsyncKey(hsyncPos);
   }
 
   /**
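Note on the hsync change (commentary, not part of the patch): the key length is now committed at the position captured before the flush, not at whatever offset holds afterwards. offset only grows during handleFlushOrClose, so committing the snapshot guarantees that hsyncKey reports only bytes the flush actually made durable; the Preconditions.checkState asserts exactly that monotonicity. The shape of the pattern, with hypothetical names:

    // Illustrative "capture, flush, commit the snapshot" pattern.
    final long syncPos = bytesAccepted;  // snapshot the write position
    flush();                             // durable at least up to syncPos
    // bytesAccepted may have advanced past syncPos in the meantime;
    // bytes beyond the snapshot are not guaranteed durable yet.
    commitLength(syncPos);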
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
index 3e5ba5aa32f..b991ac185cf 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
@@ -46,6 +46,8 @@
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_ROOT;
@@ -63,6 +65,8 @@
  */
 @Timeout(value = 300)
 public class TestHSync {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestHSync.class);
 
   private static MiniOzoneCluster cluster;
   private static OzoneBucket bucket;
@@ -112,10 +116,11 @@ public void testO3fsHSync() throws Exception {
         OZONE_URI_SCHEME, bucket.getName(), bucket.getVolumeName());
     CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
 
-    final Path file = new Path("/file");
-
     try (FileSystem fs = FileSystem.get(CONF)) {
-      runTestHSync(fs, file);
+      for (int i = 0; i < 10; i++) {
+        final Path file = new Path("/file" + i);
+        runTestHSync(fs, file, 1 << i);
+      }
     }
   }
 
@@ -129,17 +134,20 @@ public void testOfsHSync() throws Exception {
 
     final String dir = OZONE_ROOT + bucket.getVolumeName()
         + OZONE_URI_DELIMITER + bucket.getName();
-    final Path file = new Path(dir, "file");
 
     try (FileSystem fs = FileSystem.get(CONF)) {
-      runTestHSync(fs, file);
+      for (int i = 0; i < 10; i++) {
+        final Path file = new Path(dir, "file" + i);
+        runTestHSync(fs, file, 1 << i);
+      }
     }
   }
 
-  static void runTestHSync(FileSystem fs, Path file) throws Exception {
+  static void runTestHSync(FileSystem fs, Path file, int initialDataSize)
+      throws Exception {
     try (StreamWithLength out = new StreamWithLength(
         fs.create(file, true))) {
-      runTestHSync(fs, file, out, 1);
+      runTestHSync(fs, file, out, initialDataSize);
       for (int i = 1; i < 5; i++) {
         for (int j = -1; j <= 1; j++) {
           int dataSize = (1 << (i * 5)) + j;
@@ -177,6 +185,8 @@ static void runTestHSync(FileSystem fs, Path file,
       StreamWithLength out, int dataSize) throws Exception {
     final long length = out.getLength();
+    LOG.info("runTestHSync {} with size {}, skipLength={}",
+        file, dataSize, length);
     final byte[] data = new byte[dataSize];
     ThreadLocalRandom.current().nextBytes(data);
     out.writeAndHsync(data);
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientGrpc.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientGrpc.java
index d42f29b89e0..ca4bb9e20e2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientGrpc.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientGrpc.java
@@ -173,8 +173,8 @@ private void invokeXceiverClientReadChunk(XceiverClientSpi client)
             .setBytesPerChecksum(512)
             .setType(ContainerProtos.ChecksumType.CRC32)
             .build())
-        .setLen(100)
-        .setOffset(100)
+        .setLen(-1)
+        .setOffset(0)
         .build(),
         bid, null, null);
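Two notes on the test changes (commentary, not part of the patch):

TestHSync now creates ten files per filesystem with initial write sizes of 1 << i bytes, and runTestHSync then grows each file by (1 << (i * 5)) + j bytes for i in 1..4 and j in -1..1. Worked out:

    // Initial writes: 1, 2, 4, ..., 512 bytes (one size per file).
    // Follow-up writes per file: 31, 32, 33, 1023, 1024, 1025,
    //     32767, 32768, 32769, 1048575, 1048576, 1048577.
    // Each hsync therefore lands just below, exactly at, and just above
    // a power-of-two boundary, where buffer-edge bugs tend to hide.

TestXceiverClientGrpc switches the dummy chunk to setLen(-1) and setOffset(0) because of the new length validation: the faked response carries neither data nor dataBuffers, so getLen(response) returns -1, and the declared chunk length must match it for readChunk to succeed.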