Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions go/trace/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,26 @@ import (
"google.golang.org/grpc"
)

type fakeSpanFactory struct{}

func (fakeSpanFactory) New(Span, string) Span { return fakeSpan{} }
func (fakeSpanFactory) NewClientSpan(parent Span, serviceName, label string) Span { return fakeSpan{} }
func (fakeSpanFactory) FromContext(context.Context) (Span, bool) { return nil, false }
func (fakeSpanFactory) NewContext(parent context.Context, _ Span) context.Context { return parent }
func (fakeSpanFactory) AddGrpcServerOptions(addInterceptors func(s grpc.StreamServerInterceptor, u grpc.UnaryServerInterceptor)) {
type noopTracingServer struct{}

func (noopTracingServer) New(Span, string) Span { return NoopSpan{} }
func (noopTracingServer) NewClientSpan(parent Span, serviceName, label string) Span { return NoopSpan{} }
func (noopTracingServer) FromContext(context.Context) (Span, bool) { return nil, false }
func (noopTracingServer) NewFromString(parent, label string) (Span, error) { return NoopSpan{}, nil }
func (noopTracingServer) NewContext(parent context.Context, _ Span) context.Context { return parent }
func (noopTracingServer) AddGrpcServerOptions(addInterceptors func(s grpc.StreamServerInterceptor, u grpc.UnaryServerInterceptor)) {
}
func (fakeSpanFactory) AddGrpcClientOptions(addInterceptors func(s grpc.StreamClientInterceptor, u grpc.UnaryClientInterceptor)) {
func (noopTracingServer) AddGrpcClientOptions(addInterceptors func(s grpc.StreamClientInterceptor, u grpc.UnaryClientInterceptor)) {
}

// fakeSpan implements Span with no-op methods.
type fakeSpan struct{}
// NoopSpan implements Span with no-op methods.
type NoopSpan struct{}

func (fakeSpan) Finish() {}
func (fakeSpan) Annotate(string, interface{}) {}
func (NoopSpan) Finish() {}
func (NoopSpan) Annotate(string, interface{}) {}

func init() {
tracingBackendFactories["noop"] = func(_ string) (tracingService, io.Closer, error) {
return fakeSpanFactory{}, &nilCloser{}, nil
return noopTracingServer{}, &nilCloser{}, nil
}
}
21 changes: 16 additions & 5 deletions go/trace/opentracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ limitations under the License.
package trace

import (
"github.com/opentracing-contrib/go-grpc"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"golang.org/x/net/context"
"google.golang.org/grpc"
"vitess.io/vitess/go/vt/vterrors"
)

var _ Span = (*openTracingSpan)(nil)
Expand All @@ -41,7 +42,9 @@ func (js openTracingSpan) Annotate(key string, value interface{}) {
var _ tracingService = (*openTracingService)(nil)

type openTracingService struct {
Tracer opentracing.Tracer
Tracer opentracing.Tracer
fromString func(string) (opentracing.SpanContext, error)
toString func(Span) string
}

// AddGrpcServerOptions is part of an interface implementation
Expand Down Expand Up @@ -74,15 +77,23 @@ func (jf openTracingService) New(parent Span, label string) Span {
return openTracingSpan{otSpan: innerSpan}
}

func (jf openTracingService) NewFromString(parent, label string) (Span, error) {
spanContext, err := jf.fromString(parent)
if err != nil {
return nil, vterrors.Wrap(err, "failed to deserialize span context")
}
innerSpan := jf.Tracer.StartSpan(label, opentracing.ChildOf(spanContext))
return openTracingSpan{otSpan: innerSpan}, nil
}

// FromContext is part of an interface implementation
func (jf openTracingService) FromContext(ctx context.Context) (Span, bool) {
innerSpan := opentracing.SpanFromContext(ctx)

if innerSpan != nil {
return openTracingSpan{otSpan: innerSpan}, true
} else {
if innerSpan == nil {
return nil, false
}
return openTracingSpan{otSpan: innerSpan}, true
}

// NewContext is part of an interface implementation
Expand Down
16 changes: 15 additions & 1 deletion go/trace/plugin_jaeger.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ var (
// JAEGER_AGENT_PORT
func newJagerTracerFromEnv(serviceName string) (tracingService, io.Closer, error) {
cfg, err := config.FromEnv()
if err != nil {
return nil, nil, err
}
if cfg.ServiceName == "" {
cfg.ServiceName = serviceName
}
Expand All @@ -79,7 +82,18 @@ func newJagerTracerFromEnv(serviceName string) (tracingService, io.Closer, error

opentracing.SetGlobalTracer(tracer)

return openTracingService{tracer}, closer, nil
f1 := func(s string) (context opentracing.SpanContext, e error) {
// I don't understand why I have to do this and not just use `jaeger.ContextFromString` directly
return jaeger.ContextFromString(s)
}

f2 := func(in Span) string {
otSpan := in.(*openTracingSpan)
jaegerSpanContext := otSpan.otSpan.Context().(*jaeger.SpanContext)
return jaegerSpanContext.String()
}

return openTracingService{Tracer: tracer, fromString: f1, toString: f2}, closer, nil
}

func init() {
Expand Down
38 changes: 28 additions & 10 deletions go/trace/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,24 @@ type Span interface {
// NewSpan creates a new Span with the currently installed tracing plugin.
// If no tracing plugin is installed, it returns a fake Span that does nothing.
func NewSpan(inCtx context.Context, label string) (Span, context.Context) {
parent, _ := spanFactory.FromContext(inCtx)
span := spanFactory.New(parent, label)
outCtx := spanFactory.NewContext(inCtx, span)
parent, _ := currentTracer.FromContext(inCtx)
span := currentTracer.New(parent, label)
outCtx := currentTracer.NewContext(inCtx, span)

return span, outCtx
}

// NewFromString creates a new Span with the currently installed tracing plugin, extracting the span context from
// the provided string.
func NewFromString(inCtx context.Context, parent, label string) (Span, context.Context, error) {
span, err := currentTracer.NewFromString(parent, label)
if err != nil {
return nil, nil, err
}
outCtx := currentTracer.NewContext(inCtx, span)
return span, outCtx, nil
}

// AnnotateSQL annotates information about a sql query in the span. This is done in a way
// so as to not leak personally identifying information (PII), or sensitive personal information (SPI)
func AnnotateSQL(span Span, sql string) {
Expand All @@ -61,12 +72,12 @@ func AnnotateSQL(span Span, sql string) {
// FromContext returns the Span from a Context if present. The bool return
// value indicates whether a Span was present in the Context.
func FromContext(ctx context.Context) (Span, bool) {
return spanFactory.FromContext(ctx)
return currentTracer.FromContext(ctx)
}

// NewContext returns a context based on parent with a new Span value.
func NewContext(parent context.Context, span Span) context.Context {
return spanFactory.NewContext(parent, span)
return currentTracer.NewContext(parent, span)
}

// CopySpan creates a new context from parentCtx, with only the trace span
Expand All @@ -78,20 +89,25 @@ func CopySpan(parentCtx, spanCtx context.Context) context.Context {
return parentCtx
}

// AddGrpcServerOptions adds GRPC interceptors that read the parent span from the grpc packets
func AddGrpcServerOptions(addInterceptors func(s grpc.StreamServerInterceptor, u grpc.UnaryServerInterceptor)) {
spanFactory.AddGrpcServerOptions(addInterceptors)
currentTracer.AddGrpcServerOptions(addInterceptors)
}

// AddGrpcClientOptions adds GRPC interceptors that add parent information to outgoing grpc packets
func AddGrpcClientOptions(addInterceptors func(s grpc.StreamClientInterceptor, u grpc.UnaryClientInterceptor)) {
spanFactory.AddGrpcClientOptions(addInterceptors)
currentTracer.AddGrpcClientOptions(addInterceptors)
}

// tracingService is an interface for creating spans or extracting them from Contexts.
type tracingService interface {
// New creates a new span from an existing one, if provided. The parent can also be nil
New(parent Span, label string) Span

// FromContext extracts a span from a context, making it possible to annotate the span with additional information.
// NewFromString creates a new span and uses the provided string to reconstitute the parent span
NewFromString(parent, label string) (Span, error)

// FromContext extracts a span from a context, making it possible to annotate the span with additional information
FromContext(ctx context.Context) (Span, bool)

// NewContext creates a new context containing the provided span
Expand All @@ -104,12 +120,14 @@ type tracingService interface {
AddGrpcClientOptions(addInterceptors func(s grpc.StreamClientInterceptor, u grpc.UnaryClientInterceptor))
}

// TracerFactory creates a tracing service for the service provided. It's important to close the provided io.Closer
// object to make sure that all spans are sent to the backend before the process exits.
type TracerFactory func(serviceName string) (tracingService, io.Closer, error)

// tracingBackendFactories should be added to by a plugin during init() to install itself
var tracingBackendFactories = make(map[string]TracerFactory)

var spanFactory tracingService = fakeSpanFactory{}
var currentTracer tracingService = noopTracingServer{}

var (
tracingServer = flag.String("tracer", "noop", "tracing service to use")
Expand All @@ -128,7 +146,7 @@ func StartTracing(serviceName string) io.Closer {
return &nilCloser{}
}

spanFactory = tracer
currentTracer = tracer

log.Infof("successfully started tracing with [%s]", *tracingServer)

Expand Down
4 changes: 2 additions & 2 deletions go/trace/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestFakeSpan(t *testing.T) {
span2.Annotate("key", 42)
span2.Finish()

span3, ctx := NewSpan(ctx, "label")
span3, _ := NewSpan(ctx, "label")
span3.Annotate("key", 42)
span3.Finish()
}
Expand Down Expand Up @@ -93,7 +93,7 @@ type fakeTracer struct {
log []string
}

func (f *fakeTracer) NewClientSpan(parent Span, serviceName, label string) Span {
func (f *fakeTracer) NewFromString(parent, label string) (Span, error) {
panic("implement me")
}

Expand Down
26 changes: 26 additions & 0 deletions go/vt/vtgate/planbuilder/testdata/select_cases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,32 @@
}
}

# select aggregation with partial scatter directive - added comments to try to confuse the hint extraction
"/*VT_SPAN_CONTEXT=123*/select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ count(*) from user"
{
"Original": "/*VT_SPAN_CONTEXT=123*/select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ count(*) from user",
"Instructions": {
"Aggregates": [
{
"Opcode": "count",
"Col": 0
}
],
"Keys": null,
"Input": {
"Opcode": "SelectScatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"Query": "select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ count(*) from user",
"FieldQuery": "select count(*) from user where 1 != 1",
"ScatterErrorsAsWarnings": true,
"Table": "user"
}
}
}

# select limit with partial scatter directive
"select /*vt+ SCATTER_ERRORS_AS_WARNINGS=1 */ * from user limit 10"
{
Expand Down
40 changes: 38 additions & 2 deletions go/vt/vtgate/plugin_mysql_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ import (
"fmt"
"net"
"os"
"regexp"
"sync/atomic"
"syscall"
"time"

"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"

"golang.org/x/net/context"
"vitess.io/vitess/go/trace"

Expand Down Expand Up @@ -99,15 +103,47 @@ func (vh *vtgateHandler) ConnectionClosed(c *mysql.Conn) {
}
}

// Regexp to extract parent span id over the sql query
var r = regexp.MustCompile("/\\*VT_SPAN_CONTEXT=(.*)\\*/")

// this function is here to make this logic easy to test by decoupling the logic from the `trace.NewSpan` and `trace.NewFromString` functions
func startSpanTestable(ctx context.Context, query, label string,
newSpan func(context.Context, string) (trace.Span, context.Context),
newSpanFromString func(context.Context, string, string) (trace.Span, context.Context, error)) (trace.Span, context.Context, error) {
_, comments := sqlparser.SplitMarginComments(query)
match := r.FindStringSubmatch(comments.Leading)
var span trace.Span
if len(match) == 0 {
span, ctx = newSpan(ctx, label)
} else {
var err error
span, ctx, err = newSpanFromString(ctx, match[1], label)
if err != nil {
return nil, nil, err
}
}

trace.AnnotateSQL(span, query)

return span, ctx, nil
}

func startSpan(ctx context.Context, query, label string) (trace.Span, context.Context, error) {
return startSpanTestable(ctx, query, label, trace.NewSpan, trace.NewFromString)
}

func (vh *vtgateHandler) ComQuery(c *mysql.Conn, query string, callback func(*sqltypes.Result) error) error {
ctx := context.Background()
var cancel context.CancelFunc
if *mysqlQueryTimeout != 0 {
ctx, cancel = context.WithTimeout(ctx, *mysqlQueryTimeout)
defer cancel()
}
span, ctx := trace.NewSpan(ctx, "vtgateHandler.ComQuery")
trace.AnnotateSQL(span, query)

span, ctx, err := startSpan(ctx, query, "vtgateHandler.ComQuery")
if err != nil {
return vterrors.Wrap(err, "failed to extract span")
}
defer span.Finish()

ctx = callinfo.MysqlCallInfo(ctx, c)
Expand Down
54 changes: 54 additions & 0 deletions go/vt/vtgate/plugin_mysql_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (
"strings"
"testing"

"github.com/stretchr/testify/assert"
"vitess.io/vitess/go/trace"

"golang.org/x/net/context"

"vitess.io/vitess/go/mysql"
Expand Down Expand Up @@ -168,3 +171,54 @@ func TestConnectionRespectsExistingUnixSocket(t *testing.T) {
t.Errorf("Error: %v, want prefix %s", err, want)
}
}

var newSpanOK = func(ctx context.Context, label string) (trace.Span, context.Context) {
return trace.NoopSpan{}, context.Background()
}

var newFromStringOK = func(ctx context.Context, spanContext, label string) (trace.Span, context.Context, error) {
return trace.NoopSpan{}, context.Background(), nil
}

func newFromStringFail(t *testing.T) func(ctx context.Context, parentSpan string, label string) (trace.Span, context.Context, error) {
return func(ctx context.Context, parentSpan string, label string) (trace.Span, context.Context, error) {
t.Fatalf("we didn't provide a parent span in the sql query. this should not have been called. got: %v", parentSpan)
return trace.NoopSpan{}, context.Background(), nil
}
}

func newFromStringExpect(t *testing.T, expected string) func(ctx context.Context, parentSpan string, label string) (trace.Span, context.Context, error) {
return func(ctx context.Context, parentSpan string, label string) (trace.Span, context.Context, error) {
assert.Equal(t, expected, parentSpan)
return trace.NoopSpan{}, context.Background(), nil
}
}

func newSpanFail(t *testing.T) func(ctx context.Context, label string) (trace.Span, context.Context) {
return func(ctx context.Context, label string) (trace.Span, context.Context) {
t.Fatalf("we provided a span context but newFromString was not used as expected")
return trace.NoopSpan{}, context.Background()
}
}

func TestNoSpanContextPassed(t *testing.T) {
_, _, err := startSpanTestable(context.Background(), "sql without comments", "someLabel", newSpanOK, newFromStringFail(t))
assert.NoError(t, err)
}

func TestSpanContextNoPassedInButExistsInString(t *testing.T) {
_, _, err := startSpanTestable(context.Background(), "SELECT * FROM SOMETABLE WHERE COL = \"/*VT_SPAN_CONTEXT=123*/", "someLabel", newSpanOK, newFromStringFail(t))
assert.NoError(t, err)
}

func TestSpanContextPassedIn(t *testing.T) {
_, _, err := startSpanTestable(context.Background(), "/*VT_SPAN_CONTEXT=123*/SQL QUERY", "someLabel", newSpanFail(t), newFromStringOK)
assert.NoError(t, err)
}

func TestSpanContextPassedInEvenAroundOtherComments(t *testing.T) {
_, _, err := startSpanTestable(context.Background(), "/*VT_SPAN_CONTEXT=123*/SELECT /*vt+ SCATTER_ERRORS_AS_WARNINGS */ col1, col2 FROM TABLE ", "someLabel",
newSpanFail(t),
newFromStringExpect(t, "123"))
assert.NoError(t, err)
}