From 82121e99f9d34f93b204a81ba4e8ec6311a07d75 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Wed, 19 Apr 2023 00:43:28 -0400 Subject: [PATCH] Simplify ES config and factory (#4396) The `ClientBuilder` interface was unnecessary, use Configuration struct directly --------- Signed-off-by: Yuri Shkuro --- pkg/es/config/config.go | 131 +----------------------------- plugin/storage/es/factory.go | 79 +++++++++--------- plugin/storage/es/factory_test.go | 55 +++++++------ 3 files changed, 74 insertions(+), 191 deletions(-) diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index ebe729c3ed3..48460a681f2 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -92,36 +92,8 @@ type TagsAsFields struct { Include string `mapstructure:"include"` } -// ClientBuilder creates new es.Client -type ClientBuilder interface { - NewClient(logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) - GetRemoteReadClusters() []string - GetNumShards() int64 - GetNumReplicas() int64 - GetMaxSpanAge() time.Duration - GetMaxDocCount() int - GetIndexPrefix() string - GetIndexDateLayoutSpans() string - GetIndexDateLayoutServices() string - GetIndexDateLayoutDependencies() string - GetIndexRolloverFrequencySpansDuration() time.Duration - GetIndexRolloverFrequencyServicesDuration() time.Duration - GetTagsFilePath() string - GetAllTagsAsFields() bool - GetTagDotReplacement() string - GetUseReadWriteAliases() bool - GetTokenFilePath() string - IsStorageEnabled() bool - IsCreateIndexTemplates() bool - GetVersion() uint - TagKeysAsFields() ([]string, error) - GetUseILM() bool - GetLogLevel() string - GetSendGetBodyAs() string -} - // NewClient creates a new ElasticSearch client -func (c *Configuration) NewClient(logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) { +func NewClient(c *Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) { if len(c.Servers) < 1 { return nil, errors.New("no servers specified") } @@ -275,51 +247,6 @@ func (c *Configuration) ApplyDefaults(source *Configuration) { } } -// GetRemoteReadClusters returns list of remote read clusters -func (c *Configuration) GetRemoteReadClusters() []string { - return c.RemoteReadClusters -} - -// GetNumShards returns number of shards from Configuration -func (c *Configuration) GetNumShards() int64 { - return c.NumShards -} - -// GetNumReplicas returns number of replicas from Configuration -func (c *Configuration) GetNumReplicas() int64 { - return c.NumReplicas -} - -// GetMaxSpanAge returns max span age from Configuration -func (c *Configuration) GetMaxSpanAge() time.Duration { - return c.MaxSpanAge -} - -// GetMaxDocCount returns the maximum number of documents that a query should return -func (c *Configuration) GetMaxDocCount() int { - return c.MaxDocCount -} - -// GetIndexPrefix returns index prefix -func (c *Configuration) GetIndexPrefix() string { - return c.IndexPrefix -} - -// GetIndexDateLayoutSpans returns jaeger-span index date layout -func (c *Configuration) GetIndexDateLayoutSpans() string { - return c.IndexDateLayoutSpans -} - -// GetIndexDateLayoutServices returns jaeger-service index date layout -func (c *Configuration) GetIndexDateLayoutServices() string { - return c.IndexDateLayoutServices -} - -// GetIndexDateLayoutDependencies returns jaeger-dependencies index date layout -func (c *Configuration) GetIndexDateLayoutDependencies() string { - return c.IndexDateLayoutDependencies -} - // GetIndexRolloverFrequencySpansDuration returns jaeger-span index rollover frequency duration func (c *Configuration) GetIndexRolloverFrequencySpansDuration() time.Duration { if c.IndexRolloverFrequencySpans == "hour" { @@ -336,62 +263,6 @@ func (c *Configuration) GetIndexRolloverFrequencyServicesDuration() time.Duratio return -24 * time.Hour } -// GetTagsFilePath returns a path to file containing tag keys -func (c *Configuration) GetTagsFilePath() string { - return c.Tags.File -} - -// GetAllTagsAsFields returns true if all tags should be stored as object fields -func (c *Configuration) GetAllTagsAsFields() bool { - return c.Tags.AllAsFields -} - -// GetVersion returns Elasticsearch version -func (c *Configuration) GetVersion() uint { - return c.Version -} - -// GetTagDotReplacement returns character is used to replace dots in tag keys, when -// the tag is stored as object field. -func (c *Configuration) GetTagDotReplacement() string { - return c.Tags.DotReplacement -} - -// GetUseReadWriteAliases indicates whether read alias should be used -func (c *Configuration) GetUseReadWriteAliases() bool { - return c.UseReadWriteAliases -} - -// GetUseILM indicates whether ILM should be used -func (c *Configuration) GetUseILM() bool { - return c.UseILM -} - -// GetLogLevel returns the log-level the ES client should log at. -func (c *Configuration) GetLogLevel() string { - return c.LogLevel -} - -// GetSendGetBodyAs returns the SendGetBodyAs the ES client should use. -func (c *Configuration) GetSendGetBodyAs() string { - return c.SendGetBodyAs -} - -// GetTokenFilePath returns file path containing the bearer token -func (c *Configuration) GetTokenFilePath() string { - return c.TokenFilePath -} - -// IsStorageEnabled determines whether storage is enabled -func (c *Configuration) IsStorageEnabled() bool { - return c.Enabled -} - -// IsCreateIndexTemplates determines whether index templates should be created or not -func (c *Configuration) IsCreateIndexTemplates() bool { - return c.CreateIndexTemplates -} - // TagKeysAsFields returns tags from the file and command line merged func (c *Configuration) TagKeysAsFields() ([]string, error) { var tags []string diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index cf98e692afb..d0f78c294f3 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -51,16 +51,19 @@ type Factory struct { metricsFactory metrics.Factory logger *zap.Logger - primaryConfig config.ClientBuilder + newClientFn func(c *config.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) + + primaryConfig *config.Configuration primaryClient es.Client - archiveConfig config.ClientBuilder + archiveConfig *config.Configuration archiveClient es.Client } // NewFactory creates a new Factory. func NewFactory() *Factory { return &Factory{ - Options: NewOptions(primaryNamespace, archiveNamespace), + Options: NewOptions(primaryNamespace, archiveNamespace), + newClientFn: config.NewClient, } } @@ -89,13 +92,13 @@ func (f *Factory) InitFromOptions(o Options) { func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { f.metricsFactory, f.logger = metricsFactory, logger - primaryClient, err := f.primaryConfig.NewClient(logger, metricsFactory) + primaryClient, err := f.newClientFn(f.primaryConfig, logger, metricsFactory) if err != nil { return fmt.Errorf("failed to create primary Elasticsearch client: %w", err) } f.primaryClient = primaryClient - if f.archiveConfig.IsStorageEnabled() { - f.archiveClient, err = f.archiveConfig.NewClient(logger, metricsFactory) + if f.archiveConfig.Enabled { + f.archiveClient, err = f.newClientFn(f.archiveConfig, logger, metricsFactory) if err != nil { return fmt.Errorf("failed to create archive Elasticsearch client: %w", err) } @@ -120,7 +123,7 @@ func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { // CreateArchiveSpanReader implements storage.ArchiveFactory func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) { - if !f.archiveConfig.IsStorageEnabled() { + if !f.archiveConfig.Enabled { return nil, nil } return createSpanReader(f.metricsFactory, f.logger, f.archiveClient, f.archiveConfig, true) @@ -128,7 +131,7 @@ func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) { // CreateArchiveSpanWriter implements storage.ArchiveFactory func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { - if !f.archiveConfig.IsStorageEnabled() { + if !f.archiveConfig.Enabled { return nil, nil } return createSpanWriter(f.metricsFactory, f.logger, f.archiveClient, f.archiveConfig, true) @@ -138,27 +141,27 @@ func createSpanReader( mFactory metrics.Factory, logger *zap.Logger, client es.Client, - cfg config.ClientBuilder, + cfg *config.Configuration, archive bool, ) (spanstore.Reader, error) { - if cfg.GetUseILM() && !cfg.GetUseReadWriteAliases() { + if cfg.UseILM && !cfg.UseReadWriteAliases { return nil, fmt.Errorf("--es.use-ilm must always be used in conjunction with --es.use-aliases to ensure ES writers and readers refer to the single index mapping") } return esSpanStore.NewSpanReader(esSpanStore.SpanReaderParams{ Client: client, Logger: logger, MetricsFactory: mFactory, - MaxDocCount: cfg.GetMaxDocCount(), - MaxSpanAge: cfg.GetMaxSpanAge(), - IndexPrefix: cfg.GetIndexPrefix(), - SpanIndexDateLayout: cfg.GetIndexDateLayoutSpans(), - ServiceIndexDateLayout: cfg.GetIndexDateLayoutServices(), + MaxDocCount: cfg.MaxDocCount, + MaxSpanAge: cfg.MaxSpanAge, + IndexPrefix: cfg.IndexPrefix, + SpanIndexDateLayout: cfg.IndexDateLayoutSpans, + ServiceIndexDateLayout: cfg.IndexDateLayoutServices, SpanIndexRolloverFrequency: cfg.GetIndexRolloverFrequencySpansDuration(), ServiceIndexRolloverFrequency: cfg.GetIndexRolloverFrequencyServicesDuration(), - TagDotReplacement: cfg.GetTagDotReplacement(), - UseReadWriteAliases: cfg.GetUseReadWriteAliases(), + TagDotReplacement: cfg.Tags.DotReplacement, + UseReadWriteAliases: cfg.UseReadWriteAliases, Archive: archive, - RemoteReadClusters: cfg.GetRemoteReadClusters(), + RemoteReadClusters: cfg.RemoteReadClusters, }), nil } @@ -166,12 +169,12 @@ func createSpanWriter( mFactory metrics.Factory, logger *zap.Logger, client es.Client, - cfg config.ClientBuilder, + cfg *config.Configuration, archive bool, ) (spanstore.Writer, error) { var tags []string var err error - if cfg.GetUseILM() && !cfg.GetUseReadWriteAliases() { + if cfg.UseILM && !cfg.UseReadWriteAliases { return nil, fmt.Errorf("--es.use-ilm must always be used in conjunction with --es.use-aliases to ensure ES writers and readers refer to the single index mapping") } if tags, err = cfg.TagKeysAsFields(); err != nil { @@ -181,11 +184,11 @@ func createSpanWriter( mappingBuilder := mappings.MappingBuilder{ TemplateBuilder: es.TextTemplateBuilder{}, - Shards: cfg.GetNumShards(), - Replicas: cfg.GetNumReplicas(), - EsVersion: cfg.GetVersion(), - IndexPrefix: cfg.GetIndexPrefix(), - UseILM: cfg.GetUseILM(), + Shards: cfg.NumShards, + Replicas: cfg.NumReplicas, + EsVersion: cfg.Version, + IndexPrefix: cfg.IndexPrefix, + UseILM: cfg.UseILM, } spanMapping, serviceMapping, err := mappingBuilder.GetSpanServiceMappings() @@ -196,19 +199,19 @@ func createSpanWriter( Client: client, Logger: logger, MetricsFactory: mFactory, - IndexPrefix: cfg.GetIndexPrefix(), - SpanIndexDateLayout: cfg.GetIndexDateLayoutSpans(), - ServiceIndexDateLayout: cfg.GetIndexDateLayoutServices(), - AllTagsAsFields: cfg.GetAllTagsAsFields(), + IndexPrefix: cfg.IndexPrefix, + SpanIndexDateLayout: cfg.IndexDateLayoutSpans, + ServiceIndexDateLayout: cfg.IndexDateLayoutServices, + AllTagsAsFields: cfg.Tags.AllAsFields, TagKeysAsFields: tags, - TagDotReplacement: cfg.GetTagDotReplacement(), + TagDotReplacement: cfg.Tags.DotReplacement, Archive: archive, - UseReadWriteAliases: cfg.GetUseReadWriteAliases(), + UseReadWriteAliases: cfg.UseReadWriteAliases, }) // Creating a template here would conflict with the one created for ILM resulting to no index rollover - if cfg.IsCreateIndexTemplates() && !cfg.GetUseILM() { - err := writer.CreateTemplates(spanMapping, serviceMapping, cfg.GetIndexPrefix()) + if cfg.CreateIndexTemplates && !cfg.UseILM { + err := writer.CreateTemplates(spanMapping, serviceMapping, cfg.IndexPrefix) if err != nil { return nil, err } @@ -219,15 +222,15 @@ func createSpanWriter( func createDependencyReader( logger *zap.Logger, client es.Client, - cfg config.ClientBuilder, + cfg *config.Configuration, ) (dependencystore.Reader, error) { reader := esDepStore.NewDependencyStore(esDepStore.DependencyStoreParams{ Client: client, Logger: logger, - IndexPrefix: cfg.GetIndexPrefix(), - IndexDateLayout: cfg.GetIndexDateLayoutDependencies(), - MaxDocCount: cfg.GetMaxDocCount(), - UseReadWriteAliases: cfg.GetUseReadWriteAliases(), + IndexPrefix: cfg.IndexPrefix, + IndexDateLayout: cfg.IndexDateLayoutDependencies, + MaxDocCount: cfg.MaxDocCount, + UseReadWriteAliases: cfg.UseReadWriteAliases, }) return reader, nil } diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index 23087ef48ef..fc0a2c0d0ec 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -36,12 +36,11 @@ import ( var _ storage.Factory = new(Factory) type mockClientBuilder struct { - escfg.Configuration err error createTemplateError error } -func (m *mockClientBuilder) NewClient(logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) { +func (m *mockClientBuilder) NewClient(_ *escfg.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) { if m.err == nil { c := &mocks.Client{} tService := &mocks.TemplateCreateService{} @@ -60,16 +59,19 @@ func TestElasticsearchFactory(t *testing.T) { command.ParseFlags([]string{}) f.InitFromViper(v, zap.NewNop()) - // after InitFromViper, f.primaryConfig points to a real session builder that will fail in unit tests, - // so we override it with a mock. - f.primaryConfig = &mockClientBuilder{err: errors.New("made-up error")} + f.newClientFn = (&mockClientBuilder{err: errors.New("made-up error")}).NewClient assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "failed to create primary Elasticsearch client: made-up error") - f.primaryConfig = &mockClientBuilder{} - f.archiveConfig = &mockClientBuilder{err: errors.New("made-up error2"), Configuration: escfg.Configuration{Enabled: true}} + f.archiveConfig.Enabled = true + f.newClientFn = func(c *escfg.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) { + // to test archive storage error, pretend that primary client creation is successful + // but override newClientFn so it fails for the next invocation + f.newClientFn = (&mockClientBuilder{err: errors.New("made-up error2")}).NewClient + return (&mockClientBuilder{}).NewClient(c, logger, metricsFactory) + } assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "failed to create archive Elasticsearch client: made-up error2") - f.archiveConfig = &mockClientBuilder{} + f.newClientFn = (&mockClientBuilder{}).NewClient assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) _, err := f.CreateSpanReader() @@ -91,10 +93,13 @@ func TestElasticsearchFactory(t *testing.T) { func TestElasticsearchTagsFileDoNotExist(t *testing.T) { f := NewFactory() - mockConf := &mockClientBuilder{} - mockConf.Tags.File = "fixtures/tags_foo.txt" - f.primaryConfig = mockConf - f.archiveConfig = mockConf + f.primaryConfig = &escfg.Configuration{ + Tags: escfg.TagsAsFields{ + File: "fixtures/file-does-not-exist.txt", + }, + } + f.archiveConfig = &escfg.Configuration{} + f.newClientFn = (&mockClientBuilder{}).NewClient assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) r, err := f.CreateSpanWriter() require.Error(t, err) @@ -103,10 +108,11 @@ func TestElasticsearchTagsFileDoNotExist(t *testing.T) { func TestElasticsearchILMUsedWithoutReadWriteAliases(t *testing.T) { f := NewFactory() - mockConf := &mockClientBuilder{} - mockConf.UseILM = true - f.primaryConfig = mockConf - f.archiveConfig = mockConf + f.primaryConfig = &escfg.Configuration{ + UseILM: true, + } + f.archiveConfig = &escfg.Configuration{} + f.newClientFn = (&mockClientBuilder{}).NewClient assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) w, err := f.CreateSpanWriter() require.EqualError(t, err, "--es.use-ilm must always be used in conjunction with --es.use-aliases to ensure ES writers and readers refer to the single index mapping") @@ -176,8 +182,9 @@ func TestTagKeysAsFields(t *testing.T) { func TestCreateTemplateError(t *testing.T) { f := NewFactory() - f.primaryConfig = &mockClientBuilder{createTemplateError: errors.New("template-error"), Configuration: escfg.Configuration{Enabled: true, CreateIndexTemplates: true}} - f.archiveConfig = &mockClientBuilder{} + f.primaryConfig = &escfg.Configuration{CreateIndexTemplates: true} + f.archiveConfig = &escfg.Configuration{} + f.newClientFn = (&mockClientBuilder{createTemplateError: errors.New("template-error")}).NewClient err := f.Initialize(metrics.NullFactory, zap.NewNop()) require.NoError(t, err) w, err := f.CreateSpanWriter() @@ -187,8 +194,9 @@ func TestCreateTemplateError(t *testing.T) { func TestILMDisableTemplateCreation(t *testing.T) { f := NewFactory() - f.primaryConfig = &mockClientBuilder{createTemplateError: errors.New("template-error"), Configuration: escfg.Configuration{Enabled: true, UseILM: true, UseReadWriteAliases: true, CreateIndexTemplates: true}} - f.archiveConfig = &mockClientBuilder{} + f.primaryConfig = &escfg.Configuration{UseILM: true, UseReadWriteAliases: true, CreateIndexTemplates: true} + f.archiveConfig = &escfg.Configuration{} + f.newClientFn = (&mockClientBuilder{createTemplateError: errors.New("template-error")}).NewClient err := f.Initialize(metrics.NullFactory, zap.NewNop()) require.NoError(t, err) _, err = f.CreateSpanWriter() @@ -197,7 +205,8 @@ func TestILMDisableTemplateCreation(t *testing.T) { func TestArchiveDisabled(t *testing.T) { f := NewFactory() - f.archiveConfig = &mockClientBuilder{Configuration: escfg.Configuration{Enabled: false}} + f.archiveConfig = &escfg.Configuration{Enabled: false} + f.newClientFn = (&mockClientBuilder{}).NewClient w, err := f.CreateArchiveSpanWriter() assert.Nil(t, w) assert.Nil(t, err) @@ -208,8 +217,8 @@ func TestArchiveDisabled(t *testing.T) { func TestArchiveEnabled(t *testing.T) { f := NewFactory() - f.primaryConfig = &mockClientBuilder{} - f.archiveConfig = &mockClientBuilder{Configuration: escfg.Configuration{Enabled: true}} + f.archiveConfig = &escfg.Configuration{Enabled: true} + f.newClientFn = (&mockClientBuilder{}).NewClient err := f.Initialize(metrics.NullFactory, zap.NewNop()) require.NoError(t, err) w, err := f.CreateArchiveSpanWriter()