Skip to content

Commit

Permalink
Enable IAM based auth to ES for AWS clients (jaegertracing#465)
Browse files Browse the repository at this point in the history
Allows clients to specify jaeger to use AWS IAM auth
for their connection to ElasticSearch.
We allow this to enable an additional layer
of security for those clients running on AWS.

More details on connecting to ElasticSearch
on AWS here: https://docs.aws.amazon.com/elasticsearch-service/latest/developerguide/es-vpc.html

AWS configuration and credentials are expected
to be available at either ~/.aws/credentials
or ~/.aws/config. More details on configuration
here: https://docs.aws.amazon.com/sdk-for-go/api/aws/session/#Session

Signed-off-by: Wesley Kim <[email protected]>
  • Loading branch information
Wesley Kim committed Aug 31, 2018
1 parent ddf9b22 commit 4499321
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 10 deletions.
20 changes: 18 additions & 2 deletions cmd/ingester/app/consumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ func TestConstructor(t *testing.T) {

// partitionConsumerWrapper wraps a Sarama partition consumer into a Sarama cluster partition consumer
type partitionConsumerWrapper struct {
topic string
partition int32
topic string
partition int32
initialOffset int64
offset int64

sarama.PartitionConsumer
}
Expand All @@ -68,11 +70,24 @@ func (s partitionConsumerWrapper) Topic() string {
return s.topic
}

func (s partitionConsumerWrapper) InitialOffset() int64 {
return s.offset
}

func (s partitionConsumerWrapper) MarkOffset(offset int64, metadata string) {
s.offset = offset
}

func (s partitionConsumerWrapper) ResetOffset(offset int64, metadata string) {
s.offset = s.initialOffset
}

func newSaramaClusterConsumer(saramaPartitionConsumer sarama.PartitionConsumer) *kmocks.Consumer {
pcha := make(chan cluster.PartitionConsumer, 1)
pcha <- &partitionConsumerWrapper{
topic: topic,
partition: partition,
offset: 0,
PartitionConsumer: saramaPartitionConsumer,
}
saramaClusterConsumer := &kmocks.Consumer{}
Expand Down Expand Up @@ -141,6 +156,7 @@ func TestSaramaConsumerWrapper_start_Messages(t *testing.T) {
partitionConsumer: &partitionConsumerWrapper{
topic: topic,
partition: partition,
offset: 0,
PartitionConsumer: &kmocks.PartitionConsumer{},
},
},
Expand Down
29 changes: 25 additions & 4 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 37 additions & 3 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ import (
"sync"
"time"

"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/aws/signer/v4"
"github.com/pkg/errors"
"github.com/sha1sum/aws_signing_client"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
"gopkg.in/olivere/elastic.v5"
Expand All @@ -42,6 +45,12 @@ type Configuration struct {
BulkActions int
BulkFlushInterval time.Duration
IndexPrefix string
AwsIamConfig AwsIamConfiguration
}

// AwsIamConfiguration describes the AWS-specific configuration needed to connect to ElasticSearch with IAM authn
type AwsIamConfiguration struct {
Enabled bool
}

// ClientBuilder creates new es.Client
Expand All @@ -58,7 +67,13 @@ func (c *Configuration) NewClient(logger *zap.Logger, metricsFactory metrics.Fac
if len(c.Servers) < 1 {
return nil, errors.New("No servers specified")
}
rawClient, err := elastic.NewClient(c.GetConfigs()...)

clientOptionFuncs, err := c.GetConfigs()
if err != nil {
return nil, err
}

rawClient, err := elastic.NewClient(clientOptionFuncs...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -143,6 +158,9 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
if c.BulkFlushInterval == 0 {
c.BulkFlushInterval = source.BulkFlushInterval
}
if !c.AwsIamConfig.Enabled {
c.AwsIamConfig.Enabled = source.AwsIamConfig.Enabled
}
}

// GetNumShards returns number of shards from Configuration
Expand All @@ -166,10 +184,26 @@ func (c *Configuration) GetIndexPrefix() string {
}

// GetConfigs wraps the configs to feed to the ElasticSearch client init
func (c *Configuration) GetConfigs() []elastic.ClientOptionFunc {
func (c *Configuration) GetConfigs() ([]elastic.ClientOptionFunc, error) {
options := make([]elastic.ClientOptionFunc, 3)
options[0] = elastic.SetURL(c.Servers...)
options[1] = elastic.SetBasicAuth(c.Username, c.Password)
options[2] = elastic.SetSniff(c.Sniffer)
return options

// if AWS IAM is enabled, instantiate the elastic client with a signing client created via the AWS SDK
if c.AwsIamConfig.Enabled {
sess, err := session.NewSession()
if err != nil {
return nil, err
}

signer := v4.NewSigner(sess.Config.Credentials)
awsSigningClient, err := aws_signing_client.New(signer, nil, "es", *sess.Config.Region)
if err != nil {
return nil, err
}
options = append(options, elastic.SetHttpClient(awsSigningClient))
}

return options, nil
}
9 changes: 9 additions & 0 deletions plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
suffixBulkActions = ".bulk.actions"
suffixBulkFlushInterval = ".bulk.flush-interval"
suffixIndexPrefix = ".index-prefix"
suffixAwsIamEnabled = ".aws.iam_enabled"
)

// TODO this should be moved next to config.Configuration struct (maybe ./flags package)
Expand Down Expand Up @@ -75,6 +76,9 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options {
BulkWorkers: 1,
BulkActions: 1000,
BulkFlushInterval: time.Millisecond * 200,
AwsIamConfig: config.AwsIamConfiguration{
Enabled: false,
},
},
servers: "http://127.0.0.1:9200",
namespace: primaryNamespace,
Expand Down Expand Up @@ -146,6 +150,10 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
nsConfig.namespace+suffixIndexPrefix,
nsConfig.IndexPrefix,
"Optional prefix of Jaeger indices. For example \"production\" creates \"production:jaeger-*\".")
flagSet.Bool(
nsConfig.namespace+suffixAwsIamEnabled,
nsConfig.AwsIamConfig.Enabled,
"Whether to connect to AWS ElasticSearch with IAM authentication. Requires the proper .aws/ files to be setup to connect.")
}

// InitFromViper initializes Options with properties from viper
Expand All @@ -169,6 +177,7 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
cfg.BulkActions = v.GetInt(cfg.namespace + suffixBulkActions)
cfg.BulkFlushInterval = v.GetDuration(cfg.namespace + suffixBulkFlushInterval)
cfg.IndexPrefix = v.GetString(cfg.namespace + suffixIndexPrefix)
cfg.AwsIamConfig.Enabled = v.GetBool(cfg.namespace + suffixAwsIamEnabled)
}

// GetPrimary returns primary configuration.
Expand Down
5 changes: 4 additions & 1 deletion plugin/storage/es/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ func TestOptions(t *testing.T) {
assert.Equal(t, int64(1), primary.NumReplicas)
assert.Equal(t, 72*time.Hour, primary.MaxSpanAge)
assert.False(t, primary.Sniffer)
assert.Equal(t, false, primary.AwsIamConfig.Enabled)

aux := opts.Get("archive")
assert.Equal(t, primary.Username, aux.Username)
assert.Equal(t, primary.Password, aux.Password)
assert.Equal(t, primary.Servers, aux.Servers)
assert.Equal(t, primary.AwsIamConfig, aux.AwsIamConfig)
}

func TestOptionsWithFlags(t *testing.T) {
Expand All @@ -50,7 +52,7 @@ func TestOptionsWithFlags(t *testing.T) {
"--es.sniffer=true",
"--es.max-span-age=48h",
"--es.num-shards=20",
"--es.num-replicas=10",
"--es.aws.iam_enabled=true",
// a couple overrides
"--es.aux.server-urls=3.3.3.3,4.4.4.4",
"--es.aux.max-span-age=24h",
Expand All @@ -62,6 +64,7 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, "hello", primary.Username)
assert.Equal(t, []string{"1.1.1.1", "2.2.2.2"}, primary.Servers)
assert.Equal(t, 48*time.Hour, primary.MaxSpanAge)
assert.Equal(t, true, primary.AwsIamConfig.Enabled)
assert.True(t, primary.Sniffer)

aux := opts.Get("es.aux")
Expand Down

0 comments on commit 4499321

Please sign in to comment.