diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index 607ef007b5..67d1d1e7f1 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -24,6 +24,7 @@ import ( ) import ( + "github.com/opentracing/opentracing-go" perrors "github.com/pkg/errors" ) @@ -72,6 +73,10 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati inv.SetAttachments(k, v) } } + + // put the ctx into attachment + di.appendCtx(ctx, inv) + url := di.GetUrl() // async async, err := strconv.ParseBool(inv.AttachmentsByKey(constant.ASYNC_KEY, "false")) @@ -112,3 +117,17 @@ func (di *DubboInvoker) Destroy() { } }) } + +// Finally, I made the decision that I don't provide a general way to transfer the whole context +// because it could be misused. If the context contains to many key-value pairs, the performance will be much lower. +func (di *DubboInvoker) appendCtx(ctx context.Context, inv *invocation_impl.RPCInvocation) { + // inject opentracing ctx + currentSpan := opentracing.SpanFromContext(ctx) + if currentSpan != nil { + carrier := opentracing.TextMapCarrier(inv.Attachments()) + err := opentracing.GlobalTracer().Inject(currentSpan.Context(), opentracing.TextMap, carrier) + if err != nil { + logger.Errorf("Could not inject the span context into attachments: %v", err) + } + } +} diff --git a/protocol/dubbo/dubbo_invoker_test.go b/protocol/dubbo/dubbo_invoker_test.go index e360d57b8c..1a64301f82 100644 --- a/protocol/dubbo/dubbo_invoker_test.go +++ b/protocol/dubbo/dubbo_invoker_test.go @@ -25,6 +25,7 @@ import ( ) import ( + "github.com/opentracing/opentracing-go" "github.com/stretchr/testify/assert" ) @@ -81,6 +82,11 @@ func TestDubboInvoker_Invoke(t *testing.T) { res = invoker.Invoke(context.Background(), inv) assert.EqualError(t, res.Error(), "request need @response") + // testing appendCtx + span, ctx := opentracing.StartSpanFromContext(context.Background(), "TestOperation") + invoker.Invoke(ctx, inv) + span.Finish() + // destroy lock.Lock() proto.Destroy() diff --git a/protocol/dubbo/listener.go b/protocol/dubbo/listener.go index 204e8a1c5d..430c4e49d8 100644 --- a/protocol/dubbo/listener.go +++ b/protocol/dubbo/listener.go @@ -29,6 +29,7 @@ import ( import ( "github.com/apache/dubbo-go-hessian2" "github.com/dubbogo/getty" + "github.com/opentracing/opentracing-go" perrors "github.com/pkg/errors" ) @@ -63,9 +64,9 @@ func (s *rpcSession) GetReqNum() int32 { return atomic.LoadInt32(&s.reqNum) } -//////////////////////////////////////////// +// ////////////////////////////////////////// // RpcClientHandler -//////////////////////////////////////////// +// ////////////////////////////////////////// // RpcClientHandler ... type RpcClientHandler struct { @@ -157,9 +158,9 @@ func (h *RpcClientHandler) OnCron(session getty.Session) { h.conn.pool.rpcClient.heartbeat(session) } -//////////////////////////////////////////// +// ////////////////////////////////////////// // RpcServerHandler -//////////////////////////////////////////// +// ////////////////////////////////////////// // RpcServerHandler ... type RpcServerHandler struct { @@ -284,7 +285,10 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { args := p.Body.(map[string]interface{})["args"].([]interface{}) inv := invocation.NewRPCInvocation(p.Service.Method, args, attachments) - result := invoker.Invoke(context.Background(), inv) + + ctx := rebuildCtx(inv) + + result := invoker.Invoke(ctx, inv) if err := result.Error(); err != nil { p.Header.ResponseStatus = hessian.Response_OK p.Body = hessian.NewResponse(nil, err, result.Attachments()) @@ -327,6 +331,21 @@ func (h *RpcServerHandler) OnCron(session getty.Session) { } } +// rebuildCtx rebuild the context by attachment. +// Once we decided to transfer more context's key-value, we should change this. +// now we only support rebuild the tracing context +func rebuildCtx(inv *invocation.RPCInvocation) context.Context { + ctx := context.Background() + + // actually, if user do not use any opentracing framework, the err will not be nil. + spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, + opentracing.TextMapCarrier(inv.Attachments())) + if err == nil { + ctx = context.WithValue(ctx, constant.TRACING_REMOTE_SPAN_CTX, spanCtx) + } + return ctx +} + func reply(session getty.Session, req *DubboPackage, tp hessian.PackageType) { resp := &DubboPackage{ Header: hessian.DubboHeader{ diff --git a/protocol/dubbo/listener_test.go b/protocol/dubbo/listener_test.go new file mode 100644 index 0000000000..5f80981460 --- /dev/null +++ b/protocol/dubbo/listener_test.go @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dubbo + +import ( + "testing" +) + +import ( + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/mocktracer" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/protocol/invocation" +) + +// test rebuild the ctx +func TestRebuildCtx(t *testing.T) { + opentracing.SetGlobalTracer(mocktracer.New()) + attach := make(map[string]string, 10) + attach[constant.VERSION_KEY] = "1.0" + attach[constant.GROUP_KEY] = "MyGroup" + inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach) + + // attachment doesn't contains any tracing key-value pair, + ctx := rebuildCtx(inv) + assert.NotNil(t, ctx) + assert.Nil(t, ctx.Value(constant.TRACING_REMOTE_SPAN_CTX)) + + span, ctx := opentracing.StartSpanFromContext(ctx, "Test-Client") + + opentracing.GlobalTracer().Inject(span.Context(), opentracing.TextMap, + opentracing.TextMapCarrier(inv.Attachments())) + // rebuild the context success + inv = invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach) + ctx = rebuildCtx(inv) + span.Finish() + assert.NotNil(t, ctx) + assert.NotNil(t, ctx.Value(constant.TRACING_REMOTE_SPAN_CTX)) +}