Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ private RpcConstants() {


public static final int INVALID_RETRY_COUNT = -1;

/**
* The Rpc-connection header is as follows
* +----------------------------------+
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,9 @@ public static class Call implements Schedulable,
// the priority level assigned by scheduler, 0 by default
private long clientStateId;
private boolean isCallCoordinated;
// Serialized RouterFederatedStateProto message to
// store last seen states for multiple namespaces.
private ByteString federatedNamespaceState;

Call() {
this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT,
Expand Down Expand Up @@ -994,6 +997,14 @@ public ProcessingDetails getProcessingDetails() {
return processingDetails;
}

public void setFederatedNamespaceState(ByteString federatedNamespaceState) {
this.federatedNamespaceState = federatedNamespaceState;
}

public ByteString getFederatedNamespaceState() {
return this.federatedNamespaceState;
}

@Override
public String toString() {
return "Call#" + callId + " Retry#" + retryCount;
Expand Down Expand Up @@ -2868,6 +2879,9 @@ private void processRpcRequest(RpcRequestHeaderProto header,
stateId = alignmentContext.receiveRequestState(
header, getMaxIdleTime());
call.setClientStateId(stateId);
if (header.hasRouterFederatedState()) {
call.setFederatedNamespaceState(header.getRouterFederatedState());
}
}
} catch (IOException ioe) {
throw new RpcServerException("Processing RPC request caught ", ioe);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@

package org.apache.hadoop.hdfs;

import java.util.concurrent.atomic.LongAccumulator;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;

import java.io.IOException;
import java.util.concurrent.atomic.LongAccumulator;
import org.apache.hadoop.thirdparty.protobuf.ByteString;

/**
* Global State Id context for the client.
Expand All @@ -37,8 +38,17 @@
@InterfaceStability.Evolving
public class ClientGSIContext implements AlignmentContext {

private final LongAccumulator lastSeenStateId =
new LongAccumulator(Math::max, Long.MIN_VALUE);
private final LongAccumulator lastSeenStateId;
private ByteString routerFederatedState;

public ClientGSIContext() {
this(new LongAccumulator(Math::max, Long.MIN_VALUE));
}

public ClientGSIContext(LongAccumulator lastSeenStateId) {
this.lastSeenStateId = lastSeenStateId;
routerFederatedState = null;
}

@Override
public long getLastSeenStateId() {
Expand All @@ -65,16 +75,25 @@ public void updateResponseState(RpcResponseHeaderProto.Builder header) {
* in responses.
*/
@Override
public void receiveResponseState(RpcResponseHeaderProto header) {
lastSeenStateId.accumulate(header.getStateId());
public synchronized void receiveResponseState(RpcResponseHeaderProto header) {
if (header.hasRouterFederatedState()) {
routerFederatedState = header.getRouterFederatedState();
} else {
lastSeenStateId.accumulate(header.getStateId());
}
}

/**
* Client side implementation for providing state alignment info in requests.
*/
@Override
public void updateRequestState(RpcRequestHeaderProto.Builder header) {
header.setStateId(lastSeenStateId.longValue());
public synchronized void updateRequestState(RpcRequestHeaderProto.Builder header) {
if (lastSeenStateId.get() != Long.MIN_VALUE) {
header.setStateId(lastSeenStateId.get());
}
if (routerFederatedState != null) {
header.setRouterFederatedState(routerFederatedState);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,9 @@ public static ClientProtocol createProxyWithAlignmentContext(
boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
AlignmentContext alignmentContext)
throws IOException {
if (alignmentContext == null) {
alignmentContext = new ClientGSIContext();
}
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
ProtobufRpcEngine2.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ public class ConnectionManager {

/** Queue for creating new connections. */
private final BlockingQueue<ConnectionPool> creatorQueue;
/**
* Global federated namespace context for router.
*/
private final RouterStateIdContext routerStateIdContext;
/**
* Maps from connection pool ID to namespace.
*/
private final Map<ConnectionPoolId, String> connectionPoolToNamespaceMap;
/** Max size of queue for creating new connections. */
private final int creatorQueueMaxSize;

Expand All @@ -85,15 +93,19 @@ public class ConnectionManager {
/** If the connection manager is running. */
private boolean running = false;

public ConnectionManager(Configuration config) {
this(config, new RouterStateIdContext(config));
}

/**
* Creates a proxy client connection pool manager.
*
* @param config Configuration for the connections.
*/
public ConnectionManager(Configuration config) {
public ConnectionManager(Configuration config, RouterStateIdContext routerStateIdContext) {
this.conf = config;

this.routerStateIdContext = routerStateIdContext;
this.connectionPoolToNamespaceMap = new HashMap<>();
// Configure minimum, maximum and active connection pools
this.maxSize = this.conf.getInt(
RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE,
Expand Down Expand Up @@ -160,6 +172,10 @@ public void close() {
pool.close();
}
this.pools.clear();
for (String nsID: connectionPoolToNamespaceMap.values()) {
routerStateIdContext.removeNamespaceStateId(nsID);
}
connectionPoolToNamespaceMap.clear();
} finally {
writeLock.unlock();
}
Expand All @@ -172,12 +188,12 @@ public void close() {
* @param ugi User group information.
* @param nnAddress Namenode address for the connection.
* @param protocol Protocol for the connection.
* @param nsId Nameservice identity.
* @return Proxy client to connect to nnId as UGI.
* @throws IOException If the connection cannot be obtained.
*/
public ConnectionContext getConnection(UserGroupInformation ugi,
String nnAddress, Class<?> protocol) throws IOException {

String nnAddress, Class<?> protocol, String nsId) throws IOException {
// Check if the manager is shutdown
if (!this.running) {
LOG.error(
Expand Down Expand Up @@ -205,9 +221,13 @@ public ConnectionContext getConnection(UserGroupInformation ugi,
if (pool == null) {
pool = new ConnectionPool(
this.conf, nnAddress, ugi, this.minSize, this.maxSize,
this.minActiveRatio, protocol);
this.minActiveRatio, protocol,
new PoolAlignmentContext(this.routerStateIdContext, nsId));
this.pools.put(connectionId, pool);
this.connectionPoolToNamespaceMap.put(connectionId, nsId);
}
long clientStateId = RouterStateIdContext.getClientStateIdFromCurrentCall(nsId);
pool.getPoolAlignmentContext().advanceClientStateId(clientStateId);
} finally {
writeLock.unlock();
}
Expand Down Expand Up @@ -430,6 +450,11 @@ public void run() {
try {
for (ConnectionPoolId poolId : toRemove) {
pools.remove(poolId);
String nsID = connectionPoolToNamespaceMap.get(poolId);
connectionPoolToNamespaceMap.remove(poolId);
if (!connectionPoolToNamespaceMap.values().contains(nsID)) {
routerStateIdContext.removeNamespaceStateId(nsID);
}
}
} finally {
writeLock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import javax.net.SocketFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -108,6 +109,8 @@ public class ConnectionPool {

/** Enable using multiple physical socket or not. **/
private final boolean enableMultiSocket;
/** StateID alignment context. */
private final PoolAlignmentContext alignmentContext;

/** Map for the protocols and their protobuf implementations. */
private final static Map<Class<?>, ProtoImpl> PROTO_MAP = new HashMap<>();
Expand Down Expand Up @@ -138,7 +141,8 @@ private static class ProtoImpl {

protected ConnectionPool(Configuration config, String address,
UserGroupInformation user, int minPoolSize, int maxPoolSize,
float minActiveRatio, Class<?> proto) throws IOException {
float minActiveRatio, Class<?> proto, PoolAlignmentContext alignmentContext)
throws IOException {

this.conf = config;

Expand All @@ -157,6 +161,8 @@ protected ConnectionPool(Configuration config, String address,
RBFConfigKeys.DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_KEY,
RBFConfigKeys.DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_DEFAULT);

this.alignmentContext = alignmentContext;

// Add minimum connections to the pool
for (int i = 0; i < this.minSize; i++) {
ConnectionContext newConnection = newConnection();
Expand Down Expand Up @@ -211,6 +217,14 @@ public AtomicInteger getClientIndex() {
return this.clientIndex;
}

/**
* Get the alignment context for this pool
* @return Alignment context
*/
public PoolAlignmentContext getPoolAlignmentContext() {
return this.alignmentContext;
}

/**
* Return the next connection round-robin.
*
Expand Down Expand Up @@ -398,7 +412,7 @@ public String getJSON() {
public ConnectionContext newConnection() throws IOException {
return newConnection(this.conf, this.namenodeAddress,
this.ugi, this.protocol, this.enableMultiSocket,
this.socketIndex.incrementAndGet());
this.socketIndex.incrementAndGet(), alignmentContext);
}

/**
Expand All @@ -413,13 +427,15 @@ public ConnectionContext newConnection() throws IOException {
* @param ugi User context.
* @param proto Interface of the protocol.
* @param enableMultiSocket Enable multiple socket or not.
* @param alignmentContext client alignment context.
* @return proto for the target ClientProtocol that contains the user's
* security context.
* @throws IOException If it cannot be created.
*/
protected static <T> ConnectionContext newConnection(Configuration conf,
String nnAddress, UserGroupInformation ugi, Class<T> proto,
boolean enableMultiSocket, int socketIndex) throws IOException {
boolean enableMultiSocket, int socketIndex,
AlignmentContext alignmentContext) throws IOException {
if (!PROTO_MAP.containsKey(proto)) {
String msg = "Unsupported protocol for connection to NameNode: "
+ ((proto != null) ? proto.getName() : "null");
Expand Down Expand Up @@ -448,10 +464,11 @@ protected static <T> ConnectionContext newConnection(Configuration conf,
socket, classes.protoPb, ugi, RPC.getRpcTimeout(conf),
defaultPolicy, conf, socketIndex);
proxy = RPC.getProtocolProxy(classes.protoPb, version, connectionId,
conf, factory).getProxy();
conf, factory, alignmentContext).getProxy();
} else {
proxy = RPC.getProtocolProxy(classes.protoPb, version, socket, ugi,
conf, factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy();
conf, factory, RPC.getRpcTimeout(conf), defaultPolicy, null,
alignmentContext).getProxy();
}

T client = newProtoClient(proto, classes, proxy);
Expand Down
Loading