diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java index 05ec2b18e95ef..de5e81843ecb1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java @@ -73,5 +73,9 @@ public void onCreateBlockReader(LocatedBlock block, int chunkIndex, long offset, public void failCreateBlockReader() throws InvalidBlockTokenException {} - public void failWhenReadWithStrategy(boolean isRetryRead) throws IOException {}; + public void failWhenReadWithStrategy(boolean isRetryRead) throws IOException {} + + public boolean mockEndBlockGroupInAdvance() { + return false; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index 9c7ff64c6c8cd..3ab9219a3aee6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -73,6 +73,10 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_EC_WRITE_ALLOW_END_BLOCKGROUP_INADVANCE; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_EC_WRITE_ALLOW_END_BLOCKGROUP_INADVANCE_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_EC_WRITE_MAX_END_BLOCKGROUP_INADVANCE_COUNT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_EC_WRITE_MAX_END_BLOCKGROUP_INADVANCE_COUNT_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.ECRedundancy.DFS_CLIENT_EC_WRITE_FAILED_BLOCKS_TOLERATED; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.ECRedundancy.DFS_CLIENT_EC_WRITE_FAILED_BLOCKS_TOLERATED_DEFAILT; @@ -287,6 +291,10 @@ private void flipDataBuffers() { private int blockGroupIndex; private long datanodeRestartTimeout; private final int failedBlocksTolerated; + private final int maxEndBlockGroupInAdvanceCount; + private int curEndBlockGroupInAdvanceCount; + private final boolean allowEndBlockGroupInAdvance; + private boolean endBlockGroupInAdvance; /** Construct a new output stream for creating a file. */ DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, @@ -335,6 +343,12 @@ private void flipDataBuffers() { } failedBlocksTolerated = Math.min(failedBlocksToleratedTmp, ecPolicy.getNumParityUnits()); + allowEndBlockGroupInAdvance = dfsClient.getConfiguration().getBoolean( + DFS_CLIENT_EC_WRITE_ALLOW_END_BLOCKGROUP_INADVANCE, + DFS_CLIENT_EC_WRITE_ALLOW_END_BLOCKGROUP_INADVANCE_DEFAULT); + maxEndBlockGroupInAdvanceCount = dfsClient.getConfiguration().getInt( + DFS_CLIENT_EC_WRITE_MAX_END_BLOCKGROUP_INADVANCE_COUNT, + DFS_CLIENT_EC_WRITE_MAX_END_BLOCKGROUP_INADVANCE_COUNT_DEFAULT); } /** Construct a new output stream for appending to a file. */ @@ -424,6 +438,16 @@ private Set checkStreamers() throws IOException { return newFailed; } + private Set checkStreamersWithoutThrowException() { + Set newFailed = new HashSet<>(); + for(StripedDataStreamer s : streamers) { + if (!s.isHealthy() && !failedStreamers.contains(s)) { + newFailed.add(s); + } + } + return newFailed; + } + private void closeAllStreamers() { // The write has failed, Close all the streamers. for (StripedDataStreamer streamer : streamers) { @@ -559,6 +583,43 @@ private boolean shouldEndBlockGroup() { currentBlockGroup.getNumBytes() == blockSize * numDataBlocks; } + private boolean shouldEndBlockGroupInAdvance() { + if (!allowEndBlockGroupInAdvance || + curEndBlockGroupInAdvanceCount > maxEndBlockGroupInAdvanceCount) { + return false; + } + if (DFSClientFaultInjector.get().mockEndBlockGroupInAdvance()) { + LOG.info("Block group {} ends in advance.", currentBlockGroup); + this.endBlockGroupInAdvance = true; + curEndBlockGroupInAdvanceCount++; + return true; + } + + Set newFailed = checkStreamersWithoutThrowException(); + boolean overFailedStreamer = + failedStreamers.size() + newFailed.size() >= failedBlocksTolerated; + boolean stripeFull = currentBlockGroup.getNumBytes() > 0 && + currentBlockGroup.getNumBytes() % ((long) numDataBlocks * cellSize) == 0; + if (overFailedStreamer && stripeFull) { + LOG.info("Block group {} ends in advance.", currentBlockGroup); + this.endBlockGroupInAdvance = true; + curEndBlockGroupInAdvanceCount++; + return true; + } + return false; + } + + @Override + void endBlock() throws IOException { + if (getStreamer().getBytesCurBlock() == blockSize || getStreamer().isEndBlockFlag()) { + setCurrentPacketToEmpty(); + enqueueCurrentPacket(); + getStreamer().setBytesCurBlock(0); + getStreamer().setEndBlockFlag(false); + lastFlushOffset = 0; + } + } + @Override protected synchronized void writeChunk(byte[] bytes, int offset, int len, byte[] checksum, int ckoff, int cklen) throws IOException { @@ -566,8 +627,9 @@ protected synchronized void writeChunk(byte[] bytes, int offset, int len, final int pos = cellBuffers.addTo(index, bytes, offset, len); final boolean cellFull = pos == cellSize; - if (currentBlockGroup == null || shouldEndBlockGroup()) { - // the incoming data should belong to a new block. Allocate a new block. + if (currentBlockGroup == null || shouldEndBlockGroup() || endBlockGroupInAdvance) { + this.endBlockGroupInAdvance = false; + // The incoming data should belong to a new block. Allocate a new block. allocateNewBlock(); } @@ -596,13 +658,14 @@ protected synchronized void writeChunk(byte[] bytes, int offset, int len, next = 0; // if this is the end of the block group, end each internal block - if (shouldEndBlockGroup()) { + if (shouldEndBlockGroup() || shouldEndBlockGroupInAdvance()) { flushAllInternals(); checkStreamerFailures(false); for (int i = 0; i < numAllBlocks; i++) { final StripedDataStreamer s = setCurrentStreamer(i); if (s.isHealthy()) { try { + getStreamer().setEndBlockFlag(true); endBlock(); } catch (IOException ignored) {} } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 8d13640eadb18..69e31bb80b109 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -539,6 +539,7 @@ boolean doWaitForRestart() { protected final LoadingCache excludedNodes; private final String[] favoredNodes; private final EnumSet addBlockFlags; + private volatile boolean endBlockFlag = false; private DataStreamer(HdfsFileStatus stat, ExtendedBlock block, DFSClient dfsClient, String src, @@ -2285,4 +2286,12 @@ public String toString() { return extendedBlock == null ? "block==null" : "" + extendedBlock.getLocalBlock(); } + + public boolean isEndBlockFlag() { + return endBlockFlag; + } + + public void setEndBlockFlag(boolean endBlockFlag) { + this.endBlockFlag = endBlockFlag; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index 4a32e7362d5f3..ce3df54becdd0 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -296,6 +296,14 @@ public interface HdfsClientConfigKeys { int DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME_DEFAULT = DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME_DEFAULT * 10; + String DFS_CLIENT_EC_WRITE_ALLOW_END_BLOCKGROUP_INADVANCE = + "dfs.client.ec.write.allow.end.blockgroup.inadvance"; + boolean DFS_CLIENT_EC_WRITE_ALLOW_END_BLOCKGROUP_INADVANCE_DEFAULT = false; + + String DFS_CLIENT_EC_WRITE_MAX_END_BLOCKGROUP_INADVANCE_COUNT = + "dfs.client.ec.write.max.end.blockgroup.inadvance.count"; + int DFS_CLIENT_EC_WRITE_MAX_END_BLOCKGROUP_INADVANCE_COUNT_DEFAULT = 10; + /** * These are deprecated config keys to client code. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index b9d8b67dc122a..359cd2214eccd 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -6701,4 +6701,19 @@ Enables observer reads for clients. This should only be enabled when clients are using routers. + + dfs.client.ec.write.allow.end.blockgroup.inadvance + false + + Whether allow client ends non-full block group in advance or not. + + + + dfs.client.ec.write.max.end.blockgroup.inadvance.count + 10 + + Max count of ending block group in advance allowed. + This prevents the creation of large amounts of metadata in NameNode. + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java index 791a1afc01f79..df6faf3d5c096 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java @@ -17,7 +17,9 @@ */ package org.apache.hadoop.hdfs; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_EC_WRITE_ALLOW_END_BLOCKGROUP_INADVANCE; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -28,10 +30,13 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.EnumSet; +import java.util.List; import java.util.concurrent.TimeoutException; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,6 +101,7 @@ public void setup() throws IOException { conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY, false); conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0); + conf.setBoolean(DFS_CLIENT_EC_WRITE_ALLOW_END_BLOCKGROUP_INADVANCE, true); if (ErasureCodeNative.isNativeCodeLoaded()) { conf.set( CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY, @@ -190,6 +196,32 @@ public void testFileMoreThanABlockGroup3() throws Exception { + cellSize + 123); } + @Test + public void testEndBlockGroupInadvance() throws Exception { + DFSClientFaultInjector old = DFSClientFaultInjector.get(); + String src = "/testEndBlockGroupInadvance"; + Path testPath = new Path(src); + try { + DFSClientFaultInjector.set(new DFSClientFaultInjector() { + @Override + public boolean mockEndBlockGroupInAdvance() { + return true; + } + }); + byte[] bytes = StripedFileTestUtil.generateBytes(2 * cellSize * dataBlocks + 123); + DFSTestUtil.writeFile(fs, testPath, new String(bytes)); + StripedFileTestUtil.waitBlockGroupsReported(fs, src); + StripedFileTestUtil.verifyLength(fs, testPath, bytes.length); + List> blockGroupList = new ArrayList<>(); + LocatedBlocks lbs = fs.getClient().getLocatedBlocks(testPath.toString(), 0L, + Long.MAX_VALUE); + assertEquals(3, lbs.getLocatedBlocks().size()); + } finally { + DFSClientFaultInjector.set(old); + } + } + + /** * {@link DFSStripedOutputStream} doesn't support hflush() or hsync() yet. * This test is to make sure that DFSStripedOutputStream doesn't throw any