From 3a3cb4b9f6418169e325883e14696c43308a5d93 Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko Date: Fri, 14 May 2021 19:14:20 +0300 Subject: [PATCH 1/9] #4147 1. added logic to add tag `tenant_id` to tracing spans if tenant is defined in context. 2. added additional tracing spans to merge_querier's methods Signed-off-by: Vladyslav Diachenko --- CHANGELOG.md | 1 + .../tenantfederation/merge_queryable.go | 12 ++- .../tenantfederation/merge_queryable_test.go | 73 ++++++++++++++++++- pkg/util/spanlogger/spanlogger.go | 8 ++ pkg/util/spanlogger/spanlogger_test.go | 24 ++++++ 5 files changed, 116 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d2df96e80b4..6aafaa5bd44 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ * [ENHANCEMENT] Distributor: Added distributors ring status section in the admin page. #4151 * [BUGFIX] Purger: fix `Invalid null value in condition for column range` caused by `nil` value in range for WriteBatch query. #4128 * [BUGFIX] Ingester: fixed infrequent panic caused by a race condition between TSDB mmap-ed head chunks truncation and queries. #4176 +* [ENHANCEMENT] Added `tenant_id` tag to tracing spans ## Blocksconvert diff --git a/pkg/querier/tenantfederation/merge_queryable.go b/pkg/querier/tenantfederation/merge_queryable.go index b8e8d21ce67..6472114eee7 100644 --- a/pkg/querier/tenantfederation/merge_queryable.go +++ b/pkg/querier/tenantfederation/merge_queryable.go @@ -15,6 +15,7 @@ import ( "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util/concurrency" + "github.com/cortexproject/cortex/pkg/util/spanlogger" ) const ( @@ -96,6 +97,9 @@ type mergeQueryable struct { // Querier returns a new mergeQuerier, which aggregates results from multiple // underlying queriers into a single result. func (m *mergeQueryable) Querier(ctx context.Context, mint int64, maxt int64) (storage.Querier, error) { + // TODO: it's necessary to think how to override context inside querier + // to mark spans created inside querier as child of a span created inside + // methods of merged querier. ids, queriers, err := m.callback(ctx, mint, maxt) if err != nil { return nil, err @@ -133,6 +137,8 @@ type mergeQuerier struct { // For the label "original_" + `idLabelName it will return all the values // of the underlying queriers for `idLabelName`. func (m *mergeQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { + log, _ := spanlogger.New(m.ctx, "mergeQuerier.labelValues") + defer log.Span.Finish() if name == m.idLabelName { return m.ids, nil, nil } @@ -152,6 +158,8 @@ func (m *mergeQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([] // queriers. It also adds the `idLabelName` and if present in the original // results the original `idLabelName`. func (m *mergeQuerier) LabelNames() ([]string, storage.Warnings, error) { + log, _ := spanlogger.New(m.ctx, "mergeQuerier.labelNames") + defer log.Span.Finish() labelNames, warnings, err := m.mergeDistinctStringSlice(func(ctx context.Context, q storage.Querier) ([]string, storage.Warnings, error) { return q.LabelNames() }) @@ -272,6 +280,8 @@ type selectJob struct { // matching. The forwarded labelSelector is not containing those that operate // on `idLabelName`. func (m *mergeQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + log, ctx := spanlogger.New(m.ctx, "mergeQuerier.select") + defer log.Span.Finish() matchedValues, filteredMatchers := filterValuesByMatchers(m.idLabelName, m.ids, matchers...) var jobs = make([]interface{}, len(matchedValues)) var seriesSets = make([]storage.SeriesSet, len(matchedValues)) @@ -305,7 +315,7 @@ func (m *mergeQuerier) Select(sortSeries bool, hints *storage.SelectHints, match return nil } - err := concurrency.ForEach(m.ctx, jobs, maxConcurrency, run) + err := concurrency.ForEach(ctx, jobs, maxConcurrency, run) if err != nil { return storage.ErrSeriesSet(err) } diff --git a/pkg/querier/tenantfederation/merge_queryable_test.go b/pkg/querier/tenantfederation/merge_queryable_test.go index 6109141eaf9..6d7fc7b08fc 100644 --- a/pkg/querier/tenantfederation/merge_queryable_test.go +++ b/pkg/querier/tenantfederation/merge_queryable_test.go @@ -9,6 +9,9 @@ import ( "testing" "time" + sl "github.com/cortexproject/cortex/pkg/util/spanlogger" + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/mocktracer" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" @@ -37,7 +40,11 @@ func (m *mockTenantQueryableWithFilter) Querier(ctx context.Context, _, _ int64) return nil, err } - q := mockTenantQuerier{tenant: tenantIDs[0], extraLabels: m.extraLabels} + q := mockTenantQuerier{ + tenant: tenantIDs[0], + extraLabels: m.extraLabels, + ctx: ctx, + } // set warning if exists if m.warningsByTenant != nil { @@ -66,6 +73,7 @@ type mockTenantQuerier struct { warnings storage.Warnings queryErr error + ctx context.Context } func (m mockTenantQuerier) matrix() model.Matrix { @@ -135,6 +143,8 @@ func (m *mockSeriesSet) Warnings() storage.Warnings { } func (m mockTenantQuerier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + log, _ := sl.New(m.ctx, "mockTenantQuerier.select") + defer log.Span.Finish() var matrix model.Matrix for _, s := range m.matrix() { @@ -478,3 +488,64 @@ func TestSetLabelsRetainExisting(t *testing.T) { assert.Equal(t, tc.expected, setLabelsRetainExisting(tc.labels, tc.additionalLabels...)) } } + +func TestTracingMergeQueryable(t *testing.T) { + mockTracer := mocktracer.New() + opentracing.SetGlobalTracer(mockTracer) + ctx := user.InjectOrgID(context.Background(), "team-a|team-b") + + // set a multi tenant resolver + tenant.WithDefaultResolver(tenant.NewMultiResolver()) + filter := mockTenantQueryableWithFilter{} + q := NewQueryable(&filter, false) + // retrieve querier if set + querier, err := q.Querier(ctx, mint, maxt) + require.NoError(t, err) + + seriesSet := querier.Select(true, &storage.SelectHints{Start: mint, + End: maxt}) + + require.NoError(t, seriesSet.Err()) + spans := mockTracer.FinishedSpans() + assertSpanExist(t, spans, "mergeQuerier.select", map[string]string{sl.TenantIdTagName: "team-a|team-b"}) + assertSpanExist(t, spans, "mockTenantQuerier.select", map[string]string{sl.TenantIdTagName: "team-a"}) + assertSpanExist(t, spans, "mockTenantQuerier.select", map[string]string{sl.TenantIdTagName: "team-b"}) +} + +func assertSpanExist(t *testing.T, + actualSpans []*mocktracer.MockSpan, + name string, + expectedTags map[string]string) { + for _, span := range actualSpans { + if span.OperationName == name && containsTags(span, expectedTags) { + return + } + } + require.FailNow(t, "can not find span matching params", + "expected span with name `%v` and with "+ + "tags %v to be present but it was not. actual spans: %+v", + name, expectedTags, extractNameWithTags(actualSpans)) +} + +type spanWithTags struct { + name string + tags map[string]interface{} +} + +func extractNameWithTags(actualSpans []*mocktracer.MockSpan) []spanWithTags { + result := make([]spanWithTags, len(actualSpans)) + for i, span := range actualSpans { + result[i] = spanWithTags{span.OperationName, span.Tags()} + } + return result +} + +func containsTags(span *mocktracer.MockSpan, + expectedTags map[string]string) bool { + for k, expectedVal := range expectedTags { + if span.Tag(k) == expectedVal { + return true + } + } + return false +} diff --git a/pkg/util/spanlogger/spanlogger.go b/pkg/util/spanlogger/spanlogger.go index 4b6131d45c6..50029b21e5c 100644 --- a/pkg/util/spanlogger/spanlogger.go +++ b/pkg/util/spanlogger/spanlogger.go @@ -8,12 +8,17 @@ import ( opentracing "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" otlog "github.com/opentracing/opentracing-go/log" + "github.com/weaveworks/common/user" util_log "github.com/cortexproject/cortex/pkg/util/log" ) type loggerCtxMarker struct{} +const ( + TenantIdTagName = "tenant_id" +) + var ( loggerCtxKey = &loggerCtxMarker{} ) @@ -34,6 +39,9 @@ func New(ctx context.Context, method string, kvps ...interface{}) (*SpanLogger, // retrieved with FromContext or FromContextWithFallback. func NewWithLogger(ctx context.Context, l log.Logger, method string, kvps ...interface{}) (*SpanLogger, context.Context) { span, ctx := opentracing.StartSpanFromContext(ctx, method) + if orgId, err := user.ExtractOrgID(ctx); err == nil { + span.SetTag(TenantIdTagName, orgId) + } logger := &SpanLogger{ Logger: log.With(util_log.WithContext(ctx, l), "method", method), Span: span, diff --git a/pkg/util/spanlogger/spanlogger_test.go b/pkg/util/spanlogger/spanlogger_test.go index fbea0cb2633..700be7bd608 100644 --- a/pkg/util/spanlogger/spanlogger_test.go +++ b/pkg/util/spanlogger/spanlogger_test.go @@ -5,8 +5,11 @@ import ( "testing" "github.com/go-kit/kit/log" + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/mocktracer" "github.com/pkg/errors" "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" ) func TestSpanLogger_Log(t *testing.T) { @@ -44,6 +47,27 @@ func TestSpanLogger_CustomLogger(t *testing.T) { require.Equal(t, expect, logged) } +func TestSpanCreatedWithTenantTag(t *testing.T) { + mockSpan := createSpan(user.InjectOrgID(context.Background(), "team-a")) + + require.Equal(t, "team-a", mockSpan.Tag(TenantIdTagName)) +} + +func TestSpanCreatedWithoutTenantTag(t *testing.T) { + mockSpan := createSpan(context.Background()) + + _, exist := mockSpan.Tags()[TenantIdTagName] + require.False(t, exist) +} + +func createSpan(ctx context.Context) *mocktracer.MockSpan { + mockTracer := mocktracer.New() + opentracing.SetGlobalTracer(mockTracer) + + logger, _ := New(ctx, "name") + return logger.Span.(*mocktracer.MockSpan) +} + type funcLogger func(keyvals ...interface{}) error func (f funcLogger) Log(keyvals ...interface{}) error { From acef6970a83283181ee59dd62d76e34b9cddff92 Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko Date: Mon, 17 May 2021 14:30:03 +0300 Subject: [PATCH 2/9] #4147 fixed errors reported by linters Signed-off-by: Vladyslav Diachenko --- pkg/querier/tenantfederation/merge_queryable_test.go | 8 ++++---- pkg/util/spanlogger/spanlogger.go | 8 ++++---- pkg/util/spanlogger/spanlogger_test.go | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/querier/tenantfederation/merge_queryable_test.go b/pkg/querier/tenantfederation/merge_queryable_test.go index 6d7fc7b08fc..45a24af7036 100644 --- a/pkg/querier/tenantfederation/merge_queryable_test.go +++ b/pkg/querier/tenantfederation/merge_queryable_test.go @@ -9,7 +9,6 @@ import ( "testing" "time" - sl "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/mocktracer" "github.com/prometheus/common/model" @@ -21,6 +20,7 @@ import ( "github.com/cortexproject/cortex/pkg/querier/series" "github.com/cortexproject/cortex/pkg/tenant" + sl "github.com/cortexproject/cortex/pkg/util/spanlogger" ) const ( @@ -507,9 +507,9 @@ func TestTracingMergeQueryable(t *testing.T) { require.NoError(t, seriesSet.Err()) spans := mockTracer.FinishedSpans() - assertSpanExist(t, spans, "mergeQuerier.select", map[string]string{sl.TenantIdTagName: "team-a|team-b"}) - assertSpanExist(t, spans, "mockTenantQuerier.select", map[string]string{sl.TenantIdTagName: "team-a"}) - assertSpanExist(t, spans, "mockTenantQuerier.select", map[string]string{sl.TenantIdTagName: "team-b"}) + assertSpanExist(t, spans, "mergeQuerier.select", map[string]string{sl.TenantIDTagName: "team-a|team-b"}) + assertSpanExist(t, spans, "mockTenantQuerier.select", map[string]string{sl.TenantIDTagName: "team-a"}) + assertSpanExist(t, spans, "mockTenantQuerier.select", map[string]string{sl.TenantIDTagName: "team-b"}) } func assertSpanExist(t *testing.T, diff --git a/pkg/util/spanlogger/spanlogger.go b/pkg/util/spanlogger/spanlogger.go index 50029b21e5c..7a0754299f7 100644 --- a/pkg/util/spanlogger/spanlogger.go +++ b/pkg/util/spanlogger/spanlogger.go @@ -8,15 +8,15 @@ import ( opentracing "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" otlog "github.com/opentracing/opentracing-go/log" - "github.com/weaveworks/common/user" + "github.com/cortexproject/cortex/pkg/tenant" util_log "github.com/cortexproject/cortex/pkg/util/log" ) type loggerCtxMarker struct{} const ( - TenantIdTagName = "tenant_id" + TenantIDTagName = "tenant_id" ) var ( @@ -39,8 +39,8 @@ func New(ctx context.Context, method string, kvps ...interface{}) (*SpanLogger, // retrieved with FromContext or FromContextWithFallback. func NewWithLogger(ctx context.Context, l log.Logger, method string, kvps ...interface{}) (*SpanLogger, context.Context) { span, ctx := opentracing.StartSpanFromContext(ctx, method) - if orgId, err := user.ExtractOrgID(ctx); err == nil { - span.SetTag(TenantIdTagName, orgId) + if ids, _ := tenant.TenantIDs(ctx); ids != nil { + span.SetTag(TenantIDTagName, tenant.JoinTenantIDs(ids)) } logger := &SpanLogger{ Logger: log.With(util_log.WithContext(ctx, l), "method", method), diff --git a/pkg/util/spanlogger/spanlogger_test.go b/pkg/util/spanlogger/spanlogger_test.go index 700be7bd608..afa7fa75b96 100644 --- a/pkg/util/spanlogger/spanlogger_test.go +++ b/pkg/util/spanlogger/spanlogger_test.go @@ -50,13 +50,13 @@ func TestSpanLogger_CustomLogger(t *testing.T) { func TestSpanCreatedWithTenantTag(t *testing.T) { mockSpan := createSpan(user.InjectOrgID(context.Background(), "team-a")) - require.Equal(t, "team-a", mockSpan.Tag(TenantIdTagName)) + require.Equal(t, "team-a", mockSpan.Tag(TenantIDTagName)) } func TestSpanCreatedWithoutTenantTag(t *testing.T) { mockSpan := createSpan(context.Background()) - _, exist := mockSpan.Tags()[TenantIdTagName] + _, exist := mockSpan.Tags()[TenantIDTagName] require.False(t, exist) } From f3c5648c3fbf5f7f21b77c112e32ce38bb45fdf6 Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko Date: Tue, 18 May 2021 16:53:26 +0300 Subject: [PATCH 3/9] #4147 1.changed value of tenant_id tag in tracing span from string value to array 2. removed import alias for spanlogger Signed-off-by: Vladyslav Diachenko --- pkg/querier/tenantfederation/merge_queryable_test.go | 10 +++++----- pkg/util/spanlogger/spanlogger.go | 2 +- pkg/util/spanlogger/spanlogger_test.go | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/querier/tenantfederation/merge_queryable_test.go b/pkg/querier/tenantfederation/merge_queryable_test.go index 45a24af7036..2aa78987a57 100644 --- a/pkg/querier/tenantfederation/merge_queryable_test.go +++ b/pkg/querier/tenantfederation/merge_queryable_test.go @@ -20,7 +20,7 @@ import ( "github.com/cortexproject/cortex/pkg/querier/series" "github.com/cortexproject/cortex/pkg/tenant" - sl "github.com/cortexproject/cortex/pkg/util/spanlogger" + "github.com/cortexproject/cortex/pkg/util/spanlogger" ) const ( @@ -143,7 +143,7 @@ func (m *mockSeriesSet) Warnings() storage.Warnings { } func (m mockTenantQuerier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { - log, _ := sl.New(m.ctx, "mockTenantQuerier.select") + log, _ := spanlogger.New(m.ctx, "mockTenantQuerier.select") defer log.Span.Finish() var matrix model.Matrix @@ -507,9 +507,9 @@ func TestTracingMergeQueryable(t *testing.T) { require.NoError(t, seriesSet.Err()) spans := mockTracer.FinishedSpans() - assertSpanExist(t, spans, "mergeQuerier.select", map[string]string{sl.TenantIDTagName: "team-a|team-b"}) - assertSpanExist(t, spans, "mockTenantQuerier.select", map[string]string{sl.TenantIDTagName: "team-a"}) - assertSpanExist(t, spans, "mockTenantQuerier.select", map[string]string{sl.TenantIDTagName: "team-b"}) + assertSpanExist(t, spans, "mergeQuerier.select", map[string]string{spanlogger.TenantIDTagName: "team-a|team-b"}) + assertSpanExist(t, spans, "mockTenantQuerier.select", map[string]string{spanlogger.TenantIDTagName: "team-a"}) + assertSpanExist(t, spans, "mockTenantQuerier.select", map[string]string{spanlogger.TenantIDTagName: "team-b"}) } func assertSpanExist(t *testing.T, diff --git a/pkg/util/spanlogger/spanlogger.go b/pkg/util/spanlogger/spanlogger.go index 7a0754299f7..b164449432c 100644 --- a/pkg/util/spanlogger/spanlogger.go +++ b/pkg/util/spanlogger/spanlogger.go @@ -40,7 +40,7 @@ func New(ctx context.Context, method string, kvps ...interface{}) (*SpanLogger, func NewWithLogger(ctx context.Context, l log.Logger, method string, kvps ...interface{}) (*SpanLogger, context.Context) { span, ctx := opentracing.StartSpanFromContext(ctx, method) if ids, _ := tenant.TenantIDs(ctx); ids != nil { - span.SetTag(TenantIDTagName, tenant.JoinTenantIDs(ids)) + span.SetTag(TenantIDTagName, ids) } logger := &SpanLogger{ Logger: log.With(util_log.WithContext(ctx, l), "method", method), diff --git a/pkg/util/spanlogger/spanlogger_test.go b/pkg/util/spanlogger/spanlogger_test.go index afa7fa75b96..4d701fcbbb5 100644 --- a/pkg/util/spanlogger/spanlogger_test.go +++ b/pkg/util/spanlogger/spanlogger_test.go @@ -50,7 +50,7 @@ func TestSpanLogger_CustomLogger(t *testing.T) { func TestSpanCreatedWithTenantTag(t *testing.T) { mockSpan := createSpan(user.InjectOrgID(context.Background(), "team-a")) - require.Equal(t, "team-a", mockSpan.Tag(TenantIDTagName)) + require.Equal(t, []string{"team-a"}, mockSpan.Tag(TenantIDTagName)) } func TestSpanCreatedWithoutTenantTag(t *testing.T) { From 48650185c8e62793760d78127b130849785aa43e Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko Date: Thu, 20 May 2021 10:21:56 +0300 Subject: [PATCH 4/9] #4147 1. fixed tests 2. fixed name of the spans in mergeQuerier Signed-off-by: Vladyslav Diachenko --- .../tenantfederation/merge_queryable.go | 6 +-- .../tenantfederation/merge_queryable_test.go | 41 ++++++++++--------- pkg/util/spanlogger/spanlogger.go | 4 +- 3 files changed, 27 insertions(+), 24 deletions(-) diff --git a/pkg/querier/tenantfederation/merge_queryable.go b/pkg/querier/tenantfederation/merge_queryable.go index 6472114eee7..c05f9980171 100644 --- a/pkg/querier/tenantfederation/merge_queryable.go +++ b/pkg/querier/tenantfederation/merge_queryable.go @@ -137,7 +137,7 @@ type mergeQuerier struct { // For the label "original_" + `idLabelName it will return all the values // of the underlying queriers for `idLabelName`. func (m *mergeQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { - log, _ := spanlogger.New(m.ctx, "mergeQuerier.labelValues") + log, _ := spanlogger.New(m.ctx, "mergeQuerier.LabelValues") defer log.Span.Finish() if name == m.idLabelName { return m.ids, nil, nil @@ -158,7 +158,7 @@ func (m *mergeQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([] // queriers. It also adds the `idLabelName` and if present in the original // results the original `idLabelName`. func (m *mergeQuerier) LabelNames() ([]string, storage.Warnings, error) { - log, _ := spanlogger.New(m.ctx, "mergeQuerier.labelNames") + log, _ := spanlogger.New(m.ctx, "mergeQuerier.LabelNames") defer log.Span.Finish() labelNames, warnings, err := m.mergeDistinctStringSlice(func(ctx context.Context, q storage.Querier) ([]string, storage.Warnings, error) { return q.LabelNames() @@ -280,7 +280,7 @@ type selectJob struct { // matching. The forwarded labelSelector is not containing those that operate // on `idLabelName`. func (m *mergeQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { - log, ctx := spanlogger.New(m.ctx, "mergeQuerier.select") + log, ctx := spanlogger.New(m.ctx, "mergeQuerier.Select") defer log.Span.Finish() matchedValues, filteredMatchers := filterValuesByMatchers(m.idLabelName, m.ids, matchers...) var jobs = make([]interface{}, len(matchedValues)) diff --git a/pkg/querier/tenantfederation/merge_queryable_test.go b/pkg/querier/tenantfederation/merge_queryable_test.go index 2aa78987a57..7c59dfffba7 100644 --- a/pkg/querier/tenantfederation/merge_queryable_test.go +++ b/pkg/querier/tenantfederation/merge_queryable_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "reflect" "sort" "strings" "testing" @@ -507,29 +508,27 @@ func TestTracingMergeQueryable(t *testing.T) { require.NoError(t, seriesSet.Err()) spans := mockTracer.FinishedSpans() - assertSpanExist(t, spans, "mergeQuerier.select", map[string]string{spanlogger.TenantIDTagName: "team-a|team-b"}) - assertSpanExist(t, spans, "mockTenantQuerier.select", map[string]string{spanlogger.TenantIDTagName: "team-a"}) - assertSpanExist(t, spans, "mockTenantQuerier.select", map[string]string{spanlogger.TenantIDTagName: "team-b"}) + assertSpanExist(t, spans, "mergeQuerier.select", expectedTag{spanlogger.TenantIDTagName, + []string{"team-a", "team-b"}}) + assertSpanExist(t, spans, "mockTenantQuerier.select", expectedTag{spanlogger.TenantIDTagName, + []string{"team-a"}}) + assertSpanExist(t, spans, "mockTenantQuerier.select", expectedTag{spanlogger.TenantIDTagName, + []string{"team-b"}}) } func assertSpanExist(t *testing.T, actualSpans []*mocktracer.MockSpan, name string, - expectedTags map[string]string) { + tag expectedTag) { for _, span := range actualSpans { - if span.OperationName == name && containsTags(span, expectedTags) { + if span.OperationName == name && containsTags(span, tag) { return } } require.FailNow(t, "can not find span matching params", "expected span with name `%v` and with "+ "tags %v to be present but it was not. actual spans: %+v", - name, expectedTags, extractNameWithTags(actualSpans)) -} - -type spanWithTags struct { - name string - tags map[string]interface{} + name, tag, extractNameWithTags(actualSpans)) } func extractNameWithTags(actualSpans []*mocktracer.MockSpan) []spanWithTags { @@ -540,12 +539,16 @@ func extractNameWithTags(actualSpans []*mocktracer.MockSpan) []spanWithTags { return result } -func containsTags(span *mocktracer.MockSpan, - expectedTags map[string]string) bool { - for k, expectedVal := range expectedTags { - if span.Tag(k) == expectedVal { - return true - } - } - return false +func containsTags(span *mocktracer.MockSpan, expectedTag expectedTag) bool { + return reflect.DeepEqual(span.Tag(expectedTag.key), expectedTag.values) +} + +type spanWithTags struct { + name string + tags map[string]interface{} +} + +type expectedTag struct { + key string + values []string } diff --git a/pkg/util/spanlogger/spanlogger.go b/pkg/util/spanlogger/spanlogger.go index b164449432c..a856ceed75b 100644 --- a/pkg/util/spanlogger/spanlogger.go +++ b/pkg/util/spanlogger/spanlogger.go @@ -16,7 +16,7 @@ import ( type loggerCtxMarker struct{} const ( - TenantIDTagName = "tenant_id" + TenantIDTagName = "tenant_ids" ) var ( @@ -39,7 +39,7 @@ func New(ctx context.Context, method string, kvps ...interface{}) (*SpanLogger, // retrieved with FromContext or FromContextWithFallback. func NewWithLogger(ctx context.Context, l log.Logger, method string, kvps ...interface{}) (*SpanLogger, context.Context) { span, ctx := opentracing.StartSpanFromContext(ctx, method) - if ids, _ := tenant.TenantIDs(ctx); ids != nil { + if ids, _ := tenant.TenantIDs(ctx); len(ids) > 0 { span.SetTag(TenantIDTagName, ids) } logger := &SpanLogger{ From 3ff0b808af00b5804073b0f05190974763a54267 Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko Date: Thu, 20 May 2021 10:25:08 +0300 Subject: [PATCH 5/9] #4147 updated CHANGELOG.md Signed-off-by: Vladyslav Diachenko --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6aafaa5bd44..1d5e7a43f29 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,7 +20,7 @@ * [ENHANCEMENT] Distributor: Added distributors ring status section in the admin page. #4151 * [BUGFIX] Purger: fix `Invalid null value in condition for column range` caused by `nil` value in range for WriteBatch query. #4128 * [BUGFIX] Ingester: fixed infrequent panic caused by a race condition between TSDB mmap-ed head chunks truncation and queries. #4176 -* [ENHANCEMENT] Added `tenant_id` tag to tracing spans +* [ENHANCEMENT] Added `tenant_ids` tag to tracing spans ## Blocksconvert From 1d15a7169486b99aa97b70100a7c0fc9974e63bf Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko Date: Thu, 20 May 2021 12:04:39 +0300 Subject: [PATCH 6/9] #4147 passed method's span context to mergeDistinctStringSlice function to create inner spans under this context Signed-off-by: Vladyslav Diachenko --- pkg/querier/tenantfederation/merge_queryable.go | 15 +++++++++------ .../tenantfederation/merge_queryable_test.go | 2 +- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/pkg/querier/tenantfederation/merge_queryable.go b/pkg/querier/tenantfederation/merge_queryable.go index c05f9980171..1185fe53d34 100644 --- a/pkg/querier/tenantfederation/merge_queryable.go +++ b/pkg/querier/tenantfederation/merge_queryable.go @@ -137,7 +137,7 @@ type mergeQuerier struct { // For the label "original_" + `idLabelName it will return all the values // of the underlying queriers for `idLabelName`. func (m *mergeQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { - log, _ := spanlogger.New(m.ctx, "mergeQuerier.LabelValues") + log, ctx := spanlogger.New(m.ctx, "mergeQuerier.LabelValues") defer log.Span.Finish() if name == m.idLabelName { return m.ids, nil, nil @@ -149,7 +149,8 @@ func (m *mergeQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([] name = m.idLabelName } - return m.mergeDistinctStringSlice(func(ctx context.Context, q storage.Querier) ([]string, storage.Warnings, error) { + return m.mergeDistinctStringSlice(ctx, func(ctx context.Context, + q storage.Querier) ([]string, storage.Warnings, error) { return q.LabelValues(name, matchers...) }) } @@ -158,9 +159,10 @@ func (m *mergeQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([] // queriers. It also adds the `idLabelName` and if present in the original // results the original `idLabelName`. func (m *mergeQuerier) LabelNames() ([]string, storage.Warnings, error) { - log, _ := spanlogger.New(m.ctx, "mergeQuerier.LabelNames") + log, ctx := spanlogger.New(m.ctx, "mergeQuerier.LabelNames") defer log.Span.Finish() - labelNames, warnings, err := m.mergeDistinctStringSlice(func(ctx context.Context, q storage.Querier) ([]string, storage.Warnings, error) { + labelNames, warnings, err := m.mergeDistinctStringSlice(ctx, func(ctx context.Context, + q storage.Querier) ([]string, storage.Warnings, error) { return q.LabelNames() }) if err != nil { @@ -204,7 +206,8 @@ type stringSliceFuncJob struct { // on per querier in parallel. It removes duplicates and sorts the result. It // doesn't require the output of the stringSliceFunc to be sorted, as results // of LabelValues are not sorted. -func (m *mergeQuerier) mergeDistinctStringSlice(f stringSliceFunc) ([]string, storage.Warnings, error) { +func (m *mergeQuerier) mergeDistinctStringSlice(ctx context.Context, + f stringSliceFunc) ([]string, storage.Warnings, error) { var jobs = make([]interface{}, len(m.ids)) for pos := range m.ids { @@ -229,7 +232,7 @@ func (m *mergeQuerier) mergeDistinctStringSlice(f stringSliceFunc) ([]string, st return nil } - err := concurrency.ForEach(m.ctx, jobs, maxConcurrency, run) + err := concurrency.ForEach(ctx, jobs, maxConcurrency, run) if err != nil { return nil, nil, err } diff --git a/pkg/querier/tenantfederation/merge_queryable_test.go b/pkg/querier/tenantfederation/merge_queryable_test.go index 7c59dfffba7..6f94c0bf0a9 100644 --- a/pkg/querier/tenantfederation/merge_queryable_test.go +++ b/pkg/querier/tenantfederation/merge_queryable_test.go @@ -508,7 +508,7 @@ func TestTracingMergeQueryable(t *testing.T) { require.NoError(t, seriesSet.Err()) spans := mockTracer.FinishedSpans() - assertSpanExist(t, spans, "mergeQuerier.select", expectedTag{spanlogger.TenantIDTagName, + assertSpanExist(t, spans, "mergeQuerier.Select", expectedTag{spanlogger.TenantIDTagName, []string{"team-a", "team-b"}}) assertSpanExist(t, spans, "mockTenantQuerier.select", expectedTag{spanlogger.TenantIDTagName, []string{"team-a"}}) From 669b8d252f680dfdc367cbdcf40909364395dc92 Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko Date: Thu, 20 May 2021 13:29:35 +0300 Subject: [PATCH 7/9] #4147 added mocktracer package for testing purpose Signed-off-by: Vladyslav Diachenko --- .../mocktracer/mocklogrecord.go | 105 +++++++ .../opentracing-go/mocktracer/mockspan.go | 284 ++++++++++++++++++ .../opentracing-go/mocktracer/mocktracer.go | 105 +++++++ .../opentracing-go/mocktracer/propagation.go | 120 ++++++++ vendor/modules.txt | 1 + 5 files changed, 615 insertions(+) create mode 100644 vendor/github.com/opentracing/opentracing-go/mocktracer/mocklogrecord.go create mode 100644 vendor/github.com/opentracing/opentracing-go/mocktracer/mockspan.go create mode 100644 vendor/github.com/opentracing/opentracing-go/mocktracer/mocktracer.go create mode 100644 vendor/github.com/opentracing/opentracing-go/mocktracer/propagation.go diff --git a/vendor/github.com/opentracing/opentracing-go/mocktracer/mocklogrecord.go b/vendor/github.com/opentracing/opentracing-go/mocktracer/mocklogrecord.go new file mode 100644 index 00000000000..2ce96d9d388 --- /dev/null +++ b/vendor/github.com/opentracing/opentracing-go/mocktracer/mocklogrecord.go @@ -0,0 +1,105 @@ +package mocktracer + +import ( + "fmt" + "reflect" + "time" + + "github.com/opentracing/opentracing-go/log" +) + +// MockLogRecord represents data logged to a Span via Span.LogFields or +// Span.LogKV. +type MockLogRecord struct { + Timestamp time.Time + Fields []MockKeyValue +} + +// MockKeyValue represents a single key:value pair. +type MockKeyValue struct { + Key string + + // All MockLogRecord values are coerced to strings via fmt.Sprint(), though + // we retain their type separately. + ValueKind reflect.Kind + ValueString string +} + +// EmitString belongs to the log.Encoder interface +func (m *MockKeyValue) EmitString(key, value string) { + m.Key = key + m.ValueKind = reflect.TypeOf(value).Kind() + m.ValueString = fmt.Sprint(value) +} + +// EmitBool belongs to the log.Encoder interface +func (m *MockKeyValue) EmitBool(key string, value bool) { + m.Key = key + m.ValueKind = reflect.TypeOf(value).Kind() + m.ValueString = fmt.Sprint(value) +} + +// EmitInt belongs to the log.Encoder interface +func (m *MockKeyValue) EmitInt(key string, value int) { + m.Key = key + m.ValueKind = reflect.TypeOf(value).Kind() + m.ValueString = fmt.Sprint(value) +} + +// EmitInt32 belongs to the log.Encoder interface +func (m *MockKeyValue) EmitInt32(key string, value int32) { + m.Key = key + m.ValueKind = reflect.TypeOf(value).Kind() + m.ValueString = fmt.Sprint(value) +} + +// EmitInt64 belongs to the log.Encoder interface +func (m *MockKeyValue) EmitInt64(key string, value int64) { + m.Key = key + m.ValueKind = reflect.TypeOf(value).Kind() + m.ValueString = fmt.Sprint(value) +} + +// EmitUint32 belongs to the log.Encoder interface +func (m *MockKeyValue) EmitUint32(key string, value uint32) { + m.Key = key + m.ValueKind = reflect.TypeOf(value).Kind() + m.ValueString = fmt.Sprint(value) +} + +// EmitUint64 belongs to the log.Encoder interface +func (m *MockKeyValue) EmitUint64(key string, value uint64) { + m.Key = key + m.ValueKind = reflect.TypeOf(value).Kind() + m.ValueString = fmt.Sprint(value) +} + +// EmitFloat32 belongs to the log.Encoder interface +func (m *MockKeyValue) EmitFloat32(key string, value float32) { + m.Key = key + m.ValueKind = reflect.TypeOf(value).Kind() + m.ValueString = fmt.Sprint(value) +} + +// EmitFloat64 belongs to the log.Encoder interface +func (m *MockKeyValue) EmitFloat64(key string, value float64) { + m.Key = key + m.ValueKind = reflect.TypeOf(value).Kind() + m.ValueString = fmt.Sprint(value) +} + +// EmitObject belongs to the log.Encoder interface +func (m *MockKeyValue) EmitObject(key string, value interface{}) { + m.Key = key + m.ValueKind = reflect.TypeOf(value).Kind() + m.ValueString = fmt.Sprint(value) +} + +// EmitLazyLogger belongs to the log.Encoder interface +func (m *MockKeyValue) EmitLazyLogger(value log.LazyLogger) { + var meta MockKeyValue + value(&meta) + m.Key = meta.Key + m.ValueKind = meta.ValueKind + m.ValueString = meta.ValueString +} diff --git a/vendor/github.com/opentracing/opentracing-go/mocktracer/mockspan.go b/vendor/github.com/opentracing/opentracing-go/mocktracer/mockspan.go new file mode 100644 index 00000000000..8c7932ce65b --- /dev/null +++ b/vendor/github.com/opentracing/opentracing-go/mocktracer/mockspan.go @@ -0,0 +1,284 @@ +package mocktracer + +import ( + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" + "github.com/opentracing/opentracing-go/log" +) + +// MockSpanContext is an opentracing.SpanContext implementation. +// +// It is entirely unsuitable for production use, but appropriate for tests +// that want to verify tracing behavior in other frameworks/applications. +// +// By default all spans have Sampled=true flag, unless {"sampling.priority": 0} +// tag is set. +type MockSpanContext struct { + TraceID int + SpanID int + Sampled bool + Baggage map[string]string +} + +var mockIDSource = uint32(42) + +func nextMockID() int { + return int(atomic.AddUint32(&mockIDSource, 1)) +} + +// ForeachBaggageItem belongs to the SpanContext interface +func (c MockSpanContext) ForeachBaggageItem(handler func(k, v string) bool) { + for k, v := range c.Baggage { + if !handler(k, v) { + break + } + } +} + +// WithBaggageItem creates a new context with an extra baggage item. +func (c MockSpanContext) WithBaggageItem(key, value string) MockSpanContext { + var newBaggage map[string]string + if c.Baggage == nil { + newBaggage = map[string]string{key: value} + } else { + newBaggage = make(map[string]string, len(c.Baggage)+1) + for k, v := range c.Baggage { + newBaggage[k] = v + } + newBaggage[key] = value + } + // Use positional parameters so the compiler will help catch new fields. + return MockSpanContext{c.TraceID, c.SpanID, c.Sampled, newBaggage} +} + +// MockSpan is an opentracing.Span implementation that exports its internal +// state for testing purposes. +type MockSpan struct { + sync.RWMutex + + ParentID int + + OperationName string + StartTime time.Time + FinishTime time.Time + + // All of the below are protected by the embedded RWMutex. + SpanContext MockSpanContext + tags map[string]interface{} + logs []MockLogRecord + tracer *MockTracer +} + +func newMockSpan(t *MockTracer, name string, opts opentracing.StartSpanOptions) *MockSpan { + tags := opts.Tags + if tags == nil { + tags = map[string]interface{}{} + } + traceID := nextMockID() + parentID := int(0) + var baggage map[string]string + sampled := true + if len(opts.References) > 0 { + traceID = opts.References[0].ReferencedContext.(MockSpanContext).TraceID + parentID = opts.References[0].ReferencedContext.(MockSpanContext).SpanID + sampled = opts.References[0].ReferencedContext.(MockSpanContext).Sampled + baggage = opts.References[0].ReferencedContext.(MockSpanContext).Baggage + } + spanContext := MockSpanContext{traceID, nextMockID(), sampled, baggage} + startTime := opts.StartTime + if startTime.IsZero() { + startTime = time.Now() + } + return &MockSpan{ + ParentID: parentID, + OperationName: name, + StartTime: startTime, + tags: tags, + logs: []MockLogRecord{}, + SpanContext: spanContext, + + tracer: t, + } +} + +// Tags returns a copy of tags accumulated by the span so far +func (s *MockSpan) Tags() map[string]interface{} { + s.RLock() + defer s.RUnlock() + tags := make(map[string]interface{}) + for k, v := range s.tags { + tags[k] = v + } + return tags +} + +// Tag returns a single tag +func (s *MockSpan) Tag(k string) interface{} { + s.RLock() + defer s.RUnlock() + return s.tags[k] +} + +// Logs returns a copy of logs accumulated in the span so far +func (s *MockSpan) Logs() []MockLogRecord { + s.RLock() + defer s.RUnlock() + logs := make([]MockLogRecord, len(s.logs)) + copy(logs, s.logs) + return logs +} + +// Context belongs to the Span interface +func (s *MockSpan) Context() opentracing.SpanContext { + s.Lock() + defer s.Unlock() + return s.SpanContext +} + +// SetTag belongs to the Span interface +func (s *MockSpan) SetTag(key string, value interface{}) opentracing.Span { + s.Lock() + defer s.Unlock() + if key == string(ext.SamplingPriority) { + if v, ok := value.(uint16); ok { + s.SpanContext.Sampled = v > 0 + return s + } + if v, ok := value.(int); ok { + s.SpanContext.Sampled = v > 0 + return s + } + } + s.tags[key] = value + return s +} + +// SetBaggageItem belongs to the Span interface +func (s *MockSpan) SetBaggageItem(key, val string) opentracing.Span { + s.Lock() + defer s.Unlock() + s.SpanContext = s.SpanContext.WithBaggageItem(key, val) + return s +} + +// BaggageItem belongs to the Span interface +func (s *MockSpan) BaggageItem(key string) string { + s.RLock() + defer s.RUnlock() + return s.SpanContext.Baggage[key] +} + +// Finish belongs to the Span interface +func (s *MockSpan) Finish() { + s.Lock() + s.FinishTime = time.Now() + s.Unlock() + s.tracer.recordSpan(s) +} + +// FinishWithOptions belongs to the Span interface +func (s *MockSpan) FinishWithOptions(opts opentracing.FinishOptions) { + s.Lock() + s.FinishTime = opts.FinishTime + s.Unlock() + + // Handle any late-bound LogRecords. + for _, lr := range opts.LogRecords { + s.logFieldsWithTimestamp(lr.Timestamp, lr.Fields...) + } + // Handle (deprecated) BulkLogData. + for _, ld := range opts.BulkLogData { + if ld.Payload != nil { + s.logFieldsWithTimestamp( + ld.Timestamp, + log.String("event", ld.Event), + log.Object("payload", ld.Payload)) + } else { + s.logFieldsWithTimestamp( + ld.Timestamp, + log.String("event", ld.Event)) + } + } + + s.tracer.recordSpan(s) +} + +// String allows printing span for debugging +func (s *MockSpan) String() string { + return fmt.Sprintf( + "traceId=%d, spanId=%d, parentId=%d, sampled=%t, name=%s", + s.SpanContext.TraceID, s.SpanContext.SpanID, s.ParentID, + s.SpanContext.Sampled, s.OperationName) +} + +// LogFields belongs to the Span interface +func (s *MockSpan) LogFields(fields ...log.Field) { + s.logFieldsWithTimestamp(time.Now(), fields...) +} + +// The caller MUST NOT hold s.Lock +func (s *MockSpan) logFieldsWithTimestamp(ts time.Time, fields ...log.Field) { + lr := MockLogRecord{ + Timestamp: ts, + Fields: make([]MockKeyValue, len(fields)), + } + for i, f := range fields { + outField := &(lr.Fields[i]) + f.Marshal(outField) + } + + s.Lock() + defer s.Unlock() + s.logs = append(s.logs, lr) +} + +// LogKV belongs to the Span interface. +// +// This implementations coerces all "values" to strings, though that is not +// something all implementations need to do. Indeed, a motivated person can and +// probably should have this do a typed switch on the values. +func (s *MockSpan) LogKV(keyValues ...interface{}) { + if len(keyValues)%2 != 0 { + s.LogFields(log.Error(fmt.Errorf("Non-even keyValues len: %v", len(keyValues)))) + return + } + fields, err := log.InterleavedKVToFields(keyValues...) + if err != nil { + s.LogFields(log.Error(err), log.String("function", "LogKV")) + return + } + s.LogFields(fields...) +} + +// LogEvent belongs to the Span interface +func (s *MockSpan) LogEvent(event string) { + s.LogFields(log.String("event", event)) +} + +// LogEventWithPayload belongs to the Span interface +func (s *MockSpan) LogEventWithPayload(event string, payload interface{}) { + s.LogFields(log.String("event", event), log.Object("payload", payload)) +} + +// Log belongs to the Span interface +func (s *MockSpan) Log(data opentracing.LogData) { + panic("MockSpan.Log() no longer supported") +} + +// SetOperationName belongs to the Span interface +func (s *MockSpan) SetOperationName(operationName string) opentracing.Span { + s.Lock() + defer s.Unlock() + s.OperationName = operationName + return s +} + +// Tracer belongs to the Span interface +func (s *MockSpan) Tracer() opentracing.Tracer { + return s.tracer +} diff --git a/vendor/github.com/opentracing/opentracing-go/mocktracer/mocktracer.go b/vendor/github.com/opentracing/opentracing-go/mocktracer/mocktracer.go new file mode 100644 index 00000000000..4533da7b1f7 --- /dev/null +++ b/vendor/github.com/opentracing/opentracing-go/mocktracer/mocktracer.go @@ -0,0 +1,105 @@ +package mocktracer + +import ( + "sync" + + "github.com/opentracing/opentracing-go" +) + +// New returns a MockTracer opentracing.Tracer implementation that's intended +// to facilitate tests of OpenTracing instrumentation. +func New() *MockTracer { + t := &MockTracer{ + finishedSpans: []*MockSpan{}, + injectors: make(map[interface{}]Injector), + extractors: make(map[interface{}]Extractor), + } + + // register default injectors/extractors + textPropagator := new(TextMapPropagator) + t.RegisterInjector(opentracing.TextMap, textPropagator) + t.RegisterExtractor(opentracing.TextMap, textPropagator) + + httpPropagator := &TextMapPropagator{HTTPHeaders: true} + t.RegisterInjector(opentracing.HTTPHeaders, httpPropagator) + t.RegisterExtractor(opentracing.HTTPHeaders, httpPropagator) + + return t +} + +// MockTracer is only intended for testing OpenTracing instrumentation. +// +// It is entirely unsuitable for production use, but appropriate for tests +// that want to verify tracing behavior in other frameworks/applications. +type MockTracer struct { + sync.RWMutex + finishedSpans []*MockSpan + injectors map[interface{}]Injector + extractors map[interface{}]Extractor +} + +// FinishedSpans returns all spans that have been Finish()'ed since the +// MockTracer was constructed or since the last call to its Reset() method. +func (t *MockTracer) FinishedSpans() []*MockSpan { + t.RLock() + defer t.RUnlock() + spans := make([]*MockSpan, len(t.finishedSpans)) + copy(spans, t.finishedSpans) + return spans +} + +// Reset clears the internally accumulated finished spans. Note that any +// extant MockSpans will still append to finishedSpans when they Finish(), +// even after a call to Reset(). +func (t *MockTracer) Reset() { + t.Lock() + defer t.Unlock() + t.finishedSpans = []*MockSpan{} +} + +// StartSpan belongs to the Tracer interface. +func (t *MockTracer) StartSpan(operationName string, opts ...opentracing.StartSpanOption) opentracing.Span { + sso := opentracing.StartSpanOptions{} + for _, o := range opts { + o.Apply(&sso) + } + return newMockSpan(t, operationName, sso) +} + +// RegisterInjector registers injector for given format +func (t *MockTracer) RegisterInjector(format interface{}, injector Injector) { + t.injectors[format] = injector +} + +// RegisterExtractor registers extractor for given format +func (t *MockTracer) RegisterExtractor(format interface{}, extractor Extractor) { + t.extractors[format] = extractor +} + +// Inject belongs to the Tracer interface. +func (t *MockTracer) Inject(sm opentracing.SpanContext, format interface{}, carrier interface{}) error { + spanContext, ok := sm.(MockSpanContext) + if !ok { + return opentracing.ErrInvalidSpanContext + } + injector, ok := t.injectors[format] + if !ok { + return opentracing.ErrUnsupportedFormat + } + return injector.Inject(spanContext, carrier) +} + +// Extract belongs to the Tracer interface. +func (t *MockTracer) Extract(format interface{}, carrier interface{}) (opentracing.SpanContext, error) { + extractor, ok := t.extractors[format] + if !ok { + return nil, opentracing.ErrUnsupportedFormat + } + return extractor.Extract(carrier) +} + +func (t *MockTracer) recordSpan(span *MockSpan) { + t.Lock() + defer t.Unlock() + t.finishedSpans = append(t.finishedSpans, span) +} diff --git a/vendor/github.com/opentracing/opentracing-go/mocktracer/propagation.go b/vendor/github.com/opentracing/opentracing-go/mocktracer/propagation.go new file mode 100644 index 00000000000..8364f1d1825 --- /dev/null +++ b/vendor/github.com/opentracing/opentracing-go/mocktracer/propagation.go @@ -0,0 +1,120 @@ +package mocktracer + +import ( + "fmt" + "net/url" + "strconv" + "strings" + + "github.com/opentracing/opentracing-go" +) + +const mockTextMapIdsPrefix = "mockpfx-ids-" +const mockTextMapBaggagePrefix = "mockpfx-baggage-" + +var emptyContext = MockSpanContext{} + +// Injector is responsible for injecting SpanContext instances in a manner suitable +// for propagation via a format-specific "carrier" object. Typically the +// injection will take place across an RPC boundary, but message queues and +// other IPC mechanisms are also reasonable places to use an Injector. +type Injector interface { + // Inject takes `SpanContext` and injects it into `carrier`. The actual type + // of `carrier` depends on the `format` passed to `Tracer.Inject()`. + // + // Implementations may return opentracing.ErrInvalidCarrier or any other + // implementation-specific error if injection fails. + Inject(ctx MockSpanContext, carrier interface{}) error +} + +// Extractor is responsible for extracting SpanContext instances from a +// format-specific "carrier" object. Typically the extraction will take place +// on the server side of an RPC boundary, but message queues and other IPC +// mechanisms are also reasonable places to use an Extractor. +type Extractor interface { + // Extract decodes a SpanContext instance from the given `carrier`, + // or (nil, opentracing.ErrSpanContextNotFound) if no context could + // be found in the `carrier`. + Extract(carrier interface{}) (MockSpanContext, error) +} + +// TextMapPropagator implements Injector/Extractor for TextMap and HTTPHeaders formats. +type TextMapPropagator struct { + HTTPHeaders bool +} + +// Inject implements the Injector interface +func (t *TextMapPropagator) Inject(spanContext MockSpanContext, carrier interface{}) error { + writer, ok := carrier.(opentracing.TextMapWriter) + if !ok { + return opentracing.ErrInvalidCarrier + } + // Ids: + writer.Set(mockTextMapIdsPrefix+"traceid", strconv.Itoa(spanContext.TraceID)) + writer.Set(mockTextMapIdsPrefix+"spanid", strconv.Itoa(spanContext.SpanID)) + writer.Set(mockTextMapIdsPrefix+"sampled", fmt.Sprint(spanContext.Sampled)) + // Baggage: + for baggageKey, baggageVal := range spanContext.Baggage { + safeVal := baggageVal + if t.HTTPHeaders { + safeVal = url.QueryEscape(baggageVal) + } + writer.Set(mockTextMapBaggagePrefix+baggageKey, safeVal) + } + return nil +} + +// Extract implements the Extractor interface +func (t *TextMapPropagator) Extract(carrier interface{}) (MockSpanContext, error) { + reader, ok := carrier.(opentracing.TextMapReader) + if !ok { + return emptyContext, opentracing.ErrInvalidCarrier + } + rval := MockSpanContext{0, 0, true, nil} + err := reader.ForeachKey(func(key, val string) error { + lowerKey := strings.ToLower(key) + switch { + case lowerKey == mockTextMapIdsPrefix+"traceid": + // Ids: + i, err := strconv.Atoi(val) + if err != nil { + return err + } + rval.TraceID = i + case lowerKey == mockTextMapIdsPrefix+"spanid": + // Ids: + i, err := strconv.Atoi(val) + if err != nil { + return err + } + rval.SpanID = i + case lowerKey == mockTextMapIdsPrefix+"sampled": + b, err := strconv.ParseBool(val) + if err != nil { + return err + } + rval.Sampled = b + case strings.HasPrefix(lowerKey, mockTextMapBaggagePrefix): + // Baggage: + if rval.Baggage == nil { + rval.Baggage = make(map[string]string) + } + safeVal := val + if t.HTTPHeaders { + // unescape errors are ignored, nothing can be done + if rawVal, err := url.QueryUnescape(val); err == nil { + safeVal = rawVal + } + } + rval.Baggage[lowerKey[len(mockTextMapBaggagePrefix):]] = safeVal + } + return nil + }) + if rval.TraceID == 0 || rval.SpanID == 0 { + return emptyContext, opentracing.ErrSpanContextNotFound + } + if err != nil { + return emptyContext, err + } + return rval, nil +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 598d73bdd57..c8078efab24 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -404,6 +404,7 @@ github.com/opentracing-contrib/go-stdlib/nethttp github.com/opentracing/opentracing-go github.com/opentracing/opentracing-go/ext github.com/opentracing/opentracing-go/log +github.com/opentracing/opentracing-go/mocktracer # github.com/pkg/errors v0.9.1 ## explicit github.com/pkg/errors From f0fb3bdcce982089a62cae4d4ec246ea42eaf556 Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko Date: Thu, 20 May 2021 20:23:07 +0300 Subject: [PATCH 8/9] #4147 fixed code style and removed redundant context param that is not used in callback function Signed-off-by: Vladyslav Diachenko --- CHANGELOG.md | 2 +- pkg/querier/tenantfederation/merge_queryable.go | 14 ++++++-------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1d5e7a43f29..b15ac711a91 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,9 +18,9 @@ * `cortex_alertmanager_state_persist_failed_total` * [ENHANCEMENT] Blocks storage: support ingesting exemplars. Enabled by setting new CLI flag `-blocks-storage.tsdb.max-exemplars=` or config option `blocks_storage.tsdb.max_exemplars` to positive value. #4124 * [ENHANCEMENT] Distributor: Added distributors ring status section in the admin page. #4151 +* [ENHANCEMENT] Added `tenant_ids` tag to tracing spans #4147 * [BUGFIX] Purger: fix `Invalid null value in condition for column range` caused by `nil` value in range for WriteBatch query. #4128 * [BUGFIX] Ingester: fixed infrequent panic caused by a race condition between TSDB mmap-ed head chunks truncation and queries. #4176 -* [ENHANCEMENT] Added `tenant_ids` tag to tracing spans ## Blocksconvert diff --git a/pkg/querier/tenantfederation/merge_queryable.go b/pkg/querier/tenantfederation/merge_queryable.go index 1185fe53d34..7ef00683c89 100644 --- a/pkg/querier/tenantfederation/merge_queryable.go +++ b/pkg/querier/tenantfederation/merge_queryable.go @@ -137,7 +137,7 @@ type mergeQuerier struct { // For the label "original_" + `idLabelName it will return all the values // of the underlying queriers for `idLabelName`. func (m *mergeQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { - log, ctx := spanlogger.New(m.ctx, "mergeQuerier.LabelValues") + log, _ := spanlogger.New(m.ctx, "mergeQuerier.LabelValues") defer log.Span.Finish() if name == m.idLabelName { return m.ids, nil, nil @@ -149,7 +149,7 @@ func (m *mergeQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([] name = m.idLabelName } - return m.mergeDistinctStringSlice(ctx, func(ctx context.Context, + return m.mergeDistinctStringSlice(func(ctx context.Context, q storage.Querier) ([]string, storage.Warnings, error) { return q.LabelValues(name, matchers...) }) @@ -159,10 +159,9 @@ func (m *mergeQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([] // queriers. It also adds the `idLabelName` and if present in the original // results the original `idLabelName`. func (m *mergeQuerier) LabelNames() ([]string, storage.Warnings, error) { - log, ctx := spanlogger.New(m.ctx, "mergeQuerier.LabelNames") + log, _ := spanlogger.New(m.ctx, "mergeQuerier.LabelNames") defer log.Span.Finish() - labelNames, warnings, err := m.mergeDistinctStringSlice(ctx, func(ctx context.Context, - q storage.Querier) ([]string, storage.Warnings, error) { + labelNames, warnings, err := m.mergeDistinctStringSlice(func(ctx context.Context, q storage.Querier) ([]string, storage.Warnings, error) { return q.LabelNames() }) if err != nil { @@ -206,8 +205,7 @@ type stringSliceFuncJob struct { // on per querier in parallel. It removes duplicates and sorts the result. It // doesn't require the output of the stringSliceFunc to be sorted, as results // of LabelValues are not sorted. -func (m *mergeQuerier) mergeDistinctStringSlice(ctx context.Context, - f stringSliceFunc) ([]string, storage.Warnings, error) { +func (m *mergeQuerier) mergeDistinctStringSlice(f stringSliceFunc) ([]string, storage.Warnings, error) { var jobs = make([]interface{}, len(m.ids)) for pos := range m.ids { @@ -232,7 +230,7 @@ func (m *mergeQuerier) mergeDistinctStringSlice(ctx context.Context, return nil } - err := concurrency.ForEach(ctx, jobs, maxConcurrency, run) + err := concurrency.ForEach(m.ctx, jobs, maxConcurrency, run) if err != nil { return nil, nil, err } From aed8dabf82c168ee8113dd49bc5a5cb4b64062ae Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko Date: Thu, 20 May 2021 20:44:06 +0300 Subject: [PATCH 9/9] #4147 fixed code style Signed-off-by: Vladyslav Diachenko --- pkg/querier/tenantfederation/merge_queryable.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/querier/tenantfederation/merge_queryable.go b/pkg/querier/tenantfederation/merge_queryable.go index 7ef00683c89..c05f9980171 100644 --- a/pkg/querier/tenantfederation/merge_queryable.go +++ b/pkg/querier/tenantfederation/merge_queryable.go @@ -149,8 +149,7 @@ func (m *mergeQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([] name = m.idLabelName } - return m.mergeDistinctStringSlice(func(ctx context.Context, - q storage.Querier) ([]string, storage.Warnings, error) { + return m.mergeDistinctStringSlice(func(ctx context.Context, q storage.Querier) ([]string, storage.Warnings, error) { return q.LabelValues(name, matchers...) }) }