Skip to content

Commit

Permalink
Replace cassandra-spanstore tracing instrumentation withOTEL (#4599)
Browse files Browse the repository at this point in the history
  • Loading branch information
afzal442 authored Jul 31, 2023
1 parent e0c83b0 commit 2881485
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 49 deletions.
8 changes: 6 additions & 2 deletions plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"io"

"github.com/spf13/viper"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/cassandra"
Expand Down Expand Up @@ -57,6 +59,7 @@ type Factory struct {
primaryMetricsFactory metrics.Factory
archiveMetricsFactory metrics.Factory
logger *zap.Logger
tracer trace.TracerProvider

primaryConfig config.SessionBuilder
primarySession cassandra.Session
Expand All @@ -67,6 +70,7 @@ type Factory struct {
// NewFactory creates a new Factory.
func NewFactory() *Factory {
return &Factory{
tracer: otel.GetTracerProvider(),
Options: NewOptions(primaryStorageConfig, archiveStorageConfig),
}
}
Expand Down Expand Up @@ -120,7 +124,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
return cSpanStore.NewSpanReader(f.primarySession, f.primaryMetricsFactory, f.logger), nil
return cSpanStore.NewSpanReader(f.primarySession, f.primaryMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader")), nil
}

// CreateSpanWriter implements storage.Factory
Expand All @@ -143,7 +147,7 @@ func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
if f.archiveSession == nil {
return nil, storage.ErrArchiveStorageNotConfigured
}
return cSpanStore.NewSpanReader(f.archiveSession, f.archiveMetricsFactory, f.logger), nil
return cSpanStore.NewSpanReader(f.archiveSession, f.archiveMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader")), nil
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
Expand Down
7 changes: 6 additions & 1 deletion plugin/storage/cassandra/savetracetest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/jaegertracing/jaeger/model"
cascfg "github.com/jaegertracing/jaeger/pkg/cassandra/config"
"github.com/jaegertracing/jaeger/pkg/jtracer"
"github.com/jaegertracing/jaeger/pkg/metrics"
cSpanStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
Expand All @@ -44,8 +45,12 @@ func main() {
if err != nil {
logger.Fatal("Cannot create Cassandra session", zap.Error(err))
}
tracer, err := jtracer.New("savetracetest")
if err != nil {
logger.Fatal("Failed to initialize tracer", zap.Error(err))
}
spanStore := cSpanStore.NewSpanWriter(cqlSession, time.Hour*12, noScope, logger)
spanReader := cSpanStore.NewSpanReader(cqlSession, noScope, logger)
spanReader := cSpanStore.NewSpanReader(cqlSession, noScope, logger, tracer.OTEL.Tracer("cSpanStore.SpanReader"))
ctx := context.Background()
if err = spanStore.WriteSpan(ctx, getSomeSpan()); err != nil {
logger.Fatal("Failed to save", zap.Error(err))
Expand Down
70 changes: 39 additions & 31 deletions plugin/storage/cassandra/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import (
"fmt"
"time"

"github.com/opentracing/opentracing-go"
ottag "github.com/opentracing/opentracing-go/ext"
otlog "github.com/opentracing/opentracing-go/log"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
Expand Down Expand Up @@ -110,13 +111,15 @@ type SpanReader struct {
operationNamesReader operationNamesReader
metrics spanReaderMetrics
logger *zap.Logger
tracer trace.Tracer
}

// NewSpanReader returns a new SpanReader.
func NewSpanReader(
session cassandra.Session,
metricsFactory metrics.Factory,
logger *zap.Logger,
tracer trace.Tracer,
) *SpanReader {
readFactory := metricsFactory.Namespace(metrics.NSOptions{Name: "read", Tags: nil})
serviceNamesStorage := NewServiceNamesStorage(session, 0, metricsFactory, logger)
Expand All @@ -134,6 +137,7 @@ func NewSpanReader(
queryServiceNameIndex: casMetrics.NewTable(readFactory, "service_name_index"),
},
logger: logger,
tracer: tracer,
}
}

Expand All @@ -151,9 +155,9 @@ func (s *SpanReader) GetOperations(
}

func (s *SpanReader) readTrace(ctx context.Context, traceID dbmodel.TraceID) (*model.Trace, error) {
span, ctx := startSpanForQuery(ctx, "readTrace", querySpanByTraceID)
defer span.Finish()
span.LogFields(otlog.String("event", "searching"), otlog.Object("trace_id", traceID))
ctx, span := s.startSpanForQuery(ctx, "readTrace", querySpanByTraceID)
defer span.End()
span.SetAttributes(attribute.Key("trace_id").String(traceID.String()))

trace, err := s.readTraceInSpan(ctx, traceID)
logErrorToSpan(span, err)
Expand Down Expand Up @@ -305,13 +309,16 @@ func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.Tra
}

func (s *SpanReader) queryByTagsAndLogs(ctx context.Context, tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
span, ctx := startSpanForQuery(ctx, "queryByTagsAndLogs", queryByTag)
defer span.Finish()
ctx, span := s.startSpanForQuery(ctx, "queryByTagsAndLogs", queryByTag)
defer span.End()

results := make([]dbmodel.UniqueTraceIDs, 0, len(tq.Tags))
for k, v := range tq.Tags {
childSpan, _ := opentracing.StartSpanFromContext(ctx, "queryByTag")
childSpan.LogFields(otlog.String("tag.key", k), otlog.String("tag.value", v))
_, childSpan := s.tracer.Start(ctx, "queryByTag")
childSpan.SetAttributes(
attribute.Key("tag.key").String(k),
attribute.Key("tag.value").String(v),
)
query := s.session.Query(
queryByTag,
tq.ServiceName,
Expand All @@ -322,7 +329,7 @@ func (s *SpanReader) queryByTagsAndLogs(ctx context.Context, tq *spanstore.Trace
tq.NumTraces*limitMultiple,
).PageSize(0)
t, err := s.executeQuery(childSpan, query, s.metrics.queryTagIndex)
childSpan.Finish()
childSpan.End()
if err != nil {
return nil, err
}
Expand All @@ -332,8 +339,8 @@ func (s *SpanReader) queryByTagsAndLogs(ctx context.Context, tq *spanstore.Trace
}

func (s *SpanReader) queryByDuration(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
span, ctx := startSpanForQuery(ctx, "queryByDuration", queryByDuration)
defer span.Finish()
ctx, span := s.startSpanForQuery(ctx, "queryByDuration", queryByDuration)
defer span.End()

results := dbmodel.UniqueTraceIDs{}

Expand All @@ -349,8 +356,8 @@ func (s *SpanReader) queryByDuration(ctx context.Context, traceQuery *spanstore.
endTimeByHour := traceQuery.StartTimeMax.Round(durationBucketSize)

for timeBucket := endTimeByHour; timeBucket.After(startTimeByHour) || timeBucket.Equal(startTimeByHour); timeBucket = timeBucket.Add(-1 * durationBucketSize) {
childSpan, _ := opentracing.StartSpanFromContext(ctx, "queryForTimeBucket")
childSpan.LogFields(otlog.String("timeBucket", timeBucket.String()))
_, childSpan := s.tracer.Start(ctx, "queryForTimeBucket")
childSpan.SetAttributes(attribute.Key("timeBucket").String(timeBucket.String()))
query := s.session.Query(
queryByDuration,
timeBucket,
Expand All @@ -360,7 +367,7 @@ func (s *SpanReader) queryByDuration(ctx context.Context, traceQuery *spanstore.
maxDurationMicros,
traceQuery.NumTraces*limitMultiple)
t, err := s.executeQuery(childSpan, query, s.metrics.queryDurationIndex)
childSpan.Finish()
childSpan.End()
if err != nil {
return nil, err
}
Expand All @@ -376,8 +383,8 @@ func (s *SpanReader) queryByDuration(ctx context.Context, traceQuery *spanstore.
}

func (s *SpanReader) queryByServiceNameAndOperation(ctx context.Context, tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
span, _ := startSpanForQuery(ctx, "queryByServiceNameAndOperation", queryByServiceAndOperationName)
defer span.Finish()
_, span := s.startSpanForQuery(ctx, "queryByServiceNameAndOperation", queryByServiceAndOperationName)
defer span.End()
query := s.session.Query(
queryByServiceAndOperationName,
tq.ServiceName,
Expand All @@ -390,8 +397,8 @@ func (s *SpanReader) queryByServiceNameAndOperation(ctx context.Context, tq *spa
}

func (s *SpanReader) queryByService(ctx context.Context, tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
span, _ := startSpanForQuery(ctx, "queryByService", queryByServiceName)
defer span.Finish()
_, span := s.startSpanForQuery(ctx, "queryByService", queryByServiceAndOperationName)
defer span.End()
query := s.session.Query(
queryByServiceName,
tq.ServiceName,
Expand All @@ -402,7 +409,7 @@ func (s *SpanReader) queryByService(ctx context.Context, tq *spanstore.TraceQuer
return s.executeQuery(span, query, s.metrics.queryServiceNameIndex)
}

func (s *SpanReader) executeQuery(span opentracing.Span, query cassandra.Query, tableMetrics *casMetrics.Table) (dbmodel.UniqueTraceIDs, error) {
func (s *SpanReader) executeQuery(span trace.Span, query cassandra.Query, tableMetrics *casMetrics.Table) (dbmodel.UniqueTraceIDs, error) {
start := time.Now()
i := query.Iter()
retMe := dbmodel.UniqueTraceIDs{}
Expand All @@ -414,25 +421,26 @@ func (s *SpanReader) executeQuery(span opentracing.Span, query cassandra.Query,
tableMetrics.Emit(err, time.Since(start))
if err != nil {
logErrorToSpan(span, err)
span.LogFields(otlog.String("query", query.String()))
s.logger.Error("Failed to exec query", zap.Error(err), zap.String("query", query.String()))
return nil, err
}
return retMe, nil
}

func startSpanForQuery(ctx context.Context, name, query string) (opentracing.Span, context.Context) {
span, ctx := opentracing.StartSpanFromContext(ctx, name)
ottag.DBStatement.Set(span, query)
ottag.DBType.Set(span, "cassandra")
ottag.Component.Set(span, "gocql")
return span, ctx
func (s *SpanReader) startSpanForQuery(ctx context.Context, name, query string) (context.Context, trace.Span) {
ctx, span := s.tracer.Start(ctx, name)
span.SetAttributes(
attribute.Key(semconv.DBStatementKey).String(query),
attribute.Key(semconv.DBSystemKey).String("cassandra"),
attribute.Key("component").String("gocql"),
)
return ctx, span
}

func logErrorToSpan(span opentracing.Span, err error) {
func logErrorToSpan(span trace.Span, err error) {
if err == nil {
return
}
ottag.Error.Set(span, true)
span.LogFields(otlog.Error(err))
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}
53 changes: 38 additions & 15 deletions plugin/storage/cassandra/spanstore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/internal/metricstest"
Expand All @@ -37,13 +40,26 @@ import (
)

type spanReaderTest struct {
session *mocks.Session
logger *zap.Logger
logBuffer *testutils.Buffer
reader *SpanReader
session *mocks.Session
logger *zap.Logger
logBuffer *testutils.Buffer
traceBuffer *tracetest.InMemoryExporter
reader *SpanReader
}

func withSpanReader(fn func(r *spanReaderTest)) {
func tracerProvider(t *testing.T) (trace.TracerProvider, *tracetest.InMemoryExporter, func()) {
exporter := tracetest.NewInMemoryExporter()
tp := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithSyncer(exporter),
)
closer := func() {
assert.NoError(t, tp.Shutdown(context.Background()))
}
return tp, exporter, closer
}

func withSpanReader(t *testing.T, fn func(r *spanReaderTest)) {
session := &mocks.Session{}
query := &mocks.Query{}
session.On("Query",
Expand All @@ -52,19 +68,22 @@ func withSpanReader(fn func(r *spanReaderTest)) {
query.On("Exec").Return(nil)
logger, logBuffer := testutils.NewLogger()
metricsFactory := metricstest.NewFactory(0)
tracer, exp, closer := tracerProvider(t)
defer closer()
r := &spanReaderTest{
session: session,
logger: logger,
logBuffer: logBuffer,
reader: NewSpanReader(session, metricsFactory, logger),
session: session,
logger: logger,
logBuffer: logBuffer,
traceBuffer: exp,
reader: NewSpanReader(session, metricsFactory, logger, tracer.Tracer("test")),
}
fn(r)
}

var _ spanstore.Reader = &SpanReader{} // check API conformance

func TestSpanReaderGetServices(t *testing.T) {
withSpanReader(func(r *spanReaderTest) {
withSpanReader(t, func(r *spanReaderTest) {
r.reader.serviceNamesReader = func() ([]string, error) { return []string{"service-a"}, nil }
s, err := r.reader.GetServices(context.Background())
assert.NoError(t, err)
Expand All @@ -73,7 +92,7 @@ func TestSpanReaderGetServices(t *testing.T) {
}

func TestSpanReaderGetOperations(t *testing.T) {
withSpanReader(func(r *spanReaderTest) {
withSpanReader(t, func(r *spanReaderTest) {
expectedOperations := []spanstore.Operation{
{
Name: "operation-a",
Expand Down Expand Up @@ -121,7 +140,7 @@ func TestSpanReaderGetTrace(t *testing.T) {
for _, tc := range testCases {
testCase := tc // capture loop var
t.Run("expected err="+testCase.expectedErr, func(t *testing.T) {
withSpanReader(func(r *spanReaderTest) {
withSpanReader(t, func(r *spanReaderTest) {
iter := &mocks.Iterator{}
iter.On("Scan", testCase.scanner).Return(true)
iter.On("Scan", matchEverything()).Return(false)
Expand All @@ -135,6 +154,7 @@ func TestSpanReaderGetTrace(t *testing.T) {

trace, err := r.reader.GetTrace(context.Background(), model.TraceID{})
if testCase.expectedErr == "" {
require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded")
assert.NoError(t, err)
assert.NotNil(t, trace)
} else {
Expand All @@ -148,7 +168,7 @@ func TestSpanReaderGetTrace(t *testing.T) {
}

func TestSpanReaderGetTrace_TraceNotFound(t *testing.T) {
withSpanReader(func(r *spanReaderTest) {
withSpanReader(t, func(r *spanReaderTest) {
iter := &mocks.Iterator{}
iter.On("Scan", matchEverything()).Return(false)
iter.On("Close").Return(nil)
Expand All @@ -160,14 +180,16 @@ func TestSpanReaderGetTrace_TraceNotFound(t *testing.T) {
r.session.On("Query", mock.AnythingOfType("string"), matchEverything()).Return(query)

trace, err := r.reader.GetTrace(context.Background(), model.TraceID{})
require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded")
assert.Nil(t, trace)
assert.EqualError(t, err, "trace not found")
})
}

func TestSpanReaderFindTracesBadRequest(t *testing.T) {
withSpanReader(func(r *spanReaderTest) {
withSpanReader(t, func(r *spanReaderTest) {
_, err := r.reader.FindTraces(context.Background(), nil)
require.Empty(t, r.traceBuffer.GetSpans(), "Spans Not recorded")
assert.Error(t, err)
})
}
Expand Down Expand Up @@ -286,7 +308,7 @@ func TestSpanReaderFindTraces(t *testing.T) {
for _, tc := range testCases {
testCase := tc // capture loop var
t.Run(testCase.caption, func(t *testing.T) {
withSpanReader(func(r *spanReaderTest) {
withSpanReader(t, func(r *spanReaderTest) {
// scanMatcher can match Iter.Scan() parameters and set trace ID fields
scanMatcher := func(name string) interface{} {
traceIDs := []dbmodel.TraceID{
Expand Down Expand Up @@ -384,6 +406,7 @@ func TestSpanReaderFindTraces(t *testing.T) {
}
res, err := r.reader.FindTraces(context.Background(), queryParams)
if testCase.expectedError == "" {
require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded")
assert.NoError(t, err)
assert.Len(t, res, testCase.expectedCount, "expecting certain number of traces")
} else {
Expand Down

0 comments on commit 2881485

Please sign in to comment.