Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ public static AsyncGet<Message, Exception> getAsyncReturnMessage() {
@Override
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
ConnectionId connId, Configuration conf, SocketFactory factory)
throws IOException {
final Invoker invoker = new Invoker(protocol, connId, conf, factory);
ConnectionId connId, Configuration conf, SocketFactory factory,
AlignmentContext alignmentContext) throws IOException {
final Invoker invoker = new Invoker(protocol, connId, conf, factory, alignmentContext);
return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[] {protocol}, invoker), false);
}
Expand Down Expand Up @@ -126,7 +126,7 @@ public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
return new ProtocolProxy<ProtocolMetaInfoPB>(protocol,
(ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[] { protocol }, new Invoker(protocol, connId, conf,
factory)), false);
factory, null)), false);
}

protected static class Invoker implements RpcInvocationHandler {
Expand All @@ -147,9 +147,8 @@ protected Invoker(Class<?> protocol, InetSocketAddress addr,
throws IOException {
this(protocol, Client.ConnectionId.getConnectionId(
addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf),
conf, factory);
conf, factory, alignmentContext);
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
this.alignmentContext = alignmentContext;
}

/**
Expand All @@ -158,14 +157,16 @@ protected Invoker(Class<?> protocol, InetSocketAddress addr,
* @param connId input connId.
* @param conf input Configuration.
* @param factory input factory.
* @param alignmentContext Alignment context
*/
protected Invoker(Class<?> protocol, Client.ConnectionId connId,
Configuration conf, SocketFactory factory) {
Configuration conf, SocketFactory factory, AlignmentContext alignmentContext) {
this.remoteId = connId;
this.client = CLIENTS.getClient(conf, factory, RpcWritable.Buffer.class);
this.protocolName = RPC.getProtocolName(protocol);
this.clientProtocolVersion = RPC
.getProtocolVersion(protocol);
this.alignmentContext = alignmentContext;
}

private RequestHeaderProto constructRpcRequestHeader(Method method) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ public <T> ProtocolProxy<T> getProxy(
@Override
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
ConnectionId connId, Configuration conf, SocketFactory factory)
throws IOException {
final Invoker invoker = new Invoker(protocol, connId, conf, factory);
ConnectionId connId, Configuration conf, SocketFactory factory,
AlignmentContext alignmentContext) throws IOException {
final Invoker invoker = new Invoker(protocol, connId, conf, factory, alignmentContext);
return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[] {protocol}, invoker), false);
}
Expand Down Expand Up @@ -133,7 +133,7 @@ public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
return new ProtocolProxy<ProtocolMetaInfoPB>(protocol,
(ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[]{protocol}, new Invoker(protocol, connId, conf,
factory)), false);
factory, null)), false);
}

protected static class Invoker implements RpcInvocationHandler {
Expand All @@ -154,9 +154,8 @@ protected Invoker(Class<?> protocol, InetSocketAddress addr,
throws IOException {
this(protocol, Client.ConnectionId.getConnectionId(
addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf),
conf, factory);
conf, factory, alignmentContext);
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
this.alignmentContext = alignmentContext;
}

/**
Expand All @@ -166,14 +165,16 @@ protected Invoker(Class<?> protocol, InetSocketAddress addr,
* @param connId input connId.
* @param conf input Configuration.
* @param factory input factory.
* @param alignmentContext Alignment context
*/
protected Invoker(Class<?> protocol, Client.ConnectionId connId,
Configuration conf, SocketFactory factory) {
Configuration conf, SocketFactory factory, AlignmentContext alignmentContext) {
this.remoteId = connId;
this.client = CLIENTS.getClient(conf, factory, RpcWritable.Buffer.class);
this.protocolName = RPC.getProtocolName(protocol);
this.clientProtocolVersion = RPC
.getProtocolVersion(protocol);
this.alignmentContext = alignmentContext;
}

private RequestHeaderProto constructRpcRequestHeader(Method method) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,11 +558,32 @@ public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
long clientVersion, ConnectionId connId, Configuration conf,
SocketFactory factory) throws IOException {
return getProtocolProxy(protocol, clientVersion, connId, conf,
factory, null);
}

/**
* Get a protocol proxy that contains a proxy connection to a remote server
* and a set of methods that are supported by the server.
*
* @param <T> Generics Type T
* @param protocol protocol class
* @param clientVersion client's version
* @param connId client connection identifier
* @param conf configuration
* @param factory socket factory
* @param alignmentContext StateID alignment context
* @return the protocol proxy
* @throws IOException if the far end through a RemoteException
*/
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
long clientVersion, ConnectionId connId, Configuration conf,
SocketFactory factory, AlignmentContext alignmentContext) throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
SaslRpcServer.init(conf);
}
return getProtocolEngine(protocol, conf).getProxy(
protocol, clientVersion, connId, conf, factory);
protocol, clientVersion, connId, conf, factory, alignmentContext);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,13 @@ <T> ProtocolProxy<T> getProxy(Class<T> protocol,
* @param connId input ConnectionId.
* @param conf input Configuration.
* @param factory input factory.
* @param alignmentContext Alignment context
* @throws IOException raised on errors performing I/O.
* @return ProtocolProxy.
*/
<T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
Client.ConnectionId connId, Configuration conf, SocketFactory factory)
Client.ConnectionId connId, Configuration conf, SocketFactory factory,
AlignmentContext alignmentContext)
throws IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,16 +315,18 @@ public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
* @param connId input ConnectionId.
* @param conf input Configuration.
* @param factory input factory.
* @param alignmentContext Alignment context
* @throws IOException raised on errors performing I/O.
* @return ProtocolProxy.
*/
@Override
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
Client.ConnectionId connId, Configuration conf, SocketFactory factory)
Client.ConnectionId connId, Configuration conf, SocketFactory factory,
AlignmentContext alignmentContext)
throws IOException {
return getProxy(protocol, clientVersion, connId.getAddress(),
connId.getTicket(), conf, factory, connId.getRpcTimeout(),
connId.getRetryPolicy(), null, null);
connId.getRetryPolicy(), null, alignmentContext);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,8 @@ public <T> ProtocolProxy<T> getProxy(

@Override
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
ConnectionId connId, Configuration conf, SocketFactory factory)
ConnectionId connId, Configuration conf, SocketFactory factory,
AlignmentContext alignmentContext)
throws IOException {
throw new UnsupportedOperationException("This proxy is not supported");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,8 @@ protected static TestRpcService getClient(ConnectionId connId,
0,
connId,
clientConf,
NetUtils.getDefaultSocketFactory(clientConf)).getProxy();
NetUtils.getDefaultSocketFactory(clientConf),
null).getProxy();
} catch (IOException e) {
throw new ServiceException(e);
}
Expand Down