Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[es] Add index rollover mode that can choose day and hour #2965

Merged
merged 13 commits into from
May 17, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 28 additions & 27 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,33 +44,34 @@ import (

// Configuration describes the configuration properties needed to connect to an ElasticSearch cluster
type Configuration struct {
Servers []string `mapstructure:"server_urls"`
RemoteReadClusters []string `mapstructure:"remote_read_clusters"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password" json:"-"`
TokenFilePath string `mapstructure:"token_file"`
AllowTokenFromContext bool `mapstructure:"-"`
Sniffer bool `mapstructure:"sniffer"` // https://github.com/olivere/elastic/wiki/Sniffing
SnifferTLSEnabled bool `mapstructure:"sniffer_tls_enabled"`
MaxDocCount int `mapstructure:"-"` // Defines maximum number of results to fetch from storage per query
MaxSpanAge time.Duration `yaml:"max_span_age" mapstructure:"-"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards" mapstructure:"num_shards"`
NumReplicas int64 `yaml:"replicas" mapstructure:"num_replicas"`
Timeout time.Duration `validate:"min=500" mapstructure:"-"`
BulkSize int `mapstructure:"-"`
BulkWorkers int `mapstructure:"-"`
BulkActions int `mapstructure:"-"`
BulkFlushInterval time.Duration `mapstructure:"-"`
IndexPrefix string `mapstructure:"index_prefix"`
IndexDateLayout string `mapstructure:"index_date_layout"`
Tags TagsAsFields `mapstructure:"tags_as_fields"`
Enabled bool `mapstructure:"-"`
TLS tlscfg.Options `mapstructure:"tls"`
UseReadWriteAliases bool `mapstructure:"use_aliases"`
CreateIndexTemplates bool `mapstructure:"create_mappings"`
UseILM bool `mapstructure:"use_ilm"`
Version uint `mapstructure:"version"`
LogLevel string `mapstructure:"log_level"`
Servers []string `mapstructure:"server_urls"`
RemoteReadClusters []string `mapstructure:"remote_read_clusters"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password" json:"-"`
TokenFilePath string `mapstructure:"token_file"`
AllowTokenFromContext bool `mapstructure:"-"`
Sniffer bool `mapstructure:"sniffer"` // https://github.com/olivere/elastic/wiki/Sniffing
SnifferTLSEnabled bool `mapstructure:"sniffer_tls_enabled"`
MaxDocCount int `mapstructure:"-"` // Defines maximum number of results to fetch from storage per query
MaxSpanAge time.Duration `yaml:"max_span_age" mapstructure:"-"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards" mapstructure:"num_shards"`
NumReplicas int64 `yaml:"replicas" mapstructure:"num_replicas"`
Timeout time.Duration `validate:"min=500" mapstructure:"-"`
BulkSize int `mapstructure:"-"`
BulkWorkers int `mapstructure:"-"`
BulkActions int `mapstructure:"-"`
BulkFlushInterval time.Duration `mapstructure:"-"`
IndexPrefix string `mapstructure:"index_prefix"`
IndexDateLayout string `mapstructure:"index_date_layout"`
IndexRolloverFrequency string `mapstructure:"-"`
Tags TagsAsFields `mapstructure:"tags_as_fields"`
Enabled bool `mapstructure:"-"`
TLS tlscfg.Options `mapstructure:"tls"`
UseReadWriteAliases bool `mapstructure:"use_aliases"`
CreateIndexTemplates bool `mapstructure:"create_mappings"`
UseILM bool `mapstructure:"use_ilm"`
Version uint `mapstructure:"version"`
LogLevel string `mapstructure:"log_level"`
}

// TagsAsFields holds configuration for tag schema.
Expand Down
83 changes: 49 additions & 34 deletions plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package es

import (
"flag"
"fmt"
"strings"
"time"

Expand All @@ -29,42 +28,45 @@ import (
)

const (
suffixUsername = ".username"
suffixPassword = ".password"
suffixSniffer = ".sniffer"
suffixSnifferTLSEnabled = ".sniffer-tls-enabled"
suffixTokenPath = ".token-file"
suffixServerURLs = ".server-urls"
suffixRemoteReadClusters = ".remote-read-clusters"
suffixMaxSpanAge = ".max-span-age"
suffixNumShards = ".num-shards"
suffixNumReplicas = ".num-replicas"
suffixBulkSize = ".bulk.size"
suffixBulkWorkers = ".bulk.workers"
suffixBulkActions = ".bulk.actions"
suffixBulkFlushInterval = ".bulk.flush-interval"
suffixTimeout = ".timeout"
suffixIndexPrefix = ".index-prefix"
suffixIndexDateSeparator = ".index-date-separator"
suffixTagsAsFields = ".tags-as-fields"
suffixTagsAsFieldsAll = suffixTagsAsFields + ".all"
suffixTagsAsFieldsInclude = suffixTagsAsFields + ".include"
suffixTagsFile = suffixTagsAsFields + ".config-file"
suffixTagDeDotChar = suffixTagsAsFields + ".dot-replacement"
suffixReadAlias = ".use-aliases"
suffixUseILM = ".use-ilm"
suffixCreateIndexTemplate = ".create-index-templates"
suffixEnabled = ".enabled"
suffixVersion = ".version"
suffixMaxDocCount = ".max-doc-count"
suffixLogLevel = ".log-level"
suffixUsername = ".username"
suffixPassword = ".password"
suffixSniffer = ".sniffer"
suffixSnifferTLSEnabled = ".sniffer-tls-enabled"
suffixTokenPath = ".token-file"
suffixServerURLs = ".server-urls"
suffixRemoteReadClusters = ".remote-read-clusters"
suffixMaxSpanAge = ".max-span-age"
suffixNumShards = ".num-shards"
suffixNumReplicas = ".num-replicas"
suffixBulkSize = ".bulk.size"
suffixBulkWorkers = ".bulk.workers"
suffixBulkActions = ".bulk.actions"
suffixBulkFlushInterval = ".bulk.flush-interval"
suffixTimeout = ".timeout"
suffixIndexPrefix = ".index-prefix"
suffixIndexDateSeparator = ".index-date-separator"
suffixIndexRolloverFrequency = ".index-rollover-frequency"
suffixTagsAsFields = ".tags-as-fields"
suffixTagsAsFieldsAll = suffixTagsAsFields + ".all"
suffixTagsAsFieldsInclude = suffixTagsAsFields + ".include"
suffixTagsFile = suffixTagsAsFields + ".config-file"
suffixTagDeDotChar = suffixTagsAsFields + ".dot-replacement"
suffixReadAlias = ".use-aliases"
suffixUseILM = ".use-ilm"
suffixCreateIndexTemplate = ".create-index-templates"
suffixEnabled = ".enabled"
suffixVersion = ".version"
suffixMaxDocCount = ".max-doc-count"
suffixLogLevel = ".log-level"
// default number of documents to return from a query (elasticsearch allowed limit)
// see search.max_buckets and index.max_result_window
defaultMaxDocCount = 10_000
defaultServerURL = "http://127.0.0.1:9200"
defaultRemoteReadClusters = ""
// default separator for Elasticsearch index date layout.
defaultIndexDateSeparator = "-"

defaultIndexRolloverFrequency = "day"
)

// TODO this should be moved next to config.Configuration struct (maybe ./flags package)
Expand Down Expand Up @@ -205,7 +207,12 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
flagSet.String(
nsConfig.namespace+suffixIndexDateSeparator,
defaultIndexDateSeparator,
"Optional date separator of Jaeger indices. For example \".\" creates \"jaeger-span-2020.11.20 \".")
"Optional date separator of Jaeger indices. For example \".\" creates \"jaeger-span-2020.11.20\".")
flagSet.String(
nsConfig.namespace+suffixIndexRolloverFrequency,
defaultIndexRolloverFrequency,
"Rotates Jaeger indices over the given period. For example \"day\" creates \"jaeger-span-yyyy-MM-dd\" every day after UTC 12AM. Valid options: [hour, day]. "+
"Jaeger additionally supports manual and automated (via ILM) index management. Reference: https://www.jaegertracing.io/docs/deployment/#elasticsearch-rollover.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't we decide to change this msg?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@WalkerWang731 I believe we suggested to reword this to (@yurishkuro please correct if I'm wrong):

This does not delete old indices. For details on complete index management solutions supported by Jaeger, refer to: https://www.jaegertracing.io/docs/deployment/#elasticsearch-rollover

flagSet.Bool(
nsConfig.namespace+suffixTagsAsFieldsAll,
nsConfig.Tags.AllAsFields,
Expand Down Expand Up @@ -295,7 +302,6 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
cfg.BulkFlushInterval = v.GetDuration(cfg.namespace + suffixBulkFlushInterval)
cfg.Timeout = v.GetDuration(cfg.namespace + suffixTimeout)
cfg.IndexPrefix = v.GetString(cfg.namespace + suffixIndexPrefix)
cfg.IndexDateLayout = initDateLayout(v.GetString(cfg.namespace + suffixIndexDateSeparator))
cfg.Tags.AllAsFields = v.GetBool(cfg.namespace + suffixTagsAsFieldsAll)
cfg.Tags.Include = v.GetString(cfg.namespace + suffixTagsAsFieldsInclude)
cfg.Tags.File = v.GetString(cfg.namespace + suffixTagsFile)
Expand All @@ -317,6 +323,10 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
if len(remoteReadClusters) > 0 {
cfg.RemoteReadClusters = strings.Split(remoteReadClusters, ",")
}

rolloverFreq := strings.ToLower(v.GetString(cfg.namespace + suffixIndexRolloverFrequency))
separator := v.GetString(cfg.namespace + suffixIndexDateSeparator)
cfg.IndexDateLayout = initDateLayout(rolloverFreq, separator)
}

// GetPrimary returns primary configuration.
Expand All @@ -343,6 +353,11 @@ func stripWhiteSpace(str string) string {
return strings.Replace(str, " ", "", -1)
}

func initDateLayout(separator string) string {
return fmt.Sprintf("2006%s01%s02", separator, separator)
func initDateLayout(rolloverFreq, sep string) string {
// default to daily format
indexLayout := "2006" + sep + "01" + sep + "02"
if rolloverFreq == "hour" {
indexLayout = indexLayout + sep + "15"
}
return indexLayout
}
24 changes: 24 additions & 0 deletions plugin/storage/es/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,27 @@ func TestIndexDateSeparator(t *testing.T) {
})
}
}

func TestIndexRollover(t *testing.T) {
testCases := []struct {
name string
flags []string
wantDateLayout string
}{
{"not defined (default)", []string{}, "2006-01-02"},
{"day rotation", []string{"--es.index-rollover-frequency=day"}, "2006-01-02"},
{"hour rotation", []string{"--es.index-rollover-frequency=hour"}, "2006-01-02-15"},
{"error rotation change default", []string{"--es.index-rollover=hours"}, "2006-01-02"},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
opts := NewOptions("es")
v, command := config.Viperize(opts.AddFlags)
command.ParseFlags(tc.flags)
opts.InitFromViper(v)
primary := opts.GetPrimary()
assert.Equal(t, tc.wantDateLayout, primary.IndexDateLayout)

})
}
}
12 changes: 10 additions & 2 deletions plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"errors"
"fmt"
"strings"
"time"

"github.com/olivere/elastic"
Expand Down Expand Up @@ -199,9 +200,16 @@ func timeRangeIndices(indexName, indexDateLayout string, startTime time.Time, en
var indices []string
firstIndex := indexWithDate(indexName, indexDateLayout, startTime)
currentIndex := indexWithDate(indexName, indexDateLayout, endTime)

reduce := -24 * time.Hour
if strings.HasSuffix(indexDateLayout, "15") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a shame time.stdHour isn't exported.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A cleaner way is to pass freq directly as parameter, not infer it back from format string

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I also thought previously, but if pass freq directly then will also judge this variable too(if freq == "day" or move this judgment to ClientBuilder of config.go ), basically it same as currently.

And another side, other functions will be become more complicated include NewSpanReader SpanReaderParams SpanReader addRemoteReadClusters timeRangeIndices createSpanWriter and reader relevant factory functions and all of unit test functions. and change the ClientBuilder of config.go.

Actually, I always thought this commit should be small and should not change more code, but if we need, I can do it

Copy link
Member

@yurishkuro yurishkuro Apr 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair enough. As Albert said, it would be good to at least use a named constant shared between config and span reader

Copy link
Contributor Author

@WalkerWang731 WalkerWang731 Apr 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I see, now that we decide to add the new constant, I want to separate rollover jaeger-span and jaeger-service incidentally. How about?
Because the reason can refer here
I will add two params index-rollover-span-frequency and index-rollover-span-frequency, do you have any suggestions?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. I would suggest these names index-rollover-frequency-spans and index-rollover-frequency-services

reduce = -1 * time.Hour
}
for currentIndex != firstIndex {
indices = append(indices, currentIndex)
endTime = endTime.Add(-24 * time.Hour)
if len(indices) == 0 || indices[len(indices)-1] != currentIndex {
indices = append(indices, currentIndex)
}
endTime = endTime.Add(reduce)
currentIndex = indexWithDate(indexName, indexDateLayout, endTime)
}
indices = append(indices, firstIndex)
Expand Down
36 changes: 26 additions & 10 deletions plugin/storage/es/spanstore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,20 +147,28 @@ func TestSpanReaderIndices(t *testing.T) {
logger, _ := testutils.NewLogger()
metricsFactory := metricstest.NewFactory(0)
date := time.Date(2019, 10, 10, 5, 0, 0, 0, time.UTC)
dateFormat := date.UTC().Format("2006-01-02")
dateFormatDay := "2006-01-02"
dateFormatHour := "2006-01-02-15"

testCases := []struct {
indices []string
params SpanReaderParams
}{
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: false},
indices: []string{spanIndex + dateFormat}},
IndexPrefix: "", Archive: false, IndexDateLayout: dateFormatDay},
indices: []string{spanIndex + date.UTC().Format(dateFormatDay)}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: false, IndexDateLayout: dateFormatHour},
indices: []string{spanIndex + date.UTC().Format(dateFormatHour)}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", UseReadWriteAliases: true},
indices: []string{spanIndex + "read"}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "foo:", Archive: false},
indices: []string{"foo:" + indexPrefixSeparator + spanIndex + dateFormat}},
IndexPrefix: "foo:", Archive: false, IndexDateLayout: dateFormatDay},
indices: []string{"foo:" + indexPrefixSeparator + spanIndex + date.UTC().Format(dateFormatDay)}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "foo:", Archive: false, IndexDateLayout: dateFormatHour},
indices: []string{"foo:" + indexPrefixSeparator + spanIndex + date.UTC().Format(dateFormatHour)}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "foo:", UseReadWriteAliases: true},
indices: []string{"foo:-" + spanIndex + "read"}},
Expand All @@ -174,11 +182,19 @@ func TestSpanReaderIndices(t *testing.T) {
IndexPrefix: "foo:", Archive: true, UseReadWriteAliases: true},
indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveReadIndexSuffix}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: false, RemoteReadClusters: []string{"cluster_one", "cluster_two"}},
IndexPrefix: "", Archive: false, RemoteReadClusters: []string{"cluster_one", "cluster_two"},
IndexDateLayout: dateFormatDay},
indices: []string{
spanIndex + date.UTC().Format(dateFormatDay),
"cluster_one:" + spanIndex + date.UTC().Format(dateFormatDay),
"cluster_two:" + spanIndex + date.UTC().Format(dateFormatDay)}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: false, RemoteReadClusters: []string{"cluster_one", "cluster_two"},
IndexDateLayout: dateFormatHour},
indices: []string{
spanIndex + dateFormat,
"cluster_one:" + spanIndex + dateFormat,
"cluster_two:" + spanIndex + dateFormat}},
spanIndex + date.UTC().Format(dateFormatHour),
"cluster_one:" + spanIndex + date.UTC().Format(dateFormatHour),
"cluster_two:" + spanIndex + date.UTC().Format(dateFormatHour)}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}},
indices: []string{
Expand All @@ -200,7 +216,7 @@ func TestSpanReaderIndices(t *testing.T) {
}
for _, testCase := range testCases {
r := NewSpanReader(testCase.params)
actual := r.timeRangeIndices(r.spanIndexPrefix, "2006-01-02", date, date)
actual := r.timeRangeIndices(r.spanIndexPrefix, r.indexDateLayout, date, date)
assert.Equal(t, testCase.indices, actual)
}
}
Expand Down