diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index 19275431c8d..0a9b79ed01f 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -45,7 +45,6 @@ import ( ss "github.com/jaegertracing/jaeger/plugin/sampling/strategystore" "github.com/jaegertracing/jaeger/plugin/storage" "github.com/jaegertracing/jaeger/ports" - istorage "github.com/jaegertracing/jaeger/storage" "github.com/jaegertracing/jaeger/storage/dependencystore" "github.com/jaegertracing/jaeger/storage/spanstore" storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics" @@ -148,7 +147,7 @@ by default uses only in-memory database.`, // query querySrv := startQuery( - svc, qOpts, archiveOptions(storageFactory, logger), + svc, qOpts, qOpts.BuildQueryServiceOptions(storageFactory, logger), spanReader, dependencyReader, rootMetricsFactory, metricsFactory, ) @@ -232,14 +231,6 @@ func startQuery( return server } -func archiveOptions(storageFactory istorage.Factory, logger *zap.Logger) *querysvc.QueryServiceOptions { - opts := &querysvc.QueryServiceOptions{} - if !opts.InitArchiveStorage(storageFactory, logger) { - logger.Info("Archive storage not initialized") - } - return opts -} - func initTracer(metricsFactory metrics.Factory, logger *zap.Logger) io.Closer { traceCfg := &jaegerClientConfig.Configuration{ ServiceName: "jaeger-query", diff --git a/cmd/query/app/flags.go b/cmd/query/app/flags.go index 64d4a2ae9c8..aec4c4899c6 100644 --- a/cmd/query/app/flags.go +++ b/cmd/query/app/flags.go @@ -23,21 +23,26 @@ import ( "net/http" "net/textproto" "strings" + "time" "github.com/spf13/viper" "go.uber.org/zap" + "github.com/jaegertracing/jaeger/cmd/query/app/querysvc" + "github.com/jaegertracing/jaeger/model/adjuster" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/ports" + "github.com/jaegertracing/jaeger/storage" ) const ( - queryPort = "query.port" - queryBasePath = "query.base-path" - queryStaticFiles = "query.static-files" - queryUIConfig = "query.ui-config" - queryTokenPropagation = "query.bearer-token-propagation" - queryAdditionalHeaders = "query.additional-headers" + queryPort = "query.port" + queryBasePath = "query.base-path" + queryStaticFiles = "query.static-files" + queryUIConfig = "query.ui-config" + queryTokenPropagation = "query.bearer-token-propagation" + queryAdditionalHeaders = "query.additional-headers" + queryMaxClockSkewAdjust = "query.max-clock-skew-adjustment" ) // QueryOptions holds configuration for query service @@ -54,6 +59,8 @@ type QueryOptions struct { BearerTokenPropagation bool // AdditionalHeaders AdditionalHeaders http.Header + // MaxClockSkewAdjust is the maximum duration by which jaeger-query will adjust a span + MaxClockSkewAdjust time.Duration } // AddFlags adds flags for QueryOptions @@ -64,6 +71,7 @@ func AddFlags(flagSet *flag.FlagSet) { flagSet.String(queryStaticFiles, "", "The directory path override for the static assets for the UI") flagSet.String(queryUIConfig, "", "The path to the UI configuration file in JSON format") flagSet.Bool(queryTokenPropagation, false, "Allow propagation of bearer token to be used by storage plugins") + flagSet.Duration(queryMaxClockSkewAdjust, time.Second, "The maximum delta by which span timestamps may be adjusted in the UI due to clock skew; set to 0s to disable clock skew adjustments") } // InitFromViper initializes QueryOptions with properties from viper @@ -73,6 +81,7 @@ func (qOpts *QueryOptions) InitFromViper(v *viper.Viper, logger *zap.Logger) *Qu qOpts.StaticAssets = v.GetString(queryStaticFiles) qOpts.UIConfig = v.GetString(queryUIConfig) qOpts.BearerTokenPropagation = v.GetBool(queryTokenPropagation) + qOpts.MaxClockSkewAdjust = v.GetDuration(queryMaxClockSkewAdjust) stringSlice := v.GetStringSlice(queryAdditionalHeaders) headers, err := stringSliceAsHeader(stringSlice) @@ -84,6 +93,18 @@ func (qOpts *QueryOptions) InitFromViper(v *viper.Viper, logger *zap.Logger) *Qu return qOpts } +// BuildQueryServiceOptions creates a QueryServiceOptions struct with appropriate adjusters and archive config +func (qOpts *QueryOptions) BuildQueryServiceOptions(storageFactory storage.Factory, logger *zap.Logger) *querysvc.QueryServiceOptions { + opts := &querysvc.QueryServiceOptions{} + if !opts.InitArchiveStorage(storageFactory, logger) { + logger.Info("Archive storage not initialized") + } + + opts.Adjuster = adjuster.Sequence(querysvc.StandardAdjusters(qOpts.MaxClockSkewAdjust)...) + + return opts +} + // stringSliceAsHeader parses a slice of strings and returns a http.Header. // Each string in the slice is expected to be in the format "key: value" func stringSliceAsHeader(slice []string) (http.Header, error) { diff --git a/cmd/query/app/flags_test.go b/cmd/query/app/flags_test.go index 4308385e12f..3f07c56e1fc 100644 --- a/cmd/query/app/flags_test.go +++ b/cmd/query/app/flags_test.go @@ -18,11 +18,14 @@ package app import ( "net/http" "testing" + "time" "github.com/stretchr/testify/assert" "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/config" + "github.com/jaegertracing/jaeger/storage/mocks" + spanstore_mocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" ) func TestQueryBuilderFlags(t *testing.T) { @@ -34,6 +37,7 @@ func TestQueryBuilderFlags(t *testing.T) { "--query.port=80", "--query.additional-headers=access-control-allow-origin:blerg", "--query.additional-headers=whatever:thing", + "--query.max-clock-skew-adjustment=10s", }) qOpts := new(QueryOptions).InitFromViper(v, zap.NewNop()) assert.Equal(t, "/dev/null", qOpts.StaticAssets) @@ -44,6 +48,7 @@ func TestQueryBuilderFlags(t *testing.T) { "Access-Control-Allow-Origin": []string{"blerg"}, "Whatever": []string{"thing"}, }, qOpts.AdditionalHeaders) + assert.Equal(t, 10*time.Second, qOpts.MaxClockSkewAdjust) } func TestQueryBuilderBadHeadersFlags(t *testing.T) { @@ -81,3 +86,32 @@ func TestStringSliceAsHeader(t *testing.T) { assert.Nil(t, parsedHeaders) assert.NoError(t, err) } + +func TestBuildQueryServiceOptions(t *testing.T) { + v, _ := config.Viperize(AddFlags) + qOpts := new(QueryOptions).InitFromViper(v, zap.NewNop()) + assert.NotNil(t, qOpts) + + qSvcOpts := qOpts.BuildQueryServiceOptions(&mocks.Factory{}, zap.NewNop()) + assert.NotNil(t, qSvcOpts) + assert.NotNil(t, qSvcOpts.Adjuster) + assert.Nil(t, qSvcOpts.ArchiveSpanReader) + assert.Nil(t, qSvcOpts.ArchiveSpanWriter) + + comboFactory := struct { + *mocks.Factory + *mocks.ArchiveFactory + }{ + &mocks.Factory{}, + &mocks.ArchiveFactory{}, + } + + comboFactory.ArchiveFactory.On("CreateArchiveSpanReader").Return(&spanstore_mocks.Reader{}, nil) + comboFactory.ArchiveFactory.On("CreateArchiveSpanWriter").Return(&spanstore_mocks.Writer{}, nil) + + qSvcOpts = qOpts.BuildQueryServiceOptions(comboFactory, zap.NewNop()) + assert.NotNil(t, qSvcOpts) + assert.NotNil(t, qSvcOpts.Adjuster) + assert.NotNil(t, qSvcOpts.ArchiveSpanReader) + assert.NotNil(t, qSvcOpts.ArchiveSpanWriter) +} diff --git a/cmd/query/app/querysvc/adjusters.go b/cmd/query/app/querysvc/adjusters.go index 29fa1a3721d..20691047707 100644 --- a/cmd/query/app/querysvc/adjusters.go +++ b/cmd/query/app/querysvc/adjusters.go @@ -16,15 +16,19 @@ package querysvc import ( + "time" + "github.com/jaegertracing/jaeger/model/adjuster" ) // StandardAdjusters is a list of model adjusters applied by the query service // before returning the data to the API clients. -var StandardAdjusters = []adjuster.Adjuster{ - adjuster.SpanIDDeduper(), - adjuster.ClockSkew(), - adjuster.IPTagAdjuster(), - adjuster.SortLogFields(), - adjuster.SpanReferences(), +func StandardAdjusters(maxClockSkewAdjust time.Duration) []adjuster.Adjuster { + return []adjuster.Adjuster{ + adjuster.SpanIDDeduper(), + adjuster.ClockSkew(maxClockSkewAdjust), + adjuster.IPTagAdjuster(), + adjuster.SortLogFields(), + adjuster.SpanReferences(), + } } diff --git a/cmd/query/app/querysvc/query_service.go b/cmd/query/app/querysvc/query_service.go index 43cdbf2d1ee..55e52b4d6da 100644 --- a/cmd/query/app/querysvc/query_service.go +++ b/cmd/query/app/querysvc/query_service.go @@ -33,6 +33,10 @@ var ( errNoArchiveSpanStorage = errors.New("archive span storage was not configured") ) +const ( + defaultMaxClockSkewAdjust = time.Second +) + // QueryServiceOptions has optional members of QueryService type QueryServiceOptions struct { ArchiveSpanReader spanstore.Reader @@ -56,7 +60,7 @@ func NewQueryService(spanReader spanstore.Reader, dependencyReader dependencysto } if qsvc.options.Adjuster == nil { - qsvc.options.Adjuster = adjuster.Sequence(StandardAdjusters...) + qsvc.options.Adjuster = adjuster.Sequence(StandardAdjusters(defaultMaxClockSkewAdjust)...) } return qsvc } diff --git a/cmd/query/app/util.go b/cmd/query/app/util.go index 9b8242324a1..29e1bc7f7b8 100644 --- a/cmd/query/app/util.go +++ b/cmd/query/app/util.go @@ -14,7 +14,9 @@ package app -import "github.com/jaegertracing/jaeger/storage/spanstore" +import ( + "github.com/jaegertracing/jaeger/storage/spanstore" +) func getUniqueOperationNames(operations []spanstore.Operation) []string { // only return unique operation names diff --git a/cmd/query/main.go b/cmd/query/main.go index d14d4f2626b..88d05d7c9d8 100644 --- a/cmd/query/main.go +++ b/cmd/query/main.go @@ -38,7 +38,6 @@ import ( "github.com/jaegertracing/jaeger/pkg/version" "github.com/jaegertracing/jaeger/plugin/storage" "github.com/jaegertracing/jaeger/ports" - istorage "github.com/jaegertracing/jaeger/storage" "github.com/jaegertracing/jaeger/storage/spanstore" storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics" ) @@ -101,7 +100,7 @@ func main() { if err != nil { logger.Fatal("Failed to create dependency reader", zap.Error(err)) } - queryServiceOptions := archiveOptions(storageFactory, logger) + queryServiceOptions := queryOpts.BuildQueryServiceOptions(storageFactory, logger) queryService := querysvc.NewQueryService( spanReader, dependencyReader, @@ -137,11 +136,3 @@ func main() { os.Exit(1) } } - -func archiveOptions(storageFactory istorage.Factory, logger *zap.Logger) *querysvc.QueryServiceOptions { - opts := &querysvc.QueryServiceOptions{} - if !opts.InitArchiveStorage(storageFactory, logger) { - logger.Info("Archive storage not initialized") - } - return opts -} diff --git a/model/adjuster/clockskew.go b/model/adjuster/clockskew.go index 89a09ea2527..167590d862d 100644 --- a/model/adjuster/clockskew.go +++ b/model/adjuster/clockskew.go @@ -34,10 +34,11 @@ import ( // // This adjuster never returns any errors. Instead it records any issues // it encounters in Span.Warnings. -func ClockSkew() Adjuster { +func ClockSkew(maxDelta time.Duration) Adjuster { return Func(func(trace *model.Trace) (*model.Trace, error) { adjuster := &clockSkewAdjuster{ - trace: trace, + trace: trace, + maxDelta: maxDelta, } adjuster.buildNodesMap() adjuster.buildSubGraphs() @@ -52,12 +53,15 @@ func ClockSkew() Adjuster { const ( warningDuplicateSpanID = "duplicate span IDs; skipping clock skew adjustment" warningFormatInvalidParentID = "invalid parent span IDs=%s; skipping clock skew adjustment" + warningMaxDeltaExceeded = "max clock skew adjustment delta of %v exceeded; not applying calculated delta of %v" + warningSkewAdjustDisabled = "clock skew adjustment disabled; not applying calculated delta of %v" ) type clockSkewAdjuster struct { - trace *model.Trace - spans map[model.SpanID]*node - roots map[model.SpanID]*node + trace *model.Trace + spans map[model.SpanID]*node + roots map[model.SpanID]*node + maxDelta time.Duration } type clockSkew struct { @@ -177,6 +181,16 @@ func (a *clockSkewAdjuster) adjustTimestamps(n *node, skew clockSkew) { return } + if absDuration(skew.delta) > a.maxDelta { + if a.maxDelta == 0 { + n.span.Warnings = append(n.span.Warnings, fmt.Sprintf(warningSkewAdjustDisabled, skew.delta)) + return + } + + n.span.Warnings = append(n.span.Warnings, fmt.Sprintf(warningMaxDeltaExceeded, a.maxDelta, skew.delta)) + return + } + n.span.StartTime = n.span.StartTime.Add(skew.delta) n.span.Warnings = append(n.span.Warnings, fmt.Sprintf("This span's timestamps were adjusted by %v", skew.delta)) @@ -184,3 +198,11 @@ func (a *clockSkewAdjuster) adjustTimestamps(n *node, skew clockSkew) { n.span.Logs[i].Timestamp = n.span.Logs[i].Timestamp.Add(skew.delta) } } + +func absDuration(d time.Duration) time.Duration { + if d < 0 { + return -1 * d + } + + return d +} diff --git a/model/adjuster/clockskew_test.go b/model/adjuster/clockskew_test.go index 813d99199e1..3eaff5a14b6 100644 --- a/model/adjuster/clockskew_test.go +++ b/model/adjuster/clockskew_test.go @@ -78,6 +78,7 @@ func TestClockSkewAdjuster(t *testing.T) { description string trace []spanProto err string + maxAdjust time.Duration }{ { description: "single span with bad parent", @@ -128,6 +129,32 @@ func TestClockSkewAdjuster(t *testing.T) { {id: 2, parent: 1, startTime: 20, duration: 150, host: "b", adjusted: 20}, }, }, + { + description: "do not apply positive adjustment due to max skew adjustment", + trace: []spanProto{ + {id: 1, parent: 0, startTime: 10, duration: 100, host: "a", adjusted: 10}, + {id: 2, parent: 1, startTime: 0, duration: 50, host: "b", adjusted: 0}, + }, + maxAdjust: 10 * time.Millisecond, + err: "max clock skew adjustment delta of 10ms exceeded; not applying calculated delta of 35ms", + }, + { + description: "do not apply negative adjustment due to max skew adjustment", + trace: []spanProto{ + {id: 1, parent: 0, startTime: 10, duration: 100, host: "a", adjusted: 10}, + {id: 2, parent: 1, startTime: 80, duration: 50, host: "b", adjusted: 80}, + }, + maxAdjust: 10 * time.Millisecond, + err: "max clock skew adjustment delta of 10ms exceeded; not applying calculated delta of -45ms", + }, + { + description: "do not apply adjustment due to disabled adjustment", + trace: []spanProto{ + {id: 1, parent: 0, startTime: 10, duration: 100, host: "a", adjusted: 10}, + {id: 2, parent: 1, startTime: 0, duration: 50, host: "b", adjusted: 0}, + }, + err: "clock skew adjustment disabled; not applying calculated delta of 35ms", + }, { description: "adjust child starting before parent", trace: []spanProto{ @@ -137,6 +164,7 @@ func TestClockSkewAdjuster(t *testing.T) { {id: 2, parent: 1, startTime: 0, duration: 50, host: "b", adjusted: 35, logs: []int{5, 10}, adjustedLogs: []int{40, 45}}, }, + maxAdjust: time.Second, }, { description: "adjust child starting before parent even if it is longer", @@ -144,6 +172,7 @@ func TestClockSkewAdjuster(t *testing.T) { {id: 1, parent: 0, startTime: 10, duration: 100, host: "a", adjusted: 10}, {id: 2, parent: 1, startTime: 0, duration: 150, host: "b", adjusted: 10}, }, + maxAdjust: time.Second, }, { description: "adjust child ending after parent but being shorter", @@ -157,13 +186,14 @@ func TestClockSkewAdjuster(t *testing.T) { {id: 3, parent: 2, startTime: 60, duration: 20, host: "b", adjusted: 35, logs: []int{65, 70}, adjustedLogs: []int{40, 45}}, }, + maxAdjust: time.Second, }, } for _, tt := range testCases { testCase := tt // capture loop var t.Run(testCase.description, func(t *testing.T) { - adjuster := ClockSkew() + adjuster := ClockSkew(tt.maxAdjust) trace, err := adjuster.Adjust(makeTrace(testCase.trace)) assert.NoError(t, err) if testCase.err != "" {