diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java index f6ac70c368a52..afc4a3d8fac21 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java @@ -93,6 +93,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -104,6 +105,8 @@ import java.util.HashSet; import java.util.List; import java.util.Random; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** * WebHDFS Router implementation. This is an extension of @@ -453,21 +456,33 @@ private DatanodeInfo chooseDatanode(final Router router, final String path, final HttpOpParam.Op op, final long openOffset, final String excludeDatanodes) throws IOException { final RouterRpcServer rpcServer = getRPCServer(router); - DatanodeInfo[] dns = null; + DatanodeInfo[] dns = {}; + String resolvedNs = ""; try { dns = rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE); } catch (IOException e) { LOG.error("Cannot get the datanodes from the RPC server", e); } + if (op == PutOpParam.Op.CREATE) { + try { + resolvedNs = rpcServer.getCreateLocation(path).getNameserviceId(); + } catch (IOException e) { + LOG.error("Cannot get the name service " + + "to create file for path {} ", path, e); + } + } + HashSet excludes = new HashSet(); - if (excludeDatanodes != null) { - Collection collection = - getTrimmedStringCollection(excludeDatanodes); - for (DatanodeInfo dn : dns) { - if (collection.contains(dn.getName())) { - excludes.add(dn); - } + Collection collection = + getTrimmedStringCollection(excludeDatanodes); + for (DatanodeInfo dn : dns) { + String ns = getNsFromDataNodeNetworkLocation(dn.getNetworkLocation()); + if (collection.contains(dn.getName())) { + excludes.add(dn); + } else if (op == PutOpParam.Op.CREATE && !ns.equals(resolvedNs)) { + // for CREATE, the dest dn should be in the resolved ns + excludes.add(dn); } } @@ -502,6 +517,22 @@ private DatanodeInfo chooseDatanode(final Router router, return getRandomDatanode(dns, excludes); } + /** + * Get the nameservice info from datanode network location. + * @param location network location with format `/ns0/rack1` + * @return nameservice this datanode is in + */ + @VisibleForTesting + public static String getNsFromDataNodeNetworkLocation(String location) { + // network location should be in the format of /ns/rack + Pattern pattern = Pattern.compile("^/([^/]*)/"); + Matcher matcher = pattern.matcher(location); + if (matcher.find()) { + return matcher.group(1); + } + return ""; + } + /** * Get a random Datanode from a subcluster. * @param dns Nodes to be chosen from. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/RouterWebHDFSContract.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/RouterWebHDFSContract.java index 1d308073290d4..6b90faecc78f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/RouterWebHDFSContract.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/RouterWebHDFSContract.java @@ -64,6 +64,8 @@ public static void createCluster(Configuration conf) throws IOException { conf.addResource(CONTRACT_WEBHDFS_XML); cluster = new MiniRouterDFSCluster(true, 2, conf); + cluster.setIndependentDNs(); + cluster.setNumDatanodesPerNameservice(3); // Start NNs and DNs and wait until ready cluster.startCluster(conf); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java index 896d08f2c49b6..8a7a03e018b95 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java @@ -774,6 +774,15 @@ public void startCluster(Configuration overrideConf) { } topology.setFederation(true); + // Generate conf for namenodes and datanodes + String ns0 = nameservices.get(0); + Configuration nnConf = generateNamenodeConfiguration(ns0); + if (overrideConf != null) { + nnConf.addResource(overrideConf); + // Router also uses this configurations as initial values. + routerConf = new Configuration(overrideConf); + } + // Set independent DNs across subclusters int numDNs = nameservices.size() * numDatanodesPerNameservice; Configuration[] dnConfs = null; @@ -781,7 +790,7 @@ public void startCluster(Configuration overrideConf) { dnConfs = new Configuration[numDNs]; int dnId = 0; for (String nsId : nameservices) { - Configuration subclusterConf = new Configuration(); + Configuration subclusterConf = new Configuration(nnConf); subclusterConf.set(DFS_INTERNAL_NAMESERVICES_KEY, nsId); for (int i = 0; i < numDatanodesPerNameservice; i++) { dnConfs[dnId] = subclusterConf; @@ -791,14 +800,6 @@ public void startCluster(Configuration overrideConf) { } // Start mini DFS cluster - String ns0 = nameservices.get(0); - Configuration nnConf = generateNamenodeConfiguration(ns0); - if (overrideConf != null) { - nnConf.addResource(overrideConf); - // Router also uses this configurations as initial values. - routerConf = new Configuration(overrideConf); - } - cluster = new MiniDFSCluster.Builder(nnConf) .numDataNodes(numDNs) .nnTopology(topology) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterWebHdfsMethods.java new file mode 100644 index 0000000000000..7028928041452 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterWebHdfsMethods.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createMountTableEntry; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.io.FileNotFoundException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.Collections; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; +import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test suite for Router Web Hdfs methods. + */ +public class TestRouterWebHdfsMethods { + static final Logger LOG = + LoggerFactory.getLogger(TestRouterWebHdfsMethods.class); + + private static StateStoreDFSCluster cluster; + private static RouterContext router; + private static String httpUri; + + @BeforeClass + public static void globalSetUp() throws Exception { + cluster = new StateStoreDFSCluster(false, 2); + Configuration conf = new RouterConfigBuilder() + .stateStore() + .rpc() + .http() + .admin() + .build(); + cluster.addRouterOverrides(conf); + cluster.startCluster(); + cluster.startRouters(); + cluster.waitClusterUp(); + router = cluster.getRandomRouter(); + httpUri = "http://"+router.getHttpAddress(); + } + + @AfterClass + public static void tearDown() { + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + } + + @Test + public void testWebHdfsCreate() throws Exception { + // the file is created at default ns (ns0) + String path = "/tmp/file"; + URL url = new URL(getUri(path)); + LOG.info("URL: {}", url); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestMethod("PUT"); + assertEquals(HttpURLConnection.HTTP_CREATED, conn.getResponseCode()); + verifyFile("ns0", path, true); + verifyFile("ns1", path, false); + conn.disconnect(); + } + + @Test + public void testWebHdfsCreateWithMounts() throws Exception { + // the file is created at mounted ns (ns1) + String mountPoint = "/tmp-ns1"; + String path = "/tmp-ns1/file"; + createMountTableEntry( + router.getRouter(), mountPoint, + DestinationOrder.RANDOM, Collections.singletonList("ns1")); + URL url = new URL(getUri(path)); + LOG.info("URL: {}", url); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestMethod("PUT"); + assertEquals(HttpURLConnection.HTTP_CREATED, conn.getResponseCode()); + verifyFile("ns1", path, true); + verifyFile("ns0", path, false); + conn.disconnect(); + } + + private String getUri(String path) { + final String user = System.getProperty("user.name"); + final StringBuilder uri = new StringBuilder(httpUri); + uri.append("/webhdfs/v1"). + append(path). + append("?op=CREATE"). + append("&user.name=" + user); + return uri.toString(); + } + + private void verifyFile(String ns, String path, boolean shouldExist) + throws Exception { + FileSystem fs = cluster.getNamenode(ns, null).getFileSystem(); + try { + fs.getFileStatus(new Path(path)); + if (!shouldExist) { + fail(path + " should not exist in ns " + ns); + } + } catch (FileNotFoundException e) { + if (shouldExist) { + fail(path + " should exist in ns " + ns); + } + } + } + + @Test + public void testGetNsFromDataNodeNetworkLocation() { + assertEquals("ns0", RouterWebHdfsMethods + .getNsFromDataNodeNetworkLocation("/ns0/rack-info1")); + assertEquals("ns0", RouterWebHdfsMethods + .getNsFromDataNodeNetworkLocation("/ns0/row1/rack-info1")); + assertEquals("", RouterWebHdfsMethods + .getNsFromDataNodeNetworkLocation("/row0")); + assertEquals("", RouterWebHdfsMethods + .getNsFromDataNodeNetworkLocation("whatever-rack-info1")); + } +}