Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed "Using Client Certificate for Elastic Search Authentication" #1139

Merged
merged 7 commits into from
Nov 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 76 additions & 8 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ package config

import (
"context"
"crypto/tls"
"crypto/x509"
"io/ioutil"
"net/http"
"sync"
"time"

Expand All @@ -37,6 +41,7 @@ type Configuration struct {
MaxSpanAge time.Duration `yaml:"max_span_age"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards"`
NumReplicas int64 `yaml:"replicas"`
Timeout time.Duration `validate:"min=500"`
BulkSize int
BulkWorkers int
BulkActions int
Expand All @@ -45,6 +50,15 @@ type Configuration struct {
TagsFilePath string
AllTagsAsFields bool
TagDotReplacement string
TLS TLSConfig
}

// TLSConfig describes the configuration properties to connect tls enabled ElasticSearch cluster
type TLSConfig struct {
Enabled bool
CertPath string
KeyPath string
CaPath string
}

// ClientBuilder creates new es.Client
Expand All @@ -64,7 +78,12 @@ func (c *Configuration) NewClient(logger *zap.Logger, metricsFactory metrics.Fac
if len(c.Servers) < 1 {
return nil, errors.New("No servers specified")
}
rawClient, err := elastic.NewClient(c.GetConfigs()...)
options, err := c.getConfigOptions()
if err != nil {
return nil, err
}

rawClient, err := elastic.NewClient(options...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -187,11 +206,60 @@ func (c *Configuration) GetTagDotReplacement() string {
return c.TagDotReplacement
}

// GetConfigs wraps the configs to feed to the ElasticSearch client init
func (c *Configuration) GetConfigs() []elastic.ClientOptionFunc {
options := make([]elastic.ClientOptionFunc, 3)
options[0] = elastic.SetURL(c.Servers...)
options[1] = elastic.SetBasicAuth(c.Username, c.Password)
options[2] = elastic.SetSniff(c.Sniffer)
return options
// getConfigOptions wraps the configs to feed to the ElasticSearch client init
func (c *Configuration) getConfigOptions() ([]elastic.ClientOptionFunc, error) {
options := []elastic.ClientOptionFunc{elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer)}
httpClient := &http.Client{
Timeout: c.Timeout,
}
options = append(options, elastic.SetHttpClient(httpClient))
if c.TLS.Enabled {
ctlsConfig, err := c.TLS.createTLSConfig()
if err != nil {
return nil, err
}
httpClient.Transport = &http.Transport{
TLSClientConfig: ctlsConfig,
}
} else {
options = append(options, elastic.SetBasicAuth(c.Username, c.Password))
}
clyang82 marked this conversation as resolved.
Show resolved Hide resolved
return options, nil
}

// createTLSConfig creates TLS Configuration to connect with ES Cluster.
func (tlsConfig *TLSConfig) createTLSConfig() (*tls.Config, error) {
rootCerts, err := tlsConfig.loadCertificate()
if err != nil {
return nil, err
}
clientPrivateKey, err := tlsConfig.loadPrivateKey()
if err != nil {
return nil, err
}
return &tls.Config{
RootCAs: rootCerts,
Certificates: []tls.Certificate{*clientPrivateKey},
}, nil

}

// loadCertificate is used to load root certification
func (tlsConfig *TLSConfig) loadCertificate() (*x509.CertPool, error) {
caCert, err := ioutil.ReadFile(tlsConfig.CaPath)
if err != nil {
return nil, err
}
certificates := x509.NewCertPool()
certificates.AppendCertsFromPEM(caCert)
return certificates, nil
}

// loadPrivateKey is used to load the private certificate and key for TLS
func (tlsConfig *TLSConfig) loadPrivateKey() (*tls.Certificate, error) {
privateKey, err := tls.LoadX509KeyPair(tlsConfig.CertPath, tlsConfig.KeyPath)
if err != nil {
return nil, err
}
return &privateKey, nil
}
30 changes: 30 additions & 0 deletions plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ const (
suffixBulkWorkers = ".bulk.workers"
suffixBulkActions = ".bulk.actions"
suffixBulkFlushInterval = ".bulk.flush-interval"
suffixTimeout = ".timeout"
suffixTLS = ".tls"
suffixCert = ".tls.cert"
suffixKey = ".tls.key"
suffixCA = ".tls.ca"
suffixIndexPrefix = ".index-prefix"
suffixTagsAsFields = ".tags-as-fields"
suffixTagsAsFieldsAll = suffixTagsAsFields + ".all"
Expand Down Expand Up @@ -119,6 +124,10 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
nsConfig.namespace+suffixServerURLs,
nsConfig.servers,
"The comma-separated list of ElasticSearch servers, must be full url i.e. http://localhost:9200")
flagSet.Duration(
nsConfig.namespace+suffixTimeout,
nsConfig.Timeout,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see where a default value for this is set. I assume it's not becoming a required flag, correct? Maybe expand in the help string what happens if not set.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro Yes. It is not a required flag. the default value is 0. A Timeout of zero means no timeout. so how about change the help string from Timeout used for queries to Timeout used for queries. A Timeout of zero means no timeout.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new PR for this fix - #1171

"Timeout used for queries")
flagSet.Duration(
nsConfig.namespace+suffixMaxSpanAge,
nsConfig.MaxSpanAge,
Expand Down Expand Up @@ -147,6 +156,22 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
nsConfig.namespace+suffixBulkFlushInterval,
nsConfig.BulkFlushInterval,
"A time.Duration after which bulk requests are committed, regardless of other tresholds. Set to zero to disable. By default, this is disabled.")
flagSet.Bool(
nsConfig.namespace+suffixTLS,
nsConfig.TLS.Enabled,
"Enable TLS")
flagSet.String(
nsConfig.namespace+suffixCert,
nsConfig.TLS.CertPath,
"Path to TLS certificate file")
flagSet.String(
nsConfig.namespace+suffixKey,
nsConfig.TLS.KeyPath,
"Path to TLS key file")
flagSet.String(
nsConfig.namespace+suffixCA,
nsConfig.TLS.CaPath,
"Path to TLS CA file")
flagSet.String(
nsConfig.namespace+suffixIndexPrefix,
nsConfig.IndexPrefix,
Expand Down Expand Up @@ -185,6 +210,11 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
cfg.BulkWorkers = v.GetInt(cfg.namespace + suffixBulkWorkers)
cfg.BulkActions = v.GetInt(cfg.namespace + suffixBulkActions)
cfg.BulkFlushInterval = v.GetDuration(cfg.namespace + suffixBulkFlushInterval)
cfg.Timeout = v.GetDuration(cfg.namespace + suffixTimeout)
cfg.TLS.Enabled = v.GetBool(cfg.namespace + suffixTLS)
cfg.TLS.CertPath = v.GetString(cfg.namespace + suffixCert)
cfg.TLS.KeyPath = v.GetString(cfg.namespace + suffixKey)
cfg.TLS.CaPath = v.GetString(cfg.namespace + suffixCA)
cfg.IndexPrefix = v.GetString(cfg.namespace + suffixIndexPrefix)
cfg.AllTagsAsFields = v.GetBool(cfg.namespace + suffixTagsAsFieldsAll)
cfg.TagsFilePath = v.GetString(cfg.namespace + suffixTagsFile)
Expand Down