From 17865378ea0249c742714fb173d1cc9768c49061 Mon Sep 17 00:00:00 2001 From: Chun Lin Yang Date: Thu, 25 Oct 2018 14:23:45 +0800 Subject: [PATCH 1/6] Fixed https://github.com/jaegertracing/jaeger/issues/678 Signed-off-by: Chun Lin Yang --- pkg/es/config/config.go | 95 ++++++++++++++++++++++++++++++++++-- plugin/storage/es/options.go | 30 ++++++++++++ 2 files changed, 122 insertions(+), 3 deletions(-) diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index 6122c1a7181..afda15e02de 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -16,13 +16,19 @@ package config import ( "context" + "crypto/tls" + "crypto/x509" + "fmt" + "io/ioutil" + "log" + "net/http" "sync" "time" "github.com/pkg/errors" "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" - "gopkg.in/olivere/elastic.v5" + elastic "gopkg.in/olivere/elastic.v5" "github.com/jaegertracing/jaeger/pkg/es" storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics" @@ -37,6 +43,7 @@ type Configuration struct { MaxSpanAge time.Duration `yaml:"max_span_age"` // configures the maximum lookback on span reads NumShards int64 `yaml:"shards"` NumReplicas int64 `yaml:"replicas"` + Timeout time.Duration `validate:"min=500"` BulkSize int BulkWorkers int BulkActions int @@ -45,6 +52,15 @@ type Configuration struct { TagsFilePath string AllTagsAsFields bool TagDotReplacement string + TLS TLS +} + +// TLS Config +type TLS struct { + Enabled bool + CertPath string + KeyPath string + CaPath string } // ClientBuilder creates new es.Client @@ -64,7 +80,7 @@ func (c *Configuration) NewClient(logger *zap.Logger, metricsFactory metrics.Fac if len(c.Servers) < 1 { return nil, errors.New("No servers specified") } - rawClient, err := elastic.NewClient(c.GetConfigs()...) + rawClient, err := elastic.NewClient(c.GetConfigs(logger)...) if err != nil { return nil, err } @@ -188,10 +204,83 @@ func (c *Configuration) GetTagDotReplacement() string { } // GetConfigs wraps the configs to feed to the ElasticSearch client init -func (c *Configuration) GetConfigs() []elastic.ClientOptionFunc { +func (c *Configuration) GetConfigs(logger *zap.Logger) []elastic.ClientOptionFunc { + + if c.TLS.Enabled { + tlsConfig, err := c.CreateTLSConfig() + if err != nil { + return nil + } + httpClient := &http.Client{ + Timeout: c.Timeout, + Transport: &http.Transport{ + TLSClientConfig: tlsConfig, + }, + } + + resp, err := httpClient.Get("https://elasticsearch:9200") + if err != nil { + fmt.Println(err) + } + + htmlData, err := ioutil.ReadAll(resp.Body) + if err != nil { + fmt.Println(err) + } + defer resp.Body.Close() + fmt.Printf("%v\n", resp.Status) + fmt.Printf(string(htmlData)) + + options := make([]elastic.ClientOptionFunc, 4) + options[0] = elastic.SetHttpClient(httpClient) + options[1] = elastic.SetURL(c.Servers...) + options[2] = elastic.SetSniff(c.Sniffer) + options[3] = elastic.SetScheme("https") + logger.Info("tlsConfig", zap.Any("tlsConfig", tlsConfig)) + return options + } options := make([]elastic.ClientOptionFunc, 3) options[0] = elastic.SetURL(c.Servers...) options[1] = elastic.SetBasicAuth(c.Username, c.Password) options[2] = elastic.SetSniff(c.Sniffer) return options } + +// CreateTLSConfig creates TLS Configuration to connect with ES Cluster. +func (c *Configuration) CreateTLSConfig() (*tls.Config, error) { + rootCerts, err := c.LoadCertificatesFrom() + if err != nil { + log.Fatalf("Couldn't load root certificate from %s. Got %s.", c.TLS.CaPath, err) + } + if len(c.TLS.CertPath) > 0 && len(c.TLS.KeyPath) > 0 { + clientPrivateKey, err := c.LoadPrivateKeyFrom() + if err != nil { + log.Fatalf("Couldn't setup client authentication. Got %s.", err) + } + return &tls.Config{ + RootCAs: rootCerts, + Certificates: []tls.Certificate{*clientPrivateKey}, + }, err + } + return nil, err +} + +// LoadCertificatesFrom is used to load root certification +func (c *Configuration) LoadCertificatesFrom() (*x509.CertPool, error) { + caCert, err := ioutil.ReadFile(c.TLS.CaPath) + if err != nil { + return nil, err + } + certificates := x509.NewCertPool() + certificates.AppendCertsFromPEM(caCert) + return certificates, nil +} + +// LoadPrivateKeyFrom is used to load the private certificate and key for TLS +func (c *Configuration) LoadPrivateKeyFrom() (*tls.Certificate, error) { + privateKey, err := tls.LoadX509KeyPair(c.TLS.CertPath, c.TLS.KeyPath) + if err != nil { + return nil, err + } + return &privateKey, nil +} diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index 0e650499e7a..81be05f6588 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -36,6 +36,11 @@ const ( suffixBulkWorkers = ".bulk.workers" suffixBulkActions = ".bulk.actions" suffixBulkFlushInterval = ".bulk.flush-interval" + suffixTimeout = ".timeout" + suffixTLS = ".tls" + suffixCert = ".tls.cert" + suffixKey = ".tls.key" + suffixCA = ".tls.ca" suffixIndexPrefix = ".index-prefix" suffixTagsAsFields = ".tags-as-fields" suffixTagsAsFieldsAll = suffixTagsAsFields + ".all" @@ -119,6 +124,10 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { nsConfig.namespace+suffixServerURLs, nsConfig.servers, "The comma-separated list of ElasticSearch servers, must be full url i.e. http://localhost:9200") + flagSet.Duration( + nsConfig.namespace+suffixTimeout, + nsConfig.Timeout, + "Timeout used for queries") flagSet.Duration( nsConfig.namespace+suffixMaxSpanAge, nsConfig.MaxSpanAge, @@ -147,6 +156,22 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { nsConfig.namespace+suffixBulkFlushInterval, nsConfig.BulkFlushInterval, "A time.Duration after which bulk requests are committed, regardless of other tresholds. Set to zero to disable. By default, this is disabled.") + flagSet.Bool( + nsConfig.namespace+suffixTLS, + nsConfig.TLS.Enabled, + "Enable TLS") + flagSet.String( + nsConfig.namespace+suffixCert, + nsConfig.TLS.CertPath, + "Path to TLS certificate file") + flagSet.String( + nsConfig.namespace+suffixKey, + nsConfig.TLS.KeyPath, + "Path to TLS key file") + flagSet.String( + nsConfig.namespace+suffixCA, + nsConfig.TLS.CaPath, + "Path to TLS CA file") flagSet.String( nsConfig.namespace+suffixIndexPrefix, nsConfig.IndexPrefix, @@ -185,6 +210,11 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { cfg.BulkWorkers = v.GetInt(cfg.namespace + suffixBulkWorkers) cfg.BulkActions = v.GetInt(cfg.namespace + suffixBulkActions) cfg.BulkFlushInterval = v.GetDuration(cfg.namespace + suffixBulkFlushInterval) + cfg.Timeout = v.GetDuration(cfg.namespace + suffixTimeout) + cfg.TLS.Enabled = v.GetBool(cfg.namespace + suffixTLS) + cfg.TLS.CertPath = v.GetString(cfg.namespace + suffixCert) + cfg.TLS.KeyPath = v.GetString(cfg.namespace + suffixKey) + cfg.TLS.CaPath = v.GetString(cfg.namespace + suffixCA) cfg.IndexPrefix = v.GetString(cfg.namespace + suffixIndexPrefix) cfg.AllTagsAsFields = v.GetBool(cfg.namespace + suffixTagsAsFieldsAll) cfg.TagsFilePath = v.GetString(cfg.namespace + suffixTagsFile) From cb7fc0ce5de349ae94b6ae3065cdfd052bc35cd1 Mon Sep 17 00:00:00 2001 From: Chun Lin Yang Date: Thu, 25 Oct 2018 14:39:06 +0800 Subject: [PATCH 2/6] Fixed https://github.com/jaegertracing/jaeger/issues/678 Signed-off-by: Chun Lin Yang --- pkg/es/config/config.go | 32 ++++++-------------------------- 1 file changed, 6 insertions(+), 26 deletions(-) diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index afda15e02de..774c4b97f18 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -18,9 +18,7 @@ import ( "context" "crypto/tls" "crypto/x509" - "fmt" "io/ioutil" - "log" "net/http" "sync" "time" @@ -28,7 +26,7 @@ import ( "github.com/pkg/errors" "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" - elastic "gopkg.in/olivere/elastic.v5" + "gopkg.in/olivere/elastic.v5" "github.com/jaegertracing/jaeger/pkg/es" storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics" @@ -205,9 +203,9 @@ func (c *Configuration) GetTagDotReplacement() string { // GetConfigs wraps the configs to feed to the ElasticSearch client init func (c *Configuration) GetConfigs(logger *zap.Logger) []elastic.ClientOptionFunc { - + options := make([]elastic.ClientOptionFunc, 3) if c.TLS.Enabled { - tlsConfig, err := c.CreateTLSConfig() + tlsConfig, err := c.CreateTLSConfig(logger) if err != nil { return nil } @@ -217,29 +215,11 @@ func (c *Configuration) GetConfigs(logger *zap.Logger) []elastic.ClientOptionFun TLSClientConfig: tlsConfig, }, } - - resp, err := httpClient.Get("https://elasticsearch:9200") - if err != nil { - fmt.Println(err) - } - - htmlData, err := ioutil.ReadAll(resp.Body) - if err != nil { - fmt.Println(err) - } - defer resp.Body.Close() - fmt.Printf("%v\n", resp.Status) - fmt.Printf(string(htmlData)) - - options := make([]elastic.ClientOptionFunc, 4) options[0] = elastic.SetHttpClient(httpClient) options[1] = elastic.SetURL(c.Servers...) options[2] = elastic.SetSniff(c.Sniffer) - options[3] = elastic.SetScheme("https") - logger.Info("tlsConfig", zap.Any("tlsConfig", tlsConfig)) return options } - options := make([]elastic.ClientOptionFunc, 3) options[0] = elastic.SetURL(c.Servers...) options[1] = elastic.SetBasicAuth(c.Username, c.Password) options[2] = elastic.SetSniff(c.Sniffer) @@ -247,15 +227,15 @@ func (c *Configuration) GetConfigs(logger *zap.Logger) []elastic.ClientOptionFun } // CreateTLSConfig creates TLS Configuration to connect with ES Cluster. -func (c *Configuration) CreateTLSConfig() (*tls.Config, error) { +func (c *Configuration) CreateTLSConfig(logger *zap.Logger) (*tls.Config, error) { rootCerts, err := c.LoadCertificatesFrom() if err != nil { - log.Fatalf("Couldn't load root certificate from %s. Got %s.", c.TLS.CaPath, err) + logger.Fatal("Couldn't load root certificate", zap.Error(err)) } if len(c.TLS.CertPath) > 0 && len(c.TLS.KeyPath) > 0 { clientPrivateKey, err := c.LoadPrivateKeyFrom() if err != nil { - log.Fatalf("Couldn't setup client authentication. Got %s.", err) + logger.Fatal("Couldn't setup client authentication", zap.Error(err)) } return &tls.Config{ RootCAs: rootCerts, From cb489f0ebe890b10b2e6c752412d2201176afcef Mon Sep 17 00:00:00 2001 From: Chun Lin Yang Date: Fri, 26 Oct 2018 11:23:19 +0800 Subject: [PATCH 3/6] address review comments Signed-off-by: Chun Lin Yang --- pkg/es/config/config.go | 45 +++++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 24 deletions(-) diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index 774c4b97f18..c713cd33aa5 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -50,11 +50,11 @@ type Configuration struct { TagsFilePath string AllTagsAsFields bool TagDotReplacement string - TLS TLS + TLS TLSConfig } -// TLS Config -type TLS struct { +// TLSConfig describes the configuration properties to connect tls enabled ElasticSearch cluster +type TLSConfig struct { Enabled bool CertPath string KeyPath string @@ -203,37 +203,34 @@ func (c *Configuration) GetTagDotReplacement() string { // GetConfigs wraps the configs to feed to the ElasticSearch client init func (c *Configuration) GetConfigs(logger *zap.Logger) []elastic.ClientOptionFunc { - options := make([]elastic.ClientOptionFunc, 3) + options := make([]elastic.ClientOptionFunc, 0) + options = append(options, elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer)) if c.TLS.Enabled { - tlsConfig, err := c.CreateTLSConfig(logger) + ctlsConfig, err := c.TLS.createTLSConfig(logger) if err != nil { return nil } httpClient := &http.Client{ Timeout: c.Timeout, Transport: &http.Transport{ - TLSClientConfig: tlsConfig, + TLSClientConfig: ctlsConfig, }, } - options[0] = elastic.SetHttpClient(httpClient) - options[1] = elastic.SetURL(c.Servers...) - options[2] = elastic.SetSniff(c.Sniffer) - return options + options = append(options, elastic.SetHttpClient(httpClient)) + } else { + options = append(options, elastic.SetBasicAuth(c.Username, c.Password)) } - options[0] = elastic.SetURL(c.Servers...) - options[1] = elastic.SetBasicAuth(c.Username, c.Password) - options[2] = elastic.SetSniff(c.Sniffer) return options } -// CreateTLSConfig creates TLS Configuration to connect with ES Cluster. -func (c *Configuration) CreateTLSConfig(logger *zap.Logger) (*tls.Config, error) { - rootCerts, err := c.LoadCertificatesFrom() +// createTLSConfig creates TLS Configuration to connect with ES Cluster. +func (tlsConfig *TLSConfig) createTLSConfig(logger *zap.Logger) (*tls.Config, error) { + rootCerts, err := tlsConfig.loadCertificate() if err != nil { logger.Fatal("Couldn't load root certificate", zap.Error(err)) } - if len(c.TLS.CertPath) > 0 && len(c.TLS.KeyPath) > 0 { - clientPrivateKey, err := c.LoadPrivateKeyFrom() + if len(tlsConfig.CertPath) > 0 && len(tlsConfig.KeyPath) > 0 { + clientPrivateKey, err := tlsConfig.loadPrivateKeyFrom() if err != nil { logger.Fatal("Couldn't setup client authentication", zap.Error(err)) } @@ -245,9 +242,9 @@ func (c *Configuration) CreateTLSConfig(logger *zap.Logger) (*tls.Config, error) return nil, err } -// LoadCertificatesFrom is used to load root certification -func (c *Configuration) LoadCertificatesFrom() (*x509.CertPool, error) { - caCert, err := ioutil.ReadFile(c.TLS.CaPath) +// loadCertificate is used to load root certification +func (tlsConfig *TLSConfig) loadCertificate() (*x509.CertPool, error) { + caCert, err := ioutil.ReadFile(tlsConfig.CaPath) if err != nil { return nil, err } @@ -256,9 +253,9 @@ func (c *Configuration) LoadCertificatesFrom() (*x509.CertPool, error) { return certificates, nil } -// LoadPrivateKeyFrom is used to load the private certificate and key for TLS -func (c *Configuration) LoadPrivateKeyFrom() (*tls.Certificate, error) { - privateKey, err := tls.LoadX509KeyPair(c.TLS.CertPath, c.TLS.KeyPath) +// loadPrivateKeyFrom is used to load the private certificate and key for TLS +func (tlsConfig *TLSConfig) loadPrivateKeyFrom() (*tls.Certificate, error) { + privateKey, err := tls.LoadX509KeyPair(tlsConfig.CertPath, tlsConfig.KeyPath) if err != nil { return nil, err } From e2993158196d146f93c3846cff614e0f3bfa4c95 Mon Sep 17 00:00:00 2001 From: Chun Lin Yang Date: Wed, 31 Oct 2018 14:25:46 +0800 Subject: [PATCH 4/6] address review comments Signed-off-by: Chun Lin Yang --- pkg/es/config/config.go | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index c713cd33aa5..0fb5388c466 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -203,8 +203,7 @@ func (c *Configuration) GetTagDotReplacement() string { // GetConfigs wraps the configs to feed to the ElasticSearch client init func (c *Configuration) GetConfigs(logger *zap.Logger) []elastic.ClientOptionFunc { - options := make([]elastic.ClientOptionFunc, 0) - options = append(options, elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer)) + options := []elastic.ClientOptionFunc{elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer)} if c.TLS.Enabled { ctlsConfig, err := c.TLS.createTLSConfig(logger) if err != nil { @@ -228,18 +227,18 @@ func (tlsConfig *TLSConfig) createTLSConfig(logger *zap.Logger) (*tls.Config, er rootCerts, err := tlsConfig.loadCertificate() if err != nil { logger.Fatal("Couldn't load root certificate", zap.Error(err)) + return nil, err } - if len(tlsConfig.CertPath) > 0 && len(tlsConfig.KeyPath) > 0 { - clientPrivateKey, err := tlsConfig.loadPrivateKeyFrom() - if err != nil { - logger.Fatal("Couldn't setup client authentication", zap.Error(err)) - } - return &tls.Config{ - RootCAs: rootCerts, - Certificates: []tls.Certificate{*clientPrivateKey}, - }, err + clientPrivateKey, err := tlsConfig.loadPrivateKey() + if err != nil { + logger.Fatal("Couldn't setup client authentication", zap.Error(err)) + return nil, err } - return nil, err + return &tls.Config{ + RootCAs: rootCerts, + Certificates: []tls.Certificate{*clientPrivateKey}, + }, err + } // loadCertificate is used to load root certification @@ -253,8 +252,8 @@ func (tlsConfig *TLSConfig) loadCertificate() (*x509.CertPool, error) { return certificates, nil } -// loadPrivateKeyFrom is used to load the private certificate and key for TLS -func (tlsConfig *TLSConfig) loadPrivateKeyFrom() (*tls.Certificate, error) { +// loadPrivateKey is used to load the private certificate and key for TLS +func (tlsConfig *TLSConfig) loadPrivateKey() (*tls.Certificate, error) { privateKey, err := tls.LoadX509KeyPair(tlsConfig.CertPath, tlsConfig.KeyPath) if err != nil { return nil, err From b91524176d9cab11358e5ed823d331f76b4cfc0d Mon Sep 17 00:00:00 2001 From: Chun Lin Yang Date: Fri, 2 Nov 2018 14:07:50 +0800 Subject: [PATCH 5/6] support timeout for non-tls-enabled case Signed-off-by: Chun Lin Yang --- pkg/es/config/config.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index 0fb5388c466..33fe8caac5e 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -204,18 +204,18 @@ func (c *Configuration) GetTagDotReplacement() string { // GetConfigs wraps the configs to feed to the ElasticSearch client init func (c *Configuration) GetConfigs(logger *zap.Logger) []elastic.ClientOptionFunc { options := []elastic.ClientOptionFunc{elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer)} + httpClient := &http.Client{ + Timeout: c.Timeout, + } + options = append(options, elastic.SetHttpClient(httpClient)) if c.TLS.Enabled { ctlsConfig, err := c.TLS.createTLSConfig(logger) if err != nil { return nil } - httpClient := &http.Client{ - Timeout: c.Timeout, - Transport: &http.Transport{ - TLSClientConfig: ctlsConfig, - }, + httpClient.Transport = &http.Transport{ + TLSClientConfig: ctlsConfig, } - options = append(options, elastic.SetHttpClient(httpClient)) } else { options = append(options, elastic.SetBasicAuth(c.Username, c.Password)) } From 9b0050eb9c49918ad8018b70f707c1160bfdfdb2 Mon Sep 17 00:00:00 2001 From: Chun Lin Yang Date: Mon, 5 Nov 2018 12:43:04 +0800 Subject: [PATCH 6/6] change method name Signed-off-by: Chun Lin Yang --- pkg/es/config/config.go | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index 33fe8caac5e..5ca627a7cfa 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -78,7 +78,12 @@ func (c *Configuration) NewClient(logger *zap.Logger, metricsFactory metrics.Fac if len(c.Servers) < 1 { return nil, errors.New("No servers specified") } - rawClient, err := elastic.NewClient(c.GetConfigs(logger)...) + options, err := c.getConfigOptions() + if err != nil { + return nil, err + } + + rawClient, err := elastic.NewClient(options...) if err != nil { return nil, err } @@ -201,17 +206,17 @@ func (c *Configuration) GetTagDotReplacement() string { return c.TagDotReplacement } -// GetConfigs wraps the configs to feed to the ElasticSearch client init -func (c *Configuration) GetConfigs(logger *zap.Logger) []elastic.ClientOptionFunc { +// getConfigOptions wraps the configs to feed to the ElasticSearch client init +func (c *Configuration) getConfigOptions() ([]elastic.ClientOptionFunc, error) { options := []elastic.ClientOptionFunc{elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer)} httpClient := &http.Client{ Timeout: c.Timeout, } options = append(options, elastic.SetHttpClient(httpClient)) if c.TLS.Enabled { - ctlsConfig, err := c.TLS.createTLSConfig(logger) + ctlsConfig, err := c.TLS.createTLSConfig() if err != nil { - return nil + return nil, err } httpClient.Transport = &http.Transport{ TLSClientConfig: ctlsConfig, @@ -219,25 +224,23 @@ func (c *Configuration) GetConfigs(logger *zap.Logger) []elastic.ClientOptionFun } else { options = append(options, elastic.SetBasicAuth(c.Username, c.Password)) } - return options + return options, nil } // createTLSConfig creates TLS Configuration to connect with ES Cluster. -func (tlsConfig *TLSConfig) createTLSConfig(logger *zap.Logger) (*tls.Config, error) { +func (tlsConfig *TLSConfig) createTLSConfig() (*tls.Config, error) { rootCerts, err := tlsConfig.loadCertificate() if err != nil { - logger.Fatal("Couldn't load root certificate", zap.Error(err)) return nil, err } clientPrivateKey, err := tlsConfig.loadPrivateKey() if err != nil { - logger.Fatal("Couldn't setup client authentication", zap.Error(err)) return nil, err } return &tls.Config{ RootCAs: rootCerts, Certificates: []tls.Certificate{*clientPrivateKey}, - }, err + }, nil }