From 200f94ca6ea84dc3ec23321ff3148d2a6a4b121b Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Mon, 6 May 2024 13:58:00 -0400 Subject: [PATCH 1/6] check incoming context as fallback --- .../batch_processor.go | 69 ++++++++++++++++++- 1 file changed, 68 insertions(+), 1 deletion(-) diff --git a/collector/processor/concurrentbatchprocessor/batch_processor.go b/collector/processor/concurrentbatchprocessor/batch_processor.go index 7cf1c647b7..1cb18aa82e 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor.go @@ -19,6 +19,7 @@ import ( "go.uber.org/multierr" "go.uber.org/zap" "golang.org/x/sync/semaphore" + "google.golang.org/grpc/metadata" "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" @@ -363,7 +364,7 @@ func (b *shard) sendItems(trigger trigger) { var err error var parent context.Context - isSingleCtx := allSame(contexts) + isSingleCtx := allSame(contexts) || allSameMetadata(contexts) // For SDK's we can reuse the parent context because there is // only one possible parent. This is not the case @@ -426,6 +427,41 @@ func allSame(x []context.Context) bool { return true } +func allSameMetadata(x []context.Context) bool { + firstMD, _ := metadata.FromIncomingContext(x[0]) + for idx := range x[1:] { + md, _ := metadata.FromIncomingContext(x[idx]) + if !equalMaps(md, firstMD) { + return false + } + } + return true +} + +func equalMaps(m1, m2 map[string][]string) bool { + if len(m1) != len(m2) { + return false + } + + for key, val1 := range m1 { + if val2, ok := m2[key]; ok { + if len(val1) != len(val2) { + return false + } + + for idx := range val1 { + if val1[idx] != val2[idx] { + return false + } + } + } else { // key not found + return false + } + } + + return true +} + func (bp *batchProcessor) countAcquire(ctx context.Context, bytes int64) error { err := bp.sem.Acquire(ctx, bytes) if err == nil && bp.telemetry.batchInFlightBytes != nil { @@ -566,6 +602,13 @@ func (mb *multiShardBatcher) consume(ctx context.Context, data any) error { attrs = append(attrs, attribute.StringSlice(k, vs)) } } + + if len(md) != len(mb.metadataKeys) { + // Did not find all metadata keys from client.Info metadata. + // Check if they exist in the incoming context. + md, attrs = mb.getMetadataKeysFromIncomingContext(ctx) + } + aset := attribute.NewSet(attrs...) b, ok := mb.batchers.Load(aset) @@ -589,6 +632,30 @@ func (mb *multiShardBatcher) consume(ctx context.Context, data any) error { return b.(*shard).consumeAndWait(ctx, data) } +func (mb *multiShardBatcher) getMetadataKeysFromIncomingContext(ctx context.Context) (map[string][]string, []attribute.KeyValue) { + headers, ok := metadata.FromIncomingContext(ctx) + if !ok || len(headers) == 0 { + return nil, nil + } + + md := map[string][]string{} + var attrs []attribute.KeyValue + for _, k := range mb.metadataKeys { + // Lookup the value in the incoming metadata, copy it + // into the outgoing metadata, and create a unique + // value for the attributeSet. + vs := headers[strings.ToLower(k)] + md[k] = vs + if len(vs) == 1 { + attrs = append(attrs, attribute.String(k, vs[0])) + } else { + attrs = append(attrs, attribute.StringSlice(k, vs)) + } + } + + return md, attrs +} + func recordBatchError(err error) error { return fmt.Errorf("Batch contained errors: %w", err) } From c88050dbfe909be59118cbfee2d188e5ffa7bbd3 Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Tue, 7 May 2024 09:18:56 -0400 Subject: [PATCH 2/6] simplify use shard's exportCtx --- collector/examples/bridge/edge-collector.yaml | 23 +++++++++++---- .../batch_processor.go | 28 +++++++++---------- 2 files changed, 31 insertions(+), 20 deletions(-) diff --git a/collector/examples/bridge/edge-collector.yaml b/collector/examples/bridge/edge-collector.yaml index db9922d331..a3c627cf43 100644 --- a/collector/examples/bridge/edge-collector.yaml +++ b/collector/examples/bridge/edge-collector.yaml @@ -3,12 +3,29 @@ receivers: # it uses port 4317, the standard port for OTLP/gRPC. # There are no required configuration fields. otelarrow/standard: + protocols: + grpc: + endpoint: 127.0.0.1:8100 + max_recv_msg_size_mib: 3000 processors: # The batch processor will benefit pipelines with small export sizes. concurrentbatch: exporters: + otelarrow/stdout: + endpoint: localhost:4317 + arrow: + disabled: false + disable_downgrade: true + tls: + insecure: true + headers: + "lightstep-org-id": "2" + "lightstep-org-name": "LightStep" + "lightstep-project-id": "768070" + "lightstep-project-name": "dev-mohosman" + # lightstep-access-token: "${LIGHTSTEP_ACCESS_TOKEN}" # otelarrow/arrow is an OTel-Arrow exporter. otelarrow/arrow: # For the sample configuration, the other side of the bridge @@ -36,11 +53,7 @@ service: traces: receivers: [otelarrow/standard] processors: [concurrentbatch] - exporters: [otelarrow/arrow, debug] - metrics: - receivers: [otelarrow/standard] - processors: [concurrentbatch] - exporters: [otelarrow/arrow, debug] + exporters: [otelarrow/stdout, debug] telemetry: resource: diff --git a/collector/processor/concurrentbatchprocessor/batch_processor.go b/collector/processor/concurrentbatchprocessor/batch_processor.go index 1cb18aa82e..70dccf51bd 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor.go @@ -201,9 +201,15 @@ func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func // newShard gets or creates a batcher corresponding with attrs. func (bp *batchProcessor) newShard(md map[string][]string) *shard { - exportCtx := client.NewContext(context.Background(), client.Info{ + // client.NewContext adds the metadata to client.Info object. In some cases the md + // keys were found in the incoming context and this incoming context may be used + // by downstream collector components that don't know about or use the client.Info object. + incoming := metadata.NewIncomingContext(context.Background(), md) + + exportCtx := client.NewContext(incoming, client.Info{ Metadata: client.NewMetadata(md), }) + b := &shard{ processor: bp, newItem: make(chan dataItem, runtime.NumCPU()), @@ -364,7 +370,7 @@ func (b *shard) sendItems(trigger trigger) { var err error var parent context.Context - isSingleCtx := allSame(contexts) || allSameMetadata(contexts) + isSingleCtx := allSame(contexts) // For SDK's we can reuse the parent context because there is // only one possible parent. This is not the case @@ -375,8 +381,8 @@ func (b *shard) sendItems(trigger trigger) { } else { var sp trace.Span links := buildLinks(contexts) - parent = context.Background() - parent, sp = b.tracer.Tracer("otel").Start(context.Background(), "concurrent_batch_processor/export", trace.WithLinks(links...)) + + parent, sp = b.tracer.Tracer("otel").Start(b.exportCtx, "concurrent_batch_processor/export", trace.WithLinks(links...)) sp.End() } err = b.batch.export(parent, req) @@ -427,17 +433,6 @@ func allSame(x []context.Context) bool { return true } -func allSameMetadata(x []context.Context) bool { - firstMD, _ := metadata.FromIncomingContext(x[0]) - for idx := range x[1:] { - md, _ := metadata.FromIncomingContext(x[idx]) - if !equalMaps(md, firstMD) { - return false - } - } - return true -} - func equalMaps(m1, m2 map[string][]string) bool { if len(m1) != len(m2) { return false @@ -595,6 +590,9 @@ func (mb *multiShardBatcher) consume(ctx context.Context, data any) error { // into the outgoing metadata, and create a unique // value for the attributeSet. vs := info.Metadata.Get(k) + if len(vs) == 0 { + continue + } md[k] = vs if len(vs) == 1 { attrs = append(attrs, attribute.String(k, vs[0])) From 4f59f0e61808f3cc966b04e774f8453aba5e65e4 Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Tue, 7 May 2024 10:04:57 -0400 Subject: [PATCH 3/6] simpler is better + tests --- collector/examples/bridge/edge-collector.yaml | 23 +++--------- .../batch_processor.go | 37 +++---------------- .../batch_processor_test.go | 21 +++++++++++ 3 files changed, 31 insertions(+), 50 deletions(-) diff --git a/collector/examples/bridge/edge-collector.yaml b/collector/examples/bridge/edge-collector.yaml index a3c627cf43..db9922d331 100644 --- a/collector/examples/bridge/edge-collector.yaml +++ b/collector/examples/bridge/edge-collector.yaml @@ -3,29 +3,12 @@ receivers: # it uses port 4317, the standard port for OTLP/gRPC. # There are no required configuration fields. otelarrow/standard: - protocols: - grpc: - endpoint: 127.0.0.1:8100 - max_recv_msg_size_mib: 3000 processors: # The batch processor will benefit pipelines with small export sizes. concurrentbatch: exporters: - otelarrow/stdout: - endpoint: localhost:4317 - arrow: - disabled: false - disable_downgrade: true - tls: - insecure: true - headers: - "lightstep-org-id": "2" - "lightstep-org-name": "LightStep" - "lightstep-project-id": "768070" - "lightstep-project-name": "dev-mohosman" - # lightstep-access-token: "${LIGHTSTEP_ACCESS_TOKEN}" # otelarrow/arrow is an OTel-Arrow exporter. otelarrow/arrow: # For the sample configuration, the other side of the bridge @@ -53,7 +36,11 @@ service: traces: receivers: [otelarrow/standard] processors: [concurrentbatch] - exporters: [otelarrow/stdout, debug] + exporters: [otelarrow/arrow, debug] + metrics: + receivers: [otelarrow/standard] + processors: [concurrentbatch] + exporters: [otelarrow/arrow, debug] telemetry: resource: diff --git a/collector/processor/concurrentbatchprocessor/batch_processor.go b/collector/processor/concurrentbatchprocessor/batch_processor.go index 70dccf51bd..6b0332c846 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor.go @@ -585,13 +585,16 @@ func (mb *multiShardBatcher) consume(ctx context.Context, data any) error { info := client.FromContext(ctx) md := map[string][]string{} var attrs []attribute.KeyValue + + incomingHeaders, headersFound := metadata.FromIncomingContext(ctx) for _, k := range mb.metadataKeys { // Lookup the value in the incoming metadata, copy it // into the outgoing metadata, and create a unique // value for the attributeSet. vs := info.Metadata.Get(k) - if len(vs) == 0 { - continue + if len(vs) == 0 && headersFound { + // not found in client.Info so try the metadata directly from incoming context. + vs = incomingHeaders[strings.ToLower(k)] } md[k] = vs if len(vs) == 1 { @@ -601,12 +604,6 @@ func (mb *multiShardBatcher) consume(ctx context.Context, data any) error { } } - if len(md) != len(mb.metadataKeys) { - // Did not find all metadata keys from client.Info metadata. - // Check if they exist in the incoming context. - md, attrs = mb.getMetadataKeysFromIncomingContext(ctx) - } - aset := attribute.NewSet(attrs...) b, ok := mb.batchers.Load(aset) @@ -630,30 +627,6 @@ func (mb *multiShardBatcher) consume(ctx context.Context, data any) error { return b.(*shard).consumeAndWait(ctx, data) } -func (mb *multiShardBatcher) getMetadataKeysFromIncomingContext(ctx context.Context) (map[string][]string, []attribute.KeyValue) { - headers, ok := metadata.FromIncomingContext(ctx) - if !ok || len(headers) == 0 { - return nil, nil - } - - md := map[string][]string{} - var attrs []attribute.KeyValue - for _, k := range mb.metadataKeys { - // Lookup the value in the incoming metadata, copy it - // into the outgoing metadata, and create a unique - // value for the attributeSet. - vs := headers[strings.ToLower(k)] - md[k] = vs - if len(vs) == 1 { - attrs = append(attrs, attribute.String(k, vs[0])) - } else { - attrs = append(attrs, attribute.StringSlice(k, vs)) - } - } - - return md, attrs -} - func recordBatchError(err error) error { return fmt.Errorf("Batch contained errors: %w", err) } diff --git a/collector/processor/concurrentbatchprocessor/batch_processor_test.go b/collector/processor/concurrentbatchprocessor/batch_processor_test.go index 8abd3042cb..1b8693d34c 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor_test.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor_test.go @@ -16,6 +16,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" "golang.org/x/sync/semaphore" + "google.golang.org/grpc/metadata" "github.com/open-telemetry/otel-arrow/collector/processor/concurrentbatchprocessor/testdata" "go.opentelemetry.io/collector/client" @@ -1407,8 +1408,16 @@ func formatTwo(first, second []string) string { func (mts *metadataTracesSink) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { info := client.FromContext(ctx) + incomingHeaders, headersFound := metadata.FromIncomingContext(ctx) token1 := info.Metadata.Get("token1") + if len(token1) == 0 && headersFound { + token1 = incomingHeaders["token1"] + } token2 := info.Metadata.Get("token2") + if len(token2) == 0 && headersFound { + token2 = incomingHeaders["token2"] + } + mts.lock.Lock() defer mts.lock.Unlock() @@ -1434,6 +1443,10 @@ func TestBatchProcessorSpansBatchedByMetadata(t *testing.T) { require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + incoming := metadata.NewIncomingContext(context.Background(), map[string][]string{ + "token1": {"incoming1"}, + "token2": {"incoming2"}, + }) bg := context.Background() callCtxs := []context.Context{ client.NewContext(bg, client.Info{ @@ -1463,6 +1476,14 @@ func TestBatchProcessorSpansBatchedByMetadata(t *testing.T) { "token4": {"n/a", "d/c"}, }), }), + // empty client.Info with empty metadata.FromIncomingContext. + client.NewContext(bg, client.Info{ + Metadata: client.NewMetadata(map[string][]string{}), + }), + // empty client.Info with existing metadata.FromIncomingContext. + client.NewContext(incoming, client.Info{ + Metadata: client.NewMetadata(map[string][]string{}), + }), } expectByContext := make([]int, len(callCtxs)) From f925e98bf51e55239a0bd9ffdcfab66efd35ecfa Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Tue, 7 May 2024 10:10:04 -0400 Subject: [PATCH 4/6] remove unneeded function --- .../batch_processor.go | 24 ------------------- 1 file changed, 24 deletions(-) diff --git a/collector/processor/concurrentbatchprocessor/batch_processor.go b/collector/processor/concurrentbatchprocessor/batch_processor.go index 6b0332c846..3aba632b2d 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor.go @@ -433,30 +433,6 @@ func allSame(x []context.Context) bool { return true } -func equalMaps(m1, m2 map[string][]string) bool { - if len(m1) != len(m2) { - return false - } - - for key, val1 := range m1 { - if val2, ok := m2[key]; ok { - if len(val1) != len(val2) { - return false - } - - for idx := range val1 { - if val1[idx] != val2[idx] { - return false - } - } - } else { // key not found - return false - } - } - - return true -} - func (bp *batchProcessor) countAcquire(ctx context.Context, bytes int64) error { err := bp.sem.Acquire(ctx, bytes) if err == nil && bp.telemetry.batchInFlightBytes != nil { From 25dd516d8ba240ac804c95fc7f0621e4ecb375c9 Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Tue, 7 May 2024 18:01:05 -0400 Subject: [PATCH 5/6] revert --- .../batch_processor.go | 16 +------------- .../batch_processor_test.go | 21 ------------------- 2 files changed, 1 insertion(+), 36 deletions(-) diff --git a/collector/processor/concurrentbatchprocessor/batch_processor.go b/collector/processor/concurrentbatchprocessor/batch_processor.go index 3aba632b2d..8173e89f85 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor.go @@ -19,7 +19,6 @@ import ( "go.uber.org/multierr" "go.uber.org/zap" "golang.org/x/sync/semaphore" - "google.golang.org/grpc/metadata" "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" @@ -201,15 +200,9 @@ func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func // newShard gets or creates a batcher corresponding with attrs. func (bp *batchProcessor) newShard(md map[string][]string) *shard { - // client.NewContext adds the metadata to client.Info object. In some cases the md - // keys were found in the incoming context and this incoming context may be used - // by downstream collector components that don't know about or use the client.Info object. - incoming := metadata.NewIncomingContext(context.Background(), md) - - exportCtx := client.NewContext(incoming, client.Info{ + exportCtx := client.NewContext(context.Background(), client.Info{ Metadata: client.NewMetadata(md), }) - b := &shard{ processor: bp, newItem: make(chan dataItem, runtime.NumCPU()), @@ -561,17 +554,11 @@ func (mb *multiShardBatcher) consume(ctx context.Context, data any) error { info := client.FromContext(ctx) md := map[string][]string{} var attrs []attribute.KeyValue - - incomingHeaders, headersFound := metadata.FromIncomingContext(ctx) for _, k := range mb.metadataKeys { // Lookup the value in the incoming metadata, copy it // into the outgoing metadata, and create a unique // value for the attributeSet. vs := info.Metadata.Get(k) - if len(vs) == 0 && headersFound { - // not found in client.Info so try the metadata directly from incoming context. - vs = incomingHeaders[strings.ToLower(k)] - } md[k] = vs if len(vs) == 1 { attrs = append(attrs, attribute.String(k, vs[0])) @@ -579,7 +566,6 @@ func (mb *multiShardBatcher) consume(ctx context.Context, data any) error { attrs = append(attrs, attribute.StringSlice(k, vs)) } } - aset := attribute.NewSet(attrs...) b, ok := mb.batchers.Load(aset) diff --git a/collector/processor/concurrentbatchprocessor/batch_processor_test.go b/collector/processor/concurrentbatchprocessor/batch_processor_test.go index 1b8693d34c..8abd3042cb 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor_test.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor_test.go @@ -16,7 +16,6 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" "golang.org/x/sync/semaphore" - "google.golang.org/grpc/metadata" "github.com/open-telemetry/otel-arrow/collector/processor/concurrentbatchprocessor/testdata" "go.opentelemetry.io/collector/client" @@ -1408,16 +1407,8 @@ func formatTwo(first, second []string) string { func (mts *metadataTracesSink) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { info := client.FromContext(ctx) - incomingHeaders, headersFound := metadata.FromIncomingContext(ctx) token1 := info.Metadata.Get("token1") - if len(token1) == 0 && headersFound { - token1 = incomingHeaders["token1"] - } token2 := info.Metadata.Get("token2") - if len(token2) == 0 && headersFound { - token2 = incomingHeaders["token2"] - } - mts.lock.Lock() defer mts.lock.Unlock() @@ -1443,10 +1434,6 @@ func TestBatchProcessorSpansBatchedByMetadata(t *testing.T) { require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) - incoming := metadata.NewIncomingContext(context.Background(), map[string][]string{ - "token1": {"incoming1"}, - "token2": {"incoming2"}, - }) bg := context.Background() callCtxs := []context.Context{ client.NewContext(bg, client.Info{ @@ -1476,14 +1463,6 @@ func TestBatchProcessorSpansBatchedByMetadata(t *testing.T) { "token4": {"n/a", "d/c"}, }), }), - // empty client.Info with empty metadata.FromIncomingContext. - client.NewContext(bg, client.Info{ - Metadata: client.NewMetadata(map[string][]string{}), - }), - // empty client.Info with existing metadata.FromIncomingContext. - client.NewContext(incoming, client.Info{ - Metadata: client.NewMetadata(map[string][]string{}), - }), } expectByContext := make([]int, len(callCtxs)) From 08acae5a9d3d87eccdc40e9439f0ab7a5b6f0a01 Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Tue, 7 May 2024 18:07:18 -0400 Subject: [PATCH 6/6] rm --- collector/processor/concurrentbatchprocessor/batch_processor.go | 1 - 1 file changed, 1 deletion(-) diff --git a/collector/processor/concurrentbatchprocessor/batch_processor.go b/collector/processor/concurrentbatchprocessor/batch_processor.go index 8173e89f85..5a018596d5 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor.go @@ -374,7 +374,6 @@ func (b *shard) sendItems(trigger trigger) { } else { var sp trace.Span links := buildLinks(contexts) - parent, sp = b.tracer.Tracer("otel").Start(b.exportCtx, "concurrent_batch_processor/export", trace.WithLinks(links...)) sp.End() }