Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize the code: fix CallbackServiceCodec.java exportOrunexportCallbackService method issue. #3199

Merged
merged 6 commits into from
Jan 15, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -77,49 +77,49 @@ private static byte isCallBack(URL url, String methodName, int argIndex) {
* @throws IOException
*/
@SuppressWarnings({"unchecked", "rawtypes"})
private static String exportOrunexportCallbackService(Channel channel, URL url, Class clazz, Object inst, Boolean export) throws IOException {
private static String exportOrUnexportCallbackService(Channel channel, URL url, Class clazz, Object inst, Boolean export) throws IOException {
int instid = System.identityHashCode(inst);

Map<String, String> params = new HashMap<String, String>(3);
Map<String, String> params = new HashMap<>(3);
// no need to new client again
params.put(Constants.IS_SERVER_KEY, Boolean.FALSE.toString());
// mark it's a callback, for troubleshooting
params.put(Constants.IS_CALLBACK_SERVICE, Boolean.TRUE.toString());
String group = url.getParameter(Constants.GROUP_KEY);
String group = (url == null ? null : url.getParameter(Constants.GROUP_KEY));
ralf0131 marked this conversation as resolved.
Show resolved Hide resolved
if (group != null && group.length() > 0) {
params.put(Constants.GROUP_KEY, group);
}
// add method, for verifying against method, automatic fallback (see dubbo protocol)
params.put(Constants.METHODS_KEY, StringUtils.join(Wrapper.getWrapper(clazz).getDeclaredMethodNames(), ","));

Map<String, String> tmpmap = new HashMap<String, String>(url.getParameters());
tmpmap.putAll(params);
tmpmap.remove(Constants.VERSION_KEY);// doesn't need to distinguish version for callback
tmpmap.put(Constants.INTERFACE_KEY, clazz.getName());
URL exporturl = new URL(DubboProtocol.NAME, channel.getLocalAddress().getAddress().getHostAddress(), channel.getLocalAddress().getPort(), clazz.getName() + "." + instid, tmpmap);
Map<String, String> tmpMap = new HashMap<>(url.getParameters());
tmpMap.putAll(params);
tmpMap.remove(Constants.VERSION_KEY);// doesn't need to distinguish version for callback
tmpMap.put(Constants.INTERFACE_KEY, clazz.getName());
URL exportUrl = new URL(DubboProtocol.NAME, channel.getLocalAddress().getAddress().getHostAddress(), channel.getLocalAddress().getPort(), clazz.getName() + "." + instid, tmpMap);

// no need to generate multiple exporters for different channel in the same JVM, cache key cannot collide.
String cacheKey = getClientSideCallbackServiceCacheKey(instid);
String countkey = getClientSideCountKey(clazz.getName());
String countKey = getClientSideCountKey(clazz.getName());
if (export) {
// one channel can have multiple callback instances, no need to re-export for different instance.
if (!channel.hasAttribute(cacheKey)) {
if (!isInstancesOverLimit(channel, url, clazz.getName(), instid, false)) {
Invoker<?> invoker = proxyFactory.getInvoker(inst, clazz, exporturl);
Invoker<?> invoker = proxyFactory.getInvoker(inst, clazz, exportUrl);
// should destroy resource?
Exporter<?> exporter = protocol.export(invoker);
// this is used for tracing if instid has published service or not.
channel.setAttribute(cacheKey, exporter);
logger.info("export a callback service :" + exporturl + ", on " + channel + ", url is: " + url);
increaseInstanceCount(channel, countkey);
logger.info("Export a callback service :" + exportUrl + ", on " + channel + ", url is: " + url);
increaseInstanceCount(channel, countKey);
}
}
} else {
if (channel.hasAttribute(cacheKey)) {
Exporter<?> exporter = (Exporter<?>) channel.getAttribute(cacheKey);
exporter.unexport();
channel.removeAttribute(cacheKey);
decreaseInstanceCount(channel, countkey);
decreaseInstanceCount(channel, countKey);
}
}
return String.valueOf(instid);
Expand Down Expand Up @@ -245,17 +245,17 @@ private static void decreaseInstanceCount(Channel channel, String countkey) {
public static Object encodeInvocationArgument(Channel channel, RpcInvocation inv, int paraIndex) throws IOException {
// get URL directly
URL url = inv.getInvoker() == null ? null : inv.getInvoker().getUrl();
byte callbackstatus = isCallBack(url, inv.getMethodName(), paraIndex);
byte callbackStatus = isCallBack(url, inv.getMethodName(), paraIndex);
Object[] args = inv.getArguments();
Class<?>[] pts = inv.getParameterTypes();
switch (callbackstatus) {
switch (callbackStatus) {
case CallbackServiceCodec.CALLBACK_NONE:
return args[paraIndex];
case CallbackServiceCodec.CALLBACK_CREATE:
inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex, exportOrunexportCallbackService(channel, url, pts[paraIndex], args[paraIndex], true));
inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex, exportOrUnexportCallbackService(channel, url, pts[paraIndex], args[paraIndex], true));
return null;
case CallbackServiceCodec.CALLBACK_DESTROY:
inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex, exportOrunexportCallbackService(channel, url, pts[paraIndex], args[paraIndex], false));
inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex, exportOrUnexportCallbackService(channel, url, pts[paraIndex], args[paraIndex], false));
return null;
default:
return args[paraIndex];
Expand Down