Skip to content

Commit

Permalink
Simplify ES config and factory (#4396)
Browse files Browse the repository at this point in the history
The `ClientBuilder` interface was unnecessary, use Configuration struct
directly

---------

Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
yurishkuro authored Apr 19, 2023
1 parent 6926438 commit 82121e9
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 191 deletions.
131 changes: 1 addition & 130 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,36 +92,8 @@ type TagsAsFields struct {
Include string `mapstructure:"include"`
}

// ClientBuilder creates new es.Client
type ClientBuilder interface {
NewClient(logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error)
GetRemoteReadClusters() []string
GetNumShards() int64
GetNumReplicas() int64
GetMaxSpanAge() time.Duration
GetMaxDocCount() int
GetIndexPrefix() string
GetIndexDateLayoutSpans() string
GetIndexDateLayoutServices() string
GetIndexDateLayoutDependencies() string
GetIndexRolloverFrequencySpansDuration() time.Duration
GetIndexRolloverFrequencyServicesDuration() time.Duration
GetTagsFilePath() string
GetAllTagsAsFields() bool
GetTagDotReplacement() string
GetUseReadWriteAliases() bool
GetTokenFilePath() string
IsStorageEnabled() bool
IsCreateIndexTemplates() bool
GetVersion() uint
TagKeysAsFields() ([]string, error)
GetUseILM() bool
GetLogLevel() string
GetSendGetBodyAs() string
}

// NewClient creates a new ElasticSearch client
func (c *Configuration) NewClient(logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) {
func NewClient(c *Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) {
if len(c.Servers) < 1 {
return nil, errors.New("no servers specified")
}
Expand Down Expand Up @@ -275,51 +247,6 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
}
}

// GetRemoteReadClusters returns list of remote read clusters
func (c *Configuration) GetRemoteReadClusters() []string {
return c.RemoteReadClusters
}

// GetNumShards returns number of shards from Configuration
func (c *Configuration) GetNumShards() int64 {
return c.NumShards
}

// GetNumReplicas returns number of replicas from Configuration
func (c *Configuration) GetNumReplicas() int64 {
return c.NumReplicas
}

// GetMaxSpanAge returns max span age from Configuration
func (c *Configuration) GetMaxSpanAge() time.Duration {
return c.MaxSpanAge
}

// GetMaxDocCount returns the maximum number of documents that a query should return
func (c *Configuration) GetMaxDocCount() int {
return c.MaxDocCount
}

// GetIndexPrefix returns index prefix
func (c *Configuration) GetIndexPrefix() string {
return c.IndexPrefix
}

// GetIndexDateLayoutSpans returns jaeger-span index date layout
func (c *Configuration) GetIndexDateLayoutSpans() string {
return c.IndexDateLayoutSpans
}

// GetIndexDateLayoutServices returns jaeger-service index date layout
func (c *Configuration) GetIndexDateLayoutServices() string {
return c.IndexDateLayoutServices
}

// GetIndexDateLayoutDependencies returns jaeger-dependencies index date layout
func (c *Configuration) GetIndexDateLayoutDependencies() string {
return c.IndexDateLayoutDependencies
}

// GetIndexRolloverFrequencySpansDuration returns jaeger-span index rollover frequency duration
func (c *Configuration) GetIndexRolloverFrequencySpansDuration() time.Duration {
if c.IndexRolloverFrequencySpans == "hour" {
Expand All @@ -336,62 +263,6 @@ func (c *Configuration) GetIndexRolloverFrequencyServicesDuration() time.Duratio
return -24 * time.Hour
}

// GetTagsFilePath returns a path to file containing tag keys
func (c *Configuration) GetTagsFilePath() string {
return c.Tags.File
}

// GetAllTagsAsFields returns true if all tags should be stored as object fields
func (c *Configuration) GetAllTagsAsFields() bool {
return c.Tags.AllAsFields
}

// GetVersion returns Elasticsearch version
func (c *Configuration) GetVersion() uint {
return c.Version
}

// GetTagDotReplacement returns character is used to replace dots in tag keys, when
// the tag is stored as object field.
func (c *Configuration) GetTagDotReplacement() string {
return c.Tags.DotReplacement
}

// GetUseReadWriteAliases indicates whether read alias should be used
func (c *Configuration) GetUseReadWriteAliases() bool {
return c.UseReadWriteAliases
}

// GetUseILM indicates whether ILM should be used
func (c *Configuration) GetUseILM() bool {
return c.UseILM
}

// GetLogLevel returns the log-level the ES client should log at.
func (c *Configuration) GetLogLevel() string {
return c.LogLevel
}

// GetSendGetBodyAs returns the SendGetBodyAs the ES client should use.
func (c *Configuration) GetSendGetBodyAs() string {
return c.SendGetBodyAs
}

// GetTokenFilePath returns file path containing the bearer token
func (c *Configuration) GetTokenFilePath() string {
return c.TokenFilePath
}

// IsStorageEnabled determines whether storage is enabled
func (c *Configuration) IsStorageEnabled() bool {
return c.Enabled
}

// IsCreateIndexTemplates determines whether index templates should be created or not
func (c *Configuration) IsCreateIndexTemplates() bool {
return c.CreateIndexTemplates
}

// TagKeysAsFields returns tags from the file and command line merged
func (c *Configuration) TagKeysAsFields() ([]string, error) {
var tags []string
Expand Down
79 changes: 41 additions & 38 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,19 @@ type Factory struct {
metricsFactory metrics.Factory
logger *zap.Logger

primaryConfig config.ClientBuilder
newClientFn func(c *config.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error)

primaryConfig *config.Configuration
primaryClient es.Client
archiveConfig config.ClientBuilder
archiveConfig *config.Configuration
archiveClient es.Client
}

// NewFactory creates a new Factory.
func NewFactory() *Factory {
return &Factory{
Options: NewOptions(primaryNamespace, archiveNamespace),
Options: NewOptions(primaryNamespace, archiveNamespace),
newClientFn: config.NewClient,
}
}

Expand Down Expand Up @@ -89,13 +92,13 @@ func (f *Factory) InitFromOptions(o Options) {
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.metricsFactory, f.logger = metricsFactory, logger

primaryClient, err := f.primaryConfig.NewClient(logger, metricsFactory)
primaryClient, err := f.newClientFn(f.primaryConfig, logger, metricsFactory)
if err != nil {
return fmt.Errorf("failed to create primary Elasticsearch client: %w", err)
}
f.primaryClient = primaryClient
if f.archiveConfig.IsStorageEnabled() {
f.archiveClient, err = f.archiveConfig.NewClient(logger, metricsFactory)
if f.archiveConfig.Enabled {
f.archiveClient, err = f.newClientFn(f.archiveConfig, logger, metricsFactory)
if err != nil {
return fmt.Errorf("failed to create archive Elasticsearch client: %w", err)
}
Expand All @@ -120,15 +123,15 @@ func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {

// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
if !f.archiveConfig.IsStorageEnabled() {
if !f.archiveConfig.Enabled {
return nil, nil
}
return createSpanReader(f.metricsFactory, f.logger, f.archiveClient, f.archiveConfig, true)
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
if !f.archiveConfig.IsStorageEnabled() {
if !f.archiveConfig.Enabled {
return nil, nil
}
return createSpanWriter(f.metricsFactory, f.logger, f.archiveClient, f.archiveConfig, true)
Expand All @@ -138,40 +141,40 @@ func createSpanReader(
mFactory metrics.Factory,
logger *zap.Logger,
client es.Client,
cfg config.ClientBuilder,
cfg *config.Configuration,
archive bool,
) (spanstore.Reader, error) {
if cfg.GetUseILM() && !cfg.GetUseReadWriteAliases() {
if cfg.UseILM && !cfg.UseReadWriteAliases {
return nil, fmt.Errorf("--es.use-ilm must always be used in conjunction with --es.use-aliases to ensure ES writers and readers refer to the single index mapping")
}
return esSpanStore.NewSpanReader(esSpanStore.SpanReaderParams{
Client: client,
Logger: logger,
MetricsFactory: mFactory,
MaxDocCount: cfg.GetMaxDocCount(),
MaxSpanAge: cfg.GetMaxSpanAge(),
IndexPrefix: cfg.GetIndexPrefix(),
SpanIndexDateLayout: cfg.GetIndexDateLayoutSpans(),
ServiceIndexDateLayout: cfg.GetIndexDateLayoutServices(),
MaxDocCount: cfg.MaxDocCount,
MaxSpanAge: cfg.MaxSpanAge,
IndexPrefix: cfg.IndexPrefix,
SpanIndexDateLayout: cfg.IndexDateLayoutSpans,
ServiceIndexDateLayout: cfg.IndexDateLayoutServices,
SpanIndexRolloverFrequency: cfg.GetIndexRolloverFrequencySpansDuration(),
ServiceIndexRolloverFrequency: cfg.GetIndexRolloverFrequencyServicesDuration(),
TagDotReplacement: cfg.GetTagDotReplacement(),
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
TagDotReplacement: cfg.Tags.DotReplacement,
UseReadWriteAliases: cfg.UseReadWriteAliases,
Archive: archive,
RemoteReadClusters: cfg.GetRemoteReadClusters(),
RemoteReadClusters: cfg.RemoteReadClusters,
}), nil
}

func createSpanWriter(
mFactory metrics.Factory,
logger *zap.Logger,
client es.Client,
cfg config.ClientBuilder,
cfg *config.Configuration,
archive bool,
) (spanstore.Writer, error) {
var tags []string
var err error
if cfg.GetUseILM() && !cfg.GetUseReadWriteAliases() {
if cfg.UseILM && !cfg.UseReadWriteAliases {
return nil, fmt.Errorf("--es.use-ilm must always be used in conjunction with --es.use-aliases to ensure ES writers and readers refer to the single index mapping")
}
if tags, err = cfg.TagKeysAsFields(); err != nil {
Expand All @@ -181,11 +184,11 @@ func createSpanWriter(

mappingBuilder := mappings.MappingBuilder{
TemplateBuilder: es.TextTemplateBuilder{},
Shards: cfg.GetNumShards(),
Replicas: cfg.GetNumReplicas(),
EsVersion: cfg.GetVersion(),
IndexPrefix: cfg.GetIndexPrefix(),
UseILM: cfg.GetUseILM(),
Shards: cfg.NumShards,
Replicas: cfg.NumReplicas,
EsVersion: cfg.Version,
IndexPrefix: cfg.IndexPrefix,
UseILM: cfg.UseILM,
}

spanMapping, serviceMapping, err := mappingBuilder.GetSpanServiceMappings()
Expand All @@ -196,19 +199,19 @@ func createSpanWriter(
Client: client,
Logger: logger,
MetricsFactory: mFactory,
IndexPrefix: cfg.GetIndexPrefix(),
SpanIndexDateLayout: cfg.GetIndexDateLayoutSpans(),
ServiceIndexDateLayout: cfg.GetIndexDateLayoutServices(),
AllTagsAsFields: cfg.GetAllTagsAsFields(),
IndexPrefix: cfg.IndexPrefix,
SpanIndexDateLayout: cfg.IndexDateLayoutSpans,
ServiceIndexDateLayout: cfg.IndexDateLayoutServices,
AllTagsAsFields: cfg.Tags.AllAsFields,
TagKeysAsFields: tags,
TagDotReplacement: cfg.GetTagDotReplacement(),
TagDotReplacement: cfg.Tags.DotReplacement,
Archive: archive,
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
UseReadWriteAliases: cfg.UseReadWriteAliases,
})

// Creating a template here would conflict with the one created for ILM resulting to no index rollover
if cfg.IsCreateIndexTemplates() && !cfg.GetUseILM() {
err := writer.CreateTemplates(spanMapping, serviceMapping, cfg.GetIndexPrefix())
if cfg.CreateIndexTemplates && !cfg.UseILM {
err := writer.CreateTemplates(spanMapping, serviceMapping, cfg.IndexPrefix)
if err != nil {
return nil, err
}
Expand All @@ -219,15 +222,15 @@ func createSpanWriter(
func createDependencyReader(
logger *zap.Logger,
client es.Client,
cfg config.ClientBuilder,
cfg *config.Configuration,
) (dependencystore.Reader, error) {
reader := esDepStore.NewDependencyStore(esDepStore.DependencyStoreParams{
Client: client,
Logger: logger,
IndexPrefix: cfg.GetIndexPrefix(),
IndexDateLayout: cfg.GetIndexDateLayoutDependencies(),
MaxDocCount: cfg.GetMaxDocCount(),
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
IndexPrefix: cfg.IndexPrefix,
IndexDateLayout: cfg.IndexDateLayoutDependencies,
MaxDocCount: cfg.MaxDocCount,
UseReadWriteAliases: cfg.UseReadWriteAliases,
})
return reader, nil
}
Expand Down
Loading

0 comments on commit 82121e9

Please sign in to comment.