Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT;

/**
* Creates a ratis server endpoint that acts as the communication layer for
* Ozone containers.
Expand Down Expand Up @@ -213,6 +218,20 @@ private RaftProperties newRaftProperties() {
RaftServerConfigKeys.setStorageDirs(properties,
Collections.singletonList(new File(storageDir)));

// Check raft storage dir number and max allowed pipeline number
String[] dirs = storageDir.split(",");
int maxPipelinePerNode = conf.getInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT);
if (maxPipelinePerNode == 0 ||
(maxPipelinePerNode > 2 && dirs.length < (maxPipelinePerNode - 1))) {
LOG.warn("{} = {} is smaller than {} = {}. Suggest increase {} or " +
"lower {} ", OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR,
dirs.length,
OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, maxPipelinePerNode,
OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR,
OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT);
}

// For grpc set the maximum message size
GrpcConfigKeys.setMessageSizeMax(properties,
SizeInBytes.valueOf(maxChunkSize + raftSegmentPreallocatedSize));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,26 @@
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.container.common.transport.server
.XceiverServerSpi;
import org.apache.hadoop.ozone.container.common.transport.server
.ratis.XceiverServerRatis;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
Expand All @@ -47,15 +57,16 @@ public class TestRatisPipelineCreateAndDestroy {
private static MiniOzoneCluster cluster;
private OzoneConfiguration conf = new OzoneConfiguration();
private static PipelineManager pipelineManager;
private static int maxPipelinePerNode = 4;

public void init(int numDatanodes) throws Exception {
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
GenericTestUtils.getRandomizedTempPath());
conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, maxPipelinePerNode);

cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(numDatanodes)
.setTotalPipelineNumLimit(numDatanodes + numDatanodes/3)
.setTotalPipelineNumLimit(numDatanodes + numDatanodes)
.setHbInterval(2000)
.setHbProcessorInterval(1000)
.build();
Expand Down Expand Up @@ -162,6 +173,72 @@ public void testPipelineCreationOnNodeRestart() throws Exception {
}
}

/**
 * Checks that raft groups are spread over multiple ratis storage directories.
 *
 * The test sets a storage-dir suffix on the configuration, starts a
 * 3-datanode cluster, waits for 3 RATIS/THREE pipelines, and then counts,
 * per expected directory name, how many raft group storage roots contain
 * that name. Every expected directory must be hit
 * {@code maxPipelinePerNode - 1} times in total across all datanodes.
 *
 * NOTE(review): presumably the mini cluster turns the suffix into
 * {@code maxPipelinePerNode - 1} ratis directories per datanode named
 * {@code "ratis" + suffix + index} -- confirm against the cluster setup code.
 */
@Test(timeout = 300000)
public void testMultiRaftStorageDir() throws Exception {
  final String suffix = "-testMultiRaftStorageDir-";

  // One hit counter per expected ratis directory name.
  Map<String, AtomicInteger> directories = new ConcurrentHashMap<>();
  for (int index = 0; index < maxPipelinePerNode - 1; index++) {
    directories.put("ratis" + suffix + index, new AtomicInteger(0));
  }

  conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL,
      5, TimeUnit.SECONDS);
  conf.set("dfs.container.ratis.datanode.storage.dir.suffix", suffix);

  // Start 3 datanodes and wait until 3 RATIS THREE pipelines exist.
  init(3);
  waitForPipelines(3);

  List<Pipeline> pipelines =
      pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
          HddsProtos.ReplicationFactor.THREE);
  List<RaftGroupId> raftGroupIds = new ArrayList<>();
  for (Pipeline pipeline : pipelines) {
    raftGroupIds.add(RaftGroupId.valueOf(pipeline.getId().getId()));
  }

  List<HddsDatanodeService> dns = new ArrayList<>(cluster.getHddsDatanodes());
  for (HddsDatanodeService dn : dns) {
    XceiverServerSpi writeChannel =
        dn.getDatanodeStateMachine().getContainer().getWriteChannel();
    RaftServerProxy server =
        (RaftServerProxy) ((XceiverServerRatis) writeChannel).getServer();
    for (RaftGroupId group : raftGroupIds) {
      final String raftDir;
      try {
        RaftServerImpl raft = server.getImpl(group);
        raftDir =
            raft.getState().getStorage().getStorageDir().getRoot().toString();
      } catch (IOException e) {
        // Fail right here with the real cause instead of printing the stack
        // trace and letting the count assertion below fail misleadingly.
        throw new AssertionError(
            "Failed to look up raft group " + group + " on datanode", e);
      }
      // Substring match: directory names are distinct for single-digit
      // indexes; a name like "...-1" would also match "...-10".
      for (Map.Entry<String, AtomicInteger> entry : directories.entrySet()) {
        if (raftDir.contains(entry.getKey())) {
          entry.getValue().incrementAndGet();
        }
      }
    }
  }

  for (AtomicInteger count : directories.values()) {
    Assert.assertEquals(maxPipelinePerNode - 1, count.get());
  }
}

/**
 * Starts a 3-datanode cluster without setting a ratis storage-dir suffix
 * and asserts the number of RATIS/THREE pipelines equals
 * {@code datanodeNum * (maxPipelinePerNode - 1) / 3} (integer division).
 *
 * NOTE(review): the method name implies each datanode ends up with a single
 * ratis storage directory in this setup -- confirm against the cluster
 * setup code.
 */
@Test(timeout = 30000)
public void testMultiRaftPipelineWithSingleStorageDir() throws Exception {
  final int datanodeNum = 3;

  // Bring the cluster up, then wait on the pipeline count before asserting.
  init(datanodeNum);
  waitForPipelines(datanodeNum);

  final List<Pipeline> ratisThreePipelines = pipelineManager.getPipelines(
      HddsProtos.ReplicationType.RATIS,
      HddsProtos.ReplicationFactor.THREE);

  // Multiply before dividing so integer truncation matches the limit formula.
  final int replication = HddsProtos.ReplicationFactor.THREE.getNumber();
  final int expectedPipelines =
      datanodeNum * (maxPipelinePerNode - 1) / replication;

  Assert.assertEquals(expectedPipelines, ratisThreePipelines.size());
}

private void waitForPipelines(int numPipelines)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to add a test to verify that even with a single ratis directory configured, we are still able to create multiple pipelines as per the configured limit.

Copy link
Contributor Author

@ChenSammi ChenSammi Dec 17, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure @bshashikant, a new commit addresses the concern.

throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(() -> pipelineManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;

import io.netty.util.internal.StringUtil;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.StorageUnit;
Expand Down Expand Up @@ -69,6 +71,8 @@
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
.HEALTHY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.DFS_CONTAINER_IPC_PORT;
import static org.apache.hadoop.ozone.OzoneConfigKeys
Expand Down Expand Up @@ -604,22 +608,39 @@ List<HddsDatanodeService> createHddsDatanodes(
String[] args = new String[] {};
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, scmAddress);
List<HddsDatanodeService> hddsDatanodes = new ArrayList<>();

String suffix =
conf.get("dfs.container.ratis.datanode.storage.dir.suffix");
for (int i = 0; i < numOfDatanodes; i++) {
OzoneConfiguration dnConf = new OzoneConfiguration(conf);
String datanodeBaseDir = path + "/datanode-" + Integer.toString(i);
Path metaDir = Paths.get(datanodeBaseDir, "meta");
Path dataDir = Paths.get(datanodeBaseDir, "data", "containers");
Path ratisDir = Paths.get(datanodeBaseDir, "data", "ratis");
String ratisPath = "";
if (StringUtil.isNullOrEmpty(suffix)) {
ratisPath = Paths.get(datanodeBaseDir, "data", "ratis").toString();
} else {
int index = 0;
int maxPipelinePerNode =
conf.getInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 0);
while (maxPipelinePerNode > 1) {
ratisPath += Paths.get(datanodeBaseDir,
"data", "ratis" + suffix + (index++)).toString() + ",";
maxPipelinePerNode--;
}
// remove the tail ","
ratisPath = ratisPath.substring(0, ratisPath.length() - 1);
}

Path wrokDir = Paths.get(datanodeBaseDir, "data", "replication",
"work");
Files.createDirectories(metaDir);
Files.createDirectories(dataDir);
Files.createDirectories(ratisDir);
Files.createDirectories(wrokDir);
dnConf.set(HddsConfigKeys.OZONE_METADATA_DIRS, metaDir.toString());
dnConf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDir.toString());
dnConf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR,
ratisDir.toString());
ratisPath);
dnConf.set(OzoneConfigKeys.OZONE_CONTAINER_COPY_WORKDIR,
wrokDir.toString());

Expand Down