Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
Expand All @@ -34,6 +35,10 @@
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.viewfs.Constants;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
Expand Down Expand Up @@ -131,8 +136,8 @@ private String getUsage(String cmd) {
String[] commands =
{"-add", "-update", "-rm", "-ls", "-getDestination", "-setQuota",
"-setStorageTypeQuota", "-clrQuota", "-clrStorageTypeQuota",
"-safemode", "-nameservice", "-getDisabledNameservices",
"-refresh", "-refreshRouterArgs",
"-initViewFsToMountTable", "-safemode", "-nameservice",
"-getDisabledNameservices", "-refresh", "-refreshRouterArgs",
"-refreshSuperUserGroupsConfiguration"};
StringBuilder usage = new StringBuilder();
usage.append("Usage: hdfs dfsrouteradmin :\n");
Expand Down Expand Up @@ -171,7 +176,9 @@ private String getUsage(String cmd) {
return "\t[-clrQuota <path>]";
} else if (cmd.equals("-clrStorageTypeQuota")) {
return "\t[-clrStorageTypeQuota <path>]";
} else if (cmd.equals("-safemode")) {
} else if (cmd.equals("-initViewFsToMountTable")) {
return "\t[-initViewFsToMountTable <clusterName>]";
}else if (cmd.equals("-safemode")) {
return "\t[-safemode enter | leave | get]";
} else if (cmd.equals("-nameservice")) {
return "\t[-nameservice enable | disable <nameservice>]";
Expand Down Expand Up @@ -242,6 +249,10 @@ private boolean validateMin(String[] argv) {
if (argv.length < 2) {
return false;
}
} else if ("-initViewFsToMountTable".equals(cmd)) {
if (argv.length < 2) {
return false;
}
} else if ("-getDestination".equals(cmd)) {
if (argv.length < 2) {
return false;
Expand Down Expand Up @@ -384,6 +395,13 @@ public int run(String[] argv) throws Exception {
getDisabledNameservices();
} else if ("-refresh".equals(cmd)) {
refresh(address);
} else if ("-initViewFsToMountTable".equals(cmd)) {
if (initViewFsToMountTable(argv[i])) {
System.out.println("Successfully init ViewFs mapping to router " +
argv[i]);
} else {
exitCode = -1;
}
} else if ("-refreshRouterArgs".equals(cmd)) {
exitCode = genericRefresh(argv, i);
} else if ("-refreshSuperUserGroupsConfiguration".equals(cmd)) {
Expand Down Expand Up @@ -1036,6 +1054,74 @@ private boolean updateQuota(String mount, long nsQuota, long ssQuota)
return updateResponse.getStatus();
}

/**
* initViewFsToMountTable.
* @param clusterName The specified cluster to initialize.
* @return If the quota was updated.
* @throws IOException Error adding the mount point.
*/
public boolean initViewFsToMountTable(String clusterName)
throws IOException {
// fs.viewfs.mounttable.ClusterX.link./data
final String mountTablePrefix =
Constants.CONFIG_VIEWFS_PREFIX + "." + clusterName + "." +
Constants.CONFIG_VIEWFS_LINK + ".";
final String rootPath = "/";
Map<String, String> viewFsMap = getConf().getValByRegex(
mountTablePrefix + rootPath);
if (viewFsMap.isEmpty()) {
System.out.println("There is no ViewFs mapping to initialize.");
return true;
}
for (Entry<String, String> entry : viewFsMap.entrySet()) {
Path path = new Path(entry.getValue());
URI destUri = path.toUri();
String mountKey = entry.getKey();
DestinationOrder order = DestinationOrder.HASH;
String mount = mountKey.replaceAll(mountTablePrefix, "");
if (!destUri.getScheme().equals("hdfs")) {
System.out.println("Only supports HDFS, " +
"added Mount Point failed , " + mountKey);
}
if (!mount.startsWith(rootPath) ||
!destUri.getPath().startsWith(rootPath)) {
System.out.println("Added Mount Point failed " + mountKey);
continue;
}
String[] nss = new String[]{destUri.getAuthority()};
boolean added = addMount(
mount, nss, destUri.getPath(), false,
false, order, getACLEntityFormHdfsPath(path, getConf()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we specify clustername, it could be not proper to invoke directly, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Hexiaoqiao I didn't find any problems here, can you tell me the details, thank you very much.

if (added) {
System.out.println("Added mount point " + mount);
}
}
return true;
}

/**
* Returns ACLEntity according to a HDFS pat.
* @param path A path of HDFS.
*/
static public ACLEntity getACLEntityFormHdfsPath(
Path path, Configuration conf) {
String owner = null;
String group = null;
FsPermission mode = null;
try {
FileSystem fs = path.getFileSystem(conf);
if (fs.exists(path)) {
FileStatus fileStatus = fs.getFileStatus(path);
owner = fileStatus.getOwner();
group = fileStatus.getGroup();
mode = fileStatus.getPermission();
}
} catch (IOException e) {
System.err.println("Exception encountered " + e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just suggest to throw exception rather than just print the error information.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When FileStatus cannot be obtained, I think the default ACLEntity should be used to add the mapping.

}
return new ACLEntity(owner, group, mode);
}

/**
* Update storage type quota of specified mount table.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,22 @@ Mount table permission can be set by following command:

The option mode is UNIX-style permissions for the mount table. Permissions are specified in octal, e.g. 0755. By default, this is set to 0755.

#### Init ViewFs To Router
Router supports initializing the ViewFS mount point to the Router. The mapping directory protocol of ViewFS must be HDFS, and the initializer only supports one-to-one mapping.

For example, use the following viewfs to configure the initial mount table to the router.

<configuration>
<property>
<name>fs.viewfs.mounttable.ClusterX.link./data</name>
<value>hdfs://nn1-clusterx.example.com:8020/data</value>
</property>
</configuration>

The ViewFS mount table can be initialized to the Router by using the following command:

[hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -initViewFsToMountTable ClusterX

#### Quotas
Router-based federation supports global quota at mount table level. Mount table entries may spread multiple subclusters and the global quota will be
accounted across these subclusters.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
Expand Down Expand Up @@ -78,6 +80,8 @@ public class TestRouterAdminCLI {
private static RouterClient client;
private static Router router;

private static DistributedFileSystem hdfs;

private static final String TEST_USER = "test-user";

private final ByteArrayOutputStream out = new ByteArrayOutputStream();
Expand All @@ -102,10 +106,12 @@ public static void globalSetUp() throws Exception {

// Start routers
cluster.startRouters();
cluster.startCluster();

routerContext = cluster.getRandomRouter();
router = routerContext.getRouter();
stateStore = router.getStateStore();
hdfs = cluster.getCluster().getFileSystem();

Configuration routerConf = new Configuration();
InetSocketAddress routerSocket = router.getAdminServerAddress();
Expand Down Expand Up @@ -700,6 +706,49 @@ public void testAddMountTableIfParentExist() throws Exception {
}
}

@Test
public void testInitViewFsToMountTable() throws Exception {
// re-set system out for testing
System.setOut(new PrintStream(out));
stateStore.loadCache(MountTableStoreImpl.class, true);
String nnAddress = cluster.getRandomNamenode().
getNamenode().getHostAndPort();

String src = "/data";
Path destPath = new Path("hdfs://" + nnAddress + "/data");
String user = "user1";
String group = "group1";
String clusterName = "ClusterX";

// 0.mkdir destPath
hdfs.mkdirs(destPath);
// 1.set owner
hdfs.setOwner(destPath, user, group);
// 2.set viewFs mapping
admin.getConf().set(
"fs.viewfs.mounttable.ClusterX.link." + src, destPath.toString());
// 3.run initialization
String[] argv = new String[]{"-initViewFsToMountTable", clusterName};
assertEquals(0, ToolRunner.run(admin, argv));
// 4.gets the mount point entries
stateStore.loadCache(MountTableStoreImpl.class, true);
GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest
.newInstance(src);
GetMountTableEntriesResponse getResponse = client.getMountTableManager()
.getMountTableEntries(getRequest);
List<MountTable> mountTables = getResponse.getEntries();
// 5.check
assertEquals(1, mountTables.size());
MountTable mountTable = mountTables.get(0);
assertEquals(user, mountTable.getOwnerName());
assertEquals(group, mountTable.getGroupName());
assertEquals(destPath.toUri().getPath(), mountTable.
getDestinations().get(0).getDest());
assertEquals(nnAddress, mountTable.
getDestinations().get(0).getNameserviceId());
assertEquals(src, mountTable.getSourcePath());
}

@Test
public void testMountTablePermissions() throws Exception {
// re-set system out for testing
Expand Down Expand Up @@ -810,6 +859,13 @@ public void testInvalidArgumentMessage() throws Exception {
assertTrue(out.toString().contains("\t[-clrQuota <path>]"));
out.reset();

argv = new String[] {"-initViewFsToMountTable"};
assertEquals(-1, ToolRunner.run(admin, argv));
System.err.println(out.toString());
assertTrue(out.toString().
contains("[-initViewFsToMountTable <clusterName>]"));
out.reset();

argv = new String[] {"-safemode"};
assertEquals(-1, ToolRunner.run(admin, argv));
assertTrue(out.toString().contains("\t[-safemode enter | leave | get]"));
Expand Down Expand Up @@ -852,6 +908,7 @@ public void testInvalidArgumentMessage() throws Exception {
+ " <quota in bytes or quota size string>]\n"
+ "\t[-clrQuota <path>]\n"
+ "\t[-clrStorageTypeQuota <path>]\n"
+"\t[-initViewFsToMountTable <clusterName>]\n"
+ "\t[-safemode enter | leave | get]\n"
+ "\t[-nameservice enable | disable <nameservice>]\n"
+ "\t[-getDisabledNameservices]\n"
Expand Down