Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clock skew config #2119

Merged
merged 12 commits into from
Mar 10, 2020
11 changes: 1 addition & 10 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
ss "github.com/jaegertracing/jaeger/plugin/sampling/strategystore"
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
istorage "github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics"
Expand Down Expand Up @@ -148,7 +147,7 @@ by default uses only in-memory database.`,

// query
querySrv := startQuery(
svc, qOpts, archiveOptions(storageFactory, logger),
svc, qOpts, qOpts.BuildQueryServiceOptions(storageFactory, logger),
spanReader, dependencyReader,
rootMetricsFactory, metricsFactory,
)
Expand Down Expand Up @@ -232,14 +231,6 @@ func startQuery(
return server
}

func archiveOptions(storageFactory istorage.Factory, logger *zap.Logger) *querysvc.QueryServiceOptions {
opts := &querysvc.QueryServiceOptions{}
if !opts.InitArchiveStorage(storageFactory, logger) {
logger.Info("Archive storage not initialized")
}
return opts
}

func initTracer(metricsFactory metrics.Factory, logger *zap.Logger) io.Closer {
traceCfg := &jaegerClientConfig.Configuration{
ServiceName: "jaeger-query",
Expand Down
33 changes: 27 additions & 6 deletions cmd/query/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,26 @@ import (
"net/http"
"net/textproto"
"strings"
"time"

"github.com/spf13/viper"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/model/adjuster"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage"
)

const (
queryPort = "query.port"
queryBasePath = "query.base-path"
queryStaticFiles = "query.static-files"
queryUIConfig = "query.ui-config"
queryTokenPropagation = "query.bearer-token-propagation"
queryAdditionalHeaders = "query.additional-headers"
queryPort = "query.port"
queryBasePath = "query.base-path"
queryStaticFiles = "query.static-files"
queryUIConfig = "query.ui-config"
queryTokenPropagation = "query.bearer-token-propagation"
queryAdditionalHeaders = "query.additional-headers"
queryMaxClockSkewAdjust = "query.max-clock-skew-adjustment"
)

// QueryOptions holds configuration for query service
Expand All @@ -54,6 +59,8 @@ type QueryOptions struct {
BearerTokenPropagation bool
// AdditionalHeaders
AdditionalHeaders http.Header
// MaxClockSkewAdjust is the maximum duration by which jaeger-query will adjust a span
MaxClockSkewAdjust time.Duration
}

// AddFlags adds flags for QueryOptions
Expand All @@ -64,6 +71,7 @@ func AddFlags(flagSet *flag.FlagSet) {
flagSet.String(queryStaticFiles, "", "The directory path override for the static assets for the UI")
flagSet.String(queryUIConfig, "", "The path to the UI configuration file in JSON format")
flagSet.Bool(queryTokenPropagation, false, "Allow propagation of bearer token to be used by storage plugins")
flagSet.Duration(queryMaxClockSkewAdjust, time.Second, "The maximum delta by which span timestamps may be adjusted in the UI due to clock skew; set to 0s to disable clock skew adjustments")
}

// InitFromViper initializes QueryOptions with properties from viper
Expand All @@ -73,6 +81,7 @@ func (qOpts *QueryOptions) InitFromViper(v *viper.Viper, logger *zap.Logger) *Qu
qOpts.StaticAssets = v.GetString(queryStaticFiles)
qOpts.UIConfig = v.GetString(queryUIConfig)
qOpts.BearerTokenPropagation = v.GetBool(queryTokenPropagation)
qOpts.MaxClockSkewAdjust = v.GetDuration(queryMaxClockSkewAdjust)

stringSlice := v.GetStringSlice(queryAdditionalHeaders)
headers, err := stringSliceAsHeader(stringSlice)
Expand All @@ -84,6 +93,18 @@ func (qOpts *QueryOptions) InitFromViper(v *viper.Viper, logger *zap.Logger) *Qu
return qOpts
}

// BuildQueryServiceOptions creates a QueryServiceOptions struct with appropriate adjusters and archive config
func (qOpts *QueryOptions) BuildQueryServiceOptions(storageFactory storage.Factory, logger *zap.Logger) *querysvc.QueryServiceOptions {
opts := &querysvc.QueryServiceOptions{}
if !opts.InitArchiveStorage(storageFactory, logger) {
logger.Info("Archive storage not initialized")
}

opts.Adjuster = adjuster.Sequence(querysvc.StandardAdjusters(qOpts.MaxClockSkewAdjust)...)

return opts
}

// stringSliceAsHeader parses a slice of strings and returns a http.Header.
// Each string in the slice is expected to be in the format "key: value"
func stringSliceAsHeader(slice []string) (http.Header, error) {
Expand Down
34 changes: 34 additions & 0 deletions cmd/query/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@ package app
import (
"net/http"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/storage/mocks"
spanstore_mocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
)

func TestQueryBuilderFlags(t *testing.T) {
Expand All @@ -34,6 +37,7 @@ func TestQueryBuilderFlags(t *testing.T) {
"--query.port=80",
"--query.additional-headers=access-control-allow-origin:blerg",
"--query.additional-headers=whatever:thing",
"--query.max-clock-skew-adjustment=10s",
})
qOpts := new(QueryOptions).InitFromViper(v, zap.NewNop())
assert.Equal(t, "/dev/null", qOpts.StaticAssets)
Expand All @@ -44,6 +48,7 @@ func TestQueryBuilderFlags(t *testing.T) {
"Access-Control-Allow-Origin": []string{"blerg"},
"Whatever": []string{"thing"},
}, qOpts.AdditionalHeaders)
assert.Equal(t, 10*time.Second, qOpts.MaxClockSkewAdjust)
}

func TestQueryBuilderBadHeadersFlags(t *testing.T) {
Expand Down Expand Up @@ -81,3 +86,32 @@ func TestStringSliceAsHeader(t *testing.T) {
assert.Nil(t, parsedHeaders)
assert.NoError(t, err)
}

func TestBuildQueryServiceOptions(t *testing.T) {
v, _ := config.Viperize(AddFlags)
qOpts := new(QueryOptions).InitFromViper(v, zap.NewNop())
assert.NotNil(t, qOpts)

qSvcOpts := qOpts.BuildQueryServiceOptions(&mocks.Factory{}, zap.NewNop())
assert.NotNil(t, qSvcOpts)
assert.NotNil(t, qSvcOpts.Adjuster)
assert.Nil(t, qSvcOpts.ArchiveSpanReader)
assert.Nil(t, qSvcOpts.ArchiveSpanWriter)

comboFactory := struct {
*mocks.Factory
*mocks.ArchiveFactory
}{
&mocks.Factory{},
&mocks.ArchiveFactory{},
}

comboFactory.ArchiveFactory.On("CreateArchiveSpanReader").Return(&spanstore_mocks.Reader{}, nil)
comboFactory.ArchiveFactory.On("CreateArchiveSpanWriter").Return(&spanstore_mocks.Writer{}, nil)

qSvcOpts = qOpts.BuildQueryServiceOptions(comboFactory, zap.NewNop())
assert.NotNil(t, qSvcOpts)
assert.NotNil(t, qSvcOpts.Adjuster)
assert.NotNil(t, qSvcOpts.ArchiveSpanReader)
assert.NotNil(t, qSvcOpts.ArchiveSpanWriter)
}
16 changes: 10 additions & 6 deletions cmd/query/app/querysvc/adjusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@
package querysvc

import (
"time"

"github.com/jaegertracing/jaeger/model/adjuster"
)

// StandardAdjusters is a list of model adjusters applied by the query service
// before returning the data to the API clients.
var StandardAdjusters = []adjuster.Adjuster{
adjuster.SpanIDDeduper(),
adjuster.ClockSkew(),
adjuster.IPTagAdjuster(),
adjuster.SortLogFields(),
adjuster.SpanReferences(),
func StandardAdjusters(maxClockSkewAdjust time.Duration) []adjuster.Adjuster {
return []adjuster.Adjuster{
adjuster.SpanIDDeduper(),
adjuster.ClockSkew(maxClockSkewAdjust),
adjuster.IPTagAdjuster(),
adjuster.SortLogFields(),
adjuster.SpanReferences(),
}
}
6 changes: 5 additions & 1 deletion cmd/query/app/querysvc/query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ var (
errNoArchiveSpanStorage = errors.New("archive span storage was not configured")
)

const (
defaultMaxClockSkewAdjust = time.Second
)

// QueryServiceOptions has optional members of QueryService
type QueryServiceOptions struct {
ArchiveSpanReader spanstore.Reader
Expand All @@ -56,7 +60,7 @@ func NewQueryService(spanReader spanstore.Reader, dependencyReader dependencysto
}

if qsvc.options.Adjuster == nil {
qsvc.options.Adjuster = adjuster.Sequence(StandardAdjusters...)
qsvc.options.Adjuster = adjuster.Sequence(StandardAdjusters(defaultMaxClockSkewAdjust)...)
}
return qsvc
}
Expand Down
4 changes: 3 additions & 1 deletion cmd/query/app/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

package app

import "github.com/jaegertracing/jaeger/storage/spanstore"
import (
"github.com/jaegertracing/jaeger/storage/spanstore"
)

func getUniqueOperationNames(operations []spanstore.Operation) []string {
// only return unique operation names
Expand Down
11 changes: 1 addition & 10 deletions cmd/query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/jaegertracing/jaeger/pkg/version"
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
istorage "github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/spanstore"
storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics"
)
Expand Down Expand Up @@ -101,7 +100,7 @@ func main() {
if err != nil {
logger.Fatal("Failed to create dependency reader", zap.Error(err))
}
queryServiceOptions := archiveOptions(storageFactory, logger)
queryServiceOptions := queryOpts.BuildQueryServiceOptions(storageFactory, logger)
queryService := querysvc.NewQueryService(
spanReader,
dependencyReader,
Expand Down Expand Up @@ -137,11 +136,3 @@ func main() {
os.Exit(1)
}
}

func archiveOptions(storageFactory istorage.Factory, logger *zap.Logger) *querysvc.QueryServiceOptions {
opts := &querysvc.QueryServiceOptions{}
if !opts.InitArchiveStorage(storageFactory, logger) {
logger.Info("Archive storage not initialized")
}
return opts
}
32 changes: 27 additions & 5 deletions model/adjuster/clockskew.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ import (
//
// This adjuster never returns any errors. Instead it records any issues
// it encounters in Span.Warnings.
func ClockSkew() Adjuster {
func ClockSkew(maxDelta time.Duration) Adjuster {
return Func(func(trace *model.Trace) (*model.Trace, error) {
adjuster := &clockSkewAdjuster{
trace: trace,
trace: trace,
maxDelta: maxDelta,
}
adjuster.buildNodesMap()
adjuster.buildSubGraphs()
Expand All @@ -52,12 +53,15 @@ func ClockSkew() Adjuster {
const (
warningDuplicateSpanID = "duplicate span IDs; skipping clock skew adjustment"
warningFormatInvalidParentID = "invalid parent span IDs=%s; skipping clock skew adjustment"
warningMaxDeltaExceeded = "max clock skew adjustment delta of %v exceeded; not applying calculated delta of %v"
warningSkewAdjustDisabled = "clock skew adjustment disabled; not applying calculated delta of %v"
)

type clockSkewAdjuster struct {
trace *model.Trace
spans map[model.SpanID]*node
roots map[model.SpanID]*node
trace *model.Trace
spans map[model.SpanID]*node
roots map[model.SpanID]*node
maxDelta time.Duration
}

type clockSkew struct {
Expand Down Expand Up @@ -177,10 +181,28 @@ func (a *clockSkewAdjuster) adjustTimestamps(n *node, skew clockSkew) {
return
}

if absDuration(skew.delta) > a.maxDelta {
if a.maxDelta == 0 {
n.span.Warnings = append(n.span.Warnings, fmt.Sprintf(warningSkewAdjustDisabled, skew.delta))
return
}

n.span.Warnings = append(n.span.Warnings, fmt.Sprintf(warningMaxDeltaExceeded, a.maxDelta, skew.delta))
return
}

n.span.StartTime = n.span.StartTime.Add(skew.delta)
n.span.Warnings = append(n.span.Warnings, fmt.Sprintf("This span's timestamps were adjusted by %v", skew.delta))

for i := range n.span.Logs {
n.span.Logs[i].Timestamp = n.span.Logs[i].Timestamp.Add(skew.delta)
}
}

func absDuration(d time.Duration) time.Duration {
if d < 0 {
return -1 * d
}

return d
}
32 changes: 31 additions & 1 deletion model/adjuster/clockskew_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func TestClockSkewAdjuster(t *testing.T) {
description string
trace []spanProto
err string
maxAdjust time.Duration
}{
{
description: "single span with bad parent",
Expand Down Expand Up @@ -128,6 +129,32 @@ func TestClockSkewAdjuster(t *testing.T) {
{id: 2, parent: 1, startTime: 20, duration: 150, host: "b", adjusted: 20},
},
},
{
description: "do not apply positive adjustment due to max skew adjustment",
trace: []spanProto{
{id: 1, parent: 0, startTime: 10, duration: 100, host: "a", adjusted: 10},
{id: 2, parent: 1, startTime: 0, duration: 50, host: "b", adjusted: 0},
},
maxAdjust: 10 * time.Millisecond,
err: "max clock skew adjustment delta of 10ms exceeded; not applying calculated delta of 35ms",
},
{
description: "do not apply negative adjustment due to max skew adjustment",
trace: []spanProto{
{id: 1, parent: 0, startTime: 10, duration: 100, host: "a", adjusted: 10},
{id: 2, parent: 1, startTime: 80, duration: 50, host: "b", adjusted: 80},
},
maxAdjust: 10 * time.Millisecond,
err: "max clock skew adjustment delta of 10ms exceeded; not applying calculated delta of -45ms",
},
{
description: "do not apply adjustment due to disabled adjustment",
trace: []spanProto{
{id: 1, parent: 0, startTime: 10, duration: 100, host: "a", adjusted: 10},
{id: 2, parent: 1, startTime: 0, duration: 50, host: "b", adjusted: 0},
},
err: "clock skew adjustment disabled; not applying calculated delta of 35ms",
},
{
description: "adjust child starting before parent",
trace: []spanProto{
Expand All @@ -137,13 +164,15 @@ func TestClockSkewAdjuster(t *testing.T) {
{id: 2, parent: 1, startTime: 0, duration: 50, host: "b", adjusted: 35,
logs: []int{5, 10}, adjustedLogs: []int{40, 45}},
},
maxAdjust: time.Second,
},
{
description: "adjust child starting before parent even if it is longer",
trace: []spanProto{
{id: 1, parent: 0, startTime: 10, duration: 100, host: "a", adjusted: 10},
{id: 2, parent: 1, startTime: 0, duration: 150, host: "b", adjusted: 10},
},
maxAdjust: time.Second,
},
{
description: "adjust child ending after parent but being shorter",
Expand All @@ -157,13 +186,14 @@ func TestClockSkewAdjuster(t *testing.T) {
{id: 3, parent: 2, startTime: 60, duration: 20, host: "b", adjusted: 35,
logs: []int{65, 70}, adjustedLogs: []int{40, 45}},
},
maxAdjust: time.Second,
},
}

for _, tt := range testCases {
testCase := tt // capture loop var
t.Run(testCase.description, func(t *testing.T) {
adjuster := ClockSkew()
adjuster := ClockSkew(tt.maxAdjust)
trace, err := adjuster.Adjust(makeTrace(testCase.trace))
assert.NoError(t, err)
if testCase.err != "" {
Expand Down