Merged
File: BlockInputStream.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto;
@@ -66,7 +67,8 @@ public class BlockInputStream extends BlockExtendedInputStream {
LoggerFactory.getLogger(BlockInputStream.class);

private final BlockID blockID;
private final long length;
private long length;
private final BlockLocationInfo blockInfo;
private final AtomicReference<Pipeline> pipelineRef =
new AtomicReference<>();
private final AtomicReference<Token<OzoneBlockTokenIdentifier>> tokenRef =
@@ -111,27 +113,30 @@ public class BlockInputStream extends BlockExtendedInputStream {

private final Function<BlockID, BlockLocationInfo> refreshFunction;

public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
public BlockInputStream(BlockLocationInfo blockInfo, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
XceiverClientFactory xceiverClientFactory,
Function<BlockID, BlockLocationInfo> refreshFunction) {
this.blockID = blockId;
this.length = blockLen;
this.blockInfo = blockInfo;
this.blockID = blockInfo.getBlockID();
this.length = blockInfo.getLength();
setPipeline(pipeline);
tokenRef.set(token);
this.verifyChecksum = verifyChecksum;
this.xceiverClientFactory = xceiverClientFactory;
this.refreshFunction = refreshFunction;
}

// only for unit tests
public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token,
boolean verifyChecksum,
XceiverClientFactory xceiverClientFactory
) {
this(blockId, blockLen, pipeline, token, verifyChecksum,
XceiverClientFactory xceiverClientFactory) {
this(new BlockLocationInfo(new BlockLocationInfo.Builder().setBlockID(blockId).setLength(blockLen)),
pipeline, token, verifyChecksum,
xceiverClientFactory, null);
}

/**
* Initialize the BlockInputStream. Get the BlockData (list of chunks) from
* the Container and create the ChunkInputStreams for each Chunk in the Block.
@@ -143,11 +148,17 @@ public synchronized void initialize() throws IOException {
return;
}

BlockData blockData = null;
List<ChunkInfo> chunks = null;
IOException catchEx = null;
do {
try {
chunks = getChunkInfoList();
blockData = getBlockData();
chunks = blockData.getChunksList();
if (blockInfo != null && blockInfo.isUnderConstruction()) {
// Use the block length from the DN if the block is under construction.
length = blockData.getSize();
}
break;
// If we get a StorageContainerException or an IOException because
// datanodes are not reachable, refresh to get the latest pipeline.
@@ -226,19 +237,22 @@ private void refreshBlockInfo(IOException cause) throws IOException {

/**
* Send RPC call to get the block info from the container.
* @return List of chunks in this block.
* @return BlockData.
*/
protected List<ChunkInfo> getChunkInfoList() throws IOException {
protected BlockData getBlockData() throws IOException {
acquireClient();
try {
return getChunkInfoListUsingClient();
return getBlockDataUsingClient();
} finally {
releaseClient();
}
}

@VisibleForTesting
protected List<ChunkInfo> getChunkInfoListUsingClient() throws IOException {
/**
* Send RPC call to get the block info from the container.
* @return BlockData.
*/
protected BlockData getBlockDataUsingClient() throws IOException {
final Pipeline pipeline = xceiverClient.getPipeline();

if (LOG.isDebugEnabled()) {
Expand All @@ -258,8 +272,7 @@ protected List<ChunkInfo> getChunkInfoListUsingClient() throws IOException {

GetBlockResponseProto response = ContainerProtocolCalls.getBlock(
xceiverClient, VALIDATORS, blkIDBuilder.build(), tokenRef.get());

return response.getBlockData().getChunksList();
return response.getBlockData();
}

private void setPipeline(Pipeline pipeline) {
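Taken together, the changes above mean a caller now hands BlockInputStream a whole BlockLocationInfo instead of a separate ID and length, and for a block flagged as under construction the length is re-read from the datanode's BlockData during initialize(). A minimal sketch of a call site, mirroring how this diff's own tests build the info object (imports omitted; `blockID`, `lengthFromOM`, `pipeline`, `token`, `clientFactory`, and `refreshFunction` stand for values the caller already holds):

```java
// Sketch only: mirrors the constructor introduced in this diff.
BlockLocationInfo info = new BlockLocationInfo(
    new BlockLocationInfo.Builder()
        .setBlockID(blockID)        // block ID known from OM
        .setLength(lengthFromOM));  // may be stale for an hsync'ed key
info.setUnderConstruction(true);    // last block of an hsync'ed file

BlockInputStream in = new BlockInputStream(
    info, pipeline, token, verifyChecksum, clientFactory, refreshFunction);
in.initialize(); // fetches BlockData from the DN; the length is refreshed here
```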
File: BlockInputStreamFactoryImpl.java
@@ -84,8 +84,8 @@ public BlockExtendedInputStream create(ReplicationConfig repConfig,
blockInfo, verifyChecksum, xceiverFactory, refreshFunction,
ecBlockStreamFactory);
} else {
return new BlockInputStream(blockInfo.getBlockID(), blockInfo.getLength(),
pipeline, token, verifyChecksum, xceiverFactory, refreshFunction);
return new BlockInputStream(blockInfo, pipeline, token, verifyChecksum, xceiverFactory,
refreshFunction);
}
}

File: DummyBlockInputStream.java
@@ -23,6 +23,7 @@
import java.util.function.Function;

import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -49,16 +50,19 @@ class DummyBlockInputStream extends BlockInputStream {
Function<BlockID, BlockLocationInfo> refreshFunction,
List<ChunkInfo> chunkList,
Map<String, byte[]> chunks) {
super(blockId, blockLen, pipeline, token, verifyChecksum,
super(new BlockLocationInfo(new BlockLocationInfo.Builder().setBlockID(blockId).setLength(blockLen)),
pipeline, token, verifyChecksum,
xceiverClientManager, refreshFunction);
this.chunkDataMap = chunks;
this.chunks = chunkList;

}

@Override
protected List<ChunkInfo> getChunkInfoList() throws IOException {
return chunks;
protected ContainerProtos.BlockData getBlockData() throws IOException {
BlockID blockID = getBlockID();
ContainerProtos.DatanodeBlockID datanodeBlockID = blockID.getDatanodeBlockIDProtobuf();
return ContainerProtos.BlockData.newBuilder().addAllChunks(chunks).setBlockID(datanodeBlockID).build();
}

@Override
File: DummyBlockInputStreamWithRetry.java
@@ -23,6 +23,7 @@
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
@@ -73,16 +74,16 @@ final class DummyBlockInputStreamWithRetry
}

@Override
protected List<ChunkInfo> getChunkInfoList() throws IOException {
protected ContainerProtos.BlockData getBlockData() throws IOException {
if (getChunkInfoCount == 0) {
getChunkInfoCount++;
if (ioException != null) {
throw ioException;
}
throw new StorageContainerException("Exception encountered",
CONTAINER_NOT_FOUND);
} else {
return super.getChunkInfoList();
return super.getBlockData();
}
}
}
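The stub above exercises the retry path in initialize(): the first getBlockData() call fails (with CONTAINER_NOT_FOUND or a plain IOException), the stream refreshes its pipeline, and the second attempt succeeds. In production the refresh comes from the refreshFunction passed to the constructor; a minimal sketch of one (imports omitted; `lookUpBlock` is a hypothetical helper that re-reads the block's location, e.g. from OM):

```java
// Sketch: on failure, BlockInputStream calls this function and retries using
// the pipeline carried by the returned BlockLocationInfo.
Function<BlockID, BlockLocationInfo> refreshFunction =
    blockId -> lookUpBlock(blockId); // hypothetical lookup, e.g. via OM
```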
File: TestBlockInputStream.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
@@ -409,16 +410,19 @@ public void testRefreshOnReadFailureAfterUnbuffer(IOException ex)
.thenReturn(blockLocationInfo);
when(blockLocationInfo.getPipeline()).thenReturn(newPipeline);

BlockInputStream subject = new BlockInputStream(blockID, blockSize,
BlockInputStream subject = new BlockInputStream(
new BlockLocationInfo(new BlockLocationInfo.Builder().setBlockID(blockID).setLength(blockSize)),
pipeline, null, false, clientFactory, refreshFunction) {
@Override
protected List<ChunkInfo> getChunkInfoListUsingClient() {
return chunks;
protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) {
return stream;
}

@Override
protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) {
return stream;
protected ContainerProtos.BlockData getBlockDataUsingClient() throws IOException {
BlockID blockID = getBlockID();
ContainerProtos.DatanodeBlockID datanodeBlockID = blockID.getDatanodeBlockIDProtobuf();
return ContainerProtos.BlockData.newBuilder().addAllChunks(chunks).setBlockID(datanodeBlockID).build();
}
};

File: BlockLocationInfo.java
@@ -40,6 +40,8 @@ public class BlockLocationInfo {

// PartNumber is set for Multipart upload Keys.
private int partNumber;
// Whether the block is under construction; applies to the last block of an hsync'ed file.
private boolean underConstruction;

protected BlockLocationInfo(Builder builder) {
this.blockID = builder.blockID;
@@ -111,6 +113,14 @@ public int getPartNumber() {
return partNumber;
}

public void setUnderConstruction(boolean uc) {
this.underConstruction = uc;
}

public boolean isUnderConstruction() {
return this.underConstruction;
}

/**
* Builder of BlockLocationInfo.
*/
File: BlockOutputStreamEntryPool.java
@@ -26,6 +26,7 @@
import java.util.ListIterator;
import java.util.Map;

import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.scm.ByteStringConversion;
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
@@ -85,6 +86,8 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware {
private final ExcludeList excludeList;
private final ContainerClientMetrics clientMetrics;
private final StreamBufferArgs streamBufferArgs;
// Tracks the last block reported to OM; repeated hsyncs on the same block skip the redundant OM update.
private ContainerBlockID lastUpdatedBlockId = new ContainerBlockID(-1, -1);

@SuppressWarnings({"parameternumber", "squid:S00107"})
public BlockOutputStreamEntryPool(
@@ -368,7 +371,16 @@ void hsyncKey(long offset) throws IOException {
if (keyArgs.getIsMultipartKey()) {
throw new IOException("Hsync is unsupported for multipart keys.");
} else {
omClient.hsyncKey(keyArgs, openID);
if (keyArgs.getLocationInfoList().size() == 0) {
omClient.hsyncKey(keyArgs, openID);
Contributor: Why do we want to hsync without any block info? Or can we return an error in this case?

Contributor (Author): hsync is called right after the file is created. See TestHSync#testUncommittedBlocks.
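In other words, the very first hsync can legitimately arrive before any block has been allocated, so an empty location list is handled rather than rejected. A hypothetical minimal sequence that hits this branch, mirroring what TestHSync#testUncommittedBlocks exercises (`fs` is assumed to be an Ozone FileSystem instance):

```java
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical repro: create a key and hsync before writing any data, so
// keyArgs.getLocationInfoList() is empty when hsyncKey runs on the client.
void hsyncEmptyFile(FileSystem fs) throws java.io.IOException {
  try (FSDataOutputStream out = fs.create(new Path("/vol/bucket/empty-key"))) {
    out.hsync(); // no blocks allocated yet; OM still records the open key
  }
}
```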

} else {
ContainerBlockID lastBlockId = keyArgs.getLocationInfoList().get(keyArgs.getLocationInfoList().size() - 1)
.getBlockID().getContainerBlockID();
if (!lastUpdatedBlockId.equals(lastBlockId)) {
omClient.hsyncKey(keyArgs, openID);
lastUpdatedBlockId = lastBlockId;
}
}
}
} else {
LOG.warn("Closing KeyOutputStream, but key args is null");
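The lastUpdatedBlockId guard above means repeated hsync calls re-send hsyncKey to OM only when the last block has changed; in between, the datanode holds the authoritative length, which is exactly what the under-construction read path relies on. A rough sketch of the observable behavior under these assumptions (`fs` is an Ozone FileSystem; the path and buffer are illustrative):

```java
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: the OM update is skipped while the last block stays the same.
void hsyncRepeatedly(FileSystem fs, byte[] chunk) throws java.io.IOException {
  try (FSDataOutputStream out = fs.create(new Path("/vol/bucket/key"))) {
    out.write(chunk);
    out.hsync(); // block list sent to OM; last block ID remembered
    out.write(chunk);
    out.hsync(); // same last block: the hsyncKey call to OM is skipped
  }
}
```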
File: KeyInputStream.java
@@ -31,6 +31,7 @@
import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
import org.apache.hadoop.hdds.scm.storage.MultipartInputStream;
import org.apache.hadoop.hdds.scm.storage.PartInputStream;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;

@@ -61,8 +62,10 @@ private static List<BlockExtendedInputStream> createStreams(
boolean verifyChecksum,
Function<OmKeyInfo, OmKeyInfo> retryFunction,
BlockInputStreamFactory blockStreamFactory) {
boolean isHsyncFile = keyInfo.getMetadata().containsKey(OzoneConsts.HSYNC_CLIENT_ID);
List<BlockExtendedInputStream> partStreams = new ArrayList<>();
for (OmKeyLocationInfo omKeyLocationInfo : blockInfos) {
for (int i = 0; i < blockInfos.size(); i++) {
OmKeyLocationInfo omKeyLocationInfo = blockInfos.get(i);
if (LOG.isDebugEnabled()) {
LOG.debug("Adding stream for accessing {}. The stream will be " +
"initialized later.", omKeyLocationInfo);
@@ -85,6 +88,11 @@ private static List<BlockExtendedInputStream> createStreams(
retry = null;
}

if (i == (blockInfos.size() - 1) && isHsyncFile) {
// block is under construction
omKeyLocationInfo.setUnderConstruction(true);
}

BlockExtendedInputStream stream =
blockStreamFactory.create(
keyInfo.getReplicationConfig(),
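End to end: a key written with hsync carries the HSYNC_CLIENT_ID metadata, its last block is flagged under construction, and the resulting BlockInputStream trusts the datanode rather than OM for that block's length, so a concurrent reader can see bytes written after the last OM update. A hypothetical reader under those assumptions (`fs` is an Ozone FileSystem; the path is illustrative):

```java
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: reading a file that is still being hsync'ed by another client.
long readHsyncedKey(FileSystem fs) throws IOException {
  long total = 0;
  try (InputStream in = fs.open(new Path("/vol/bucket/hsynced-key"))) {
    byte[] buf = new byte[4096];
    int n;
    while ((n = in.read(buf)) != -1) {
      total += n; // may exceed the key length OM reported at open time
    }
  }
  return total;
}
```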