diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 385ea6d0c3ea..a12f9067ce2d 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto;
@@ -66,7 +67,8 @@ public class BlockInputStream extends BlockExtendedInputStream {
       LoggerFactory.getLogger(BlockInputStream.class);
 
   private final BlockID blockID;
-  private final long length;
+  private long length;
+  private final BlockLocationInfo blockInfo;
   private final AtomicReference<Pipeline> pipelineRef =
       new AtomicReference<>();
   private final AtomicReference<Token<OzoneBlockTokenIdentifier>> tokenRef =
@@ -111,12 +113,13 @@ public class BlockInputStream extends BlockExtendedInputStream {
 
   private final Function<BlockID, BlockLocationInfo> refreshFunction;
 
-  public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
+  public BlockInputStream(BlockLocationInfo blockInfo, Pipeline pipeline,
       Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
       XceiverClientFactory xceiverClientFactory,
       Function<BlockID, BlockLocationInfo> refreshFunction) {
-    this.blockID = blockId;
-    this.length = blockLen;
+    this.blockInfo = blockInfo;
+    this.blockID = blockInfo.getBlockID();
+    this.length = blockInfo.getLength();
     setPipeline(pipeline);
     tokenRef.set(token);
     this.verifyChecksum = verifyChecksum;
@@ -124,14 +127,16 @@ public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
     this.refreshFunction = refreshFunction;
   }
 
+  // Only used by unit tests.
   public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
       Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
-      XceiverClientFactory xceiverClientFactory
-  ) {
-    this(blockId, blockLen, pipeline, token, verifyChecksum,
+      XceiverClientFactory xceiverClientFactory) {
+    this(new BlockLocationInfo(new BlockLocationInfo.Builder().setBlockID(blockId).setLength(blockLen)),
+        pipeline, token, verifyChecksum,
         xceiverClientFactory, null);
   }
+
   /**
    * Initialize the BlockInputStream. Get the BlockData (list of chunks) from
    * the Container and create the ChunkInputStreams for each Chunk in the Block.
@@ -143,11 +148,17 @@ public synchronized void initialize() throws IOException {
       return;
     }
 
+    BlockData blockData = null;
     List<ChunkInfo> chunks = null;
     IOException catchEx = null;
     do {
       try {
-        chunks = getChunkInfoList();
+        blockData = getBlockData();
+        chunks = blockData.getChunksList();
+        if (blockInfo != null && blockInfo.isUnderConstruction()) {
+          // use the block length from the DN if the block is under construction.
+          length = blockData.getSize();
+        }
         break;
         // If we get a StorageContainerException or an IOException due to
         // datanodes are not reachable, refresh to get the latest pipeline
@@ -226,19 +237,22 @@ private void refreshBlockInfo(IOException cause) throws IOException {
 
   /**
    * Send RPC call to get the block info from the container.
-   * @return List of chunks in this block.
+   * @return BlockData.
    */
-  protected List<ChunkInfo> getChunkInfoList() throws IOException {
+  protected BlockData getBlockData() throws IOException {
     acquireClient();
     try {
-      return getChunkInfoListUsingClient();
+      return getBlockDataUsingClient();
     } finally {
       releaseClient();
     }
   }
 
-  @VisibleForTesting
-  protected List<ChunkInfo> getChunkInfoListUsingClient() throws IOException {
+  /**
+   * Send RPC call to get the block info from the container.
+   * @return BlockData.
+   */
+  protected BlockData getBlockDataUsingClient() throws IOException {
     final Pipeline pipeline = xceiverClient.getPipeline();
 
     if (LOG.isDebugEnabled()) {
@@ -258,8 +272,7 @@ protected List<ChunkInfo> getChunkInfoListUsingClient() throws IOException {
 
     GetBlockResponseProto response = ContainerProtocolCalls.getBlock(
         xceiverClient, VALIDATORS, blkIDBuilder.build(), tokenRef.get());
-
-    return response.getBlockData().getChunksList();
+    return response.getBlockData();
   }
 
   private void setPipeline(Pipeline pipeline) {
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
index 40063f9ce492..b9233f42d555 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
@@ -84,8 +84,8 @@ public BlockExtendedInputStream create(ReplicationConfig repConfig,
           blockInfo, verifyChecksum, xceiverFactory, refreshFunction,
           ecBlockStreamFactory);
     } else {
-      return new BlockInputStream(blockInfo.getBlockID(), blockInfo.getLength(),
-          pipeline, token, verifyChecksum, xceiverFactory, refreshFunction);
+      return new BlockInputStream(blockInfo, pipeline, token, verifyChecksum, xceiverFactory,
+          refreshFunction);
     }
   }
 
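The reader-side change above boils down to a single rule: when the block location is flagged as under construction (the last block of an hsync'ed key), BlockInputStream discards the possibly stale OM-recorded length and adopts the size reported by the datanode in the GetBlock response. The following is a minimal standalone sketch of that rule; the class and field names are hypothetical stand-ins, not Ozone APIs.

```java
// Standalone sketch of the length-resolution rule applied in BlockInputStream#initialize().
// All names here are hypothetical stand-ins, not Ozone classes.
public final class BlockLengthRule {

  /** Minimal stand-in for the OM-provided block location info. */
  static final class LocationInfo {
    final long omLength;             // length recorded by OM at commit/hsync time
    final boolean underConstruction; // true only for the last block of an hsync'ed key

    LocationInfo(long omLength, boolean underConstruction) {
      this.omLength = omLength;
      this.underConstruction = underConstruction;
    }
  }

  /** Trust the datanode-reported size only while the block is still being written. */
  static long effectiveLength(LocationInfo info, long datanodeReportedSize) {
    return info.underConstruction ? datanodeReportedSize : info.omLength;
  }

  public static void main(String[] args) {
    System.out.println(effectiveLength(new LocationInfo(4096, false), 6144)); // 4096: OM length wins
    System.out.println(effectiveLength(new LocationInfo(4096, true), 6144));  // 6144: DN length wins
  }
}
```
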
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java
index 3e7779f0d10a..ca3199d8acfb 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java
@@ -23,6 +23,7 @@ import java.util.function.Function;
 
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -49,7 +50,8 @@ class DummyBlockInputStream extends BlockInputStream {
       Function<BlockID, BlockLocationInfo> refreshFunction,
       List<ChunkInfo> chunkList,
       Map<String, byte[]> chunks) {
-    super(blockId, blockLen, pipeline, token, verifyChecksum,
+    super(new BlockLocationInfo(new BlockLocationInfo.Builder().setBlockID(blockId).setLength(blockLen)),
+        pipeline, token, verifyChecksum,
         xceiverClientManager, refreshFunction);
     this.chunkDataMap = chunks;
     this.chunks = chunkList;
@@ -57,8 +59,10 @@ class DummyBlockInputStream extends BlockInputStream {
   }
 
   @Override
-  protected List<ChunkInfo> getChunkInfoList() throws IOException {
-    return chunks;
+  protected ContainerProtos.BlockData getBlockData() throws IOException {
+    BlockID blockID = getBlockID();
+    ContainerProtos.DatanodeBlockID datanodeBlockID = blockID.getDatanodeBlockIDProtobuf();
+    return ContainerProtos.BlockData.newBuilder().addAllChunks(chunks).setBlockID(datanodeBlockID).build();
   }
 
   @Override
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java
index 24a35745144d..d66c76dcddcb 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
@@ -73,16 +74,16 @@ final class DummyBlockInputStreamWithRetry
   }
 
   @Override
-  protected List<ChunkInfo> getChunkInfoList() throws IOException {
+  protected ContainerProtos.BlockData getBlockData() throws IOException {
     if (getChunkInfoCount == 0) {
       getChunkInfoCount++;
       if (ioException != null) {
-         throw ioException;
+        throw ioException;
       }
       throw new StorageContainerException("Exception encountered",
           CONTAINER_NOT_FOUND);
     } else {
-      return super.getChunkInfoList();
+      return super.getBlockData();
     }
   }
 }
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
index 3dc5a82b3355..9d1feafb9a48 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ContainerBlockID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
@@ -409,16 +410,19 @@ public void testRefreshOnReadFailureAfterUnbuffer(IOException ex)
         .thenReturn(blockLocationInfo);
     when(blockLocationInfo.getPipeline()).thenReturn(newPipeline);
 
-    BlockInputStream subject = new BlockInputStream(blockID, blockSize,
+    BlockInputStream subject = new BlockInputStream(
+        new BlockLocationInfo(new BlockLocationInfo.Builder().setBlockID(blockID).setLength(blockSize)),
         pipeline, null, false, clientFactory, refreshFunction) {
       @Override
-      protected List<ChunkInfo> getChunkInfoListUsingClient() {
-        return chunks;
+      protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) {
+        return stream;
       }
 
       @Override
-      protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) {
-        return stream;
+      protected ContainerProtos.BlockData getBlockDataUsingClient() throws IOException {
+        BlockID blockID = getBlockID();
+        ContainerProtos.DatanodeBlockID datanodeBlockID = blockID.getDatanodeBlockIDProtobuf();
+        return ContainerProtos.BlockData.newBuilder().addAllChunks(chunks).setBlockID(datanodeBlockID).build();
       }
     };
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java
index 019e16c2f13f..a6b291c3f48a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java
@@ -40,6 +40,8 @@ public class BlockLocationInfo {
   // PartNumber is set for Multipart upload Keys.
   private int partNumber;
 
+  // Whether the block is under construction. Applies to the last block of an hsync'ed file.
+  private boolean underConstruction;
 
   protected BlockLocationInfo(Builder builder) {
     this.blockID = builder.blockID;
@@ -111,6 +113,14 @@ public int getPartNumber() {
     return partNumber;
   }
 
+  public void setUnderConstruction(boolean uc) {
+    this.underConstruction = uc;
+  }
+
+  public boolean isUnderConstruction() {
+    return this.underConstruction;
+  }
+
   /**
    * Builder of BlockLocationInfo.
    */
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index d0f3b5728a8b..52ef31daf590 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -26,6 +26,7 @@ import java.util.ListIterator;
 import java.util.Map;
 
+import org.apache.hadoop.hdds.client.ContainerBlockID;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.scm.ByteStringConversion;
 import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
@@ -85,6 +86,8 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware {
   private final ExcludeList excludeList;
   private final ContainerClientMetrics clientMetrics;
   private final StreamBufferArgs streamBufferArgs;
+  // The last block reported to OM via hsync; used to skip redundant OM updates.
+  private ContainerBlockID lastUpdatedBlockId = new ContainerBlockID(-1, -1);
 
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   public BlockOutputStreamEntryPool(
@@ -368,7 +371,16 @@ void hsyncKey(long offset) throws IOException {
       if (keyArgs.getIsMultipartKey()) {
         throw new IOException("Hsync is unsupported for multipart keys.");
       } else {
-        omClient.hsyncKey(keyArgs, openID);
+        if (keyArgs.getLocationInfoList().size() == 0) {
+          omClient.hsyncKey(keyArgs, openID);
+        } else {
+          ContainerBlockID lastBlockId = keyArgs.getLocationInfoList().get(keyArgs.getLocationInfoList().size() - 1)
+              .getBlockID().getContainerBlockID();
+          if (!lastUpdatedBlockId.equals(lastBlockId)) {
+            omClient.hsyncKey(keyArgs, openID);
+            lastUpdatedBlockId = lastBlockId;
+          }
+        }
       }
     } else {
       LOG.warn("Closing KeyOutputStream, but key args is null");
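The writer-side change above avoids a round trip to OM on every hsync: the key is only re-reported when its last allocated block has changed since the previous hsync (or when no block has been allocated yet). Below is a standalone sketch of that deduplication; the names are hypothetical and the counter merely stands in for the omClient.hsyncKey() RPC.

```java
import java.util.List;
import java.util.Objects;

// Standalone sketch of the hsync deduplication added to BlockOutputStreamEntryPool#hsyncKey().
// Names are hypothetical; the counter stands in for the omClient.hsyncKey() RPC.
public final class HsyncDedupSketch {

  private Long lastUpdatedBlockId; // last block id reported to OM; null = never reported
  private int omCalls;

  /** blockIds: ids of the blocks currently allocated to the open key, in order. */
  void hsync(List<Long> blockIds) {
    if (blockIds.isEmpty()) {
      omCalls++; // nothing allocated yet: always report, as in the patch
      return;
    }
    Long lastBlockId = blockIds.get(blockIds.size() - 1);
    if (!Objects.equals(lastUpdatedBlockId, lastBlockId)) {
      omCalls++; // simulate omClient.hsyncKey(keyArgs, openID)
      lastUpdatedBlockId = lastBlockId;
    }
    // otherwise the OM round trip is skipped: the last block is unchanged
  }

  public static void main(String[] args) {
    HsyncDedupSketch pool = new HsyncDedupSketch();
    pool.hsync(List.of(1L));     // first hsync on block 1 -> OM updated
    pool.hsync(List.of(1L));     // same last block        -> skipped
    pool.hsync(List.of(1L, 2L)); // new last block         -> OM updated
    System.out.println(pool.omCalls); // 2
  }
}
```
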
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
index 4843c1c45e6c..6b6be1abd40e 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
 import org.apache.hadoop.hdds.scm.storage.MultipartInputStream;
 import org.apache.hadoop.hdds.scm.storage.PartInputStream;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 
@@ -61,8 +62,10 @@ private static List createStreams(
       boolean verifyChecksum,
       Function retryFunction,
       BlockInputStreamFactory blockStreamFactory) {
+    boolean isHsyncFile = keyInfo.getMetadata().containsKey(OzoneConsts.HSYNC_CLIENT_ID);
     List<BlockExtendedInputStream> partStreams = new ArrayList<>();
-    for (OmKeyLocationInfo omKeyLocationInfo : blockInfos) {
+    for (int i = 0; i < blockInfos.size(); i++) {
+      OmKeyLocationInfo omKeyLocationInfo = blockInfos.get(i);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Adding stream for accessing {}. The stream will be "
             + "initialized later.", omKeyLocationInfo);
@@ -85,6 +88,11 @@ private static List createStreams(
         retry = null;
       }
 
+      if (i == (blockInfos.size() - 1) && isHsyncFile) {
+        // the last block of an hsync'ed file is still under construction
+        omKeyLocationInfo.setUnderConstruction(true);
+      }
+
       BlockExtendedInputStream stream =
           blockStreamFactory.create(
               keyInfo.getReplicationConfig(),
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
index 559b8da4982e..7a1c055b00e4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
@@ -23,11 +23,16 @@ import java.io.IOException;
 import java.security.GeneralSecurityException;
 import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.crypto.CipherSuite;
@@ -35,7 +40,6 @@ import org.apache.hadoop.crypto.CryptoOutputStream;
 import org.apache.hadoop.crypto.Encryptor;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -57,6 +61,7 @@ import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 
@@ -107,7 +112,7 @@ public class TestHSync {
 
   @BeforeAll
   public static void init() throws Exception {
-    final int chunkSize = 16 << 10;
+    final int chunkSize = 4 << 10;
     final int flushSize = 2 * chunkSize;
     final int maxFlushSize = 2 * flushSize;
     final int blockSize = 2 * maxFlushSize;
@@ -279,6 +284,52 @@ public void testOverwriteHSyncFile() throws Exception {
     }
   }
 
+  @Test
+  public void testHsyncKeyCallCount() throws Exception {
+    // Set the fs.defaultFS
+    final String rootPath = String.format("%s://%s/",
+        OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY));
+    CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+
+    final String dir = OZONE_ROOT + bucket.getVolumeName()
+        + OZONE_URI_DELIMITER + bucket.getName();
+
+    OMMetrics omMetrics = cluster.getOzoneManager().getMetrics();
+    omMetrics.resetNumKeyHSyncs();
+    final byte[] data = new byte[128];
+    ThreadLocalRandom.current().nextBytes(data);
+
+    final Path file = new Path(dir, "file-hsync-then-close");
+    long blockSize;
+    try (FileSystem fs = FileSystem.get(CONF)) {
+      blockSize = fs.getDefaultBlockSize(file);
+      long fileSize = 0;
+      try (FSDataOutputStream outputStream = fs.create(file, true)) {
+        // make sure we write at least 2 blocks of data
+        while (fileSize <= blockSize) {
+          outputStream.write(data, 0, data.length);
+          outputStream.hsync();
+          fileSize += data.length;
+        }
+      }
+    }
+    assertEquals(2, omMetrics.getNumKeyHSyncs());
+
+    // test a file with all blocks pre-allocated
+    omMetrics.resetNumKeyHSyncs();
+    long writtenSize = 0;
+    try (OzoneOutputStream outputStream = bucket.createKey("key-" + RandomStringUtils.randomNumeric(5),
+        blockSize * 2, ReplicationType.RATIS, ReplicationFactor.THREE, new HashMap<>())) {
+      // make sure we write at least 2 blocks of data
+      while (writtenSize <= blockSize) {
+        outputStream.write(data, 0, data.length);
+        outputStream.hsync();
+        writtenSize += data.length;
+      }
+    }
+    assertEquals(2, omMetrics.getNumKeyHSyncs());
+  }
+
   static void runTestHSync(FileSystem fs, Path file, int initialDataSize)
       throws Exception {
     try (StreamWithLength out = new StreamWithLength(
@@ -409,7 +460,7 @@ public void testConcurrentWriteHSync()
         + OZONE_URI_DELIMITER + bucket.getName();
 
     try (FileSystem fs = FileSystem.get(CONF)) {
-      for (int i = 0; i < 10; i++) {
+      for (int i = 0; i < 5; i++) {
         final Path file = new Path(dir, "file" + i);
 
         try (FSDataOutputStream out = fs.create(file, true)) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
index 86fa867060f5..2fbbbe153040 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
@@ -1100,6 +1100,11 @@ public long getNumKeyHSyncs() {
     return numKeyHSyncs.value();
   }
 
+  @VisibleForTesting
+  public void resetNumKeyHSyncs() {
+    numKeyHSyncs.incr(-numKeyHSyncs.value());
+  }
+
   @VisibleForTesting
   public long getNumKeyCommitFails() {
     return numKeyCommitFails.value();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
index dbcf9f6ea4e4..60cfcd1a2c11 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
@@ -90,7 +90,7 @@ private List createInputStreams(String dataString) {
   }
 
   private BlockInputStream createStream(byte[] buf, int offset) {
-    return new BlockInputStream(null, 100, null, null, true, null) {
+    return new BlockInputStream(null, 100L, null, null, true, null) {
       private long pos;
       private final ByteArrayInputStream in =
           new ByteArrayInputStream(buf, offset, 100);
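On the read path, KeyInputStream only treats the last block as under construction when the key's metadata carries the hsync client-id marker. A standalone sketch of that marking step follows; the names are hypothetical and the metadata key string is merely a stand-in for OzoneConsts.HSYNC_CLIENT_ID.

```java
import java.util.List;
import java.util.Map;

// Standalone sketch of the marking step added to KeyInputStream#createStreams().
// Names are hypothetical; "hsyncClientId" stands in for OzoneConsts.HSYNC_CLIENT_ID.
public final class UnderConstructionMarking {

  static final String HSYNC_CLIENT_ID = "hsyncClientId";

  /** Minimal stand-in for a block location entry handed to the stream factory. */
  static final class BlockRef {
    boolean underConstruction;
  }

  /** Mark only the last block, and only when the key was written with hsync. */
  static void markLastBlock(Map<String, String> keyMetadata, List<BlockRef> blocks) {
    boolean isHsyncFile = keyMetadata.containsKey(HSYNC_CLIENT_ID);
    if (isHsyncFile && !blocks.isEmpty()) {
      blocks.get(blocks.size() - 1).underConstruction = true;
    }
  }

  public static void main(String[] args) {
    List<BlockRef> blocks = List.of(new BlockRef(), new BlockRef());
    markLastBlock(Map.of(HSYNC_CLIENT_ID, "client-1"), blocks);
    System.out.println(blocks.get(0).underConstruction); // false
    System.out.println(blocks.get(1).underConstruction); // true
  }
}
```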