diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index a76944b9f070..706a8e37b433 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -82,6 +82,11 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT;
+
 /**
  * Creates a ratis server endpoint that acts as the communication layer for
  * Ozone containers.
@@ -213,6 +218,20 @@ private RaftProperties newRaftProperties() {
     RaftServerConfigKeys.setStorageDirs(properties,
         Collections.singletonList(new File(storageDir)));
 
+    // Check the Raft storage dir count against the max allowed pipeline number
+    String[] dirs = storageDir.split(",");
+    int maxPipelinePerNode = conf.getInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
+        OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT);
+    if (maxPipelinePerNode == 0 ||
+        (maxPipelinePerNode > 2 && dirs.length < (maxPipelinePerNode - 1))) {
+      LOG.warn("{} = {} is smaller than {} = {}. Suggest increasing {} or " +
+          "lowering {}.", OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR,
+          dirs.length,
+          OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, maxPipelinePerNode,
+          OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR,
+          OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT);
+    }
+
     // For grpc set the maximum message size
     GrpcConfigKeys.setMessageSizeMax(properties,
         SizeInBytes.valueOf(maxChunkSize + raftSegmentPreallocatedSize));
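To make the intent of the new check concrete: with a per-datanode pipeline limit of N (and N > 2), the server expects at least N - 1 comma-separated entries in the Ratis storage-dir setting, and warns otherwise. Below is a standalone sketch of the same arithmetic; the class name and hard-coded sample values are illustrative stand-ins for the two config keys, not part of the patch:

    // Standalone sketch of the storage-dir vs. pipeline-limit check added
    // above. In the patch the two inputs come from
    // DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR and
    // OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT; here they are hard-coded.
    public final class RaftStorageDirCheck {

      public static void main(String[] args) {
        String storageDir = "/data/1/ratis,/data/2/ratis,/data/3/ratis";
        int maxPipelinePerNode = 4;

        String[] dirs = storageDir.split(",");
        // Warn when the limit is unbounded (0), or when more than two
        // pipelines are allowed but fewer than (limit - 1) dirs exist.
        if (maxPipelinePerNode == 0
            || (maxPipelinePerNode > 2 && dirs.length < maxPipelinePerNode - 1)) {
          System.out.printf("WARN: %d Ratis storage dir(s) for a pipeline "
                  + "limit of %d; add dirs or lower the limit%n",
              dirs.length, maxPipelinePerNode);
        } else {
          System.out.printf("OK: %d Ratis storage dir(s) cover a pipeline "
                  + "limit of %d%n", dirs.length, maxPipelinePerNode);
        }
      }
    }
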
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
index 0874f8be30eb..c76068fe4010 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
@@ -24,7 +24,14 @@
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.container.common.transport.server
+    .XceiverServerSpi;
+import org.apache.hadoop.ozone.container.common.transport.server
+    .ratis.XceiverServerRatis;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerProxy;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -32,8 +39,11 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import static
     org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
@@ -47,15 +57,16 @@ public class TestRatisPipelineCreateAndDestroy {
   private static MiniOzoneCluster cluster;
   private OzoneConfiguration conf = new OzoneConfiguration();
   private static PipelineManager pipelineManager;
+  private static int maxPipelinePerNode = 4;
 
   public void init(int numDatanodes) throws Exception {
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
         GenericTestUtils.getRandomizedTempPath());
-    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, maxPipelinePerNode);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(numDatanodes)
-        .setTotalPipelineNumLimit(numDatanodes + numDatanodes/3)
+        .setTotalPipelineNumLimit(numDatanodes + numDatanodes)
         .setHbInterval(2000)
         .setHbProcessorInterval(1000)
         .build();
@@ -162,6 +173,72 @@ public void testPipelineCreationOnNodeRestart() throws Exception {
     }
   }
 
+  @Test(timeout = 300000)
+  public void testMultiRaftStorageDir() throws Exception {
+    final String suffix = "-testMultiRaftStorageDir-";
+    Map<String, AtomicInteger> directories = new ConcurrentHashMap<>();
+    int maxPipeline = maxPipelinePerNode;
+    int index = 0;
+    while (maxPipeline > 1) {
+      directories.put("ratis" + suffix + (index++), new AtomicInteger(0));
+      maxPipeline--;
+    }
+
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL,
+        5, TimeUnit.SECONDS);
+    conf.set("dfs.container.ratis.datanode.storage.dir.suffix", suffix);
+
+    // Create 3 RATIS THREE pipelines
+    init(3);
+    // make sure the pipelines are created
+    waitForPipelines(3);
+    List<Pipeline> pipelines =
+        pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE);
+    List<RaftGroupId> raftGroupIds = new ArrayList<>();
+    pipelines.stream().forEach(pipeline ->
+        raftGroupIds.add(RaftGroupId.valueOf(pipeline.getId().getId())));
+
+    List<HddsDatanodeService> dns = new ArrayList<>(cluster.getHddsDatanodes());
+    dns.stream().forEach(dn -> {
+      XceiverServerSpi writeChannel =
+          dn.getDatanodeStateMachine().getContainer().getWriteChannel();
+      RaftServerProxy server =
+          (RaftServerProxy) ((XceiverServerRatis) writeChannel).getServer();
+      raftGroupIds.stream().forEach(group -> {
+        try {
+          RaftServerImpl raft = server.getImpl(group);
+          String raftDir =
+              raft.getState().getStorage().getStorageDir().getRoot().toString();
+          directories.keySet().stream().forEach(path -> {
+            if (raftDir.contains(path)) {
+              directories.get(path).incrementAndGet();
+            }
+          });
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      });
+    });
+
+    directories.values().stream().forEach(
+        count -> Assert.assertEquals(maxPipelinePerNode - 1, count.get()));
+  }
+
+  @Test(timeout = 30000)
+  public void testMultiRaftPipelineWithSingleStorageDir() throws Exception {
+    int datanodeNum = 3;
+    // Create 3 RATIS THREE pipelines
+    init(datanodeNum);
+    // make sure the pipelines are created
+    waitForPipelines(datanodeNum);
+    List<Pipeline> pipelines =
+        pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE);
+    Assert.assertEquals((datanodeNum * (maxPipelinePerNode - 1) /
+        HddsProtos.ReplicationFactor.THREE.getNumber()), pipelines.size());
+  }
+
   private void waitForPipelines(int numPipelines)
       throws TimeoutException, InterruptedException {
     GenericTestUtils.waitFor(() -> pipelineManager
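For reference, the assertion at the end of testMultiRaftPipelineWithSingleStorageDir is plain slot counting: each of the three datanodes can join maxPipelinePerNode - 1 RATIS/THREE pipelines (presumably one engagement slot stays free, matching the dirs-vs-limit check in the server change above), and every pipeline occupies one slot on each of its three datanodes. A minimal sketch with the test's values; the class itself is illustrative only:

    // Sketch of the slot-counting arithmetic behind the assertion in
    // testMultiRaftPipelineWithSingleStorageDir; values mirror the test.
    public final class ExpectedPipelineCount {

      public static void main(String[] args) {
        int datanodeNum = 3;         // init(datanodeNum)
        int maxPipelinePerNode = 4;  // OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT
        int replicationFactor = 3;   // HddsProtos.ReplicationFactor.THREE

        // Each datanode offers (maxPipelinePerNode - 1) slots, and every
        // pipeline consumes one slot on each of its replicationFactor nodes.
        int expected =
            datanodeNum * (maxPipelinePerNode - 1) / replicationFactor;
        System.out.println("expected pipelines: " + expected); // prints 3
      }
    }
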
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index bc937aaca91f..fd7d221efcb9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -23,6 +23,8 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+
+import io.netty.util.internal.StringUtil;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.StorageUnit;
@@ -69,6 +71,8 @@
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
     .HEALTHY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys
     .DFS_CONTAINER_IPC_PORT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys
@@ -604,22 +608,39 @@ List<HddsDatanodeService> createHddsDatanodes(
     String[] args = new String[] {};
     conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, scmAddress);
     List<HddsDatanodeService> hddsDatanodes = new ArrayList<>();
+
+    String suffix =
+        conf.get("dfs.container.ratis.datanode.storage.dir.suffix");
     for (int i = 0; i < numOfDatanodes; i++) {
       OzoneConfiguration dnConf = new OzoneConfiguration(conf);
       String datanodeBaseDir = path + "/datanode-" + Integer.toString(i);
       Path metaDir = Paths.get(datanodeBaseDir, "meta");
       Path dataDir = Paths.get(datanodeBaseDir, "data", "containers");
-      Path ratisDir = Paths.get(datanodeBaseDir, "data", "ratis");
+      String ratisPath = "";
+      if (StringUtil.isNullOrEmpty(suffix)) {
+        ratisPath = Paths.get(datanodeBaseDir, "data", "ratis").toString();
+      } else {
+        int index = 0;
+        int maxPipelinePerNode =
+            conf.getInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 0);
+        while (maxPipelinePerNode > 1) {
+          ratisPath += Paths.get(datanodeBaseDir,
+              "data", "ratis" + suffix + (index++)).toString() + ",";
+          maxPipelinePerNode--;
+        }
+        // remove the trailing ","
+        ratisPath = ratisPath.substring(0, ratisPath.length() - 1);
+      }
+
       Path workDir = Paths.get(datanodeBaseDir, "data", "replication", "work");
       Files.createDirectories(metaDir);
       Files.createDirectories(dataDir);
-      Files.createDirectories(ratisDir);
       Files.createDirectories(workDir);
       dnConf.set(HddsConfigKeys.OZONE_METADATA_DIRS, metaDir.toString());
       dnConf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDir.toString());
       dnConf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR,
-          ratisDir.toString());
+          ratisPath);
       dnConf.set(OzoneConfigKeys.OZONE_CONTAINER_COPY_WORKDIR,
           workDir.toString());
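One note on the ratisPath construction above: it appends "," in a loop and trims the tail, so if a suffix is configured while OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT resolves to 0 or 1, ratisPath stays empty and ratisPath.substring(0, ratisPath.length() - 1) throws StringIndexOutOfBoundsException (the tests never hit this, since they always set the limit to 4). A hypothetical variant using a list plus String.join sidesteps both the trim and that edge case; buildRatisDirs and its parameter names are invented for illustration and are not part of the patch:

    import java.nio.file.Paths;
    import java.util.ArrayList;
    import java.util.List;

    public final class RatisDirList {

      // Builds the same comma-separated dir list as the loop in the patch,
      // but with String.join so no trailing-comma trim is needed and an
      // empty result degrades to "" instead of throwing.
      static String buildRatisDirs(String baseDir, String suffix,
          int maxPipelinePerNode) {
        List<String> dirs = new ArrayList<>();
        for (int index = 0; index < maxPipelinePerNode - 1; index++) {
          dirs.add(
              Paths.get(baseDir, "data", "ratis" + suffix + index).toString());
        }
        return String.join(",", dirs);
      }

      public static void main(String[] args) {
        // Prints three comma-separated dirs for a pipeline limit of 4.
        System.out.println(buildRatisDirs("/tmp/datanode-0", "-test-", 4));
      }
    }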