Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass Context through span processors #6534

Merged
merged 2 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/collector/app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func TestAggregator(t *testing.T) {
},
},
}
_, err := c.spanProcessor.ProcessSpans(processor.SpansV1{
_, err := c.spanProcessor.ProcessSpans(context.Background(), processor.SpansV1{
Spans: spans,
Details: processor.Details{
SpanFormat: processor.JaegerSpanFormat,
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/handler/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (c *batchConsumer) consume(ctx context.Context, batch *model.Batch) error {
span.Process = batch.Process
}
}
_, err = c.spanProcessor.ProcessSpans(processor.SpansV1{
_, err = c.spanProcessor.ProcessSpans(ctx, processor.SpansV1{
Spans: batch.Spans,
Details: processor.Details{
InboundTransport: c.spanOptions.InboundTransport,
Expand Down
4 changes: 3 additions & 1 deletion cmd/collector/app/handler/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

var _ processor.SpanProcessor = (*mockSpanProcessor)(nil)

type mockSpanProcessor struct {
expectedError error
mux sync.Mutex
Expand All @@ -34,7 +36,7 @@ type mockSpanProcessor struct {
spanFormat processor.SpanFormat
}

func (p *mockSpanProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) {
func (p *mockSpanProcessor) ProcessSpans(_ context.Context, batch processor.Batch) ([]bool, error) {
p.mux.Lock()
defer p.mux.Unlock()
batch.GetSpans(func(spans []*model.Span) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/handler/http_thrift_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (aH *APIHandler) SaveSpan(w http.ResponseWriter, r *http.Request) {
}
batches := []*tJaeger.Batch{batch}
opts := SubmitBatchOptions{InboundTransport: processor.HTTPTransport}
if _, err = aH.jaegerBatchesHandler.SubmitBatches(batches, opts); err != nil {
if _, err = aH.jaegerBatchesHandler.SubmitBatches(r.Context(), batches, opts); err != nil {
http.Error(w, fmt.Sprintf("Cannot submit Jaeger batch: %v", err), http.StatusInternalServerError)
return
}
Expand Down
7 changes: 5 additions & 2 deletions cmd/collector/app/handler/http_thrift_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,18 @@ import (
"github.com/jaegertracing/jaeger/thrift-gen/jaeger"
)

var httpClient = &http.Client{Timeout: 2 * time.Second}
var (
httpClient = &http.Client{Timeout: 2 * time.Second}
_ JaegerBatchesHandler = (*mockJaegerHandler)(nil)
)

type mockJaegerHandler struct {
err error
mux sync.Mutex
batches []*jaeger.Batch
}

func (p *mockJaegerHandler) SubmitBatches(batches []*jaeger.Batch, _ SubmitBatchOptions) ([]*jaeger.BatchSubmitResponse, error) {
func (p *mockJaegerHandler) SubmitBatches(_ context.Context, batches []*jaeger.Batch, _ SubmitBatchOptions) ([]*jaeger.BatchSubmitResponse, error) {
p.mux.Lock()
defer p.mux.Unlock()
p.batches = append(p.batches, batches...)
Expand Down
14 changes: 8 additions & 6 deletions cmd/collector/app/handler/thrift_span_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package handler

import (
"context"

"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
Expand All @@ -24,13 +26,13 @@ type SubmitBatchOptions struct {
// ZipkinSpansHandler consumes and handles zipkin spans
type ZipkinSpansHandler interface {
// SubmitZipkinBatch records a batch of spans in Zipkin Thrift format
SubmitZipkinBatch(spans []*zipkincore.Span, options SubmitBatchOptions) ([]*zipkincore.Response, error)
SubmitZipkinBatch(ctx context.Context, spans []*zipkincore.Span, options SubmitBatchOptions) ([]*zipkincore.Response, error)
}

// JaegerBatchesHandler consumes and handles Jaeger batches
type JaegerBatchesHandler interface {
// SubmitBatches records a batch of spans in Jaeger Thrift format
SubmitBatches(batches []*jaeger.Batch, options SubmitBatchOptions) ([]*jaeger.BatchSubmitResponse, error)
SubmitBatches(ctx context.Context, batches []*jaeger.Batch, options SubmitBatchOptions) ([]*jaeger.BatchSubmitResponse, error)
}

type jaegerBatchesHandler struct {
Expand All @@ -46,15 +48,15 @@ func NewJaegerSpanHandler(logger *zap.Logger, modelProcessor processor.SpanProce
}
}

func (jbh *jaegerBatchesHandler) SubmitBatches(batches []*jaeger.Batch, options SubmitBatchOptions) ([]*jaeger.BatchSubmitResponse, error) {
func (jbh *jaegerBatchesHandler) SubmitBatches(ctx context.Context, batches []*jaeger.Batch, options SubmitBatchOptions) ([]*jaeger.BatchSubmitResponse, error) {
responses := make([]*jaeger.BatchSubmitResponse, 0, len(batches))
for _, batch := range batches {
mSpans := make([]*model.Span, 0, len(batch.Spans))
for _, span := range batch.Spans {
mSpan := jConv.ToDomainSpan(span, batch.Process)
mSpans = append(mSpans, mSpan)
}
oks, err := jbh.modelProcessor.ProcessSpans(processor.SpansV1{
oks, err := jbh.modelProcessor.ProcessSpans(ctx, processor.SpansV1{
Spans: mSpans,
Details: processor.Details{
InboundTransport: options.InboundTransport,
Expand Down Expand Up @@ -98,7 +100,7 @@ func NewZipkinSpanHandler(logger *zap.Logger, modelHandler processor.SpanProcess
}

// SubmitZipkinBatch records a batch of spans already in Zipkin Thrift format.
func (h *zipkinSpanHandler) SubmitZipkinBatch(spans []*zipkincore.Span, options SubmitBatchOptions) ([]*zipkincore.Response, error) {
func (h *zipkinSpanHandler) SubmitZipkinBatch(ctx context.Context, spans []*zipkincore.Span, options SubmitBatchOptions) ([]*zipkincore.Response, error) {
mSpans := make([]*model.Span, 0, len(spans))
convCount := make([]int, len(spans))
for i, span := range spans {
Expand All @@ -108,7 +110,7 @@ func (h *zipkinSpanHandler) SubmitZipkinBatch(spans []*zipkincore.Span, options
convCount[i] = len(converted)
mSpans = append(mSpans, converted...)
}
bools, err := h.modelProcessor.ProcessSpans(processor.SpansV1{
bools, err := h.modelProcessor.ProcessSpans(ctx, processor.SpansV1{
Spans: mSpans,
Details: processor.Details{
InboundTransport: options.InboundTransport,
Expand Down
12 changes: 8 additions & 4 deletions cmd/collector/app/handler/thrift_span_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package handler

import (
"context"
"encoding/json"
"errors"
"os"
Expand Down Expand Up @@ -36,7 +37,7 @@ func TestJaegerSpanHandler(t *testing.T) {
for _, tc := range testChunks {
logger := zap.NewNop()
h := NewJaegerSpanHandler(logger, &shouldIErrorProcessor{tc.expectedErr != nil})
res, err := h.SubmitBatches([]*jaeger.Batch{
res, err := h.SubmitBatches(context.Background(), []*jaeger.Batch{
{
Process: &jaeger.Process{ServiceName: "someServiceName"},
Spans: []*jaeger.Span{{SpanId: 21345}},
Expand All @@ -57,9 +58,12 @@ type shouldIErrorProcessor struct {
shouldError bool
}

var errTestError = errors.New("Whoops")
var (
_ processor.SpanProcessor = (*shouldIErrorProcessor)(nil)
errTestError = errors.New("Whoops")
)

func (s *shouldIErrorProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) {
func (s *shouldIErrorProcessor) ProcessSpans(_ context.Context, batch processor.Batch) ([]bool, error) {
if s.shouldError {
return nil, errTestError
}
Expand Down Expand Up @@ -121,7 +125,7 @@ func TestZipkinSpanHandler(t *testing.T) {
},
}
}
res, err := h.SubmitZipkinBatch(spans, SubmitBatchOptions{})
res, err := h.SubmitZipkinBatch(context.Background(), spans, SubmitBatchOptions{})
if tc.expectedErr != nil {
assert.Nil(t, res)
assert.Equal(t, tc.expectedErr, err)
Expand Down
3 changes: 2 additions & 1 deletion cmd/collector/app/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package processor

import (
"context"
"io"

"go.opentelemetry.io/collector/pdata/ptrace"
Expand All @@ -29,7 +30,7 @@ type Batch interface {
// SpanProcessor handles spans
type SpanProcessor interface {
// ProcessSpans processes spans and return with either a list of true/false success or an error
ProcessSpans(spans Batch) ([]bool, error)
ProcessSpans(ctx context.Context, spans Batch) ([]bool, error)
io.Closer
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/server/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ func (*mockSpanProcessor) Close() error {
return nil
}

func (*mockSpanProcessor) ProcessSpans(_ processor.Batch) ([]bool, error) {
func (*mockSpanProcessor) ProcessSpans(_ context.Context, _ processor.Batch) ([]bool, error) {
return []bool{}, nil
}
10 changes: 6 additions & 4 deletions cmd/collector/app/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const (
minRequiredChange = 1.2
)

var _ processor.SpanProcessor = (*spanProcessor)(nil)

type spanProcessor struct {
queue *queue.BoundedQueue[queueItem]
otelExporter exporter.Traces
Expand Down Expand Up @@ -240,18 +242,18 @@ func (sp *spanProcessor) countSpansInQueue(span *model.Span, _ string /* tenant
}

// TODO pass Context
func (sp *spanProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) {
func (sp *spanProcessor) ProcessSpans(ctx context.Context, batch processor.Batch) ([]bool, error) {
// We call preProcessSpans on a batch, it's responsibility of implementation
// to understand v1/v2 distinction. Jaeger itself does not use pre-processors.
sp.preProcessSpans(batch)

var batchOks []bool
var batchErr error
batch.GetSpans(func(spans []*model.Span) {
batchOks, batchErr = sp.processSpans(batch, spans)
batchOks, batchErr = sp.processSpans(ctx, batch, spans)
}, func(traces ptrace.Traces) {
// TODO verify if the context will survive all the way to the consumer threads.
ctx := tenancy.WithTenant(context.Background(), batch.GetTenant())
ctx := tenancy.WithTenant(ctx, batch.GetTenant())

// the exporter will eventually call pushTraces from consumer threads.
if err := sp.otelExporter.ConsumeTraces(ctx, traces); err != nil {
Expand All @@ -266,7 +268,7 @@ func (sp *spanProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) {
return batchOks, batchErr
}

func (sp *spanProcessor) processSpans(batch processor.Batch, spans []*model.Span) ([]bool, error) {
func (sp *spanProcessor) processSpans(_ context.Context, batch processor.Batch, spans []*model.Span) ([]bool, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just curious - why are we passing the context in here if its not being used?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better have it and not need it

sp.metrics.BatchSize.Update(int64(len(spans)))
retMe := make([]bool, len(spans))

Expand Down
21 changes: 11 additions & 10 deletions cmd/collector/app/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,13 @@ func TestBySvcMetrics(t *testing.T) {
span := makeZipkinSpan(test.serviceName, test.rootSpan, test.debug)
sanitizer := zipkinsanitizer.NewChainedSanitizer(zipkinsanitizer.NewStandardSanitizers()...)
zHandler := handler.NewZipkinSpanHandler(logger, sp, sanitizer)
zHandler.SubmitZipkinBatch([]*zc.Span{span, span}, handler.SubmitBatchOptions{})
zHandler.SubmitZipkinBatch(context.Background(), []*zc.Span{span, span}, handler.SubmitBatchOptions{})
metricPrefix = "service"
format = "zipkin"
case processor.JaegerSpanFormat:
span, process := makeJaegerSpan(test.serviceName, test.rootSpan, test.debug)
jHandler := handler.NewJaegerSpanHandler(logger, sp)
jHandler.SubmitBatches([]*jaeger.Batch{
jHandler.SubmitBatches(context.Background(), []*jaeger.Batch{
{
Spans: []*jaeger.Span{
span,
Expand Down Expand Up @@ -248,6 +248,7 @@ func TestSpanProcessor(t *testing.T) {
require.NoError(t, err)

res, err := p.ProcessSpans(
context.Background(),
processor.SpansV1{
Spans: []*model.Span{{}}, // empty span should be enriched by sanitizers
Details: processor.Details{
Expand Down Expand Up @@ -280,7 +281,7 @@ func TestSpanProcessorErrors(t *testing.T) {
require.NoError(t, err)
p := pp.(*spanProcessor)

res, err := p.ProcessSpans(processor.SpansV1{
res, err := p.ProcessSpans(context.Background(), processor.SpansV1{
Spans: []*model.Span{
{
Process: &model.Process{
Expand Down Expand Up @@ -340,7 +341,7 @@ func TestSpanProcessorBusy(t *testing.T) {
w.Lock()
defer w.Unlock()

res, err := p.ProcessSpans(processor.SpansV1{
res, err := p.ProcessSpans(context.Background(), processor.SpansV1{
Spans: []*model.Span{
{
Process: &model.Process{
Expand Down Expand Up @@ -427,7 +428,7 @@ func TestSpanProcessorWithCollectorTags(t *testing.T) {
Spans: []*model.Span{span},
}
}
_, err = p.ProcessSpans(batch)
_, err = p.ProcessSpans(context.Background(), batch)
require.NoError(t, err)

require.Eventually(t, func() bool {
Expand Down Expand Up @@ -684,7 +685,7 @@ func TestAdditionalProcessors(t *testing.T) {
// nil doesn't fail
p, err := NewSpanProcessor(v1adapter.NewTraceWriter(w), nil, Options.QueueSize(1))
require.NoError(t, err)
res, err := p.ProcessSpans(processor.SpansV1{
res, err := p.ProcessSpans(context.Background(), processor.SpansV1{
Spans: []*model.Span{
{
Process: &model.Process{
Expand All @@ -707,7 +708,7 @@ func TestAdditionalProcessors(t *testing.T) {
}
p, err = NewSpanProcessor(v1adapter.NewTraceWriter(w), []ProcessSpan{f}, Options.QueueSize(1))
require.NoError(t, err)
res, err = p.ProcessSpans(processor.SpansV1{
res, err = p.ProcessSpans(context.Background(), processor.SpansV1{
Spans: []*model.Span{
{
Process: &model.Process{
Expand All @@ -732,7 +733,7 @@ func TestSpanProcessorContextPropagation(t *testing.T) {

dummyTenant := "context-prop-test-tenant"

res, err := p.ProcessSpans(processor.SpansV1{
res, err := p.ProcessSpans(context.Background(), processor.SpansV1{
Spans: []*model.Span{
{
Process: &model.Process{
Expand Down Expand Up @@ -777,7 +778,7 @@ func TestSpanProcessorWithOnDroppedSpanOption(t *testing.T) {
w.Lock()
defer w.Unlock()

_, err = p.ProcessSpans(processor.SpansV1{
_, err = p.ProcessSpans(context.Background(), processor.SpansV1{
Spans: []*model.Span{
{OperationName: "op1"},
},
Expand All @@ -794,7 +795,7 @@ func TestSpanProcessorWithOnDroppedSpanOption(t *testing.T) {

// Now the queue is empty again and can accept one more item, but no workers available.
// If we send two items, the last one will have to be dropped.
_, err = p.ProcessSpans(processor.SpansV1{
_, err = p.ProcessSpans(context.Background(), processor.SpansV1{
Spans: []*model.Span{
{OperationName: "op2"},
{OperationName: "op3"},
Expand Down
Loading