Skip to content

Commit

Permalink
Cheery pick dubbo 2.7.3 (#242)
Browse files Browse the repository at this point in the history
  • Loading branch information
glmapper authored and ujjboy committed Aug 1, 2019
1 parent cea1a07 commit 52fe696
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 62 deletions.
2 changes: 1 addition & 1 deletion sofa-tracer-plugins/sofa-tracer-dubbo-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
<version>2.7.0</version>
<version>2.7.3</version>
<scope>provided</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,19 @@
import com.alipay.common.tracer.core.span.LogData;
import com.alipay.common.tracer.core.span.SofaTracerSpan;
import com.alipay.common.tracer.core.utils.StringUtils;
import com.alipay.sofa.tracer.plugins.dubbo.constants.AttachmentKeyConstants;
import com.alipay.sofa.tracer.plugins.dubbo.tracer.DubboConsumerSofaTracer;
import com.alipay.sofa.tracer.plugins.dubbo.tracer.DubboProviderSofaTracer;
import io.opentracing.tag.Tags;
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.remoting.TimeoutException;
import org.apache.dubbo.rpc.*;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.support.RpcUtils;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -44,7 +50,7 @@
* @author: guolei.sgl ([email protected]) 2019/2/26 2:02 PM
* @since: 2.3.4
**/
@Activate(group = { Constants.PROVIDER, Constants.CONSUMER }, value = "dubboSofaTracerFilter", order = 1)
@Activate(group = { CommonConstants.PROVIDER, CommonConstants.CONSUMER }, value = "dubboSofaTracerFilter", order = 1)
public class DubboSofaTracerFilter implements Filter {

private String appName = StringUtils.EMPTY_STRING;
Expand Down Expand Up @@ -76,7 +82,7 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcExcept
String spanKind = spanKind(rpcContext);
Result result;
if (spanKind.equals(Tags.SPAN_KIND_SERVER)) {
result = doServerFilter(rpcContext, invoker, invocation);
result = doServerFilter(invoker, invocation);
} else {
result = doClientFilter(rpcContext, invoker, invocation);
}
Expand All @@ -87,7 +93,7 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcExcept
public Result onResponse(Result result, Invoker<?> invoker, Invocation invocation) {
String spanKey = getTracerSpanMapKey(invoker);
try {
// 只有异步才进行回调打印
// only the asynchronous callback to print
boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
if (!isAsync) {
return result;
Expand Down Expand Up @@ -223,12 +229,11 @@ private Result doClientFilter(RpcContext rpcContext, Invoker<?> invoker, Invocat

/**
* rpc client handler
* @param rpcContext
* @param invoker
* @param invocation
* @return
*/
private Result doServerFilter(RpcContext rpcContext, Invoker<?> invoker, Invocation invocation) {
private Result doServerFilter(Invoker<?> invoker, Invocation invocation) {
if (dubboProviderSofaTracer == null) {
this.dubboProviderSofaTracer = DubboProviderSofaTracer
.getDubboProviderSofaTracerSingleton();
Expand Down Expand Up @@ -309,29 +314,38 @@ private void appendElapsedTimeTags(Invocation invocation, SofaTracerSpan sofaTra
if (sofaTracerSpan == null) {
return;
}
String reqSize = invocation.getAttachment(Constants.INPUT_KEY);
String respSize = result.getAttachment(Constants.OUTPUT_KEY);
String reqSize;
String respSize;
String elapsed;
String deElapsed;
if (isClient) {
elapsed = invocation.getAttachment(CommonSpanTags.CLIENT_SERIALIZE_TIME);
deElapsed = invocation.getAttachment(CommonSpanTags.CLIENT_DESERIALIZE_TIME);
//客户端请求序列化耗时
sofaTracerSpan
.setTag(CommonSpanTags.CLIENT_SERIALIZE_TIME, parseAttachment(elapsed, 0));
//客户端接受响应反序列化耗时
sofaTracerSpan.setTag(CommonSpanTags.CLIENT_DESERIALIZE_TIME,
reqSize = invocation.getAttachment(AttachmentKeyConstants.CLIENT_SERIALIZE_SIZE);
elapsed = invocation.getAttachment(AttachmentKeyConstants.CLIENT_SERIALIZE_TIME);
respSize = result.getAttachment(AttachmentKeyConstants.CLIENT_DESERIALIZE_SIZE);
deElapsed = result.getAttachment(AttachmentKeyConstants.CLIENT_DESERIALIZE_TIME);
sofaTracerSpan.setTag(AttachmentKeyConstants.CLIENT_SERIALIZE_TIME,
parseAttachment(elapsed, 0));
sofaTracerSpan.setTag(AttachmentKeyConstants.CLIENT_DESERIALIZE_TIME,
parseAttachment(deElapsed, 0));
sofaTracerSpan.setTag(AttachmentKeyConstants.CLIENT_SERIALIZE_SIZE,
parseAttachment(reqSize, 0));
sofaTracerSpan.setTag(AttachmentKeyConstants.CLIENT_DESERIALIZE_SIZE,
parseAttachment(respSize, 0));
} else {
elapsed = invocation.getAttachment(CommonSpanTags.SERVER_SERIALIZE_TIME);
deElapsed = invocation.getAttachment(CommonSpanTags.SERVER_DESERIALIZE_TIME);
sofaTracerSpan
.setTag(CommonSpanTags.SERVER_SERIALIZE_TIME, parseAttachment(elapsed, 0));
sofaTracerSpan.setTag(CommonSpanTags.SERVER_DESERIALIZE_TIME,
reqSize = invocation.getAttachment(AttachmentKeyConstants.SERVER_DESERIALIZE_SIZE);
deElapsed = invocation.getAttachment(AttachmentKeyConstants.SERVER_DESERIALIZE_TIME);
respSize = result.getAttachment(AttachmentKeyConstants.SERVER_SERIALIZE_SIZE);
elapsed = result.getAttachment(AttachmentKeyConstants.SERVER_SERIALIZE_TIME);
sofaTracerSpan.setTag(AttachmentKeyConstants.SERVER_DESERIALIZE_SIZE,
parseAttachment(reqSize, 0));
sofaTracerSpan.setTag(AttachmentKeyConstants.SERVER_DESERIALIZE_TIME,
parseAttachment(deElapsed, 0));
sofaTracerSpan.setTag(AttachmentKeyConstants.SERVER_SERIALIZE_SIZE,
parseAttachment(respSize, 0));
sofaTracerSpan.setTag(AttachmentKeyConstants.SERVER_SERIALIZE_TIME,
parseAttachment(elapsed, 0));
}
sofaTracerSpan.setTag(CommonSpanTags.REQ_SIZE, parseAttachment(reqSize, 0));
sofaTracerSpan.setTag(CommonSpanTags.RESP_SIZE, parseAttachment(respSize, 0));

}

private int parseAttachment(String value, int defaultVal) {
Expand Down Expand Up @@ -361,7 +375,7 @@ private void appendRpcServerSpanTags(Invoker<?> invoker, SofaTracerSpan sofaTrac
tagsStr.put(CommonSpanTags.SERVICE, service == null ? BLANK : service);
String methodName = rpcContext.getMethodName();
tagsStr.put(CommonSpanTags.METHOD, methodName == null ? BLANK : methodName);
String app = rpcContext.getUrl().getParameter(Constants.APPLICATION_KEY);
String app = rpcContext.getUrl().getParameter(CommonConstants.APPLICATION_KEY);
tagsStr.put(CommonSpanTags.REMOTE_HOST, rpcContext.getRemoteHost());
tagsStr.put(CommonSpanTags.LOCAL_APP, app == null ? BLANK : app);
tagsStr.put(CommonSpanTags.CURRENT_THREAD_NAME, Thread.currentThread().getName());
Expand All @@ -385,7 +399,7 @@ private void appendRpcClientSpanTags(Invoker<?> invoker, SofaTracerSpan sofaTrac
String methodName = rpcContext.getMethodName();
tagsStr.put(CommonSpanTags.METHOD, methodName == null ? BLANK : methodName);
tagsStr.put(CommonSpanTags.CURRENT_THREAD_NAME, Thread.currentThread().getName());
String app = rpcContext.getUrl().getParameter(Constants.APPLICATION_KEY);
String app = rpcContext.getUrl().getParameter(CommonConstants.APPLICATION_KEY);
tagsStr.put(CommonSpanTags.LOCAL_APP, app == null ? BLANK : app);
tagsStr.put(CommonSpanTags.REMOTE_HOST, rpcContext.getRemoteHost());
tagsStr.put(CommonSpanTags.REMOTE_PORT, String.valueOf(rpcContext.getRemotePort()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.alipay.sofa.tracer.plugins.dubbo.constants;

/**
* @author: guolei.sgl ([email protected]) 2019/8/1 5:16 PM
* @since:
**/
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* @author: guolei.sgl ([email protected]) 2019/7/26 5:25 PM
* @since:
**/
public class AttachmentKeyConstants {

public static final String SERVER_DESERIALIZE_SIZE = "server.deserialize.size";
public static final String SERVER_SERIALIZE_SIZE = "server.serialize.size";
public static final String CLIENT_DESERIALIZE_SIZE = "client.deserialize.size";
public static final String CLIENT_SERIALIZE_SIZE = "client.serialize.size";

public static final String SERVER_DESERIALIZE_TIME = "server.deserialize.time";
public static final String SERVER_SERIALIZE_TIME = "server.serialize.time";
public static final String CLIENT_DESERIALIZE_TIME = "client.deserialize.time";
public static final String CLIENT_SERIALIZE_TIME = "client.serialize.time";
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.alipay.common.tracer.core.span.CommonSpanTags;
import com.alipay.common.tracer.core.span.SofaTracerSpan;
import com.alipay.common.tracer.core.utils.StringUtils;
import com.alipay.sofa.tracer.plugins.dubbo.constants.AttachmentKeyConstants;
import io.opentracing.tag.Tags;

import java.io.IOException;
Expand Down Expand Up @@ -69,16 +70,17 @@ public String encode(SofaTracerSpan sofaTracerSpan) throws IOException {
data.append(CommonSpanTags.LOCAL_HOST, tagStr.get(CommonSpanTags.LOCAL_HOST));
//request serialize time
data.append(CommonSpanTags.CLIENT_SERIALIZE_TIME,
tagNum.get(CommonSpanTags.CLIENT_SERIALIZE_TIME));
tagNum.get(AttachmentKeyConstants.CLIENT_SERIALIZE_TIME));
//response deserialize time
data.append(CommonSpanTags.CLIENT_DESERIALIZE_TIME,
tagNum.get(CommonSpanTags.CLIENT_DESERIALIZE_TIME));
tagNum.get(AttachmentKeyConstants.CLIENT_DESERIALIZE_TIME));
//Request Body bytes length
Number reqSizeNum = tagNum.get(CommonSpanTags.REQ_SIZE);
Number reqSizeNum = tagNum.get(AttachmentKeyConstants.CLIENT_SERIALIZE_SIZE);
data.append(CommonSpanTags.REQ_SIZE, reqSizeNum == null ? 0 : reqSizeNum.longValue());
//Response Body bytes length
Number respSizeNum = tagNum.get(CommonSpanTags.REQ_SIZE);
Number respSizeNum = tagNum.get(AttachmentKeyConstants.CLIENT_DESERIALIZE_SIZE);
data.append(CommonSpanTags.RESP_SIZE, respSizeNum == null ? 0 : respSizeNum.longValue());

//Http status code
data.append(CommonSpanTags.RESULT_CODE, tagStr.get(CommonSpanTags.RESULT_CODE));
//error message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.alipay.common.tracer.core.span.CommonSpanTags;
import com.alipay.common.tracer.core.span.SofaTracerSpan;
import com.alipay.common.tracer.core.utils.StringUtils;
import com.alipay.sofa.tracer.plugins.dubbo.constants.AttachmentKeyConstants;
import io.opentracing.tag.Tags;

import java.io.IOException;
Expand Down Expand Up @@ -60,10 +61,17 @@ public String encode(SofaTracerSpan sofaTracerSpan) throws IOException {
data.append(CommonSpanTags.LOCAL_PORT, tagStr.get(CommonSpanTags.LOCAL_PORT));
//protocol
data.append(CommonSpanTags.PROTOCOL, tagStr.get(CommonSpanTags.PROTOCOL));
long serializeTime = getTime(tagNum.get(CommonSpanTags.SERVER_SERIALIZE_TIME));

long serializeTime = getTime(tagNum.get(AttachmentKeyConstants.SERVER_SERIALIZE_TIME));
data.append(CommonSpanTags.SERVER_SERIALIZE_TIME, serializeTime);
long deserializeTime = getTime(tagNum.get(CommonSpanTags.SERVER_DESERIALIZE_TIME));
long deserializeTime = getTime(tagNum.get(AttachmentKeyConstants.SERVER_DESERIALIZE_TIME));
data.append(CommonSpanTags.SERVER_DESERIALIZE_TIME, deserializeTime);

Number reqSizeNum = tagNum.get(AttachmentKeyConstants.SERVER_DESERIALIZE_SIZE);
data.append(CommonSpanTags.REQ_SIZE, reqSizeNum == null ? 0 : reqSizeNum.longValue());
Number respSizeNum = tagNum.get(AttachmentKeyConstants.SERVER_SERIALIZE_SIZE);
data.append(CommonSpanTags.RESP_SIZE, respSizeNum == null ? 0 : respSizeNum.longValue());

//Http status code
data.append(CommonSpanTags.RESULT_CODE, tagStr.get(CommonSpanTags.RESULT_CODE));
//error message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@
*/
package com.alipay.sofa.tracer.plugins.dubbo.wrapper;

import com.alipay.common.tracer.core.span.CommonSpanTags;
import org.apache.dubbo.common.Constants;
import com.alipay.sofa.tracer.plugins.dubbo.constants.AttachmentKeyConstants;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.Codec2;
import org.apache.dubbo.remoting.buffer.ChannelBuffer;
import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import java.io.IOException;

/**
Expand All @@ -51,10 +50,9 @@ public void encode(Channel channel, ChannelBuffer buffer, Object message) throws
return;
}
} else if (message instanceof Response) {
Object result = ((Response) message).getResult();
if (result instanceof RpcResult) {
RpcResult rpcResult = (RpcResult) result;
encodeResultWithTracer(channel, buffer, message, rpcResult);
Object response = ((Response) message).getResult();
if (response instanceof AppResponse) {
encodeResultWithTracer(channel, buffer, message);
return;
}
}
Expand All @@ -76,8 +74,10 @@ protected void encodeRequestWithTracer(Channel channel, ChannelBuffer buffer, Ob
codec.encode(channel, buffer, message);
int reqSize = buffer.writerIndex() - index;
long elapsed = System.currentTimeMillis() - startTime;
invocation.setAttachment(Constants.INPUT_KEY, String.valueOf(reqSize));
invocation.setAttachment(CommonSpanTags.CLIENT_SERIALIZE_TIME, String.valueOf(elapsed));
invocation.setAttachment(AttachmentKeyConstants.CLIENT_SERIALIZE_SIZE,
String.valueOf(reqSize));
invocation.setAttachment(AttachmentKeyConstants.CLIENT_SERIALIZE_TIME,
String.valueOf(elapsed));
}

/**
Expand All @@ -86,16 +86,18 @@ protected void encodeRequestWithTracer(Channel channel, ChannelBuffer buffer, Ob
* @param message the original Request object
* @throws IOException serialization exception
*/
protected void encodeResultWithTracer(Channel channel, ChannelBuffer buffer, Object result,
RpcResult rpcResult) throws IOException {

protected void encodeResultWithTracer(Channel channel, ChannelBuffer buffer, Object message)
throws IOException {
Object result = ((Response) message).getResult();
long startTime = System.currentTimeMillis();
int index = buffer.writerIndex();
codec.encode(channel, buffer, result);
codec.encode(channel, buffer, message);
int respSize = buffer.writerIndex() - index;
long elapsed = System.currentTimeMillis() - startTime;
rpcResult.setAttachment(Constants.OUTPUT_KEY, String.valueOf(respSize));
rpcResult.setAttachment(CommonSpanTags.SERVER_SERIALIZE_TIME, String.valueOf(elapsed));
((AppResponse) result).setAttachment(AttachmentKeyConstants.SERVER_SERIALIZE_SIZE,
String.valueOf(respSize));
((AppResponse) result).setAttachment(AttachmentKeyConstants.SERVER_SERIALIZE_TIME,
String.valueOf(elapsed));
}

/**
Expand All @@ -117,17 +119,19 @@ public Object decode(Channel channel, ChannelBuffer input) throws IOException {
Object data = ((Request) ret).getData();
if (data instanceof RpcInvocation) {
RpcInvocation invocation = (RpcInvocation) data;
invocation.setAttachment(Constants.INPUT_KEY, String.valueOf(size));
invocation.setAttachment(CommonSpanTags.SERVER_DESERIALIZE_TIME,
invocation.setAttachment(AttachmentKeyConstants.SERVER_DESERIALIZE_SIZE,
String.valueOf(size));
invocation.setAttachment(AttachmentKeyConstants.SERVER_DESERIALIZE_TIME,
String.valueOf(elapsed));
}
} else if (ret instanceof Response) {
// client-side deserialize the Response
Object result = ((Response) ret).getResult();
if (result instanceof RpcResult) {
RpcResult rpcResult = (RpcResult) result;
rpcResult.setAttachment(Constants.OUTPUT_KEY, String.valueOf(size));
rpcResult.setAttachment(CommonSpanTags.CLIENT_DESERIALIZE_TIME,
if (result instanceof AppResponse) {
AppResponse rpcResult = (AppResponse) result;
rpcResult.setAttachment(AttachmentKeyConstants.CLIENT_DESERIALIZE_SIZE,
String.valueOf(size));
rpcResult.setAttachment(AttachmentKeyConstants.CLIENT_DESERIALIZE_TIME,
String.valueOf(elapsed));
}
}
Expand Down
Loading

0 comments on commit 52fe696

Please sign in to comment.