From 026ab6ec774117f4cb8ba39da9be22a67652823d Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Wed, 2 Sep 2020 16:32:11 +0200 Subject: [PATCH] Use partial retries in ES spanstore Signed-off-by: Pavol Loffay --- .../esmodeltranslator/modeltranslator.go | 29 ++- .../esmodeltranslator/modeltranslator_test.go | 100 +++++----- .../elasticsearchexporter/exporter.go | 4 +- .../exporter/elasticsearchexporter/factory.go | 2 +- .../elasticsearchexporter/spanstore.go | 80 +++++--- .../elasticsearchexporter/spanstore_test.go | 186 +++++++++++++++++- .../elasticsearchexporter/storagefactory.go | 5 +- .../storagefactory_test.go | 7 +- cmd/opentelemetry/go.sum | 1 - 9 files changed, 322 insertions(+), 92 deletions(-) diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/esmodeltranslator/modeltranslator.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esmodeltranslator/modeltranslator.go index 1267be265eb..0216ad92e1c 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/esmodeltranslator/modeltranslator.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esmodeltranslator/modeltranslator.go @@ -61,26 +61,34 @@ func NewTranslator(allTagsAsFields bool, tagsKeysAsFields []string, tagDotReplac } } +// ConvertedData holds DB span and the original data used to construct it. +type ConvertedData struct { + Span pdata.Span + Resource pdata.Resource + InstrumentationLibrary pdata.InstrumentationLibrary + DBSpan *dbmodel.Span +} + // ConvertSpans converts spans from OTEL model to Jaeger Elasticsearch model -func (c *Translator) ConvertSpans(traces pdata.Traces) ([]*dbmodel.Span, error) { +func (c *Translator) ConvertSpans(traces pdata.Traces) ([]ConvertedData, error) { rss := traces.ResourceSpans() if rss.Len() == 0 { return nil, nil } - dbSpans := make([]*dbmodel.Span, 0, traces.SpanCount()) + containers := make([]ConvertedData, 0, traces.SpanCount()) for i := 0; i < rss.Len(); i++ { // this would correspond to a single batch - err := c.resourceSpans(rss.At(i), &dbSpans) + err := c.resourceSpans(rss.At(i), &containers) if err != nil { return nil, err } } - return dbSpans, nil + return containers, nil } -func (c *Translator) resourceSpans(spans pdata.ResourceSpans, dbSpans *[]*dbmodel.Span) error { - ils := spans.InstrumentationLibrarySpans() - process := c.process(spans.Resource()) +func (c *Translator) resourceSpans(rspans pdata.ResourceSpans, containers *[]ConvertedData) error { + ils := rspans.InstrumentationLibrarySpans() + process := c.process(rspans.Resource()) for i := 0; i < ils.Len(); i++ { // TODO convert instrumentation library info //ils.At(i).InstrumentationLibrary() @@ -91,7 +99,12 @@ func (c *Translator) resourceSpans(spans pdata.ResourceSpans, dbSpans *[]*dbmode return err } dbSpan.Process = *process - *dbSpans = append(*dbSpans, dbSpan) + *containers = append(*containers, ConvertedData{ + Span: spans.At(j), + Resource: rspans.Resource(), + InstrumentationLibrary: ils.At(i).InstrumentationLibrary(), + DBSpan: dbSpan, + }) } } return nil diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/esmodeltranslator/modeltranslator_test.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esmodeltranslator/modeltranslator_test.go index 52cbab7a24c..9b8cd177c24 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/esmodeltranslator/modeltranslator_test.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/esmodeltranslator/modeltranslator_test.go @@ -129,34 +129,40 @@ func TestConvertSpan(t *testing.T) { c := &Translator{ tagKeysAsFields: map[string]bool{"toTagMap": true}, } - spans, err := c.ConvertSpans(traces) + spanDataContainers, err := c.ConvertSpans(traces) require.NoError(t, err) - assert.Equal(t, 1, len(spans)) - assert.Equal(t, &dbmodel.Span{ - TraceID: "30313233343536373839616263646566", - SpanID: "3031323334353637", - StartTime: 1000, - Duration: 1000, - OperationName: "root", - StartTimeMillis: 1, - Tags: []dbmodel.KeyValue{ - {Key: "span.kind", Type: dbmodel.StringType, Value: "client"}, - {Key: "status.code", Type: dbmodel.StringType, Value: "Cancelled"}, - {Key: "error", Type: dbmodel.BoolType, Value: "true"}, - {Key: "status.message", Type: dbmodel.StringType, Value: "messagetext"}, - {Key: "foo", Type: dbmodel.BoolType, Value: "true"}}, - Tag: map[string]interface{}{"toTagMap": "val"}, - Logs: []dbmodel.Log{{Fields: []dbmodel.KeyValue{ - {Key: "event", Value: "eventName", Type: dbmodel.StringType}, - {Key: "foo", Value: "bar", Type: dbmodel.StringType}}, Timestamp: 500}}, - References: []dbmodel.Reference{ - {SpanID: "3031323334353637", TraceID: "30313233343536373839616263646566", RefType: dbmodel.ChildOf}, - {SpanID: "3031323334353637", TraceID: "30313233343536373839616263646566", RefType: dbmodel.FollowsFrom}}, - Process: dbmodel.Process{ - ServiceName: "myservice", - Tags: []dbmodel.KeyValue{{Key: "num", Value: "16.66", Type: dbmodel.Float64Type}}, - }, - }, spans[0]) + assert.Equal(t, 1, len(spanDataContainers)) + assert.Equal(t, + ConvertedData{ + Span: span, + Resource: resource, + InstrumentationLibrary: traces.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).InstrumentationLibrary(), + DBSpan: &dbmodel.Span{ + TraceID: "30313233343536373839616263646566", + SpanID: "3031323334353637", + StartTime: 1000, + Duration: 1000, + OperationName: "root", + StartTimeMillis: 1, + Tags: []dbmodel.KeyValue{ + {Key: "span.kind", Type: dbmodel.StringType, Value: "client"}, + {Key: "status.code", Type: dbmodel.StringType, Value: "Cancelled"}, + {Key: "error", Type: dbmodel.BoolType, Value: "true"}, + {Key: "status.message", Type: dbmodel.StringType, Value: "messagetext"}, + {Key: "foo", Type: dbmodel.BoolType, Value: "true"}}, + Tag: map[string]interface{}{"toTagMap": "val"}, + Logs: []dbmodel.Log{{Fields: []dbmodel.KeyValue{ + {Key: "event", Value: "eventName", Type: dbmodel.StringType}, + {Key: "foo", Value: "bar", Type: dbmodel.StringType}}, Timestamp: 500}}, + References: []dbmodel.Reference{ + {SpanID: "3031323334353637", TraceID: "30313233343536373839616263646566", RefType: dbmodel.ChildOf}, + {SpanID: "3031323334353637", TraceID: "30313233343536373839616263646566", RefType: dbmodel.FollowsFrom}}, + Process: dbmodel.Process{ + ServiceName: "myservice", + Tags: []dbmodel.KeyValue{{Key: "num", Value: "16.66", Type: dbmodel.Float64Type}}, + }, + }, + }, spanDataContainers[0]) } func TestSpanEmptyRef(t *testing.T) { @@ -166,24 +172,30 @@ func TestSpanEmptyRef(t *testing.T) { span.SetEndTime(pdata.TimestampUnixNano(2000000)) c := &Translator{} - spans, err := c.ConvertSpans(traces) + spanDataContainers, err := c.ConvertSpans(traces) require.NoError(t, err) - assert.Equal(t, 1, len(spans)) - assert.Equal(t, &dbmodel.Span{ - TraceID: "30313233343536373839616263646566", - SpanID: "3031323334353637", - StartTime: 1000, - Duration: 1000, - OperationName: "root", - StartTimeMillis: 1, - Tags: []dbmodel.KeyValue{}, // should not be nil - Logs: []dbmodel.Log{}, // should not be nil - References: []dbmodel.Reference{}, // should not be nil - Process: dbmodel.Process{ - ServiceName: "myservice", - Tags: nil, - }, - }, spans[0]) + assert.Equal(t, 1, len(spanDataContainers)) + assert.Equal(t, + ConvertedData{ + Span: span, + Resource: traces.ResourceSpans().At(0).Resource(), + InstrumentationLibrary: traces.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).InstrumentationLibrary(), + DBSpan: &dbmodel.Span{ + TraceID: "30313233343536373839616263646566", + SpanID: "3031323334353637", + StartTime: 1000, + Duration: 1000, + OperationName: "root", + StartTimeMillis: 1, + Tags: []dbmodel.KeyValue{}, // should not be nil + Logs: []dbmodel.Log{}, // should not be nil + References: []dbmodel.Reference{}, // should not be nil + Process: dbmodel.Process{ + ServiceName: "myservice", + Tags: nil, + }, + }, + }, spanDataContainers[0]) } func TestEmpty(t *testing.T) { diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/exporter.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/exporter.go index 2d71a1eba99..d330452472a 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/exporter.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/exporter.go @@ -23,8 +23,8 @@ import ( "github.com/jaegertracing/jaeger/plugin/storage/es" ) -// new creates Elasticsearch exporter/storage. -func new(ctx context.Context, config *Config, params component.ExporterCreateParams) (component.TraceExporter, error) { +// newExporter creates Elasticsearch exporter/storage. +func newExporter(ctx context.Context, config *Config, params component.ExporterCreateParams) (component.TraceExporter, error) { esCfg := config.GetPrimary() w, err := newEsSpanWriter(*esCfg, params.Logger, false, config.Name()) if err != nil { diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/factory.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/factory.go index 8d42c88745b..20611f23fd1 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/factory.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/factory.go @@ -78,7 +78,7 @@ func (Factory) CreateTraceExporter( if !ok { return nil, fmt.Errorf("could not cast configuration to %s", TypeStr) } - return new(ctx, esCfg, params) + return newExporter(ctx, esCfg, params) } // CreateMetricsExporter is not implemented. diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore.go index 0ed3a7ed32a..241955d00a4 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore.go @@ -37,6 +37,7 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/cache" "github.com/jaegertracing/jaeger/pkg/es/config" + "github.com/jaegertracing/jaeger/pkg/multierror" "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" ) @@ -114,69 +115,75 @@ func (w *esSpanWriter) WriteTraces(ctx context.Context, traces pdata.Traces) (in return w.writeSpans(ctx, spans) } -func (w *esSpanWriter) writeSpans(ctx context.Context, spans []*dbmodel.Span) (int, error) { +func (w *esSpanWriter) writeSpans(ctx context.Context, spansData []esmodeltranslator.ConvertedData) (int, error) { buffer := &bytes.Buffer{} // mapping for bulk operation to span - var bulkOperations []bulkItem + var bulkItems []bulkItem var errs []error dropped := 0 - for _, span := range spans { - data, err := json.Marshal(span) + for _, spanData := range spansData { + data, err := json.Marshal(spanData.DBSpan) if err != nil { errs = append(errs, err) dropped++ continue } - indexName := w.spanIndexName.IndexName(model.EpochMicrosecondsAsTime(span.StartTime)) - bulkOperations = append(bulkOperations, bulkItem{span: span, isService: false}) + indexName := w.spanIndexName.IndexName(model.EpochMicrosecondsAsTime(spanData.DBSpan.StartTime)) + bulkItems = append(bulkItems, bulkItem{spanData: spanData, isService: false}) w.client.AddDataToBulkBuffer(buffer, data, indexName, spanTypeName) - if !w.isArchive { - storeService, err := w.writeService(span, buffer) + storeService, err := w.writeService(spanData.DBSpan, buffer) if err != nil { errs = append(errs, err) // dropped is not increased since this is only service name, the span could be written well continue } else if storeService { - bulkOperations = append(bulkOperations, bulkItem{span: span, isService: true}) + bulkItems = append(bulkItems, bulkItem{spanData: spanData, isService: true}) } } } - res, err := w.client.Bulk(ctx, bytes.NewReader(buffer.Bytes())) + res, err := w.client.Bulk(ctx, buffer) + if err != nil { + errs = append(errs, err) + return len(spansData), componenterror.CombineErrors(errs) + } + failedOperations, err := w.handleResponse(ctx, res, bulkItems) if err != nil { errs = append(errs, err) - return len(spans), componenterror.CombineErrors(errs) } - droppedFromResponse := w.handleResponse(ctx, res, bulkOperations) - dropped += droppedFromResponse + dropped += len(failedOperations) + if len(failedOperations) > 0 { + return dropped, consumererror.PartialTracesError(componenterror.CombineErrors(errs), bulkItemsToTraces(failedOperations)) + } return dropped, componenterror.CombineErrors(errs) } -func (w *esSpanWriter) handleResponse(ctx context.Context, blk *esclient.BulkResponse, operationToSpan []bulkItem) int { - numErrors := 0 +// handleResponse processes blk response and returns spans that +func (w *esSpanWriter) handleResponse(ctx context.Context, blk *esclient.BulkResponse, bulkItems []bulkItem) ([]bulkItem, error) { storedSpans := map[string]int64{} notStoredSpans := map[string]int64{} + var failed []bulkItem + var errs []error for i, d := range blk.Items { - bulkOp := operationToSpan[i] + bulkItem := bulkItems[i] if d.Index.Status > 201 { - numErrors++ w.logger.Error("Part of the bulk request failed", zap.String("result", d.Index.Result), zap.String("error.reason", d.Index.Error.Reason), zap.String("error.type", d.Index.Error.Type), zap.String("error.cause.type", d.Index.Error.Cause.Type), zap.String("error.cause.reason", d.Index.Error.Cause.Reason)) - // TODO return an error or a struct that indicates which spans should be retried - // https://github.com/open-telemetry/opentelemetry-collector/issues/990 - if !bulkOp.isService { - notStoredSpans[bulkOp.span.Process.ServiceName] = notStoredSpans[bulkOp.span.Process.ServiceName] + 1 + errs = append(errs, fmt.Errorf("bulk request failed, reason %v, result: %v", d.Index.Error.Reason, d.Index.Result)) + if !bulkItem.isService { + failed = append(failed, bulkItem) + notStoredSpans[bulkItem.spanData.DBSpan.Process.ServiceName] = notStoredSpans[bulkItem.spanData.DBSpan.Process.ServiceName] + 1 } } else { // passed - if !bulkOp.isService { - storedSpans[bulkOp.span.Process.ServiceName] = storedSpans[bulkOp.span.Process.ServiceName] + 1 + if !bulkItem.isService { + storedSpans[bulkItem.spanData.DBSpan.Process.ServiceName] = storedSpans[bulkItem.spanData.DBSpan.Process.ServiceName] + 1 } else { - cacheKey := hashCode(bulkOp.span.Process.ServiceName, bulkOp.span.OperationName) + cacheKey := hashCode(bulkItem.spanData.DBSpan.Process.ServiceName, bulkItem.spanData.DBSpan.OperationName) w.serviceCache.Put(cacheKey, cacheKey) } } @@ -191,7 +198,7 @@ func (w *esSpanWriter) handleResponse(ctx context.Context, blk *esclient.BulkRes tag.Insert(storagemetrics.TagServiceName(), k), w.nameTag) stats.Record(ctx, storagemetrics.StatSpansStoredCount().M(v)) } - return numErrors + return failed, multierror.Wrap(errs) } func (w *esSpanWriter) writeService(span *dbmodel.Span, buffer *bytes.Buffer) (bool, error) { @@ -221,7 +228,7 @@ func hashCode(serviceName, operationName string) string { type bulkItem struct { // span associated with the bulk operation - span *dbmodel.Span + spanData esmodeltranslator.ConvertedData // isService indicates that this bulk operation is for service index isService bool } @@ -229,3 +236,24 @@ type bulkItem struct { func (w *esSpanWriter) esClientVersion() int { return w.client.MajorVersion() } + +func bulkItemsToTraces(bulkItems []bulkItem) pdata.Traces { + traces := pdata.NewTraces() + traces.ResourceSpans().Resize(len(bulkItems)) + for i, op := range bulkItems { + spanData := op.spanData + rss := traces.ResourceSpans().At(i) + if !spanData.Resource.IsNil() { + rss.Resource().InitEmpty() + rss.Resource().Attributes().InitFromAttributeMap(spanData.Resource.Attributes()) + } + rss.InstrumentationLibrarySpans().Resize(1) + ispans := rss.InstrumentationLibrarySpans().At(0) + ispans.InitEmpty() + if !spanData.InstrumentationLibrary.IsNil() { + spanData.InstrumentationLibrary.CopyTo(ispans.InstrumentationLibrary()) + } + ispans.Spans().Append(&spanData.Span) + } + return traces +} diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore_test.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore_test.go index 2c3460541a4..5a48da5c1bd 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore_test.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore_test.go @@ -15,16 +15,24 @@ package elasticsearchexporter import ( + "bytes" "context" + "io" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/consumer/pdata" "go.uber.org/zap" + "github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter/elasticsearchexporter/esmodeltranslator" "github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter/storagemetrics" "github.com/jaegertracing/jaeger/cmd/opentelemetry/app/internal/esclient" + "github.com/jaegertracing/jaeger/cmd/opentelemetry/app/internal/esutil" + "github.com/jaegertracing/jaeger/pkg/cache" "github.com/jaegertracing/jaeger/pkg/es/config" "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" ) @@ -40,18 +48,39 @@ func TestMetrics(t *testing.T) { {Index: esclient.BulkIndexResponse{Status: 500}}, } blkItms := []bulkItem{ - {isService: true, span: &dbmodel.Span{}}, - {isService: true, span: &dbmodel.Span{}}, - {span: &dbmodel.Span{Process: dbmodel.Process{ServiceName: "foo"}}}, - {span: &dbmodel.Span{Process: dbmodel.Process{ServiceName: "foo"}}}, + {isService: true, spanData: esmodeltranslator.ConvertedData{ + DBSpan: &dbmodel.Span{}, + Span: pdata.NewSpan(), + Resource: pdata.NewResource(), + InstrumentationLibrary: pdata.NewInstrumentationLibrary(), + }}, + {isService: true, spanData: esmodeltranslator.ConvertedData{ + DBSpan: &dbmodel.Span{}, + Span: pdata.NewSpan(), + Resource: pdata.NewResource(), + InstrumentationLibrary: pdata.NewInstrumentationLibrary(), + }}, + {isService: false, spanData: esmodeltranslator.ConvertedData{ + DBSpan: &dbmodel.Span{Process: dbmodel.Process{ServiceName: "foo"}}, + Span: pdata.NewSpan(), + Resource: pdata.NewResource(), + InstrumentationLibrary: pdata.NewInstrumentationLibrary(), + }}, + {isService: false, spanData: esmodeltranslator.ConvertedData{ + DBSpan: &dbmodel.Span{Process: dbmodel.Process{ServiceName: "foo"}}, + Span: pdata.NewSpan(), + Resource: pdata.NewResource(), + InstrumentationLibrary: pdata.NewInstrumentationLibrary(), + }}, } views := storagemetrics.MetricViews() require.NoError(t, view.Register(views...)) defer view.Unregister(views...) - errs := w.handleResponse(context.Background(), response, blkItms) - assert.Equal(t, 2, errs) + failedOperations, err := w.handleResponse(context.Background(), response, blkItms) + require.Error(t, err) + assert.Equal(t, 1, len(failedOperations)) viewData, err := view.RetrieveData(storagemetrics.StatSpansStoredCount().Name()) require.NoError(t, err) @@ -65,3 +94,148 @@ func TestMetrics(t *testing.T) { distData = viewData[0].Data.(*view.SumData) assert.Equal(t, float64(1), distData.Value) } + +func TestBulkItemsToTraces(t *testing.T) { + t.Run("empty", func(t *testing.T) { + traces := bulkItemsToTraces([]bulkItem{}) + assert.Equal(t, 0, traces.SpanCount()) + }) + t.Run("one_span", func(t *testing.T) { + span := pdata.NewSpan() + span.InitEmpty() + span.SetName("name") + resource := pdata.NewResource() + resource.InitEmpty() + resource.Attributes().Insert("key", pdata.NewAttributeValueString("val")) + inst := pdata.NewInstrumentationLibrary() + inst.InitEmpty() + inst.SetName("name") + traces := bulkItemsToTraces([]bulkItem{ + { + spanData: esmodeltranslator.ConvertedData{ + Span: span, + Resource: resource, + InstrumentationLibrary: inst, + DBSpan: nil, + }, + isService: false, + }, + }) + expectedTraces := pdata.NewTraces() + expectedTraces.ResourceSpans().Resize(1) + rss := expectedTraces.ResourceSpans().At(0) + resource.CopyTo(rss.Resource()) + rss.InstrumentationLibrarySpans().Resize(1) + inst.CopyTo(rss.InstrumentationLibrarySpans().At(0).InstrumentationLibrary()) + rss.InstrumentationLibrarySpans().At(0).Spans().Resize(1) + span.CopyTo(rss.InstrumentationLibrarySpans().At(0).Spans().At(0)) + assert.Equal(t, expectedTraces, traces) + }) +} + +func TestWriteSpans(t *testing.T) { + esClient := &mockESClient{ + bulkResponse: &esclient.BulkResponse{ + Errors: false, + Items: []esclient.BulkResponseItem{ + { + Index: esclient.BulkIndexResponse{}, + }, + }, + }, + } + w := esSpanWriter{ + logger: zap.NewNop(), + client: esClient, + spanIndexName: esutil.NewIndexNameProvider("span", "", esutil.AliasNone, false), + serviceIndexName: esutil.NewIndexNameProvider("service", "", esutil.AliasNone, false), + serviceCache: cache.NewLRU(1), + nameTag: tag.Insert(storagemetrics.TagExporterName(), "name"), + } + + t.Run("zero_spans_failed", func(t *testing.T) { + dropped, err := w.writeSpans(context.Background(), []esmodeltranslator.ConvertedData{ + { + DBSpan: &dbmodel.Span{}, + }, + }) + assert.Equal(t, 0, dropped) + assert.NoError(t, err) + esClient.bulkResponse = &esclient.BulkResponse{ + Items: []esclient.BulkResponseItem{ + { + Index: esclient.BulkIndexResponse{ + Status: 500, + }, + }, + }, + } + }) + t.Run("one_span_failed", func(t *testing.T) { + span := pdata.NewSpan() + span.InitEmpty() + span.SetName("name") + resource := pdata.NewResource() + resource.InitEmpty() + resource.Attributes().Insert("key", pdata.NewAttributeValueString("val")) + inst := pdata.NewInstrumentationLibrary() + inst.InitEmpty() + inst.SetName("name") + traces := bulkItemsToTraces([]bulkItem{{ + spanData: esmodeltranslator.ConvertedData{ + Span: span, + Resource: resource, + InstrumentationLibrary: inst, + DBSpan: nil, + }, + isService: false, + }}) + + dropped, err := w.writeSpans(context.Background(), []esmodeltranslator.ConvertedData{ + { + DBSpan: &dbmodel.Span{}, + Span: span, + Resource: resource, + InstrumentationLibrary: inst, + }, + }) + assert.Equal(t, 1, dropped) + assert.Error(t, err) + partialErr, ok := err.(consumererror.PartialError) + require.True(t, ok) + assert.Equal(t, traces, partialErr.GetTraces()) + }) +} + +type mockESClient struct { + bulkResponse *esclient.BulkResponse +} + +var _ esclient.ElasticsearchClient = (*mockESClient)(nil) + +func (m mockESClient) PutTemplate(ctx context.Context, name string, template io.Reader) error { + panic("implement me") +} + +func (m mockESClient) Bulk(ctx context.Context, bulkBody io.Reader) (*esclient.BulkResponse, error) { + return m.bulkResponse, nil +} + +func (m mockESClient) AddDataToBulkBuffer(bulkBody *bytes.Buffer, data []byte, index, typ string) { +} + +func (m mockESClient) Index(ctx context.Context, body io.Reader, index, typ string) error { + panic("implement me") +} + +func (m mockESClient) Search(ctx context.Context, query esclient.SearchBody, size int, indices ...string) (*esclient.SearchResponse, error) { + panic("implement me") +} + +func (m mockESClient) MultiSearch(ctx context.Context, queries []esclient.SearchBody) (*esclient.MultiSearchResponse, error) { + panic("implement me") +} + +func (m mockESClient) MajorVersion() int { + panic("implement me") +} diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/storagefactory.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/storagefactory.go index c631314e70f..75f8b0ac506 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/storagefactory.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/storagefactory.go @@ -21,6 +21,7 @@ import ( "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" + "github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter/elasticsearchexporter/esmodeltranslator" "github.com/jaegertracing/jaeger/cmd/opentelemetry/app/internal/esclient" "github.com/jaegertracing/jaeger/cmd/opentelemetry/app/internal/reader/es/esdependencyreader" "github.com/jaegertracing/jaeger/cmd/opentelemetry/app/internal/reader/es/esspanreader" @@ -143,13 +144,13 @@ type singleSpanWriter struct { } type batchSpanWriter interface { - writeSpans(context.Context, []*dbmodel.Span) (int, error) + writeSpans(context.Context, []esmodeltranslator.ConvertedData) (int, error) } var _ spanstore.Writer = (*singleSpanWriter)(nil) func (s singleSpanWriter) WriteSpan(ctx context.Context, span *model.Span) error { dbSpan := s.converter.FromDomainEmbedProcess(span) - _, err := s.writer.writeSpans(ctx, []*dbmodel.Span{dbSpan}) + _, err := s.writer.writeSpans(ctx, []esmodeltranslator.ConvertedData{{DBSpan: dbSpan}}) return err } diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/storagefactory_test.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/storagefactory_test.go index 00c96678bb6..fac86347ffd 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/storagefactory_test.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/storagefactory_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" + "github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter/elasticsearchexporter/esmodeltranslator" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/es/config" "github.com/jaegertracing/jaeger/plugin/storage/es" @@ -139,7 +140,9 @@ type mockWriter struct { var _ batchSpanWriter = (*mockWriter)(nil) -func (m *mockWriter) writeSpans(_ context.Context, spans []*dbmodel.Span) (int, error) { - m.spans = append(spans) +func (m *mockWriter) writeSpans(ctx context.Context, containers []esmodeltranslator.ConvertedData) (int, error) { + for _, c := range containers { + m.spans = append(m.spans, c.DBSpan) + } return 0, nil } diff --git a/cmd/opentelemetry/go.sum b/cmd/opentelemetry/go.sum index 82026f37ff2..ed305721fa8 100644 --- a/cmd/opentelemetry/go.sum +++ b/cmd/opentelemetry/go.sum @@ -1471,7 +1471,6 @@ google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQ google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= -google.golang.org/grpc v1.28.0-pre/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60= google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=