Skip to content

Commit

Permalink
Index prefix
Browse files Browse the repository at this point in the history
Signed-off-by: Pavol Loffay <[email protected]>
  • Loading branch information
pavolloffay committed Dec 5, 2018
1 parent 48e2095 commit bfd2dbe
Show file tree
Hide file tree
Showing 10 changed files with 224 additions and 132 deletions.
7 changes: 7 additions & 0 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type Configuration struct {
AllTagsAsFields bool
TagDotReplacement string
TLS TLSConfig
UseReadAlias bool
}

// TLSConfig describes the configuration properties to connect tls enabled ElasticSearch cluster
Expand All @@ -71,6 +72,7 @@ type ClientBuilder interface {
GetTagsFilePath() string
GetAllTagsAsFields() bool
GetTagDotReplacement() string
GetUseReadAlias() bool
}

// NewClient creates a new ElasticSearch client
Expand Down Expand Up @@ -206,6 +208,11 @@ func (c *Configuration) GetTagDotReplacement() string {
return c.TagDotReplacement
}

// GetUseReadAlias returns read index name
func (c *Configuration) GetUseReadAlias() bool {
return c.UseReadAlias
}

// getConfigOptions wraps the configs to feed to the ElasticSearch client init
func (c *Configuration) getConfigOptions() ([]elastic.ClientOptionFunc, error) {
options := []elastic.ClientOptionFunc{elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer)}
Expand Down
2 changes: 2 additions & 0 deletions plugin/storage/es/esCleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ def main():
prefix += 'jaeger'

ilo.filter_by_regex(kind='prefix', value=prefix)
# This excludes archive index as we use source='name'
# source `creation_date` would include archive index
ilo.filter_by_age(source='name', direction='older', timestring='%Y-%m-%d', unit='days', unit_count=int(sys.argv[1]))
empty_list(ilo, 'No indices to delete')

Expand Down
81 changes: 37 additions & 44 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package es
import (
"bufio"
"flag"
"fmt"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -74,7 +75,6 @@ func (f *Factory) InitFromViper(v *viper.Viper) {
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.metricsFactory, f.logger = metricsFactory, logger

// TODO move to one builder function
primaryClient, err := f.primaryConfig.NewClient(logger, metricsFactory)
if err != nil {
return err
Expand All @@ -90,38 +90,12 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
cfg := f.primaryConfig
return esSpanStore.NewSpanReader(esSpanStore.SpanReaderParams{
Client: f.primaryClient,
Logger: f.logger,
MetricsFactory: f.metricsFactory,
MaxSpanAge: cfg.GetMaxSpanAge(),
IndexPrefix: cfg.GetIndexPrefix(),
TagDotReplacement: cfg.GetTagDotReplacement(),
}), nil
return createSpanReader(f.metricsFactory, f.logger, f.primaryClient, f.primaryConfig, false)
}

// CreateSpanWriter implements storage.Factory
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
cfg := f.primaryConfig
var tags []string
if cfg.GetTagsFilePath() != "" {
var err error
if tags, err = loadTagsFromFile(cfg.GetTagsFilePath()); err != nil {
f.logger.Error("Could not open file with tags", zap.Error(err))
return nil, err
}
}
return esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{Client: f.primaryClient,
Logger: f.logger,
MetricsFactory: f.metricsFactory,
NumShards: cfg.GetNumShards(),
NumReplicas: cfg.GetNumReplicas(),
IndexPrefix: cfg.GetIndexPrefix(),
AllTagsAsFields: cfg.GetAllTagsAsFields(),
TagKeysAsFields: tags,
TagDotReplacement: cfg.GetTagDotReplacement(),
}), nil
return createSpanWriter(f.metricsFactory, f.logger, f.archiveClient, f.primaryConfig, false)
}

// CreateDependencyReader implements storage.Factory
Expand Down Expand Up @@ -149,40 +123,59 @@ func loadTagsFromFile(filePath string) ([]string, error) {

// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
cfg := f.Options.Get(archiveNamespace)
return createSpanReader(f.metricsFactory, f.logger, f.archiveClient, f.Options.Get(archiveNamespace), true)
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
return createSpanWriter(f.metricsFactory, f.logger, f.archiveClient, f.Options.Get(archiveNamespace), true)
}

func createSpanReader(
mFactory metrics.Factory,
logger *zap.Logger,
client es.Client,
cfg config.ClientBuilder,
archive bool,
) (spanstore.Reader, error) {
return esSpanStore.NewSpanReader(esSpanStore.SpanReaderParams{
Client: f.archiveClient,
Logger: f.logger,
MetricsFactory: f.metricsFactory,
// TODO allow this config? we probably want to use forever
MaxSpanAge: cfg.GetMaxSpanAge(),
Client: client,
Logger: logger,
MetricsFactory: mFactory,
IndexPrefix: cfg.GetIndexPrefix(),
TagDotReplacement: cfg.GetTagDotReplacement(),
Archive: true,
ReadAlias: cfg.GetUseReadAlias(),
Archive: archive,
}), nil
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
cfg := f.Options.Get(archiveNamespace)
func createSpanWriter(
mFactory metrics.Factory,
logger *zap.Logger,
client es.Client,
cfg config.ClientBuilder,
archive bool,
) (spanstore.Writer, error) {
var tags []string
fmt.Println("AAAAAA")
fmt.Println(cfg.GetTagsFilePath())
if cfg.GetTagsFilePath() != "" {
var err error
if tags, err = loadTagsFromFile(cfg.GetTagsFilePath()); err != nil {
f.logger.Error("Could not open file with tags", zap.Error(err))
logger.Error("Could not open file with tags", zap.Error(err))
return nil, err
}
}
return esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{
Client: f.archiveClient,
Logger: f.logger,
MetricsFactory: f.metricsFactory,
Client: client,
Logger: logger,
MetricsFactory: mFactory,
NumShards: cfg.GetNumShards(),
NumReplicas: cfg.GetNumReplicas(),
IndexPrefix: cfg.GetIndexPrefix(),
AllTagsAsFields: cfg.GetAllTagsAsFields(),
TagKeysAsFields: tags,
TagDotReplacement: cfg.GetTagDotReplacement(),
Archive: true,
Archive: archive,
}), nil
}
11 changes: 11 additions & 0 deletions plugin/storage/es/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func TestElasticsearchFactory(t *testing.T) {
assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error")

f.primaryConfig = &mockClientBuilder{}
f.archiveConfig = &mockClientBuilder{err: errors.New("made-up error2")}
assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error2")

f.archiveConfig = &mockClientBuilder{}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))

_, err := f.CreateSpanReader()
Expand All @@ -66,13 +70,20 @@ func TestElasticsearchFactory(t *testing.T) {

_, err = f.CreateDependencyReader()
assert.NoError(t, err)

_, err = f.CreateArchiveSpanReader()
assert.NoError(t, err)

_, err = f.CreateArchiveSpanWriter()
assert.NoError(t, err)
}

func TestElasticsearchTagsFileDoNotExist(t *testing.T) {
f := NewFactory()
mockConf := &mockClientBuilder{}
mockConf.TagsFilePath = "fixtures/tags_foo.txt"
f.primaryConfig = mockConf
f.archiveConfig = mockConf
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
r, err := f.CreateSpanWriter()
require.Error(t, err)
Expand Down
11 changes: 11 additions & 0 deletions plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const (
suffixTagsAsFieldsAll = suffixTagsAsFields + ".all"
suffixTagsFile = suffixTagsAsFields + ".config-file"
suffixTagDeDotChar = suffixTagsAsFields + ".dot-replacement"
suffixReadAlias = ".read-alias"
)

// TODO this should be moved next to config.Configuration struct (maybe ./flags package)
Expand Down Expand Up @@ -188,6 +189,15 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
nsConfig.namespace+suffixTagDeDotChar,
nsConfig.TagDotReplacement,
"(experimental) The character used to replace dots (\".\") in tag keys stored as object fields.")
// TODO support rollover API for main indices
if nsConfig.namespace == archiveNamespace {
flagSet.Bool(
nsConfig.namespace+suffixReadAlias,
nsConfig.UseReadAlias,
// TODO with the main index we will need to configure more names - span, serviceNames
// we could do a prefix, if empty it would use the standard name.
"Use \"-read\" alias for read indices.")
}
}

// InitFromViper initializes Options with properties from viper
Expand Down Expand Up @@ -219,6 +229,7 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
cfg.AllTagsAsFields = v.GetBool(cfg.namespace + suffixTagsAsFieldsAll)
cfg.TagsFilePath = v.GetString(cfg.namespace + suffixTagsFile)
cfg.TagDotReplacement = v.GetString(cfg.namespace + suffixTagDeDotChar)
cfg.UseReadAlias = v.GetBool(cfg.namespace + suffixReadAlias)
}

// GetPrimary returns primary configuration.
Expand Down
67 changes: 67 additions & 0 deletions plugin/storage/es/spanstore/index_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package spanstore

import (
"time"
)

type spanAndServiceIndexFce func(spanTime time.Time) (string, string)

type indicesForTimeRangeFce func(indexName string, startTime time.Time, endTime time.Time) []string

func getSpanAndServiceIndexFunc(archive bool, spanIndexPrefix, serviceIndexPrefix string) spanAndServiceIndexFce {
if archive {
return func(date time.Time) (string, string) {
return archiveIndex(spanIndexPrefix, archiveIndexSuffix), ""
}
}
return func(date time.Time) (string, string) {
return indexWithDate(spanIndexPrefix, date), indexWithDate(serviceIndexPrefix, date)
}
}

func getIndicesFceForTimeRange(archive bool, archivePrefix string) indicesForTimeRangeFce {
if archive {
return func(indexName string, startTime time.Time, endTime time.Time) []string {
return []string{archiveIndex(indexName, archivePrefix)}
}
}
return indicesForTimeRange
}

// returns index name with date
func indexWithDate(indexPrefix string, date time.Time) string {
spanDate := date.UTC().Format("2006-01-02")
return indexPrefix + spanDate
}

// returns archive index name
func archiveIndex(indexPrefix, archivePrefix string) string {
return indexPrefix + archivePrefix
}

// indicesForTimeRange returns the array of indices that we need to query, based on query params
func indicesForTimeRange(indexName string, startTime time.Time, endTime time.Time) []string {
var indices []string
firstIndex := indexWithDate(indexName, startTime)
currentIndex := indexWithDate(indexName, endTime)
for currentIndex != firstIndex {
indices = append(indices, currentIndex)
endTime = endTime.Add(-24 * time.Hour)
currentIndex = indexWithDate(indexName, endTime)
}
return append(indices, firstIndex)
}
Loading

0 comments on commit bfd2dbe

Please sign in to comment.