Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
Expand All @@ -144,8 +145,10 @@
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
Expand Down Expand Up @@ -220,6 +223,9 @@ public class HBaseTestingUtil extends HBaseZKTestingUtil {
/** This is for unit tests parameterized with a single boolean. */
public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();

private boolean isExternalDFS = false;
private boolean isExternalZK = false;

/**
* Checks to see if a specific port is available.
* @param port the port number to check for availability
Expand Down Expand Up @@ -720,6 +726,12 @@ private String createDirAndSetProperty(final String relPath, String property) {
return path;
}

/**
 * Copies every entry of the given {@link Properties} into this utility's
 * Configuration, stringifying keys and values (Properties stores them as Objects).
 */
private void addPropertiesToConf(Properties properties) {
  properties.forEach((key, value) -> this.conf.set(String.valueOf(key), String.valueOf(value)));
}

/**
* Shuts down instance created by call to {@link #startMiniDFSCluster(int)} or does nothing.
*/
Expand Down Expand Up @@ -772,21 +784,24 @@ public SingleProcessHBaseCluster startMiniCluster(StartTestingClusterOption opti
}
miniClusterRunning = true;

setupClusterTestDir();
System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestDir.getPath());

// Bring up mini dfs cluster. This spews a bunch of warnings about missing
// scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
if (dfsCluster == null) {
LOG.info("STARTING DFS");
dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
} else {
LOG.info("NOT STARTING DFS");
if (option.getExternalDFS() == null) {
setupClusterTestDir();
System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestDir.getPath());
// Bring up mini dfs cluster. This spews a bunch of warnings about missing
// scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
if (dfsCluster == null) {
LOG.info("STARTING DFS");
dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
} else {
LOG.info("NOT STARTING DFS");
}
}

// Start up a zk cluster.
if (getZkCluster() == null) {
startMiniZKCluster(option.getNumZkServers());
if (option.getExternalZK() == null) {
// Start up a zk cluster.
if (getZkCluster() == null) {
startMiniZKCluster(option.getNumZkServers());
}
}

// Start the MiniHBaseCluster
Expand All @@ -802,6 +817,31 @@ public SingleProcessHBaseCluster startMiniCluster(StartTestingClusterOption opti
*/
public SingleProcessHBaseCluster startMiniHBaseCluster(StartTestingClusterOption option)
throws IOException, InterruptedException {
if (option.getExternalZK() != null) {
if(option.getExternalZK().get(HConstants.ZOOKEEPER_QUORUM) == null) {
throw new IllegalArgumentException("ZOOKEEPER_QUORUM can not be null.");
}

addPropertiesToConf(option.getExternalZK());
this.isExternalZK = true;
}

if (option.getExternalDFS() != null) {
if(option.getExternalDFS().get("fs.defaultFS") == null) {
throw new IllegalArgumentException("fs.defaultFS can not be null.");
}

addPropertiesToConf(option.getExternalDFS());

// RS is started with a different user, @see #HBaseTestingUtil.getDifferentUser
// this is to ensure the user has permissions to read and write external HDFS.
this.conf.set("fs.permissions.umask-mode", "000");
LOG.info("USING EXTERNAL DFS: {}, user: {}.",
conf.get("fs.defaultFS"), UserGroupInformation.getCurrentUser().getUserName());

this.isExternalDFS = true;
}

// Now do the mini hbase cluster. Set the hbase.rootdir in config.
createRootDir(option.isCreateRootDir());
if (option.isCreateWALDir()) {
Expand Down Expand Up @@ -984,8 +1024,12 @@ public SingleProcessHBaseCluster getMiniHBaseCluster() {
public void shutdownMiniCluster() throws IOException {
LOG.info("Shutting down minicluster");
shutdownMiniHBaseCluster();
shutdownMiniDFSCluster();
shutdownMiniZKCluster();
if (!this.isExternalDFS) {
shutdownMiniDFSCluster();
}
if (!this.isExternalZK) {
shutdownMiniZKCluster();
}

cleanupTestDir();
miniClusterRunning = false;
Expand All @@ -1008,6 +1052,23 @@ public void shutdownMiniHBaseCluster() throws IOException {
zooKeeperWatcher.close();
zooKeeperWatcher = null;
}

// clean external dfs dir and znode
if (this.isExternalDFS) {
FileSystem fs = FileSystem.get(this.conf);
fs.delete(new Path(this.conf.get(HConstants.HBASE_DIR)).getParent(), true);
fs.close();
}
if (this.isExternalZK) {
try (ZKWatcher watcher = new ZKWatcher(this.conf, "", null)) {
String znode = this.conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
if (ZKUtil.checkExists(watcher, znode) != -1) {
ZKUtil.deleteNodeRecursively(watcher, znode);
}
} catch(KeeperException e) {
throw new IOException(e.getMessage(), e);
}
}
}

/**
Expand All @@ -1034,6 +1095,12 @@ private void cleanup() throws IOException {
conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
}

/**
 * Resets the static {@link UserGroupInformation} state.
 * <p>
 * When an external HDFS is used, the cluster must be accessed as an authorized
 * user. If UGI is not reset, setting the hadoop user via HADOOP_USER_NAME has
 * no effect, because the statically cached current-user state takes precedence.
 */
public void resetUserGroupInformation() {
  UserGroupInformation.reset();
}

/**
* Returns the path to the default root dir the minicluster uses. If <code>create</code> is true,
* a new root directory path is fetched irrespective of whether it has been fetched before or not.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -106,14 +107,25 @@ public final class StartTestingClusterOption {
*/
private final boolean createWALDir;

/**
 * Configuration of the external DFS. Stored as {@link Properties} rather than a
 * Configuration, because a Configuration would also carry default values we do
 * not want to apply.
 */
private Properties externalDFS;
/**
 * Configuration of the external ZooKeeper ensemble.
 */
private Properties externalZK;

/**
* Private constructor. Use {@link Builder#build()}.
*/
private StartTestingClusterOption(int numMasters, int numAlwaysStandByMasters,
Class<? extends HMaster> masterClass, int numRegionServers, List<Integer> rsPorts,
Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> rsClass,
int numDataNodes, String[] dataNodeHosts, int numZkServers, boolean createRootDir,
boolean createWALDir) {
boolean createWALDir, Properties externalDFS, Properties externalZK) {
this.numMasters = numMasters;
this.numAlwaysStandByMasters = numAlwaysStandByMasters;
this.masterClass = masterClass;
Expand All @@ -125,6 +137,8 @@ private StartTestingClusterOption(int numMasters, int numAlwaysStandByMasters,
this.numZkServers = numZkServers;
this.createRootDir = createRootDir;
this.createWALDir = createWALDir;
this.externalDFS = externalDFS;
this.externalZK = externalZK;
}

public int getNumMasters() {
Expand Down Expand Up @@ -171,13 +185,22 @@ public boolean isCreateWALDir() {
return createWALDir;
}

/** Returns the configuration of the external DFS, or {@code null} if a mini DFS is used. */
public Properties getExternalDFS() {
  return externalDFS;
}

/** Returns the configuration of the external ZK ensemble, or {@code null} if a mini ZK is used. */
public Properties getExternalZK() {
  return externalZK;
}

@Override
public String toString() {
return "StartMiniClusterOption{" + "numMasters=" + numMasters + ", masterClass=" + masterClass +
", numRegionServers=" + numRegionServers + ", rsPorts=" + StringUtils.join(rsPorts) +
", rsClass=" + rsClass + ", numDataNodes=" + numDataNodes + ", dataNodeHosts=" +
Arrays.toString(dataNodeHosts) + ", numZkServers=" + numZkServers + ", createRootDir=" +
createRootDir + ", createWALDir=" + createWALDir + '}';
createRootDir + ", createWALDir=" + createWALDir + ", externalDFS=" + externalDFS +
", externalZK=" + externalZK +'}';
}

/**
Expand Down Expand Up @@ -205,6 +228,8 @@ public static final class Builder {
private int numZkServers = 1;
private boolean createRootDir = false;
private boolean createWALDir = false;
private Properties externalDFS = null;
private Properties externalZK = null;

private Builder() {
}
Expand All @@ -215,7 +240,7 @@ public StartTestingClusterOption build() {
}
return new StartTestingClusterOption(numMasters, numAlwaysStandByMasters, masterClass,
numRegionServers, rsPorts, rsClass, numDataNodes, dataNodeHosts, numZkServers,
createRootDir, createWALDir);
createRootDir, createWALDir, externalDFS, externalZK);
}

public Builder numMasters(int numMasters) {
Expand Down Expand Up @@ -273,6 +298,16 @@ public Builder createWALDir(boolean createWALDir) {
this.createWALDir = createWALDir;
return this;
}

/**
 * Point the cluster at an externally managed DFS instead of starting a mini DFS.
 * @param externalDFS connection properties; must contain "fs.defaultFS".
 * @return this builder, for chaining.
 */
public Builder externalDFS(Properties externalDFS) {
  this.externalDFS = externalDFS;
  return this;
}

/**
 * Point the cluster at an externally managed ZK ensemble instead of starting a mini ZK.
 * @param externalZK connection properties; must contain the ZK quorum
 *          ({@code HConstants.ZOOKEEPER_QUORUM}).
 * @return this builder, for chaining.
 */
public Builder externalZK(Properties externalZK) {
  this.externalZK = externalZK;
  return this;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import java.io.File;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -41,6 +42,8 @@
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.junit.ClassRule;
Expand Down Expand Up @@ -140,6 +143,45 @@ public void testMultiClusters() throws Exception {
}
}

/**
 * Starts a mini DFS and mini ZK with one utility, then boots an HBase mini
 * cluster against them as if they were external, and verifies that shutdown
 * cleans up the root dir and znode it created on the "external" services.
 */
@Test
public void testMiniClusterWithExternalDFSAndZK() throws Exception {
  HBaseTestingUtil hbt1 = new HBaseTestingUtil();

  // Let's say this is the external ZK and DFS.
  MiniDFSCluster dfsCluster = hbt1.startMiniDFSCluster(3);
  MiniZooKeeperCluster zkCluster = hbt1.startMiniZKCluster();
  try {
    Properties hdfsConf = new Properties();
    hdfsConf.setProperty("fs.defaultFS", new Path(dfsCluster.getFileSystem().getUri()).toString());

    Properties zkConf = new Properties();
    zkConf.setProperty(HConstants.ZOOKEEPER_QUORUM, zkCluster.getAddress().getHostName());
    zkConf.setProperty(HConstants.ZOOKEEPER_CLIENT_PORT,
      Integer.toString(zkCluster.getAddress().getPort()));
    zkConf.setProperty(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase_test");

    HBaseTestingUtil hbt2 = new HBaseTestingUtil();
    StartTestingClusterOption option =
      StartTestingClusterOption.builder().externalZK(zkConf).externalDFS(hdfsConf).build();
    SingleProcessHBaseCluster cluster = hbt2.startMiniCluster(option);
    String hbaseRootDir = hbt2.conf.get(HConstants.HBASE_DIR);
    String znode = hbt2.conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
    try {
      assertEquals(1, cluster.getLiveRegionServerThreads().size());
    } finally {
      hbt2.shutdownMiniCluster();
    }

    // Check that shutdownMiniCluster cleaned the dir and znode created by the mini HBase cluster.
    FileSystem fs = FileSystem.get(hbt2.conf);
    assertFalse(fs.exists(new Path(hbaseRootDir).getParent()));

    // try-with-resources so the watcher is closed even if the assertion fails
    // (the original leaked it).
    try (ZKWatcher watcher = new ZKWatcher(hbt2.conf, "TestHbase", null)) {
      // assertEquals takes the expected value first.
      assertEquals(-1, ZKUtil.checkExists(watcher, znode));
    }
  } finally {
    // Always tear down the "external" clusters, even on assertion failure.
    hbt1.shutdownMiniDFSCluster();
    hbt1.shutdownMiniZKCluster();
  }
}

@Test public void testMiniCluster() throws Exception {
HBaseTestingUtil hbt = new HBaseTestingUtil();

Expand Down Expand Up @@ -406,6 +448,7 @@ public void testMiniZooKeeperWithMultipleClientPorts() throws Exception {
assertTrue(!fs.exists(testdir));
assertTrue(fs.mkdirs(testdir));
assertTrue(hbt.cleanupTestDir());
hbt.resetUserGroupInformation();
}

@Test public void testResolvePortConflict() throws Exception {
Expand Down
Loading