diff --git a/internal/config/config.go b/internal/config/config.go index 7640c509..b00d9672 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -12,16 +12,16 @@ import ( // Config global type Config struct { - URI string `json:"uri" yaml:"uri" env:"URI"` - Env string `json:"env" yaml:"env" env:"ENV"` // The environment (eg: prd, stg) - AppName string `json:"appName" yaml:"appName" env:"APPNAME"` // app name used for monitoring - Domain string `json:"domain" yaml:"domain" env:"DOMAIN"` - Readers Readers `json:"readers" yaml:"readers" env:"READERS"` - Writers Writers `json:"writers" yaml:"writers" env:"WRITERS"` - Storage Storage `json:"storage" yaml:"storage" env:"STORAGE"` - Tables Tables `json:"tables" yaml:"tables" env:"TABLES"` - Statsd *StatsD `json:"statsd,omitempty" yaml:"statsd" env:"STATSD"` - Computed []Computed `json:"computed" yaml:"computed" env:"COMPUTED"` + URI string `json:"uri" yaml:"uri" env:"URI"` + Env string `json:"env" yaml:"env" env:"ENV"` // The environment (eg: prd, stg) + AppName string `json:"appName" yaml:"appName" env:"APPNAME"` // app name used for monitoring + Domain string `json:"domain" yaml:"domain" env:"DOMAIN"` + Readers Readers `json:"readers" yaml:"readers" env:"READERS"` + Writers Writers `json:"writers" yaml:"writers" env:"WRITERS"` + Storage Storage `json:"storage" yaml:"storage" env:"STORAGE"` + Tables Tables `json:"tables" yaml:"tables" env:"TABLES"` + Statsd *StatsD `json:"statsd,omitempty" yaml:"statsd" env:"STATSD"` + Computed []Computed `json:"computed" yaml:"computed" env:"COMPUTED"` } // Tables is a list of table configs @@ -33,11 +33,11 @@ type Tables struct { // Timeseries is the config for the timeseries table type Timeseries struct { - Name string `json:"name" yaml:"name" env:"NAME"` // The name of the table - TTL int64 `json:"ttl,omitempty" yaml:"ttl" env:"TTL"` // The ttl (in seconds) for the storage, defaults to 1 hour. - HashBy string `json:"hashBy,omitempty" yaml:"hashBy" env:"HASHBY"` // The column to use as key (metric), defaults to 'event'. - SortBy string `json:"sortBy,omitempty" yaml:"sortBy" env:"SORTBY"` // The column to use as time, defaults to 'tsi'. - Schema string `json:"schema" yaml:"schema" env:"SCHEMA"` // The schema of the table + Name string `json:"name" yaml:"name" env:"NAME"` // The name of the table + TTL int64 `json:"ttl,omitempty" yaml:"ttl" env:"TTL"` // The ttl (in seconds) for the storage, defaults to 1 hour. + HashBy string `json:"hashBy,omitempty" yaml:"hashBy" env:"HASHBY"` // The column to use as key (metric), defaults to 'event'. + SortBy string `json:"sortBy,omitempty" yaml:"sortBy" env:"SORTBY"` // The column to use as time, defaults to 'tsi'. + Schema string `json:"schema" yaml:"schema" env:"SCHEMA"` // The schema of the table } // Log is the config for log table @@ -79,7 +79,7 @@ type S3SQS struct { Region string `json:"region" yaml:"region" env:"REGION"` Queue string `json:"queue" yaml:"queue" env:"QUEUE"` WaitTimeout int64 `json:"waitTimeout,omitempty" yaml:"waitTimeout" env:"WAITTIMEOUT"` // in seconds - VisibilityTimeout *int64 `json:"visibilityTimeout,omitempty" yaml:"visibilityTimeout" env:"VISIBILITYTIMEOUT"` // in seconds + VisibilityTimeout int64 `json:"visibilityTimeout,omitempty" yaml:"visibilityTimeout" env:"VISIBILITYTIMEOUT"` // in seconds Retries int `json:"retries" yaml:"retries" env:"RETRIES"` } diff --git a/internal/config/env/configurer.go b/internal/config/env/configurer.go index 15e05b2a..799f4527 100644 --- a/internal/config/env/configurer.go +++ b/internal/config/env/configurer.go @@ -11,6 +11,7 @@ import ( "strings" "github.com/kelindar/talaria/internal/config" + "gopkg.in/yaml.v2" ) var errConvert = errors.New("Unable to convert") @@ -21,6 +22,7 @@ type Configurer struct { key string } +// New creates a new configurer func New(key string) *Configurer { return &Configurer{ key: key, @@ -29,6 +31,10 @@ func New(key string) *Configurer { // Configure fetches the values of the env variable for file name and sets that in the config func (e *Configurer) Configure(c *config.Config) error { + if v, ok := os.LookupEnv(e.key); ok { + return yaml.Unmarshal([]byte(v), c) + } + populate(c, e.key) return nil } @@ -45,8 +51,14 @@ func populate(config interface{}, pre string) { reflectType := reflect.TypeOf(config).Elem() reflectValue := reflect.ValueOf(config).Elem() + println(pre, reflectType.String()) + for i := 0; i < reflectType.NumField(); i++ { field := reflectValue.Field(i) + name, tagged := reflectType.Field(i).Tag.Lookup("env") + if !tagged { + continue // Ignore untagged + } switch field.Kind() { case reflect.Interface, reflect.Struct: @@ -62,30 +74,27 @@ func populate(config interface{}, pre string) { // If pointer to interface or struct, the recursively populate the struct/interface case reflect.Interface, reflect.Struct: - v := field.Elem().Addr() - tag, ok := reflectType.Field(i).Tag.Lookup("env") - if ok { - populate(v.Interface(), pre+"_"+tag) - } + populate(field.Elem().Addr().Interface(), pre+"_"+name) + // If pointer to primitive types then directly fill the values case reflect.Int64, reflect.Int32, reflect.Int, reflect.Float32, reflect.Float64, reflect.Bool, reflect.String, reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: - tag, ok := reflectType.Field(i).Tag.Lookup("env") + val, ok := os.LookupEnv(pre + "_" + name) if ok { - val, ok := os.LookupEnv(pre + "_" + tag) - if ok { - vlc, err := convert(field.Elem(), val) - if err == nil { - field.Elem().Set(reflect.ValueOf(vlc)) - } + vlc, err := convert(field.Elem(), val) + if err == nil { + field.Elem().Set(reflect.ValueOf(vlc)) } } } + } else if searchPrefix(pre + "_" + name) { + v := reflect.New(field.Type().Elem()) + populate(v.Interface(), pre+"_"+name) + field.Set(v) } case reflect.Array, reflect.Slice, reflect.Map, reflect.Chan, reflect.Func: - // not supported - return + return // not supported //For primitive types end the recursion and directly fill the values default: @@ -103,6 +112,16 @@ func populate(config interface{}, pre string) { } } +// searchPrefix searches the environment for a prefix +func searchPrefix(prefix string) bool { + for _, v := range os.Environ() { + if strings.HasPrefix(v, prefix) { + return true + } + } + return false +} + // convert a string to a particular type. // Returns error if the conversion is not possible func convert(key reflect.Value, value string) (interface{}, error) { diff --git a/internal/config/env/configurer_test.go b/internal/config/env/configurer_test.go index 59404303..5c3e4307 100644 --- a/internal/config/env/configurer_test.go +++ b/internal/config/env/configurer_test.go @@ -12,8 +12,7 @@ import ( "github.com/stretchr/testify/assert" ) -// BenchmarkEnv-12 100000 15758 ns/op 2896 B/op 98 allocs/op -func BenchmarkEnv(b *testing.B) { +func TestConfigure(t *testing.T) { c := &config.Config{} st := static.New() @@ -41,25 +40,75 @@ func BenchmarkEnv(b *testing.B) { // statsd os.Setenv("TALARIA_CONF_STATSD_HOST", "ab.com") - b.ReportAllocs() - for i := 0; i < b.N; i++ { - // populate the config with the env variable - e := New("TALARIA_CONF") - e.Configure(c) - - // asserts - assert.Equal(b, c.URI, "ab.com") - - assert.Equal(b, c.Readers.Presto.Port, int32(123)) - - assert.Equal(b, c.Writers.GRPC.Port, int32(100)) - assert.Equal(b, *c.Writers.S3SQS.VisibilityTimeout, int64(10)) + // populate the config with the env variable + e := New("TALARIA_CONF") + e.Configure(c) + + // asserts + assert.Equal(t, c.URI, "ab.com") + assert.Equal(t, c.Readers.Presto.Port, int32(123)) + assert.Equal(t, c.Writers.GRPC.Port, int32(100)) + assert.Equal(t, c.Writers.S3SQS.VisibilityTimeout, int64(10)) + assert.Equal(t, c.Storage.Directory, "dir") + assert.Equal(t, c.Tables.Timeseries.Name, "timeseries_eventlog") + assert.Equal(t, c.Tables.Timeseries.TTL, int64(10)) + assert.Equal(t, c.Statsd.Host, "ab.com") +} - assert.Equal(b, c.Storage.Directory, "dir") +func TestConfigure_Full(t *testing.T) { - assert.Equal(b, c.Tables.Timeseries.Name, "timeseries_eventlog") - assert.Equal(b, c.Tables.Timeseries.TTL, int64(10)) + c := &config.Config{} + st := static.New() + st.Configure(c) - assert.Equal(b, c.Statsd.Host, "ab.com") - } + // Write the full config + os.Setenv("TALARIA_CONF", `mode: staging +env: staging +domain: "ab.com" +readers: + presto: + schema: grab_x + port: 8042 +writers: + grpc: + port: 8085 +storage: + dir: "data/" + compact: + interval: 300 + file: + dir: "output/" +tables: + timeseries: + name: eventlog + ttl: 3600 + hashBy: event + sortBy: time + schema: | + event: string + time: int64 + data: json + log: + name: logs + nodes: + name: nodes +statsd: + host: "127.0.0.1" + port: 8126 +computed: + - name: "data" + type: json + func: | + local json = require("json") + function main(input) + return json.encode(input) + end +`) + + // populate the config with the env variable + e := New("TALARIA_CONF") + e.Configure(c) + + // asserts + assert.Equal(t, c.Storage.Compact.File.Directory, "output/") } diff --git a/internal/ingress/s3sqs/sqs/reader.go b/internal/ingress/s3sqs/sqs/reader.go index b062ec6e..71ba4946 100644 --- a/internal/ingress/s3sqs/sqs/reader.go +++ b/internal/ingress/s3sqs/sqs/reader.go @@ -36,8 +36,8 @@ func NewReader(c *config.S3SQS, region string) (*Reader, error) { consumer := sqs.New(sess) visibilityTimeout := defaultVisibilityTimeout - if c.VisibilityTimeout != nil { - visibilityTimeout = time.Second * time.Duration(*c.VisibilityTimeout) + if c.VisibilityTimeout > 0 { + visibilityTimeout = time.Second * time.Duration(c.VisibilityTimeout) } return &Reader{