From 9a4a6ba47a04776162c095813f64280fc17842d9 Mon Sep 17 00:00:00 2001
From: "zhanghaobo@kanzhun.com"
Date: Mon, 18 Dec 2023 10:31:56 +0800
Subject: [PATCH 1/7] HDFS-17293. First packet data + checksum size will be set to 516 bytes when writing to a new block.

---
 .../java/org/apache/hadoop/hdfs/DFSOutputStream.java | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 6ddd56cf72703..f485dc323a5d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -536,8 +536,13 @@ protected void adjustChunkBoundary() {
     }
 
     if (!getStreamer().getAppendChunk()) {
-      final int psize = (int) Math
-          .min(blockSize - getStreamer().getBytesCurBlock(), writePacketSize);
+      int psize = 0;
+      if (blockSize == getStreamer().getBytesCurBlock()) {
+        psize = writePacketSize;
+      } else {
+        psize = (int) Math
+            .min(blockSize - getStreamer().getBytesCurBlock(), writePacketSize);
+      }
       computePacketChunkSize(psize, bytesPerChecksum);
     }
   }

From 338aa6fec80680032c5bbba38f17407aff0a6ea2 Mon Sep 17 00:00:00 2001
From: "zhanghaobo@kanzhun.com"
Date: Wed, 27 Dec 2023 22:55:49 +0800
Subject: [PATCH 2/7] add unit test.

---
 .../hadoop/hdfs/TestDFSOutputStream.java      | 33 +++++++++++++++++++
 1 file changed, 33 insertions(+)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
index 0f1b965cc2649..6e6dd9c7d91fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
@@ -63,6 +63,7 @@
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -184,6 +185,38 @@ public void testPreventOverflow() throws IOException, NoSuchFieldException,
     runAdjustChunkBoundary(configuredWritePacketSize, finalWritePacketSize);
   }
 
+  @Test(timeout=60000)
+  public void testFirstPacketSizeInNewBlocks() throws IOException {
+    final long BLOCK_SIZE = 1L * 1024 * 1024;
+    final int numDataNodes = 3;
+    final Configuration dfsConf = new Configuration();
+    dfsConf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    MiniDFSCluster dfsCluster = null;
+    dfsCluster = new MiniDFSCluster.Builder(dfsConf).numDataNodes(numDataNodes).build();
+    dfsCluster.waitActive();
+
+    DistributedFileSystem fs = dfsCluster.getFileSystem();
+    Path fileName = new Path("/testfile.dat");
+    FSDataOutputStream fos = fs.create(fileName);
+
+    long loop = 0;
+    Random r = new Random();
+    byte[] buf = new byte[1 * 1024 * 1024];
+    r.nextBytes(buf);
+    fos.write(buf);
+    fos.hflush();
+
+    while (loop < 20) {
+      r.nextBytes(buf);
+      fos.write(buf);
+      fos.hflush();
+      loop++;
+      Assert.assertNotEquals(516, ((DFSOutputStream)fos.getWrappedStream()).packetSize);
+    }
+
+    fos.close();
+  }
+
   /**
    * @configuredWritePacketSize the configured WritePacketSize.
   * @finalWritePacketSize the final WritePacketSize picked by

From 87ae30afd0c2473239720b86308b8e3c5050320a Mon Sep 17 00:00:00 2001
From: "zhanghaobo@kanzhun.com"
Date: Thu, 28 Dec 2023 08:09:23 +0800
Subject: [PATCH 3/7] fix checkstyle.

---
 .../java/org/apache/hadoop/hdfs/TestDFSOutputStream.java | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
index 6e6dd9c7d91fa..0a47cebc232c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
@@ -187,10 +187,10 @@ public void testPreventOverflow() throws IOException, NoSuchFieldException,
 
   @Test(timeout=60000)
   public void testFirstPacketSizeInNewBlocks() throws IOException {
-    final long BLOCK_SIZE = 1L * 1024 * 1024;
+    final long blockSize = 1L * 1024 * 1024;
     final int numDataNodes = 3;
     final Configuration dfsConf = new Configuration();
-    dfsConf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    dfsConf.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
     MiniDFSCluster dfsCluster = null;
     dfsCluster = new MiniDFSCluster.Builder(dfsConf).numDataNodes(numDataNodes).build();
     dfsCluster.waitActive();
@@ -205,7 +205,7 @@ public void testFirstPacketSizeInNewBlocks() throws IOException {
     r.nextBytes(buf);
     fos.write(buf);
     fos.hflush();
-    
+
     while (loop < 20) {
       r.nextBytes(buf);
       fos.write(buf);
@@ -213,7 +213,7 @@ public void testFirstPacketSizeInNewBlocks() throws IOException {
       loop++;
       Assert.assertNotEquals(516, ((DFSOutputStream)fos.getWrappedStream()).packetSize);
     }
-    
+
     fos.close();
   }

From c1ba0b58dfe5d8c6aa73b97694c3362162618581 Mon Sep 17 00:00:00 2001
From: "zhanghaobo@kanzhun.com"
Date: Thu, 28 Dec 2023 09:50:30 +0800
Subject: [PATCH 4/7] optimize unit tests.

---
 .../java/org/apache/hadoop/hdfs/TestDFSOutputStream.java | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
index 0a47cebc232c2..498c07733ff04 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
@@ -32,6 +32,7 @@
 import java.util.Random;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.zip.Checksum;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
@@ -58,6 +59,7 @@
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.PathUtils;
 import org.apache.hadoop.test.Whitebox;
+import org.apache.hadoop.util.DataChecksum;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -198,6 +200,7 @@ public void testFirstPacketSizeInNewBlocks() throws IOException {
     DistributedFileSystem fs = dfsCluster.getFileSystem();
     Path fileName = new Path("/testfile.dat");
     FSDataOutputStream fos = fs.create(fileName);
+    DataChecksum crc32c = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
 
     long loop = 0;
     Random r = new Random();
@@ -211,7 +214,8 @@ public void testFirstPacketSizeInNewBlocks() throws IOException {
       fos.write(buf);
       fos.hflush();
       loop++;
-      Assert.assertNotEquals(516, ((DFSOutputStream)fos.getWrappedStream()).packetSize);
+      Assert.assertNotEquals(crc32c.getBytesPerChecksum() + crc32c.getChecksumSize(),
+          ((DFSOutputStream)fos.getWrappedStream()).packetSize);
     }
 
     fos.close();

From bad42b71d5ce0804f6647bfd8b522fa556316323 Mon Sep 17 00:00:00 2001
From: "zhanghaobo@kanzhun.com"
Date: Thu, 28 Dec 2023 13:57:04 +0800
Subject: [PATCH 5/7] trigger yetus again.
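
[Note on the arithmetic behind the fix in PATCH 1/7. In
DFSOutputStream#adjustChunkBoundary(), when the previous write has exactly
filled the current block, getBytesCurBlock() still equals blockSize, so the
old code computed psize = min(blockSize - bytesCurBlock, writePacketSize) = 0.
computePacketChunkSize() clamps the chunk count to at least one, so the first
packet of the next block carried a single 512-byte chunk plus its 4-byte
CRC32C checksum: 516 bytes. The Java sketch below is a minimal reconstruction
of that arithmetic under default settings, not code from the patch;
pktMaxHeaderLen is a stand-in for PacketHeader.PKT_MAX_HEADER_LEN, and its
exact value does not change the outcome because psize is already 0.

    // Minimal sketch of the pre-fix arithmetic, assuming default config values.
    final int bytesPerChecksum = 512;     // dfs.bytes-per-checksum default
    final int checksumSize = 4;           // CRC32C checksum width
    final int writePacketSize = 65536;    // dfs.client-write-packet-size default
    final int pktMaxHeaderLen = 33;       // stand-in for PacketHeader.PKT_MAX_HEADER_LEN
    final long blockSize = 1024L * 1024;  // 1 MiB, as in the test
    final long bytesCurBlock = blockSize; // block exactly full, not yet reset

    // Old code path: psize collapses to 0 at the block boundary.
    int psize = (int) Math.min(blockSize - bytesCurBlock, writePacketSize); // 0
    int chunkSize = bytesPerChecksum + checksumSize;                        // 516
    int bodySize = psize - pktMaxHeaderLen;                                 // negative
    int chunksPerPacket = Math.max(bodySize / chunkSize, 1);                // clamped to 1
    int packetSize = chunkSize * chunksPerPacket;                           // 516 bytes

With the fix, psize is set to writePacketSize whenever bytesCurBlock equals
blockSize, so the first packet of a fresh block is sized like any other full
packet.]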
From 38d52f6b6cda73a6e6a22832d92d561c5e376df3 Mon Sep 17 00:00:00 2001
From: "zhanghaobo@kanzhun.com"
Date: Thu, 28 Dec 2023 19:06:57 +0800
Subject: [PATCH 6/7] fix checkstyle

---
 .../test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
index 498c07733ff04..465f9dfea80ce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
@@ -32,7 +32,6 @@
 import java.util.Random;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.zip.Checksum;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;

From 96da2c85a6d951d1897c0d994f5379e2ec1db9e2 Mon Sep 17 00:00:00 2001
From: "zhanghaobo@kanzhun.com"
Date: Fri, 19 Jan 2024 17:15:59 +0800
Subject: [PATCH 7/7] fix unit test

---
 .../hadoop/hdfs/TestDFSOutputStream.java      | 75 ++++++++++---------
 1 file changed, 40 insertions(+), 35 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
index 465f9dfea80ce..bdb91f91bc5e4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
@@ -49,6 +49,7 @@
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
@@ -64,7 +65,6 @@
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -186,40 +186,6 @@ public void testPreventOverflow() throws IOException, NoSuchFieldException,
     runAdjustChunkBoundary(configuredWritePacketSize, finalWritePacketSize);
   }
 
-  @Test(timeout=60000)
-  public void testFirstPacketSizeInNewBlocks() throws IOException {
-    final long blockSize = 1L * 1024 * 1024;
-    final int numDataNodes = 3;
-    final Configuration dfsConf = new Configuration();
-    dfsConf.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
-    MiniDFSCluster dfsCluster = null;
-    dfsCluster = new MiniDFSCluster.Builder(dfsConf).numDataNodes(numDataNodes).build();
-    dfsCluster.waitActive();
-
-    DistributedFileSystem fs = dfsCluster.getFileSystem();
-    Path fileName = new Path("/testfile.dat");
-    FSDataOutputStream fos = fs.create(fileName);
-    DataChecksum crc32c = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
-
-    long loop = 0;
-    Random r = new Random();
-    byte[] buf = new byte[1 * 1024 * 1024];
-    r.nextBytes(buf);
-    fos.write(buf);
-    fos.hflush();
-
-    while (loop < 20) {
-      r.nextBytes(buf);
-      fos.write(buf);
-      fos.hflush();
-      loop++;
-      Assert.assertNotEquals(crc32c.getBytesPerChecksum() + crc32c.getChecksumSize(),
-          ((DFSOutputStream)fos.getWrappedStream()).packetSize);
-    }
-
-    fos.close();
-  }
-
   /**
    * @configuredWritePacketSize the configured WritePacketSize.
    * @finalWritePacketSize the final WritePacketSize picked by
@@ -544,6 +510,45 @@ public void testExceptionInCloseWithoutRecoverLease() throws Exception {
     }
   }
 
+  @Test(timeout=60000)
+  public void testFirstPacketSizeInNewBlocks() throws IOException {
+    final long blockSize = (long) 1024 * 1024;
+    MiniDFSCluster dfsCluster = cluster;
+    DistributedFileSystem fs = dfsCluster.getFileSystem();
+    Configuration dfsConf = fs.getConf();
+
+    EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE);
+    try(FSDataOutputStream fos = fs.create(new Path("/testfile.dat"),
+        FsPermission.getDefault(),
+        flags, 512, (short)3, blockSize, null)) {
+
+      DataChecksum crc32c = DataChecksum.newDataChecksum(
+          DataChecksum.Type.CRC32C, 512);
+
+      long loop = 0;
+      Random r = new Random();
+      byte[] buf = new byte[(int) blockSize];
+      r.nextBytes(buf);
+      fos.write(buf);
+      fos.hflush();
+
+      int chunkSize = crc32c.getBytesPerChecksum() + crc32c.getChecksumSize();
+      int packetContentSize = (dfsConf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
+          DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT) -
+          PacketHeader.PKT_MAX_HEADER_LEN) / chunkSize * chunkSize;
+
+      while (loop < 20) {
+        r.nextBytes(buf);
+        fos.write(buf);
+        fos.hflush();
+        loop++;
+        Assert.assertEquals(packetContentSize,
+            ((DFSOutputStream) fos.getWrappedStream()).packetSize);
+      }
+    }
+    fs.delete(new Path("/testfile.dat"), true);
+  }
+
   @AfterClass
   public static void tearDown() {
     if (cluster != null) {
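
[Note on the expected value asserted by the final test in PATCH 7/7: the
assertion checks that every packet, including the first packet of each new
block, carries a full payload. The expected size is the configured packet
buffer minus the packet header, rounded down to a whole number of 516-byte
chunks. The Java sketch below reproduces that arithmetic, assuming the
default dfs.client-write-packet-size of 65536 and using 33 as a stand-in for
PacketHeader.PKT_MAX_HEADER_LEN; with those numbers the result is 65016.

    // Expected packet content size under default settings (sketch).
    final int writePacketSize = 65536; // DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT
    final int pktMaxHeaderLen = 33;    // stand-in for PacketHeader.PKT_MAX_HEADER_LEN
    final int chunkSize = 512 + 4;     // bytesPerChecksum + CRC32C checksum size

    // Round the usable packet body down to a whole number of chunks.
    int packetContentSize =
        (writePacketSize - pktMaxHeaderLen) / chunkSize * chunkSize;
    // (65536 - 33) / 516 = 126 chunks; 126 * 516 = 65016 bytes

Before the fix, the first packet flushed after a block boundary would have
reported 516 bytes, which is exactly what the earlier assertNotEquals-based
versions of this test were probing for.]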