From db42f178602e4dba1c98705a9ed683fa16aa32c8 Mon Sep 17 00:00:00 2001 From: Afzal Ansari Date: Sun, 23 Jul 2023 09:14:13 +0000 Subject: [PATCH 01/16] adds otel tracer to spanReader Signed-off-by: Afzal Ansari --- plugin/storage/es/spanstore/reader.go | 52 +++++++++++++--------- plugin/storage/es/spanstore/reader_test.go | 24 +++++----- 2 files changed, 44 insertions(+), 32 deletions(-) diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 7165d9d1e38..859ced4e322 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -21,16 +21,16 @@ import ( "encoding/json" "errors" "fmt" + "log" "time" "github.com/olivere/elastic" - "github.com/opentracing/opentracing-go" - ottag "github.com/opentracing/opentracing-go/ext" - otlog "github.com/opentracing/opentracing-go/log" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/es" + "github.com/jaegertracing/jaeger/pkg/jtracer" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" "github.com/jaegertracing/jaeger/storage/spanstore" @@ -109,6 +109,7 @@ type SpanReader struct { sourceFn sourceFn maxDocCount int useReadWriteAliases bool + tracer *jtracer.JTracer } // SpanReaderParams holds constructor params for NewSpanReader @@ -127,6 +128,7 @@ type SpanReaderParams struct { Archive bool UseReadWriteAliases bool RemoteReadClusters []string + Tracer *jtracer.JTracer } // NewSpanReader returns a new SpanReader with a metrics. @@ -137,6 +139,14 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { if p.UseReadWriteAliases { maxSpanAge = rolloverMaxSpanAge } + jt, err := jtracer.New("spanstore") + if err != nil { + log.Fatal("Failed to initialise tracer", zap.Error(err)) + } + p.Tracer = jt + if err := p.Tracer.Close(context.Background()); err != nil { + log.Fatal("Error shutting down tracer provider", zap.Error(err)) + } return &SpanReader{ client: p.Client, logger: p.Logger, @@ -153,6 +163,7 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { sourceFn: getSourceFn(p.Archive, p.MaxDocCount), maxDocCount: p.MaxDocCount, useReadWriteAliases: p.UseReadWriteAliases, + tracer: p.Tracer, } } @@ -238,8 +249,8 @@ func indexNames(prefix, index string) string { // GetTrace takes a traceID and returns a Trace associated with that traceID func (s *SpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "GetTrace") - defer span.Finish() + ctx, span := s.tracer.OTEL.Tracer("spanreader").Start(ctx, "GetTrace") + defer span.End() currentTime := time.Now() traces, err := s.multiRead(ctx, []model.TraceID{traceID}, currentTime.Add(-s.maxSpanAge), currentTime) if err != nil { @@ -283,8 +294,8 @@ func (s *SpanReader) unmarshalJSONSpan(esSpanRaw *elastic.SearchHit) (*dbmodel.S // GetServices returns all services traced by Jaeger, ordered by frequency func (s *SpanReader) GetServices(ctx context.Context) ([]string, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "GetServices") - defer span.Finish() + ctx, span := s.tracer.OTEL.Tracer("spanreader").Start(ctx, "GetService") + defer span.End() currentTime := time.Now() jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, s.serviceIndexDateLayout, currentTime.Add(-s.maxSpanAge), currentTime, s.serviceIndexRolloverFrequency) return s.serviceOperationStorage.getServices(ctx, jaegerIndices, s.maxDocCount) @@ -295,8 +306,8 @@ func (s *SpanReader) GetOperations( ctx context.Context, query spanstore.OperationQueryParameters, ) ([]spanstore.Operation, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "GetOperations") - defer span.Finish() + ctx, span := s.tracer.OTEL.Tracer("spanreader").Start(ctx, "GetOperations") + defer span.End() currentTime := time.Now() jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, s.serviceIndexDateLayout, currentTime.Add(-s.maxSpanAge), currentTime, s.serviceIndexRolloverFrequency) operations, err := s.serviceOperationStorage.getOperations(ctx, jaegerIndices, query.ServiceName, s.maxDocCount) @@ -329,8 +340,8 @@ func bucketToStringArray(buckets []*elastic.AggregationBucketKeyItem) ([]string, // FindTraces retrieves traces that match the traceQuery func (s *SpanReader) FindTraces(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]*model.Trace, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "FindTraces") - defer span.Finish() + ctx, span := s.tracer.OTEL.Tracer("spanreader").Start(ctx, "GetOperations") + defer span.End() uniqueTraceIDs, err := s.FindTraceIDs(ctx, traceQuery) if err != nil { @@ -341,8 +352,8 @@ func (s *SpanReader) FindTraces(ctx context.Context, traceQuery *spanstore.Trace // FindTraceIDs retrieves traces IDs that match the traceQuery func (s *SpanReader) FindTraceIDs(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]model.TraceID, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "FindTraceIDs") - defer span.Finish() + ctx, span := s.tracer.OTEL.Tracer("spanreader").Start(ctx, "FindTraceIDs") + defer span.End() if err := validateQuery(traceQuery); err != nil { return nil, err @@ -360,9 +371,9 @@ func (s *SpanReader) FindTraceIDs(ctx context.Context, traceQuery *spanstore.Tra } func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, startTime, endTime time.Time) ([]*model.Trace, error) { - childSpan, _ := opentracing.StartSpanFromContext(ctx, "multiRead") - childSpan.LogFields(otlog.Object("trace_ids", traceIDs)) - defer childSpan.Finish() + ctx, childSpan := s.tracer.OTEL.Tracer("spanreader").Start(ctx, "multiRead") + childSpan.AddEvent("trace_ids", trace.WithAttributes()) + defer childSpan.End() if len(traceIDs) == 0 { return []*model.Trace{}, nil @@ -503,8 +514,8 @@ func validateQuery(p *spanstore.TraceQueryParameters) error { } func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]string, error) { - childSpan, _ := opentracing.StartSpanFromContext(ctx, "findTraceIDs") - defer childSpan.Finish() + ctx, childSpan := s.tracer.OTEL.Tracer("spanreader").Start(ctx, "findTraceIDs") + defer childSpan.End() // Below is the JSON body to our HTTP GET request to ElasticSearch. This function creates this. // { // "size": 0, @@ -686,7 +697,6 @@ func (s *SpanReader) buildObjectQuery(field string, k string, v string) elastic. return elastic.NewBoolQuery().Must(keyQuery) } -func logErrorToSpan(span opentracing.Span, err error) { - ottag.Error.Set(span, true) - span.LogFields(otlog.Error(err)) +func logErrorToSpan(span trace.Span, err error) { + span.RecordError(err) } diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index ea7f0073f50..5f3a4805f57 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -34,6 +34,7 @@ import ( "github.com/jaegertracing/jaeger/internal/metricstest" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/es/mocks" + "github.com/jaegertracing/jaeger/pkg/jtracer" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" "github.com/jaegertracing/jaeger/storage/spanstore" @@ -170,6 +171,7 @@ func TestSpanReaderIndices(t *testing.T) { serviceDataLayout := "2006-01-02" spanDataLayoutFormat := date.UTC().Format(spanDataLayout) serviceDataLayoutFormat := date.UTC().Format(serviceDataLayout) + tracer := jtracer.NoOp() testCases := []struct { indices []string @@ -177,56 +179,56 @@ func TestSpanReaderIndices(t *testing.T) { }{ { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: client, Logger: logger, MetricsFactory: metricsFactory, Tracer: tracer, IndexPrefix: "", Archive: false, SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, }, indices: []string{spanIndex + spanDataLayoutFormat, serviceIndex + serviceDataLayoutFormat}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: client, Logger: logger, MetricsFactory: metricsFactory, Tracer: tracer, IndexPrefix: "", UseReadWriteAliases: true, }, indices: []string{spanIndex + "read", serviceIndex + "read"}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: client, Logger: logger, MetricsFactory: metricsFactory, Tracer: tracer, IndexPrefix: "foo:", Archive: false, SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, }, indices: []string{"foo:" + indexPrefixSeparator + spanIndex + spanDataLayoutFormat, "foo:" + indexPrefixSeparator + serviceIndex + serviceDataLayoutFormat}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: client, Logger: logger, MetricsFactory: metricsFactory, Tracer: tracer, IndexPrefix: "foo:", UseReadWriteAliases: true, }, indices: []string{"foo:-" + spanIndex + "read", "foo:-" + serviceIndex + "read"}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: client, Logger: logger, MetricsFactory: metricsFactory, Tracer: tracer, IndexPrefix: "", Archive: true, }, indices: []string{spanIndex + archiveIndexSuffix, serviceIndex + archiveIndexSuffix}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: client, Logger: logger, MetricsFactory: metricsFactory, Tracer: tracer, IndexPrefix: "foo:", Archive: true, }, indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveIndexSuffix, "foo:" + indexPrefixSeparator + serviceIndex + archiveIndexSuffix}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: client, Logger: logger, MetricsFactory: metricsFactory, Tracer: tracer, IndexPrefix: "foo:", Archive: true, UseReadWriteAliases: true, }, indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveReadIndexSuffix, "foo:" + indexPrefixSeparator + serviceIndex + archiveReadIndexSuffix}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: client, Logger: logger, MetricsFactory: metricsFactory, Tracer: tracer, IndexPrefix: "", Archive: false, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, }, indices: []string{ @@ -240,7 +242,7 @@ func TestSpanReaderIndices(t *testing.T) { }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: client, Logger: logger, MetricsFactory: metricsFactory, Tracer: tracer, IndexPrefix: "", Archive: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, }, indices: []string{ @@ -254,7 +256,7 @@ func TestSpanReaderIndices(t *testing.T) { }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: client, Logger: logger, MetricsFactory: metricsFactory, Tracer: tracer, IndexPrefix: "", Archive: false, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, }, indices: []string{ @@ -268,7 +270,7 @@ func TestSpanReaderIndices(t *testing.T) { }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: client, Logger: logger, MetricsFactory: metricsFactory, Tracer: tracer, IndexPrefix: "", Archive: true, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, }, indices: []string{ From a9b5adb25901d790a1ba22f46738229eb1bbfe52 Mon Sep 17 00:00:00 2001 From: Afzal Ansari Date: Sun, 23 Jul 2023 09:31:21 +0000 Subject: [PATCH 02/16] updates jtracer API call Signed-off-by: Afzal Ansari --- plugin/storage/es/spanstore/reader.go | 28 ++++++++++------------ plugin/storage/es/spanstore/reader_test.go | 2 +- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 859ced4e322..d9b0e43ad31 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -25,6 +25,7 @@ import ( "time" "github.com/olivere/elastic" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -109,7 +110,7 @@ type SpanReader struct { sourceFn sourceFn maxDocCount int useReadWriteAliases bool - tracer *jtracer.JTracer + tracer trace.Tracer } // SpanReaderParams holds constructor params for NewSpanReader @@ -128,7 +129,7 @@ type SpanReaderParams struct { Archive bool UseReadWriteAliases bool RemoteReadClusters []string - Tracer *jtracer.JTracer + Tracer trace.Tracer } // NewSpanReader returns a new SpanReader with a metrics. @@ -143,10 +144,7 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { if err != nil { log.Fatal("Failed to initialise tracer", zap.Error(err)) } - p.Tracer = jt - if err := p.Tracer.Close(context.Background()); err != nil { - log.Fatal("Error shutting down tracer provider", zap.Error(err)) - } + defer jt.Close(context.Background()) return &SpanReader{ client: p.Client, logger: p.Logger, @@ -163,7 +161,7 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { sourceFn: getSourceFn(p.Archive, p.MaxDocCount), maxDocCount: p.MaxDocCount, useReadWriteAliases: p.UseReadWriteAliases, - tracer: p.Tracer, + tracer: jt.OTEL.Tracer("spanreader"), } } @@ -249,7 +247,7 @@ func indexNames(prefix, index string) string { // GetTrace takes a traceID and returns a Trace associated with that traceID func (s *SpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { - ctx, span := s.tracer.OTEL.Tracer("spanreader").Start(ctx, "GetTrace") + ctx, span := s.tracer.Start(ctx, "GetTrace") defer span.End() currentTime := time.Now() traces, err := s.multiRead(ctx, []model.TraceID{traceID}, currentTime.Add(-s.maxSpanAge), currentTime) @@ -294,7 +292,7 @@ func (s *SpanReader) unmarshalJSONSpan(esSpanRaw *elastic.SearchHit) (*dbmodel.S // GetServices returns all services traced by Jaeger, ordered by frequency func (s *SpanReader) GetServices(ctx context.Context) ([]string, error) { - ctx, span := s.tracer.OTEL.Tracer("spanreader").Start(ctx, "GetService") + ctx, span := s.tracer.Start(ctx, "GetService") defer span.End() currentTime := time.Now() jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, s.serviceIndexDateLayout, currentTime.Add(-s.maxSpanAge), currentTime, s.serviceIndexRolloverFrequency) @@ -306,7 +304,7 @@ func (s *SpanReader) GetOperations( ctx context.Context, query spanstore.OperationQueryParameters, ) ([]spanstore.Operation, error) { - ctx, span := s.tracer.OTEL.Tracer("spanreader").Start(ctx, "GetOperations") + ctx, span := s.tracer.Start(ctx, "GetOperations") defer span.End() currentTime := time.Now() jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, s.serviceIndexDateLayout, currentTime.Add(-s.maxSpanAge), currentTime, s.serviceIndexRolloverFrequency) @@ -340,7 +338,7 @@ func bucketToStringArray(buckets []*elastic.AggregationBucketKeyItem) ([]string, // FindTraces retrieves traces that match the traceQuery func (s *SpanReader) FindTraces(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]*model.Trace, error) { - ctx, span := s.tracer.OTEL.Tracer("spanreader").Start(ctx, "GetOperations") + ctx, span := s.tracer.Start(ctx, "GetOperations") defer span.End() uniqueTraceIDs, err := s.FindTraceIDs(ctx, traceQuery) @@ -352,7 +350,7 @@ func (s *SpanReader) FindTraces(ctx context.Context, traceQuery *spanstore.Trace // FindTraceIDs retrieves traces IDs that match the traceQuery func (s *SpanReader) FindTraceIDs(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]model.TraceID, error) { - ctx, span := s.tracer.OTEL.Tracer("spanreader").Start(ctx, "FindTraceIDs") + ctx, span := s.tracer.Start(ctx, "FindTraceIDs") defer span.End() if err := validateQuery(traceQuery); err != nil { @@ -371,8 +369,8 @@ func (s *SpanReader) FindTraceIDs(ctx context.Context, traceQuery *spanstore.Tra } func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, startTime, endTime time.Time) ([]*model.Trace, error) { - ctx, childSpan := s.tracer.OTEL.Tracer("spanreader").Start(ctx, "multiRead") - childSpan.AddEvent("trace_ids", trace.WithAttributes()) + ctx, childSpan := s.tracer.Start(ctx, "multiRead") + childSpan.SetAttributes(attribute.Key("trace_ids").String("")) defer childSpan.End() if len(traceIDs) == 0 { @@ -514,7 +512,7 @@ func validateQuery(p *spanstore.TraceQueryParameters) error { } func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]string, error) { - ctx, childSpan := s.tracer.OTEL.Tracer("spanreader").Start(ctx, "findTraceIDs") + ctx, childSpan := s.tracer.Start(ctx, "findTraceIDs") defer childSpan.End() // Below is the JSON body to our HTTP GET request to ElasticSearch. This function creates this. // { diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 5f3a4805f57..cfebbbad406 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -171,7 +171,7 @@ func TestSpanReaderIndices(t *testing.T) { serviceDataLayout := "2006-01-02" spanDataLayoutFormat := date.UTC().Format(spanDataLayout) serviceDataLayoutFormat := date.UTC().Format(serviceDataLayout) - tracer := jtracer.NoOp() + tracer := jtracer.NoOp().OTEL.Tracer("") testCases := []struct { indices []string From e61c5c4888e972dbd4fb9485efcbebba6bfc4eec Mon Sep 17 00:00:00 2001 From: Afzal Ansari Date: Sun, 23 Jul 2023 12:02:41 +0000 Subject: [PATCH 03/16] rmvs defer for err Signed-off-by: Afzal Ansari --- plugin/storage/es/spanstore/reader.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index d9b0e43ad31..e2283a9d4f7 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -144,7 +144,9 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { if err != nil { log.Fatal("Failed to initialise tracer", zap.Error(err)) } - defer jt.Close(context.Background()) + if err := jt.Close(context.Background()); err != nil { + log.Fatal("Error shutting down tracer provider", zap.Error(err)) + } return &SpanReader{ client: p.Client, logger: p.Logger, From 8577650d06f355a3941be3a3fe761a574d0feb7d Mon Sep 17 00:00:00 2001 From: Afzal Ansari Date: Mon, 24 Jul 2023 19:45:48 +0000 Subject: [PATCH 04/16] adds otel tracer as a global Signed-off-by: Afzal Ansari --- plugin/storage/es/factory.go | 2 ++ plugin/storage/es/spanstore/reader.go | 20 +++++++++---------- plugin/storage/es/spanstore/reader_test.go | 18 +++++++---------- .../storage/integration/elasticsearch_test.go | 2 ++ 4 files changed, 20 insertions(+), 22 deletions(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index d0f78c294f3..9a22957b559 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -21,6 +21,7 @@ import ( "io" "github.com/spf13/viper" + "go.opentelemetry.io/otel" "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/es" @@ -162,6 +163,7 @@ func createSpanReader( UseReadWriteAliases: cfg.UseReadWriteAliases, Archive: archive, RemoteReadClusters: cfg.RemoteReadClusters, + Tracer: otel.GetTracerProvider().Tracer("span-reader"), }), nil } diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index e2283a9d4f7..00a5a928070 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -21,7 +21,6 @@ import ( "encoding/json" "errors" "fmt" - "log" "time" "github.com/olivere/elastic" @@ -31,7 +30,6 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/es" - "github.com/jaegertracing/jaeger/pkg/jtracer" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" "github.com/jaegertracing/jaeger/storage/spanstore" @@ -140,13 +138,6 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { if p.UseReadWriteAliases { maxSpanAge = rolloverMaxSpanAge } - jt, err := jtracer.New("spanstore") - if err != nil { - log.Fatal("Failed to initialise tracer", zap.Error(err)) - } - if err := jt.Close(context.Background()); err != nil { - log.Fatal("Error shutting down tracer provider", zap.Error(err)) - } return &SpanReader{ client: p.Client, logger: p.Logger, @@ -163,7 +154,7 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { sourceFn: getSourceFn(p.Archive, p.MaxDocCount), maxDocCount: p.MaxDocCount, useReadWriteAliases: p.UseReadWriteAliases, - tracer: jt.OTEL.Tracer("spanreader"), + tracer: p.Tracer, } } @@ -372,9 +363,16 @@ func (s *SpanReader) FindTraceIDs(ctx context.Context, traceQuery *spanstore.Tra func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, startTime, endTime time.Time) ([]*model.Trace, error) { ctx, childSpan := s.tracer.Start(ctx, "multiRead") - childSpan.SetAttributes(attribute.Key("trace_ids").String("")) defer childSpan.End() + tracesIDs := make([]string, len(traceIDs)) + if childSpan.IsRecording() { + for i, traceID := range traceIDs { + tracesIDs[i] = traceID.String() + } + } + childSpan.SetAttributes(attribute.Key("trace_ids").StringSlice(tracesIDs)) + if len(traceIDs) == 0 { return []*model.Trace{}, nil } diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index cfebbbad406..70f877e2919 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -179,56 +179,48 @@ func TestSpanReaderIndices(t *testing.T) { }{ { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, Tracer: tracer, IndexPrefix: "", Archive: false, SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, }, indices: []string{spanIndex + spanDataLayoutFormat, serviceIndex + serviceDataLayoutFormat}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, Tracer: tracer, IndexPrefix: "", UseReadWriteAliases: true, }, indices: []string{spanIndex + "read", serviceIndex + "read"}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, Tracer: tracer, IndexPrefix: "foo:", Archive: false, SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, }, indices: []string{"foo:" + indexPrefixSeparator + spanIndex + spanDataLayoutFormat, "foo:" + indexPrefixSeparator + serviceIndex + serviceDataLayoutFormat}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, Tracer: tracer, IndexPrefix: "foo:", UseReadWriteAliases: true, }, indices: []string{"foo:-" + spanIndex + "read", "foo:-" + serviceIndex + "read"}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, Tracer: tracer, IndexPrefix: "", Archive: true, }, indices: []string{spanIndex + archiveIndexSuffix, serviceIndex + archiveIndexSuffix}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, Tracer: tracer, IndexPrefix: "foo:", Archive: true, }, indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveIndexSuffix, "foo:" + indexPrefixSeparator + serviceIndex + archiveIndexSuffix}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, Tracer: tracer, IndexPrefix: "foo:", Archive: true, UseReadWriteAliases: true, }, indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveReadIndexSuffix, "foo:" + indexPrefixSeparator + serviceIndex + archiveReadIndexSuffix}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, Tracer: tracer, IndexPrefix: "", Archive: false, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, }, indices: []string{ @@ -242,7 +234,6 @@ func TestSpanReaderIndices(t *testing.T) { }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, Tracer: tracer, IndexPrefix: "", Archive: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, }, indices: []string{ @@ -256,7 +247,6 @@ func TestSpanReaderIndices(t *testing.T) { }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, Tracer: tracer, IndexPrefix: "", Archive: false, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, }, indices: []string{ @@ -270,7 +260,6 @@ func TestSpanReaderIndices(t *testing.T) { }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, Tracer: tracer, IndexPrefix: "", Archive: true, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, }, indices: []string{ @@ -284,6 +273,13 @@ func TestSpanReaderIndices(t *testing.T) { }, } for _, testCase := range testCases { + testCase.params = SpanReaderParams{ + Client: client, + Logger: logger, + MetricsFactory: metricsFactory, + Tracer: tracer, + + } r := NewSpanReader(testCase.params) actualSpan := r.timeRangeIndices(r.spanIndexPrefix, r.spanIndexDateLayout, date, date, -1*time.Hour) diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 9d2ec031b1f..80784e8cb4f 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -33,6 +33,7 @@ import ( "github.com/jaegertracing/jaeger/model" estemplate "github.com/jaegertracing/jaeger/pkg/es" eswrapper "github.com/jaegertracing/jaeger/pkg/es/wrapper" + "github.com/jaegertracing/jaeger/pkg/jtracer" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/es/dependencystore" @@ -151,6 +152,7 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro TagDotReplacement: tagKeyDeDotChar, Archive: archive, MaxDocCount: defaultMaxDocCount, + Tracer: jtracer.NoOp().OTEL.Tracer(""), }) dependencyStore := dependencystore.NewDependencyStore(dependencystore.DependencyStoreParams{ Client: client, From f0fbead7b4368407a097b0f709d4bb79cc58fa8e Mon Sep 17 00:00:00 2001 From: Afzal Ansari Date: Tue, 25 Jul 2023 18:58:14 +0000 Subject: [PATCH 05/16] adds tracer to factory to init Signed-off-by: Afzal Ansari --- plugin/storage/es/factory.go | 34 +++++++++++-------- plugin/storage/es/spanstore/reader.go | 12 +++---- plugin/storage/es/spanstore/reader_test.go | 17 ++++------ .../storage/integration/elasticsearch_test.go | 2 +- 4 files changed, 33 insertions(+), 32 deletions(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 9a22957b559..bad23bed879 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -22,6 +22,7 @@ import ( "github.com/spf13/viper" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/es" @@ -51,6 +52,7 @@ type Factory struct { metricsFactory metrics.Factory logger *zap.Logger + tracer trace.TracerProvider newClientFn func(c *config.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) @@ -65,6 +67,7 @@ func NewFactory() *Factory { return &Factory{ Options: NewOptions(primaryNamespace, archiveNamespace), newClientFn: config.NewClient, + tracer: otel.GetTracerProvider(), } } @@ -109,17 +112,17 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) // CreateSpanReader implements storage.Factory func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { - return createSpanReader(f.metricsFactory, f.logger, f.primaryClient, f.primaryConfig, false) + return createSpanReader(f.primaryClient, f.primaryConfig, false, f.metricsFactory, f.logger, f.tracer) } // CreateSpanWriter implements storage.Factory func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { - return createSpanWriter(f.metricsFactory, f.logger, f.primaryClient, f.primaryConfig, false) + return createSpanWriter(f.primaryClient, f.primaryConfig, false, f.metricsFactory, f.logger) } // CreateDependencyReader implements storage.Factory func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { - return createDependencyReader(f.logger, f.primaryClient, f.primaryConfig) + return createDependencyReader(f.primaryClient, f.primaryConfig, f.logger) } // CreateArchiveSpanReader implements storage.ArchiveFactory @@ -127,7 +130,7 @@ func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) { if !f.archiveConfig.Enabled { return nil, nil } - return createSpanReader(f.metricsFactory, f.logger, f.archiveClient, f.archiveConfig, true) + return createSpanReader(f.archiveClient, f.archiveConfig, true, f.metricsFactory, f.logger, f.tracer) } // CreateArchiveSpanWriter implements storage.ArchiveFactory @@ -135,23 +138,22 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { if !f.archiveConfig.Enabled { return nil, nil } - return createSpanWriter(f.metricsFactory, f.logger, f.archiveClient, f.archiveConfig, true) + return createSpanWriter(f.archiveClient, f.archiveConfig, true, f.metricsFactory, f.logger) } func createSpanReader( - mFactory metrics.Factory, - logger *zap.Logger, client es.Client, cfg *config.Configuration, archive bool, + mFactory metrics.Factory, + logger *zap.Logger, + tp trace.TracerProvider, ) (spanstore.Reader, error) { if cfg.UseILM && !cfg.UseReadWriteAliases { return nil, fmt.Errorf("--es.use-ilm must always be used in conjunction with --es.use-aliases to ensure ES writers and readers refer to the single index mapping") } return esSpanStore.NewSpanReader(esSpanStore.SpanReaderParams{ Client: client, - Logger: logger, - MetricsFactory: mFactory, MaxDocCount: cfg.MaxDocCount, MaxSpanAge: cfg.MaxSpanAge, IndexPrefix: cfg.IndexPrefix, @@ -163,16 +165,18 @@ func createSpanReader( UseReadWriteAliases: cfg.UseReadWriteAliases, Archive: archive, RemoteReadClusters: cfg.RemoteReadClusters, - Tracer: otel.GetTracerProvider().Tracer("span-reader"), + Logger: logger, + MetricsFactory: mFactory, + Tracer: tp.Tracer("esSpanStore.SpanReader"), }), nil } func createSpanWriter( - mFactory metrics.Factory, - logger *zap.Logger, client es.Client, cfg *config.Configuration, archive bool, + mFactory metrics.Factory, + logger *zap.Logger, ) (spanstore.Writer, error) { var tags []string var err error @@ -199,8 +203,6 @@ func createSpanWriter( } writer := esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{ Client: client, - Logger: logger, - MetricsFactory: mFactory, IndexPrefix: cfg.IndexPrefix, SpanIndexDateLayout: cfg.IndexDateLayoutSpans, ServiceIndexDateLayout: cfg.IndexDateLayoutServices, @@ -209,6 +211,8 @@ func createSpanWriter( TagDotReplacement: cfg.Tags.DotReplacement, Archive: archive, UseReadWriteAliases: cfg.UseReadWriteAliases, + Logger: logger, + MetricsFactory: mFactory, }) // Creating a template here would conflict with the one created for ILM resulting to no index rollover @@ -222,9 +226,9 @@ func createSpanWriter( } func createDependencyReader( - logger *zap.Logger, client es.Client, cfg *config.Configuration, + logger *zap.Logger, ) (dependencystore.Reader, error) { reader := esDepStore.NewDependencyStore(esDepStore.DependencyStoreParams{ Client: client, diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 00a5a928070..d766b087670 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -92,7 +92,6 @@ var ( // SpanReader can query for and load traces from ElasticSearch type SpanReader struct { client es.Client - logger *zap.Logger // The age of the oldest service/operation we will look for. Because indices in ElasticSearch are by day, // this will be rounded down to UTC 00:00 of that day. maxSpanAge time.Duration @@ -108,16 +107,15 @@ type SpanReader struct { sourceFn sourceFn maxDocCount int useReadWriteAliases bool + logger *zap.Logger tracer trace.Tracer } // SpanReaderParams holds constructor params for NewSpanReader type SpanReaderParams struct { Client es.Client - Logger *zap.Logger MaxSpanAge time.Duration MaxDocCount int - MetricsFactory metrics.Factory IndexPrefix string SpanIndexDateLayout string ServiceIndexDateLayout string @@ -127,6 +125,8 @@ type SpanReaderParams struct { Archive bool UseReadWriteAliases bool RemoteReadClusters []string + MetricsFactory metrics.Factory + Logger *zap.Logger Tracer trace.Tracer } @@ -140,7 +140,6 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { } return &SpanReader{ client: p.Client, - logger: p.Logger, maxSpanAge: maxSpanAge, serviceOperationStorage: NewServiceOperationStorage(p.Client, p.Logger, 0), // the decorator takes care of metrics spanIndexPrefix: indexNames(p.IndexPrefix, spanIndex), @@ -154,6 +153,7 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { sourceFn: getSourceFn(p.Archive, p.MaxDocCount), maxDocCount: p.MaxDocCount, useReadWriteAliases: p.UseReadWriteAliases, + logger: p.Logger, tracer: p.Tracer, } } @@ -365,13 +365,13 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st ctx, childSpan := s.tracer.Start(ctx, "multiRead") defer childSpan.End() - tracesIDs := make([]string, len(traceIDs)) if childSpan.IsRecording() { + tracesIDs := make([]string, len(traceIDs)) for i, traceID := range traceIDs { tracesIDs[i] = traceID.String() } + childSpan.SetAttributes(attribute.Key("trace_ids").StringSlice(tracesIDs)) } - childSpan.SetAttributes(attribute.Key("trace_ids").StringSlice(tracesIDs)) if len(traceIDs) == 0 { return []*model.Trace{}, nil diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 70f877e2919..0838a7dc039 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -164,14 +164,14 @@ func TestNewSpanReader(t *testing.T) { func TestSpanReaderIndices(t *testing.T) { client := &mocks.Client{} - logger, _ := testutils.NewLogger() - metricsFactory := metricstest.NewFactory(0) date := time.Date(2019, 10, 10, 5, 0, 0, 0, time.UTC) spanDataLayout := "2006-01-02-15" serviceDataLayout := "2006-01-02" spanDataLayoutFormat := date.UTC().Format(spanDataLayout) serviceDataLayoutFormat := date.UTC().Format(serviceDataLayout) - tracer := jtracer.NoOp().OTEL.Tracer("") + metricsFactory := metricstest.NewFactory(0) + logger, _ := testutils.NewLogger() + tracer := jtracer.NoOp().OTEL testCases := []struct { indices []string @@ -273,13 +273,10 @@ func TestSpanReaderIndices(t *testing.T) { }, } for _, testCase := range testCases { - testCase.params = SpanReaderParams{ - Client: client, - Logger: logger, - MetricsFactory: metricsFactory, - Tracer: tracer, - - } + testCase.params.Client = client + testCase.params.Logger = logger + testCase.params.MetricsFactory = metricsFactory + testCase.params.Tracer = tracer.Tracer("test") r := NewSpanReader(testCase.params) actualSpan := r.timeRangeIndices(r.spanIndexPrefix, r.spanIndexDateLayout, date, date, -1*time.Hour) diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 80784e8cb4f..1289006438d 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -152,7 +152,7 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro TagDotReplacement: tagKeyDeDotChar, Archive: archive, MaxDocCount: defaultMaxDocCount, - Tracer: jtracer.NoOp().OTEL.Tracer(""), + Tracer: jtracer.NoOp().OTEL.Tracer("test"), }) dependencyStore := dependencystore.NewDependencyStore(dependencystore.DependencyStoreParams{ Client: client, From bdc587165037011716ffd7a8094da8d455454cb0 Mon Sep 17 00:00:00 2001 From: Afzal Ansari Date: Wed, 26 Jul 2023 17:57:07 +0000 Subject: [PATCH 06/16] sets otel to main Signed-off-by: Afzal Ansari --- cmd/all-in-one/main.go | 2 ++ cmd/query/main.go | 2 ++ plugin/storage/es/spanstore/reader.go | 2 +- 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index 34d86f534ce..89297fdde2e 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -24,6 +24,7 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" + "go.opentelemetry.io/otel" _ "go.uber.org/automaxprocs" "go.uber.org/zap" @@ -104,6 +105,7 @@ by default uses only in-memory database.`, if err != nil { logger.Fatal("Failed to initialize tracer", zap.Error(err)) } + otel.SetTracerProvider(tracer.OTEL) storageFactory.InitFromViper(v, logger) if err := storageFactory.Initialize(metricsFactory, logger); err != nil { diff --git a/cmd/query/main.go b/cmd/query/main.go index aadb7915787..0442dcd06a3 100644 --- a/cmd/query/main.go +++ b/cmd/query/main.go @@ -23,6 +23,7 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" + "go.opentelemetry.io/otel" _ "go.uber.org/automaxprocs" "go.uber.org/zap" @@ -76,6 +77,7 @@ func main() { if err != nil { logger.Fatal("Failed to create tracer:", zap.Error(err)) } + otel.SetTracerProvider(jtracer.OTEL) queryOpts, err := new(app.QueryOptions).InitFromViper(v, logger) if err != nil { logger.Fatal("Failed to configure query service", zap.Error(err)) diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index d766b087670..c124d1391cf 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -331,7 +331,7 @@ func bucketToStringArray(buckets []*elastic.AggregationBucketKeyItem) ([]string, // FindTraces retrieves traces that match the traceQuery func (s *SpanReader) FindTraces(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]*model.Trace, error) { - ctx, span := s.tracer.Start(ctx, "GetOperations") + ctx, span := s.tracer.Start(ctx, "FindTraces") defer span.End() uniqueTraceIDs, err := s.FindTraceIDs(ctx, traceQuery) From 33ea64e5836f497c4cb61cd8e4fa53abec4f93e6 Mon Sep 17 00:00:00 2001 From: Afzal Ansari Date: Thu, 27 Jul 2023 06:06:17 +0000 Subject: [PATCH 07/16] adds test tracer to spanreader_test Signed-off-by: Afzal Ansari --- plugin/storage/es/spanstore/reader_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 0838a7dc039..f96f9d0f47b 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -100,6 +100,7 @@ func withSpanReader(fn func(r *spanReaderTest)) { reader: NewSpanReader(SpanReaderParams{ Client: client, Logger: zap.NewNop(), + Tracer: jtracer.NoOp().OTEL.Tracer("test"), MaxSpanAge: 0, IndexPrefix: "", TagDotReplacement: "@", @@ -119,6 +120,7 @@ func withArchiveSpanReader(readAlias bool, fn func(r *spanReaderTest)) { reader: NewSpanReader(SpanReaderParams{ Client: client, Logger: zap.NewNop(), + Tracer: jtracer.NoOp().OTEL.Tracer("test"), MaxSpanAge: 0, IndexPrefix: "", TagDotReplacement: "@", From 11840a0dee81424d9db9f7fac9906c9fbc474f5f Mon Sep 17 00:00:00 2001 From: Afzal Ansari Date: Thu, 27 Jul 2023 20:00:52 +0000 Subject: [PATCH 08/16] adds tp to readertest Signed-off-by: Afzal Ansari --- plugin/storage/es/spanstore/reader_test.go | 77 ++++++++++++++++++- .../storage/integration/elasticsearch_test.go | 23 +++++- 2 files changed, 94 insertions(+), 6 deletions(-) diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index f96f9d0f47b..2c02212f01d 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -29,12 +29,14 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "github.com/jaegertracing/jaeger/internal/metricstest" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/es/mocks" - "github.com/jaegertracing/jaeger/pkg/jtracer" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" "github.com/jaegertracing/jaeger/storage/spanstore" @@ -87,20 +89,37 @@ type spanReaderTest struct { client *mocks.Client logger *zap.Logger logBuffer *testutils.Buffer + exporter *tracetest.InMemoryExporter + closer error reader *SpanReader } +func tracerProvider() (trace.TracerProvider, *tracetest.InMemoryExporter, func() error) { + exporter := tracetest.NewInMemoryExporter() + tp := sdktrace.NewTracerProvider( + sdktrace.WithSampler(sdktrace.AlwaysSample()), + sdktrace.WithSyncer(exporter), + ) + closer := func() error { + return tp.Shutdown(context.Background()) + } + return tp, exporter, closer +} + func withSpanReader(fn func(r *spanReaderTest)) { client := &mocks.Client{} + tracer, exp, closer := tracerProvider() logger, logBuffer := testutils.NewLogger() r := &spanReaderTest{ client: client, logger: logger, logBuffer: logBuffer, + exporter: exp, + closer: closer(), reader: NewSpanReader(SpanReaderParams{ Client: client, Logger: zap.NewNop(), - Tracer: jtracer.NoOp().OTEL.Tracer("test"), + Tracer: tracer.Tracer("test"), MaxSpanAge: 0, IndexPrefix: "", TagDotReplacement: "@", @@ -112,15 +131,18 @@ func withSpanReader(fn func(r *spanReaderTest)) { func withArchiveSpanReader(readAlias bool, fn func(r *spanReaderTest)) { client := &mocks.Client{} + tracer, exp, closer := tracerProvider() logger, logBuffer := testutils.NewLogger() r := &spanReaderTest{ client: client, logger: logger, logBuffer: logBuffer, + exporter: exp, + closer: closer(), reader: NewSpanReader(SpanReaderParams{ Client: client, Logger: zap.NewNop(), - Tracer: jtracer.NoOp().OTEL.Tracer("test"), + Tracer: tracer.Tracer("test"), MaxSpanAge: 0, IndexPrefix: "", TagDotReplacement: "@", @@ -173,7 +195,7 @@ func TestSpanReaderIndices(t *testing.T) { serviceDataLayoutFormat := date.UTC().Format(serviceDataLayout) metricsFactory := metricstest.NewFactory(0) logger, _ := testutils.NewLogger() - tracer := jtracer.NoOp().OTEL + tracer, exp, closer := tracerProvider() testCases := []struct { indices []string @@ -280,6 +302,8 @@ func TestSpanReaderIndices(t *testing.T) { testCase.params.MetricsFactory = metricsFactory testCase.params.Tracer = tracer.Tracer("test") r := NewSpanReader(testCase.params) + assert.NotEmpty(t, exp.GetSpans(), "Spans recorded") + assert.NoError(t, closer()) actualSpan := r.timeRangeIndices(r.spanIndexPrefix, r.spanIndexDateLayout, date, date, -1*time.Hour) actualService := r.timeRangeIndices(r.serviceIndexPrefix, r.serviceIndexDateLayout, date, date, -24*time.Hour) @@ -303,6 +327,9 @@ func TestSpanReader_GetTrace(t *testing.T) { }, }, nil) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + assert.NoError(t, r.closer) + trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) require.NoError(t, err) require.NotNil(t, trace) @@ -379,6 +406,9 @@ func TestSpanReader_multiRead_followUp_query(t *testing.T) { }, }, nil) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + assert.NoError(t, r.closer) + traces, err := r.reader.multiRead(context.Background(), []model.TraceID{{High: 0, Low: 1}, {High: 0, Low: 2}}, date, date) require.NoError(t, err) require.NotNil(t, traces) @@ -416,6 +446,9 @@ func TestSpanReader_SearchAfter(t *testing.T) { }, }, nil).Times(2) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + assert.NoError(t, r.closer) + trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) require.NoError(t, err) require.NotNil(t, trace) @@ -435,6 +468,8 @@ func TestSpanReader_GetTraceQueryError(t *testing.T) { Return(&elastic.MultiSearchResult{ Responses: []*elastic.SearchResult{}, }, nil) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + assert.NoError(t, r.closer) trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) require.EqualError(t, err, "trace not found") require.Nil(t, trace) @@ -454,6 +489,9 @@ func TestSpanReader_GetTraceNilHits(t *testing.T) { }, }, nil) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + assert.NoError(t, r.closer) + trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) require.EqualError(t, err, "trace not found") require.Nil(t, trace) @@ -476,6 +514,8 @@ func TestSpanReader_GetTraceInvalidSpanError(t *testing.T) { {Hits: searchHits}, }, }, nil) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + assert.NoError(t, r.closer) trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) require.Error(t, err, "invalid span") @@ -500,6 +540,8 @@ func TestSpanReader_GetTraceSpanConversionError(t *testing.T) { {Hits: searchHits}, }, }, nil) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + assert.NoError(t, r.closer) trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) require.Error(t, err, "span conversion error, because lacks elements") @@ -720,6 +762,9 @@ func TestSpanReader_FindTraces(t *testing.T) { }, }, nil) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + assert.NoError(t, r.closer) + traceQuery := &spanstore.TraceQueryParameters{ ServiceName: serviceName, Tags: map[string]string{ @@ -765,6 +810,9 @@ func TestSpanReader_FindTracesInvalidQuery(t *testing.T) { }, }, nil) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + assert.NoError(t, r.closer) + traceQuery := &spanstore.TraceQueryParameters{ ServiceName: "", Tags: map[string]string{ @@ -797,6 +845,9 @@ func TestSpanReader_FindTracesAggregationFailure(t *testing.T) { Responses: []*elastic.SearchResult{}, }, nil) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + assert.NoError(t, r.closer) + traceQuery := &spanstore.TraceQueryParameters{ ServiceName: serviceName, Tags: map[string]string{ @@ -831,6 +882,9 @@ func TestSpanReader_FindTracesNoTraceIDs(t *testing.T) { Responses: []*elastic.SearchResult{}, }, nil) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + assert.NoError(t, r.closer) + traceQuery := &spanstore.TraceQueryParameters{ ServiceName: serviceName, Tags: map[string]string{ @@ -864,6 +918,9 @@ func TestSpanReader_FindTracesReadTraceFailure(t *testing.T) { mockMultiSearchService(r). Return(nil, errors.New("read error")) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + assert.NoError(t, r.closer) + traceQuery := &spanstore.TraceQueryParameters{ ServiceName: serviceName, Tags: map[string]string{ @@ -902,6 +959,9 @@ func TestSpanReader_FindTracesSpanCollectionFailure(t *testing.T) { }, }, nil) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + assert.NoError(t, r.closer) + traceQuery := &spanstore.TraceQueryParameters{ ServiceName: serviceName, Tags: map[string]string{ @@ -1206,6 +1266,9 @@ func TestSpanReader_GetEmptyIndex(t *testing.T) { Responses: []*elastic.SearchResult{}, }, nil) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + assert.NoError(t, r.closer) + traceQuery := &spanstore.TraceQueryParameters{ ServiceName: serviceName, Tags: map[string]string{ @@ -1231,6 +1294,9 @@ func TestSpanReader_ArchiveTraces(t *testing.T) { Responses: []*elastic.SearchResult{}, }, nil) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + assert.NoError(t, r.closer) + trace, err := r.reader.GetTrace(context.Background(), model.TraceID{}) require.Nil(t, trace) assert.EqualError(t, err, "trace not found") @@ -1246,6 +1312,9 @@ func TestSpanReader_ArchiveTraces_ReadAlias(t *testing.T) { Responses: []*elastic.SearchResult{}, }, nil) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + assert.NoError(t, r.closer) + trace, err := r.reader.GetTrace(context.Background(), model.TraceID{}) require.Nil(t, trace) assert.EqualError(t, err, "trace not found") diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 1289006438d..b6cc77f93f6 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -28,12 +28,14 @@ import ( "github.com/olivere/elastic" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "github.com/jaegertracing/jaeger/model" estemplate "github.com/jaegertracing/jaeger/pkg/es" eswrapper "github.com/jaegertracing/jaeger/pkg/es/wrapper" - "github.com/jaegertracing/jaeger/pkg/jtracer" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/es/dependencystore" @@ -61,6 +63,18 @@ type ESStorageIntegration struct { logger *zap.Logger } +func tracerProvider() (trace.TracerProvider, *tracetest.InMemoryExporter, func() error) { + exporter := tracetest.NewInMemoryExporter() + tp := sdktrace.NewTracerProvider( + sdktrace.WithSampler(sdktrace.AlwaysSample()), + sdktrace.WithSyncer(exporter), + ) + closer := func() error { + return tp.Shutdown(context.Background()) + } + return tp, exporter, closer +} + func (s *ESStorageIntegration) getVersion() (uint, error) { pingResult, _, err := s.client.Ping(queryURL).Do(context.Background()) if err != nil { @@ -142,6 +156,7 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro if err != nil { return err } + tracer, _, closer := tracerProvider() s.SpanWriter = w s.SpanReader = spanstore.NewSpanReader(spanstore.SpanReaderParams{ Client: client, @@ -152,7 +167,7 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro TagDotReplacement: tagKeyDeDotChar, Archive: archive, MaxDocCount: defaultMaxDocCount, - Tracer: jtracer.NoOp().OTEL.Tracer("test"), + Tracer: tracer.Tracer("test"), }) dependencyStore := dependencystore.NewDependencyStore(dependencystore.DependencyStoreParams{ Client: client, @@ -162,6 +177,10 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro MaxDocCount: defaultMaxDocCount, }) + if closer != nil { + return err + } + depMapping, err := mappingBuilder.GetDependenciesMappings() if err != nil { return err From e1d551506f041e6f8524b45ce961a58991df9c2a Mon Sep 17 00:00:00 2001 From: Afzal Ansari Date: Fri, 28 Jul 2023 00:43:45 +0000 Subject: [PATCH 09/16] makes closer as err Signed-off-by: Afzal Ansari --- plugin/storage/es/spanstore/reader_test.go | 82 +++++++++------------- 1 file changed, 35 insertions(+), 47 deletions(-) diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 2c02212f01d..1de9370e471 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -90,7 +90,7 @@ type spanReaderTest struct { logger *zap.Logger logBuffer *testutils.Buffer exporter *tracetest.InMemoryExporter - closer error + err error reader *SpanReader } @@ -115,7 +115,7 @@ func withSpanReader(fn func(r *spanReaderTest)) { logger: logger, logBuffer: logBuffer, exporter: exp, - closer: closer(), + err: closer(), reader: NewSpanReader(SpanReaderParams{ Client: client, Logger: zap.NewNop(), @@ -138,7 +138,7 @@ func withArchiveSpanReader(readAlias bool, fn func(r *spanReaderTest)) { logger: logger, logBuffer: logBuffer, exporter: exp, - closer: closer(), + err: closer(), reader: NewSpanReader(SpanReaderParams{ Client: client, Logger: zap.NewNop(), @@ -303,12 +303,12 @@ func TestSpanReaderIndices(t *testing.T) { testCase.params.Tracer = tracer.Tracer("test") r := NewSpanReader(testCase.params) assert.NotEmpty(t, exp.GetSpans(), "Spans recorded") - assert.NoError(t, closer()) actualSpan := r.timeRangeIndices(r.spanIndexPrefix, r.spanIndexDateLayout, date, date, -1*time.Hour) actualService := r.timeRangeIndices(r.serviceIndexPrefix, r.serviceIndexDateLayout, date, date, -24*time.Hour) assert.Equal(t, testCase.indices, append(actualSpan, actualService...)) } + require.NoError(t, closer()) } func TestSpanReader_GetTrace(t *testing.T) { @@ -327,10 +327,9 @@ func TestSpanReader_GetTrace(t *testing.T) { }, }, nil) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - assert.NoError(t, r.closer) - trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NoError(t, r.err) require.NoError(t, err) require.NotNil(t, trace) @@ -406,10 +405,9 @@ func TestSpanReader_multiRead_followUp_query(t *testing.T) { }, }, nil) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - assert.NoError(t, r.closer) - traces, err := r.reader.multiRead(context.Background(), []model.TraceID{{High: 0, Low: 1}, {High: 0, Low: 2}}, date, date) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NoError(t, r.err) require.NoError(t, err) require.NotNil(t, traces) require.Len(t, traces, 2) @@ -446,10 +444,9 @@ func TestSpanReader_SearchAfter(t *testing.T) { }, }, nil).Times(2) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - assert.NoError(t, r.closer) - trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NoError(t, r.err) require.NoError(t, err) require.NotNil(t, trace) @@ -468,9 +465,9 @@ func TestSpanReader_GetTraceQueryError(t *testing.T) { Return(&elastic.MultiSearchResult{ Responses: []*elastic.SearchResult{}, }, nil) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - assert.NoError(t, r.closer) trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NoError(t, r.err) require.EqualError(t, err, "trace not found") require.Nil(t, trace) }) @@ -489,10 +486,9 @@ func TestSpanReader_GetTraceNilHits(t *testing.T) { }, }, nil) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - assert.NoError(t, r.closer) - trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NoError(t, r.err) require.EqualError(t, err, "trace not found") require.Nil(t, trace) }) @@ -514,10 +510,10 @@ func TestSpanReader_GetTraceInvalidSpanError(t *testing.T) { {Hits: searchHits}, }, }, nil) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - assert.NoError(t, r.closer) trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NoError(t, r.err) require.Error(t, err, "invalid span") require.Nil(t, trace) }) @@ -540,10 +536,10 @@ func TestSpanReader_GetTraceSpanConversionError(t *testing.T) { {Hits: searchHits}, }, }, nil) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - assert.NoError(t, r.closer) trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NoError(t, r.err) require.Error(t, err, "span conversion error, because lacks elements") require.Nil(t, trace) }) @@ -762,9 +758,6 @@ func TestSpanReader_FindTraces(t *testing.T) { }, }, nil) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - assert.NoError(t, r.closer) - traceQuery := &spanstore.TraceQueryParameters{ ServiceName: serviceName, Tags: map[string]string{ @@ -776,6 +769,8 @@ func TestSpanReader_FindTraces(t *testing.T) { } traces, err := r.reader.FindTraces(context.Background(), traceQuery) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NoError(t, r.err) require.NoError(t, err) assert.Len(t, traces, 1) @@ -810,9 +805,6 @@ func TestSpanReader_FindTracesInvalidQuery(t *testing.T) { }, }, nil) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - assert.NoError(t, r.closer) - traceQuery := &spanstore.TraceQueryParameters{ ServiceName: "", Tags: map[string]string{ @@ -823,6 +815,8 @@ func TestSpanReader_FindTracesInvalidQuery(t *testing.T) { } traces, err := r.reader.FindTraces(context.Background(), traceQuery) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NoError(t, r.err) require.Error(t, err) assert.Nil(t, traces) }) @@ -845,9 +839,6 @@ func TestSpanReader_FindTracesAggregationFailure(t *testing.T) { Responses: []*elastic.SearchResult{}, }, nil) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - assert.NoError(t, r.closer) - traceQuery := &spanstore.TraceQueryParameters{ ServiceName: serviceName, Tags: map[string]string{ @@ -858,6 +849,8 @@ func TestSpanReader_FindTracesAggregationFailure(t *testing.T) { } traces, err := r.reader.FindTraces(context.Background(), traceQuery) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NoError(t, r.err) require.Error(t, err) assert.Nil(t, traces) }) @@ -882,9 +875,6 @@ func TestSpanReader_FindTracesNoTraceIDs(t *testing.T) { Responses: []*elastic.SearchResult{}, }, nil) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - assert.NoError(t, r.closer) - traceQuery := &spanstore.TraceQueryParameters{ ServiceName: serviceName, Tags: map[string]string{ @@ -895,6 +885,8 @@ func TestSpanReader_FindTracesNoTraceIDs(t *testing.T) { } traces, err := r.reader.FindTraces(context.Background(), traceQuery) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NoError(t, r.err) require.NoError(t, err) assert.Len(t, traces, 0) }) @@ -918,9 +910,6 @@ func TestSpanReader_FindTracesReadTraceFailure(t *testing.T) { mockMultiSearchService(r). Return(nil, errors.New("read error")) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - assert.NoError(t, r.closer) - traceQuery := &spanstore.TraceQueryParameters{ ServiceName: serviceName, Tags: map[string]string{ @@ -931,6 +920,8 @@ func TestSpanReader_FindTracesReadTraceFailure(t *testing.T) { } traces, err := r.reader.FindTraces(context.Background(), traceQuery) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NoError(t, r.err) require.EqualError(t, err, "read error") assert.Len(t, traces, 0) }) @@ -959,9 +950,6 @@ func TestSpanReader_FindTracesSpanCollectionFailure(t *testing.T) { }, }, nil) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - assert.NoError(t, r.closer) - traceQuery := &spanstore.TraceQueryParameters{ ServiceName: serviceName, Tags: map[string]string{ @@ -972,6 +960,8 @@ func TestSpanReader_FindTracesSpanCollectionFailure(t *testing.T) { } traces, err := r.reader.FindTraces(context.Background(), traceQuery) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NoError(t, r.err) require.Error(t, err) assert.Len(t, traces, 0) }) @@ -1266,9 +1256,6 @@ func TestSpanReader_GetEmptyIndex(t *testing.T) { Responses: []*elastic.SearchResult{}, }, nil) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - assert.NoError(t, r.closer) - traceQuery := &spanstore.TraceQueryParameters{ ServiceName: serviceName, Tags: map[string]string{ @@ -1280,6 +1267,8 @@ func TestSpanReader_GetEmptyIndex(t *testing.T) { } services, err := r.reader.FindTraces(context.Background(), traceQuery) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NoError(t, r.err) require.NoError(t, err) assert.Empty(t, services) }) @@ -1294,10 +1283,9 @@ func TestSpanReader_ArchiveTraces(t *testing.T) { Responses: []*elastic.SearchResult{}, }, nil) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - assert.NoError(t, r.closer) - trace, err := r.reader.GetTrace(context.Background(), model.TraceID{}) + assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NoError(t, r.err) require.Nil(t, trace) assert.EqualError(t, err, "trace not found") }) @@ -1313,7 +1301,7 @@ func TestSpanReader_ArchiveTraces_ReadAlias(t *testing.T) { }, nil) assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - assert.NoError(t, r.closer) + require.NoError(t, r.err) trace, err := r.reader.GetTrace(context.Background(), model.TraceID{}) require.Nil(t, trace) From 03ee675e2c4012f14a34ea0576d2df3a12e9bd02 Mon Sep 17 00:00:00 2001 From: Afzal Ansari Date: Fri, 28 Jul 2023 18:07:54 +0000 Subject: [PATCH 10/16] adds otel tracer to reader Signed-off-by: Afzal Ansari --- plugin/storage/es/spanstore/reader.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index c124d1391cf..f3746c6022b 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -24,6 +24,7 @@ import ( "time" "github.com/olivere/elastic" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -138,6 +139,9 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { if p.UseReadWriteAliases { maxSpanAge = rolloverMaxSpanAge } + if p.Tracer == nil { + p.Tracer = otel.Tracer("eSpanstore.SpanReader") + } return &SpanReader{ client: p.Client, maxSpanAge: maxSpanAge, From 52d50696302a32845a682659e30f0beb34fab400 Mon Sep 17 00:00:00 2001 From: Afzal Ansari Date: Fri, 28 Jul 2023 18:08:26 +0000 Subject: [PATCH 11/16] replace assert with require Signed-off-by: Afzal Ansari --- plugin/storage/es/spanstore/reader_test.go | 32 +++++++++++----------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 1de9370e471..852c1db1134 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -328,7 +328,7 @@ func TestSpanReader_GetTrace(t *testing.T) { }, nil) trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") require.NoError(t, r.err) require.NoError(t, err) require.NotNil(t, trace) @@ -406,7 +406,7 @@ func TestSpanReader_multiRead_followUp_query(t *testing.T) { }, nil) traces, err := r.reader.multiRead(context.Background(), []model.TraceID{{High: 0, Low: 1}, {High: 0, Low: 2}}, date, date) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") require.NoError(t, r.err) require.NoError(t, err) require.NotNil(t, traces) @@ -445,7 +445,7 @@ func TestSpanReader_SearchAfter(t *testing.T) { }, nil).Times(2) trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") require.NoError(t, r.err) require.NoError(t, err) require.NotNil(t, trace) @@ -466,7 +466,7 @@ func TestSpanReader_GetTraceQueryError(t *testing.T) { Responses: []*elastic.SearchResult{}, }, nil) trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") require.NoError(t, r.err) require.EqualError(t, err, "trace not found") require.Nil(t, trace) @@ -487,7 +487,7 @@ func TestSpanReader_GetTraceNilHits(t *testing.T) { }, nil) trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") require.NoError(t, r.err) require.EqualError(t, err, "trace not found") require.Nil(t, trace) @@ -512,7 +512,7 @@ func TestSpanReader_GetTraceInvalidSpanError(t *testing.T) { }, nil) trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") require.NoError(t, r.err) require.Error(t, err, "invalid span") require.Nil(t, trace) @@ -538,7 +538,7 @@ func TestSpanReader_GetTraceSpanConversionError(t *testing.T) { }, nil) trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") require.NoError(t, r.err) require.Error(t, err, "span conversion error, because lacks elements") require.Nil(t, trace) @@ -769,7 +769,7 @@ func TestSpanReader_FindTraces(t *testing.T) { } traces, err := r.reader.FindTraces(context.Background(), traceQuery) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") require.NoError(t, r.err) require.NoError(t, err) assert.Len(t, traces, 1) @@ -815,7 +815,7 @@ func TestSpanReader_FindTracesInvalidQuery(t *testing.T) { } traces, err := r.reader.FindTraces(context.Background(), traceQuery) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") require.NoError(t, r.err) require.Error(t, err) assert.Nil(t, traces) @@ -849,7 +849,7 @@ func TestSpanReader_FindTracesAggregationFailure(t *testing.T) { } traces, err := r.reader.FindTraces(context.Background(), traceQuery) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") require.NoError(t, r.err) require.Error(t, err) assert.Nil(t, traces) @@ -885,7 +885,7 @@ func TestSpanReader_FindTracesNoTraceIDs(t *testing.T) { } traces, err := r.reader.FindTraces(context.Background(), traceQuery) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") require.NoError(t, r.err) require.NoError(t, err) assert.Len(t, traces, 0) @@ -920,7 +920,7 @@ func TestSpanReader_FindTracesReadTraceFailure(t *testing.T) { } traces, err := r.reader.FindTraces(context.Background(), traceQuery) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") require.NoError(t, r.err) require.EqualError(t, err, "read error") assert.Len(t, traces, 0) @@ -960,7 +960,7 @@ func TestSpanReader_FindTracesSpanCollectionFailure(t *testing.T) { } traces, err := r.reader.FindTraces(context.Background(), traceQuery) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") require.NoError(t, r.err) require.Error(t, err) assert.Len(t, traces, 0) @@ -1267,7 +1267,7 @@ func TestSpanReader_GetEmptyIndex(t *testing.T) { } services, err := r.reader.FindTraces(context.Background(), traceQuery) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") require.NoError(t, r.err) require.NoError(t, err) assert.Empty(t, services) @@ -1284,7 +1284,7 @@ func TestSpanReader_ArchiveTraces(t *testing.T) { }, nil) trace, err := r.reader.GetTrace(context.Background(), model.TraceID{}) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") require.NoError(t, r.err) require.Nil(t, trace) assert.EqualError(t, err, "trace not found") @@ -1300,7 +1300,7 @@ func TestSpanReader_ArchiveTraces_ReadAlias(t *testing.T) { Responses: []*elastic.SearchResult{}, }, nil) - assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") require.NoError(t, r.err) trace, err := r.reader.GetTrace(context.Background(), model.TraceID{}) From 8def751c50bea2ea268088267430d00ef9a44d76 Mon Sep 17 00:00:00 2001 From: Afzal Ansari Date: Fri, 28 Jul 2023 18:36:07 +0000 Subject: [PATCH 12/16] updates closer() call Signed-off-by: Afzal Ansari --- plugin/storage/es/spanstore/reader_test.go | 44 +++++++++++----------- 1 file changed, 21 insertions(+), 23 deletions(-) diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 852c1db1134..250cfeeba9a 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -90,7 +90,7 @@ type spanReaderTest struct { logger *zap.Logger logBuffer *testutils.Buffer exporter *tracetest.InMemoryExporter - err error + err func() error reader *SpanReader } @@ -115,7 +115,7 @@ func withSpanReader(fn func(r *spanReaderTest)) { logger: logger, logBuffer: logBuffer, exporter: exp, - err: closer(), + err: closer, reader: NewSpanReader(SpanReaderParams{ Client: client, Logger: zap.NewNop(), @@ -138,7 +138,7 @@ func withArchiveSpanReader(readAlias bool, fn func(r *spanReaderTest)) { logger: logger, logBuffer: logBuffer, exporter: exp, - err: closer(), + err: closer, reader: NewSpanReader(SpanReaderParams{ Client: client, Logger: zap.NewNop(), @@ -195,7 +195,7 @@ func TestSpanReaderIndices(t *testing.T) { serviceDataLayoutFormat := date.UTC().Format(serviceDataLayout) metricsFactory := metricstest.NewFactory(0) logger, _ := testutils.NewLogger() - tracer, exp, closer := tracerProvider() + tracer, _, closer := tracerProvider() testCases := []struct { indices []string @@ -302,7 +302,6 @@ func TestSpanReaderIndices(t *testing.T) { testCase.params.MetricsFactory = metricsFactory testCase.params.Tracer = tracer.Tracer("test") r := NewSpanReader(testCase.params) - assert.NotEmpty(t, exp.GetSpans(), "Spans recorded") actualSpan := r.timeRangeIndices(r.spanIndexPrefix, r.spanIndexDateLayout, date, date, -1*time.Hour) actualService := r.timeRangeIndices(r.serviceIndexPrefix, r.serviceIndexDateLayout, date, date, -24*time.Hour) @@ -329,7 +328,7 @@ func TestSpanReader_GetTrace(t *testing.T) { trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err) + require.NoError(t, r.err()) require.NoError(t, err) require.NotNil(t, trace) @@ -407,7 +406,7 @@ func TestSpanReader_multiRead_followUp_query(t *testing.T) { traces, err := r.reader.multiRead(context.Background(), []model.TraceID{{High: 0, Low: 1}, {High: 0, Low: 2}}, date, date) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err) + require.NoError(t, r.err()) require.NoError(t, err) require.NotNil(t, traces) require.Len(t, traces, 2) @@ -446,7 +445,7 @@ func TestSpanReader_SearchAfter(t *testing.T) { trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err) + require.NoError(t, r.err()) require.NoError(t, err) require.NotNil(t, trace) @@ -467,7 +466,7 @@ func TestSpanReader_GetTraceQueryError(t *testing.T) { }, nil) trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err) + require.NoError(t, r.err()) require.EqualError(t, err, "trace not found") require.Nil(t, trace) }) @@ -488,7 +487,7 @@ func TestSpanReader_GetTraceNilHits(t *testing.T) { trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err) + require.NoError(t, r.err()) require.EqualError(t, err, "trace not found") require.Nil(t, trace) }) @@ -513,7 +512,7 @@ func TestSpanReader_GetTraceInvalidSpanError(t *testing.T) { trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err) + require.NoError(t, r.err()) require.Error(t, err, "invalid span") require.Nil(t, trace) }) @@ -539,7 +538,7 @@ func TestSpanReader_GetTraceSpanConversionError(t *testing.T) { trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err) + require.NoError(t, r.err()) require.Error(t, err, "span conversion error, because lacks elements") require.Nil(t, trace) }) @@ -770,7 +769,7 @@ func TestSpanReader_FindTraces(t *testing.T) { traces, err := r.reader.FindTraces(context.Background(), traceQuery) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err) + require.NoError(t, r.err()) require.NoError(t, err) assert.Len(t, traces, 1) @@ -816,7 +815,7 @@ func TestSpanReader_FindTracesInvalidQuery(t *testing.T) { traces, err := r.reader.FindTraces(context.Background(), traceQuery) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err) + require.NoError(t, r.err()) require.Error(t, err) assert.Nil(t, traces) }) @@ -850,7 +849,7 @@ func TestSpanReader_FindTracesAggregationFailure(t *testing.T) { traces, err := r.reader.FindTraces(context.Background(), traceQuery) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err) + require.NoError(t, r.err()) require.Error(t, err) assert.Nil(t, traces) }) @@ -886,7 +885,7 @@ func TestSpanReader_FindTracesNoTraceIDs(t *testing.T) { traces, err := r.reader.FindTraces(context.Background(), traceQuery) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err) + require.NoError(t, r.err()) require.NoError(t, err) assert.Len(t, traces, 0) }) @@ -921,7 +920,7 @@ func TestSpanReader_FindTracesReadTraceFailure(t *testing.T) { traces, err := r.reader.FindTraces(context.Background(), traceQuery) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err) + require.NoError(t, r.err()) require.EqualError(t, err, "read error") assert.Len(t, traces, 0) }) @@ -961,7 +960,7 @@ func TestSpanReader_FindTracesSpanCollectionFailure(t *testing.T) { traces, err := r.reader.FindTraces(context.Background(), traceQuery) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err) + require.NoError(t, r.err()) require.Error(t, err) assert.Len(t, traces, 0) }) @@ -1268,7 +1267,7 @@ func TestSpanReader_GetEmptyIndex(t *testing.T) { services, err := r.reader.FindTraces(context.Background(), traceQuery) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err) + require.NoError(t, r.err()) require.NoError(t, err) assert.Empty(t, services) }) @@ -1285,7 +1284,7 @@ func TestSpanReader_ArchiveTraces(t *testing.T) { trace, err := r.reader.GetTrace(context.Background(), model.TraceID{}) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err) + require.NoError(t, r.err()) require.Nil(t, trace) assert.EqualError(t, err, "trace not found") }) @@ -1300,10 +1299,9 @@ func TestSpanReader_ArchiveTraces_ReadAlias(t *testing.T) { Responses: []*elastic.SearchResult{}, }, nil) - require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err) - trace, err := r.reader.GetTrace(context.Background(), model.TraceID{}) + require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") + require.NoError(t, r.err()) require.Nil(t, trace) assert.EqualError(t, err, "trace not found") }) From 0d078fb26bf5865bc561c1b47de77db2725f5578 Mon Sep 17 00:00:00 2001 From: Afzal Ansari Date: Fri, 28 Jul 2023 23:51:19 +0000 Subject: [PATCH 13/16] adds type closer Signed-off-by: Afzal Ansari --- plugin/storage/es/spanstore/reader_test.go | 64 +++++++++++----------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 250cfeeba9a..a449c8f7738 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -86,12 +86,12 @@ var exampleESSpan = []byte( }`) type spanReaderTest struct { - client *mocks.Client - logger *zap.Logger - logBuffer *testutils.Buffer - exporter *tracetest.InMemoryExporter - err func() error - reader *SpanReader + client *mocks.Client + logger *zap.Logger + logBuffer *testutils.Buffer + exporter *tracetest.InMemoryExporter + tracerCloser func() error + reader *SpanReader } func tracerProvider() (trace.TracerProvider, *tracetest.InMemoryExporter, func() error) { @@ -111,11 +111,11 @@ func withSpanReader(fn func(r *spanReaderTest)) { tracer, exp, closer := tracerProvider() logger, logBuffer := testutils.NewLogger() r := &spanReaderTest{ - client: client, - logger: logger, - logBuffer: logBuffer, - exporter: exp, - err: closer, + client: client, + logger: logger, + logBuffer: logBuffer, + exporter: exp, + tracerCloser: closer, reader: NewSpanReader(SpanReaderParams{ Client: client, Logger: zap.NewNop(), @@ -134,11 +134,11 @@ func withArchiveSpanReader(readAlias bool, fn func(r *spanReaderTest)) { tracer, exp, closer := tracerProvider() logger, logBuffer := testutils.NewLogger() r := &spanReaderTest{ - client: client, - logger: logger, - logBuffer: logBuffer, - exporter: exp, - err: closer, + client: client, + logger: logger, + logBuffer: logBuffer, + exporter: exp, + tracerCloser: closer, reader: NewSpanReader(SpanReaderParams{ Client: client, Logger: zap.NewNop(), @@ -328,7 +328,7 @@ func TestSpanReader_GetTrace(t *testing.T) { trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err()) + require.NoError(t, r.tracerCloser()) require.NoError(t, err) require.NotNil(t, trace) @@ -406,7 +406,7 @@ func TestSpanReader_multiRead_followUp_query(t *testing.T) { traces, err := r.reader.multiRead(context.Background(), []model.TraceID{{High: 0, Low: 1}, {High: 0, Low: 2}}, date, date) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err()) + require.NoError(t, r.tracerCloser()) require.NoError(t, err) require.NotNil(t, traces) require.Len(t, traces, 2) @@ -445,7 +445,7 @@ func TestSpanReader_SearchAfter(t *testing.T) { trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err()) + require.NoError(t, r.tracerCloser()) require.NoError(t, err) require.NotNil(t, trace) @@ -466,7 +466,7 @@ func TestSpanReader_GetTraceQueryError(t *testing.T) { }, nil) trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err()) + require.NoError(t, r.tracerCloser()) require.EqualError(t, err, "trace not found") require.Nil(t, trace) }) @@ -487,7 +487,7 @@ func TestSpanReader_GetTraceNilHits(t *testing.T) { trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err()) + require.NoError(t, r.tracerCloser()) require.EqualError(t, err, "trace not found") require.Nil(t, trace) }) @@ -512,7 +512,7 @@ func TestSpanReader_GetTraceInvalidSpanError(t *testing.T) { trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err()) + require.NoError(t, r.tracerCloser()) require.Error(t, err, "invalid span") require.Nil(t, trace) }) @@ -538,7 +538,7 @@ func TestSpanReader_GetTraceSpanConversionError(t *testing.T) { trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err()) + require.NoError(t, r.tracerCloser()) require.Error(t, err, "span conversion error, because lacks elements") require.Nil(t, trace) }) @@ -769,7 +769,7 @@ func TestSpanReader_FindTraces(t *testing.T) { traces, err := r.reader.FindTraces(context.Background(), traceQuery) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err()) + require.NoError(t, r.tracerCloser()) require.NoError(t, err) assert.Len(t, traces, 1) @@ -815,7 +815,7 @@ func TestSpanReader_FindTracesInvalidQuery(t *testing.T) { traces, err := r.reader.FindTraces(context.Background(), traceQuery) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err()) + require.NoError(t, r.tracerCloser()) require.Error(t, err) assert.Nil(t, traces) }) @@ -849,7 +849,7 @@ func TestSpanReader_FindTracesAggregationFailure(t *testing.T) { traces, err := r.reader.FindTraces(context.Background(), traceQuery) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err()) + require.NoError(t, r.tracerCloser()) require.Error(t, err) assert.Nil(t, traces) }) @@ -885,7 +885,7 @@ func TestSpanReader_FindTracesNoTraceIDs(t *testing.T) { traces, err := r.reader.FindTraces(context.Background(), traceQuery) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err()) + require.NoError(t, r.tracerCloser()) require.NoError(t, err) assert.Len(t, traces, 0) }) @@ -920,7 +920,7 @@ func TestSpanReader_FindTracesReadTraceFailure(t *testing.T) { traces, err := r.reader.FindTraces(context.Background(), traceQuery) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err()) + require.NoError(t, r.tracerCloser()) require.EqualError(t, err, "read error") assert.Len(t, traces, 0) }) @@ -960,7 +960,7 @@ func TestSpanReader_FindTracesSpanCollectionFailure(t *testing.T) { traces, err := r.reader.FindTraces(context.Background(), traceQuery) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err()) + require.NoError(t, r.tracerCloser()) require.Error(t, err) assert.Len(t, traces, 0) }) @@ -1267,7 +1267,7 @@ func TestSpanReader_GetEmptyIndex(t *testing.T) { services, err := r.reader.FindTraces(context.Background(), traceQuery) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err()) + require.NoError(t, r.tracerCloser()) require.NoError(t, err) assert.Empty(t, services) }) @@ -1284,7 +1284,7 @@ func TestSpanReader_ArchiveTraces(t *testing.T) { trace, err := r.reader.GetTrace(context.Background(), model.TraceID{}) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err()) + require.NoError(t, r.tracerCloser()) require.Nil(t, trace) assert.EqualError(t, err, "trace not found") }) @@ -1301,7 +1301,7 @@ func TestSpanReader_ArchiveTraces_ReadAlias(t *testing.T) { trace, err := r.reader.GetTrace(context.Background(), model.TraceID{}) require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.err()) + require.NoError(t, r.tracerCloser()) require.Nil(t, trace) assert.EqualError(t, err, "trace not found") }) From f3e92fa2b1e0e0821fe8e656452d8025fd673997 Mon Sep 17 00:00:00 2001 From: Afzal <94980910+afzalbin64@users.noreply.github.com> Date: Sun, 30 Jul 2023 17:58:11 +0000 Subject: [PATCH 14/16] rmvs unwanted object init Signed-off-by: Afzal <94980910+afzalbin64@users.noreply.github.com> --- plugin/storage/es/spanstore/reader.go | 4 -- plugin/storage/es/spanstore/reader_test.go | 79 +++++++++------------- 2 files changed, 31 insertions(+), 52 deletions(-) diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index f3746c6022b..c124d1391cf 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -24,7 +24,6 @@ import ( "time" "github.com/olivere/elastic" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -139,9 +138,6 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { if p.UseReadWriteAliases { maxSpanAge = rolloverMaxSpanAge } - if p.Tracer == nil { - p.Tracer = otel.Tracer("eSpanstore.SpanReader") - } return &SpanReader{ client: p.Client, maxSpanAge: maxSpanAge, diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index a449c8f7738..3d358d8ef10 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -86,12 +86,11 @@ var exampleESSpan = []byte( }`) type spanReaderTest struct { - client *mocks.Client - logger *zap.Logger - logBuffer *testutils.Buffer - exporter *tracetest.InMemoryExporter - tracerCloser func() error - reader *SpanReader + client *mocks.Client + logger *zap.Logger + logBuffer *testutils.Buffer + traceBuffer *tracetest.InMemoryExporter + reader *SpanReader } func tracerProvider() (trace.TracerProvider, *tracetest.InMemoryExporter, func() error) { @@ -111,11 +110,10 @@ func withSpanReader(fn func(r *spanReaderTest)) { tracer, exp, closer := tracerProvider() logger, logBuffer := testutils.NewLogger() r := &spanReaderTest{ - client: client, - logger: logger, - logBuffer: logBuffer, - exporter: exp, - tracerCloser: closer, + client: client, + logger: logger, + logBuffer: logBuffer, + traceBuffer: exp, reader: NewSpanReader(SpanReaderParams{ Client: client, Logger: zap.NewNop(), @@ -126,6 +124,7 @@ func withSpanReader(fn func(r *spanReaderTest)) { MaxDocCount: defaultMaxDocCount, }), } + defer closer() fn(r) } @@ -134,11 +133,10 @@ func withArchiveSpanReader(readAlias bool, fn func(r *spanReaderTest)) { tracer, exp, closer := tracerProvider() logger, logBuffer := testutils.NewLogger() r := &spanReaderTest{ - client: client, - logger: logger, - logBuffer: logBuffer, - exporter: exp, - tracerCloser: closer, + client: client, + logger: logger, + logBuffer: logBuffer, + traceBuffer: exp, reader: NewSpanReader(SpanReaderParams{ Client: client, Logger: zap.NewNop(), @@ -150,6 +148,7 @@ func withArchiveSpanReader(readAlias bool, fn func(r *spanReaderTest)) { UseReadWriteAliases: readAlias, }), } + defer closer() fn(r) } @@ -327,8 +326,7 @@ func TestSpanReader_GetTrace(t *testing.T) { }, nil) trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) - require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.tracerCloser()) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.NoError(t, err) require.NotNil(t, trace) @@ -405,8 +403,7 @@ func TestSpanReader_multiRead_followUp_query(t *testing.T) { }, nil) traces, err := r.reader.multiRead(context.Background(), []model.TraceID{{High: 0, Low: 1}, {High: 0, Low: 2}}, date, date) - require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.tracerCloser()) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.NoError(t, err) require.NotNil(t, traces) require.Len(t, traces, 2) @@ -444,8 +441,7 @@ func TestSpanReader_SearchAfter(t *testing.T) { }, nil).Times(2) trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) - require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.tracerCloser()) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.NoError(t, err) require.NotNil(t, trace) @@ -465,8 +461,7 @@ func TestSpanReader_GetTraceQueryError(t *testing.T) { Responses: []*elastic.SearchResult{}, }, nil) trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) - require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.tracerCloser()) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.EqualError(t, err, "trace not found") require.Nil(t, trace) }) @@ -486,8 +481,7 @@ func TestSpanReader_GetTraceNilHits(t *testing.T) { }, nil) trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) - require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.tracerCloser()) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.EqualError(t, err, "trace not found") require.Nil(t, trace) }) @@ -511,8 +505,7 @@ func TestSpanReader_GetTraceInvalidSpanError(t *testing.T) { }, nil) trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) - require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.tracerCloser()) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.Error(t, err, "invalid span") require.Nil(t, trace) }) @@ -537,8 +530,7 @@ func TestSpanReader_GetTraceSpanConversionError(t *testing.T) { }, nil) trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) - require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.tracerCloser()) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.Error(t, err, "span conversion error, because lacks elements") require.Nil(t, trace) }) @@ -768,8 +760,7 @@ func TestSpanReader_FindTraces(t *testing.T) { } traces, err := r.reader.FindTraces(context.Background(), traceQuery) - require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.tracerCloser()) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.NoError(t, err) assert.Len(t, traces, 1) @@ -814,8 +805,7 @@ func TestSpanReader_FindTracesInvalidQuery(t *testing.T) { } traces, err := r.reader.FindTraces(context.Background(), traceQuery) - require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.tracerCloser()) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.Error(t, err) assert.Nil(t, traces) }) @@ -848,8 +838,7 @@ func TestSpanReader_FindTracesAggregationFailure(t *testing.T) { } traces, err := r.reader.FindTraces(context.Background(), traceQuery) - require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.tracerCloser()) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.Error(t, err) assert.Nil(t, traces) }) @@ -884,8 +873,7 @@ func TestSpanReader_FindTracesNoTraceIDs(t *testing.T) { } traces, err := r.reader.FindTraces(context.Background(), traceQuery) - require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.tracerCloser()) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.NoError(t, err) assert.Len(t, traces, 0) }) @@ -919,8 +907,7 @@ func TestSpanReader_FindTracesReadTraceFailure(t *testing.T) { } traces, err := r.reader.FindTraces(context.Background(), traceQuery) - require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.tracerCloser()) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.EqualError(t, err, "read error") assert.Len(t, traces, 0) }) @@ -959,8 +946,7 @@ func TestSpanReader_FindTracesSpanCollectionFailure(t *testing.T) { } traces, err := r.reader.FindTraces(context.Background(), traceQuery) - require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.tracerCloser()) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.Error(t, err) assert.Len(t, traces, 0) }) @@ -1266,8 +1252,7 @@ func TestSpanReader_GetEmptyIndex(t *testing.T) { } services, err := r.reader.FindTraces(context.Background(), traceQuery) - require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.tracerCloser()) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.NoError(t, err) assert.Empty(t, services) }) @@ -1283,8 +1268,7 @@ func TestSpanReader_ArchiveTraces(t *testing.T) { }, nil) trace, err := r.reader.GetTrace(context.Background(), model.TraceID{}) - require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.tracerCloser()) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.Nil(t, trace) assert.EqualError(t, err, "trace not found") }) @@ -1300,8 +1284,7 @@ func TestSpanReader_ArchiveTraces_ReadAlias(t *testing.T) { }, nil) trace, err := r.reader.GetTrace(context.Background(), model.TraceID{}) - require.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded") - require.NoError(t, r.tracerCloser()) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.Nil(t, trace) assert.EqualError(t, err, "trace not found") }) From a037b42347caa0144065fc82651907a46dd8d277 Mon Sep 17 00:00:00 2001 From: Afzal <94980910+afzalbin64@users.noreply.github.com> Date: Sun, 30 Jul 2023 19:48:44 +0000 Subject: [PATCH 15/16] defers the call Signed-off-by: Afzal <94980910+afzalbin64@users.noreply.github.com> --- plugin/storage/es/spanstore/reader_test.go | 6 +++--- plugin/storage/integration/elasticsearch_test.go | 5 +---- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 3d358d8ef10..b13f70b23bd 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -108,6 +108,7 @@ func tracerProvider() (trace.TracerProvider, *tracetest.InMemoryExporter, func() func withSpanReader(fn func(r *spanReaderTest)) { client := &mocks.Client{} tracer, exp, closer := tracerProvider() + defer closer() logger, logBuffer := testutils.NewLogger() r := &spanReaderTest{ client: client, @@ -124,13 +125,13 @@ func withSpanReader(fn func(r *spanReaderTest)) { MaxDocCount: defaultMaxDocCount, }), } - defer closer() fn(r) } func withArchiveSpanReader(readAlias bool, fn func(r *spanReaderTest)) { client := &mocks.Client{} tracer, exp, closer := tracerProvider() + defer closer() logger, logBuffer := testutils.NewLogger() r := &spanReaderTest{ client: client, @@ -148,7 +149,6 @@ func withArchiveSpanReader(readAlias bool, fn func(r *spanReaderTest)) { UseReadWriteAliases: readAlias, }), } - defer closer() fn(r) } @@ -195,6 +195,7 @@ func TestSpanReaderIndices(t *testing.T) { metricsFactory := metricstest.NewFactory(0) logger, _ := testutils.NewLogger() tracer, _, closer := tracerProvider() + defer closer() testCases := []struct { indices []string @@ -306,7 +307,6 @@ func TestSpanReaderIndices(t *testing.T) { actualService := r.timeRangeIndices(r.serviceIndexPrefix, r.serviceIndexDateLayout, date, date, -24*time.Hour) assert.Equal(t, testCase.indices, append(actualSpan, actualService...)) } - require.NoError(t, closer()) } func TestSpanReader_GetTrace(t *testing.T) { diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index b6cc77f93f6..524b05e7d24 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -157,6 +157,7 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro return err } tracer, _, closer := tracerProvider() + defer closer() s.SpanWriter = w s.SpanReader = spanstore.NewSpanReader(spanstore.SpanReaderParams{ Client: client, @@ -177,10 +178,6 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro MaxDocCount: defaultMaxDocCount, }) - if closer != nil { - return err - } - depMapping, err := mappingBuilder.GetDependenciesMappings() if err != nil { return err From a98a0ed30e30bec2259c2995589e8f803306c042 Mon Sep 17 00:00:00 2001 From: Afzal <94980910+afzalbin64@users.noreply.github.com> Date: Sun, 30 Jul 2023 19:54:40 +0000 Subject: [PATCH 16/16] sets span status Signed-off-by: Afzal <94980910+afzalbin64@users.noreply.github.com> --- plugin/storage/es/spanstore/reader.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index c124d1391cf..d21ba25c28c 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -25,6 +25,7 @@ import ( "github.com/olivere/elastic" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -697,4 +698,5 @@ func (s *SpanReader) buildObjectQuery(field string, k string, v string) elastic. func logErrorToSpan(span trace.Span, err error) { span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) }