DFSClientFaultInjector.java
@@ -73,5 +73,9 @@ public void onCreateBlockReader(LocatedBlock block, int chunkIndex, long offset,

public void failCreateBlockReader() throws InvalidBlockTokenException {}

- public void failWhenReadWithStrategy(boolean isRetryRead) throws IOException {};
+ public void failWhenReadWithStrategy(boolean isRetryRead) throws IOException {}

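/**
 * Test hook: when overridden to return true, DFSStripedOutputStream ends
 * the current block group at the next full stripe, regardless of streamer
 * health.
 */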
public boolean mockEndBlockGroupInAdvance() {
return false;
}
}
DFSStripedOutputStream.java
@@ -73,6 +73,10 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_EC_WRITE_ALLOW_END_BLOCKGROUP_INADVANCE;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_EC_WRITE_ALLOW_END_BLOCKGROUP_INADVANCE_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_EC_WRITE_MAX_END_BLOCKGROUP_INADVANCE_COUNT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_EC_WRITE_MAX_END_BLOCKGROUP_INADVANCE_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.ECRedundancy.DFS_CLIENT_EC_WRITE_FAILED_BLOCKS_TOLERATED;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.ECRedundancy.DFS_CLIENT_EC_WRITE_FAILED_BLOCKS_TOLERATED_DEFAILT;

@@ -287,6 +291,10 @@ private void flipDataBuffers() {
private int blockGroupIndex;
private long datanodeRestartTimeout;
private final int failedBlocksTolerated;
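// Feature switch, per-stream budget, and a one-shot flag for ending a block
// group before it is full; see shouldEndBlockGroupInAdvance() and writeChunk().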
private final int maxEndBlockGroupInAdvanceCount;
private int curEndBlockGroupInAdvanceCount;
private final boolean allowEndBlockGroupInAdvance;
private boolean endBlockGroupInAdvance;

/** Construct a new output stream for creating a file. */
DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
@@ -335,6 +343,12 @@ private void flipDataBuffers() {
}
failedBlocksTolerated = Math.min(failedBlocksToleratedTmp,
ecPolicy.getNumParityUnits());
allowEndBlockGroupInAdvance = dfsClient.getConfiguration().getBoolean(
DFS_CLIENT_EC_WRITE_ALLOW_END_BLOCKGROUP_INADVANCE,
DFS_CLIENT_EC_WRITE_ALLOW_END_BLOCKGROUP_INADVANCE_DEFAULT);
maxEndBlockGroupInAdvanceCount = dfsClient.getConfiguration().getInt(
DFS_CLIENT_EC_WRITE_MAX_END_BLOCKGROUP_INADVANCE_COUNT,
DFS_CLIENT_EC_WRITE_MAX_END_BLOCKGROUP_INADVANCE_COUNT_DEFAULT);
}

/** Construct a new output stream for appending to a file. */
@@ -424,6 +438,16 @@ private Set<StripedDataStreamer> checkStreamers() throws IOException {
return newFailed;
}

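/**
 * Same scan as checkStreamers(), but only collects the newly failed
 * streamers instead of throwing when too many have failed; used on the
 * early-end path, where the write may still succeed.
 */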
private Set<StripedDataStreamer> checkStreamersWithoutThrowException() {
Set<StripedDataStreamer> newFailed = new HashSet<>();
for (StripedDataStreamer s : streamers) {
if (!s.isHealthy() && !failedStreamers.contains(s)) {
newFailed.add(s);
}
}
return newFailed;
}

private void closeAllStreamers() {
// The write has failed, Close all the streamers.
for (StripedDataStreamer streamer : streamers) {
@@ -559,15 +583,53 @@ private boolean shouldEndBlockGroup() {
currentBlockGroup.getNumBytes() == blockSize * numDataBlocks;
}

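/**
 * Whether the current block group should be ended before it reaches the
 * configured block size: only when the feature is enabled, the per-stream
 * budget is not exhausted, the group ends exactly on a stripe boundary,
 * and the failed streamers have reached the tolerated limit (or the test
 * fault injector forces an early end).
 */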
private boolean shouldEndBlockGroupInAdvance() {
if (!allowEndBlockGroupInAdvance ||
curEndBlockGroupInAdvanceCount >= maxEndBlockGroupInAdvanceCount) {
return false;
}
if (DFSClientFaultInjector.get().mockEndBlockGroupInAdvance()) {
LOG.info("Block group {} ends in advance.", currentBlockGroup);
this.endBlockGroupInAdvance = true;
curEndBlockGroupInAdvanceCount++;
return true;
}

Set<StripedDataStreamer> newFailed = checkStreamersWithoutThrowException();
boolean overFailedStreamer =
failedStreamers.size() + newFailed.size() >= failedBlocksTolerated;
boolean stripeFull = currentBlockGroup.getNumBytes() > 0 &&
currentBlockGroup.getNumBytes() % ((long) numDataBlocks * cellSize) == 0;
if (overFailedStreamer && stripeFull) {
LOG.info("Block group {} ends in advance.", currentBlockGroup);
this.endBlockGroupInAdvance = true;
curEndBlockGroupInAdvanceCount++;
return true;
}
return false;
}
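// Worked example (editor's sketch, not part of the change): with RS-6-3
// and cellSize = 1 MiB, a full stripe is 6 MiB of user data; after 12 MiB,
// getNumBytes() % (6 * 1 MiB) == 0, so the group can be closed at 12 MiB
// instead of being carried to blockSize * numDataBlocks.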

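/**
 * Unlike the base implementation, this also ends a block whose streamer
 * has the end-block flag set, so a block group can close before every
 * internal block reaches blockSize.
 */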
@Override
void endBlock() throws IOException {
if (getStreamer().getBytesCurBlock() == blockSize || getStreamer().isEndBlockFlag()) {
setCurrentPacketToEmpty();
enqueueCurrentPacket();
getStreamer().setBytesCurBlock(0);
getStreamer().setEndBlockFlag(false);
lastFlushOffset = 0;
}
}

@Override
protected synchronized void writeChunk(byte[] bytes, int offset, int len,
byte[] checksum, int ckoff, int cklen) throws IOException {
final int index = getCurrentIndex();
final int pos = cellBuffers.addTo(index, bytes, offset, len);
final boolean cellFull = pos == cellSize;

- if (currentBlockGroup == null || shouldEndBlockGroup()) {
-   // the incoming data should belong to a new block. Allocate a new block.
+ if (currentBlockGroup == null || shouldEndBlockGroup() || endBlockGroupInAdvance) {
+   this.endBlockGroupInAdvance = false;
+   // The incoming data should belong to a new block. Allocate a new block.
allocateNewBlock();
}

@@ -596,13 +658,14 @@ protected synchronized void writeChunk(byte[] bytes, int offset, int len,
next = 0;

// if this is the end of the block group, end each internal block
- if (shouldEndBlockGroup()) {
+ if (shouldEndBlockGroup() || shouldEndBlockGroupInAdvance()) {
flushAllInternals();
checkStreamerFailures(false);
for (int i = 0; i < numAllBlocks; i++) {
final StripedDataStreamer s = setCurrentStreamer(i);
if (s.isHealthy()) {
try {
getStreamer().setEndBlockFlag(true);
endBlock();
} catch (IOException ignored) {}
}
DataStreamer.java
@@ -539,6 +539,7 @@ boolean doWaitForRestart() {
protected final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes;
private final String[] favoredNodes;
private final EnumSet<AddBlockFlag> addBlockFlags;
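// Set by DFSStripedOutputStream so endBlock() can close an internal block
// that has not reached blockSize when its block group ends in advance.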
private volatile boolean endBlockFlag = false;

private DataStreamer(HdfsFileStatus stat, ExtendedBlock block,
DFSClient dfsClient, String src,
@@ -2285,4 +2286,12 @@ public String toString() {
return extendedBlock == null ?
"block==null" : "" + extendedBlock.getLocalBlock();
}

public boolean isEndBlockFlag() {
return endBlockFlag;
}

public void setEndBlockFlag(boolean endBlockFlag) {
this.endBlockFlag = endBlockFlag;
}
}
HdfsClientConfigKeys.java
@@ -296,6 +296,14 @@ public interface HdfsClientConfigKeys {
int DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME_DEFAULT =
DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME_DEFAULT * 10;

String DFS_CLIENT_EC_WRITE_ALLOW_END_BLOCKGROUP_INADVANCE =
"dfs.client.ec.write.allow.end.blockgroup.inadvance";
boolean DFS_CLIENT_EC_WRITE_ALLOW_END_BLOCKGROUP_INADVANCE_DEFAULT = false;

String DFS_CLIENT_EC_WRITE_MAX_END_BLOCKGROUP_INADVANCE_COUNT =
"dfs.client.ec.write.max.end.blockgroup.inadvance.count";
int DFS_CLIENT_EC_WRITE_MAX_END_BLOCKGROUP_INADVANCE_COUNT_DEFAULT = 10;

/**
* These are deprecated config keys to client code.
*/
hdfs-default.xml
@@ -6701,4 +6701,19 @@
Enables observer reads for clients. This should only be enabled when clients are using routers.
</description>
</property>
<property>
<name>dfs.client.ec.write.allow.end.blockgroup.inadvance</name>
<value>false</value>
<description>
Whether the client is allowed to end a non-full block group in advance.
</description>
</property>
<property>
<name>dfs.client.ec.write.max.end.blockgroup.inadvance.count</name>
<value>10</value>
<description>
The maximum number of times an output stream may end a block group in advance.
This cap keeps a client from creating excessive block metadata in the NameNode.
</description>
</property>
</configuration>
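A minimal usage sketch (editor's addition, not part of this change): the class
name and file path below are hypothetical, and the target directory is assumed
to already carry an erasure coding policy such as RS-6-3.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class EcEarlyEndExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Opt in to ending a non-full block group early (off by default).
    conf.setBoolean("dfs.client.ec.write.allow.end.blockgroup.inadvance", true);
    // Cap early ends per output stream; 10 is the default shown above.
    conf.setInt("dfs.client.ec.write.max.end.blockgroup.inadvance.count", 10);
    try (FileSystem fs = FileSystem.get(conf);
         FSDataOutputStream out = fs.create(new Path("/ec/data/file"))) {
      // One full RS-6-3 stripe of user data at a 1 MiB cell size.
      out.write(new byte[6 * 1024 * 1024]);
    }
  }
}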
TestDFSStripedOutputStream.java
@@ -17,7 +17,9 @@
*/
package org.apache.hadoop.hdfs;

import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_EC_WRITE_ALLOW_END_BLOCKGROUP_INADVANCE;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -28,10 +30,13 @@
import java.io.InputStream;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,6 +101,7 @@ public void setup() throws IOException {
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
false);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
conf.setBoolean(DFS_CLIENT_EC_WRITE_ALLOW_END_BLOCKGROUP_INADVANCE, true);
if (ErasureCodeNative.isNativeCodeLoaded()) {
conf.set(
CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY,
@@ -190,6 +196,32 @@ public void testFileMoreThanABlockGroup3() throws Exception {
+ cellSize + 123);
}

@Test
public void testEndBlockGroupInAdvance() throws Exception {
DFSClientFaultInjector old = DFSClientFaultInjector.get();
String src = "/testEndBlockGroupInAdvance";
Path testPath = new Path(src);
try {
DFSClientFaultInjector.set(new DFSClientFaultInjector() {
@Override
public boolean mockEndBlockGroupInAdvance() {
return true;
}
});
byte[] bytes = StripedFileTestUtil.generateBytes(2 * cellSize * dataBlocks + 123);
DFSTestUtil.writeFile(fs, testPath, new String(bytes));
StripedFileTestUtil.waitBlockGroupsReported(fs, src);
StripedFileTestUtil.verifyLength(fs, testPath, bytes.length);
LocatedBlocks lbs = fs.getClient().getLocatedBlocks(testPath.toString(), 0L,
Long.MAX_VALUE);
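// Two full stripes plus a 123-byte tail should land in three block
// groups, since the injector forces an early end at every stripe boundary.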
assertEquals(3, lbs.getLocatedBlocks().size());
} finally {
DFSClientFaultInjector.set(old);
}
}

/**
* {@link DFSStripedOutputStream} doesn't support hflush() or hsync() yet.
* This test is to make sure that DFSStripedOutputStream doesn't throw any