From 90a68f5fb7c72f8e3de4d1a6faab8f5e6dd256a4 Mon Sep 17 00:00:00 2001 From: "Nguyen Viet Hoang (Stephen)" Date: Thu, 17 Oct 2024 18:59:33 +0900 Subject: [PATCH 1/8] feat: support for multiple exporters --- cmd/relayproxy/config/config.go | 11 ++ cmd/relayproxy/config/config_test.go | 102 ++++++++++++ cmd/relayproxy/service/gofeatureflag.go | 19 ++- .../valid-yaml-exporter-and-exporters.yaml | 21 +++ .../config/valid-yaml-multiple-exporters.yaml | 18 +++ config.go | 18 +++ config_test.go | 102 ++++++++++++ .../data_export_log_and_file/flags.goff.yaml | 22 +++ examples/data_export_log_and_file/main.go | 96 ++++++++++++ feature_flag.go | 43 +++-- variation.go | 7 +- variation_test.go | 148 ++++++++++-------- website/docs/go_module/configuration.md | 1 + .../docs/relay_proxy/configure_relay_proxy.md | 1 + 14 files changed, 527 insertions(+), 82 deletions(-) create mode 100644 cmd/relayproxy/testdata/config/valid-yaml-exporter-and-exporters.yaml create mode 100644 cmd/relayproxy/testdata/config/valid-yaml-multiple-exporters.yaml create mode 100644 examples/data_export_log_and_file/flags.goff.yaml create mode 100644 examples/data_export_log_and_file/main.go diff --git a/cmd/relayproxy/config/config.go b/cmd/relayproxy/config/config.go index 33ab485fcdc..5480816d823 100644 --- a/cmd/relayproxy/config/config.go +++ b/cmd/relayproxy/config/config.go @@ -199,6 +199,9 @@ type Config struct { // Exporter is the configuration on how to export data Exporter *ExporterConf `mapstructure:"exporter" koanf:"exporter"` + // Exporters is the exact same things than Exporter but allows to give more than 1 exporter at the time. 
+ Exporters *[]ExporterConf `mapstructure:"exporters" koanf:"exporters"` + // Notifiers is the configuration on where to notify a flag change Notifiers []NotifierConf `mapstructure:"notifier" koanf:"notifier"` @@ -331,6 +334,14 @@ func (c *Config) IsValid() error { } } + if c.Exporters != nil { + for _, exporter := range *c.Exporters { + if err := exporter.IsValid(); err != nil { + return err + } + } + } + if c.Notifiers != nil { for _, notif := range c.Notifiers { if err := notif.IsValid(); err != nil { diff --git a/cmd/relayproxy/config/config_test.go b/cmd/relayproxy/config/config_test.go index f829b647e16..55b2f378746 100644 --- a/cmd/relayproxy/config/config_test.go +++ b/cmd/relayproxy/config/config_test.go @@ -89,6 +89,86 @@ func TestParseConfig_fileFromPflag(t *testing.T) { }, wantErr: assert.NoError, }, + { + name: "Valid yaml file with multiple exporters", + fileLocation: "../testdata/config/valid-yaml-multiple-exporters.yaml", + want: &config.Config{ + ListenPort: 1031, + PollingInterval: 1000, + FileFormat: "yaml", + Host: "localhost", + Retriever: &config.RetrieverConf{ + Kind: "http", + URL: "https://raw.githubusercontent.com/thomaspoignant/go-feature-flag/main/examples/retriever_file/flags.goff.yaml", + }, + Exporters: &[]config.ExporterConf{ + { + Kind: "log", + }, + { + Kind: "file", + OutputDir: "./", + }, + }, + StartWithRetrieverError: false, + RestAPITimeout: 5000, + Version: "1.X.X", + EnableSwagger: true, + AuthorizedKeys: config.APIKeys{ + Admin: []string{ + "apikey3", + }, + Evaluation: []string{ + "apikey1", + "apikey2", + }, + }, + LogLevel: "info", + }, + wantErr: assert.NoError, + }, + { + name: "Valid yaml file with both exporter and exporters", + fileLocation: "../testdata/config/valid-yaml-exporter-and-exporters.yaml", + want: &config.Config{ + ListenPort: 1031, + PollingInterval: 1000, + FileFormat: "yaml", + Host: "localhost", + Retriever: &config.RetrieverConf{ + Kind: "http", + URL: 
"https://raw.githubusercontent.com/thomaspoignant/go-feature-flag/main/examples/retriever_file/flags.goff.yaml", + }, + Exporter: &config.ExporterConf{ + Kind: "log", + }, + Exporters: &[]config.ExporterConf{ + { + Kind: "webhook", + EndpointURL: "https://example.com/webhook", + }, + { + Kind: "file", + OutputDir: "./", + }, + }, + StartWithRetrieverError: false, + RestAPITimeout: 5000, + Version: "1.X.X", + EnableSwagger: true, + AuthorizedKeys: config.APIKeys{ + Admin: []string{ + "apikey3", + }, + Evaluation: []string{ + "apikey1", + "apikey2", + }, + }, + LogLevel: "info", + }, + wantErr: assert.NoError, + }, { name: "Valid json file", fileLocation: "../testdata/config/valid-file.json", @@ -323,6 +403,7 @@ func TestConfig_IsValid(t *testing.T) { Retriever *config.RetrieverConf Retrievers *[]config.RetrieverConf Exporter *config.ExporterConf + Exporters *[]config.ExporterConf Notifiers []config.NotifierConf LogLevel string Debug bool @@ -454,6 +535,26 @@ func TestConfig_IsValid(t *testing.T) { }, wantErr: assert.Error, }, + { + name: "invalid exporter in the list of exporters", + fields: fields{ + ListenPort: 8080, + Retriever: &config.RetrieverConf{ + Kind: "file", + Path: "../testdata/config/valid-file.yaml", + }, + Exporters: &[]config.ExporterConf{ + { + Kind: "webhook", + EndpointURL: "https://example.com/webhook", + }, + { + Kind: "file", + }, + }, + }, + wantErr: assert.Error, + }, { name: "invalid notifier", fields: fields{ @@ -508,6 +609,7 @@ func TestConfig_IsValid(t *testing.T) { StartWithRetrieverError: tt.fields.StartWithRetrieverError, Retriever: tt.fields.Retriever, Exporter: tt.fields.Exporter, + Exporters: tt.fields.Exporters, Notifiers: tt.fields.Notifiers, Retrievers: tt.fields.Retrievers, LogLevel: tt.fields.LogLevel, diff --git a/cmd/relayproxy/service/gofeatureflag.go b/cmd/relayproxy/service/gofeatureflag.go index ca0d4dd6cac..4d21218e380 100644 --- a/cmd/relayproxy/service/gofeatureflag.go +++ b/cmd/relayproxy/service/gofeatureflag.go @@ 
-67,14 +67,26 @@ func NewGoFeatureFlagClient( } } - var exp ffclient.DataExporter + var mainDataExporter ffclient.DataExporter if proxyConf.Exporter != nil { - exp, err = initDataExporter(proxyConf.Exporter) + mainDataExporter, err = initDataExporter(proxyConf.Exporter) if err != nil { return nil, err } } + // Manage the case where we have multiple data exporters + dataExporters := make([]ffclient.DataExporter, 0) + if proxyConf.Exporters != nil { + for _, e := range *proxyConf.Exporters { + currentExporter, err := initDataExporter(&e) + if err != nil { + return nil, err + } + dataExporters = append(dataExporters, currentExporter) + } + } + notif, err := initNotifier(proxyConf.Notifiers) if err != nil { return nil, err @@ -89,7 +101,8 @@ func NewGoFeatureFlagClient( Retrievers: retrievers, Notifiers: notif, FileFormat: proxyConf.FileFormat, - DataExporter: exp, + DataExporter: mainDataExporter, + DataExporters: dataExporters, StartWithRetrieverError: proxyConf.StartWithRetrieverError, EnablePollingJitter: proxyConf.EnablePollingJitter, EvaluationContextEnrichment: proxyConf.EvaluationContextEnrichment, diff --git a/cmd/relayproxy/testdata/config/valid-yaml-exporter-and-exporters.yaml b/cmd/relayproxy/testdata/config/valid-yaml-exporter-and-exporters.yaml new file mode 100644 index 00000000000..7c3658bb82a --- /dev/null +++ b/cmd/relayproxy/testdata/config/valid-yaml-exporter-and-exporters.yaml @@ -0,0 +1,21 @@ +listen: 1031 +pollingInterval: 1000 +startWithRetrieverError: false +retriever: + kind: http + url: https://raw.githubusercontent.com/thomaspoignant/go-feature-flag/main/examples/retriever_file/flags.goff.yaml +exporter: + kind: log +exporters: + - kind: webhook + endpointURL: https://example.com/webhook + - kind: file + outputDir: ./ +enableSwagger: true +authorizedKeys: + evaluation: + - apikey1 # owner: userID1 + - apikey2 # owner: userID2 + admin: + - apikey3 +loglevel: info diff --git a/cmd/relayproxy/testdata/config/valid-yaml-multiple-exporters.yaml 
b/cmd/relayproxy/testdata/config/valid-yaml-multiple-exporters.yaml new file mode 100644 index 00000000000..084da54fa8f --- /dev/null +++ b/cmd/relayproxy/testdata/config/valid-yaml-multiple-exporters.yaml @@ -0,0 +1,18 @@ +listen: 1031 +pollingInterval: 1000 +startWithRetrieverError: false +retriever: + kind: http + url: https://raw.githubusercontent.com/thomaspoignant/go-feature-flag/main/examples/retriever_file/flags.goff.yaml +exporters: + - kind: log + - kind: file + outputDir: ./ +enableSwagger: true +authorizedKeys: + evaluation: + - apikey1 # owner: userID1 + - apikey2 # owner: userID2 + admin: + - apikey3 +loglevel: info diff --git a/config.go b/config.go index 145f077a885..d947e3b517e 100644 --- a/config.go +++ b/config.go @@ -66,6 +66,10 @@ type Config struct { // DataExporter (optional) is the configuration where we store how we should output the flags variations results DataExporter DataExporter + // DataExporters (optional) is the list of configurations where we store how we should output the flags variations results + // Multiple exporters can be used to send the data to multiple destinations in parallel without interfering with each other. + DataExporters []DataExporter + // StartWithRetrieverError (optional) If true, the SDK will start even if we did not get any flags from the retriever. // It will serve only default values until all the retrievers returns the flags. // The init method will not return any error if the flag file is unreachable. @@ -118,6 +122,20 @@ func (c *Config) GetRetrievers() ([]retriever.Retriever, error) { return retrievers, nil } +// GetDataExporters returns the list of DataExporter configured. +func (c *Config) GetDataExporters() []DataExporter { + dataExporters := make([]DataExporter, 0) + // If we have both DataExporter and DataExporters fields configured, we are first looking at what is available + // in DataExporter before looking at what is in DataExporters. 
+ if c.DataExporter != (DataExporter{}) { + dataExporters = append(dataExporters, c.DataExporter) + } + if len(c.DataExporters) > 0 { + dataExporters = append(dataExporters, c.DataExporters...) + } + return dataExporters +} + // SetOffline set GO Feature Flag in offline mode. func (c *Config) SetOffline(control bool) { if c.offlineMutex == nil { diff --git a/config_test.go b/config_test.go index af2b40538c1..019070bf76b 100644 --- a/config_test.go +++ b/config_test.go @@ -114,6 +114,108 @@ func TestConfig_GetRetrievers(t *testing.T) { } } +func TestConfig_GetDataExporters(t *testing.T) { + type fields struct { + DataExporter ffClient.DataExporter + DataExporters []ffClient.DataExporter + } + tests := []struct { + name string + fields fields + want []ffClient.DataExporter + }{ + { + name: "No data exporter", + fields: fields{}, + want: []ffClient.DataExporter{}, + }, + { + name: "Single data exporter", + fields: fields{ + DataExporter: ffClient.DataExporter{ + FlushInterval: 10 * time.Second, + MaxEventInMemory: 100, + }, + }, + want: []ffClient.DataExporter{ + { + FlushInterval: 10 * time.Second, + MaxEventInMemory: 100, + }, + }, + }, + { + name: "Multiple data exporters", + fields: fields{ + DataExporters: []ffClient.DataExporter{ + { + FlushInterval: 20 * time.Second, + MaxEventInMemory: 200, + }, + { + FlushInterval: 30 * time.Second, + MaxEventInMemory: 300, + }, + }, + }, + want: []ffClient.DataExporter{ + { + FlushInterval: 20 * time.Second, + MaxEventInMemory: 200, + }, + { + FlushInterval: 30 * time.Second, + MaxEventInMemory: 300, + }, + }, + }, + { + name: "Both single and multiple data exporters", + fields: fields{ + DataExporter: ffClient.DataExporter{ + FlushInterval: 10 * time.Second, + MaxEventInMemory: 100, + }, + DataExporters: []ffClient.DataExporter{ + { + FlushInterval: 20 * time.Second, + MaxEventInMemory: 200, + }, + { + FlushInterval: 30 * time.Second, + MaxEventInMemory: 300, + }, + }, + }, + want: []ffClient.DataExporter{ + { + 
FlushInterval: 10 * time.Second, + MaxEventInMemory: 100, + }, + { + FlushInterval: 20 * time.Second, + MaxEventInMemory: 200, + }, + { + FlushInterval: 30 * time.Second, + MaxEventInMemory: 300, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &ffClient.Config{ + DataExporter: tt.fields.DataExporter, + DataExporters: tt.fields.DataExporters, + } + got := c.GetDataExporters() + assert.Equal(t, tt.want, got) + }) + } +} + func TestOfflineConfig(t *testing.T) { c := ffClient.Config{ Offline: true, diff --git a/examples/data_export_log_and_file/flags.goff.yaml b/examples/data_export_log_and_file/flags.goff.yaml new file mode 100644 index 00000000000..b0783257a9d --- /dev/null +++ b/examples/data_export_log_and_file/flags.goff.yaml @@ -0,0 +1,22 @@ +new-admin-access: + variations: + default_var: false + false_var: false + true_var: true + defaultRule: + percentage: + false_var: 70 + true_var: 30 + +flag-only-for-admin: + variations: + default_var: false + false_var: false + true_var: true + targeting: + - query: admin eq true + percentage: + false_var: 0 + true_var: 100 + defaultRule: + variation: default_var diff --git a/examples/data_export_log_and_file/main.go b/examples/data_export_log_and_file/main.go new file mode 100644 index 00000000000..5d111ae8a7a --- /dev/null +++ b/examples/data_export_log_and_file/main.go @@ -0,0 +1,96 @@ +package main + +import ( + "context" + "log" + "log/slog" + "time" + + "github.com/thomaspoignant/go-feature-flag/ffcontext" + + "github.com/thomaspoignant/go-feature-flag/exporter/fileexporter" + "github.com/thomaspoignant/go-feature-flag/exporter/logsexporter" + "github.com/thomaspoignant/go-feature-flag/retriever/fileretriever" + + ffclient "github.com/thomaspoignant/go-feature-flag" +) + +func main() { + // Init ffclient with a file retriever. 
+ err := ffclient.Init(ffclient.Config{ + PollingInterval: 10 * time.Second, + LeveledLogger: slog.Default(), + Context: context.Background(), + Retriever: &fileretriever.Retriever{ + Path: "examples/data_export_log_and_file/flags.goff.yaml", + }, + DataExporter: ffclient.DataExporter{ + FlushInterval: 1 * time.Second, + MaxEventInMemory: 2, + Exporter: &fileexporter.Exporter{ + Format: "json", + OutputDir: "./examples/data_export_log_and_file/variation-events/", + Filename: " flag-variation-EXAMPLE-{{ .Timestamp}}.{{ .Format}}", + }, + }, + DataExporters: []ffclient.DataExporter{ + { + FlushInterval: 1 * time.Second, + MaxEventInMemory: 4, + Exporter: &logsexporter.Exporter{ + LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " + + "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"", + }, + }, + }, + }) + // Check init errors. + if err != nil { + log.Fatal(err) + } + // defer closing ffclient + defer ffclient.Close() + + // create users + user1 := ffcontext. + NewEvaluationContextBuilder("aea2fdc1-b9a0-417a-b707-0c9083de68e3"). + AddCustom("anonymous", true). 
+ Build() + user2 := ffcontext.NewEvaluationContext("332460b9-a8aa-4f7a-bc5d-9cc33632df9a") + + _, _ = ffclient.BoolVariation("new-admin-access", user1, false) + _, _ = ffclient.BoolVariation("new-admin-access", user2, false) + _, _ = ffclient.StringVariation("unknown-flag", user1, "defaultValue") + _, _ = ffclient.JSONVariation("unknown-flag-2", user1, map[string]interface{}{"test": "toto"}) + + // Wait 2 seconds to have a second file + time.Sleep(2 * time.Second) + _, _ = ffclient.BoolVariation("new-admin-access", user1, false) + _, _ = ffclient.BoolVariation("new-admin-access", user2, false) + + /* + The output which is written in the file will be like this: + + flag-variation-EXAMPLE-.json: + {"kind":"feature","contextKind":"anonymousUser","userKey":"aea2fdc1-b9a0-417a-b707-0c9083de68e3","creationDate":1618234129,"key":"new-admin-access","variation":"True","value":true,"default":false,"source":"SERVER"} + {"kind":"feature","contextKind":"user","userKey":"332460b9-a8aa-4f7a-bc5d-9cc33632df9a","creationDate":1618234129,"key":"new-admin-access","variation":"False","value":false,"default":false,"source":"SERVER"} + ---- + flag-variation-EXAMPLE-.log: + {"kind":"feature","contextKind":"anonymousUser","userKey":"aea2fdc1-b9a0-417a-b707-0c9083de68e3","creationDate":1618234129,"key":"unknown-flag","variation":"SdkDefault","value":"defaultValue","default":true,"source":"SERVER"} + {"kind":"feature","contextKind":"anonymousUser","userKey":"aea2fdc1-b9a0-417a-b707-0c9083de68e3","creationDate":1618234129,"key":"unknown-flag-2","variation":"SdkDefault","value":{"test":"toto"},"default":true,"source":"SERVER"} + ---- + flag-variation-EXAMPLE-.json: + {"kind":"feature","contextKind":"anonymousUser","userKey":"aea2fdc1-b9a0-417a-b707-0c9083de68e3","creationDate":1618234131,"key":"new-admin-access","variation":"True","value":true,"default":false,"source":"SERVER"} + 
{"kind":"feature","contextKind":"user","userKey":"332460b9-a8aa-4f7a-bc5d-9cc33632df9a","creationDate":1618234131,"key":"new-admin-access","variation":"False","value":false,"default":false,"source":"SERVER"} + + Meanwhile, the output which is written in the log will be like this: + + user="aea2fdc1-b9a0-417a-b707-0c9083de68e3", flag="new-admin-access", value="true", variation="true_var" + user="332460b9-a8aa-4f7a-bc5d-9cc33632df9a", flag="new-admin-access", value="false", variation="false_var" + user="aea2fdc1-b9a0-417a-b707-0c9083de68e3", flag="unknown-flag", value="defaultValue", variation="SdkDefault" + user="aea2fdc1-b9a0-417a-b707-0c9083de68e3", flag="unknown-flag-2", value="map[test:toto]", variation="SdkDefault" + user="aea2fdc1-b9a0-417a-b707-0c9083de68e3", flag="new-admin-access", value="true", variation="true_var" + user="332460b9-a8aa-4f7a-bc5d-9cc33632df9a", flag="new-admin-access", value="false", variation="false_var" + + */ +} diff --git a/feature_flag.go b/feature_flag.go index cd91c97be37..ac61aea0805 100644 --- a/feature_flag.go +++ b/feature_flag.go @@ -43,11 +43,11 @@ func Init(config Config) error { // GoFeatureFlag is the main object of the library // it contains the cache, the config, the updater and the exporter. 
type GoFeatureFlag struct { - cache cache.Manager - config Config - bgUpdater backgroundUpdater - dataExporter *exporter.Scheduler - retrieverManager *retriever.Manager + cache cache.Manager + config Config + bgUpdater backgroundUpdater + dataExporterSchedulers []*exporter.Scheduler + retrieverManager *retriever.Manager } // ff is the default object for go-feature-flag @@ -122,14 +122,24 @@ func New(config Config) (*GoFeatureFlag, error) { go goFF.startFlagUpdaterDaemon() - if goFF.config.DataExporter.Exporter != nil { - // init the data exporter - goFF.dataExporter = exporter.NewScheduler(goFF.config.Context, goFF.config.DataExporter.FlushInterval, - goFF.config.DataExporter.MaxEventInMemory, goFF.config.DataExporter.Exporter, goFF.config.internalLogger) - - // we start the daemon only if we have a bulk exporter - if goFF.config.DataExporter.Exporter.IsBulk() { - go goFF.dataExporter.StartDaemon() + dataExporters := config.GetDataExporters() + + // Initialize a Scheduler for each DataExporter, if any DataExporter is configured. 
+ if len(dataExporters) > 0 { + goFF.dataExporterSchedulers = make([]*exporter.Scheduler, len(dataExporters)) + for i, dataExporter := range dataExporters { + goFF.dataExporterSchedulers[i] = exporter.NewScheduler( + goFF.config.Context, + dataExporter.FlushInterval, + dataExporter.MaxEventInMemory, + dataExporter.Exporter, + goFF.config.internalLogger, + ) + + // Start daemon if it's a bulk exporter + if dataExporter.Exporter.IsBulk() { + go goFF.dataExporterSchedulers[i].StartDaemon() + } } } } @@ -177,9 +187,12 @@ func (g *GoFeatureFlag) Close() { g.bgUpdater.close() } - if g.dataExporter != nil { - g.dataExporter.Close() + for _, dataExporterScheduler := range g.dataExporterSchedulers { + if dataExporterScheduler != nil { + dataExporterScheduler.Close() + } } + if g.retrieverManager != nil { _ = g.retrieverManager.Shutdown(g.config.Context) } diff --git a/variation.go b/variation.go index f9d32cd9545..7d2a0ec2b61 100644 --- a/variation.go +++ b/variation.go @@ -254,9 +254,10 @@ func (g *GoFeatureFlag) getFlagFromCache(flagKey string) (flag.Flag, error) { // CollectEventData is collecting events and sending them to the data exporter to be stored. 
func (g *GoFeatureFlag) CollectEventData(event exporter.FeatureEvent) { - if g != nil && g.dataExporter != nil { - // Add event in the exporter - g.dataExporter.AddEvent(event) + for _, dataExporterScheduler := range g.dataExporterSchedulers { + if dataExporterScheduler != nil { + dataExporterScheduler.AddEvent(event) + } } } diff --git a/variation_test.go b/variation_test.go index a95e6c0c97e..0d8015186dd 100644 --- a/variation_test.go +++ b/variation_test.go @@ -327,11 +327,13 @@ func TestBoolVariation(t *testing.T) { LeveledLogger: logger, Offline: tt.args.offline, }, - dataExporter: exporter.NewScheduler(context.Background(), 0, 0, - &logsexporter.Exporter{ - LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " + - "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"", - }, &fflog.FFLogger{LeveledLogger: logger}), + dataExporterSchedulers: []*exporter.Scheduler{ + exporter.NewScheduler(context.Background(), 0, 0, + &logsexporter.Exporter{ + LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " + + "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"", + }, &fflog.FFLogger{LeveledLogger: logger}), + }, } } @@ -727,11 +729,13 @@ func TestBoolVariationDetails(t *testing.T) { LeveledLogger: logger, Offline: tt.args.offline, }, - dataExporter: exporter.NewScheduler(context.Background(), 0, 0, - &logsexporter.Exporter{ - LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " + - "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"", - }, &fflog.FFLogger{LeveledLogger: logger}), + dataExporterSchedulers: []*exporter.Scheduler{ + exporter.NewScheduler(context.Background(), 0, 0, + &logsexporter.Exporter{ + LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " + + "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"", + }, &fflog.FFLogger{LeveledLogger: logger}), + }, } } @@ -1036,11 +1040,13 @@ func TestFloat64Variation(t *testing.T) { LeveledLogger: logger, Offline: tt.args.offline, }, - dataExporter: exporter.NewScheduler(context.Background(), 0, 
0, - &logsexporter.Exporter{ - LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " + - "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"", - }, &fflog.FFLogger{LeveledLogger: logger}), + dataExporterSchedulers: []*exporter.Scheduler{ + exporter.NewScheduler(context.Background(), 0, 0, + &logsexporter.Exporter{ + LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " + + "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"", + }, &fflog.FFLogger{LeveledLogger: logger}), + }, } } @@ -1354,11 +1360,13 @@ func TestFloat64VariationDetails(t *testing.T) { LeveledLogger: logger, Offline: tt.args.offline, }, - dataExporter: exporter.NewScheduler(context.Background(), 0, 0, - &logsexporter.Exporter{ - LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " + - "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"", - }, &fflog.FFLogger{LeveledLogger: logger}), + dataExporterSchedulers: []*exporter.Scheduler{ + exporter.NewScheduler(context.Background(), 0, 0, + &logsexporter.Exporter{ + LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " + + "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"", + }, &fflog.FFLogger{LeveledLogger: logger}), + }, } } @@ -1648,9 +1656,11 @@ func TestJSONArrayVariation(t *testing.T) { LeveledLogger: logger, Offline: tt.args.offline, }, - dataExporter: exporter.NewScheduler(context.Background(), 0, 0, - &logsexporter.Exporter{LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " + - "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\""}, &fflog.FFLogger{LeveledLogger: logger}), + dataExporterSchedulers: []*exporter.Scheduler{ + exporter.NewScheduler(context.Background(), 0, 0, + &logsexporter.Exporter{LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " + + "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\""}, &fflog.FFLogger{LeveledLogger: logger}), + }, } } @@ -1953,9 +1963,11 @@ func TestJSONArrayVariationDetails(t *testing.T) { LeveledLogger: logger, Offline: tt.args.offline, }, - dataExporter: 
exporter.NewScheduler(context.Background(), 0, 0, - &logsexporter.Exporter{LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " + - "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\""}, &fflog.FFLogger{LeveledLogger: logger}), + dataExporterSchedulers: []*exporter.Scheduler{ + exporter.NewScheduler(context.Background(), 0, 0, + &logsexporter.Exporter{LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " + + "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\""}, &fflog.FFLogger{LeveledLogger: logger}), + }, } } @@ -2225,11 +2237,13 @@ func TestJSONVariation(t *testing.T) { LeveledLogger: logger, Offline: tt.args.offline, }, - dataExporter: exporter.NewScheduler(context.Background(), 0, 0, - &logsexporter.Exporter{ - LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " + - "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"", - }, &fflog.FFLogger{LeveledLogger: logger}), + dataExporterSchedulers: []*exporter.Scheduler{ + exporter.NewScheduler(context.Background(), 0, 0, + &logsexporter.Exporter{ + LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " + + "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"", + }, &fflog.FFLogger{LeveledLogger: logger}), + }, } } @@ -2458,11 +2472,13 @@ func TestJSONVariationDetails(t *testing.T) { LeveledLogger: logger, Offline: tt.args.offline, }, - dataExporter: exporter.NewScheduler(context.Background(), 0, 0, - &logsexporter.Exporter{ - LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " + - "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"", - }, &fflog.FFLogger{LeveledLogger: logger}), + dataExporterSchedulers: []*exporter.Scheduler{ + exporter.NewScheduler(context.Background(), 0, 0, + &logsexporter.Exporter{ + LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " + + "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"", + }, &fflog.FFLogger{LeveledLogger: logger}), + }, } } @@ -2735,11 +2751,13 @@ func TestStringVariation(t *testing.T) { LeveledLogger: logger, Offline: 
tt.args.offline, }, - dataExporter: exporter.NewScheduler(context.Background(), 0, 0, - &logsexporter.Exporter{ - LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " + - "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"", - }, &fflog.FFLogger{LeveledLogger: logger}), + dataExporterSchedulers: []*exporter.Scheduler{ + exporter.NewScheduler(context.Background(), 0, 0, + &logsexporter.Exporter{ + LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " + + "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"", + }, &fflog.FFLogger{LeveledLogger: logger}), + }, } } got, err := StringVariation(tt.args.flagKey, tt.args.user, tt.args.defaultValue) @@ -2967,11 +2985,13 @@ func TestStringVariationDetails(t *testing.T) { LeveledLogger: logger, Offline: tt.args.offline, }, - dataExporter: exporter.NewScheduler(context.Background(), 0, 0, - &logsexporter.Exporter{ - LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " + - "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"", - }, &fflog.FFLogger{LeveledLogger: logger}), + dataExporterSchedulers: []*exporter.Scheduler{ + exporter.NewScheduler(context.Background(), 0, 0, + &logsexporter.Exporter{ + LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " + + "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"", + }, &fflog.FFLogger{LeveledLogger: logger}), + }, } } got, err := StringVariationDetails(tt.args.flagKey, tt.args.user, tt.args.defaultValue) @@ -3274,11 +3294,13 @@ func TestIntVariation(t *testing.T) { LeveledLogger: logger, Offline: tt.args.offline, }, - dataExporter: exporter.NewScheduler(context.Background(), 0, 0, - &logsexporter.Exporter{ - LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " + - "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"", - }, &fflog.FFLogger{LeveledLogger: logger}), + dataExporterSchedulers: []*exporter.Scheduler{ + exporter.NewScheduler(context.Background(), 0, 0, + &logsexporter.Exporter{ + LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " + + 
"value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"", + }, &fflog.FFLogger{LeveledLogger: logger}), + }, } } got, err := IntVariation(tt.args.flagKey, tt.args.user, tt.args.defaultValue) @@ -3548,11 +3570,13 @@ func TestIntVariationDetails(t *testing.T) { LeveledLogger: logger, Offline: tt.args.offline, }, - dataExporter: exporter.NewScheduler(context.Background(), 0, 0, - &logsexporter.Exporter{ - LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " + - "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"", - }, &fflog.FFLogger{LeveledLogger: logger}), + dataExporterSchedulers: []*exporter.Scheduler{ + exporter.NewScheduler(context.Background(), 0, 0, + &logsexporter.Exporter{ + LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " + + "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"", + }, &fflog.FFLogger{LeveledLogger: logger}), + }, } } got, err := IntVariationDetails(tt.args.flagKey, tt.args.user, tt.args.defaultValue) @@ -3916,11 +3940,13 @@ func TestRawVariation(t *testing.T) { LeveledLogger: logger, Offline: tt.args.offline, }, - dataExporter: exporter.NewScheduler(context.Background(), 0, 0, - &logsexporter.Exporter{ - LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " + - "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"", - }, &fflog.FFLogger{LeveledLogger: logger}), + dataExporterSchedulers: []*exporter.Scheduler{ + exporter.NewScheduler(context.Background(), 0, 0, + &logsexporter.Exporter{ + LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " + + "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"", + }, &fflog.FFLogger{LeveledLogger: logger}), + }, } } diff --git a/website/docs/go_module/configuration.md b/website/docs/go_module/configuration.md index 579c23c7cd5..5062f51f9c8 100644 --- a/website/docs/go_module/configuration.md +++ b/website/docs/go_module/configuration.md @@ -18,6 +18,7 @@ During the initialization you must give a [`ffclient.Config{}`](https://pkg.go.d | `Context` | *(optional)*
The context used by the retriever.
Default: **`context.Background()`** | | `Environment` | *(optional)*
The environment the app is running under, can be checked in feature flag rules.
Default: `""`
*Check [**"environments"** section](../configure_flag/flag_format/#environments) to understand how to use this parameter.* | | `DataExporter` | *(optional)*
DataExporter defines the method for exporting data on the usage of your flags.
*see [export data section](data_collection/index.md) for more details*. | +| `DataExporters` | *(optional)*
DataExporters is exactly the same thing as `DataExporter` but you can configure more than 1 exporter for your variation events.
All exporters are flushed in parallel without interdependencies.
*see [export data section](data_collection/index.md) for more details*. | | `FileFormat` | *(optional)*
Format of your configuration file. Available formats are `yaml`, `toml` and `json`, if you omit the field it will try to unmarshal the file as a `yaml` file.
Default: **`YAML`** | | `LeveledLogger` | *(optional)*
LeveledLogger is used to log what `go-feature-flag` is doing.
It should be a `slog` instance.
If no logger is provided the module will not log anything.
Default: **No log** | | `Notifiers` | *(optional)*
List of notifiers to call when your flag file has been changed.
*See [notifiers section](./notifier/index.md) for more details*. | diff --git a/website/docs/relay_proxy/configure_relay_proxy.md b/website/docs/relay_proxy/configure_relay_proxy.md index b4fd789aa51..2ed4584b085 100644 --- a/website/docs/relay_proxy/configure_relay_proxy.md +++ b/website/docs/relay_proxy/configure_relay_proxy.md @@ -46,6 +46,7 @@ ex: `AUTHORIZEDKEYS_EVALUATION=my-first-key,my-second-key`)_. | `fileFormat` | string | `yaml` | This is the format of your `go-feature-flag` configuration file. Acceptable values are `yaml`, `json`, `toml`. | | `startWithRetrieverError` | boolean | `false` | By default the **relay proxy** will crash if it is not able to retrieve the flags from the configuration.
If you don't want your relay proxy to crash, you can set `startWithRetrieverError` to true. Until the flag is retrievable the relay proxy will only answer with default values. | | `exporter` | [exporter](#exporter) | **none** | Exporter is the configuration used to export data. | +| `exporters` | [[]exporter](#exporter) | **none** | Exporters is the exact same thing as `exporter` but you can configure more than 1 exporter. | `notifier` | [notifier](#notifier) | **none** | Notifiers is the configuration on where to notify a flag change. | | `authorizedKeys` | [authorizedKeys](#type-authorizedkeys) | **none** | List of authorized API keys. | | `evaluationContextEnrichment` | object | **none** | It is a free field that will be merged with the evaluation context sent during the evaluation. It is useful to add common attributes to all the evaluations, such as a server version, environment, etc.

These fields will be included in the custom attributes of the evaluation context.

If in the evaluation context you have a field with the same name, it will be override by the `evaluationContextEnrichment`. | From 1c17a5465d4c03ae59116c7aa8d564fb67f6bff1 Mon Sep 17 00:00:00 2001 From: "Nguyen Viet Hoang (Stephen)" Date: Thu, 17 Oct 2024 22:56:17 +0900 Subject: [PATCH 2/8] chore: update unit tests --- .github/workflows/unassign-issue.yml | 3 +- cmd/relayproxy/config/config.go | 37 +++++++-- cmd/relayproxy/service/gofeatureflag_test.go | 82 ++++++++++++++++++++ config.go | 4 +- 4 files changed, 115 insertions(+), 11 deletions(-) diff --git a/.github/workflows/unassign-issue.yml b/.github/workflows/unassign-issue.yml index b72193a44ce..16e656a3f51 100644 --- a/.github/workflows/unassign-issue.yml +++ b/.github/workflows/unassign-issue.yml @@ -107,8 +107,7 @@ jobs: const staleIssues = await github.paginate(github.rest.issues.listForRepo, { owner, repo, state: 'open', - labels: staleLabel, - assignee: '*' + labels: staleLabel }); for (const issue of staleIssues) { diff --git a/cmd/relayproxy/config/config.go b/cmd/relayproxy/config/config.go index 5480816d823..b0e59541c21 100644 --- a/cmd/relayproxy/config/config.go +++ b/cmd/relayproxy/config/config.go @@ -309,6 +309,28 @@ func (c *Config) IsValid() error { return fmt.Errorf("invalid port %d", c.ListenPort) } + if err := c.validateRetrievers(); err != nil { + return err + } + + if err := c.validateExporters(); err != nil { + return err + } + + if err := c.validateNotifiers(); err != nil { + return err + } + + if c.LogLevel != "" { + if _, err := zapcore.ParseLevel(c.LogLevel); err != nil { + return err + } + } + + return nil +} + +func (c *Config) validateRetrievers() error { if c.Retriever == nil && c.Retrievers == nil { return fmt.Errorf("no retriever available in the configuration") } @@ -327,7 +349,10 @@ func (c *Config) IsValid() error { } } - // Exporter is optional + return nil +} + +func (c *Config) validateExporters() error { if c.Exporter != nil { if err := c.Exporter.IsValid(); err != nil 
{ return err @@ -342,6 +367,10 @@ func (c *Config) IsValid() error { } } + return nil +} + +func (c *Config) validateNotifiers() error { if c.Notifiers != nil { for _, notif := range c.Notifiers { if err := notif.IsValid(); err != nil { @@ -349,12 +378,6 @@ func (c *Config) IsValid() error { } } } - if c.LogLevel != "" { - if _, err := zapcore.ParseLevel(c.LogLevel); err != nil { - return err - } - } - return nil } diff --git a/cmd/relayproxy/service/gofeatureflag_test.go b/cmd/relayproxy/service/gofeatureflag_test.go index 597080ff042..ecf36ab52f7 100644 --- a/cmd/relayproxy/service/gofeatureflag_test.go +++ b/cmd/relayproxy/service/gofeatureflag_test.go @@ -438,3 +438,85 @@ func TestNewGoFeatureFlagClient_ProxyConfNil(t *testing.T) { assert.Nil(t, goff, "Expected GoFeatureFlag client to be nil when proxyConf is nil") assert.EqualError(t, err, "proxy config is empty", "Expected error message to indicate empty proxy config") } + +func TestNewGoFeatureFlagClient(t *testing.T) { + // Create a logger for testing + logger := zap.NewNop() + + tests := []struct { + name string + proxyConf *config.Config + notifiers []notifier.Notifier + wantErr bool + }{ + { + name: "Valid configuration with HTTP retriever and webhook exporter", + proxyConf: &config.Config{ + ListenPort: 8080, + PollingInterval: 50000, + FileFormat: "yaml", + Retriever: &config.RetrieverConf{ + Kind: "http", + URL: "https://raw.githubusercontent.com/thomaspoignant/go-feature-flag/main/examples/retriever_file/flags.goff.yaml", + }, + Exporter: &config.ExporterConf{ + Kind: "webhook", + EndpointURL: "https://example.com/webhook", + Secret: "secret123", + }, + }, + notifiers: nil, + wantErr: false, + }, + { + name: "Valid configuration with multiple retrievers and exporters", + proxyConf: &config.Config{ + ListenPort: 8080, + PollingInterval: 60000, + FileFormat: "yaml", + Retrievers: &[]config.RetrieverConf{ + { + Kind: "http", + URL: 
"https://raw.githubusercontent.com/thomaspoignant/go-feature-flag/main/examples/retriever_file/flags.goff.yaml", + }, + }, + Exporters: &[]config.ExporterConf{ + { + Kind: "log", + }, + }, + }, + notifiers: nil, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + client, err := NewGoFeatureFlagClient(tt.proxyConf, logger, tt.notifiers) + + if tt.wantErr { + assert.Error(t, err) + assert.Nil(t, client) + } else { + assert.NoError(t, err) + assert.NotNil(t, client) + + // Additional checks on the client configuration + assert.Equal(t, int64(tt.proxyConf.PollingInterval), client.GetPollingInterval()) + + // Check if the client is not offline + assert.False(t, client.IsOffline()) + + // Force a refresh and check if it succeeds + assert.True(t, client.ForceRefresh()) + + // Check if the cache refresh date is recent + assert.WithinDuration(t, time.Now(), client.GetCacheRefreshDate(), 5*time.Second) + + // Clean up + client.Close() + } + }) + } +} diff --git a/config.go b/config.go index d947e3b517e..33d4407b91c 100644 --- a/config.go +++ b/config.go @@ -66,8 +66,8 @@ type Config struct { // DataExporter (optional) is the configuration where we store how we should output the flags variations results DataExporter DataExporter - // DataExporters (optional) is the list of configurations where we store how we should output the flags variations results - // Multiple exporters can be used to send the data to multiple destinations in parallel without interfering with each other. + // DataExporters (optional) are configurations where we store how to output the flags variations results + // Multiple exporters can be used to send data to multiple destinations in parallel without interference. DataExporters []DataExporter // StartWithRetrieverError (optional) If true, the SDK will start even if we did not get any flags from the retriever. 
From b64acf6974a50083d3b4800c2f81aa8540f0a87a Mon Sep 17 00:00:00 2001 From: "Nguyen Viet Hoang (Stephen)" Date: Thu, 17 Oct 2024 23:34:57 +0900 Subject: [PATCH 3/8] chore: reduce cognitive complexity --- cmd/relayproxy/service/gofeatureflag.go | 86 ++++++++++++++++--------- 1 file changed, 57 insertions(+), 29 deletions(-) diff --git a/cmd/relayproxy/service/gofeatureflag.go b/cmd/relayproxy/service/gofeatureflag.go index 4d21218e380..e8b3b0d6f5f 100644 --- a/cmd/relayproxy/service/gofeatureflag.go +++ b/cmd/relayproxy/service/gofeatureflag.go @@ -41,75 +41,103 @@ func NewGoFeatureFlagClient( logger *zap.Logger, notifiers []notifier.Notifier, ) (*ffclient.GoFeatureFlag, error) { - var mainRetriever retriever.Retriever - var err error - if proxyConf == nil { return nil, fmt.Errorf("proxy config is empty") } + mainRetriever, retrievers, err := initRetrievers(proxyConf) + if err != nil { + return nil, err + } + + mainDataExporter, dataExporters, err := initExporters(proxyConf) + if err != nil { + return nil, err + } + + notif, err := initNotifiers(proxyConf.Notifiers, notifiers) + if err != nil { + return nil, err + } + + f := ffclient.Config{ + PollingInterval: time.Duration(proxyConf.PollingInterval) * time.Millisecond, + LeveledLogger: slog.New(slogzap.Option{Level: slog.LevelDebug, Logger: logger}.NewZapHandler()), + Context: context.Background(), + Retriever: mainRetriever, + Retrievers: retrievers, + Notifiers: notif, + FileFormat: proxyConf.FileFormat, + DataExporter: mainDataExporter, + DataExporters: dataExporters, + StartWithRetrieverError: proxyConf.StartWithRetrieverError, + EnablePollingJitter: proxyConf.EnablePollingJitter, + EvaluationContextEnrichment: proxyConf.EvaluationContextEnrichment, + PersistentFlagConfigurationFile: proxyConf.PersistentFlagConfigurationFile, + } + + return ffclient.New(f) +} + +func initRetrievers(proxyConf *config.Config) (retriever.Retriever, []retriever.Retriever, error) { + var mainRetriever retriever.Retriever + var err 
error + if proxyConf.Retriever != nil { mainRetriever, err = initRetriever(proxyConf.Retriever) if err != nil { - return nil, err + return nil, nil, err } } - // Manage if we have more than 1 retriever retrievers := make([]retriever.Retriever, 0) if proxyConf.Retrievers != nil { for _, r := range *proxyConf.Retrievers { currentRetriever, err := initRetriever(&r) if err != nil { - return nil, err + return nil, nil, err } retrievers = append(retrievers, currentRetriever) } } + return mainRetriever, retrievers, nil +} + +func initExporters(proxyConf *config.Config) (ffclient.DataExporter, []ffclient.DataExporter, error) { var mainDataExporter ffclient.DataExporter + var err error + if proxyConf.Exporter != nil { mainDataExporter, err = initDataExporter(proxyConf.Exporter) if err != nil { - return nil, err + return ffclient.DataExporter{}, nil, err } } - // Manage the case where we have multiple data exporters dataExporters := make([]ffclient.DataExporter, 0) if proxyConf.Exporters != nil { for _, e := range *proxyConf.Exporters { currentExporter, err := initDataExporter(&e) if err != nil { - return nil, err + return ffclient.DataExporter{}, nil, err } dataExporters = append(dataExporters, currentExporter) } } - notif, err := initNotifier(proxyConf.Notifiers) + return mainDataExporter, dataExporters, nil +} + +func initNotifiers( + configNotifiers []config.NotifierConf, + additionalNotifiers []notifier.Notifier, +) ([]notifier.Notifier, error) { + notif, err := initNotifier(configNotifiers) if err != nil { return nil, err } - notif = append(notif, notifiers...) 
- - f := ffclient.Config{ - PollingInterval: time.Duration(proxyConf.PollingInterval) * time.Millisecond, - LeveledLogger: slog.New(slogzap.Option{Level: slog.LevelDebug, Logger: logger}.NewZapHandler()), - Context: context.Background(), - Retriever: mainRetriever, - Retrievers: retrievers, - Notifiers: notif, - FileFormat: proxyConf.FileFormat, - DataExporter: mainDataExporter, - DataExporters: dataExporters, - StartWithRetrieverError: proxyConf.StartWithRetrieverError, - EnablePollingJitter: proxyConf.EnablePollingJitter, - EvaluationContextEnrichment: proxyConf.EvaluationContextEnrichment, - PersistentFlagConfigurationFile: proxyConf.PersistentFlagConfigurationFile, - } - - return ffclient.New(f) + return append(notif, additionalNotifiers...), nil } // initRetriever initialize the retriever based on the configuration From f99def8417636adc8d55d6111faf0e7e584da187 Mon Sep 17 00:00:00 2001 From: "Nguyen Viet Hoang (Stephen)" Date: Tue, 22 Oct 2024 13:45:26 +0900 Subject: [PATCH 4/8] chore: revert github workflows fix --- .github/workflows/unassign-issue.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/unassign-issue.yml b/.github/workflows/unassign-issue.yml index 16e656a3f51..b72193a44ce 100644 --- a/.github/workflows/unassign-issue.yml +++ b/.github/workflows/unassign-issue.yml @@ -107,7 +107,8 @@ jobs: const staleIssues = await github.paginate(github.rest.issues.listForRepo, { owner, repo, state: 'open', - labels: staleLabel + labels: staleLabel, + assignee: '*' }); for (const issue of staleIssues) { From 159f0b107c2ddc5521292c509852f222d63a6f3d Mon Sep 17 00:00:00 2001 From: "Nguyen Viet Hoang (Stephen)" Date: Tue, 31 Dec 2024 00:40:40 +0900 Subject: [PATCH 5/8] chore: update multi-exporter handling --- cmd/relayproxy/service/gofeatureflag.go | 18 +- examples/data_export_log_and_file/main.go | 155 ++++++++++---- exporter/data_exporter.go | 241 ++++++++++++++++------ feature_flag.go | 46 ++++- 4 files changed, 335 
insertions(+), 125 deletions(-) diff --git a/cmd/relayproxy/service/gofeatureflag.go b/cmd/relayproxy/service/gofeatureflag.go index 5d3b679abf4..a40ed5b3777 100644 --- a/cmd/relayproxy/service/gofeatureflag.go +++ b/cmd/relayproxy/service/gofeatureflag.go @@ -122,14 +122,16 @@ func initExporters(proxyConf *config.Config) (ffclient.DataExporter, []ffclient. } } - dataExporters := make([]ffclient.DataExporter, 0) - if proxyConf.Exporters != nil { - for _, e := range *proxyConf.Exporters { - currentExporter, err := initDataExporter(&e) - if err != nil { - return ffclient.DataExporter{}, nil, err - } - dataExporters = append(dataExporters, currentExporter) + if proxyConf.Exporters == nil { + return mainDataExporter, nil, nil + } + + // Initialize each exporter with its own configuration + dataExporters := make([]ffclient.DataExporter, len(*proxyConf.Exporters)) + for i, e := range *proxyConf.Exporters { + dataExporters[i], err = initDataExporter(&e) + if err != nil { + return ffclient.DataExporter{}, nil, err } } diff --git a/examples/data_export_log_and_file/main.go b/examples/data_export_log_and_file/main.go index 5d111ae8a7a..35518ebe85b 100644 --- a/examples/data_export_log_and_file/main.go +++ b/examples/data_export_log_and_file/main.go @@ -16,7 +16,7 @@ import ( ) func main() { - // Init ffclient with a file retriever. 
+ // Init ffclient with multiple exporters err := ffclient.Init(ffclient.Config{ PollingInterval: 10 * time.Second, LeveledLogger: slog.Default(), @@ -24,73 +24,142 @@ func main() { Retriever: &fileretriever.Retriever{ Path: "examples/data_export_log_and_file/flags.goff.yaml", }, + // Main exporter (bulk) - file exporter with small buffer and short interval DataExporter: ffclient.DataExporter{ - FlushInterval: 1 * time.Second, - MaxEventInMemory: 2, + FlushInterval: 2 * time.Second, // Flush every 2 seconds + MaxEventInMemory: 3, // Flush after 3 events Exporter: &fileexporter.Exporter{ Format: "json", OutputDir: "./examples/data_export_log_and_file/variation-events/", - Filename: " flag-variation-EXAMPLE-{{ .Timestamp}}.{{ .Format}}", + Filename: "bulk-main-{{ .Timestamp}}.{{ .Format}}", }, }, + // Multiple additional exporters with different configurations DataExporters: []ffclient.DataExporter{ { + // Bulk exporter with larger buffer and longer interval + FlushInterval: 5 * time.Second, // Flush every 5 seconds + MaxEventInMemory: 5, // Flush after 5 events + Exporter: &fileexporter.Exporter{ + Format: "json", + OutputDir: "./examples/data_export_log_and_file/variation-events/", + Filename: "bulk-secondary-{{ .Timestamp}}.{{ .Format}}", + }, + }, + { + // Non-bulk exporter (logs) - should process immediately FlushInterval: 1 * time.Second, - MaxEventInMemory: 4, + MaxEventInMemory: 1, Exporter: &logsexporter.Exporter{ - LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " + - "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"", + LogFormat: "IMMEDIATE - user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"", + }, + }, + { + // Another bulk exporter with different settings + FlushInterval: 3 * time.Second, // Flush every 3 seconds + MaxEventInMemory: 4, // Flush after 4 events + Exporter: &fileexporter.Exporter{ + Format: "json", + OutputDir: "./examples/data_export_log_and_file/variation-events/", + Filename: 
"bulk-tertiary-{{ .Timestamp}}.{{ .Format}}", }, }, }, }) - // Check init errors. + if err != nil { log.Fatal(err) } - // defer closing ffclient defer ffclient.Close() - // create users - user1 := ffcontext. - NewEvaluationContextBuilder("aea2fdc1-b9a0-417a-b707-0c9083de68e3"). - AddCustom("anonymous", true). - Build() - user2 := ffcontext.NewEvaluationContext("332460b9-a8aa-4f7a-bc5d-9cc33632df9a") + // Create test users + user1 := ffcontext.NewEvaluationContextBuilder("user1").Build() + user2 := ffcontext.NewEvaluationContextBuilder("user2").Build() + user3 := ffcontext.NewEvaluationContextBuilder("user3").Build() - _, _ = ffclient.BoolVariation("new-admin-access", user1, false) - _, _ = ffclient.BoolVariation("new-admin-access", user2, false) - _, _ = ffclient.StringVariation("unknown-flag", user1, "defaultValue") - _, _ = ffclient.JSONVariation("unknown-flag-2", user1, map[string]interface{}{"test": "toto"}) + // Test scenario to trigger different flush conditions - // Wait 2 seconds to have a second file - time.Sleep(2 * time.Second) + log.Println("Phase 1: Generate 3 events") _, _ = ffclient.BoolVariation("new-admin-access", user1, false) _, _ = ffclient.BoolVariation("new-admin-access", user2, false) + _, _ = ffclient.BoolVariation("new-admin-access", user3, false) + + log.Println("Waiting 1 second") + time.Sleep(1000 * time.Millisecond) + + log.Println("Phase 2: Generate 2 more events") + _, _ = ffclient.StringVariation("unknown-flag", user1, "default1") + _, _ = ffclient.StringVariation("unknown-flag", user2, "default2") + + log.Println("Waiting 2 seconds...") + time.Sleep(2000 * time.Millisecond) + + log.Println("Phase 3: Generate 2 more events") + _, _ = ffclient.JSONVariation("json-flag", user1, map[string]interface{}{"test": "value1"}) + _, _ = ffclient.JSONVariation("json-flag", user2, map[string]interface{}{"test": "value2"}) + + log.Println("Waiting 3 seconds...") + time.Sleep(3000 * time.Millisecond) + + log.Println("Phase 4: Generate 1 final 
event") + _, _ = ffclient.JSONVariation("json-flag", user3, map[string]interface{}{"test": "value3"}) + + log.Println("Waiting 5 seconds...") + time.Sleep(5000 * time.Millisecond) /* - The output which is written in the file will be like this: - - flag-variation-EXAMPLE-.json: - {"kind":"feature","contextKind":"anonymousUser","userKey":"aea2fdc1-b9a0-417a-b707-0c9083de68e3","creationDate":1618234129,"key":"new-admin-access","variation":"True","value":true,"default":false,"source":"SERVER"} - {"kind":"feature","contextKind":"user","userKey":"332460b9-a8aa-4f7a-bc5d-9cc33632df9a","creationDate":1618234129,"key":"new-admin-access","variation":"False","value":false,"default":false,"source":"SERVER"} - ---- - flag-variation-EXAMPLE-.log: - {"kind":"feature","contextKind":"anonymousUser","userKey":"aea2fdc1-b9a0-417a-b707-0c9083de68e3","creationDate":1618234129,"key":"unknown-flag","variation":"SdkDefault","value":"defaultValue","default":true,"source":"SERVER"} - {"kind":"feature","contextKind":"anonymousUser","userKey":"aea2fdc1-b9a0-417a-b707-0c9083de68e3","creationDate":1618234129,"key":"unknown-flag-2","variation":"SdkDefault","value":{"test":"toto"},"default":true,"source":"SERVER"} - ---- - flag-variation-EXAMPLE-.json: - {"kind":"feature","contextKind":"anonymousUser","userKey":"aea2fdc1-b9a0-417a-b707-0c9083de68e3","creationDate":1618234131,"key":"new-admin-access","variation":"True","value":true,"default":false,"source":"SERVER"} - {"kind":"feature","contextKind":"user","userKey":"332460b9-a8aa-4f7a-bc5d-9cc33632df9a","creationDate":1618234131,"key":"new-admin-access","variation":"False","value":false,"default":false,"source":"SERVER"} - - Meanwhile, the output which is written in the log will be like this: - - user="aea2fdc1-b9a0-417a-b707-0c9083de68e3", flag="new-admin-access", value="true", variation="true_var" - user="332460b9-a8aa-4f7a-bc5d-9cc33632df9a", flag="new-admin-access", value="false", variation="false_var" - 
user="aea2fdc1-b9a0-417a-b707-0c9083de68e3", flag="unknown-flag", value="defaultValue", variation="SdkDefault" - user="aea2fdc1-b9a0-417a-b707-0c9083de68e3", flag="unknown-flag-2", value="map[test:toto]", variation="SdkDefault" - user="aea2fdc1-b9a0-417a-b707-0c9083de68e3", flag="new-admin-access", value="true", variation="true_var" - user="332460b9-a8aa-4f7a-bc5d-9cc33632df9a", flag="new-admin-access", value="false", variation="false_var" + Expected behavior: + + Phase 1 (3 events): + - Main exporter: Flushes immediately (hit max 3) + - Secondary exporter: Holds events (not yet at max 5) + - Tertiary exporter: Holds events (not yet at max 4) + - Logger: Processes immediately + + After 1s: + - No flushes (intervals not reached) + + Phase 2 (+2 events, total 5): + - Main exporter: Holds 2 events (not yet at max 3) + - Secondary exporter: Flushes immediately (hit max 5) + - Tertiary exporter: Flushes immediately at 4 events and then holds 1 event + - Logger: Processes immediately + + After 2s: + - Main exporter: Flushes (interval hit) + - Secondary exporter: Empty after previous flush + - Tertiary exporter: Holds 1 event (not yet at max 4) + + Phase 3 (+2 events, total 2 since last flush): + - Main exporter: Holds 2 events (not yet at max 3) + - Secondary exporter: Holds 2 events (not yet at max 5) + - Tertiary exporter: Holds 3 events (not yet at max 4) + - Logger: Processes immediately + + After 3s: + - Main exporter: Flushes (interval hit) + - Secondary exporter: Flushes (interval hit) + - Tertiary exporter: Flushed after only 1 second + + Phase 4 (+1 event, total 3 since last flush): + - Main exporter: Holds 1 event (not yet at max 3) + - Secondary exporter: Holds 1 event (not yet at max 5) + - Tertiary exporter: Holds 1 event (not yet at max 4) + - Logger: Processes immediately + + After 5s: + - Main exporter: Flushed after only 3 seconds + - Secondary exporter: Flushes remaining events (interval hit) + - Tertiary exporter: Flushed after only 1 second + + 
Finally: + - All exporters will flush any remaining events on Close() + Note: + - Total we have 8 events + - Main exporter will generate 4 files containing 3, 2, 2, 1 events respectively + - Secondary exporter will generate 3 files containing 5, 2, 1 events respectively + - Tertiary exporter will generate 3 files containing 4, 3, 1 events respectively + - Logger will generate 8 logs */ } diff --git a/exporter/data_exporter.go b/exporter/data_exporter.go index 3038a1d25d7..44fbc133527 100644 --- a/exporter/data_exporter.go +++ b/exporter/data_exporter.go @@ -16,110 +16,221 @@ const ( defaultMaxEventInMemory = int64(100000) ) -// NewScheduler allows creating a new instance of Scheduler ready to be used to export data. +// ExporterConfig holds the configuration for an individual exporter +type ExporterConfig struct { + Exporter CommonExporter + FlushInterval time.Duration + MaxEventInMemory int64 +} + +// ExporterState maintains the state for a single exporter +type ExporterState struct { + config ExporterConfig + ticker *time.Ticker + lastIndex int // Index of the last processed event +} + +// Scheduler handles data collection for one or more exporters +type Scheduler struct { + sharedCache []FeatureEvent + bulkExporters map[CommonExporter]*ExporterState // Only bulk exporters that need periodic flushing + directExporters []CommonExporter // Non-bulk exporters that flush immediately + mutex sync.Mutex + daemonChan chan struct{} + logger *fflog.FFLogger + ctx context.Context +} + +// NewScheduler creates a new scheduler that handles one exporter func NewScheduler(ctx context.Context, flushInterval time.Duration, maxEventInMemory int64, exp CommonExporter, logger *fflog.FFLogger, +) *Scheduler { + // Convert single exporter parameters to ExporterConfig + config := ExporterConfig{ + Exporter: exp, + FlushInterval: flushInterval, + MaxEventInMemory: maxEventInMemory, + } + return NewMultiScheduler(ctx, []ExporterConfig{config}, logger) +} + +// NewMultiScheduler creates 
a scheduler that handles multiple exporters +func NewMultiScheduler(ctx context.Context, exporterConfigs []ExporterConfig, logger *fflog.FFLogger, ) *Scheduler { if ctx == nil { ctx = context.Background() } - if flushInterval == 0 { - flushInterval = defaultFlushInterval - } + bulkExporters := make(map[CommonExporter]*ExporterState) + directExporters := make([]CommonExporter, 0) - if maxEventInMemory == 0 { - maxEventInMemory = defaultMaxEventInMemory + for _, config := range exporterConfigs { + if config.FlushInterval == 0 { + config.FlushInterval = defaultFlushInterval + } + if config.MaxEventInMemory == 0 { + config.MaxEventInMemory = defaultMaxEventInMemory + } + + if config.Exporter.IsBulk() { + state := &ExporterState{ + config: config, + lastIndex: -1, + ticker: time.NewTicker(config.FlushInterval), + } + bulkExporters[config.Exporter] = state + } else { + directExporters = append(directExporters, config.Exporter) + } } return &Scheduler{ - localCache: make([]FeatureEvent, 0), + sharedCache: make([]FeatureEvent, 0), + bulkExporters: bulkExporters, + directExporters: directExporters, mutex: sync.Mutex{}, - maxEventInCache: maxEventInMemory, - exporter: exp, daemonChan: make(chan struct{}), - ticker: time.NewTicker(flushInterval), logger: logger, ctx: ctx, } } -// Scheduler is the struct that handle the data collection. -type Scheduler struct { - localCache []FeatureEvent - mutex sync.Mutex - daemonChan chan struct{} - ticker *time.Ticker - maxEventInCache int64 - exporter CommonExporter - logger *fflog.FFLogger - ctx context.Context -} +// AddEvent adds an event to the shared cache and handles immediate export for non-bulk exporters +func (s *Scheduler) AddEvent(event FeatureEvent) { + s.mutex.Lock() + defer s.mutex.Unlock() -// AddEvent allow adding an event to the local cache and to call the exporter if we reach -// the maximum number of events that can be present in the cache. 
-func (dc *Scheduler) AddEvent(event FeatureEvent) { - if !dc.exporter.IsBulk() { - err := sendEvents(dc.ctx, dc.exporter, dc.logger, []FeatureEvent{event}) + // Handle non-bulk exporters immediately + for _, exporter := range s.directExporters { + err := sendEvents(s.ctx, exporter, s.logger, []FeatureEvent{event}) if err != nil { - dc.logger.Error(err.Error()) + s.logger.Error(err.Error()) + } + } + + // If we have no bulk exporters, we're done + if len(s.bulkExporters) == 0 { + return + } + + // Add event to shared cache for bulk exporters + s.sharedCache = append(s.sharedCache, event) + currentIndex := len(s.sharedCache) - 1 + + // Check if any bulk exporters need to flush due to max events + for _, state := range s.bulkExporters { + pendingCount := currentIndex - state.lastIndex + if state.config.MaxEventInMemory > 0 && int64(pendingCount) >= state.config.MaxEventInMemory { + s.flushExporter(state) } + } + + // Clean up events that have been processed by all exporters + s.cleanupProcessedEvents() +} + +// getPendingEvents returns events that haven't been processed by this exporter +func (s *Scheduler) getPendingEvents(state *ExporterState) []FeatureEvent { + if state.lastIndex+1 >= len(s.sharedCache) { + return nil + } + return s.sharedCache[state.lastIndex+1:] +} + +// flushExporter sends pending events to the specified exporter +func (s *Scheduler) flushExporter(state *ExporterState) { + pendingEvents := s.getPendingEvents(state) + if len(pendingEvents) == 0 { + return + } + + err := sendEvents(s.ctx, state.config.Exporter, s.logger, pendingEvents) + if err != nil { + s.logger.Error(err.Error()) + return + } + + // Update last processed index + state.lastIndex = len(s.sharedCache) - 1 +} + +// cleanupProcessedEvents removes events that have been processed by all bulk exporters +func (s *Scheduler) cleanupProcessedEvents() { + // If no bulk exporters, we can clear the cache + if len(s.bulkExporters) == 0 { + s.sharedCache = make([]FeatureEvent, 0) return } - 
dc.mutex.Lock() - defer dc.mutex.Unlock() - if int64(len(dc.localCache)) >= dc.maxEventInCache { - dc.flush() + // Find minimum lastIndex among bulk exporters + minIndex := len(s.sharedCache) + for _, state := range s.bulkExporters { + if state.lastIndex < minIndex { + minIndex = state.lastIndex + } + } + + // If all exporters have processed some events, we can remove them + if minIndex > 0 { + // Keep events from minIndex+1 onwards + s.sharedCache = s.sharedCache[minIndex+1:] + // Update lastIndex for all exporters + for _, state := range s.bulkExporters { + state.lastIndex -= (minIndex + 1) + } } - dc.localCache = append(dc.localCache, event) } -// StartDaemon will start a goroutine to check every X seconds if we should send the data. -// The daemon is started only if we have a bulk exporter. -func (dc *Scheduler) StartDaemon() { +// StartDaemon starts the periodic flush for bulk exporters +func (s *Scheduler) StartDaemon() { + // If no bulk exporters, no need for daemon + if len(s.bulkExporters) == 0 { + return + } + for { select { - case <-dc.ticker.C: - // send data and clear local cache - dc.mutex.Lock() - dc.flush() - dc.mutex.Unlock() - case <-dc.daemonChan: - // stop the daemon + case <-s.daemonChan: return + default: + s.mutex.Lock() + for _, state := range s.bulkExporters { + select { + case <-state.ticker.C: + s.flushExporter(state) + default: + // Continue if this ticker hasn't triggered + } + } + s.cleanupProcessedEvents() + s.mutex.Unlock() + // Small sleep to prevent busy waiting + time.Sleep(100 * time.Millisecond) } } } -// Close will stop the daemon and send the data still in the cache -func (dc *Scheduler) Close() { - // Close the daemon - dc.ticker.Stop() - close(dc.daemonChan) - - // Send the data still in the cache - dc.mutex.Lock() - dc.flush() - dc.mutex.Unlock() -} +// Close stops all tickers and flushes remaining events +func (s *Scheduler) Close() { + s.mutex.Lock() + defer s.mutex.Unlock() -// GetLogger will return the logger used by 
the scheduler -func (dc *Scheduler) GetLogger(level slog.Level) *log.Logger { - if dc.logger == nil { - return nil + // Stop all tickers and flush bulk exporters + for _, state := range s.bulkExporters { + state.ticker.Stop() + s.flushExporter(state) } - return dc.logger.GetLogLogger(level) + + close(s.daemonChan) + s.sharedCache = nil } -// flush will call the data exporter and clear the cache -func (dc *Scheduler) flush() { - err := sendEvents(dc.ctx, dc.exporter, dc.logger, dc.localCache) - if err != nil { - dc.logger.Error(err.Error()) - return +// GetLogger returns the logger used by the scheduler +func (s *Scheduler) GetLogger(level slog.Level) *log.Logger { + if s.logger == nil { + return nil } - dc.localCache = make([]FeatureEvent, 0) + return s.logger.GetLogLogger(level) } func sendEvents(ctx context.Context, exporter CommonExporter, logger *fflog.FFLogger, events []FeatureEvent) error { diff --git a/feature_flag.go b/feature_flag.go index 719e8d84bcc..ae95624213a 100644 --- a/feature_flag.go +++ b/feature_flag.go @@ -126,21 +126,49 @@ func New(config Config) (*GoFeatureFlag, error) { // Initialize a Scheduler for each DataExporter, if any DataExporter is configured. 
if len(dataExporters) > 0 { - goFF.dataExporterSchedulers = make([]*exporter.Scheduler, len(dataExporters)) - for i, dataExporter := range dataExporters { - goFF.dataExporterSchedulers[i] = exporter.NewScheduler( + var scheduler *exporter.Scheduler + if len(dataExporters) == 1 { + // Single exporter case + scheduler = exporter.NewScheduler( goFF.config.Context, - dataExporter.FlushInterval, - dataExporter.MaxEventInMemory, - dataExporter.Exporter, + dataExporters[0].FlushInterval, + dataExporters[0].MaxEventInMemory, + dataExporters[0].Exporter, goFF.config.internalLogger, ) + } else { + // Multiple exporters case + exporterConfigs := make([]exporter.ExporterConfig, len(dataExporters)) + for i, de := range dataExporters { + exporterConfigs[i] = exporter.ExporterConfig{ + Exporter: de.Exporter, + FlushInterval: de.FlushInterval, + MaxEventInMemory: de.MaxEventInMemory, + } + } + + scheduler = exporter.NewMultiScheduler( + goFF.config.Context, + exporterConfigs, + goFF.config.internalLogger, + ) + } - // Start daemon if it's a bulk exporter - if dataExporter.Exporter.IsBulk() { - go goFF.dataExporterSchedulers[i].StartDaemon() + // Start daemon if we have any bulk exporters + hasBulkExporters := false + for _, de := range dataExporters { + if de.Exporter.IsBulk() { + hasBulkExporters = true + break } } + if hasBulkExporters { + go scheduler.StartDaemon() + } + + // Store the scheduler + goFF.dataExporterSchedulers = make([]*exporter.Scheduler, 1) + goFF.dataExporterSchedulers[0] = scheduler } } config.internalLogger.Debug("GO Feature Flag is initialized") From 9f732ca45bacb0e0bed7e8b3a1e6bdbeb4caa2fc Mon Sep 17 00:00:00 2001 From: "Nguyen Viet Hoang (Stephen)" Date: Tue, 31 Dec 2024 01:04:41 +0900 Subject: [PATCH 6/8] chore: fix unit test --- cmd/relayproxy/config/config_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/cmd/relayproxy/config/config_test.go b/cmd/relayproxy/config/config_test.go index d4e425ad894..1faa9d8e171 100644 --- 
a/cmd/relayproxy/config/config_test.go +++ b/cmd/relayproxy/config/config_test.go @@ -109,7 +109,6 @@ func TestParseConfig_fileFromPflag(t *testing.T) { }, }, StartWithRetrieverError: false, - RestAPITimeout: 5000, Version: "1.X.X", EnableSwagger: true, AuthorizedKeys: config.APIKeys{ @@ -151,7 +150,6 @@ func TestParseConfig_fileFromPflag(t *testing.T) { }, }, StartWithRetrieverError: false, - RestAPITimeout: 5000, Version: "1.X.X", EnableSwagger: true, AuthorizedKeys: config.APIKeys{ From 7d00e461a1b65b13b38302bc966bf71577d7cc13 Mon Sep 17 00:00:00 2001 From: "Nguyen Viet Hoang (Stephen)" Date: Fri, 3 Jan 2025 15:35:20 +0900 Subject: [PATCH 7/8] chore: fix lints --- examples/data_export_log_and_file/main.go | 3 +- exporter/data_exporter.go | 24 ++-- feature_flag.go | 137 ++++++++++++---------- testutils/mock/exporter_mock.go | 1 + 4 files changed, 92 insertions(+), 73 deletions(-) diff --git a/examples/data_export_log_and_file/main.go b/examples/data_export_log_and_file/main.go index 35518ebe85b..f67cbbcf761 100644 --- a/examples/data_export_log_and_file/main.go +++ b/examples/data_export_log_and_file/main.go @@ -160,6 +160,7 @@ func main() { - Main exporter will generate 4 files containing 3, 2, 2, 1 events respectively - Secondary exporter will generate 3 files containing 5, 2, 1 events respectively - Tertiary exporter will generate 3 files containing 4, 3, 1 events respectively - - Logger will generate 8 logs + - Logger will generate 8 events in the logs + (format "IMMEDIATE - user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"") */ } diff --git a/exporter/data_exporter.go b/exporter/data_exporter.go index 44fbc133527..2c2f3205313 100644 --- a/exporter/data_exporter.go +++ b/exporter/data_exporter.go @@ -17,15 +17,15 @@ const ( ) // ExporterConfig holds the configuration for an individual exporter -type ExporterConfig struct { +type Config struct { Exporter CommonExporter FlushInterval time.Duration MaxEventInMemory 
int64 } // ExporterState maintains the state for a single exporter -type ExporterState struct { - config ExporterConfig +type State struct { + config Config ticker *time.Ticker lastIndex int // Index of the last processed event } @@ -33,8 +33,8 @@ type ExporterState struct { // Scheduler handles data collection for one or more exporters type Scheduler struct { sharedCache []FeatureEvent - bulkExporters map[CommonExporter]*ExporterState // Only bulk exporters that need periodic flushing - directExporters []CommonExporter // Non-bulk exporters that flush immediately + bulkExporters map[CommonExporter]*State // Only bulk exporters that need periodic flushing + directExporters []CommonExporter // Non-bulk exporters that flush immediately mutex sync.Mutex daemonChan chan struct{} logger *fflog.FFLogger @@ -46,22 +46,22 @@ func NewScheduler(ctx context.Context, flushInterval time.Duration, maxEventInMe exp CommonExporter, logger *fflog.FFLogger, ) *Scheduler { // Convert single exporter parameters to ExporterConfig - config := ExporterConfig{ + config := Config{ Exporter: exp, FlushInterval: flushInterval, MaxEventInMemory: maxEventInMemory, } - return NewMultiScheduler(ctx, []ExporterConfig{config}, logger) + return NewMultiScheduler(ctx, []Config{config}, logger) } // NewMultiScheduler creates a scheduler that handles multiple exporters -func NewMultiScheduler(ctx context.Context, exporterConfigs []ExporterConfig, logger *fflog.FFLogger, +func NewMultiScheduler(ctx context.Context, exporterConfigs []Config, logger *fflog.FFLogger, ) *Scheduler { if ctx == nil { ctx = context.Background() } - bulkExporters := make(map[CommonExporter]*ExporterState) + bulkExporters := make(map[CommonExporter]*State) directExporters := make([]CommonExporter, 0) for _, config := range exporterConfigs { @@ -73,7 +73,7 @@ func NewMultiScheduler(ctx context.Context, exporterConfigs []ExporterConfig, lo } if config.Exporter.IsBulk() { - state := &ExporterState{ + state := &State{ config: 
config, lastIndex: -1, ticker: time.NewTicker(config.FlushInterval), @@ -130,7 +130,7 @@ func (s *Scheduler) AddEvent(event FeatureEvent) { } // getPendingEvents returns events that haven't been processed by this exporter -func (s *Scheduler) getPendingEvents(state *ExporterState) []FeatureEvent { +func (s *Scheduler) getPendingEvents(state *State) []FeatureEvent { if state.lastIndex+1 >= len(s.sharedCache) { return nil } @@ -138,7 +138,7 @@ func (s *Scheduler) getPendingEvents(state *ExporterState) []FeatureEvent { } // flushExporter sends pending events to the specified exporter -func (s *Scheduler) flushExporter(state *ExporterState) { +func (s *Scheduler) flushExporter(state *State) { pendingEvents := s.getPendingEvents(state) if len(pendingEvents) == 0 { return diff --git a/feature_flag.go b/feature_flag.go index ae95624213a..70083f542c4 100644 --- a/feature_flag.go +++ b/feature_flag.go @@ -54,21 +54,18 @@ type GoFeatureFlag struct { var ff *GoFeatureFlag var onceFF sync.Once -// New creates a new go-feature-flag instances that retrieve the config from a YAML file -// and return everything you need to manage your flags. 
-func New(config Config) (*GoFeatureFlag, error) { +// validateAndSetDefaults validates the config and sets default values +func validateAndSetDefaults(config *Config) error { switch { case config.PollingInterval == 0: // The default value for the poll interval is 60 seconds config.PollingInterval = 60 * time.Second case config.PollingInterval < 0: // Check that value is not negative - return nil, fmt.Errorf("%d is not a valid PollingInterval value, it need to be > 0", config.PollingInterval) + return fmt.Errorf("%d is not a valid PollingInterval value, it need to be > 0", config.PollingInterval) case config.PollingInterval < time.Second: // the minimum value for the polling policy is 1 second config.PollingInterval = time.Second - default: - // do nothing } if config.offlineMutex == nil { @@ -80,6 +77,75 @@ func New(config Config) (*GoFeatureFlag, error) { LegacyLogger: config.Logger, } + return nil +} + +// initializeRetrievers sets up and initializes the retriever manager +func initializeRetrievers(config Config) (*retriever.Manager, error) { + retrievers, err := config.GetRetrievers() + if err != nil { + return nil, err + } + + manager := retriever.NewManager(config.Context, retrievers, config.internalLogger) + err = manager.Init(config.Context) + if err != nil && !config.StartWithRetrieverError { + return nil, fmt.Errorf("impossible to initialize the retrievers, please check your configuration: %v", err) + } + + return manager, nil +} + +// initializeExporters sets up the data exporters and starts their daemons if needed +func initializeExporters(config Config) []*exporter.Scheduler { + dataExporters := config.GetDataExporters() + if len(dataExporters) == 0 { + return nil + } + + var scheduler *exporter.Scheduler + if len(dataExporters) == 1 { + scheduler = exporter.NewScheduler( + config.Context, + dataExporters[0].FlushInterval, + dataExporters[0].MaxEventInMemory, + dataExporters[0].Exporter, + config.internalLogger, + ) + } else { + exporterConfigs := 
make([]exporter.Config, len(dataExporters))
+		for i, de := range dataExporters {
+			exporterConfigs[i] = exporter.Config{
+				Exporter:         de.Exporter,
+				FlushInterval:    de.FlushInterval,
+				MaxEventInMemory: de.MaxEventInMemory,
+			}
+		}
+		scheduler = exporter.NewMultiScheduler(
+			config.Context,
+			exporterConfigs,
+			config.internalLogger,
+		)
+	}
+
+	// Start daemon if we have any bulk exporters
+	for _, de := range dataExporters {
+		if de.Exporter.IsBulk() {
+			go scheduler.StartDaemon()
+			break
+		}
+	}
+
+	return []*exporter.Scheduler{scheduler}
+}
+
+// New creates a new go-feature-flag instance that retrieves the config from a YAML file
+// and returns everything you need to manage your flags.
+func New(config Config) (*GoFeatureFlag, error) {
+	if err := validateAndSetDefaults(&config); err != nil {
+		return nil, err
+	}
+
 	goFF := &GoFeatureFlag{
 		config: config,
 	}
@@ -92,15 +158,11 @@ func New(config Config) (*GoFeatureFlag, error) {
 	goFF.bgUpdater = newBackgroundUpdater(config.PollingInterval, config.EnablePollingJitter)
 	goFF.cache = cache.New(notificationService, config.PersistentFlagConfigurationFile, config.internalLogger)
 
-	retrievers, err := config.GetRetrievers()
+	retrieverManager, err := initializeRetrievers(config)
 	if err != nil {
 		return nil, err
 	}
-	goFF.retrieverManager = retriever.NewManager(config.Context, retrievers, config.internalLogger)
-	err = goFF.retrieverManager.Init(config.Context)
-	if err != nil && !config.StartWithRetrieverError {
-		return nil, fmt.Errorf("impossible to initialize the retrievers, please check your configuration: %v", err)
-	}
+	goFF.retrieverManager = retrieverManager
 
 	err = retrieveFlagsAndUpdateCache(goFF.config, goFF.cache, goFF.retrieverManager, true)
 	if err != nil {
@@ -122,55 +184,10 @@ func New(config Config) (*GoFeatureFlag, error) {
 
 	go goFF.startFlagUpdaterDaemon()
 
-	dataExporters := config.GetDataExporters()
-
-	// Initialize a Scheduler for each DataExporter, if any DataExporter is configured.
- if len(dataExporters) > 0 { - var scheduler *exporter.Scheduler - if len(dataExporters) == 1 { - // Single exporter case - scheduler = exporter.NewScheduler( - goFF.config.Context, - dataExporters[0].FlushInterval, - dataExporters[0].MaxEventInMemory, - dataExporters[0].Exporter, - goFF.config.internalLogger, - ) - } else { - // Multiple exporters case - exporterConfigs := make([]exporter.ExporterConfig, len(dataExporters)) - for i, de := range dataExporters { - exporterConfigs[i] = exporter.ExporterConfig{ - Exporter: de.Exporter, - FlushInterval: de.FlushInterval, - MaxEventInMemory: de.MaxEventInMemory, - } - } - - scheduler = exporter.NewMultiScheduler( - goFF.config.Context, - exporterConfigs, - goFF.config.internalLogger, - ) - } - - // Start daemon if we have any bulk exporters - hasBulkExporters := false - for _, de := range dataExporters { - if de.Exporter.IsBulk() { - hasBulkExporters = true - break - } - } - if hasBulkExporters { - go scheduler.StartDaemon() - } - - // Store the scheduler - goFF.dataExporterSchedulers = make([]*exporter.Scheduler, 1) - goFF.dataExporterSchedulers[0] = scheduler - } + schedulers := initializeExporters(config) + goFF.dataExporterSchedulers = schedulers } + config.internalLogger.Debug("GO Feature Flag is initialized") return goFF, nil } diff --git a/testutils/mock/exporter_mock.go b/testutils/mock/exporter_mock.go index e8ce2b63511..6d5dfccb878 100644 --- a/testutils/mock/exporter_mock.go +++ b/testutils/mock/exporter_mock.go @@ -26,6 +26,7 @@ func (m *Exporter) Export(ctx context.Context, _ *fflog.FFLogger, events []expor m.ExportedEvents = append(m.ExportedEvents, events...) 
if m.Err != nil { if m.ExpectedNumberErr > m.CurrentNumberErr { + m.ExportedEvents = m.ExportedEvents[:len(m.ExportedEvents)-len(events)] m.CurrentNumberErr++ return m.Err } From 2a49d4c9bf527846214895d8c3c344e373e3a2dd Mon Sep 17 00:00:00 2001 From: "Nguyen Viet Hoang (Stephen)" Date: Fri, 3 Jan 2025 16:25:18 +0900 Subject: [PATCH 8/8] chore: add unit test --- feature_flag_test.go | 99 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 99 insertions(+) diff --git a/feature_flag_test.go b/feature_flag_test.go index 1b8d2b95469..3c60990a309 100644 --- a/feature_flag_test.go +++ b/feature_flag_test.go @@ -743,3 +743,102 @@ func Test_DisableNotifierOnInit(t *testing.T) { }) } } + +func TestMultipleDataExporters(t *testing.T) { + // Create a client with multiple exporters + config := ffclient.Config{ + PollingInterval: 5 * time.Second, + Retriever: &fileretriever.Retriever{Path: "testdata/flag-config.yaml"}, + LeveledLogger: slog.Default(), + // Main exporter (bulk) + DataExporter: ffclient.DataExporter{ + FlushInterval: 2 * time.Second, + MaxEventInMemory: 3, + Exporter: &mock.Exporter{ + Bulk: true, + }, + }, + // Additional exporters + DataExporters: []ffclient.DataExporter{ + { + // Bulk exporter with different settings + FlushInterval: 5 * time.Second, + MaxEventInMemory: 5, + Exporter: &mock.Exporter{ + Bulk: true, + }, + }, + { + // Non-bulk exporter + FlushInterval: 1 * time.Second, // Should be ignored + MaxEventInMemory: 1, // Should be ignored + Exporter: &mock.Exporter{ + Bulk: false, + }, + }, + { + // Another bulk exporter + FlushInterval: 3 * time.Second, + MaxEventInMemory: 4, + Exporter: &mock.Exporter{ + Bulk: true, + }, + }, + }, + } + + gffClient, err := ffclient.New(config) + assert.NoError(t, err) + defer gffClient.Close() + + // Create test user + user := ffcontext.NewEvaluationContext("test-user") + + // Generate events to test exporters + // Phase 1: Generate 3 events (should trigger main exporter's MaxEventInMemory) + _, _ = 
gffClient.BoolVariation("test-flag", user, false) + _, _ = gffClient.BoolVariation("test-flag", user, false) + _, _ = gffClient.BoolVariation("test-flag", user, false) + + // Wait 1 second + time.Sleep(1 * time.Second) + + // Phase 2: Generate 2 more events (should trigger secondary exporter's MaxEventInMemory) + _, _ = gffClient.StringVariation("unknown-flag", user, "default1") + _, _ = gffClient.StringVariation("unknown-flag", user, "default2") + + // Wait 2 seconds (should trigger main exporter's FlushInterval) + time.Sleep(2 * time.Second) + + // Phase 3: Generate 2 more events + _, _ = gffClient.JSONVariation("json-flag", user, map[string]interface{}{"test": "value1"}) + _, _ = gffClient.JSONVariation("json-flag", user, map[string]interface{}{"test": "value2"}) + + // Wait 3 seconds (should trigger tertiary exporter's FlushInterval) + time.Sleep(3 * time.Second) + + // Phase 4: Generate 1 final event + _, _ = gffClient.JSONVariation("json-flag", user, map[string]interface{}{"test": "value3"}) + + // Wait 5 seconds (should trigger secondary exporter's FlushInterval) + time.Sleep(5 * time.Second) + + // Verify that all exporters received events + for _, de := range config.GetDataExporters() { + mockExporter, ok := de.Exporter.(*mock.Exporter) + assert.True(t, ok, "Exporter should be a mock exporter") + + if !mockExporter.IsBulk() { + // Non-bulk exporter should have received each event immediately + assert.Equal(t, 8, len(mockExporter.GetExportedEvents()), "Non-bulk exporter should have received all events") + } else { + // Bulk exporters should have received events in batches + events := mockExporter.GetExportedEvents() + assert.Greater(t, len(events), 0, "Bulk exporter should have received some events") + // Each batch should respect the MaxEventInMemory limit + for _, event := range events { + assert.NotNil(t, event) + } + } + } +}