diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index 38f3fc42f3f..8536ae8be41 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -44,33 +44,37 @@ import ( // Configuration describes the configuration properties needed to connect to an ElasticSearch cluster type Configuration struct { - Servers []string `mapstructure:"server_urls"` - RemoteReadClusters []string `mapstructure:"remote_read_clusters"` - Username string `mapstructure:"username"` - Password string `mapstructure:"password" json:"-"` - TokenFilePath string `mapstructure:"token_file"` - AllowTokenFromContext bool `mapstructure:"-"` - Sniffer bool `mapstructure:"sniffer"` // https://github.com/olivere/elastic/wiki/Sniffing - SnifferTLSEnabled bool `mapstructure:"sniffer_tls_enabled"` - MaxDocCount int `mapstructure:"-"` // Defines maximum number of results to fetch from storage per query - MaxSpanAge time.Duration `yaml:"max_span_age" mapstructure:"-"` // configures the maximum lookback on span reads - NumShards int64 `yaml:"shards" mapstructure:"num_shards"` - NumReplicas int64 `yaml:"replicas" mapstructure:"num_replicas"` - Timeout time.Duration `validate:"min=500" mapstructure:"-"` - BulkSize int `mapstructure:"-"` - BulkWorkers int `mapstructure:"-"` - BulkActions int `mapstructure:"-"` - BulkFlushInterval time.Duration `mapstructure:"-"` - IndexPrefix string `mapstructure:"index_prefix"` - IndexDateLayout string `mapstructure:"index_date_layout"` - Tags TagsAsFields `mapstructure:"tags_as_fields"` - Enabled bool `mapstructure:"-"` - TLS tlscfg.Options `mapstructure:"tls"` - UseReadWriteAliases bool `mapstructure:"use_aliases"` - CreateIndexTemplates bool `mapstructure:"create_mappings"` - UseILM bool `mapstructure:"use_ilm"` - Version uint `mapstructure:"version"` - LogLevel string `mapstructure:"log_level"` + Servers []string `mapstructure:"server_urls"` + RemoteReadClusters []string `mapstructure:"remote_read_clusters"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password" json:"-"` + TokenFilePath string `mapstructure:"token_file"` + AllowTokenFromContext bool `mapstructure:"-"` + Sniffer bool `mapstructure:"sniffer"` // https://github.com/olivere/elastic/wiki/Sniffing + SnifferTLSEnabled bool `mapstructure:"sniffer_tls_enabled"` + MaxDocCount int `mapstructure:"-"` // Defines maximum number of results to fetch from storage per query + MaxSpanAge time.Duration `yaml:"max_span_age" mapstructure:"-"` // configures the maximum lookback on span reads + NumShards int64 `yaml:"shards" mapstructure:"num_shards"` + NumReplicas int64 `yaml:"replicas" mapstructure:"num_replicas"` + Timeout time.Duration `validate:"min=500" mapstructure:"-"` + BulkSize int `mapstructure:"-"` + BulkWorkers int `mapstructure:"-"` + BulkActions int `mapstructure:"-"` + BulkFlushInterval time.Duration `mapstructure:"-"` + IndexPrefix string `mapstructure:"index_prefix"` + IndexDateLayoutSpans string `mapstructure:"-"` + IndexDateLayoutServices string `mapstructure:"-"` + IndexDateLayoutDependencies string `mapstructure:"-"` + IndexRolloverFrequencySpans string `mapstructure:"-"` + IndexRolloverFrequencyServices string `mapstructure:"-"` + Tags TagsAsFields `mapstructure:"tags_as_fields"` + Enabled bool `mapstructure:"-"` + TLS tlscfg.Options `mapstructure:"tls"` + UseReadWriteAliases bool `mapstructure:"use_aliases"` + CreateIndexTemplates bool `mapstructure:"create_mappings"` + UseILM bool `mapstructure:"use_ilm"` + Version uint `mapstructure:"version"` + LogLevel string `mapstructure:"log_level"` } // TagsAsFields holds configuration for tag schema. @@ -96,7 +100,11 @@ type ClientBuilder interface { GetMaxSpanAge() time.Duration GetMaxDocCount() int GetIndexPrefix() string - GetIndexDateLayout() string + GetIndexDateLayoutSpans() string + GetIndexDateLayoutServices() string + GetIndexDateLayoutDependencies() string + GetIndexRolloverFrequencySpansDuration() time.Duration + GetIndexRolloverFrequencyServicesDuration() time.Duration GetTagsFilePath() string GetAllTagsAsFields() bool GetTagDotReplacement() string @@ -281,9 +289,35 @@ func (c *Configuration) GetIndexPrefix() string { return c.IndexPrefix } -// GetIndexDateLayout returns index date layout -func (c *Configuration) GetIndexDateLayout() string { - return c.IndexDateLayout +// GetIndexDateLayoutSpans returns jaeger-span index date layout +func (c *Configuration) GetIndexDateLayoutSpans() string { + return c.IndexDateLayoutSpans +} + +// GetIndexDateLayoutServices returns jaeger-service index date layout +func (c *Configuration) GetIndexDateLayoutServices() string { + return c.IndexDateLayoutServices +} + +// GetIndexDateLayoutDependencies returns jaeger-dependencies index date layout +func (c *Configuration) GetIndexDateLayoutDependencies() string { + return c.IndexDateLayoutDependencies +} + +// GetIndexRolloverFrequencySpansDuration returns jaeger-span index rollover frequency duration +func (c *Configuration) GetIndexRolloverFrequencySpansDuration() time.Duration { + if c.IndexRolloverFrequencySpans == "hour" { + return -1 * time.Hour + } + return -24 * time.Hour +} + +// GetIndexRolloverFrequencyServicesDuration returns jaeger-service index rollover frequency duration +func (c *Configuration) GetIndexRolloverFrequencyServicesDuration() time.Duration { + if c.IndexRolloverFrequencyServices == "hour" { + return -1 * time.Hour + } + return -24 * time.Hour } // GetTagsFilePath returns a path to file containing tag keys diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 3877840750c..c7eebcfe596 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -110,7 +110,7 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { // CreateDependencyReader implements storage.Factory func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { reader := esDepStore.NewDependencyStore(f.primaryClient, f.logger, f.primaryConfig.GetIndexPrefix(), - f.primaryConfig.GetIndexDateLayout(), f.primaryConfig.GetMaxDocCount()) + f.primaryConfig.GetIndexDateLayoutDependencies(), f.primaryConfig.GetMaxDocCount()) return reader, nil } @@ -141,17 +141,20 @@ func createSpanReader( return nil, fmt.Errorf("--es.use-ilm must always be used in conjunction with --es.use-aliases to ensure ES writers and readers refer to the single index mapping") } return esSpanStore.NewSpanReader(esSpanStore.SpanReaderParams{ - Client: client, - Logger: logger, - MetricsFactory: mFactory, - MaxDocCount: cfg.GetMaxDocCount(), - MaxSpanAge: cfg.GetMaxSpanAge(), - IndexPrefix: cfg.GetIndexPrefix(), - IndexDateLayout: cfg.GetIndexDateLayout(), - TagDotReplacement: cfg.GetTagDotReplacement(), - UseReadWriteAliases: cfg.GetUseReadWriteAliases(), - Archive: archive, - RemoteReadClusters: cfg.GetRemoteReadClusters(), + Client: client, + Logger: logger, + MetricsFactory: mFactory, + MaxDocCount: cfg.GetMaxDocCount(), + MaxSpanAge: cfg.GetMaxSpanAge(), + IndexPrefix: cfg.GetIndexPrefix(), + SpanIndexDateLayout: cfg.GetIndexDateLayoutSpans(), + ServiceIndexDateLayout: cfg.GetIndexDateLayoutServices(), + SpanIndexRolloverFrequency: cfg.GetIndexRolloverFrequencySpansDuration(), + ServiceIndexRolloverFrequency: cfg.GetIndexRolloverFrequencyServicesDuration(), + TagDotReplacement: cfg.GetTagDotReplacement(), + UseReadWriteAliases: cfg.GetUseReadWriteAliases(), + Archive: archive, + RemoteReadClusters: cfg.GetRemoteReadClusters(), }), nil } @@ -186,16 +189,17 @@ func createSpanWriter( return nil, err } writer := esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{ - Client: client, - Logger: logger, - MetricsFactory: mFactory, - IndexPrefix: cfg.GetIndexPrefix(), - IndexDateLayout: cfg.GetIndexDateLayout(), - AllTagsAsFields: cfg.GetAllTagsAsFields(), - TagKeysAsFields: tags, - TagDotReplacement: cfg.GetTagDotReplacement(), - Archive: archive, - UseReadWriteAliases: cfg.GetUseReadWriteAliases(), + Client: client, + Logger: logger, + MetricsFactory: mFactory, + IndexPrefix: cfg.GetIndexPrefix(), + SpanIndexDateLayout: cfg.GetIndexDateLayoutSpans(), + ServiceIndexDateLayout: cfg.GetIndexDateLayoutServices(), + AllTagsAsFields: cfg.GetAllTagsAsFields(), + TagKeysAsFields: tags, + TagDotReplacement: cfg.GetTagDotReplacement(), + Archive: archive, + UseReadWriteAliases: cfg.GetUseReadWriteAliases(), }) if cfg.IsCreateIndexTemplates() { err := writer.CreateTemplates(spanMapping, serviceMapping, cfg.GetIndexPrefix()) diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index da2702f3cd7..0fc0e2e087b 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -17,7 +17,6 @@ package es import ( "flag" - "fmt" "strings" "time" @@ -29,35 +28,37 @@ import ( ) const ( - suffixUsername = ".username" - suffixPassword = ".password" - suffixSniffer = ".sniffer" - suffixSnifferTLSEnabled = ".sniffer-tls-enabled" - suffixTokenPath = ".token-file" - suffixServerURLs = ".server-urls" - suffixRemoteReadClusters = ".remote-read-clusters" - suffixMaxSpanAge = ".max-span-age" - suffixNumShards = ".num-shards" - suffixNumReplicas = ".num-replicas" - suffixBulkSize = ".bulk.size" - suffixBulkWorkers = ".bulk.workers" - suffixBulkActions = ".bulk.actions" - suffixBulkFlushInterval = ".bulk.flush-interval" - suffixTimeout = ".timeout" - suffixIndexPrefix = ".index-prefix" - suffixIndexDateSeparator = ".index-date-separator" - suffixTagsAsFields = ".tags-as-fields" - suffixTagsAsFieldsAll = suffixTagsAsFields + ".all" - suffixTagsAsFieldsInclude = suffixTagsAsFields + ".include" - suffixTagsFile = suffixTagsAsFields + ".config-file" - suffixTagDeDotChar = suffixTagsAsFields + ".dot-replacement" - suffixReadAlias = ".use-aliases" - suffixUseILM = ".use-ilm" - suffixCreateIndexTemplate = ".create-index-templates" - suffixEnabled = ".enabled" - suffixVersion = ".version" - suffixMaxDocCount = ".max-doc-count" - suffixLogLevel = ".log-level" + suffixUsername = ".username" + suffixPassword = ".password" + suffixSniffer = ".sniffer" + suffixSnifferTLSEnabled = ".sniffer-tls-enabled" + suffixTokenPath = ".token-file" + suffixServerURLs = ".server-urls" + suffixRemoteReadClusters = ".remote-read-clusters" + suffixMaxSpanAge = ".max-span-age" + suffixNumShards = ".num-shards" + suffixNumReplicas = ".num-replicas" + suffixBulkSize = ".bulk.size" + suffixBulkWorkers = ".bulk.workers" + suffixBulkActions = ".bulk.actions" + suffixBulkFlushInterval = ".bulk.flush-interval" + suffixTimeout = ".timeout" + suffixIndexPrefix = ".index-prefix" + suffixIndexDateSeparator = ".index-date-separator" + suffixIndexRolloverFrequencySpans = ".index-rollover-frequency-spans" + suffixIndexRolloverFrequencyServices = ".index-rollover-frequency-services" + suffixTagsAsFields = ".tags-as-fields" + suffixTagsAsFieldsAll = suffixTagsAsFields + ".all" + suffixTagsAsFieldsInclude = suffixTagsAsFields + ".include" + suffixTagsFile = suffixTagsAsFields + ".config-file" + suffixTagDeDotChar = suffixTagsAsFields + ".dot-replacement" + suffixReadAlias = ".use-aliases" + suffixUseILM = ".use-ilm" + suffixCreateIndexTemplate = ".create-index-templates" + suffixEnabled = ".enabled" + suffixVersion = ".version" + suffixMaxDocCount = ".max-doc-count" + suffixLogLevel = ".log-level" // default number of documents to return from a query (elasticsearch allowed limit) // see search.max_buckets and index.max_result_window defaultMaxDocCount = 10_000 @@ -65,6 +66,8 @@ const ( defaultRemoteReadClusters = "" // default separator for Elasticsearch index date layout. defaultIndexDateSeparator = "-" + + defaultIndexRolloverFrequency = "day" ) // TODO this should be moved next to config.Configuration struct (maybe ./flags package) @@ -205,7 +208,17 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { flagSet.String( nsConfig.namespace+suffixIndexDateSeparator, defaultIndexDateSeparator, - "Optional date separator of Jaeger indices. For example \".\" creates \"jaeger-span-2020.11.20 \".") + "Optional date separator of Jaeger indices. For example \".\" creates \"jaeger-span-2020.11.20\".") + flagSet.String( + nsConfig.namespace+suffixIndexRolloverFrequencySpans, + defaultIndexRolloverFrequency, + "Rotates jaeger-span indices over the given period. For example \"day\" creates \"jaeger-span-yyyy-MM-dd\" every day after UTC 12AM. Valid options: [hour, day]. "+ + "This does not delete old indices. For details on complete index management solutions supported by Jaeger, refer to: https://www.jaegertracing.io/docs/deployment/#elasticsearch-rollover") + flagSet.String( + nsConfig.namespace+suffixIndexRolloverFrequencyServices, + defaultIndexRolloverFrequency, + "Rotates jaeger-service indices over the given period. For example \"day\" creates \"jaeger-service-yyyy-MM-dd\" every day after UTC 12AM. Valid options: [hour, day]. "+ + "This does not delete old indices. For details on complete index management solutions supported by Jaeger, refer to: https://www.jaegertracing.io/docs/deployment/#elasticsearch-rollover") flagSet.Bool( nsConfig.namespace+suffixTagsAsFieldsAll, nsConfig.Tags.AllAsFields, @@ -295,7 +308,6 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { cfg.BulkFlushInterval = v.GetDuration(cfg.namespace + suffixBulkFlushInterval) cfg.Timeout = v.GetDuration(cfg.namespace + suffixTimeout) cfg.IndexPrefix = v.GetString(cfg.namespace + suffixIndexPrefix) - cfg.IndexDateLayout = initDateLayout(v.GetString(cfg.namespace + suffixIndexDateSeparator)) cfg.Tags.AllAsFields = v.GetBool(cfg.namespace + suffixTagsAsFieldsAll) cfg.Tags.Include = v.GetString(cfg.namespace + suffixTagsAsFieldsInclude) cfg.Tags.File = v.GetString(cfg.namespace + suffixTagsFile) @@ -317,6 +329,16 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { if len(remoteReadClusters) > 0 { cfg.RemoteReadClusters = strings.Split(remoteReadClusters, ",") } + + cfg.IndexRolloverFrequencySpans = strings.ToLower(v.GetString(cfg.namespace + suffixIndexRolloverFrequencySpans)) + cfg.IndexRolloverFrequencyServices = strings.ToLower(v.GetString(cfg.namespace + suffixIndexRolloverFrequencyServices)) + + separator := v.GetString(cfg.namespace + suffixIndexDateSeparator) + cfg.IndexDateLayoutSpans = initDateLayout(cfg.IndexRolloverFrequencySpans, separator) + cfg.IndexDateLayoutServices = initDateLayout(cfg.IndexRolloverFrequencyServices, separator) + + // Dependencies calculation should be daily, and this index size is very small + cfg.IndexDateLayoutDependencies = initDateLayout(defaultIndexRolloverFrequency, separator) } // GetPrimary returns primary configuration. @@ -343,6 +365,11 @@ func stripWhiteSpace(str string) string { return strings.Replace(str, " ", "", -1) } -func initDateLayout(separator string) string { - return fmt.Sprintf("2006%s01%s02", separator, separator) +func initDateLayout(rolloverFreq, sep string) string { + // default to daily format + indexLayout := "2006" + sep + "01" + sep + "02" + if rolloverFreq == "hour" { + indexLayout = indexLayout + sep + "15" + } + return indexLayout } diff --git a/plugin/storage/es/options_test.go b/plugin/storage/es/options_test.go index d9bee8bd2b6..1836442e906 100644 --- a/plugin/storage/es/options_test.go +++ b/plugin/storage/es/options_test.go @@ -58,12 +58,15 @@ func TestOptionsWithFlags(t *testing.T) { "--es.num-shards=20", "--es.num-replicas=10", "--es.index-date-separator=", + "--es.index-rollover-frequency-spans=hour", + "--es.index-rollover-frequency-services=day", // a couple overrides "--es.remote-read-clusters=cluster_one,cluster_two", "--es.aux.server-urls=3.3.3.3, 4.4.4.4", "--es.aux.max-span-age=24h", "--es.aux.num-replicas=10", "--es.aux.index-date-separator=.", + "--es.aux.index-rollover-frequency-spans=hour", "--es.tls.enabled=true", "--es.tls.skip-host-verify=true", "--es.tags-as-fields.all=true", @@ -89,7 +92,8 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, "!", primary.Tags.DotReplacement) assert.Equal(t, "./file.txt", primary.Tags.File) assert.Equal(t, "test,tags", primary.Tags.Include) - assert.Equal(t, "20060102", primary.IndexDateLayout) + assert.Equal(t, "20060102", primary.IndexDateLayoutServices) + assert.Equal(t, "2006010215", primary.IndexDateLayoutSpans) aux := opts.Get("es.aux") assert.Equal(t, []string{"3.3.3.3", "4.4.4.4"}, aux.Servers) assert.Equal(t, "hello", aux.Username) @@ -102,7 +106,8 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, "@", aux.Tags.DotReplacement) assert.Equal(t, "./file.txt", aux.Tags.File) assert.Equal(t, "test,tags", aux.Tags.Include) - assert.Equal(t, "2006.01.02", aux.IndexDateLayout) + assert.Equal(t, "2006.01.02", aux.IndexDateLayoutServices) + assert.Equal(t, "2006.01.02.15", aux.IndexDateLayoutSpans) assert.True(t, primary.UseILM) } @@ -170,7 +175,64 @@ func TestIndexDateSeparator(t *testing.T) { opts.InitFromViper(v) primary := opts.GetPrimary() - assert.Equal(t, tc.wantDateLayout, primary.IndexDateLayout) + assert.Equal(t, tc.wantDateLayout, primary.IndexDateLayoutSpans) + }) + } +} + +func TestIndexRollover(t *testing.T) { + testCases := []struct { + name string + flags []string + wantSpanDateLayout string + wantServiceDateLayout string + wantSpanIndexRolloverFrequency time.Duration + wantServiceIndexRolloverFrequency time.Duration + }{ + { + name: "not defined (default)", + flags: []string{}, + wantSpanDateLayout: "2006-01-02", + wantServiceDateLayout: "2006-01-02", + wantSpanIndexRolloverFrequency: -24 * time.Hour, + wantServiceIndexRolloverFrequency: -24 * time.Hour, + }, + { + name: "index day rollover", + flags: []string{"--es.index-rollover-frequency-services=day", "--es.index-rollover-frequency-spans=hour"}, + wantSpanDateLayout: "2006-01-02-15", + wantServiceDateLayout: "2006-01-02", + wantSpanIndexRolloverFrequency: -1 * time.Hour, + wantServiceIndexRolloverFrequency: -24 * time.Hour, + }, + { + name: "index hour rollover", + flags: []string{"--es.index-rollover-frequency-services=hour", "--es.index-rollover-frequency-spans=day"}, + wantSpanDateLayout: "2006-01-02", + wantServiceDateLayout: "2006-01-02-15", + wantSpanIndexRolloverFrequency: -24 * time.Hour, + wantServiceIndexRolloverFrequency: -1 * time.Hour, + }, + { + name: "invalid index rollover frequency falls back to default 'day'", + flags: []string{"--es.index-rollover-frequency-services=hours", "--es.index-rollover-frequency-spans=hours"}, + wantSpanDateLayout: "2006-01-02", + wantServiceDateLayout: "2006-01-02", + wantSpanIndexRolloverFrequency: -24 * time.Hour, + wantServiceIndexRolloverFrequency: -24 * time.Hour, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + opts := NewOptions("es") + v, command := config.Viperize(opts.AddFlags) + command.ParseFlags(tc.flags) + opts.InitFromViper(v) + primary := opts.GetPrimary() + assert.Equal(t, tc.wantSpanDateLayout, primary.IndexDateLayoutSpans) + assert.Equal(t, tc.wantServiceDateLayout, primary.IndexDateLayoutServices) + assert.Equal(t, tc.wantSpanIndexRolloverFrequency, primary.GetIndexRolloverFrequencySpansDuration()) + assert.Equal(t, tc.wantServiceIndexRolloverFrequency, primary.GetIndexRolloverFrequencyServicesDuration()) }) } } diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 4688f6c6ea7..3e405dfca84 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -94,52 +94,61 @@ type SpanReader struct { logger *zap.Logger // The age of the oldest service/operation we will look for. Because indices in ElasticSearch are by day, // this will be rounded down to UTC 00:00 of that day. - maxSpanAge time.Duration - serviceOperationStorage *ServiceOperationStorage - spanIndexPrefix string - serviceIndexPrefix string - indexDateLayout string - spanConverter dbmodel.ToDomain - timeRangeIndices timeRangeIndexFn - sourceFn sourceFn - maxDocCount int - useReadWriteAliases bool + maxSpanAge time.Duration + serviceOperationStorage *ServiceOperationStorage + spanIndexPrefix string + serviceIndexPrefix string + spanIndexDateLayout string + serviceIndexDateLayout string + spanIndexRolloverFrequency time.Duration + serviceIndexRolloverFrequency time.Duration + spanConverter dbmodel.ToDomain + timeRangeIndices timeRangeIndexFn + sourceFn sourceFn + maxDocCount int + useReadWriteAliases bool } // SpanReaderParams holds constructor params for NewSpanReader type SpanReaderParams struct { - Client es.Client - Logger *zap.Logger - MaxSpanAge time.Duration - MaxDocCount int - MetricsFactory metrics.Factory - IndexPrefix string - IndexDateLayout string - TagDotReplacement string - Archive bool - UseReadWriteAliases bool - RemoteReadClusters []string + Client es.Client + Logger *zap.Logger + MaxSpanAge time.Duration + MaxDocCount int + MetricsFactory metrics.Factory + IndexPrefix string + SpanIndexDateLayout string + ServiceIndexDateLayout string + SpanIndexRolloverFrequency time.Duration + ServiceIndexRolloverFrequency time.Duration + TagDotReplacement string + Archive bool + UseReadWriteAliases bool + RemoteReadClusters []string } // NewSpanReader returns a new SpanReader with a metrics. func NewSpanReader(p SpanReaderParams) *SpanReader { return &SpanReader{ - client: p.Client, - logger: p.Logger, - maxSpanAge: p.MaxSpanAge, - serviceOperationStorage: NewServiceOperationStorage(p.Client, p.Logger, 0), // the decorator takes care of metrics - spanIndexPrefix: indexNames(p.IndexPrefix, spanIndex), - serviceIndexPrefix: indexNames(p.IndexPrefix, serviceIndex), - indexDateLayout: p.IndexDateLayout, - spanConverter: dbmodel.NewToDomain(p.TagDotReplacement), - timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases, p.RemoteReadClusters), - sourceFn: getSourceFn(p.Archive, p.MaxDocCount), - maxDocCount: p.MaxDocCount, - useReadWriteAliases: p.UseReadWriteAliases, - } -} - -type timeRangeIndexFn func(indexName string, indexDateLayout string, startTime time.Time, endTime time.Time) []string + client: p.Client, + logger: p.Logger, + maxSpanAge: p.MaxSpanAge, + serviceOperationStorage: NewServiceOperationStorage(p.Client, p.Logger, 0), // the decorator takes care of metrics + spanIndexPrefix: indexNames(p.IndexPrefix, spanIndex), + serviceIndexPrefix: indexNames(p.IndexPrefix, serviceIndex), + spanIndexDateLayout: p.SpanIndexDateLayout, + serviceIndexDateLayout: p.ServiceIndexDateLayout, + spanIndexRolloverFrequency: p.SpanIndexRolloverFrequency, + serviceIndexRolloverFrequency: p.SpanIndexRolloverFrequency, + spanConverter: dbmodel.NewToDomain(p.TagDotReplacement), + timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases, p.RemoteReadClusters), + sourceFn: getSourceFn(p.Archive, p.MaxDocCount), + maxDocCount: p.MaxDocCount, + useReadWriteAliases: p.UseReadWriteAliases, + } +} + +type timeRangeIndexFn func(indexName string, indexDateLayout string, startTime time.Time, endTime time.Time, reduceDuration time.Duration) []string type sourceFn func(query elastic.Query, nextTime uint64) *elastic.SearchSource @@ -151,12 +160,12 @@ func getTimeRangeIndexFn(archive, useReadWriteAliases bool, remoteReadClusters [ } else { archiveSuffix = archiveIndexSuffix } - return addRemoteReadClusters(func(indexPrefix, indexDateLayout string, startTime time.Time, endTime time.Time) []string { + return addRemoteReadClusters(func(indexPrefix, indexDateLayout string, startTime time.Time, endTime time.Time, reduceDuration time.Duration) []string { return []string{archiveIndex(indexPrefix, archiveSuffix)} }, remoteReadClusters) } if useReadWriteAliases { - return addRemoteReadClusters(func(indexPrefix string, indexDateLayout string, startTime time.Time, endTime time.Time) []string { + return addRemoteReadClusters(func(indexPrefix string, indexDateLayout string, startTime time.Time, endTime time.Time, reduceDuration time.Duration) []string { return []string{indexPrefix + "read"} }, remoteReadClusters) } @@ -166,8 +175,8 @@ func getTimeRangeIndexFn(archive, useReadWriteAliases bool, remoteReadClusters [ // Add a remote cluster prefix for each cluster and for each index and add it to the list of original indices. // Elasticsearch cross cluster api example GET /twitter,cluster_one:twitter,cluster_two:twitter/_search. func addRemoteReadClusters(fn timeRangeIndexFn, remoteReadClusters []string) timeRangeIndexFn { - return func(indexPrefix string, indexDateLayout string, startTime time.Time, endTime time.Time) []string { - jaegerIndices := fn(indexPrefix, indexDateLayout, startTime, endTime) + return func(indexPrefix string, indexDateLayout string, startTime time.Time, endTime time.Time, reduceDuration time.Duration) []string { + jaegerIndices := fn(indexPrefix, indexDateLayout, startTime, endTime, reduceDuration) if len(remoteReadClusters) == 0 { return jaegerIndices } @@ -198,13 +207,15 @@ func getSourceFn(archive bool, maxDocCount int) sourceFn { } // timeRangeIndices returns the array of indices that we need to query, based on query params -func timeRangeIndices(indexName, indexDateLayout string, startTime time.Time, endTime time.Time) []string { +func timeRangeIndices(indexName, indexDateLayout string, startTime time.Time, endTime time.Time, reduceDuration time.Duration) []string { var indices []string firstIndex := indexWithDate(indexName, indexDateLayout, startTime) currentIndex := indexWithDate(indexName, indexDateLayout, endTime) for currentIndex != firstIndex { - indices = append(indices, currentIndex) - endTime = endTime.Add(-24 * time.Hour) + if len(indices) == 0 || indices[len(indices)-1] != currentIndex { + indices = append(indices, currentIndex) + } + endTime = endTime.Add(reduceDuration) currentIndex = indexWithDate(indexName, indexDateLayout, endTime) } indices = append(indices, firstIndex) @@ -268,7 +279,7 @@ func (s *SpanReader) GetServices(ctx context.Context) ([]string, error) { span, ctx := opentracing.StartSpanFromContext(ctx, "GetServices") defer span.Finish() currentTime := time.Now() - jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, s.indexDateLayout, currentTime.Add(-s.maxSpanAge), currentTime) + jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, s.serviceIndexDateLayout, currentTime.Add(-s.maxSpanAge), currentTime, s.serviceIndexRolloverFrequency) return s.serviceOperationStorage.getServices(ctx, jaegerIndices, s.maxDocCount) } @@ -280,7 +291,7 @@ func (s *SpanReader) GetOperations( span, ctx := opentracing.StartSpanFromContext(ctx, "GetOperations") defer span.Finish() currentTime := time.Now() - jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, s.indexDateLayout, currentTime.Add(-s.maxSpanAge), currentTime) + jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, s.serviceIndexDateLayout, currentTime.Add(-s.maxSpanAge), currentTime, s.serviceIndexRolloverFrequency) operations, err := s.serviceOperationStorage.getOperations(ctx, jaegerIndices, query.ServiceName, s.maxDocCount) if err != nil { return nil, err @@ -353,7 +364,7 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st // Add an hour in both directions so that traces that straddle two indexes are retrieved. // i.e starts in one and ends in another. - indices := s.timeRangeIndices(s.spanIndexPrefix, s.indexDateLayout, startTime.Add(-time.Hour), endTime.Add(time.Hour)) + indices := s.timeRangeIndices(s.spanIndexPrefix, s.spanIndexDateLayout, startTime.Add(-time.Hour), endTime.Add(time.Hour), s.spanIndexRolloverFrequency) nextTime := model.TimeAsEpochMicroseconds(startTime.Add(-time.Hour)) searchAfterTime := make(map[model.TraceID]uint64) totalDocumentsFetched := make(map[model.TraceID]int) @@ -544,7 +555,7 @@ func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.Tra // } aggregation := s.buildTraceIDAggregation(traceQuery.NumTraces) boolQuery := s.buildFindTraceIDsQuery(traceQuery) - jaegerIndices := s.timeRangeIndices(s.spanIndexPrefix, s.indexDateLayout, traceQuery.StartTimeMin, traceQuery.StartTimeMax) + jaegerIndices := s.timeRangeIndices(s.spanIndexPrefix, s.spanIndexDateLayout, traceQuery.StartTimeMin, traceQuery.StartTimeMax, s.spanIndexRolloverFrequency) searchService := s.client.Search(jaegerIndices...). Size(0). // set to 0 because we don't want actual documents. diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 220445dc7ab..64410d35394 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -147,61 +147,79 @@ func TestSpanReaderIndices(t *testing.T) { logger, _ := testutils.NewLogger() metricsFactory := metricstest.NewFactory(0) date := time.Date(2019, 10, 10, 5, 0, 0, 0, time.UTC) - dateFormat := date.UTC().Format("2006-01-02") + spanDataLayout := "2006-01-02-15" + serviceDataLayout := "2006-01-02" + spanDataLayoutFormat := date.UTC().Format(spanDataLayout) + serviceDataLayoutFormat := date.UTC().Format(serviceDataLayout) + testCases := []struct { indices []string params SpanReaderParams }{ {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "", Archive: false}, - indices: []string{spanIndex + dateFormat}}, + IndexPrefix: "", Archive: false, SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout}, + indices: []string{spanIndex + spanDataLayoutFormat, serviceIndex + serviceDataLayoutFormat}}, {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", UseReadWriteAliases: true}, - indices: []string{spanIndex + "read"}}, + indices: []string{spanIndex + "read", serviceIndex + "read"}}, {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "foo:", Archive: false}, - indices: []string{"foo:" + indexPrefixSeparator + spanIndex + dateFormat}}, + IndexPrefix: "foo:", Archive: false, SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout}, + indices: []string{"foo:" + indexPrefixSeparator + spanIndex + spanDataLayoutFormat, "foo:" + indexPrefixSeparator + serviceIndex + serviceDataLayoutFormat}}, {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", UseReadWriteAliases: true}, - indices: []string{"foo:-" + spanIndex + "read"}}, + indices: []string{"foo:-" + spanIndex + "read", "foo:-" + serviceIndex + "read"}}, {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", Archive: true}, - indices: []string{spanIndex + archiveIndexSuffix}}, + indices: []string{spanIndex + archiveIndexSuffix, serviceIndex + archiveIndexSuffix}}, {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", Archive: true}, - indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveIndexSuffix}}, + indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveIndexSuffix, "foo:" + indexPrefixSeparator + serviceIndex + archiveIndexSuffix}}, {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", Archive: true, UseReadWriteAliases: true}, - indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveReadIndexSuffix}}, + indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveReadIndexSuffix, "foo:" + indexPrefixSeparator + serviceIndex + archiveReadIndexSuffix}}, {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "", Archive: false, RemoteReadClusters: []string{"cluster_one", "cluster_two"}}, + IndexPrefix: "", Archive: false, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout}, indices: []string{ - spanIndex + dateFormat, - "cluster_one:" + spanIndex + dateFormat, - "cluster_two:" + spanIndex + dateFormat}}, + spanIndex + spanDataLayoutFormat, + "cluster_one:" + spanIndex + spanDataLayoutFormat, + "cluster_two:" + spanIndex + spanDataLayoutFormat, + serviceIndex + serviceDataLayoutFormat, + "cluster_one:" + serviceIndex + serviceDataLayoutFormat, + "cluster_two:" + serviceIndex + serviceDataLayoutFormat}}, {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", Archive: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}}, indices: []string{ spanIndex + archiveIndexSuffix, "cluster_one:" + spanIndex + archiveIndexSuffix, - "cluster_two:" + spanIndex + archiveIndexSuffix}}, + "cluster_two:" + spanIndex + archiveIndexSuffix, + serviceIndex + archiveIndexSuffix, + "cluster_one:" + serviceIndex + archiveIndexSuffix, + "cluster_two:" + serviceIndex + archiveIndexSuffix}}, {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", Archive: false, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}}, indices: []string{ spanIndex + "read", "cluster_one:" + spanIndex + "read", - "cluster_two:" + spanIndex + "read"}}, + "cluster_two:" + spanIndex + "read", + serviceIndex + "read", + "cluster_one:" + serviceIndex + "read", + "cluster_two:" + serviceIndex + "read"}}, {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", Archive: true, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}}, indices: []string{ spanIndex + archiveReadIndexSuffix, "cluster_one:" + spanIndex + archiveReadIndexSuffix, - "cluster_two:" + spanIndex + archiveReadIndexSuffix}}, + "cluster_two:" + spanIndex + archiveReadIndexSuffix, + serviceIndex + archiveReadIndexSuffix, + "cluster_one:" + serviceIndex + archiveReadIndexSuffix, + "cluster_two:" + serviceIndex + archiveReadIndexSuffix}}, } for _, testCase := range testCases { r := NewSpanReader(testCase.params) - actual := r.timeRangeIndices(r.spanIndexPrefix, "2006-01-02", date, date) - assert.Equal(t, testCase.indices, actual) + + actualSpan := r.timeRangeIndices(r.spanIndexPrefix, r.spanIndexDateLayout, date, date, -1*time.Hour) + actualService := r.timeRangeIndices(r.serviceIndexPrefix, r.serviceIndexDateLayout, date, date, -24*time.Hour) + assert.Equal(t, testCase.indices, append(actualSpan, actualService...)) } } @@ -495,7 +513,7 @@ func TestSpanReaderFindIndices(t *testing.T) { } withSpanReader(func(r *spanReaderTest) { for _, testCase := range testCases { - actual := r.reader.timeRangeIndices(spanIndex, dateLayout, testCase.startTime, testCase.endTime) + actual := r.reader.timeRangeIndices(spanIndex, dateLayout, testCase.startTime, testCase.endTime, -24*time.Hour) assert.EqualValues(t, testCase.expected, actual) } }) diff --git a/plugin/storage/es/spanstore/writer.go b/plugin/storage/es/spanstore/writer.go index 87b6bbff3b2..bf69ae07764 100644 --- a/plugin/storage/es/spanstore/writer.go +++ b/plugin/storage/es/spanstore/writer.go @@ -56,18 +56,19 @@ type SpanWriter struct { // SpanWriterParams holds constructor parameters for NewSpanWriter type SpanWriterParams struct { - Client es.Client - Logger *zap.Logger - MetricsFactory metrics.Factory - IndexPrefix string - IndexDateLayout string - AllTagsAsFields bool - TagKeysAsFields []string - TagDotReplacement string - Archive bool - UseReadWriteAliases bool - ServiceCacheTTL time.Duration - IndexCacheTTL time.Duration + Client es.Client + Logger *zap.Logger + MetricsFactory metrics.Factory + IndexPrefix string + SpanIndexDateLayout string + ServiceIndexDateLayout string + AllTagsAsFields bool + TagKeysAsFields []string + TagDotReplacement string + Archive bool + UseReadWriteAliases bool + ServiceCacheTTL time.Duration + IndexCacheTTL time.Duration } // NewSpanWriter creates a new SpanWriter for use @@ -97,7 +98,7 @@ func NewSpanWriter(p SpanWriterParams) *SpanWriter { }, ), spanConverter: dbmodel.NewFromDomain(p.AllTagsAsFields, p.TagKeysAsFields, p.TagDotReplacement), - spanServiceIndex: getSpanAndServiceIndexFn(p.Archive, p.UseReadWriteAliases, p.IndexPrefix, p.IndexDateLayout), + spanServiceIndex: getSpanAndServiceIndexFn(p.Archive, p.UseReadWriteAliases, p.IndexPrefix, p.SpanIndexDateLayout, p.ServiceIndexDateLayout), } } @@ -120,7 +121,7 @@ func (s *SpanWriter) CreateTemplates(spanTemplate, serviceTemplate, indexPrefix // spanAndServiceIndexFn returns names of span and service indices type spanAndServiceIndexFn func(spanTime time.Time) (string, string) -func getSpanAndServiceIndexFn(archive, useReadWriteAliases bool, prefix, dateLayout string) spanAndServiceIndexFn { +func getSpanAndServiceIndexFn(archive, useReadWriteAliases bool, prefix, spanDateLayout string, serviceDateLayout string) spanAndServiceIndexFn { if prefix != "" { prefix += indexPrefixSeparator } @@ -141,7 +142,7 @@ func getSpanAndServiceIndexFn(archive, useReadWriteAliases bool, prefix, dateLay } } return func(date time.Time) (string, string) { - return indexWithDate(spanIndexPrefix, dateLayout, date), indexWithDate(serviceIndexPrefix, dateLayout, date) + return indexWithDate(spanIndexPrefix, spanDateLayout, date), indexWithDate(serviceIndexPrefix, serviceDateLayout, date) } } diff --git a/plugin/storage/es/spanstore/writer_test.go b/plugin/storage/es/spanstore/writer_test.go index b2f208015b0..b4f011b8efa 100644 --- a/plugin/storage/es/spanstore/writer_test.go +++ b/plugin/storage/es/spanstore/writer_test.go @@ -50,7 +50,7 @@ func withSpanWriter(fn func(w *spanWriterTest)) { client: client, logger: logger, logBuffer: logBuffer, - writer: NewSpanWriter(SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexDateLayout: "2006-01-02"}), + writer: NewSpanWriter(SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, SpanIndexDateLayout: "2006-01-02", ServiceIndexDateLayout: "2006-01-02"}), } fn(w) } @@ -62,32 +62,34 @@ func TestSpanWriterIndices(t *testing.T) { logger, _ := testutils.NewLogger() metricsFactory := metricstest.NewFactory(0) date := time.Now() - layout := "2006-01-02" - dateFormat := date.UTC().Format(layout) + spanDataLayout := "2006-01-02-15" + serviceDataLayout := "2006-01-02" + spanDataLayoutFormat := date.UTC().Format(spanDataLayout) + serviceDataLayoutFormat := date.UTC().Format(serviceDataLayout) testCases := []struct { indices []string params SpanWriterParams }{ {params: SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "", IndexDateLayout: layout, Archive: false}, - indices: []string{spanIndex + dateFormat, serviceIndex + dateFormat}}, + IndexPrefix: "", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, Archive: false}, + indices: []string{spanIndex + spanDataLayoutFormat, serviceIndex + serviceDataLayoutFormat}}, {params: SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "", IndexDateLayout: layout, UseReadWriteAliases: true}, + IndexPrefix: "", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, UseReadWriteAliases: true}, indices: []string{spanIndex + "write", serviceIndex + "write"}}, {params: SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "foo:", IndexDateLayout: layout, Archive: false}, - indices: []string{"foo:" + indexPrefixSeparator + spanIndex + dateFormat, "foo:" + indexPrefixSeparator + serviceIndex + dateFormat}}, + IndexPrefix: "foo:", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, Archive: false}, + indices: []string{"foo:" + indexPrefixSeparator + spanIndex + spanDataLayoutFormat, "foo:" + indexPrefixSeparator + serviceIndex + serviceDataLayoutFormat}}, {params: SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "foo:", IndexDateLayout: layout, UseReadWriteAliases: true}, + IndexPrefix: "foo:", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, UseReadWriteAliases: true}, indices: []string{"foo:-" + spanIndex + "write", "foo:-" + serviceIndex + "write"}}, {params: SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "", IndexDateLayout: layout, Archive: true}, + IndexPrefix: "", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, Archive: true}, indices: []string{spanIndex + archiveIndexSuffix, ""}}, {params: SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "foo:", IndexDateLayout: layout, Archive: true}, + IndexPrefix: "foo:", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, Archive: true}, indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveIndexSuffix, ""}}, {params: SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "foo:", IndexDateLayout: layout, Archive: true, UseReadWriteAliases: true}, + IndexPrefix: "foo:", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, Archive: true, UseReadWriteAliases: true}, indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveWriteIndexSuffix, ""}}, } for _, testCase := range testCases {