Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,12 @@ public static ClientProtocol createProxyWithAlignmentContext(
boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
AlignmentContext alignmentContext)
throws IOException {
if (alignmentContext == null &&
conf.getBoolean(HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE,
HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE_DEFAULT)) {
alignmentContext = new ClientGSIContext();
}

RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
ProtobufRpcEngine2.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public interface HdfsClientConfigKeys {
int DFS_NAMENODE_HTTPS_PORT_DEFAULT = 9871;
String DFS_NAMENODE_HTTPS_ADDRESS_KEY = "dfs.namenode.https-address";
String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes";
String DFS_RBF_OBSERVER_READ_ENABLE = "dfs.client.rbf.observer.read.enable";
boolean DFS_RBF_OBSERVER_READ_ENABLE_DEFAULT = false;
int DFS_NAMENODE_RPC_PORT_DEFAULT = 8020;
String DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY =
"dfs.namenode.kerberos.principal";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,12 @@ public FileSystem getFileSystem() throws IOException {
return DistributedFileSystem.get(conf);
}

public FileSystem getFileSystemWithObserverReadsEnabled() throws IOException {
public FileSystem getFileSystem(Configuration configuration) throws IOException {
configuration.addResource(conf);
return DistributedFileSystem.get(configuration);
}

public FileSystem getFileSystemWithObserverReadProxyProvider() throws IOException {
Configuration observerReadConf = new Configuration(conf);
observerReadConf.set(DFS_NAMESERVICES,
observerReadConf.get(DFS_NAMESERVICES)+ ",router-service");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
Expand Down Expand Up @@ -122,11 +123,17 @@ public void startUpCluster(int numberOfObserver, Configuration confOverrides) th

cluster.waitActiveNamespaces();
routerContext = cluster.getRandomRouter();
fileSystem = routerContext.getFileSystemWithObserverReadsEnabled();
}

private static Configuration getConfToEnableObserverReads() {
Configuration conf = new Configuration();
conf.setBoolean(HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE, true);
return conf;
}

@Test
public void testObserverRead() throws Exception {
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
internalTestObserverRead();
}

Expand All @@ -137,7 +144,6 @@ public void testObserverRead() throws Exception {
*/
@Test
public void testReadWithoutObserverClientConfigurations() throws Exception {
fileSystem.close();
fileSystem = routerContext.getFileSystem();
assertThrows(AssertionError.class, this::internalTestObserverRead);
}
Expand Down Expand Up @@ -173,6 +179,7 @@ public void testObserverReadWithoutFederatedStatePropagation() throws Exception
Configuration confOverrides = new Configuration(false);
confOverrides.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE, 0);
startUpCluster(2, confOverrides);
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
List<? extends FederationNamenodeContext> namenodes = routerContext
.getRouter().getNamenodeResolver()
.getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
Expand Down Expand Up @@ -202,6 +209,7 @@ public void testDisablingObserverReadUsingNameserviceOverride() throws Exception
Configuration confOverrides = new Configuration(false);
confOverrides.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES, "ns0");
startUpCluster(2, confOverrides);
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());

Path path = new Path("/testFile");
fileSystem.create(path).close();
Expand All @@ -219,6 +227,7 @@ public void testDisablingObserverReadUsingNameserviceOverride() throws Exception

@Test
public void testReadWhenObserverIsDown() throws Exception {
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
Path path = new Path("/testFile1");
// Send Create call to active
fileSystem.create(path).close();
Expand Down Expand Up @@ -246,6 +255,7 @@ public void testReadWhenObserverIsDown() throws Exception {

@Test
public void testMultipleObserver() throws Exception {
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
Path path = new Path("/testFile1");
// Send Create call to active
fileSystem.create(path).close();
Expand Down Expand Up @@ -384,6 +394,7 @@ public void testMultipleObserverRouter() throws Exception {

@Test
public void testUnavailableObserverNN() throws Exception {
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
stopObserver(2);

Path path = new Path("/testFile");
Expand Down Expand Up @@ -417,10 +428,9 @@ public void testUnavailableObserverNN() throws Exception {
assertTrue("There must be unavailable namenodes", hasUnavailable);
}



@Test
public void testRouterMsync() throws Exception {
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
Path path = new Path("/testFile");

// Send Create call to active
Expand All @@ -439,4 +449,60 @@ public void testRouterMsync() throws Exception {
assertEquals("Four calls should be sent to active", 4,
rpcCountForActive);
}

@Test
public void testSingleRead() throws Exception {
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
List<? extends FederationNamenodeContext> namenodes = routerContext
.getRouter().getNamenodeResolver()
.getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
assertEquals("First namenode should be observer", namenodes.get(0).getState(),
FederationNamenodeServiceState.OBSERVER);
Path path = new Path("/");

long rpcCountForActive;
long rpcCountForObserver;

// Send read request
fileSystem.listFiles(path, false);
fileSystem.close();

rpcCountForActive = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getActiveProxyOps();
// getListingCall sent to active.
assertEquals("Only one call should be sent to active", 1, rpcCountForActive);

rpcCountForObserver = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getObserverProxyOps();
// getList call should be sent to observer
assertEquals("No calls should be sent to observer", 0, rpcCountForObserver);
}

@Test
public void testSingleReadUsingObserverReadProxyProvider() throws Exception {
fileSystem = routerContext.getFileSystemWithObserverReadProxyProvider();
List<? extends FederationNamenodeContext> namenodes = routerContext
.getRouter().getNamenodeResolver()
.getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
assertEquals("First namenode should be observer", namenodes.get(0).getState(),
FederationNamenodeServiceState.OBSERVER);
Path path = new Path("/");

long rpcCountForActive;
long rpcCountForObserver;

// Send read request
fileSystem.listFiles(path, false);
fileSystem.close();

rpcCountForActive = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getActiveProxyOps();
// Two msync calls to the active namenodes.
assertEquals("Two calls should be sent to active", 2, rpcCountForActive);

rpcCountForObserver = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getObserverProxyOps();
// getList call should be sent to observer
assertEquals("One call should be sent to observer", 1, rpcCountForObserver);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6442,4 +6442,11 @@
If the namespace is DEFAULT, it's best to change this conf to other value.
</description>
</property>
<property>
<name>dfs.client.rbf.observer.read.enable</name>
<value>false</value>
<description>
Enables observer reads for clients. This should only be enabled when clients are using routers.
</description>
</property>
</configuration>