diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index 5ca627a7cfa..3f0167382c2 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -51,6 +51,7 @@ type Configuration struct { AllTagsAsFields bool TagDotReplacement string TLS TLSConfig + UseReadAlias bool } // TLSConfig describes the configuration properties to connect tls enabled ElasticSearch cluster @@ -71,6 +72,7 @@ type ClientBuilder interface { GetTagsFilePath() string GetAllTagsAsFields() bool GetTagDotReplacement() string + GetUseReadAlias() bool } // NewClient creates a new ElasticSearch client @@ -206,6 +208,11 @@ func (c *Configuration) GetTagDotReplacement() string { return c.TagDotReplacement } +// GetUseReadAlias returns read index name +func (c *Configuration) GetUseReadAlias() bool { + return c.UseReadAlias +} + // getConfigOptions wraps the configs to feed to the ElasticSearch client init func (c *Configuration) getConfigOptions() ([]elastic.ClientOptionFunc, error) { options := []elastic.ClientOptionFunc{elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer)} diff --git a/plugin/storage/es/esCleaner.py b/plugin/storage/es/esCleaner.py index 4c77aa1da9d..003d8ab6563 100755 --- a/plugin/storage/es/esCleaner.py +++ b/plugin/storage/es/esCleaner.py @@ -25,6 +25,8 @@ def main(): prefix += 'jaeger' ilo.filter_by_regex(kind='prefix', value=prefix) + # This excludes archive index as we use source='name' + # source `creation_date` would include archive index ilo.filter_by_age(source='name', direction='older', timestring='%Y-%m-%d', unit='days', unit_count=int(sys.argv[1])) empty_list(ilo, 'No indices to delete') diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index f53b2586ca2..c02427b0e3b 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -17,6 +17,7 @@ package es import ( "bufio" "flag" + "fmt" "os" "path/filepath" "strings" @@ -74,7 +75,6 @@ func (f *Factory) InitFromViper(v *viper.Viper) { func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { f.metricsFactory, f.logger = metricsFactory, logger - // TODO move to one builder function primaryClient, err := f.primaryConfig.NewClient(logger, metricsFactory) if err != nil { return err @@ -90,38 +90,12 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) // CreateSpanReader implements storage.Factory func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { - cfg := f.primaryConfig - return esSpanStore.NewSpanReader(esSpanStore.SpanReaderParams{ - Client: f.primaryClient, - Logger: f.logger, - MetricsFactory: f.metricsFactory, - MaxSpanAge: cfg.GetMaxSpanAge(), - IndexPrefix: cfg.GetIndexPrefix(), - TagDotReplacement: cfg.GetTagDotReplacement(), - }), nil + return createSpanReader(f.metricsFactory, f.logger, f.primaryClient, f.primaryConfig, false) } // CreateSpanWriter implements storage.Factory func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { - cfg := f.primaryConfig - var tags []string - if cfg.GetTagsFilePath() != "" { - var err error - if tags, err = loadTagsFromFile(cfg.GetTagsFilePath()); err != nil { - f.logger.Error("Could not open file with tags", zap.Error(err)) - return nil, err - } - } - return esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{Client: f.primaryClient, - Logger: f.logger, - MetricsFactory: f.metricsFactory, - NumShards: cfg.GetNumShards(), - NumReplicas: cfg.GetNumReplicas(), - IndexPrefix: cfg.GetIndexPrefix(), - AllTagsAsFields: cfg.GetAllTagsAsFields(), - TagKeysAsFields: tags, - TagDotReplacement: cfg.GetTagDotReplacement(), - }), nil + return createSpanWriter(f.metricsFactory, f.logger, f.archiveClient, f.primaryConfig, false) } // CreateDependencyReader implements storage.Factory @@ -149,40 +123,59 @@ func loadTagsFromFile(filePath string) ([]string, error) { // CreateArchiveSpanReader implements storage.ArchiveFactory func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) { - cfg := f.Options.Get(archiveNamespace) + return createSpanReader(f.metricsFactory, f.logger, f.archiveClient, f.Options.Get(archiveNamespace), true) +} + +// CreateArchiveSpanWriter implements storage.ArchiveFactory +func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { + return createSpanWriter(f.metricsFactory, f.logger, f.archiveClient, f.Options.Get(archiveNamespace), true) +} + +func createSpanReader( + mFactory metrics.Factory, + logger *zap.Logger, + client es.Client, + cfg config.ClientBuilder, + archive bool, +) (spanstore.Reader, error) { return esSpanStore.NewSpanReader(esSpanStore.SpanReaderParams{ - Client: f.archiveClient, - Logger: f.logger, - MetricsFactory: f.metricsFactory, - // TODO allow this config? we probably want to use forever - MaxSpanAge: cfg.GetMaxSpanAge(), + Client: client, + Logger: logger, + MetricsFactory: mFactory, IndexPrefix: cfg.GetIndexPrefix(), TagDotReplacement: cfg.GetTagDotReplacement(), - Archive: true, + ReadAlias: cfg.GetUseReadAlias(), + Archive: archive, }), nil } -// CreateArchiveSpanWriter implements storage.ArchiveFactory -func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { - cfg := f.Options.Get(archiveNamespace) +func createSpanWriter( + mFactory metrics.Factory, + logger *zap.Logger, + client es.Client, + cfg config.ClientBuilder, + archive bool, +) (spanstore.Writer, error) { var tags []string + fmt.Println("AAAAAA") + fmt.Println(cfg.GetTagsFilePath()) if cfg.GetTagsFilePath() != "" { var err error if tags, err = loadTagsFromFile(cfg.GetTagsFilePath()); err != nil { - f.logger.Error("Could not open file with tags", zap.Error(err)) + logger.Error("Could not open file with tags", zap.Error(err)) return nil, err } } return esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{ - Client: f.archiveClient, - Logger: f.logger, - MetricsFactory: f.metricsFactory, + Client: client, + Logger: logger, + MetricsFactory: mFactory, NumShards: cfg.GetNumShards(), NumReplicas: cfg.GetNumReplicas(), IndexPrefix: cfg.GetIndexPrefix(), AllTagsAsFields: cfg.GetAllTagsAsFields(), TagKeysAsFields: tags, TagDotReplacement: cfg.GetTagDotReplacement(), - Archive: true, + Archive: archive, }), nil } diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index c429cd545a3..db04e42fdb2 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -56,6 +56,10 @@ func TestElasticsearchFactory(t *testing.T) { assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error") f.primaryConfig = &mockClientBuilder{} + f.archiveConfig = &mockClientBuilder{err: errors.New("made-up error2")} + assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error2") + + f.archiveConfig = &mockClientBuilder{} assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) _, err := f.CreateSpanReader() @@ -66,6 +70,12 @@ func TestElasticsearchFactory(t *testing.T) { _, err = f.CreateDependencyReader() assert.NoError(t, err) + + _, err = f.CreateArchiveSpanReader() + assert.NoError(t, err) + + _, err = f.CreateArchiveSpanWriter() + assert.NoError(t, err) } func TestElasticsearchTagsFileDoNotExist(t *testing.T) { @@ -73,6 +83,7 @@ func TestElasticsearchTagsFileDoNotExist(t *testing.T) { mockConf := &mockClientBuilder{} mockConf.TagsFilePath = "fixtures/tags_foo.txt" f.primaryConfig = mockConf + f.archiveConfig = mockConf assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) r, err := f.CreateSpanWriter() require.Error(t, err) diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index 8d85ba0220e..8b6c9e05c7f 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -46,6 +46,7 @@ const ( suffixTagsAsFieldsAll = suffixTagsAsFields + ".all" suffixTagsFile = suffixTagsAsFields + ".config-file" suffixTagDeDotChar = suffixTagsAsFields + ".dot-replacement" + suffixReadAlias = ".read-alias" ) // TODO this should be moved next to config.Configuration struct (maybe ./flags package) @@ -188,6 +189,15 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { nsConfig.namespace+suffixTagDeDotChar, nsConfig.TagDotReplacement, "(experimental) The character used to replace dots (\".\") in tag keys stored as object fields.") + // TODO support rollover API for main indices + if nsConfig.namespace == archiveNamespace { + flagSet.Bool( + nsConfig.namespace+suffixReadAlias, + nsConfig.UseReadAlias, + // TODO with the main index we will need to configure more names - span, serviceNames + // we could do a prefix, if empty it would use the standard name. + "Use \"-read\" alias for read indices.") + } } // InitFromViper initializes Options with properties from viper @@ -219,6 +229,7 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { cfg.AllTagsAsFields = v.GetBool(cfg.namespace + suffixTagsAsFieldsAll) cfg.TagsFilePath = v.GetString(cfg.namespace + suffixTagsFile) cfg.TagDotReplacement = v.GetString(cfg.namespace + suffixTagDeDotChar) + cfg.UseReadAlias = v.GetBool(cfg.namespace + suffixReadAlias) } // GetPrimary returns primary configuration. diff --git a/plugin/storage/es/spanstore/index_utils.go b/plugin/storage/es/spanstore/index_utils.go new file mode 100644 index 00000000000..7f7016e99cd --- /dev/null +++ b/plugin/storage/es/spanstore/index_utils.go @@ -0,0 +1,67 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package spanstore + +import ( + "time" +) + +type spanAndServiceIndexFce func(spanTime time.Time) (string, string) + +type indicesForTimeRangeFce func(indexName string, startTime time.Time, endTime time.Time) []string + +func getSpanAndServiceIndexFunc(archive bool, spanIndexPrefix, serviceIndexPrefix string) spanAndServiceIndexFce { + if archive { + return func(date time.Time) (string, string) { + return archiveIndex(spanIndexPrefix, archiveIndexSuffix), "" + } + } + return func(date time.Time) (string, string) { + return indexWithDate(spanIndexPrefix, date), indexWithDate(serviceIndexPrefix, date) + } +} + +func getIndicesFceForTimeRange(archive bool, archivePrefix string) indicesForTimeRangeFce { + if archive { + return func(indexName string, startTime time.Time, endTime time.Time) []string { + return []string{archiveIndex(indexName, archivePrefix)} + } + } + return indicesForTimeRange +} + +// returns index name with date +func indexWithDate(indexPrefix string, date time.Time) string { + spanDate := date.UTC().Format("2006-01-02") + return indexPrefix + spanDate +} + +// returns archive index name +func archiveIndex(indexPrefix, archivePrefix string) string { + return indexPrefix + archivePrefix +} + +// indicesForTimeRange returns the array of indices that we need to query, based on query params +func indicesForTimeRange(indexName string, startTime time.Time, endTime time.Time) []string { + var indices []string + firstIndex := indexWithDate(indexName, startTime) + currentIndex := indexWithDate(indexName, endTime) + for currentIndex != firstIndex { + indices = append(indices, currentIndex) + endTime = endTime.Add(-24 * time.Hour) + currentIndex = indexWithDate(indexName, endTime) + } + return append(indices, firstIndex) +} diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index e7e28946c86..cda701c0cff 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -35,10 +35,11 @@ import ( ) const ( - spanIndex = "jaeger-span-" - serviceIndex = "jaeger-service-" - traceIDAggregation = "traceIDs" - archiveIndexSuffix = "archive" + spanIndex = "jaeger-span-" + serviceIndex = "jaeger-service-" + archiveIndexSuffix = "archive" + archiveReadIndexPrefix = "archive-read" + traceIDAggregation = "traceIDs" traceIDField = "traceID" durationField = "duration" @@ -95,7 +96,7 @@ type SpanReader struct { spanIndexPrefix string serviceIndexPrefix string spanConverter dbmodel.ToDomain - archive bool + indicesForTimeRangeFce indicesForTimeRangeFce } // SpanReaderParams holds constructor params for NewSpanReader @@ -108,6 +109,7 @@ type SpanReaderParams struct { IndexPrefix string TagDotReplacement string Archive bool + ReadAlias bool } // NewSpanReader returns a new SpanReader with a metrics. @@ -120,6 +122,12 @@ func newSpanReader(p SpanReaderParams) *SpanReader { if p.IndexPrefix != "" { p.IndexPrefix += ":" } + var archivePrefix string + if p.ReadAlias { + archivePrefix = archiveReadIndexPrefix + } else { + archivePrefix = archiveIndexSuffix + } return &SpanReader{ ctx: ctx, client: p.Client, @@ -129,7 +137,7 @@ func newSpanReader(p SpanReaderParams) *SpanReader { spanIndexPrefix: p.IndexPrefix + spanIndex, serviceIndexPrefix: p.IndexPrefix + serviceIndex, spanConverter: dbmodel.NewToDomain(p.TagDotReplacement), - archive: p.Archive, + indicesForTimeRangeFce: getIndicesFceForTimeRange(p.Archive, archivePrefix), } } @@ -175,25 +183,12 @@ func (s *SpanReader) unmarshalJSONSpan(esSpanRaw *elastic.SearchHit) (*dbmodel.S return &jsonSpan, nil } -// Returns the array of indices that we need to query, based on query params -func (s *SpanReader) indicesForTimeRange(indexName string, startTime time.Time, endTime time.Time) []string { - var indices []string - firstIndex := indexWithDate(indexName, startTime) - currentIndex := indexWithDate(indexName, endTime) - for currentIndex != firstIndex { - indices = append(indices, currentIndex) - endTime = endTime.Add(-24 * time.Hour) - currentIndex = indexWithDate(indexName, endTime) - } - return append(indices, firstIndex) -} - // GetServices returns all services traced by Jaeger, ordered by frequency func (s *SpanReader) GetServices(ctx context.Context) ([]string, error) { span, ctx := opentracing.StartSpanFromContext(ctx, "GetServices") defer span.Finish() currentTime := time.Now() - jaegerIndices := s.indicesForTimeRange(s.serviceIndexPrefix, currentTime.Add(-s.maxSpanAge), currentTime) + jaegerIndices := s.indicesForTimeRangeFce(s.serviceIndexPrefix, currentTime.Add(-s.maxSpanAge), currentTime) return s.serviceOperationStorage.getServices(jaegerIndices) } @@ -202,7 +197,7 @@ func (s *SpanReader) GetOperations(ctx context.Context, service string) ([]strin span, ctx := opentracing.StartSpanFromContext(ctx, "GetOperations") defer span.Finish() currentTime := time.Now() - jaegerIndices := s.indicesForTimeRange(s.serviceIndexPrefix, currentTime.Add(-s.maxSpanAge), currentTime) + jaegerIndices := s.indicesForTimeRangeFce(s.serviceIndexPrefix, currentTime.Add(-s.maxSpanAge), currentTime) return s.serviceOperationStorage.getOperations(jaegerIndices, service) } @@ -250,13 +245,8 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []string, startTime var traces []*model.Trace // Add an hour in both directions so that traces that straddle two indexes are retrieved. // i.e starts in one and ends in another. - var indices []string - if s.archive { - indices = append(indices, archiveIndex(s.spanIndexPrefix)) - } else { - indices = s.indicesForTimeRange(s.spanIndexPrefix, startTime.Add(-time.Hour), endTime.Add(time.Hour)) - } - + indices := s.indicesForTimeRangeFce(s.spanIndexPrefix, startTime.Add(-time.Hour), endTime.Add(time.Hour)) + fmt.Println(indices) nextTime := model.TimeAsEpochMicroseconds(startTime.Add(-time.Hour)) searchAfterTime := make(map[string]uint64) @@ -277,10 +267,10 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []string, startTime Type(spanType). Source( elastic.NewSearchSource(). - Query(query). - Size(defaultDocCount). - Sort("startTime", true). - SearchAfter(nextTime)) + Query(query). + Size(defaultDocCount). + Sort("startTime", true). + SearchAfter(nextTime)) } // set traceIDs to empty traceIDs = nil @@ -405,7 +395,7 @@ func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.Tra aggregation := s.buildTraceIDAggregation(traceQuery.NumTraces) boolQuery := s.buildFindTraceIDsQuery(traceQuery) - jaegerIndices := s.indicesForTimeRange(s.spanIndexPrefix, traceQuery.StartTimeMin, traceQuery.StartTimeMax) + jaegerIndices := s.indicesForTimeRangeFce(s.spanIndexPrefix, traceQuery.StartTimeMin, traceQuery.StartTimeMax) searchService := s.client.Search(jaegerIndices...). Type(spanType). diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 69e3678b68b..26b31ac2a08 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -103,7 +103,7 @@ func withSpanReader(fn func(r *spanReaderTest)) { fn(r) } -func withArchiveSpanReader(fn func(r *spanReaderTest)) { +func withArchiveSpanReader(readAlias bool, fn func(r *spanReaderTest)) { client := &mocks.Client{} logger, logBuffer := testutils.NewLogger() r := &spanReaderTest{ @@ -116,7 +116,8 @@ func withArchiveSpanReader(fn func(r *spanReaderTest)) { MaxSpanAge: 0, IndexPrefix: "", TagDotReplacement: "@", - Archive: true, + Archive: true, + ReadAlias: readAlias, }), } fn(r) @@ -223,7 +224,7 @@ func TestSpanReader_GetTraceQueryError(t *testing.T) { Responses: []*elastic.SearchResult{}, }, nil) trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) - require.EqualError(t, err, "No trace with that ID found") + require.EqualError(t, err, "trace not found") require.Nil(t, trace) }) } @@ -242,7 +243,7 @@ func TestSpanReader_GetTraceNilHits(t *testing.T) { }, nil) trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) - require.EqualError(t, err, "No trace with that ID found") + require.EqualError(t, err, "trace not found") require.Nil(t, trace) }) } @@ -363,7 +364,7 @@ func TestSpanReaderFindIndices(t *testing.T) { } withSpanReader(func(r *spanReaderTest) { for _, testCase := range testCases { - actual := r.reader.indicesForTimeRange(spanIndex, testCase.startTime, testCase.endTime) + actual := r.reader.indicesForTimeRangeFce(spanIndex, testCase.startTime, testCase.endTime) assert.EqualValues(t, testCase.expected, actual) } }) @@ -691,9 +692,16 @@ func TestFindTraceIDs(t *testing.T) { func mockMultiSearchService(r *spanReaderTest) *mock.Call { multiSearchService := &mocks.MultiSearchService{} multiSearchService.On("Add", mock.Anything, mock.Anything, mock.Anything).Return(multiSearchService) - //multiSearchService.On("Index", mock.AnythingOfType("string"), mock.AnythingOfType("string"), - // mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(multiSearchService) - multiSearchService.On("Index", "jaeger-span-archive") + multiSearchService.On("Index", mock.AnythingOfType("string"), mock.AnythingOfType("string"), + mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(multiSearchService) + r.client.On("MultiSearch").Return(multiSearchService) + return multiSearchService.On("Do", mock.AnythingOfType("*context.emptyCtx")) +} + +func mockArchiveMultiSearchService(r *spanReaderTest, indexName string) *mock.Call { + multiSearchService := &mocks.MultiSearchService{} + multiSearchService.On("Add", mock.Anything, mock.Anything, mock.Anything).Return(multiSearchService) + multiSearchService.On("Index", indexName).Return(multiSearchService) r.client.On("MultiSearch").Return(multiSearchService) return multiSearchService.On("Do", mock.AnythingOfType("*context.emptyCtx")) } @@ -927,19 +935,32 @@ func TestSpanReader_GetEmptyIndex(t *testing.T) { } func TestSpanReader_ArchiveTraces(t *testing.T) { - withArchiveSpanReader(func(r *spanReaderTest) { - + withArchiveSpanReader(false, func(r *spanReaderTest) { mockSearchService(r). Return(&elastic.SearchResult{}, nil) - mockMultiSearchService(r). + mockArchiveMultiSearchService(r, "jaeger-span-archive"). Return(&elastic.MultiSearchResult{ Responses: []*elastic.SearchResult{}, - }, nil).On("Index", "jaegera-span-archive") + }, nil) + trace, err := r.reader.GetTrace(context.Background(), model.TraceID{}) + require.Nil(t, trace) + assert.EqualError(t, err, "trace not found") + }) +} - services, err := r.reader.GetTrace(context.Background(), model.TraceID{}) - require.NoError(t, err) - assert.Empty(t, services) +func TestSpanReader_ArchiveTraces_ReadAlias(t *testing.T) { + withArchiveSpanReader(true, func(r *spanReaderTest) { + mockSearchService(r). + Return(&elastic.SearchResult{}, nil) + mockArchiveMultiSearchService(r, "jaeger-span-archive-read"). + Return(&elastic.MultiSearchResult{ + Responses: []*elastic.SearchResult{}, + }, nil) + + trace, err := r.reader.GetTrace(context.Background(), model.TraceID{}) + require.Nil(t, trace) + assert.EqualError(t, err, "trace not found") }) } diff --git a/plugin/storage/es/spanstore/writer.go b/plugin/storage/es/spanstore/writer.go index 0e2b3c564dc..7b13a78185d 100644 --- a/plugin/storage/es/spanstore/writer.go +++ b/plugin/storage/es/spanstore/writer.go @@ -47,18 +47,16 @@ type serviceWriter func(string, *dbmodel.Span) // SpanWriter is a wrapper around elastic.Client type SpanWriter struct { - ctx context.Context - client es.Client - logger *zap.Logger - writerMetrics spanWriterMetrics // TODO: build functions to wrap around each Do fn - indexCache cache.Cache - serviceWriter serviceWriter - numShards int64 - numReplicas int64 - spanIndexPrefix string - serviceIndexPrefix string - spanConverter dbmodel.FromDomain - archive bool + ctx context.Context + client es.Client + logger *zap.Logger + writerMetrics spanWriterMetrics // TODO: build functions to wrap around each Do fn + indexCache cache.Cache + serviceWriter serviceWriter + numShards int64 + numReplicas int64 + spanConverter dbmodel.FromDomain + spanServiceIndex spanAndServiceIndexFce } // SpanWriterParams holds constructor parameters for NewSpanWriter @@ -101,28 +99,18 @@ func NewSpanWriter(p SpanWriterParams) *SpanWriter { TTL: 48 * time.Hour, }, ), - numShards: p.NumShards, - numReplicas: p.NumReplicas, - spanIndexPrefix: p.IndexPrefix + spanIndex, - serviceIndexPrefix: p.IndexPrefix + serviceIndex, - spanConverter: dbmodel.NewFromDomain(p.AllTagsAsFields, p.TagKeysAsFields, p.TagDotReplacement), - archive: p.Archive, + numShards: p.NumShards, + numReplicas: p.NumReplicas, + spanConverter: dbmodel.NewFromDomain(p.AllTagsAsFields, p.TagKeysAsFields, p.TagDotReplacement), + spanServiceIndex: getSpanAndServiceIndexFunc(p.Archive, p.IndexPrefix+spanIndex, p.IndexPrefix+serviceIndex), } } // WriteSpan writes a span and its corresponding service:operation in ElasticSearch func (s *SpanWriter) WriteSpan(span *model.Span) error { - var spanIndexName string - if s.archive { - spanIndexName = archiveIndex(s.spanIndexPrefix) - } else { - spanIndexName = indexWithDate(s.spanIndexPrefix, span.StartTime) - } - + spanIndexName, serviceIndexName := s.spanServiceIndex(span.StartTime) jsonSpan := s.spanConverter.FromDomainEmbedProcess(span) - - if !s.archive { - serviceIndexName := indexWithDate(s.serviceIndexPrefix, span.StartTime) + if serviceIndexName != "" { if err := s.createIndex(serviceIndexName, serviceMapping, jsonSpan); err != nil { return err } @@ -140,15 +128,6 @@ func (s *SpanWriter) Close() error { return s.client.Close() } -func indexWithDate(indexPrefix string, date time.Time) string { - spanDate := date.UTC().Format("2006-01-02") - return indexPrefix + spanDate -} - -func archiveIndex(indexPrefix string) string { - return indexPrefix + archiveIndexSuffix -} - func (s *SpanWriter) createIndex(indexName string, mapping string, jsonSpan *dbmodel.Span) error { if !keyInCache(indexName, s.indexCache) { start := time.Now() diff --git a/plugin/storage/es/spanstore/writer_test.go b/plugin/storage/es/spanstore/writer_test.go index 66ebe2dec9c..500255dce89 100644 --- a/plugin/storage/es/spanstore/writer_test.go +++ b/plugin/storage/es/spanstore/writer_test.go @@ -59,19 +59,30 @@ func TestNewSpanWriterIndexPrefix(t *testing.T) { testCases := []struct { prefix string expected string + archive bool }{ {prefix: "", expected: ""}, {prefix: "foo", expected: "foo:"}, {prefix: ":", expected: "::"}, + {prefix: "", expected: "", archive: true}, + {prefix: "foo", expected: "foo:", archive: true}, } client := &mocks.Client{} logger, _ := testutils.NewLogger() metricsFactory := metrics.NewLocalFactory(0) for _, testCase := range testCases { w := NewSpanWriter(SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: testCase.prefix}) - assert.Equal(t, testCase.expected+spanIndex, w.spanIndexPrefix) - assert.Equal(t, testCase.expected+serviceIndex, w.serviceIndexPrefix) + IndexPrefix: testCase.prefix, Archive: testCase.archive}) + if testCase.archive { + spanIndexName, serviceIndexName := w.spanServiceIndex(time.Now()) + assert.Equal(t, testCase.expected+spanIndex+archiveIndexSuffix, spanIndexName) + assert.Equal(t, "", serviceIndexName) + } else { + date := time.Now() + spanIndexName, serviceIndexName := w.spanServiceIndex(date) + assert.Equal(t, testCase.expected+spanIndex+date.UTC().Format("2006-01-02"), spanIndexName) + assert.Equal(t, testCase.expected+serviceIndex+date.UTC().Format("2006-01-02"), serviceIndexName) + } } }