Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge exceptions, remove redundant interface implementations #4730

Merged
merged 10 commits into from
Apr 21, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
/**
* InjvmProtocol
*/
public class InjvmProtocol extends AbstractProtocol implements Protocol {
public class InjvmProtocol extends AbstractProtocol implements Protocol{

public static final String NAME = LOCAL_PROTOCOL;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,64 +83,12 @@ protected <T> T doRefer(Class<T> type, URL url) throws RpcException {

private <T> Runnable exportThreadedSelectorServer(T impl, Class<T> type, URL url) throws RpcException {

TThreadedSelectorServer.Args tArgs = null;
String typeName = type.getName();

if (typeName.endsWith(THRIFT_IFACE)) {
String processorClsName = typeName.substring(0, typeName.indexOf(THRIFT_IFACE)) + THRIFT_PROCESSOR;
try {
Class<?> clazz = Class.forName(processorClsName);
Constructor constructor = clazz.getConstructor(type);
try {
TProcessor tprocessor = (TProcessor) constructor.newInstance(impl);
processor.registerProcessor(typeName, tprocessor);

if (SERVER_MAP.get(url.getAddress()) == null) {

/**Solve the problem of only 50 of the default number of concurrent connections*/
TNonblockingServerSocket.NonblockingAbstractServerSocketArgs args = new TNonblockingServerSocket.NonblockingAbstractServerSocketArgs();
/**1000 connections*/
args.backlog(1000);

String bindIp = url.getParameter(Constants.BIND_IP_KEY, url.getHost());
if (url.getParameter(ANYHOST_KEY, false)) {
bindIp = ANYHOST_VALUE;
}
int bindPort = url.getParameter(Constants.BIND_PORT_KEY, url.getPort());
args.bindAddr(new InetSocketAddress(bindIp, bindPort));

/**timeout: 10s */
args.clientTimeout(10000);

TNonblockingServerSocket transport = new TNonblockingServerSocket(args);

tArgs = new TThreadedSelectorServer.Args(transport);
tArgs.workerThreads(200);
tArgs.selectorThreads(4);
tArgs.acceptQueueSizePerThread(256);
tArgs.processor(processor);
tArgs.transportFactory(new TFramedTransport.Factory());
tArgs.protocolFactory(new TCompactProtocol.Factory());
} else {
return null; // if server is starting, return and do nothing here
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RpcException("Fail to create nativethrift server(" + url + ") : " + e.getMessage(), e);
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RpcException("Fail to create nativethrift server(" + url + ") : " + e.getMessage(), e);
}
}

if (tArgs == null) {
logger.error("Fail to create nativethrift server(" + url + ") due to null args");
throw new RpcException("Fail to create nativethrift server(" + url + ") due to null args");
final TServer thriftServer = getTServer(impl, type, url);
if (thriftServer == null) {
return null;
}
final TServer thriftServer = new TThreadedSelectorServer(tArgs);
SERVER_MAP.put(url.getAddress(), thriftServer);

new Thread(() -> {
logger.info("Start Thrift ThreadedSelectorServer");
thriftServer.serve();
Expand Down Expand Up @@ -186,4 +134,64 @@ private <T> T doReferFrameAndCompact(Class<T> type, URL url) throws RpcException
}
}

private <T> TServer getTServer(T impl, Class<T> type, URL url) {

TThreadedSelectorServer.Args tArgs = null;
String typeName = type.getName();

TServer tserver;
if (typeName.endsWith(THRIFT_IFACE)) {
String processorClsName = typeName.substring(0, typeName.indexOf(THRIFT_IFACE)) + THRIFT_PROCESSOR;
try {
Class<?> clazz = Class.forName(processorClsName);
Constructor constructor = clazz.getConstructor(type);

TProcessor tprocessor = (TProcessor) constructor.newInstance(impl);
processor.registerProcessor(typeName, tprocessor);

tserver = SERVER_MAP.get(url.getAddress());
if (tserver == null) {

/**Solve the problem of only 50 of the default number of concurrent connections*/
TNonblockingServerSocket.NonblockingAbstractServerSocketArgs args = new TNonblockingServerSocket.NonblockingAbstractServerSocketArgs();
/**1000 connections*/
args.backlog(1000);
String bindIp = url.getParameter(Constants.BIND_IP_KEY, url.getHost());
if (url.getParameter(ANYHOST_KEY, false)) {
bindIp = ANYHOST_VALUE;
}
int bindPort = url.getParameter(Constants.BIND_PORT_KEY, url.getPort());
args.bindAddr(new InetSocketAddress(bindIp, bindPort));
/**timeout: 10s */
args.clientTimeout(10000);

TNonblockingServerSocket transport = new TNonblockingServerSocket(args);

tArgs = new TThreadedSelectorServer.Args(transport);
tArgs.workerThreads(200);
tArgs.selectorThreads(4);
tArgs.acceptQueueSizePerThread(256);
tArgs.processor(processor);
tArgs.transportFactory(new TFramedTransport.Factory());
tArgs.protocolFactory(new TCompactProtocol.Factory());
} else {
// if server is starting, return and do nothing here
return null;
}

} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RpcException("Fail to create nativethrift server(" + url + ") : " + e.getMessage(), e);
}
}

if (tArgs == null) {
logger.error("Fail to create native thrift server(" + url + ") due to null args");
throw new RpcException("Fail to create nativethrift server(" + url + ") due to null args");
}
tserver = new TThreadedSelectorServer(tArgs);
return tserver;

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
/**
* RmiProtocol.
*/
public class RmiProtocol extends AbstractProxyProtocol {
public class RmiProtocol extends AbstractProxyProtocol{

public static final int DEFAULT_PORT = 1099;

Expand Down