From f96cf9f99c088795eff9018358f9e3a2d14e9863 Mon Sep 17 00:00:00 2001 From: Sammi Chen Date: Mon, 22 Jan 2024 18:02:20 +0800 Subject: [PATCH 1/4] HDDS-9387. [hsync] Skip updating block length at OM during hsync --- .../db/managed/ManagedRocksObjectUtils.java | 2 +- .../client/io/BlockOutputStreamEntryPool.java | 17 ++- .../ozone/client/io/KeyOutputStream.java | 1 + .../org/apache/hadoop/fs/ozone/TestHSync.java | 139 ++++++++---------- .../org/apache/hadoop/ozone/om/OMMetrics.java | 5 + 5 files changed, 88 insertions(+), 76 deletions(-) diff --git a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksObjectUtils.java b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksObjectUtils.java index 8ae82a688bbb..9ddf0a8921be 100644 --- a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksObjectUtils.java +++ b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksObjectUtils.java @@ -54,7 +54,7 @@ static UncheckedAutoCloseable track(AutoCloseable object) { static void reportLeak(Class clazz, String stackTrace) { ManagedRocksObjectMetrics.INSTANCE.increaseLeakObject(); - String warning = String.format("%s is not closed properly", clazz.getSimpleName()); + String warning = String.format("%s is not closed properly", clazz.getSimpleName()); if (stackTrace != null && LOG.isDebugEnabled()) { String debugMessage = String.format("%nStackTrace for unclosed instance: %s", stackTrace); warning = warning.concat(debugMessage); diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java index d0f3b5728a8b..752c6732d8c1 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java @@ -25,7 +25,11 @@ import java.util.List; import java.util.ListIterator; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.client.ContainerBlockID; import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.scm.ByteStringConversion; import org.apache.hadoop.hdds.scm.ContainerClientMetrics; @@ -85,6 +89,8 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware { private final ExcludeList excludeList; private final ContainerClientMetrics clientMetrics; private final StreamBufferArgs streamBufferArgs; + // update blocks on OM + private ContainerBlockID lastUpdatedBlockId = new ContainerBlockID(-1, -1); @SuppressWarnings({"parameternumber", "squid:S00107"}) public BlockOutputStreamEntryPool( @@ -368,7 +374,16 @@ void hsyncKey(long offset) throws IOException { if (keyArgs.getIsMultipartKey()) { throw new IOException("Hsync is unsupported for multipart keys."); } else { - omClient.hsyncKey(keyArgs, openID); + if (keyArgs.getLocationInfoList().size() == 0) { + omClient.hsyncKey(keyArgs, openID); + } else { + ContainerBlockID lastBLockId = keyArgs.getLocationInfoList().get(keyArgs.getLocationInfoList().size() - 1) + .getBlockID().getContainerBlockID(); + if (!lastUpdatedBlockId.equals(lastBLockId)) { + omClient.hsyncKey(keyArgs, openID); + lastUpdatedBlockId = lastBLockId; + } + } } } else { LOG.warn("Closing KeyOutputStream, but key args is null"); diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 8b128e9cd945..290e610e24a8 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.Collectors; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java index 559b8da4982e..f95536cba171 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java @@ -23,11 +23,15 @@ import java.io.IOException; import java.security.GeneralSecurityException; import java.security.PrivilegedExceptionAction; +import java.util.HashMap; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.hdds.client.ReplicationFactor; +import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.crypto.CipherSuite; @@ -35,7 +39,6 @@ import org.apache.hadoop.crypto.CryptoOutputStream; import org.apache.hadoop.crypto.Encryptor; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -57,6 +60,7 @@ import org.apache.hadoop.ozone.client.io.KeyOutputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OMMetrics; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.helpers.BucketLayout; @@ -79,7 +83,6 @@ import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME; import static org.apache.hadoop.ozone.OzoneConsts.OZONE_ROOT; import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER; -import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_SCHEME; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY; @@ -193,39 +196,6 @@ public void testKeyHSyncThenClose() throws Exception { } } - @Test - public void testO3fsHSync() throws Exception { - // Set the fs.defaultFS - final String rootPath = String.format("%s://%s.%s/", - OZONE_URI_SCHEME, bucket.getName(), bucket.getVolumeName()); - CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath); - - try (FileSystem fs = FileSystem.get(CONF)) { - for (int i = 0; i < 10; i++) { - final Path file = new Path("/file" + i); - runTestHSync(fs, file, 1 << i); - } - } - } - - @Test - public void testOfsHSync() throws Exception { - // Set the fs.defaultFS - final String rootPath = String.format("%s://%s/", - OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY)); - CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath); - - final String dir = OZONE_ROOT + bucket.getVolumeName() - + OZONE_URI_DELIMITER + bucket.getName(); - - try (FileSystem fs = FileSystem.get(CONF)) { - for (int i = 0; i < 10; i++) { - final Path file = new Path(dir, "file" + i); - runTestHSync(fs, file, 1 << i); - } - } - } - @Test public void testUncommittedBlocks() throws Exception { // Set the fs.defaultFS @@ -279,18 +249,69 @@ public void testOverwriteHSyncFile() throws Exception { } } - static void runTestHSync(FileSystem fs, Path file, int initialDataSize) - throws Exception { - try (StreamWithLength out = new StreamWithLength( - fs.create(file, true))) { - runTestHSync(fs, file, out, initialDataSize); - for (int i = 1; i < 5; i++) { - for (int j = -1; j <= 1; j++) { - int dataSize = (1 << (i * 5)) + j; - runTestHSync(fs, file, out, dataSize); + @Test + public void testHsyncKeyCallCount() throws Exception { + // Set the fs.defaultFS + final String rootPath = String.format("%s://%s/", + OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY)); + CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath); + + final String dir = OZONE_ROOT + bucket.getVolumeName() + + OZONE_URI_DELIMITER + bucket.getName(); + + OMMetrics omMetrics = cluster.getOzoneManager().getMetrics(); + omMetrics.resetNumKeyHSyncs(); + String data = "random data"; + final Path file = new Path(dir, "file-hsync-then-close"); + long blockSize; + try (FileSystem fs = FileSystem.get(CONF)) { + blockSize = fs.getDefaultBlockSize(file); + long fileSize = 0; + try (FSDataOutputStream outputStream = fs.create(file, true)) { + // make sure at least writing 2 blocks data + while (fileSize < blockSize) { + outputStream.write(data.getBytes(UTF_8), 0, data.length()); + outputStream.hsync(); + fileSize += data.length(); } } } + assertEquals(2, omMetrics.getNumKeyHSyncs()); + } + + @Test + public void testPreAllocatedFileHsyncKeyCallCount() throws Exception { + // Set the fs.defaultFS + final String rootPath = String.format("%s://%s/", + OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY)); + CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath); + + final String dir = OZONE_ROOT + bucket.getVolumeName() + + OZONE_URI_DELIMITER + bucket.getName(); + + String data = "random data"; + String keyName = "key-" + RandomStringUtils.randomNumeric(5); + final Path file = new Path(dir, keyName); + long blockSize; + long fileSize; + try (FileSystem fs = FileSystem.get(CONF)) { + blockSize = fs.getDefaultBlockSize(file); + fileSize = 2 * blockSize; + } + + OMMetrics omMetrics = cluster.getOzoneManager().getMetrics(); + omMetrics.resetNumKeyHSyncs(); + long writtenSize = 0; + try (OzoneOutputStream outputStream = bucket.createKey(keyName, fileSize, ReplicationType.RATIS, + ReplicationFactor.THREE, new HashMap<>())) { + // make sure at least writing 2 blocks data + while (writtenSize < blockSize) { + outputStream.write(data.getBytes(UTF_8), 0, data.length()); + outputStream.hsync(); + writtenSize += data.length(); + } + } + assertEquals(2, omMetrics.getNumKeyHSyncs()); } private static class StreamWithLength implements Closeable { @@ -317,36 +338,6 @@ public void close() throws IOException { } } - static void runTestHSync(FileSystem fs, Path file, - StreamWithLength out, int dataSize) - throws Exception { - final long length = out.getLength(); - LOG.info("runTestHSync {} with size {}, skipLength={}", - file, dataSize, length); - final byte[] data = new byte[dataSize]; - ThreadLocalRandom.current().nextBytes(data); - out.writeAndHsync(data); - - final byte[] buffer = new byte[4 << 10]; - int offset = 0; - try (FSDataInputStream in = fs.open(file)) { - final long skipped = in.skip(length); - assertEquals(length, skipped); - - for (; ;) { - final int n = in.read(buffer, 0, buffer.length); - if (n <= 0) { - break; - } - for (int i = 0; i < n; i++) { - assertEquals(data[offset + i], buffer[i]); - } - offset += n; - } - } - assertEquals(data.length, offset); - } - private void runConcurrentWriteHSync(Path file, final FSDataOutputStream out, int initialDataSize) throws InterruptedException, IOException { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java index 86fa867060f5..c0e7fd6f679b 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java @@ -1100,6 +1100,11 @@ public long getNumKeyHSyncs() { return numKeyHSyncs.value(); } + @VisibleForTesting + public void resetNumKeyHSyncs() { + numKeyHSyncs.incr( - numKeyHSyncs.value()); + } + @VisibleForTesting public long getNumKeyCommitFails() { return numKeyCommitFails.value(); From db75f83e08d0477fae9d06c885e89922d5e9c1fa Mon Sep 17 00:00:00 2001 From: Sammi Chen Date: Mon, 22 Jan 2024 22:12:06 +0800 Subject: [PATCH 2/4] fix checkstyle and revert accidental change --- .../hadoop/hdds/utils/db/managed/ManagedRocksObjectUtils.java | 2 +- .../hadoop/ozone/client/io/BlockOutputStreamEntryPool.java | 3 --- .../org/apache/hadoop/ozone/client/io/KeyOutputStream.java | 1 - .../src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java | 2 +- 4 files changed, 2 insertions(+), 6 deletions(-) diff --git a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksObjectUtils.java b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksObjectUtils.java index 9ddf0a8921be..8ae82a688bbb 100644 --- a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksObjectUtils.java +++ b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksObjectUtils.java @@ -54,7 +54,7 @@ static UncheckedAutoCloseable track(AutoCloseable object) { static void reportLeak(Class clazz, String stackTrace) { ManagedRocksObjectMetrics.INSTANCE.increaseLeakObject(); - String warning = String.format("%s is not closed properly", clazz.getSimpleName()); + String warning = String.format("%s is not closed properly", clazz.getSimpleName()); if (stackTrace != null && LOG.isDebugEnabled()) { String debugMessage = String.format("%nStackTrace for unclosed instance: %s", stackTrace); warning = warning.concat(debugMessage); diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java index 752c6732d8c1..52ef31daf590 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java @@ -25,10 +25,7 @@ import java.util.List; import java.util.ListIterator; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.ContainerBlockID; import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.scm.ByteStringConversion; diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 290e610e24a8..8b128e9cd945 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.Collectors; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java index c0e7fd6f679b..2fbbbe153040 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java @@ -1102,7 +1102,7 @@ public long getNumKeyHSyncs() { @VisibleForTesting public void resetNumKeyHSyncs() { - numKeyHSyncs.incr( - numKeyHSyncs.value()); + numKeyHSyncs.incr(-numKeyHSyncs.value()); } @VisibleForTesting From c7c41689ab92f19c8d81dbd3e4280d9eba39fedb Mon Sep 17 00:00:00 2001 From: Sammi Chen Date: Tue, 23 Jan 2024 16:37:03 +0800 Subject: [PATCH 3/4] fix failed ITestOzoneContractCreate and bring back removed tests in TestHSync --- .../hdds/scm/storage/BlockInputStream.java | 47 +++++-- .../io/BlockInputStreamFactoryImpl.java | 4 +- .../scm/storage/DummyBlockInputStream.java | 11 +- .../DummyBlockInputStreamWithRetry.java | 7 +- .../scm/storage/TestBlockInputStream.java | 13 +- .../hdds/scm/storage/BlockLocationInfo.java | 10 ++ .../ozone/client/io/KeyInputStream.java | 10 +- .../org/apache/hadoop/fs/ozone/TestHSync.java | 126 +++++++++++++----- .../hadoop/ozone/om/TestChunkStreams.java | 2 +- 9 files changed, 174 insertions(+), 56 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index 385ea6d0c3ea..34d0bcaa7b1f 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto; @@ -66,7 +67,8 @@ public class BlockInputStream extends BlockExtendedInputStream { LoggerFactory.getLogger(BlockInputStream.class); private final BlockID blockID; - private final long length; + private long length; + private final BlockLocationInfo blockInfo; private final AtomicReference pipelineRef = new AtomicReference<>(); private final AtomicReference> tokenRef = @@ -111,12 +113,13 @@ public class BlockInputStream extends BlockExtendedInputStream { private final Function refreshFunction; - public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline, + public BlockInputStream(BlockLocationInfo blockInfo, Pipeline pipeline, Token token, boolean verifyChecksum, XceiverClientFactory xceiverClientFactory, Function refreshFunction) { - this.blockID = blockId; - this.length = blockLen; + this.blockInfo = blockInfo; + this.blockID = blockInfo.getBlockID(); + this.length = blockInfo.getLength(); setPipeline(pipeline); tokenRef.set(token); this.verifyChecksum = verifyChecksum; @@ -124,14 +127,16 @@ public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline, this.refreshFunction = refreshFunction; } + // only for unit tests public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline, Token token, boolean verifyChecksum, - XceiverClientFactory xceiverClientFactory - ) { - this(blockId, blockLen, pipeline, token, verifyChecksum, + XceiverClientFactory xceiverClientFactory) { + this(new BlockLocationInfo(new BlockLocationInfo.Builder().setBlockID(blockId).setLength(blockLen)), + pipeline, token, verifyChecksum, xceiverClientFactory, null); } + /** * Initialize the BlockInputStream. Get the BlockData (list of chunks) from * the Container and create the ChunkInputStreams for each Chunk in the Block. @@ -143,11 +148,16 @@ public synchronized void initialize() throws IOException { return; } + BlockData blockData = null; List chunks = null; IOException catchEx = null; do { try { - chunks = getChunkInfoList(); + blockData = getBlockData(); + chunks = blockData.getChunksList(); + if (blockInfo != null && blockInfo.isUnderConstruction()) { + length = blockData.getSize(); + } break; // If we get a StorageContainerException or an IOException due to // datanodes are not reachable, refresh to get the latest pipeline @@ -228,17 +238,29 @@ private void refreshBlockInfo(IOException cause) throws IOException { * Send RPC call to get the block info from the container. * @return List of chunks in this block. */ + @VisibleForTesting protected List getChunkInfoList() throws IOException { + return getBlockData().getChunksList(); + } + + /** + * Send RPC call to get the block info from the container. + * @return BlockData. + */ + protected BlockData getBlockData() throws IOException { acquireClient(); try { - return getChunkInfoListUsingClient(); + return getBlockDataUsingClient(); } finally { releaseClient(); } } - @VisibleForTesting - protected List getChunkInfoListUsingClient() throws IOException { + /** + * Send RPC call to get the block info from the container. + * @return BlockData. + */ + protected BlockData getBlockDataUsingClient() throws IOException { final Pipeline pipeline = xceiverClient.getPipeline(); if (LOG.isDebugEnabled()) { @@ -258,8 +280,7 @@ protected List getChunkInfoListUsingClient() throws IOException { GetBlockResponseProto response = ContainerProtocolCalls.getBlock( xceiverClient, VALIDATORS, blkIDBuilder.build(), tokenRef.get()); - - return response.getBlockData().getChunksList(); + return response.getBlockData(); } private void setPipeline(Pipeline pipeline) { diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java index 40063f9ce492..b9233f42d555 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java @@ -84,8 +84,8 @@ public BlockExtendedInputStream create(ReplicationConfig repConfig, blockInfo, verifyChecksum, xceiverFactory, refreshFunction, ecBlockStreamFactory); } else { - return new BlockInputStream(blockInfo.getBlockID(), blockInfo.getLength(), - pipeline, token, verifyChecksum, xceiverFactory, refreshFunction); + return new BlockInputStream(blockInfo, pipeline, token, verifyChecksum, xceiverFactory, + refreshFunction); } } diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java index 3e7779f0d10a..45907e277c11 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java @@ -23,6 +23,7 @@ import java.util.function.Function; import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; @@ -49,13 +50,21 @@ class DummyBlockInputStream extends BlockInputStream { Function refreshFunction, List chunkList, Map chunks) { - super(blockId, blockLen, pipeline, token, verifyChecksum, + super(new BlockLocationInfo(new BlockLocationInfo.Builder().setBlockID(blockId).setLength(blockLen)), + pipeline, token, verifyChecksum, xceiverClientManager, refreshFunction); this.chunkDataMap = chunks; this.chunks = chunkList; } + @Override + protected ContainerProtos.BlockData getBlockData() throws IOException { + BlockID blockID = getBlockID(); + ContainerProtos.DatanodeBlockID datanodeBlockID = blockID.getDatanodeBlockIDProtobuf(); + return ContainerProtos.BlockData.newBuilder().addAllChunks(chunks).setBlockID(datanodeBlockID).build(); + } + @Override protected List getChunkInfoList() throws IOException { return chunks; diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java index 24a35745144d..d66c76dcddcb 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; @@ -73,16 +74,16 @@ final class DummyBlockInputStreamWithRetry } @Override - protected List getChunkInfoList() throws IOException { + protected ContainerProtos.BlockData getBlockData() throws IOException { if (getChunkInfoCount == 0) { getChunkInfoCount++; if (ioException != null) { - throw ioException; + throw ioException; } throw new StorageContainerException("Exception encountered", CONTAINER_NOT_FOUND); } else { - return super.getChunkInfoList(); + return super.getBlockData(); } } } diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java index 3dc5a82b3355..2d50128a00b0 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.ContainerBlockID; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdds.scm.XceiverClientFactory; @@ -409,10 +410,11 @@ public void testRefreshOnReadFailureAfterUnbuffer(IOException ex) .thenReturn(blockLocationInfo); when(blockLocationInfo.getPipeline()).thenReturn(newPipeline); - BlockInputStream subject = new BlockInputStream(blockID, blockSize, + BlockInputStream subject = new BlockInputStream( + new BlockLocationInfo(new BlockLocationInfo.Builder().setBlockID(blockID).setLength(blockSize)), pipeline, null, false, clientFactory, refreshFunction) { @Override - protected List getChunkInfoListUsingClient() { + protected List getChunkInfoList() { return chunks; } @@ -420,6 +422,13 @@ protected List getChunkInfoListUsingClient() { protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) { return stream; } + + @Override + protected ContainerProtos.BlockData getBlockDataUsingClient() throws IOException { + BlockID blockID = getBlockID(); + ContainerProtos.DatanodeBlockID datanodeBlockID = blockID.getDatanodeBlockIDProtobuf(); + return ContainerProtos.BlockData.newBuilder().addAllChunks(chunks).setBlockID(datanodeBlockID).build(); + } }; try { diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java index 019e16c2f13f..a6b291c3f48a 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java @@ -40,6 +40,8 @@ public class BlockLocationInfo { // PartNumber is set for Multipart upload Keys. private int partNumber; + // The block is under construction. Apply to hsynced file last block. + private boolean underConstruction; protected BlockLocationInfo(Builder builder) { this.blockID = builder.blockID; @@ -111,6 +113,14 @@ public int getPartNumber() { return partNumber; } + public void setUnderConstruction(boolean uc) { + this.underConstruction = uc; + } + + public boolean isUnderConstruction() { + return this.underConstruction; + } + /** * Builder of BlockLocationInfo. */ diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java index 4843c1c45e6c..01922a7abe7b 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy; import org.apache.hadoop.hdds.scm.storage.MultipartInputStream; import org.apache.hadoop.hdds.scm.storage.PartInputStream; +import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; @@ -61,8 +62,10 @@ private static List createStreams( boolean verifyChecksum, Function retryFunction, BlockInputStreamFactory blockStreamFactory) { + boolean isHsyncFile = keyInfo.getMetadata().containsKey(OzoneConsts.HSYNC_CLIENT_ID); List partStreams = new ArrayList<>(); - for (OmKeyLocationInfo omKeyLocationInfo : blockInfos) { + for (int i = 0; i < blockInfos.size(); i ++) { + OmKeyLocationInfo omKeyLocationInfo = blockInfos.get(i); if (LOG.isDebugEnabled()) { LOG.debug("Adding stream for accessing {}. The stream will be " + "initialized later.", omKeyLocationInfo); @@ -85,6 +88,11 @@ private static List createStreams( retry = null; } + if (i == (blockInfos.size() - 1) && isHsyncFile) { + // fetch last block length from DN + omKeyLocationInfo.setUnderConstruction(true); + } + BlockExtendedInputStream stream = blockStreamFactory.create( keyInfo.getReplicationConfig(), diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java index f95536cba171..ad18c82ee21f 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; @@ -83,6 +84,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME; import static org.apache.hadoop.ozone.OzoneConsts.OZONE_ROOT; import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER; +import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_SCHEME; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY; @@ -110,7 +112,7 @@ public class TestHSync { @BeforeAll public static void init() throws Exception { - final int chunkSize = 16 << 10; + final int chunkSize = 4 << 10; final int flushSize = 2 * chunkSize; final int maxFlushSize = 2 * flushSize; final int blockSize = 2 * maxFlushSize; @@ -196,6 +198,39 @@ public void testKeyHSyncThenClose() throws Exception { } } + @Test + public void testO3fsHSync() throws Exception { + // Set the fs.defaultFS + final String rootPath = String.format("%s://%s.%s/", + OZONE_URI_SCHEME, bucket.getName(), bucket.getVolumeName()); + CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath); + + try (FileSystem fs = FileSystem.get(CONF)) { + for (int i = 0; i < 10; i++) { + final Path file = new Path("/file" + i); + runTestHSync(fs, file, 1 << i); + } + } + } + + @Test + public void testOfsHSync() throws Exception { + // Set the fs.defaultFS + final String rootPath = String.format("%s://%s/", + OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY)); + CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath); + + final String dir = OZONE_ROOT + bucket.getVolumeName() + + OZONE_URI_DELIMITER + bucket.getName(); + + try (FileSystem fs = FileSystem.get(CONF)) { + for (int i = 0; i < 10; i++) { + final Path file = new Path(dir, "file" + i); + runTestHSync(fs, file, 1 << i); + } + } + } + @Test public void testUncommittedBlocks() throws Exception { // Set the fs.defaultFS @@ -261,7 +296,9 @@ public void testHsyncKeyCallCount() throws Exception { OMMetrics omMetrics = cluster.getOzoneManager().getMetrics(); omMetrics.resetNumKeyHSyncs(); - String data = "random data"; + final byte[] data = new byte[128]; + ThreadLocalRandom.current().nextBytes(data); + final Path file = new Path(dir, "file-hsync-then-close"); long blockSize; try (FileSystem fs = FileSystem.get(CONF)) { @@ -269,51 +306,44 @@ public void testHsyncKeyCallCount() throws Exception { long fileSize = 0; try (FSDataOutputStream outputStream = fs.create(file, true)) { // make sure at least writing 2 blocks data - while (fileSize < blockSize) { - outputStream.write(data.getBytes(UTF_8), 0, data.length()); + while (fileSize <= blockSize) { + outputStream.write(data, 0, data.length); outputStream.hsync(); - fileSize += data.length(); + fileSize += data.length; } } } assertEquals(2, omMetrics.getNumKeyHSyncs()); - } - @Test - public void testPreAllocatedFileHsyncKeyCallCount() throws Exception { - // Set the fs.defaultFS - final String rootPath = String.format("%s://%s/", - OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY)); - CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath); - - final String dir = OZONE_ROOT + bucket.getVolumeName() - + OZONE_URI_DELIMITER + bucket.getName(); - - String data = "random data"; - String keyName = "key-" + RandomStringUtils.randomNumeric(5); - final Path file = new Path(dir, keyName); - long blockSize; - long fileSize; - try (FileSystem fs = FileSystem.get(CONF)) { - blockSize = fs.getDefaultBlockSize(file); - fileSize = 2 * blockSize; - } - - OMMetrics omMetrics = cluster.getOzoneManager().getMetrics(); + // test file with all blocks pre-allocated omMetrics.resetNumKeyHSyncs(); long writtenSize = 0; - try (OzoneOutputStream outputStream = bucket.createKey(keyName, fileSize, ReplicationType.RATIS, - ReplicationFactor.THREE, new HashMap<>())) { + try (OzoneOutputStream outputStream = bucket.createKey("key-" + RandomStringUtils.randomNumeric(5) + , blockSize * 2, ReplicationType.RATIS, ReplicationFactor.THREE, new HashMap<>())) { // make sure at least writing 2 blocks data - while (writtenSize < blockSize) { - outputStream.write(data.getBytes(UTF_8), 0, data.length()); + while (writtenSize <= blockSize) { + outputStream.write(data, 0, data.length); outputStream.hsync(); - writtenSize += data.length(); + writtenSize += data.length; } } assertEquals(2, omMetrics.getNumKeyHSyncs()); } + static void runTestHSync(FileSystem fs, Path file, int initialDataSize) + throws Exception { + try (StreamWithLength out = new StreamWithLength( + fs.create(file, true))) { + runTestHSync(fs, file, out, initialDataSize); + for (int i = 1; i < 5; i++) { + for (int j = -1; j <= 1; j++) { + int dataSize = (1 << (i * 5)) + j; + runTestHSync(fs, file, out, dataSize); + } + } + } + } + private static class StreamWithLength implements Closeable { private final FSDataOutputStream out; private long length = 0; @@ -338,6 +368,36 @@ public void close() throws IOException { } } + static void runTestHSync(FileSystem fs, Path file, + StreamWithLength out, int dataSize) + throws Exception { + final long length = out.getLength(); + LOG.info("runTestHSync {} with size {}, skipLength={}", + file, dataSize, length); + final byte[] data = new byte[dataSize]; + ThreadLocalRandom.current().nextBytes(data); + out.writeAndHsync(data); + + final byte[] buffer = new byte[4 << 10]; + int offset = 0; + try (FSDataInputStream in = fs.open(file)) { + final long skipped = in.skip(length); + assertEquals(length, skipped); + + for (; ;) { + final int n = in.read(buffer, 0, buffer.length); + if (n <= 0) { + break; + } + for (int i = 0; i < n; i++) { + assertEquals(data[offset + i], buffer[i]); + } + offset += n; + } + } + assertEquals(data.length, offset); + } + private void runConcurrentWriteHSync(Path file, final FSDataOutputStream out, int initialDataSize) throws InterruptedException, IOException { @@ -400,7 +460,7 @@ public void testConcurrentWriteHSync() + OZONE_URI_DELIMITER + bucket.getName(); try (FileSystem fs = FileSystem.get(CONF)) { - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 5; i++) { final Path file = new Path(dir, "file" + i); try (FSDataOutputStream out = fs.create(file, true)) { diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java index dbcf9f6ea4e4..60cfcd1a2c11 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java @@ -90,7 +90,7 @@ private List createInputStreams(String dataString) { } private BlockInputStream createStream(byte[] buf, int offset) { - return new BlockInputStream(null, 100, null, null, true, null) { + return new BlockInputStream(null, 100L, null, null, true, null) { private long pos; private final ByteArrayInputStream in = new ByteArrayInputStream(buf, offset, 100); From 6343c5ea0cb47686db6e6b5e086ac3445794d2dc Mon Sep 17 00:00:00 2001 From: Sammi Chen Date: Tue, 23 Jan 2024 17:09:30 +0800 Subject: [PATCH 4/4] remove unused functions --- .../hadoop/hdds/scm/storage/BlockInputStream.java | 10 +--------- .../hadoop/hdds/scm/storage/DummyBlockInputStream.java | 5 ----- .../hadoop/hdds/scm/storage/TestBlockInputStream.java | 5 ----- .../apache/hadoop/ozone/client/io/KeyInputStream.java | 4 ++-- .../java/org/apache/hadoop/fs/ozone/TestHSync.java | 4 ++-- 5 files changed, 5 insertions(+), 23 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index 34d0bcaa7b1f..a12f9067ce2d 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -156,6 +156,7 @@ public synchronized void initialize() throws IOException { blockData = getBlockData(); chunks = blockData.getChunksList(); if (blockInfo != null && blockInfo.isUnderConstruction()) { + // use the block length from DN if block is under construction. length = blockData.getSize(); } break; @@ -234,15 +235,6 @@ private void refreshBlockInfo(IOException cause) throws IOException { } } - /** - * Send RPC call to get the block info from the container. - * @return List of chunks in this block. - */ - @VisibleForTesting - protected List getChunkInfoList() throws IOException { - return getBlockData().getChunksList(); - } - /** * Send RPC call to get the block info from the container. * @return BlockData. diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java index 45907e277c11..ca3199d8acfb 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java @@ -65,11 +65,6 @@ protected ContainerProtos.BlockData getBlockData() throws IOException { return ContainerProtos.BlockData.newBuilder().addAllChunks(chunks).setBlockID(datanodeBlockID).build(); } - @Override - protected List getChunkInfoList() throws IOException { - return chunks; - } - @Override protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) { return new DummyChunkInputStream( diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java index 2d50128a00b0..9d1feafb9a48 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java @@ -413,11 +413,6 @@ public void testRefreshOnReadFailureAfterUnbuffer(IOException ex) BlockInputStream subject = new BlockInputStream( new BlockLocationInfo(new BlockLocationInfo.Builder().setBlockID(blockID).setLength(blockSize)), pipeline, null, false, clientFactory, refreshFunction) { - @Override - protected List getChunkInfoList() { - return chunks; - } - @Override protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) { return stream; diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java index 01922a7abe7b..6b6be1abd40e 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java @@ -64,7 +64,7 @@ private static List createStreams( BlockInputStreamFactory blockStreamFactory) { boolean isHsyncFile = keyInfo.getMetadata().containsKey(OzoneConsts.HSYNC_CLIENT_ID); List partStreams = new ArrayList<>(); - for (int i = 0; i < blockInfos.size(); i ++) { + for (int i = 0; i < blockInfos.size(); i++) { OmKeyLocationInfo omKeyLocationInfo = blockInfos.get(i); if (LOG.isDebugEnabled()) { LOG.debug("Adding stream for accessing {}. The stream will be " + @@ -89,7 +89,7 @@ private static List createStreams( } if (i == (blockInfos.size() - 1) && isHsyncFile) { - // fetch last block length from DN + // block is under construction omKeyLocationInfo.setUnderConstruction(true); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java index ad18c82ee21f..7a1c055b00e4 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java @@ -318,8 +318,8 @@ public void testHsyncKeyCallCount() throws Exception { // test file with all blocks pre-allocated omMetrics.resetNumKeyHSyncs(); long writtenSize = 0; - try (OzoneOutputStream outputStream = bucket.createKey("key-" + RandomStringUtils.randomNumeric(5) - , blockSize * 2, ReplicationType.RATIS, ReplicationFactor.THREE, new HashMap<>())) { + try (OzoneOutputStream outputStream = bucket.createKey("key-" + RandomStringUtils.randomNumeric(5), + blockSize * 2, ReplicationType.RATIS, ReplicationFactor.THREE, new HashMap<>())) { // make sure at least writing 2 blocks data while (writtenSize <= blockSize) { outputStream.write(data, 0, data.length);