diff --git a/go/trace/fake.go b/go/trace/fake.go index f1f8be19dac..a5394e05868 100644 --- a/go/trace/fake.go +++ b/go/trace/fake.go @@ -23,25 +23,26 @@ import ( "google.golang.org/grpc" ) -type fakeSpanFactory struct{} - -func (fakeSpanFactory) New(Span, string) Span { return fakeSpan{} } -func (fakeSpanFactory) NewClientSpan(parent Span, serviceName, label string) Span { return fakeSpan{} } -func (fakeSpanFactory) FromContext(context.Context) (Span, bool) { return nil, false } -func (fakeSpanFactory) NewContext(parent context.Context, _ Span) context.Context { return parent } -func (fakeSpanFactory) AddGrpcServerOptions(addInterceptors func(s grpc.StreamServerInterceptor, u grpc.UnaryServerInterceptor)) { +type noopTracingServer struct{} + +func (noopTracingServer) New(Span, string) Span { return NoopSpan{} } +func (noopTracingServer) NewClientSpan(parent Span, serviceName, label string) Span { return NoopSpan{} } +func (noopTracingServer) FromContext(context.Context) (Span, bool) { return nil, false } +func (noopTracingServer) NewFromString(parent, label string) (Span, error) { return NoopSpan{}, nil } +func (noopTracingServer) NewContext(parent context.Context, _ Span) context.Context { return parent } +func (noopTracingServer) AddGrpcServerOptions(addInterceptors func(s grpc.StreamServerInterceptor, u grpc.UnaryServerInterceptor)) { } -func (fakeSpanFactory) AddGrpcClientOptions(addInterceptors func(s grpc.StreamClientInterceptor, u grpc.UnaryClientInterceptor)) { +func (noopTracingServer) AddGrpcClientOptions(addInterceptors func(s grpc.StreamClientInterceptor, u grpc.UnaryClientInterceptor)) { } -// fakeSpan implements Span with no-op methods. -type fakeSpan struct{} +// NoopSpan implements Span with no-op methods. +type NoopSpan struct{} -func (fakeSpan) Finish() {} -func (fakeSpan) Annotate(string, interface{}) {} +func (NoopSpan) Finish() {} +func (NoopSpan) Annotate(string, interface{}) {} func init() { tracingBackendFactories["noop"] = func(_ string) (tracingService, io.Closer, error) { - return fakeSpanFactory{}, &nilCloser{}, nil + return noopTracingServer{}, &nilCloser{}, nil } } diff --git a/go/trace/opentracing.go b/go/trace/opentracing.go index ea41443de70..6b47177d45b 100644 --- a/go/trace/opentracing.go +++ b/go/trace/opentracing.go @@ -16,10 +16,11 @@ limitations under the License. package trace import ( - "github.com/opentracing-contrib/go-grpc" + otgrpc "github.com/opentracing-contrib/go-grpc" "github.com/opentracing/opentracing-go" "golang.org/x/net/context" "google.golang.org/grpc" + "vitess.io/vitess/go/vt/vterrors" ) var _ Span = (*openTracingSpan)(nil) @@ -41,7 +42,9 @@ func (js openTracingSpan) Annotate(key string, value interface{}) { var _ tracingService = (*openTracingService)(nil) type openTracingService struct { - Tracer opentracing.Tracer + Tracer opentracing.Tracer + fromString func(string) (opentracing.SpanContext, error) + toString func(Span) string } // AddGrpcServerOptions is part of an interface implementation @@ -74,15 +77,23 @@ func (jf openTracingService) New(parent Span, label string) Span { return openTracingSpan{otSpan: innerSpan} } +func (jf openTracingService) NewFromString(parent, label string) (Span, error) { + spanContext, err := jf.fromString(parent) + if err != nil { + return nil, vterrors.Wrap(err, "failed to deserialize span context") + } + innerSpan := jf.Tracer.StartSpan(label, opentracing.ChildOf(spanContext)) + return openTracingSpan{otSpan: innerSpan}, nil +} + // FromContext is part of an interface implementation func (jf openTracingService) FromContext(ctx context.Context) (Span, bool) { innerSpan := opentracing.SpanFromContext(ctx) - if innerSpan != nil { - return openTracingSpan{otSpan: innerSpan}, true - } else { + if innerSpan == nil { return nil, false } + return openTracingSpan{otSpan: innerSpan}, true } // NewContext is part of an interface implementation diff --git a/go/trace/plugin_jaeger.go b/go/trace/plugin_jaeger.go index 652cb42352c..abae1bfbe6d 100644 --- a/go/trace/plugin_jaeger.go +++ b/go/trace/plugin_jaeger.go @@ -56,6 +56,9 @@ var ( // JAEGER_AGENT_PORT func newJagerTracerFromEnv(serviceName string) (tracingService, io.Closer, error) { cfg, err := config.FromEnv() + if err != nil { + return nil, nil, err + } if cfg.ServiceName == "" { cfg.ServiceName = serviceName } @@ -79,7 +82,18 @@ func newJagerTracerFromEnv(serviceName string) (tracingService, io.Closer, error opentracing.SetGlobalTracer(tracer) - return openTracingService{tracer}, closer, nil + f1 := func(s string) (context opentracing.SpanContext, e error) { + // I don't understand why I have to do this and not just use `jaeger.ContextFromString` directly + return jaeger.ContextFromString(s) + } + + f2 := func(in Span) string { + otSpan := in.(*openTracingSpan) + jaegerSpanContext := otSpan.otSpan.Context().(*jaeger.SpanContext) + return jaegerSpanContext.String() + } + + return openTracingService{Tracer: tracer, fromString: f1, toString: f2}, closer, nil } func init() { diff --git a/go/trace/trace.go b/go/trace/trace.go index b52c4ceb901..7d495bda4f7 100644 --- a/go/trace/trace.go +++ b/go/trace/trace.go @@ -45,13 +45,24 @@ type Span interface { // NewSpan creates a new Span with the currently installed tracing plugin. // If no tracing plugin is installed, it returns a fake Span that does nothing. func NewSpan(inCtx context.Context, label string) (Span, context.Context) { - parent, _ := spanFactory.FromContext(inCtx) - span := spanFactory.New(parent, label) - outCtx := spanFactory.NewContext(inCtx, span) + parent, _ := currentTracer.FromContext(inCtx) + span := currentTracer.New(parent, label) + outCtx := currentTracer.NewContext(inCtx, span) return span, outCtx } +// NewFromString creates a new Span with the currently installed tracing plugin, extracting the span context from +// the provided string. +func NewFromString(inCtx context.Context, parent, label string) (Span, context.Context, error) { + span, err := currentTracer.NewFromString(parent, label) + if err != nil { + return nil, nil, err + } + outCtx := currentTracer.NewContext(inCtx, span) + return span, outCtx, nil +} + // AnnotateSQL annotates information about a sql query in the span. This is done in a way // so as to not leak personally identifying information (PII), or sensitive personal information (SPI) func AnnotateSQL(span Span, sql string) { @@ -61,12 +72,12 @@ func AnnotateSQL(span Span, sql string) { // FromContext returns the Span from a Context if present. The bool return // value indicates whether a Span was present in the Context. func FromContext(ctx context.Context) (Span, bool) { - return spanFactory.FromContext(ctx) + return currentTracer.FromContext(ctx) } // NewContext returns a context based on parent with a new Span value. func NewContext(parent context.Context, span Span) context.Context { - return spanFactory.NewContext(parent, span) + return currentTracer.NewContext(parent, span) } // CopySpan creates a new context from parentCtx, with only the trace span @@ -78,12 +89,14 @@ func CopySpan(parentCtx, spanCtx context.Context) context.Context { return parentCtx } +// AddGrpcServerOptions adds GRPC interceptors that read the parent span from the grpc packets func AddGrpcServerOptions(addInterceptors func(s grpc.StreamServerInterceptor, u grpc.UnaryServerInterceptor)) { - spanFactory.AddGrpcServerOptions(addInterceptors) + currentTracer.AddGrpcServerOptions(addInterceptors) } +// AddGrpcClientOptions adds GRPC interceptors that add parent information to outgoing grpc packets func AddGrpcClientOptions(addInterceptors func(s grpc.StreamClientInterceptor, u grpc.UnaryClientInterceptor)) { - spanFactory.AddGrpcClientOptions(addInterceptors) + currentTracer.AddGrpcClientOptions(addInterceptors) } // tracingService is an interface for creating spans or extracting them from Contexts. @@ -91,7 +104,10 @@ type tracingService interface { // New creates a new span from an existing one, if provided. The parent can also be nil New(parent Span, label string) Span - // FromContext extracts a span from a context, making it possible to annotate the span with additional information. + // NewFromString creates a new span and uses the provided string to reconstitute the parent span + NewFromString(parent, label string) (Span, error) + + // FromContext extracts a span from a context, making it possible to annotate the span with additional information FromContext(ctx context.Context) (Span, bool) // NewContext creates a new context containing the provided span @@ -104,12 +120,14 @@ type tracingService interface { AddGrpcClientOptions(addInterceptors func(s grpc.StreamClientInterceptor, u grpc.UnaryClientInterceptor)) } +// TracerFactory creates a tracing service for the service provided. It's important to close the provided io.Closer +// object to make sure that all spans are sent to the backend before the process exits. type TracerFactory func(serviceName string) (tracingService, io.Closer, error) // tracingBackendFactories should be added to by a plugin during init() to install itself var tracingBackendFactories = make(map[string]TracerFactory) -var spanFactory tracingService = fakeSpanFactory{} +var currentTracer tracingService = noopTracingServer{} var ( tracingServer = flag.String("tracer", "noop", "tracing service to use") @@ -128,7 +146,7 @@ func StartTracing(serviceName string) io.Closer { return &nilCloser{} } - spanFactory = tracer + currentTracer = tracer log.Infof("successfully started tracing with [%s]", *tracingServer) diff --git a/go/trace/trace_test.go b/go/trace/trace_test.go index 97cc3b2c2dd..f7414b9bf31 100644 --- a/go/trace/trace_test.go +++ b/go/trace/trace_test.go @@ -37,7 +37,7 @@ func TestFakeSpan(t *testing.T) { span2.Annotate("key", 42) span2.Finish() - span3, ctx := NewSpan(ctx, "label") + span3, _ := NewSpan(ctx, "label") span3.Annotate("key", 42) span3.Finish() } @@ -93,7 +93,7 @@ type fakeTracer struct { log []string } -func (f *fakeTracer) NewClientSpan(parent Span, serviceName, label string) Span { +func (f *fakeTracer) NewFromString(parent, label string) (Span, error) { panic("implement me") } diff --git a/go/vt/vtgate/planbuilder/testdata/select_cases.txt b/go/vt/vtgate/planbuilder/testdata/select_cases.txt index da728217844..e8885a33a28 100644 --- a/go/vt/vtgate/planbuilder/testdata/select_cases.txt +++ b/go/vt/vtgate/planbuilder/testdata/select_cases.txt @@ -154,6 +154,32 @@ } } +# select aggregation with partial scatter directive - added comments to try to confuse the hint extraction +"/*VT_SPAN_CONTEXT=123*/select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ count(*) from user" +{ + "Original": "/*VT_SPAN_CONTEXT=123*/select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ count(*) from user", + "Instructions": { + "Aggregates": [ + { + "Opcode": "count", + "Col": 0 + } + ], + "Keys": null, + "Input": { + "Opcode": "SelectScatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "Query": "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ count(*) from user", + "FieldQuery": "select count(*) from user where 1 != 1", + "ScatterErrorsAsWarnings": true, + "Table": "user" + } + } +} + # select limit with partial scatter directive "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ * from user limit 10" { diff --git a/go/vt/vtgate/plugin_mysql_server.go b/go/vt/vtgate/plugin_mysql_server.go index 85240db4804..1e93fb6e024 100644 --- a/go/vt/vtgate/plugin_mysql_server.go +++ b/go/vt/vtgate/plugin_mysql_server.go @@ -21,10 +21,14 @@ import ( "fmt" "net" "os" + "regexp" "sync/atomic" "syscall" "time" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vterrors" + "golang.org/x/net/context" "vitess.io/vitess/go/trace" @@ -99,6 +103,35 @@ func (vh *vtgateHandler) ConnectionClosed(c *mysql.Conn) { } } +// Regexp to extract parent span id over the sql query +var r = regexp.MustCompile("/\\*VT_SPAN_CONTEXT=(.*)\\*/") + +// this function is here to make this logic easy to test by decoupling the logic from the `trace.NewSpan` and `trace.NewFromString` functions +func startSpanTestable(ctx context.Context, query, label string, + newSpan func(context.Context, string) (trace.Span, context.Context), + newSpanFromString func(context.Context, string, string) (trace.Span, context.Context, error)) (trace.Span, context.Context, error) { + _, comments := sqlparser.SplitMarginComments(query) + match := r.FindStringSubmatch(comments.Leading) + var span trace.Span + if len(match) == 0 { + span, ctx = newSpan(ctx, label) + } else { + var err error + span, ctx, err = newSpanFromString(ctx, match[1], label) + if err != nil { + return nil, nil, err + } + } + + trace.AnnotateSQL(span, query) + + return span, ctx, nil +} + +func startSpan(ctx context.Context, query, label string) (trace.Span, context.Context, error) { + return startSpanTestable(ctx, query, label, trace.NewSpan, trace.NewFromString) +} + func (vh *vtgateHandler) ComQuery(c *mysql.Conn, query string, callback func(*sqltypes.Result) error) error { ctx := context.Background() var cancel context.CancelFunc @@ -106,8 +139,11 @@ func (vh *vtgateHandler) ComQuery(c *mysql.Conn, query string, callback func(*sq ctx, cancel = context.WithTimeout(ctx, *mysqlQueryTimeout) defer cancel() } - span, ctx := trace.NewSpan(ctx, "vtgateHandler.ComQuery") - trace.AnnotateSQL(span, query) + + span, ctx, err := startSpan(ctx, query, "vtgateHandler.ComQuery") + if err != nil { + return vterrors.Wrap(err, "failed to extract span") + } defer span.Finish() ctx = callinfo.MysqlCallInfo(ctx, c) diff --git a/go/vt/vtgate/plugin_mysql_server_test.go b/go/vt/vtgate/plugin_mysql_server_test.go index c190f053e83..1a4ee4e8a6c 100644 --- a/go/vt/vtgate/plugin_mysql_server_test.go +++ b/go/vt/vtgate/plugin_mysql_server_test.go @@ -22,6 +22,9 @@ import ( "strings" "testing" + "github.com/stretchr/testify/assert" + "vitess.io/vitess/go/trace" + "golang.org/x/net/context" "vitess.io/vitess/go/mysql" @@ -168,3 +171,54 @@ func TestConnectionRespectsExistingUnixSocket(t *testing.T) { t.Errorf("Error: %v, want prefix %s", err, want) } } + +var newSpanOK = func(ctx context.Context, label string) (trace.Span, context.Context) { + return trace.NoopSpan{}, context.Background() +} + +var newFromStringOK = func(ctx context.Context, spanContext, label string) (trace.Span, context.Context, error) { + return trace.NoopSpan{}, context.Background(), nil +} + +func newFromStringFail(t *testing.T) func(ctx context.Context, parentSpan string, label string) (trace.Span, context.Context, error) { + return func(ctx context.Context, parentSpan string, label string) (trace.Span, context.Context, error) { + t.Fatalf("we didn't provide a parent span in the sql query. this should not have been called. got: %v", parentSpan) + return trace.NoopSpan{}, context.Background(), nil + } +} + +func newFromStringExpect(t *testing.T, expected string) func(ctx context.Context, parentSpan string, label string) (trace.Span, context.Context, error) { + return func(ctx context.Context, parentSpan string, label string) (trace.Span, context.Context, error) { + assert.Equal(t, expected, parentSpan) + return trace.NoopSpan{}, context.Background(), nil + } +} + +func newSpanFail(t *testing.T) func(ctx context.Context, label string) (trace.Span, context.Context) { + return func(ctx context.Context, label string) (trace.Span, context.Context) { + t.Fatalf("we provided a span context but newFromString was not used as expected") + return trace.NoopSpan{}, context.Background() + } +} + +func TestNoSpanContextPassed(t *testing.T) { + _, _, err := startSpanTestable(context.Background(), "sql without comments", "someLabel", newSpanOK, newFromStringFail(t)) + assert.NoError(t, err) +} + +func TestSpanContextNoPassedInButExistsInString(t *testing.T) { + _, _, err := startSpanTestable(context.Background(), "SELECT * FROM SOMETABLE WHERE COL = \"/*VT_SPAN_CONTEXT=123*/", "someLabel", newSpanOK, newFromStringFail(t)) + assert.NoError(t, err) +} + +func TestSpanContextPassedIn(t *testing.T) { + _, _, err := startSpanTestable(context.Background(), "/*VT_SPAN_CONTEXT=123*/SQL QUERY", "someLabel", newSpanFail(t), newFromStringOK) + assert.NoError(t, err) +} + +func TestSpanContextPassedInEvenAroundOtherComments(t *testing.T) { + _, _, err := startSpanTestable(context.Background(), "/*VT_SPAN_CONTEXT=123*/SELECT /*vt+ SCATTER_ERRORS_AS_WARNINGS */ col1, col2 FROM TABLE ", "someLabel", + newSpanFail(t), + newFromStringExpect(t, "123")) + assert.NoError(t, err) +}