diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 72a8107fd7fd..5e73caf62f2d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -66,6 +66,7 @@
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.malformedRequest;
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;
 
+import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.thirdparty.com.google.protobuf.ProtocolMessageEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -669,4 +670,21 @@ private boolean isAllowed(String action) {
     default:
       return false;
     }
   }
+
+  @Override
+  public StateMachine.DataChannel getStreamDataChannel(
+      ContainerCommandRequestProto msg)
+      throws StorageContainerException {
+    long containerID = msg.getContainerID();
+    Container container = getContainer(containerID);
+    if (container != null) {
+      Handler handler = getHandler(getContainerType(container));
+      return handler.getStreamDataChannel(container, msg);
+    } else {
+      throw new StorageContainerException(
+          "ContainerID " + containerID + " does not exist",
+          ContainerProtos.Result.CONTAINER_NOT_FOUND);
+    }
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
index a2e397d54615..d02bae0a35ad 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
@@ -25,6 +25,7 @@
     .ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
+import org.apache.ratis.statemachine.StateMachine;
 
 import java.util.Map;
 
@@ -84,4 +85,13 @@ void validateContainerCommand(
    * @param clusterId
    */
   void setClusterId(String clusterId);
+
+  /**
+   * When uploading data using streaming, get the StreamDataChannel.
+   */
+  default StateMachine.DataChannel getStreamDataChannel(
+      ContainerCommandRequestProto msg) throws StorageContainerException {
+    throw new UnsupportedOperationException(
+        "getStreamDataChannel not supported.");
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index e58523414be4..201a1e24bce4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
+import org.apache.ratis.statemachine.StateMachine;
 
 /**
  * Dispatcher sends ContainerCommandRequests to Handler. Each Container Type
@@ -81,6 +82,10 @@ public static Handler getHandlerForContainerType(
     }
   }
 
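+  /**
+   * Returns the StateMachine.DataChannel to which streamed data for the
+   * given request should be written; each container type provides its own
+   * implementation.
+   */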
+  public abstract StateMachine.DataChannel getStreamDataChannel(
+      Container container, ContainerCommandRequestProto msg)
+      throws StorageContainerException;
+
   /**
    * Returns the Id of this datanode.
    *
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 9e4a9e9b8c70..5b59407a7e82 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -23,7 +23,6 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -78,6 +77,7 @@
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.StateMachineStorage;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
@@ -501,6 +501,19 @@ private CompletableFuture<ContainerCommandResponseProto> handleWriteChunk(
     return raftFuture;
   }
 
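+  /**
+   * Dispatch the StreamInit command, then ask the dispatcher for the
+   * DataChannel that backs the stream.
+   */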
+  private StateMachine.DataChannel getStreamDataChannel(
+      ContainerCommandRequestProto requestProto,
+      DispatcherContext context) throws StorageContainerException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("{}: getStreamDataChannel {} containerID={} pipelineID={} " +
+              "traceID={}", gid, requestProto.getCmdType(),
+          requestProto.getContainerID(), requestProto.getPipelineID(),
+          requestProto.getTraceID());
+    }
+    runCommand(requestProto, context);  // stream init
+    return dispatcher.getStreamDataChannel(requestProto);
+  }
+
   @Override
   public CompletableFuture<DataStream> stream(RaftClientRequest request) {
     return CompletableFuture.supplyAsync(() -> {
@@ -512,11 +525,7 @@ public CompletableFuture<DataStream> stream(RaftClientRequest request) {
           .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
           .setContainer2BCSIDMap(container2BCSIDMap)
           .build();
-
-      ContainerCommandResponseProto response = runCommand(
-          requestProto, context);
-      final StreamDataChannel channel = new StreamDataChannel(
-          Paths.get(response.getMessage()));
+      final StateMachine.DataChannel channel =
+          getStreamDataChannel(requestProto, context);
       final ExecutorService chunkExecutor = requestProto.hasWriteChunk() ?
           getChunkExecutor(requestProto.getWriteChunk()) : null;
       return new LocalStream(channel, chunkExecutor);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/StreamDataChannel.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/StreamDataChannel.java
deleted file mode 100644
index 3df66e26dcc9..000000000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/StreamDataChannel.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.transport.server.ratis;
-
-import org.apache.ratis.statemachine.StateMachine;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.file.Path;
-
-class StreamDataChannel implements StateMachine.DataChannel {
-  private final Path path;
-  private final RandomAccessFile randomAccessFile;
-
-  StreamDataChannel(Path path) throws FileNotFoundException {
-    this.path = path;
-    this.randomAccessFile = new RandomAccessFile(path.toFile(), "rw");
-  }
-
-  @Override
-  public void force(boolean metadata) throws IOException {
-    randomAccessFile.getChannel().force(metadata);
-  }
-
-  @Override
-  public int write(ByteBuffer src) throws IOException {
-    return randomAccessFile.getChannel().write(src);
-  }
-
-  @Override
-  public boolean isOpen() {
-    return randomAccessFile.getChannel().isOpen();
-  }
-
-  @Override
-  public void close() throws IOException {
-    randomAccessFile.close();
-  }
-}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index a2b82e5a3b21..a8202e342dc0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -105,6 +105,7 @@
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;
 import static org.apache.hadoop.hdds.scm.utils.ClientCommandsUtils.getReadChunkVersion;
 
+import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -175,6 +176,17 @@ public VolumeChoosingPolicy getVolumeChoosingPolicyForTesting() {
     return volumeChoosingPolicy;
   }
 
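+  /**
+   * Open a DataChannel for streaming data into the block referenced by the
+   * WriteChunk part of the request; the container must be in OPEN state.
+   */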
+  @Override
+  public StateMachine.DataChannel getStreamDataChannel(
+      Container container, ContainerCommandRequestProto msg)
+      throws StorageContainerException {
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    checkContainerOpen(kvContainer);
+    BlockID blockID =
+        BlockID.getFromProtobuf(msg.getWriteChunk().getBlockID());
+    return chunkManager.getStreamDataChannel(kvContainer,
+        blockID, metrics);
+  }
+
   @Override
   public void stop() {
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
index 92d260688fbb..a5f8535bbb68 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
@@ -34,6 +35,7 @@
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 
+import org.apache.ratis.statemachine.StateMachine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,6 +82,14 @@ public String streamInit(Container container, BlockID blockID)
         .streamInit(container, blockID);
   }
 
+  @Override
+  public StateMachine.DataChannel getStreamDataChannel(
+      Container container, BlockID blockID, ContainerMetrics metrics)
+      throws StorageContainerException {
+    return selectHandler(container)
+        .getStreamDataChannel(container, blockID, metrics);
+  }
+
   @Override
   public void finishWriteChunks(KeyValueContainer kvContainer,
       BlockData blockData) throws IOException {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
index 8b13c9346597..b5107723faf3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
@@ -42,6 +43,7 @@
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 
+import org.apache.ratis.statemachine.StateMachine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -97,6 +99,16 @@ public String streamInit(Container container, BlockID blockID)
     return chunkFile.getAbsolutePath();
   }
 
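+  /**
+   * For the FILE_PER_BLOCK layout the stream channel is opened directly on
+   * the single file that backs the block.
+   */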
+  @Override
+  public StateMachine.DataChannel getStreamDataChannel(
+      Container container, BlockID blockID, ContainerMetrics metrics)
+      throws StorageContainerException {
+    checkLayoutVersion(container);
+    File chunkFile = getChunkFile(container, blockID, null);
+    return new KeyValueStreamDataChannel(chunkFile,
+        container.getContainerData(), metrics);
+  }
+
   @Override
   public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
       ChunkBuffer data, DispatcherContext dispatcherContext)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
new file mode 100644
index 000000000000..c0570f5d4df7
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue.impl;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.ratis.statemachine.StateMachine;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+
+/**
+ * This class is used to get the DataChannel for streaming.
+ */
+class KeyValueStreamDataChannel implements StateMachine.DataChannel {
+  private final RandomAccessFile randomAccessFile;
+  private final File file;
+
+  private final ContainerData containerData;
+  private final ContainerMetrics metrics;
+
+  KeyValueStreamDataChannel(File file, ContainerData containerData,
+      ContainerMetrics metrics)
+      throws StorageContainerException {
+    try {
+      this.file = file;
+      this.randomAccessFile = new RandomAccessFile(file, "rw");
+    } catch (FileNotFoundException e) {
+      throw new StorageContainerException("Block file does not exist " +
+          "for container ID " + containerData.getContainerID() +
+          ", file " + file.getAbsolutePath(),
+          ContainerProtos.Result.IO_EXCEPTION);
+    }
+    this.containerData = containerData;
+    this.metrics = metrics;
+  }
+
+  @Override
+  public void force(boolean metadata) throws IOException {
+    randomAccessFile.getChannel().force(metadata);
+  }
+
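+  /**
+   * Write the buffer to the file channel and record the bytes written in
+   * the StreamWrite metrics and the container's write statistics.
+   */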
+  @Override
+  public int write(ByteBuffer src) throws IOException {
+    int writeBytes = randomAccessFile.getChannel().write(src);
+    metrics.incContainerBytesStats(
+        ContainerProtos.Type.StreamWrite, writeBytes);
+    containerData.updateWriteStats(writeBytes, false);
+    return writeBytes;
+  }
+
+  @Override
+  public boolean isOpen() {
+    return randomAccessFile.getChannel().isOpen();
+  }
+
+  @Override
+  public void close() throws IOException {
+    randomAccessFile.close();
+  }
+
+  @Override
+  public String toString() {
+    return "KeyValueStreamDataChannel{" +
+        "File=" + file.getAbsolutePath() +
+        ", containerID=" + containerData.getContainerID() +
+        '}';
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
index ba06eebd6998..7a64f076281b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
@@ -25,9 +25,11 @@
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.ratis.statemachine.StateMachine;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -109,6 +111,12 @@ default String streamInit(Container container, BlockID blockID)
     return null;
   }
 
+  default StateMachine.DataChannel getStreamDataChannel(
+      Container container, BlockID blockID, ContainerMetrics metrics)
+      throws StorageContainerException {
+    return null;
+  }
+
   static long getBufferCapacityForChunkRead(ChunkInfo chunkInfo,
       long defaultReadBufferCapacity) {
     long bufferCapacity = 0;
diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index 90ecbfcf30d4..05f453fce2f5 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -102,6 +102,7 @@ enum Type {
   GetCommittedBlockLength = 18;
 
   StreamInit = 19;
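+  // Accounts for data written through the streaming DataChannel.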
+  StreamWrite = 20;
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java
new file mode 100644
index 000000000000..3b174503767a
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.KeyDataStreamOutput;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.TestHelper;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * Tests the ContainerStateMachine stream handling.
+ */
+public class TestContainerStateMachineStream {
+
+  /**
+   * Set a timeout for each test.
+   */
+  @Rule
+  public Timeout timeout = Timeout.seconds(300);
+
+  private MiniOzoneCluster cluster;
+  private OzoneConfiguration conf = new OzoneConfiguration();
+  private OzoneClient client;
+  private ObjectStore objectStore;
+  private String volumeName;
+  private String bucketName;
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   *
+   * @throws Exception
+   */
+  @Before
+  public void setup() throws Exception {
+    conf = new OzoneConfiguration();
+
+    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+    clientConfig.setStreamBufferFlushDelay(false);
+    conf.setFromObject(clientConfig);
+
+    conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
+        TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
+        TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 200,
+        TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
+    conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 1,
+        TimeUnit.SECONDS);
+
+    RatisClientConfig ratisClientConfig =
+        conf.getObject(RatisClientConfig.class);
+    ratisClientConfig.setWriteRequestTimeout(Duration.ofSeconds(10));
+    ratisClientConfig.setWatchRequestTimeout(Duration.ofSeconds(10));
+    conf.setFromObject(ratisClientConfig);
+
+    DatanodeRatisServerConfig ratisServerConfig =
+        conf.getObject(DatanodeRatisServerConfig.class);
+    ratisServerConfig.setRequestTimeOut(Duration.ofSeconds(3));
+    ratisServerConfig.setWatchTimeOut(Duration.ofSeconds(10));
+    conf.setFromObject(ratisServerConfig);
+
+    RatisClientConfig.RaftConfig raftClientConfig =
+        conf.getObject(RatisClientConfig.RaftConfig.class);
+    raftClientConfig.setRpcRequestTimeout(Duration.ofSeconds(3));
+    raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(10));
+    conf.setFromObject(raftClientConfig);
+
+    conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1);
+    conf.setQuietMode(false);
+    cluster =
+        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).setHbInterval(200)
+            .build();
+    cluster.waitForClusterToBeReady();
+    cluster.waitForPipelineTobeReady(HddsProtos.ReplicationFactor.ONE, 60000);
+    //the easiest way to create an open container is creating a key
+    client = OzoneClientFactory.getRpcClient(conf);
+    objectStore = client.getObjectStore();
+
+    volumeName = "testcontainerstatemachinestream";
+    bucketName = "teststreambucket";
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+  }
+
+  /**
+   * Shutdown the MiniOzoneCluster.
+   */
+  @After
+  public void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
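+  // Streams size/2 bytes twice into a single key and verifies that the
+  // container reports exactly size bytes used once the key is closed.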
+  @Test
+  public void testContainerStateMachineForStreaming() throws Exception {
+    long size = 1024 * 8;
+
+    OzoneDataStreamOutput key = TestHelper.createStreamKey(
+        "ozone-stream-test.txt", ReplicationType.RATIS, size, objectStore,
+        volumeName, bucketName);
+
+    byte[] data =
+        ContainerTestHelper
+            .getFixedLengthString(UUID.randomUUID().toString(),
+                (int) (size / 2))
+            .getBytes(UTF_8);
+    key.write(ByteBuffer.wrap(data));
+    key.write(ByteBuffer.wrap(data));
+
+    key.flush();
+
+    KeyDataStreamOutput streamOutput =
+        (KeyDataStreamOutput) key.getByteBufStreamOutput();
+    List<OmKeyLocationInfo> locationInfoList =
+        streamOutput.getLocationInfoList();
+
+    key.close();
+
+    OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+    HddsDatanodeService dn = TestHelper.getDatanodeService(omKeyLocationInfo,
+        cluster);
+
+    long bytesUsed = dn.getDatanodeStateMachine()
+        .getContainer().getContainerSet()
+        .getContainer(omKeyLocationInfo.getContainerID())
+        .getContainerData().getBytesUsed();
+
+    Assert.assertEquals(size, bytesUsed);
+  }
+
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
index 29f19eba0d70..34055d191f3c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
@@ -65,6 +65,7 @@
 import org.apache.ratis.rpc.RpcType;
 import static org.apache.ratis.rpc.SupportedRpcType.GRPC;
 import static org.apache.ratis.rpc.SupportedRpcType.NETTY;
+
 import org.apache.ratis.util.function.CheckedBiConsumer;
 import org.junit.Assert;
 import org.junit.BeforeClass;