Skip to content

Commit

Permalink
Allow to use TLS certificates for ElasticSearch Authentication" (#1139)
Browse files Browse the repository at this point in the history
* Fixed #678

Signed-off-by: Chun Lin Yang <[email protected]>

* Fixed #678

Signed-off-by: Chun Lin Yang <[email protected]>

* address review comments

Signed-off-by: Chun Lin Yang <[email protected]>

* address review comments

Signed-off-by: Chun Lin Yang <[email protected]>

* support timeout for non-tls-enabled case

Signed-off-by: Chun Lin Yang <[email protected]>

* change method name

Signed-off-by: Chun Lin Yang <[email protected]>
  • Loading branch information
clyang82 authored and yurishkuro committed Nov 9, 2018
1 parent 6b22e43 commit de8a9ad
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 8 deletions.
84 changes: 76 additions & 8 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ package config

import (
"context"
"crypto/tls"
"crypto/x509"
"io/ioutil"
"net/http"
"sync"
"time"

Expand All @@ -37,6 +41,7 @@ type Configuration struct {
MaxSpanAge time.Duration `yaml:"max_span_age"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards"`
NumReplicas int64 `yaml:"replicas"`
Timeout time.Duration `validate:"min=500"`
BulkSize int
BulkWorkers int
BulkActions int
Expand All @@ -45,6 +50,15 @@ type Configuration struct {
TagsFilePath string
AllTagsAsFields bool
TagDotReplacement string
TLS TLSConfig
}

// TLSConfig describes the configuration properties to connect tls enabled ElasticSearch cluster
type TLSConfig struct {
Enabled bool
CertPath string
KeyPath string
CaPath string
}

// ClientBuilder creates new es.Client
Expand All @@ -64,7 +78,12 @@ func (c *Configuration) NewClient(logger *zap.Logger, metricsFactory metrics.Fac
if len(c.Servers) < 1 {
return nil, errors.New("No servers specified")
}
rawClient, err := elastic.NewClient(c.GetConfigs()...)
options, err := c.getConfigOptions()
if err != nil {
return nil, err
}

rawClient, err := elastic.NewClient(options...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -187,11 +206,60 @@ func (c *Configuration) GetTagDotReplacement() string {
return c.TagDotReplacement
}

// GetConfigs wraps the configs to feed to the ElasticSearch client init
func (c *Configuration) GetConfigs() []elastic.ClientOptionFunc {
options := make([]elastic.ClientOptionFunc, 3)
options[0] = elastic.SetURL(c.Servers...)
options[1] = elastic.SetBasicAuth(c.Username, c.Password)
options[2] = elastic.SetSniff(c.Sniffer)
return options
// getConfigOptions wraps the configs to feed to the ElasticSearch client init
func (c *Configuration) getConfigOptions() ([]elastic.ClientOptionFunc, error) {
options := []elastic.ClientOptionFunc{elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer)}
httpClient := &http.Client{
Timeout: c.Timeout,
}
options = append(options, elastic.SetHttpClient(httpClient))
if c.TLS.Enabled {
ctlsConfig, err := c.TLS.createTLSConfig()
if err != nil {
return nil, err
}
httpClient.Transport = &http.Transport{
TLSClientConfig: ctlsConfig,
}
} else {
options = append(options, elastic.SetBasicAuth(c.Username, c.Password))
}
return options, nil
}

// createTLSConfig creates TLS Configuration to connect with ES Cluster.
func (tlsConfig *TLSConfig) createTLSConfig() (*tls.Config, error) {
rootCerts, err := tlsConfig.loadCertificate()
if err != nil {
return nil, err
}
clientPrivateKey, err := tlsConfig.loadPrivateKey()
if err != nil {
return nil, err
}
return &tls.Config{
RootCAs: rootCerts,
Certificates: []tls.Certificate{*clientPrivateKey},
}, nil

}

// loadCertificate is used to load root certification
func (tlsConfig *TLSConfig) loadCertificate() (*x509.CertPool, error) {
caCert, err := ioutil.ReadFile(tlsConfig.CaPath)
if err != nil {
return nil, err
}
certificates := x509.NewCertPool()
certificates.AppendCertsFromPEM(caCert)
return certificates, nil
}

// loadPrivateKey is used to load the private certificate and key for TLS
func (tlsConfig *TLSConfig) loadPrivateKey() (*tls.Certificate, error) {
privateKey, err := tls.LoadX509KeyPair(tlsConfig.CertPath, tlsConfig.KeyPath)
if err != nil {
return nil, err
}
return &privateKey, nil
}
30 changes: 30 additions & 0 deletions plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ const (
suffixBulkWorkers = ".bulk.workers"
suffixBulkActions = ".bulk.actions"
suffixBulkFlushInterval = ".bulk.flush-interval"
suffixTimeout = ".timeout"
suffixTLS = ".tls"
suffixCert = ".tls.cert"
suffixKey = ".tls.key"
suffixCA = ".tls.ca"
suffixIndexPrefix = ".index-prefix"
suffixTagsAsFields = ".tags-as-fields"
suffixTagsAsFieldsAll = suffixTagsAsFields + ".all"
Expand Down Expand Up @@ -119,6 +124,10 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
nsConfig.namespace+suffixServerURLs,
nsConfig.servers,
"The comma-separated list of ElasticSearch servers, must be full url i.e. http://localhost:9200")
flagSet.Duration(
nsConfig.namespace+suffixTimeout,
nsConfig.Timeout,
"Timeout used for queries")
flagSet.Duration(
nsConfig.namespace+suffixMaxSpanAge,
nsConfig.MaxSpanAge,
Expand Down Expand Up @@ -147,6 +156,22 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
nsConfig.namespace+suffixBulkFlushInterval,
nsConfig.BulkFlushInterval,
"A time.Duration after which bulk requests are committed, regardless of other tresholds. Set to zero to disable. By default, this is disabled.")
flagSet.Bool(
nsConfig.namespace+suffixTLS,
nsConfig.TLS.Enabled,
"Enable TLS")
flagSet.String(
nsConfig.namespace+suffixCert,
nsConfig.TLS.CertPath,
"Path to TLS certificate file")
flagSet.String(
nsConfig.namespace+suffixKey,
nsConfig.TLS.KeyPath,
"Path to TLS key file")
flagSet.String(
nsConfig.namespace+suffixCA,
nsConfig.TLS.CaPath,
"Path to TLS CA file")
flagSet.String(
nsConfig.namespace+suffixIndexPrefix,
nsConfig.IndexPrefix,
Expand Down Expand Up @@ -185,6 +210,11 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
cfg.BulkWorkers = v.GetInt(cfg.namespace + suffixBulkWorkers)
cfg.BulkActions = v.GetInt(cfg.namespace + suffixBulkActions)
cfg.BulkFlushInterval = v.GetDuration(cfg.namespace + suffixBulkFlushInterval)
cfg.Timeout = v.GetDuration(cfg.namespace + suffixTimeout)
cfg.TLS.Enabled = v.GetBool(cfg.namespace + suffixTLS)
cfg.TLS.CertPath = v.GetString(cfg.namespace + suffixCert)
cfg.TLS.KeyPath = v.GetString(cfg.namespace + suffixKey)
cfg.TLS.CaPath = v.GetString(cfg.namespace + suffixCA)
cfg.IndexPrefix = v.GetString(cfg.namespace + suffixIndexPrefix)
cfg.AllTagsAsFields = v.GetBool(cfg.namespace + suffixTagsAsFieldsAll)
cfg.TagsFilePath = v.GetString(cfg.namespace + suffixTagsFile)
Expand Down

0 comments on commit de8a9ad

Please sign in to comment.