Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add partial retry capability to OTEL ES exporter. #2456

Merged
merged 1 commit into from
Sep 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,26 +61,34 @@ func NewTranslator(allTagsAsFields bool, tagsKeysAsFields []string, tagDotReplac
}
}

// ConvertedData holds DB span and the original data used to construct it.
// The original OTEL objects are kept next to the converted span so that the
// source data of a given DB span remains available after conversion.
type ConvertedData struct {
// Span is the original OTEL span that was converted.
Span pdata.Span
// Resource is the resource of the ResourceSpans the span came from.
Resource pdata.Resource
// InstrumentationLibrary is the instrumentation library of the
// InstrumentationLibrarySpans the span came from.
InstrumentationLibrary pdata.InstrumentationLibrary
// DBSpan is the Jaeger Elasticsearch model span produced from Span.
DBSpan *dbmodel.Span
}

// ConvertSpans converts spans from OTEL model to Jaeger Elasticsearch model
func (c *Translator) ConvertSpans(traces pdata.Traces) ([]*dbmodel.Span, error) {
func (c *Translator) ConvertSpans(traces pdata.Traces) ([]ConvertedData, error) {
rss := traces.ResourceSpans()
if rss.Len() == 0 {
return nil, nil
}
dbSpans := make([]*dbmodel.Span, 0, traces.SpanCount())
containers := make([]ConvertedData, 0, traces.SpanCount())
for i := 0; i < rss.Len(); i++ {
// this would correspond to a single batch
err := c.resourceSpans(rss.At(i), &dbSpans)
err := c.resourceSpans(rss.At(i), &containers)
if err != nil {
return nil, err
}
}
return dbSpans, nil
return containers, nil
}

func (c *Translator) resourceSpans(spans pdata.ResourceSpans, dbSpans *[]*dbmodel.Span) error {
ils := spans.InstrumentationLibrarySpans()
process := c.process(spans.Resource())
func (c *Translator) resourceSpans(rspans pdata.ResourceSpans, containers *[]ConvertedData) error {
ils := rspans.InstrumentationLibrarySpans()
process := c.process(rspans.Resource())
for i := 0; i < ils.Len(); i++ {
// TODO convert instrumentation library info
//ils.At(i).InstrumentationLibrary()
Expand All @@ -91,7 +99,12 @@ func (c *Translator) resourceSpans(spans pdata.ResourceSpans, dbSpans *[]*dbmode
return err
}
dbSpan.Process = *process
*dbSpans = append(*dbSpans, dbSpan)
*containers = append(*containers, ConvertedData{
Span: spans.At(j),
Resource: rspans.Resource(),
InstrumentationLibrary: ils.At(i).InstrumentationLibrary(),
DBSpan: dbSpan,
})
}
}
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,34 +129,40 @@ func TestConvertSpan(t *testing.T) {
c := &Translator{
tagKeysAsFields: map[string]bool{"toTagMap": true},
}
spans, err := c.ConvertSpans(traces)
spanDataContainers, err := c.ConvertSpans(traces)
require.NoError(t, err)
assert.Equal(t, 1, len(spans))
assert.Equal(t, &dbmodel.Span{
TraceID: "30313233343536373839616263646566",
SpanID: "3031323334353637",
StartTime: 1000,
Duration: 1000,
OperationName: "root",
StartTimeMillis: 1,
Tags: []dbmodel.KeyValue{
{Key: "span.kind", Type: dbmodel.StringType, Value: "client"},
{Key: "status.code", Type: dbmodel.StringType, Value: "Cancelled"},
{Key: "error", Type: dbmodel.BoolType, Value: "true"},
{Key: "status.message", Type: dbmodel.StringType, Value: "messagetext"},
{Key: "foo", Type: dbmodel.BoolType, Value: "true"}},
Tag: map[string]interface{}{"toTagMap": "val"},
Logs: []dbmodel.Log{{Fields: []dbmodel.KeyValue{
{Key: "event", Value: "eventName", Type: dbmodel.StringType},
{Key: "foo", Value: "bar", Type: dbmodel.StringType}}, Timestamp: 500}},
References: []dbmodel.Reference{
{SpanID: "3031323334353637", TraceID: "30313233343536373839616263646566", RefType: dbmodel.ChildOf},
{SpanID: "3031323334353637", TraceID: "30313233343536373839616263646566", RefType: dbmodel.FollowsFrom}},
Process: dbmodel.Process{
ServiceName: "myservice",
Tags: []dbmodel.KeyValue{{Key: "num", Value: "16.66", Type: dbmodel.Float64Type}},
},
}, spans[0])
assert.Equal(t, 1, len(spanDataContainers))
assert.Equal(t,
ConvertedData{
Span: span,
Resource: resource,
InstrumentationLibrary: traces.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).InstrumentationLibrary(),
DBSpan: &dbmodel.Span{
TraceID: "30313233343536373839616263646566",
SpanID: "3031323334353637",
StartTime: 1000,
Duration: 1000,
OperationName: "root",
StartTimeMillis: 1,
Tags: []dbmodel.KeyValue{
{Key: "span.kind", Type: dbmodel.StringType, Value: "client"},
{Key: "status.code", Type: dbmodel.StringType, Value: "Cancelled"},
{Key: "error", Type: dbmodel.BoolType, Value: "true"},
{Key: "status.message", Type: dbmodel.StringType, Value: "messagetext"},
{Key: "foo", Type: dbmodel.BoolType, Value: "true"}},
Tag: map[string]interface{}{"toTagMap": "val"},
Logs: []dbmodel.Log{{Fields: []dbmodel.KeyValue{
{Key: "event", Value: "eventName", Type: dbmodel.StringType},
{Key: "foo", Value: "bar", Type: dbmodel.StringType}}, Timestamp: 500}},
References: []dbmodel.Reference{
{SpanID: "3031323334353637", TraceID: "30313233343536373839616263646566", RefType: dbmodel.ChildOf},
{SpanID: "3031323334353637", TraceID: "30313233343536373839616263646566", RefType: dbmodel.FollowsFrom}},
Process: dbmodel.Process{
ServiceName: "myservice",
Tags: []dbmodel.KeyValue{{Key: "num", Value: "16.66", Type: dbmodel.Float64Type}},
},
},
}, spanDataContainers[0])
}

func TestSpanEmptyRef(t *testing.T) {
Expand All @@ -166,24 +172,30 @@ func TestSpanEmptyRef(t *testing.T) {
span.SetEndTime(pdata.TimestampUnixNano(2000000))

c := &Translator{}
spans, err := c.ConvertSpans(traces)
spanDataContainers, err := c.ConvertSpans(traces)
require.NoError(t, err)
assert.Equal(t, 1, len(spans))
assert.Equal(t, &dbmodel.Span{
TraceID: "30313233343536373839616263646566",
SpanID: "3031323334353637",
StartTime: 1000,
Duration: 1000,
OperationName: "root",
StartTimeMillis: 1,
Tags: []dbmodel.KeyValue{}, // should not be nil
Logs: []dbmodel.Log{}, // should not be nil
References: []dbmodel.Reference{}, // should not be nil
Process: dbmodel.Process{
ServiceName: "myservice",
Tags: nil,
},
}, spans[0])
assert.Equal(t, 1, len(spanDataContainers))
assert.Equal(t,
ConvertedData{
Span: span,
Resource: traces.ResourceSpans().At(0).Resource(),
InstrumentationLibrary: traces.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).InstrumentationLibrary(),
DBSpan: &dbmodel.Span{
TraceID: "30313233343536373839616263646566",
SpanID: "3031323334353637",
StartTime: 1000,
Duration: 1000,
OperationName: "root",
StartTimeMillis: 1,
Tags: []dbmodel.KeyValue{}, // should not be nil
Logs: []dbmodel.Log{}, // should not be nil
References: []dbmodel.Reference{}, // should not be nil
Process: dbmodel.Process{
ServiceName: "myservice",
Tags: nil,
},
},
}, spanDataContainers[0])
}

func TestEmpty(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage/es"
)

// new creates Elasticsearch exporter/storage.
func new(ctx context.Context, config *Config, params component.ExporterCreateParams) (component.TraceExporter, error) {
// newExporter creates Elasticsearch exporter/storage.
func newExporter(ctx context.Context, config *Config, params component.ExporterCreateParams) (component.TraceExporter, error) {
esCfg := config.GetPrimary()
w, err := newEsSpanWriter(*esCfg, params.Logger, false, config.Name())
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (Factory) CreateTraceExporter(
if !ok {
return nil, fmt.Errorf("could not cast configuration to %s", TypeStr)
}
return new(ctx, esCfg, params)
return newExporter(ctx, esCfg, params)
}

// CreateMetricsExporter is not implemented.
Expand Down
80 changes: 54 additions & 26 deletions cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/cache"
"github.com/jaegertracing/jaeger/pkg/es/config"
"github.com/jaegertracing/jaeger/pkg/multierror"
"github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel"
)

Expand Down Expand Up @@ -114,69 +115,75 @@ func (w *esSpanWriter) WriteTraces(ctx context.Context, traces pdata.Traces) (in
return w.writeSpans(ctx, spans)
}

func (w *esSpanWriter) writeSpans(ctx context.Context, spans []*dbmodel.Span) (int, error) {
func (w *esSpanWriter) writeSpans(ctx context.Context, spansData []esmodeltranslator.ConvertedData) (int, error) {
buffer := &bytes.Buffer{}
// mapping for bulk operation to span
var bulkOperations []bulkItem
var bulkItems []bulkItem
var errs []error
dropped := 0
for _, span := range spans {
data, err := json.Marshal(span)
for _, spanData := range spansData {
data, err := json.Marshal(spanData.DBSpan)
if err != nil {
errs = append(errs, err)
dropped++
continue
}
indexName := w.spanIndexName.IndexName(model.EpochMicrosecondsAsTime(span.StartTime))
bulkOperations = append(bulkOperations, bulkItem{span: span, isService: false})
indexName := w.spanIndexName.IndexName(model.EpochMicrosecondsAsTime(spanData.DBSpan.StartTime))
bulkItems = append(bulkItems, bulkItem{spanData: spanData, isService: false})
w.client.AddDataToBulkBuffer(buffer, data, indexName, spanTypeName)

if !w.isArchive {
storeService, err := w.writeService(span, buffer)
storeService, err := w.writeService(spanData.DBSpan, buffer)
if err != nil {
errs = append(errs, err)
// dropped is not increased since this is only service name, the span could be written well
continue
} else if storeService {
bulkOperations = append(bulkOperations, bulkItem{span: span, isService: true})
bulkItems = append(bulkItems, bulkItem{spanData: spanData, isService: true})
}
}
}
res, err := w.client.Bulk(ctx, bytes.NewReader(buffer.Bytes()))
res, err := w.client.Bulk(ctx, buffer)
if err != nil {
errs = append(errs, err)
return len(spansData), componenterror.CombineErrors(errs)
}
failedOperations, err := w.handleResponse(ctx, res, bulkItems)
if err != nil {
errs = append(errs, err)
return len(spans), componenterror.CombineErrors(errs)
}
droppedFromResponse := w.handleResponse(ctx, res, bulkOperations)
dropped += droppedFromResponse
dropped += len(failedOperations)
if len(failedOperations) > 0 {
return dropped, consumererror.PartialTracesError(componenterror.CombineErrors(errs), bulkItemsToTraces(failedOperations))
}
return dropped, componenterror.CombineErrors(errs)
}

func (w *esSpanWriter) handleResponse(ctx context.Context, blk *esclient.BulkResponse, operationToSpan []bulkItem) int {
numErrors := 0
// handleResponse processes the bulk response and returns the span bulk items that failed to be stored, together with the combined per-item errors.
func (w *esSpanWriter) handleResponse(ctx context.Context, blk *esclient.BulkResponse, bulkItems []bulkItem) ([]bulkItem, error) {
storedSpans := map[string]int64{}
notStoredSpans := map[string]int64{}
var failed []bulkItem
var errs []error
for i, d := range blk.Items {
bulkOp := operationToSpan[i]
bulkItem := bulkItems[i]
if d.Index.Status > 201 {
numErrors++
w.logger.Error("Part of the bulk request failed",
zap.String("result", d.Index.Result),
zap.String("error.reason", d.Index.Error.Reason),
zap.String("error.type", d.Index.Error.Type),
zap.String("error.cause.type", d.Index.Error.Cause.Type),
zap.String("error.cause.reason", d.Index.Error.Cause.Reason))
// TODO return an error or a struct that indicates which spans should be retried
// https://github.com/open-telemetry/opentelemetry-collector/issues/990
if !bulkOp.isService {
notStoredSpans[bulkOp.span.Process.ServiceName] = notStoredSpans[bulkOp.span.Process.ServiceName] + 1
errs = append(errs, fmt.Errorf("bulk request failed, reason %v, result: %v", d.Index.Error.Reason, d.Index.Result))
if !bulkItem.isService {
failed = append(failed, bulkItem)
notStoredSpans[bulkItem.spanData.DBSpan.Process.ServiceName] = notStoredSpans[bulkItem.spanData.DBSpan.Process.ServiceName] + 1
}
} else {
// passed
if !bulkOp.isService {
storedSpans[bulkOp.span.Process.ServiceName] = storedSpans[bulkOp.span.Process.ServiceName] + 1
if !bulkItem.isService {
storedSpans[bulkItem.spanData.DBSpan.Process.ServiceName] = storedSpans[bulkItem.spanData.DBSpan.Process.ServiceName] + 1
} else {
cacheKey := hashCode(bulkOp.span.Process.ServiceName, bulkOp.span.OperationName)
cacheKey := hashCode(bulkItem.spanData.DBSpan.Process.ServiceName, bulkItem.spanData.DBSpan.OperationName)
w.serviceCache.Put(cacheKey, cacheKey)
}
}
Expand All @@ -191,7 +198,7 @@ func (w *esSpanWriter) handleResponse(ctx context.Context, blk *esclient.BulkRes
tag.Insert(storagemetrics.TagServiceName(), k), w.nameTag)
stats.Record(ctx, storagemetrics.StatSpansStoredCount().M(v))
}
return numErrors
return failed, multierror.Wrap(errs)
}

func (w *esSpanWriter) writeService(span *dbmodel.Span, buffer *bytes.Buffer) (bool, error) {
Expand Down Expand Up @@ -221,11 +228,32 @@ func hashCode(serviceName, operationName string) string {

type bulkItem struct {
// span associated with the bulk operation
span *dbmodel.Span
spanData esmodeltranslator.ConvertedData
// isService indicates that this bulk operation is for service index
isService bool
}

// esClientVersion reports the major version returned by the writer's client.
func (w *esSpanWriter) esClientVersion() int {
	majorVersion := w.client.MajorVersion()
	return majorVersion
}

func bulkItemsToTraces(bulkItems []bulkItem) pdata.Traces {
traces := pdata.NewTraces()
traces.ResourceSpans().Resize(len(bulkItems))
for i, op := range bulkItems {
spanData := op.spanData
rss := traces.ResourceSpans().At(i)
if !spanData.Resource.IsNil() {
rss.Resource().InitEmpty()
rss.Resource().Attributes().InitFromAttributeMap(spanData.Resource.Attributes())
}
rss.InstrumentationLibrarySpans().Resize(1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to above, you could keep some kind of map of Resource/InstrumentationLibrary => InstrumentationLibrarySpans which you'd then check to see if you had an InstrumentationLibrarySpans to append to or if you should just make a new one.

Might be faster to just make one ResourceSpans/InstrumentationLibrarySpans for each bulkitem and forget trying to reuse them. It's hard to say if spans in a batch will likely come from the same trace and this will be worthwhile or not. Probably depends on which processors are configured in the pipeline.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot expect that spans in a batch share the same resource and instrumentation library.

Might be faster to just make one ResourceSpans/InstrumentationLibrarySpans for each bulkitem and forget trying to reuse them. It's hard to say if spans in a batch will likely come from the same trace and this will be worthwhile or not. Probably depends on which processors are configured in the pipeline.

Keeping a resource/instrumentation -> spans mapping would be more memory efficient. However, I would go with this simple approach for now. The instrumentation library has two string fields and the resource has a map.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed +1

ispans := rss.InstrumentationLibrarySpans().At(0)
ispans.InitEmpty()
if !spanData.InstrumentationLibrary.IsNil() {
spanData.InstrumentationLibrary.CopyTo(ispans.InstrumentationLibrary())
}
ispans.Spans().Append(&spanData.Span)
}
return traces
}
Loading