diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 07a444a2486e..a0a210dd5857 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -229,10 +229,17 @@ protected List<ChunkInfo> getChunkInfos() throws IOException {
             blockID.getContainerID());
       }

-      DatanodeBlockID datanodeBlockID = blockID
-          .getDatanodeBlockIDProtobuf();
+      DatanodeBlockID.Builder blkIDBuilder =
+          DatanodeBlockID.newBuilder().setContainerID(blockID.getContainerID())
+              .setLocalID(blockID.getLocalID())
+              .setBlockCommitSequenceId(blockID.getBlockCommitSequenceId());
+
+      int replicaIndex = pipeline.getReplicaIndex(pipeline.getClosestNode());
+      if (replicaIndex > 0) {
+        blkIDBuilder.setReplicaIndex(replicaIndex);
+      }
       GetBlockResponseProto response = ContainerProtocolCalls
-          .getBlock(xceiverClient, datanodeBlockID, token);
+          .getBlock(xceiverClient, blkIDBuilder.build(), token);

       chunks = response.getBlockData().getChunksList();
       success = true;
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index b86464c0976a..036b1007d7b6 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -140,17 +140,24 @@ public BlockOutputStream(
     this.xceiverClientFactory = xceiverClientManager;
     this.config = config;
     this.blockID = new AtomicReference<>(blockID);
+    replicationIndex = pipeline.getReplicaIndex(pipeline.getClosestNode());
     KeyValue keyValue =
         KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build();
-    this.containerBlockData =
-        BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf())
-            .addMetadata(keyValue);
+
+    ContainerProtos.DatanodeBlockID.Builder blkIDBuilder =
+        ContainerProtos.DatanodeBlockID.newBuilder()
+            .setContainerID(blockID.getContainerID())
+            .setLocalID(blockID.getLocalID())
+            .setBlockCommitSequenceId(blockID.getBlockCommitSequenceId());
+    if (replicationIndex > 0) {
+      blkIDBuilder.setReplicaIndex(replicationIndex);
+    }
+    this.containerBlockData = BlockData.newBuilder().setBlockID(
+        blkIDBuilder.build()).addMetadata(keyValue);
     this.xceiverClient = xceiverClientManager.acquireClient(pipeline);
     this.bufferPool = bufferPool;
     this.token = token;
-    replicationIndex = pipeline.getReplicaIndex(pipeline.getClosestNode());
-
     //number of buffers used before doing a flush
     refreshCurrentBuffer();
     flushPeriod = (int) (config.getStreamBufferFlushSize() / config
@@ -268,11 +275,15 @@ public void write(byte[] b, int off, int len) throws IOException {
       writeChunkIfNeeded();
       off += writeLen;
       len -= writeLen;
-      writtenDataLength += writeLen;
+      updateWrittenDataLength(writeLen);
       doFlushOrWatchIfNeeded();
     }
   }

+  public void updateWrittenDataLength(int writeLen) {
+    writtenDataLength += writeLen;
+  }
+
   private void doFlushOrWatchIfNeeded() throws IOException {
     if (currentBufferRemaining == 0) {
       if (bufferPool.getNumberOfUsedBuffers() % flushPeriod == 0) {
@@ -611,6 +622,7 @@ public void setIoException(Exception e) {
     if (ioe == null) {
       IOException exception = new IOException(EXCEPTION_MSG + e.toString(), e);
       ioException.compareAndSet(null, exception);
+      LOG.debug("Exception for block ID: " + blockID, e);
     } else {
       LOG.debug("Previous request had already failed with {} " +
          "so subsequent request also encounters " +
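Review note: both streams now build the DatanodeBlockID by hand instead of calling blockID.getDatanodeBlockIDProtobuf(), so the replica index is attached only when it is meaningful. A minimal sketch of the shared pattern; the helper class and method name are hypothetical, since the patch inlines this logic at both call sites:

import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;

final class DatanodeBlockIDs {
  // replicaIndex <= 0 means "not an EC replica": the proto field stays
  // unset, so Ratis/Standalone requests keep their existing wire shape.
  static DatanodeBlockID build(BlockID blockID, int replicaIndex) {
    DatanodeBlockID.Builder builder = DatanodeBlockID.newBuilder()
        .setContainerID(blockID.getContainerID())
        .setLocalID(blockID.getLocalID())
        .setBlockCommitSequenceId(blockID.getBlockCommitSequenceId());
    if (replicaIndex > 0) {
      builder.setReplicaIndex(replicaIndex);
    }
    return builder.build();
  }
}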
LOG.debug("Exception: for block ID: " + blockID, e); } else { LOG.debug("Previous request had already failed with {} " + "so subsequent request also encounters " + diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java index 174e507829b7..9f6bf059506c 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java @@ -78,6 +78,7 @@ public ECBlockOutputStream( public void write(byte[] b, int off, int len) throws IOException { this.currentChunkRspFuture = writeChunkToContainer(ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len))); + updateWrittenDataLength(len); } public CompletableFuture write( diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java index 89be7839ac0f..40d454a0a19b 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.client.io; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.ECReplicationConfig; @@ -167,7 +168,8 @@ protected BlockExtendedInputStream getOrOpenStream(int locationIndex) { .setReplicationConfig(StandaloneReplicationConfig.getInstance( HddsProtos.ReplicationFactor.ONE)) .setNodes(Arrays.asList(dataLocations[locationIndex])) - .setId(PipelineID.randomId()) + .setId(PipelineID.randomId()).setReplicaIndexes( + ImmutableMap.of(dataLocations[locationIndex], locationIndex + 1)) .setState(Pipeline.PipelineState.CLOSED) .build(); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 16bef6971215..e024d79b9a7d 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -229,8 +229,8 @@ public static ContainerProtos.PutBlockResponseProto putBlock( * @throws InterruptedException * @throws ExecutionException */ - public static XceiverClientReply putBlockAsync( - XceiverClientSpi xceiverClient, BlockData containerBlockData, boolean eof, + public static XceiverClientReply putBlockAsync(XceiverClientSpi xceiverClient, + BlockData containerBlockData, boolean eof, Token token) throws IOException, InterruptedException, ExecutionException { PutBlockRequestProto.Builder createBlockRequest = diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java index 802104a17140..60bb8e59bf89 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java @@ 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 802104a17140..60bb8e59bf89 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -250,9 +250,14 @@ private ContainerCommandResponseProto dispatchRequest(
     /**
      * Create Container should happen only as part of Write_Data phase of
      * writeChunk.
+     * In EC, the client sends an empty putBlock for partial stripe writes:
+     * when the file size is less than chunkSize * (ECData - 1), an empty
+     * block is written so that the container gets created on the
+     * non-writing nodes. A replica index > 0 indicates an EC container.
      */
     if (container == null && ((isWriteStage || isCombinedStage)
-        || cmdType == Type.PutSmallFile)) {
+        || cmdType == Type.PutSmallFile
+        || cmdType == Type.PutBlock)) {
       // If container does not exist, create one for WriteChunk and
       // PutSmallFile request
       responseProto = createContainer(msg);
@@ -264,7 +269,8 @@ private ContainerCommandResponseProto dispatchRequest(
         return ContainerUtils.logAndReturnError(LOG, sce, msg);
       }
       Preconditions.checkArgument(isWriteStage && container2BCSIDMap != null
-          || dispatcherContext == null);
+          || dispatcherContext == null
+          || cmdType == Type.PutBlock);
       if (container2BCSIDMap != null) {
         // adds this container to list of containers created in the pipeline
         // with initial BCSID recorded as 0.
@@ -416,6 +422,12 @@ ContainerCommandResponseProto createContainer(
           containerRequest.getWriteChunk().getBlockID().getReplicaIndex());
     }

+    if (containerRequest.hasPutBlock()) {
+      createRequest.setReplicaIndex(
+          containerRequest.getPutBlock().getBlockData().getBlockID()
+              .getReplicaIndex());
+    }
+
     ContainerCommandRequestProto.Builder requestBuilder =
         ContainerCommandRequestProto.newBuilder()
             .setCmdType(Type.CreateContainer)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
index c560aabbe630..90c392bc01a5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
@@ -250,9 +250,13 @@ private void scanData(DataTransferThrottler throttler, Canceler canceler)
             BlockData bdata = db.getStore()
                 .getBlockDataTable()
                 .get(blockKey);
-            if (bdata != null) {
-              throw new IOException("Missing chunk file "
-                  + chunkFile.getAbsolutePath());
+            // In EC, the client may write an empty putBlock on the padding
+            // block nodes, so make sure the chunk length is > 0 before
+            // declaring the chunk file missing.
+            if (bdata != null && bdata.getChunks().size() > 0 && bdata
+                .getChunks().get(0).getLen() > 0) {
+              throw new IOException(
+                  "Missing chunk file " + chunkFile.getAbsolutePath());
             }
           } else if (chunk.getChecksumData().getType() !=
               ContainerProtos.ChecksumType.NONE) {
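The dispatcher and scanner changes rely on the same invariant: an EC padding replica may legitimately hold a block entry with no data. A hypothetical predicate capturing the scanner's new condition:

import org.apache.hadoop.ozone.container.common.helpers.BlockData;

final class ScanChecks {
  // A missing chunk file is only a real error when the block metadata
  // records at least one chunk with a non-zero length.
  static boolean expectsChunkData(BlockData bdata) {
    return bdata != null
        && bdata.getChunks().size() > 0
        && bdata.getChunks().get(0).getLen() > 0;
  }
}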
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 95907116506a..1c79683326e2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -465,7 +465,11 @@ ContainerCommandResponseProto handlePutBlock(
       boolean endOfBlock = false;
       if (!request.getPutBlock().hasEof() || request.getPutBlock().getEof()) {
-        chunkManager.finishWriteChunks(kvContainer, blockData);
+        // In EC, an empty putBlock may arrive with no data available, so
+        // flush only when the data size is > 0.
+        if (request.getPutBlock().getBlockData().getSize() > 0) {
+          chunkManager.finishWriteChunks(kvContainer, blockData);
+        }
         endOfBlock = true;
       }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECContainerReplicaCount.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECContainerReplicaCount.java
index ff0a75053a32..f62afc8b777e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECContainerReplicaCount.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECContainerReplicaCount.java
@@ -423,7 +423,8 @@ private void ensureIndexWithinBounds(Integer index, String setName) {
     if (index < 1 || index > repConfig.getRequiredNodes()) {
       throw new IllegalArgumentException("Replica Index in " + setName
           + " for containerID " + containerInfo.getContainerID()
-          + "must be between 1 and " + repConfig.getRequiredNodes());
+          + " must be between 1 and " + repConfig.getRequiredNodes()
+          + ". But the given index is: " + index);
     }
   }
 }
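handlePutBlock now guards the chunk flush the same way; condensed as a predicate (the wrapper is hypothetical, the condition is from the patch):

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;

final class PutBlockChecks {
  // Empty EC putBlocks report size 0 and carry no chunk data to flush.
  static boolean shouldFlushChunks(
      ContainerProtos.PutBlockRequestProto putBlock) {
    return putBlock.getBlockData().getSize() > 0;
  }
}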
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
index 99cebf8217ad..13c6a8647ac7 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
@@ -90,26 +90,17 @@ void checkStream() throws IOException {
     if (!isInitialized()) {
       blockOutputStreams =
           new ECBlockOutputStream[replicationConfig.getRequiredNodes()];
-    }
-    if (blockOutputStreams[currentStreamIdx] == null) {
-      createOutputStream();
+      for (int i = currentStreamIdx; i < replicationConfig
+          .getRequiredNodes(); i++) {
+        List<DatanodeDetails> nodes = getPipeline().getNodes();
+        blockOutputStreams[i] =
+            new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
+                createSingleECBlockPipeline(getPipeline(), nodes.get(i), i + 1),
+                getBufferPool(), getConf(), getToken());
+      }
     }
   }

-  @Override
-  void createOutputStream() throws IOException {
-    Pipeline ecPipeline = getPipeline();
-    List<DatanodeDetails> nodes = getPipeline().getNodes();
-    blockOutputStreams[currentStreamIdx] = new ECBlockOutputStream(
-        getBlockID(),
-        getXceiverClientManager(),
-        createSingleECBlockPipeline(
-            ecPipeline, nodes.get(currentStreamIdx), currentStreamIdx + 1),
-        getBufferPool(),
-        getConf(),
-        getToken());
-  }
-
   @Override
   public OutputStream getOutputStream() {
     if (!isInitialized()) {
@@ -318,6 +309,13 @@ private List<ECBlockOutputStream> getFailedStreams(boolean forPutBlock) {
     List<ECBlockOutputStream> failedStreams = new ArrayList<>();
     while (iter.hasNext()) {
       final ECBlockOutputStream stream = iter.next();
+      if (!forPutBlock && stream.getWrittenDataLength() <= 0) {
+        // If no data was written to this stream yet, skip it in the failure
+        // check. Failures are still checked for putBlock, though: for
+        // padding stripes we send empty putBlocks to create the empty
+        // containers on the DNs (refer to HDDS-6794).
+        continue;
+      }
       CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
           responseFuture = null;
       if (forPutBlock) {
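The failure filter in getFailedStreams can be read as a predicate (hypothetical helper; the logic is from the patch):

import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;

final class FailureChecks {
  // A stream that never received data cannot have a failed WriteChunk, but
  // its putBlock (possibly empty, for padding stripes) can still fail.
  static boolean eligible(ECBlockOutputStream stream, boolean forPutBlock) {
    return forPutBlock || stream.getWrittenDataLength() > 0;
  }
}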
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
index 3438e3f2f98b..5003f8e5e632 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
@@ -384,7 +384,9 @@ public void testPutBlockHasBlockGroupLen() throws IOException {
                 keyDetails.getOzoneKeyLocations().get(0).getContainerID())
             .setLocalID(
                 keyDetails.getOzoneKeyLocations().get(0).getLocalID())
-            .setBlockCommitSequenceId(1).build());
+            .setBlockCommitSequenceId(1).setReplicaIndex(
+                blockList.getKeyLocations(0).getPipeline()
+                    .getMemberReplicaIndexes(i)).build());

     List<KeyValue> metadataList =
         block.getMetadataList().stream().filter(kv -> kv.getKey()
@@ -560,6 +562,9 @@ public void test10D4PConfigWithPartialStripe()
         OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT);
     int dataBlks = 10;
     int parityBlks = 4;
+    MultiNodePipelineBlockAllocator blkAllocator =
+        new MultiNodePipelineBlockAllocator(conf, dataBlks + parityBlks, 14);
+    createNewClient(conf, blkAllocator);
     store.createVolume(volumeName);
     OzoneVolume volume = store.getVolume(volumeName);
     volume.createBucket(bucketName);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
index e57ec400cc9d..0b037af8c162 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
@@ -230,9 +230,12 @@ public void testListBlock() throws Exception {
         continue;
       }
       ListBlockResponseProto response = ContainerProtocolCalls
-          .listBlock(clients.get(i), containerID, null, numExpectedBlocks + 1,
+          .listBlock(clients.get(i), containerID, null, Integer.MAX_VALUE,
               containerToken);
-      Assertions.assertEquals(numExpectedBlocks, response.getBlockDataCount(),
+      Assertions.assertEquals(numExpectedBlocks,
+          response.getBlockDataList().stream().filter(
+              k -> k.getChunksCount() > 0 && k.getChunks(0).getLen() > 0)
+              .collect(Collectors.toList()).size(),
           "blocks count doesn't match on DN " + i);
       Assertions.assertEquals(numExpectedChunks,
           response.getBlockDataList().stream()
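Since padding replicas now report blocks with no data, the listBlock assertion counts only blocks whose first chunk has a non-zero length. An equivalent, slightly tighter form of that count (reusing the test's response variable) would be Stream.count() instead of collect().size():

long blocksWithData = response.getBlockDataList().stream()
    .filter(k -> k.getChunksCount() > 0 && k.getChunks(0).getLen() > 0)
    .count();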
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
index 948c48143540..4f0a09177173 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
@@ -292,7 +292,7 @@ public void testMultipleChunksInSingleWriteOp(int numChunks)
   }

   @Test
-  public void testECContainerKeysCount()
+  public void testECContainerKeysCountAndNumContainerReplicas()
       throws IOException, InterruptedException, TimeoutException {
     byte[] inputData = getInputBytes(1);
     final OzoneBucket bucket = getOzoneBucket();
@@ -320,8 +320,9 @@ public void testECContainerKeysCount()

     GenericTestUtils.waitFor(() -> {
       try {
-        return containerOperationClient.getContainer(currentKeyContainerID)
-            .getNumberOfKeys() == 1;
+        return (containerOperationClient.getContainer(currentKeyContainerID)
+            .getNumberOfKeys() == 1) && (containerOperationClient
+            .getContainerReplicas(currentKeyContainerID).size() == 5);
       } catch (IOException exception) {
         Assert.fail("Unexpected exception " + exception);
         return false;
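The renamed test also waits until the container reports five replicas. Assuming the test's EC replication config is 3 data plus 2 parity (which the hard-coded 5 suggests), every node in the block group now creates its container replica even for a key smaller than one chunk, because the non-writing nodes receive the empty putBlock. A sketch of that wait, reusing the polling pattern from the diff (timeout values illustrative):

// Assumes an rs-3-2 style config: 3 data + 2 parity = 5 replicas.
GenericTestUtils.waitFor(() -> {
  try {
    return containerOperationClient
        .getContainerReplicas(currentKeyContainerID).size() == 5;
  } catch (IOException e) {
    return false;
  }
}, 100, 10000);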