Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
15d803b
HDDS-9915. [hsync] Interface to retrieve block info and finalize bloc…
ashishkumar50 Dec 27, 2023
b16087a
HDDS-9638. [hsync] File recovery support in OM (#5847)
ChenSammi Jan 9, 2024
a281b93
HDDS-10093. Make recoverLease call idempotent (#5958)
ChenSammi Jan 10, 2024
7224393
HDDS-9750. [hsync] Make Putblock performance acceptable - Skeleton co…
jojochuang Jan 11, 2024
1c20d84
HDDS-10104. [hsync]Introduce soft limit support for lease recovery. (…
ashishkumar50 Jan 16, 2024
082d759
HDDS-8830. Add admin CLI to list open files (#5920)
smengcl Jan 19, 2024
04b6aa5
HDDS-10044. [hsync] File recovery support in Client (#5978)
ChenSammi Jan 19, 2024
305a176
HDDS-9751. [hsync] Make Putblock performance acceptable - DataNode si…
jojochuang Jan 23, 2024
54a75ca
HDDS-9387. [hsync] Reduce updating block length times at OM during hs…
ChenSammi Jan 25, 2024
2e2d08e
HDDS-10141. [hsync] Support hard limit and auto recovery for hsync fi…
ashishkumar50 Jan 25, 2024
f5f737d
HDDS-10190. [hsync] Handle lease recovery for file without blocks. (#…
ashishkumar50 Jan 26, 2024
2651d4c
HDDS-9930. Remove open keys as well when keys are deleted from KeyTab…
smengcl Jan 26, 2024
b532f81
HDDS-10077. Add hsync metadata to hsync'ed keys in OpenKeyTable as we…
smengcl Jan 30, 2024
f065781
HDDS-10147. Migrate TestFinalizeBlock to JUnit5. (#6133)
ashishkumar50 Jan 31, 2024
76a573a
HDDS-9752. [hsync] Make Putblock performance acceptable - Client side…
jojochuang Feb 7, 2024
c1d7b43
Merge remote-tracking branch 'asf/master' into HEAD
jojochuang Feb 7, 2024
1f028f0
Checkstyle
jojochuang Feb 7, 2024
8aa8a36
Force testIgnoreExpiredRecoverhsyncKeys() to run first so that the as…
jojochuang Feb 7, 2024
370b9d7
HDDS-10256. Retry block allocation when SCM is in safe mode. (#6189)
ashishkumar50 Feb 10, 2024
0a1c5d4
HDDS-10242. [hsync] Handle penultimate block finalization. (#6164)
ashishkumar50 Feb 10, 2024
463a09b
HDDS-9884. Pass DatanodeVersion to the client (#6155)
smengcl Feb 15, 2024
9b13435
HDDS-10361. [hsync] Output stream should support direct byte buffer. …
jojochuang Feb 26, 2024
d3c5cce
Merge remote-tracking branch 'asf/master' into HDDS-7593
smengcl Feb 27, 2024
1cf9e95
HDDS-10427. Retry read wait based on policy. (#6292)
ashishkumar50 Mar 4, 2024
f244a67
HDDS-10252. [hsync] Revisit configuration keys for incremental chunk …
jojochuang Mar 4, 2024
be5ffc2
HDDS-10471. [hsync] MockDatanodeStorage.writeChunk should make a copy…
jojochuang Mar 6, 2024
1d81c32
Merge remote-tracking branch 'asf/HDDS-7593' into HDDS-7593
smengcl Mar 6, 2024
3cadb24
Merge remote-tracking branch 'asf/master' into HDDS-7593
smengcl Mar 7, 2024
dd5b49b
HDDS-10497. [hsync] Refresh block token immediately if block token ex…
jojochuang Mar 14, 2024
825c340
HDDS-10511. Support ByteBufferPositionedReadable in OzoneFSInputStrea…
ashishkumar50 Mar 15, 2024
5448ebc
Merge remote-tracking branch 'asf/master' into HDDS-7593
smengcl Mar 22, 2024
509c970
HDDS-10442. [hsync] Add a Freon tool to measure client to DataNode ro…
jojochuang Mar 27, 2024
3fe5cde
Merge remote-tracking branch 'asf/HDDS-7593' into HDDS-7593-master-me…
smengcl Mar 27, 2024
91e5d2e
HDDS-9130. [hsync] Combine WriteData and PutBlock requests into one (…
jojochuang Apr 1, 2024
6a4e699
HDDS-10564. Make Outputstream writeExecutor daemon threads. (#6418)
jojochuang Mar 24, 2024
6cfe9cf
HDDS-10626. [LeaseRecovery] OM shuts down with 'SecretKey client must…
ChenSammi Apr 3, 2024
d4314c9
HDDS-10591. [hsync] improve block token refresh message. (#6444)
jojochuang Apr 5, 2024
f4d7716
Merge remote-tracking branch 'asf/master' into HEAD
jojochuang Apr 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,23 @@ public enum ChecksumCombineMode {
tags = ConfigTag.CLIENT)
private String fsDefaultBucketLayout = "FILE_SYSTEM_OPTIMIZED";

@Config(key = "incremental.chunk.list",
defaultValue = "false",
type = ConfigType.BOOLEAN,
description = "Client PutBlock request can choose incremental chunk " +
"list rather than full chunk list to optimize performance. " +
"Critical to HBase.",
tags = ConfigTag.CLIENT)
private boolean incrementalChunkList = true;

@Config(key = "stream.putblock.piggybacking",
defaultValue = "false",
type = ConfigType.BOOLEAN,
description = "Allow PutBlock to be piggybacked in WriteChunk " +
"requests if the chunk is small.",
tags = ConfigTag.CLIENT)
private boolean enablePutblockPiggybacking = false;

@PostConstruct
public void validate() {
Preconditions.checkState(streamBufferSize > 0);
Expand Down Expand Up @@ -445,11 +462,27 @@ public String getFsDefaultBucketLayout() {
return fsDefaultBucketLayout;
}

public void setEnablePutblockPiggybacking(boolean enablePutblockPiggybacking) {
this.enablePutblockPiggybacking = enablePutblockPiggybacking;
}

public boolean getEnablePutblockPiggybacking() {
return enablePutblockPiggybacking;
}

public boolean isDatastreamPipelineMode() {
return datastreamPipelineMode;
}

public void setDatastreamPipelineMode(boolean datastreamPipelineMode) {
this.datastreamPipelineMode = datastreamPipelineMode;
}

public void setIncrementalChunkList(boolean enable) {
this.incrementalChunkList = enable;
}

public boolean getIncrementalChunkList() {
return this.incrementalChunkList;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ private XceiverClientReply sendCommandWithRetry(
LOG.debug(message + " on the pipeline {}.",
processForDebug(request), pipeline);
} else {
LOG.error(message + " on the pipeline {}.",
LOG.warn(message + " on the pipeline {}.",
request.getCmdType(), pipeline);
}
throw ioException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand All @@ -32,6 +33,7 @@
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto;
Expand Down Expand Up @@ -63,11 +65,12 @@
*/
public class BlockInputStream extends BlockExtendedInputStream {

private static final Logger LOG =
public static final Logger LOG =
LoggerFactory.getLogger(BlockInputStream.class);

private final BlockID blockID;
private final long length;
private long length;
private final BlockLocationInfo blockInfo;
private final AtomicReference<Pipeline> pipelineRef =
new AtomicReference<>();
private final AtomicReference<Token<OzoneBlockTokenIdentifier>> tokenRef =
Expand Down Expand Up @@ -112,13 +115,16 @@ public class BlockInputStream extends BlockExtendedInputStream {

private final Function<BlockID, BlockLocationInfo> refreshFunction;

public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
public BlockInputStream(
BlockLocationInfo blockInfo,
Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token,
XceiverClientFactory xceiverClientFactory,
Function<BlockID, BlockLocationInfo> refreshFunction,
OzoneClientConfig config) {
this.blockID = blockId;
this.length = blockLen;
this.blockInfo = blockInfo;
this.blockID = blockInfo.getBlockID();
this.length = blockInfo.getLength();
setPipeline(pipeline);
tokenRef.set(token);
this.verifyChecksum = config.isChecksumVerify();
Expand All @@ -129,14 +135,16 @@ public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
TimeUnit.SECONDS.toMillis(config.getReadRetryInterval()));
}

// only for unit tests
public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token,
XceiverClientFactory xceiverClientFactory,
OzoneClientConfig config
) {
this(blockId, blockLen, pipeline, token,
xceiverClientFactory, null, config);
this(new BlockLocationInfo(new BlockLocationInfo.Builder().setBlockID(blockId).setLength(blockLen)),
pipeline, token, xceiverClientFactory, null, config);
}

/**
* Initialize the BlockInputStream. Get the BlockData (list of chunks) from
* the Container and create the ChunkInputStreams for each Chunk in the Block.
Expand All @@ -148,11 +156,17 @@ public synchronized void initialize() throws IOException {
return;
}

BlockData blockData = null;
List<ChunkInfo> chunks = null;
IOException catchEx = null;
do {
try {
chunks = getChunkInfoList();
blockData = getBlockData();
chunks = blockData.getChunksList();
if (blockInfo != null && blockInfo.isUnderConstruction()) {
// use the block length from DN if block is under construction.
length = blockData.getSize();
}
break;
// If we get a StorageContainerException or an IOException due to
// datanodes are not reachable, refresh to get the latest pipeline
Expand Down Expand Up @@ -211,18 +225,25 @@ private boolean isConnectivityIssue(IOException ex) {
}

private void refreshBlockInfo(IOException cause) throws IOException {
LOG.info("Unable to read information for block {} from pipeline {}: {}",
LOG.info("Attempting to update pipeline and block token for block {} from pipeline {}: {}",
blockID, pipelineRef.get().getId(), cause.getMessage());
if (refreshFunction != null) {
LOG.debug("Re-fetching pipeline and block token for block {}", blockID);
BlockLocationInfo blockLocationInfo = refreshFunction.apply(blockID);
if (blockLocationInfo == null) {
LOG.debug("No new block location info for block {}", blockID);
LOG.warn("No new block location info for block {}", blockID);
} else {
LOG.debug("New pipeline for block {}: {}", blockID,
blockLocationInfo.getPipeline());
setPipeline(blockLocationInfo.getPipeline());
LOG.info("New pipeline for block {}: {}", blockID,
blockLocationInfo.getPipeline());

tokenRef.set(blockLocationInfo.getToken());
if (blockLocationInfo.getToken() != null) {
OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier();
tokenId.readFromByteArray(tokenRef.get().getIdentifier());
LOG.info("A new token is added for block {}. Expiry: {}",
blockID, Instant.ofEpochMilli(tokenId.getExpiryDate()));
}
}
} else {
throw cause;
Expand All @@ -231,24 +252,27 @@ private void refreshBlockInfo(IOException cause) throws IOException {

/**
* Send RPC call to get the block info from the container.
* @return List of chunks in this block.
* @return BlockData.
*/
protected List<ChunkInfo> getChunkInfoList() throws IOException {
protected BlockData getBlockData() throws IOException {
acquireClient();
try {
return getChunkInfoListUsingClient();
return getBlockDataUsingClient();
} finally {
releaseClient();
}
}

@VisibleForTesting
protected List<ChunkInfo> getChunkInfoListUsingClient() throws IOException {
/**
* Send RPC call to get the block info from the container.
* @return BlockData.
*/
protected BlockData getBlockDataUsingClient() throws IOException {
final Pipeline pipeline = xceiverClient.getPipeline();

if (LOG.isDebugEnabled()) {
LOG.debug("Initializing BlockInputStream for get key to access {}",
blockID.getContainerID());
LOG.debug("Initializing BlockInputStream for get key to access block {}",
blockID);
}

DatanodeBlockID.Builder blkIDBuilder =
Expand All @@ -263,8 +287,7 @@ protected List<ChunkInfo> getChunkInfoListUsingClient() throws IOException {

GetBlockResponseProto response = ContainerProtocolCalls.getBlock(
xceiverClient, VALIDATORS, blkIDBuilder.build(), tokenRef.get());

return response.getBlockData().getChunksList();
return response.getBlockData();
}

private void setPipeline(Pipeline pipeline) {
Expand Down Expand Up @@ -566,7 +589,20 @@ private boolean shouldRetryRead(IOException cause) throws IOException {
} catch (Exception e) {
throw new IOException(e);
}
return retryAction.action == RetryPolicy.RetryAction.RetryDecision.RETRY;
if (retryAction.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
if (retryAction.delayMillis > 0) {
try {
LOG.debug("Retry read after {}ms", retryAction.delayMillis);
Thread.sleep(retryAction.delayMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
String msg = "Interrupted: action=" + retryAction.action + ", retry policy=" + retryPolicy;
throw new IOException(msg, e);
}
}
return true;
}
return false;
}

private void handleReadError(IOException cause) throws IOException {
Expand Down
Loading