@@ -229,10 +229,17 @@ protected List<ChunkInfo> getChunkInfos() throws IOException {
blockID.getContainerID());
}

DatanodeBlockID datanodeBlockID = blockID
.getDatanodeBlockIDProtobuf();
DatanodeBlockID.Builder blkIDBuilder =
DatanodeBlockID.newBuilder().setContainerID(blockID.getContainerID())
.setLocalID(blockID.getLocalID())
.setBlockCommitSequenceId(blockID.getBlockCommitSequenceId());
Contributor:

blockCommitSequenceId is used for handling a quasi-closed Ratis container, which has an undetermined state, so I don't think it has any use for an EC container. Maybe we can remove it for the EC case?
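
A minimal sketch of the suggestion as I read it (the isEcPipeline flag and the helper itself are hypothetical, not part of this PR):

    static DatanodeBlockID buildDatanodeBlockId(long containerId, long localId,
        long bcsId, boolean isEcPipeline) {
      // BCSID only disambiguates quasi-closed Ratis container state, so an
      // EC block could leave the field unset entirely.
      DatanodeBlockID.Builder builder = DatanodeBlockID.newBuilder()
          .setContainerID(containerId)
          .setLocalID(localId);
      if (!isEcPipeline) {
        builder.setBlockCommitSequenceId(bcsId);
      }
      return builder.build();
    }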

Contributor Author:

We have not really thought much about BCSID. Currently we are not using it, but we need to think about whether it could benefit other scenarios we have not covered yet, so I think it may be too early to discard it. We can always drop it later if we never find any use for it; I suggest we keep setting it since it is available now. Also, we surely don't want to ignore it in BlockInputStream, as that is a non-EC code flow too.


int replicaIndex = pipeline.getReplicaIndex(pipeline.getClosestNode());
if (replicaIndex > 0) {
blkIDBuilder.setReplicaIndex(replicaIndex);
}
GetBlockResponseProto response = ContainerProtocolCalls
.getBlock(xceiverClient, datanodeBlockID, token);
.getBlock(xceiverClient, blkIDBuilder.build(), token);

chunks = response.getBlockData().getChunksList();
success = true;
@@ -140,17 +140,24 @@ public BlockOutputStream(
this.xceiverClientFactory = xceiverClientManager;
this.config = config;
this.blockID = new AtomicReference<>(blockID);
replicationIndex = pipeline.getReplicaIndex(pipeline.getClosestNode());
KeyValue keyValue =
KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build();
this.containerBlockData =
BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf())
.addMetadata(keyValue);

ContainerProtos.DatanodeBlockID.Builder blkIDBuilder =
ContainerProtos.DatanodeBlockID.newBuilder()
.setContainerID(blockID.getContainerID())
.setLocalID(blockID.getLocalID())
.setBlockCommitSequenceId(blockID.getBlockCommitSequenceId());
if (replicationIndex > 0) {
blkIDBuilder.setReplicaIndex(replicationIndex);
}
this.containerBlockData = BlockData.newBuilder().setBlockID(
blkIDBuilder.build()).addMetadata(keyValue);
this.xceiverClient = xceiverClientManager.acquireClient(pipeline);
this.bufferPool = bufferPool;
this.token = token;

replicationIndex = pipeline.getReplicaIndex(pipeline.getClosestNode());

//number of buffers used before doing a flush
refreshCurrentBuffer();
flushPeriod = (int) (config.getStreamBufferFlushSize() / config
@@ -268,11 +275,15 @@ public void write(byte[] b, int off, int len) throws IOException {
writeChunkIfNeeded();
off += writeLen;
len -= writeLen;
writtenDataLength += writeLen;
updateWrittenDataLength(writeLen);
doFlushOrWatchIfNeeded();
}
}

public void updateWrittenDataLength(int writeLen) {
writtenDataLength += writeLen;
}

private void doFlushOrWatchIfNeeded() throws IOException {
if (currentBufferRemaining == 0) {
if (bufferPool.getNumberOfUsedBuffers() % flushPeriod == 0) {
@@ -611,6 +622,7 @@ public void setIoException(Exception e) {
if (ioe == null) {
IOException exception = new IOException(EXCEPTION_MSG + e.toString(), e);
ioException.compareAndSet(null, exception);
LOG.debug("Exception: for block ID: " + blockID, e);
} else {
LOG.debug("Previous request had already failed with {} " +
"so subsequent request also encounters " +
@@ -78,6 +78,7 @@ public ECBlockOutputStream(
public void write(byte[] b, int off, int len) throws IOException {
this.currentChunkRspFuture =
writeChunkToContainer(ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len)));
updateWrittenDataLength(len);
}

public CompletableFuture<ContainerProtos.ContainerCommandResponseProto> write(
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.client.io;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
@@ -167,7 +168,8 @@ protected BlockExtendedInputStream getOrOpenStream(int locationIndex) {
.setReplicationConfig(StandaloneReplicationConfig.getInstance(
HddsProtos.ReplicationFactor.ONE))
.setNodes(Arrays.asList(dataLocations[locationIndex]))
.setId(PipelineID.randomId())
.setId(PipelineID.randomId()).setReplicaIndexes(
ImmutableMap.of(dataLocations[locationIndex], locationIndex + 1))
.setState(Pipeline.PipelineState.CLOSED)
.build();

@@ -229,8 +229,8 @@ public static ContainerProtos.PutBlockResponseProto putBlock(
* @throws InterruptedException
* @throws ExecutionException
*/
public static XceiverClientReply putBlockAsync(
XceiverClientSpi xceiverClient, BlockData containerBlockData, boolean eof,
public static XceiverClientReply putBlockAsync(XceiverClientSpi xceiverClient,
BlockData containerBlockData, boolean eof,
Token<? extends TokenIdentifier> token)
throws IOException, InterruptedException, ExecutionException {
PutBlockRequestProto.Builder createBlockRequest =
@@ -250,9 +250,14 @@ private ContainerCommandResponseProto dispatchRequest(
/**
* Create Container should happen only as part of Write_Data phase of
* writeChunk.
In EC, we do an empty putBlock. For partial stripe writes, if the
file size is less than chunkSize * (ECData - 1), we write an empty block
to get the container created on the non-writing nodes. If the replica
index is > 0, we know it is an EC container.
*/
if (container == null && ((isWriteStage || isCombinedStage)
|| cmdType == Type.PutSmallFile)) {
|| cmdType == Type.PutSmallFile
|| cmdType == Type.PutBlock)) {
// If container does not exist, create one for WriteChunk,
// PutSmallFile and PutBlock requests
responseProto = createContainer(msg);
@@ -264,7 +269,8 @@ private ContainerCommandResponseProto dispatchRequest(
return ContainerUtils.logAndReturnError(LOG, sce, msg);
}
Preconditions.checkArgument(isWriteStage && container2BCSIDMap != null
|| dispatcherContext == null);
|| dispatcherContext == null
|| cmdType == Type.PutBlock);
if (container2BCSIDMap != null) {
// adds this container to list of containers created in the pipeline
// with initial BCSID recorded as 0.
@@ -416,6 +422,12 @@ ContainerCommandResponseProto createContainer(
containerRequest.getWriteChunk().getBlockID().getReplicaIndex());
}

if (containerRequest.hasPutBlock()) {
createRequest.setReplicaIndex(
containerRequest.getPutBlock().getBlockData().getBlockID()
.getReplicaIndex());
}

ContainerCommandRequestProto.Builder requestBuilder =
ContainerCommandRequestProto.newBuilder()
.setCmdType(Type.CreateContainer)
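To make the dispatcher comment above concrete, here is a minimal sketch (not code from this PR; it reuses xceiverClient, blkIDBuilder and token from the client-side hunks earlier) of the empty putBlock that now triggers container creation on a padding node:

    // An EC writer on a padding-only node sends a PutBlock with no chunks;
    // the dispatcher now creates the missing container for it, just as it
    // does for WriteChunk and PutSmallFile.
    BlockData emptyBlockData = BlockData.newBuilder()
        .setBlockID(blkIDBuilder.build()) // replica index > 0 marks it as EC
        .build();                         // no chunks added, so size stays 0
    XceiverClientReply reply = ContainerProtocolCalls.putBlockAsync(
        xceiverClient, emptyBlockData, true /* eof */, token);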
@@ -250,9 +250,13 @@ private void scanData(DataTransferThrottler throttler, Canceler canceler)
BlockData bdata = db.getStore()
.getBlockDataTable()
.get(blockKey);
if (bdata != null) {
throw new IOException("Missing chunk file "
+ chunkFile.getAbsolutePath());
// In EC, the client may write an empty putBlock on padding block nodes.
// So we need to make sure the chunk length is > 0 before declaring
// the chunk file missing.
if (bdata != null && bdata.getChunks().size() > 0 && bdata
.getChunks().get(0).getLen() > 0) {
throw new IOException(
"Missing chunk file " + chunkFile.getAbsolutePath());
}
} else if (chunk.getChecksumData().getType()
!= ContainerProtos.ChecksumType.NONE) {
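The new scanner guard can be read as a small predicate, sketched here against the raw protobuf type (the scanner itself works with the BlockData helper class, so this is illustrative only):

    // A block's metadata implies an on-disk chunk file only when at least
    // one chunk has a non-zero length; EC padding blocks carry no data, so
    // a missing file for them is expected rather than an error.
    static boolean expectsChunkFile(ContainerProtos.BlockData blockData) {
      return blockData != null
          && blockData.getChunksCount() > 0
          && blockData.getChunks(0).getLen() > 0;
    }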
@@ -465,7 +465,11 @@ ContainerCommandResponseProto handlePutBlock(

boolean endOfBlock = false;
if (!request.getPutBlock().hasEof() || request.getPutBlock().getEof()) {
chunkManager.finishWriteChunks(kvContainer, blockData);
// In EC, we will be doing an empty putBlock, so there may not be any
// data available. So, let's flush only when the data size is > 0.
if (request.getPutBlock().getBlockData().getSize() > 0) {
chunkManager.finishWriteChunks(kvContainer, blockData);
}
endOfBlock = true;
}

@@ -423,7 +423,8 @@ private void ensureIndexWithinBounds(Integer index, String setName) {
if (index < 1 || index > repConfig.getRequiredNodes()) {
throw new IllegalArgumentException("Replica Index in " + setName
+ " for containerID " + containerInfo.getContainerID()
+ "must be between 1 and " + repConfig.getRequiredNodes());
+ "must be between 1 and " + repConfig.getRequiredNodes()
+ ". But the given index is: " + index);
}
}
}
@@ -90,26 +90,17 @@ void checkStream() throws IOException {
if (!isInitialized()) {
blockOutputStreams =
new ECBlockOutputStream[replicationConfig.getRequiredNodes()];
}
if (blockOutputStreams[currentStreamIdx] == null) {
createOutputStream();
for (int i = currentStreamIdx; i < replicationConfig
.getRequiredNodes(); i++) {
List<DatanodeDetails> nodes = getPipeline().getNodes();
blockOutputStreams[i] =
new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
createSingleECBlockPipeline(getPipeline(), nodes.get(i), i + 1),
getBufferPool(), getConf(), getToken());
}
}
}

@Override
void createOutputStream() throws IOException {
Pipeline ecPipeline = getPipeline();
List<DatanodeDetails> nodes = getPipeline().getNodes();
blockOutputStreams[currentStreamIdx] = new ECBlockOutputStream(
getBlockID(),
getXceiverClientManager(),
createSingleECBlockPipeline(
ecPipeline, nodes.get(currentStreamIdx), currentStreamIdx + 1),
getBufferPool(),
getConf(),
getToken());
}

@Override
public OutputStream getOutputStream() {
if (!isInitialized()) {
@@ -318,6 +309,13 @@ private List<ECBlockOutputStream> getFailedStreams(boolean forPutBlock) {
List<ECBlockOutputStream> failedStreams = new ArrayList<>();
while (iter.hasNext()) {
final ECBlockOutputStream stream = iter.next();
if (!forPutBlock && stream.getWrittenDataLength() <= 0) {
// If we did not write any data to this stream yet, do not consider it
// for failure checking. But we should still do failure checking for
// putBlock: in the case of padding stripes, we send empty putBlocks
// to create empty containers at the DNs (refer HDDS-6794).
continue;
}
CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
responseFuture = null;
if (forPutBlock) {
@@ -384,7 +384,9 @@ public void testPutBlockHasBlockGroupLen() throws IOException {
keyDetails.getOzoneKeyLocations().get(0).getContainerID())
.setLocalID(
keyDetails.getOzoneKeyLocations().get(0).getLocalID())
.setBlockCommitSequenceId(1).build());
.setBlockCommitSequenceId(1).setReplicaIndex(
blockList.getKeyLocations(0).getPipeline()
.getMemberReplicaIndexes(i)).build());

List<ContainerProtos.KeyValue> metadataList =
block.getMetadataList().stream().filter(kv -> kv.getKey()
@@ -560,6 +562,9 @@ public void test10D4PConfigWithPartialStripe()
OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT);
int dataBlks = 10;
int parityBlks = 4;
MultiNodePipelineBlockAllocator blkAllocator =
new MultiNodePipelineBlockAllocator(conf, dataBlks + parityBlks, 14);
createNewClient(conf, blkAllocator);
store.createVolume(volumeName);
OzoneVolume volume = store.getVolume(volumeName);
volume.createBucket(bucketName);
@@ -230,9 +230,12 @@ public void testListBlock() throws Exception {
continue;
}
ListBlockResponseProto response = ContainerProtocolCalls
.listBlock(clients.get(i), containerID, null, numExpectedBlocks + 1,
.listBlock(clients.get(i), containerID, null, Integer.MAX_VALUE,
containerToken);
Assertions.assertEquals(numExpectedBlocks, response.getBlockDataCount(),
Assertions.assertEquals(numExpectedBlocks,
response.getBlockDataList().stream().filter(
k -> k.getChunksCount() > 0 && k.getChunks(0).getLen() > 0)
.collect(Collectors.toList()).size(),
"blocks count doesn't match on DN " + i);
Assertions.assertEquals(numExpectedChunks,
response.getBlockDataList().stream()
@@ -292,7 +292,7 @@ public void testMultipleChunksInSingleWriteOp(int numChunks)
}

@Test
public void testECContainerKeysCount()
public void testECContainerKeysCountAndNumContainerReplicas()
throws IOException, InterruptedException, TimeoutException {
byte[] inputData = getInputBytes(1);
final OzoneBucket bucket = getOzoneBucket();
@@ -320,8 +320,9 @@

GenericTestUtils.waitFor(() -> {
try {
return containerOperationClient.getContainer(currentKeyContainerID)
.getNumberOfKeys() == 1;
return (containerOperationClient.getContainer(currentKeyContainerID)
.getNumberOfKeys() == 1) && (containerOperationClient
.getContainerReplicas(currentKeyContainerID).size() == 5);
} catch (IOException exception) {
Assert.fail("Unexpected exception " + exception);
return false;