From cbe60d2dd1f366a082f045317fd6b194f2f17708 Mon Sep 17 00:00:00 2001 From: Pulak Bhowmick Date: Fri, 17 Nov 2023 00:07:56 +0600 Subject: [PATCH 01/18] feat: add redis syncer for webhook --- Dockerfile | 14 +++++++ cmd/optimizely/main.go | 5 ++- config.yaml | 17 ++++---- config/config.go | 7 ++++ pkg/handlers/webhook.go | 86 ++++++++++++++++++++++++++++++++++++++++- pkg/routers/webhook.go | 12 +++++- pkg/syncer/syncer.go | 26 +++++++++++-- 7 files changed, 151 insertions(+), 16 deletions(-) create mode 100644 Dockerfile diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 00000000..fd8aea13 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,14 @@ +FROM golang:1.21 as builder +RUN addgroup -u 1000 agentgroup &&\ + useradd -u 1000 agentuser -g agentgroup +WORKDIR /go/src/github.com/optimizely/agent +COPY . . +RUN make setup build &&\ + make ci_build_static_binary + +FROM scratch +COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ +COPY --from=builder /go/src/github.com/optimizely/agent/bin/optimizely /optimizely +COPY --from=builder /etc/passwd /etc/passwd +USER agentuser +CMD ["/optimizely"] diff --git a/cmd/optimizely/main.go b/cmd/optimizely/main.go index d58f95b4..c99d4932 100644 --- a/cmd/optimizely/main.go +++ b/cmd/optimizely/main.go @@ -266,7 +266,8 @@ func main() { sdkMetricsRegistry := optimizely.NewRegistry(agentMetricsRegistry) ctx, cancel := context.WithCancel(context.Background()) // Create default service context - sg := server.NewGroup(ctx, conf.Server) // Create a new server group to manage the individual http listeners + defer cancel() + sg := server.NewGroup(ctx, conf.Server) // Create a new server group to manage the individual http listeners optlyCache := optimizely.NewCache(ctx, *conf, sdkMetricsRegistry) optlyCache.Init(conf.SDKKeys) @@ -286,7 +287,7 @@ func main() { log.Info().Str("version", conf.Version).Msg("Starting services.") sg.GoListenAndServe("api", conf.API.Port, apiRouter) - sg.GoListenAndServe("webhook", conf.Webhook.Port, routers.NewWebhookRouter(optlyCache, conf.Webhook)) + sg.GoListenAndServe("webhook", conf.Webhook.Port, routers.NewWebhookRouter(ctx, optlyCache, *conf)) sg.GoListenAndServe("admin", conf.Admin.Port, adminRouter) // Admin should be added last. // wait for server group to shutdown diff --git a/config.yaml b/config.yaml index 87b872a0..be919ad1 100644 --- a/config.yaml +++ b/config.yaml @@ -145,17 +145,17 @@ webhook: ## http listener port port: "8089" # ## a map of Optimizely Projects to one or more SDK keys -# projects: -# ## : Optimizely project id as an integer -# : -# ## sdkKeys: a list of SDKs linked to this project -# sdkKeys: -# - + projects: + ## : Optimizely project id as an integer + 23897262460: + ## sdkKeys: a list of SDKs linked to this project + sdkKeys: + - RiowyaMnbPxLa4dPWrDqu # - # ## secret: webhook secret used the validate the notification # secret: # ## skipSignatureCheck: override the signature check (not recommended for production) -# skipSignatureCheck: true + skipSignatureCheck: true ## ## optimizely client configurations (options passed to the underlying go-sdk) @@ -257,3 +257,6 @@ synchronization: notification: enable: false default: "redis" + datafile: + enable: true + default: "redis" diff --git a/config/config.go b/config/config.go index 428c3992..1ba36adb 100644 --- a/config/config.go +++ b/config/config.go @@ -168,6 +168,7 @@ type AgentConfig struct { type SyncConfig struct { Pubsub map[string]interface{} `json:"pubsub"` Notification NotificationConfig `json:"notification"` + Datafile DatafileSyncConfig `json:"datafile"` } // NotificationConfig contains Notification Synchronization configuration for the multiple Agent nodes @@ -176,6 +177,12 @@ type NotificationConfig struct { Default string `json:"default"` } +// DatafileSyncConfig contains Datafile Synchronization configuration for the multiple Agent nodes +type DatafileSyncConfig struct { + Enable bool `json:"enable"` + Default string `json:"default"` +} + // HTTPSDisabledWarning is logged when keyfile and certfile are not provided in server configuration var HTTPSDisabledWarning = "keyfile and certfile not available, so server will use HTTP. For production deployments, it is recommended to either set keyfile and certfile for HTTPS, or run Agent behind a load balancer/reverse proxy that uses HTTPS." diff --git a/pkg/handlers/webhook.go b/pkg/handlers/webhook.go index 86fc6ade..528fdbdf 100644 --- a/pkg/handlers/webhook.go +++ b/pkg/handlers/webhook.go @@ -18,21 +18,26 @@ package handlers import ( + "context" "crypto/hmac" "crypto/sha1" "crypto/subtle" "encoding/hex" "encoding/json" + "fmt" "io" "net/http" "strconv" + "github.com/go-redis/redis/v8" "github.com/optimizely/agent/config" "github.com/go-chi/render" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/optimizely/agent/pkg/optimizely" + "github.com/optimizely/agent/pkg/syncer" ) const signatureHeader = "X-Hub-Signature" @@ -58,13 +63,15 @@ type OptlyMessage struct { type OptlyWebhookHandler struct { optlyCache optimizely.Cache ProjectMap map[int64]config.WebhookProject + syncConfig config.SyncConfig } // NewWebhookHandler returns a new instance of OptlyWebhookHandler -func NewWebhookHandler(optlyCache optimizely.Cache, projectMap map[int64]config.WebhookProject) *OptlyWebhookHandler { +func NewWebhookHandler(optlyCache optimizely.Cache, projectMap map[int64]config.WebhookProject, conf config.SyncConfig) *OptlyWebhookHandler { return &OptlyWebhookHandler{ optlyCache: optlyCache, ProjectMap: projectMap, + syncConfig: conf, } } @@ -140,7 +147,84 @@ func (h *OptlyWebhookHandler) HandleWebhook(w http.ResponseWriter, r *http.Reque // Iterate through all SDK keys and update config for _, sdkKey := range webhookConfig.SDKKeys { + fmt.Println("=========== updating config =============") h.optlyCache.UpdateConfigs(sdkKey) } + + if h.syncConfig.Datafile.Enable { + log.Info().Msg("======================= Syncing datafile ============================") + for _, sdkKey := range webhookConfig.SDKKeys { + log.Info().Msg("====================== sdk key ============================") + log.Info().Msg(sdkKey) + syncer, err := syncer.NewRedisSyncer(&zerolog.Logger{}, h.syncConfig, sdkKey) + if err != nil { + errMsg := fmt.Sprintf("datafile synced failed. reason: %s", err.Error()) + log.Error().Msg(errMsg) + render.Status(r, http.StatusInternalServerError) + render.JSON(w, r, render.M{ + "error": errMsg, + }) + return + } + + if err := syncer.SyncConfig(sdkKey); err != nil { + errMsg := fmt.Sprintf("datafile synced failed. reason: %s", err.Error()) + log.Error().Msg(errMsg) + render.Status(r, http.StatusInternalServerError) + render.JSON(w, r, render.M{ + "error": errMsg, + }) + return + } + } + } w.WriteHeader(http.StatusNoContent) } + +func (h *OptlyWebhookHandler) StartSyncer(ctx context.Context) error { + fmt.Println("================ starting syncer ===================") + redisSyncer, err := syncer.NewRedisSyncer(&zerolog.Logger{}, h.syncConfig, "") + if err != nil { + return err + } + + client := redis.NewClient(&redis.Options{ + Addr: redisSyncer.Host, + Password: redisSyncer.Password, + DB: redisSyncer.Database, + }) + + // Subscribe to a Redis channel + pubsub := client.Subscribe(ctx, syncer.GetDatafileSyncChannel()) + + logger, ok := ctx.Value(LoggerKey).(*zerolog.Logger) + if !ok { + logger = &zerolog.Logger{} + } + + go func() { + for { + select { + case <-ctx.Done(): + pubsub.Close() + client.Close() + logger.Debug().Msg("context canceled, redis notification receiver is closed") + return + default: + // fmt.Println("====================== waiting for message ============================") + msg, err := pubsub.ReceiveMessage(ctx) + if err != nil { + logger.Err(err).Msg("failed to receive message from redis") + continue + } + + fmt.Println("===================== message from redis: ", msg.Payload, "=========================") + logger.Info().Msg("received message from redis") + logger.Info().Msg(msg.Payload) + + h.optlyCache.UpdateConfigs(msg.Payload) + } + } + }() + return nil +} diff --git a/pkg/routers/webhook.go b/pkg/routers/webhook.go index 8d0af18a..3526080f 100644 --- a/pkg/routers/webhook.go +++ b/pkg/routers/webhook.go @@ -18,6 +18,9 @@ package routers import ( + "context" + "fmt" + "github.com/optimizely/agent/config" "github.com/optimizely/agent/pkg/handlers" @@ -29,12 +32,17 @@ import ( ) // NewWebhookRouter returns HTTP API router -func NewWebhookRouter(optlyCache optimizely.Cache, conf config.WebhookConfig) *chi.Mux { +func NewWebhookRouter(ctx context.Context, optlyCache optimizely.Cache, conf config.AgentConfig) *chi.Mux { r := chi.NewRouter() r.Use(chimw.AllowContentType("application/json")) r.Use(render.SetContentType(render.ContentTypeJSON)) - webhookAPI := handlers.NewWebhookHandler(optlyCache, conf.Projects) + webhookAPI := handlers.NewWebhookHandler(optlyCache, conf.Webhook.Projects, conf.Synchronization) + if conf.Synchronization.Datafile.Enable { + if err := webhookAPI.StartSyncer(ctx); err != nil { + fmt.Errorf("failed to start datafile syncer: %s", err.Error()) + } + } r.Post("/webhooks/optimizely", webhookAPI.HandleWebhook) return r diff --git a/pkg/syncer/syncer.go b/pkg/syncer/syncer.go index aa102188..b8c394fe 100644 --- a/pkg/syncer/syncer.go +++ b/pkg/syncer/syncer.go @@ -64,13 +64,10 @@ func NewRedisSyncer(logger *zerolog.Logger, conf config.SyncConfig, sdkKey strin mutexLock.Lock() defer mutexLock.Unlock() - if nc, found := ncCache[sdkKey]; found { + if nc, found := ncCache[sdkKey]; found && sdkKey != "" { return nc, nil } - if !conf.Notification.Enable { - return nil, errors.New("notification syncer is not enabled") - } if conf.Notification.Default != PubSubRedis { return nil, errors.New("redis syncer is not set as default") } @@ -159,6 +156,27 @@ func (r *RedisSyncer) Send(t notification.Type, n interface{}) error { return nil } +func (r *RedisSyncer) SyncConfig(sdkKey string) error { + client := redis.NewClient(&redis.Options{ + Addr: r.Host, + Password: r.Password, + DB: r.Database, + }) + defer client.Close() + channel := GetDatafileSyncChannel() + + if err := client.Publish(r.ctx, channel, sdkKey).Err(); err != nil { + r.logger.Err(err).Msg("failed to publish datafile sync event to pub/sub") + return err + } + fmt.Println("====================== published message ============================") + return nil +} + +func GetDatafileSyncChannel() string { + return fmt.Sprintf("%s-datafile", PubSubDefaultChan) +} + func GetChannelForSDKKey(channel, key string) string { return fmt.Sprintf("%s-%s", channel, key) } From dfb0f018d534c1721321e759e991229249e0ad33 Mon Sep 17 00:00:00 2001 From: Pulak Bhowmick Date: Wed, 22 Nov 2023 02:34:45 +0600 Subject: [PATCH 02/18] Modify syncer --- config/config.go | 16 +--- pkg/handlers/notification.go | 28 ++---- pkg/handlers/notification_test.go | 4 +- pkg/handlers/webhook.go | 40 +++----- pkg/optimizely/cache.go | 4 +- pkg/syncer/pubsub.go | 100 +++++++++++++++++++ pkg/syncer/syncer.go | 154 ++++++++++-------------------- 7 files changed, 181 insertions(+), 165 deletions(-) create mode 100644 pkg/syncer/pubsub.go diff --git a/config/config.go b/config/config.go index 1ba36adb..67234cf8 100644 --- a/config/config.go +++ b/config/config.go @@ -135,7 +135,7 @@ func NewDefaultConfig() *AgentConfig { "channel": "optimizely-notifications", }, }, - Notification: NotificationConfig{ + Notification: FeatureSyncConfig{ Enable: false, Default: "redis", }, @@ -167,18 +167,12 @@ type AgentConfig struct { // SyncConfig contains Synchronization configuration for the multiple Agent nodes type SyncConfig struct { Pubsub map[string]interface{} `json:"pubsub"` - Notification NotificationConfig `json:"notification"` - Datafile DatafileSyncConfig `json:"datafile"` + Notification FeatureSyncConfig `json:"notification"` + Datafile FeatureSyncConfig `json:"datafile"` } -// NotificationConfig contains Notification Synchronization configuration for the multiple Agent nodes -type NotificationConfig struct { - Enable bool `json:"enable"` - Default string `json:"default"` -} - -// DatafileSyncConfig contains Datafile Synchronization configuration for the multiple Agent nodes -type DatafileSyncConfig struct { +// FeatureSyncConfig contains Notification Synchronization configuration for the multiple Agent nodes +type FeatureSyncConfig struct { Enable bool `json:"enable"` Default string `json:"default"` } diff --git a/pkg/handlers/notification.go b/pkg/handlers/notification.go index 5c391cf8..e2ec4119 100644 --- a/pkg/handlers/notification.go +++ b/pkg/handlers/notification.go @@ -25,7 +25,6 @@ import ( "net/http" "strings" - "github.com/go-redis/redis/v8" "github.com/optimizely/agent/config" "github.com/optimizely/agent/pkg/middleware" "github.com/optimizely/agent/pkg/syncer" @@ -219,19 +218,15 @@ func RedisNotificationReceiver(conf config.SyncConfig) NotificationReceiverFunc return nil, errors.New("sdk key not found") } - redisSyncer, err := syncer.NewRedisSyncer(&zerolog.Logger{}, conf, sdkKey) + redisSyncer, err := syncer.NewSyncedNotificationCenter(ctx, &zerolog.Logger{}, sdkKey, conf) if err != nil { return nil, err } - client := redis.NewClient(&redis.Options{ - Addr: redisSyncer.Host, - Password: redisSyncer.Password, - DB: redisSyncer.Database, - }) - - // Subscribe to a Redis channel - pubsub := client.Subscribe(ctx, syncer.GetChannelForSDKKey(redisSyncer.Channel, sdkKey)) + eventCh, err := redisSyncer.Subscribe(ctx, syncer.GetChannelForSDKKey("opti", sdkKey)) + if err != nil { + return nil, err + } dataChan := make(chan syncer.Event) @@ -244,19 +239,12 @@ func RedisNotificationReceiver(conf config.SyncConfig) NotificationReceiverFunc for { select { case <-ctx.Done(): - client.Close() - pubsub.Close() + close(dataChan) logger.Debug().Msg("context canceled, redis notification receiver is closed") return - default: - msg, err := pubsub.ReceiveMessage(ctx) - if err != nil { - logger.Err(err).Msg("failed to receive message from redis") - continue - } - + case msg := <-eventCh: var event syncer.Event - if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil { + if err := json.Unmarshal([]byte(msg), &event); err != nil { logger.Err(err).Msg("failed to unmarshal redis message") continue } diff --git a/pkg/handlers/notification_test.go b/pkg/handlers/notification_test.go index 82d0b95e..09235a06 100644 --- a/pkg/handlers/notification_test.go +++ b/pkg/handlers/notification_test.go @@ -216,7 +216,7 @@ func (suite *NotificationTestSuite) TestTrackAndProjectConfigWithSynchronization "database": 0, }, }, - Notification: config.NotificationConfig{ + Notification: config.FeatureSyncConfig{ Enable: true, Default: "redis", }, @@ -370,7 +370,7 @@ func TestRedisNotificationReceiver(t *testing.T) { "database": 0, }, }, - Notification: config.NotificationConfig{ + Notification: config.FeatureSyncConfig{ Enable: true, Default: "redis", }, diff --git a/pkg/handlers/webhook.go b/pkg/handlers/webhook.go index 528fdbdf..2080de9e 100644 --- a/pkg/handlers/webhook.go +++ b/pkg/handlers/webhook.go @@ -29,7 +29,6 @@ import ( "net/http" "strconv" - "github.com/go-redis/redis/v8" "github.com/optimizely/agent/config" "github.com/go-chi/render" @@ -156,7 +155,7 @@ func (h *OptlyWebhookHandler) HandleWebhook(w http.ResponseWriter, r *http.Reque for _, sdkKey := range webhookConfig.SDKKeys { log.Info().Msg("====================== sdk key ============================") log.Info().Msg(sdkKey) - syncer, err := syncer.NewRedisSyncer(&zerolog.Logger{}, h.syncConfig, sdkKey) + dfSyncer, err := syncer.NewDatafileSyncer(h.syncConfig) if err != nil { errMsg := fmt.Sprintf("datafile synced failed. reason: %s", err.Error()) log.Error().Msg(errMsg) @@ -167,7 +166,7 @@ func (h *OptlyWebhookHandler) HandleWebhook(w http.ResponseWriter, r *http.Reque return } - if err := syncer.SyncConfig(sdkKey); err != nil { + if err := dfSyncer.Sync(r.Context(), syncer.GetDatafileSyncChannel(), sdkKey); err != nil { errMsg := fmt.Sprintf("datafile synced failed. reason: %s", err.Error()) log.Error().Msg(errMsg) render.Status(r, http.StatusInternalServerError) @@ -183,46 +182,31 @@ func (h *OptlyWebhookHandler) HandleWebhook(w http.ResponseWriter, r *http.Reque func (h *OptlyWebhookHandler) StartSyncer(ctx context.Context) error { fmt.Println("================ starting syncer ===================") - redisSyncer, err := syncer.NewRedisSyncer(&zerolog.Logger{}, h.syncConfig, "") + dfSyncer, err := syncer.NewDatafileSyncer(h.syncConfig) if err != nil { return err } - client := redis.NewClient(&redis.Options{ - Addr: redisSyncer.Host, - Password: redisSyncer.Password, - DB: redisSyncer.Database, - }) - - // Subscribe to a Redis channel - pubsub := client.Subscribe(ctx, syncer.GetDatafileSyncChannel()) - logger, ok := ctx.Value(LoggerKey).(*zerolog.Logger) if !ok { logger = &zerolog.Logger{} } + dataCh, err := dfSyncer.Subscribe(ctx, syncer.GetDatafileSyncChannel()) + if err != nil { + return err + } + go func() { for { select { case <-ctx.Done(): - pubsub.Close() - client.Close() logger.Debug().Msg("context canceled, redis notification receiver is closed") return - default: - // fmt.Println("====================== waiting for message ============================") - msg, err := pubsub.ReceiveMessage(ctx) - if err != nil { - logger.Err(err).Msg("failed to receive message from redis") - continue - } - - fmt.Println("===================== message from redis: ", msg.Payload, "=========================") - logger.Info().Msg("received message from redis") - logger.Info().Msg(msg.Payload) - - h.optlyCache.UpdateConfigs(msg.Payload) + case key := <-dataCh: + fmt.Println("=========== updating config =============") + fmt.Println("for key: ", key) + h.optlyCache.UpdateConfigs(key) } } }() diff --git a/pkg/optimizely/cache.go b/pkg/optimizely/cache.go index f1401311..4816ef56 100644 --- a/pkg/optimizely/cache.go +++ b/pkg/optimizely/cache.go @@ -252,11 +252,11 @@ func defaultLoader( } if agentConf.Synchronization.Notification.Enable { - redisSyncer, err := syncer.NewRedisSyncer(&zerolog.Logger{}, agentConf.Synchronization, sdkKey) + syncedNC, err := syncer.NewSyncedNotificationCenter(context.Background(), &zerolog.Logger{}, sdkKey, agentConf.Synchronization) if err != nil { return nil, err } - clientOptions = append(clientOptions, client.WithNotificationCenter(redisSyncer)) + clientOptions = append(clientOptions, client.WithNotificationCenter(syncedNC)) } var clientUserProfileService decision.UserProfileService diff --git a/pkg/syncer/pubsub.go b/pkg/syncer/pubsub.go new file mode 100644 index 00000000..7f7e7123 --- /dev/null +++ b/pkg/syncer/pubsub.go @@ -0,0 +1,100 @@ +/**************************************************************************** + * Copyright 2023 Optimizely, Inc. and contributors * + * * + * Licensed under the Apache License, Version 2.0 (the "License"); * + * you may not use this file except in compliance with the License. * + * You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, software * + * distributed under the License is distributed on an "AS IS" BASIS, * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * + * See the License for the specific language governing permissions and * + * limitations under the License. * + ***************************************************************************/ + +// Package syncer provides synchronization across Agent nodes +package syncer + +import ( + "context" + "errors" + + "github.com/go-redis/redis/v8" + "github.com/optimizely/agent/config" +) + +const ( + // PubSubDefaultChan will be used as default pubsub channel name + PubSubDefaultChan = "optimizely-sync" + // PubSubRedis is the name of pubsub type of Redis + PubSubRedis = "redis" +) + +type PubSub interface { + Publish(ctx context.Context, channel string, message interface{}) error + Subscribe(ctx context.Context, channel string) (chan string, error) +} + +func NewPubSub(conf config.SyncConfig) (PubSub, error) { + if conf.Notification.Default == PubSubRedis { + return &pubsubRedis{ + host: conf.Pubsub[PubSubRedis].(map[string]interface{})["host"].(string), + password: conf.Pubsub[PubSubRedis].(map[string]interface{})["password"].(string), + database: conf.Pubsub[PubSubRedis].(map[string]interface{})["database"].(int), + }, nil + } + return nil, errors.New("pubsub type not supported") +} + +type pubsubRedis struct { + host string + password string + database int +} + +func (r *pubsubRedis) Publish(ctx context.Context, channel string, message interface{}) error { + client := redis.NewClient(&redis.Options{ + Addr: r.host, + Password: r.password, + DB: r.database, + }) + defer client.Close() + + return client.Publish(ctx, channel, message).Err() +} + +func (r *pubsubRedis) Subscribe(ctx context.Context, channel string) (chan string, error) { + client := redis.NewClient(&redis.Options{ + Addr: r.host, + Password: r.password, + DB: r.database, + }) + + // Subscribe to a Redis channel + pubsub := client.Subscribe(ctx, channel) + + ch := make(chan string) + + go func() { + for { + select { + case <-ctx.Done(): + pubsub.Close() + client.Close() + close(ch) + return + default: + msg, err := pubsub.ReceiveMessage(ctx) + if err != nil { + continue + } + + ch <- msg.Payload + + } + } + }() + return ch, nil +} diff --git a/pkg/syncer/syncer.go b/pkg/syncer/syncer.go index b8c394fe..09e55f91 100644 --- a/pkg/syncer/syncer.go +++ b/pkg/syncer/syncer.go @@ -20,117 +20,69 @@ package syncer import ( "context" "encoding/json" - "errors" "fmt" "sync" - "github.com/go-redis/redis/v8" "github.com/optimizely/agent/config" "github.com/optimizely/go-sdk/pkg/notification" "github.com/rs/zerolog" ) -const ( - // PubSubDefaultChan will be used as default pubsub channel name - PubSubDefaultChan = "optimizely-sync" - // PubSubRedis is the name of pubsub type of Redis - PubSubRedis = "redis" -) - var ( - ncCache = make(map[string]*RedisSyncer) + ncCache = make(map[string]*SyncedNotificationCenter) mutexLock = &sync.Mutex{} ) -// Event holds the notification event with it's type -type Event struct { - Type notification.Type `json:"type"` - Message interface{} `json:"message"` +type NotificationSyncer interface { + notification.Center + Subscribe(ctx context.Context, channel string) (chan string, error) } // RedisSyncer defines Redis pubsub configuration -type RedisSyncer struct { - ctx context.Context - Host string - Password string - Database int - Channel string - logger *zerolog.Logger - sdkKey string +type SyncedNotificationCenter struct { + ctx context.Context + logger *zerolog.Logger + sdkKey string + pubsub PubSub } -// NewRedisSyncer returns an instance of RedisNotificationSyncer -func NewRedisSyncer(logger *zerolog.Logger, conf config.SyncConfig, sdkKey string) (*RedisSyncer, error) { - mutexLock.Lock() - defer mutexLock.Unlock() +// Event holds the notification event with it's type +type Event struct { + Type notification.Type `json:"type"` + Message interface{} `json:"message"` +} - if nc, found := ncCache[sdkKey]; found && sdkKey != "" { +func NewSyncedNotificationCenter(ctx context.Context, logger *zerolog.Logger, sdkKey string, conf config.SyncConfig) (NotificationSyncer, error) { + if nc, ok := ncCache[sdkKey]; ok { return nc, nil } - if conf.Notification.Default != PubSubRedis { - return nil, errors.New("redis syncer is not set as default") - } - if conf.Pubsub == nil { - return nil, errors.New("redis config is not given") - } - - redisConfig, found := conf.Pubsub[PubSubRedis].(map[string]interface{}) - if !found { - return nil, errors.New("redis pubsub config not found") - } - - host, ok := redisConfig["host"].(string) - if !ok { - return nil, errors.New("redis host not provided in correct format") - } - password, ok := redisConfig["password"].(string) - if !ok { - return nil, errors.New("redis password not provider in correct format") - } - database, ok := redisConfig["database"].(int) - if !ok { - return nil, errors.New("redis database not provided in correct format") - } - channel, ok := redisConfig["channel"].(string) - if !ok { - channel = PubSubDefaultChan - } - - if logger == nil { - logger = &zerolog.Logger{} + pubsub, err := NewPubSub(conf) + if err != nil { + return nil, err } - nc := &RedisSyncer{ - ctx: context.Background(), - Host: host, - Password: password, - Database: database, - Channel: channel, - logger: logger, - sdkKey: sdkKey, + nc := &SyncedNotificationCenter{ + logger: logger, + sdkKey: sdkKey, + pubsub: pubsub, } ncCache[sdkKey] = nc return nc, nil } -func (r *RedisSyncer) WithContext(ctx context.Context) *RedisSyncer { - r.ctx = ctx - return r -} - // AddHandler is empty but needed to implement notification.Center interface -func (r *RedisSyncer) AddHandler(_ notification.Type, _ func(interface{})) (int, error) { +func (r *SyncedNotificationCenter) AddHandler(_ notification.Type, _ func(interface{})) (int, error) { return 0, nil } // RemoveHandler is empty but needed to implement notification.Center interface -func (r *RedisSyncer) RemoveHandler(_ int, t notification.Type) error { +func (r *SyncedNotificationCenter) RemoveHandler(_ int, t notification.Type) error { return nil } // Send will send the notification to the specified channel in the Redis pubsub -func (r *RedisSyncer) Send(t notification.Type, n interface{}) error { +func (r *SyncedNotificationCenter) Send(t notification.Type, n interface{}) error { event := Event{ Type: t, Message: n, @@ -141,36 +93,11 @@ func (r *RedisSyncer) Send(t notification.Type, n interface{}) error { return err } - client := redis.NewClient(&redis.Options{ - Addr: r.Host, - Password: r.Password, - DB: r.Database, - }) - defer client.Close() - channel := GetChannelForSDKKey(r.Channel, r.sdkKey) - - if err := client.Publish(r.ctx, channel, jsonEvent).Err(); err != nil { - r.logger.Err(err).Msg("failed to publish json event to pub/sub") - return err - } - return nil + return r.pubsub.Publish(r.ctx, GetChannelForSDKKey(PubSubDefaultChan, r.sdkKey), jsonEvent) } -func (r *RedisSyncer) SyncConfig(sdkKey string) error { - client := redis.NewClient(&redis.Options{ - Addr: r.Host, - Password: r.Password, - DB: r.Database, - }) - defer client.Close() - channel := GetDatafileSyncChannel() - - if err := client.Publish(r.ctx, channel, sdkKey).Err(); err != nil { - r.logger.Err(err).Msg("failed to publish datafile sync event to pub/sub") - return err - } - fmt.Println("====================== published message ============================") - return nil +func (r *SyncedNotificationCenter) Subscribe(ctx context.Context, channel string) (chan string, error) { + return r.pubsub.Subscribe(ctx, channel) } func GetDatafileSyncChannel() string { @@ -180,3 +107,26 @@ func GetDatafileSyncChannel() string { func GetChannelForSDKKey(channel, key string) string { return fmt.Sprintf("%s-%s", channel, key) } + +type DatafileSyncer struct { + pubsub PubSub +} + +func NewDatafileSyncer(conf config.SyncConfig) (*DatafileSyncer, error) { + pubsub, err := NewPubSub(conf) + if err != nil { + return nil, err + } + + return &DatafileSyncer{ + pubsub: pubsub, + }, nil +} + +func (r *DatafileSyncer) Sync(ctx context.Context, channel string, sdkKey string) error { + return r.pubsub.Publish(ctx, channel, sdkKey) +} + +func (r *DatafileSyncer) Subscribe(ctx context.Context, channel string) (chan string, error) { + return r.pubsub.Subscribe(ctx, channel) +} From 5e56fb23a70dffafa689b4bba8b350f602edd8da Mon Sep 17 00:00:00 2001 From: Pulak Bhowmick Date: Wed, 22 Nov 2023 20:51:04 +0600 Subject: [PATCH 03/18] fix bug --- pkg/handlers/notification.go | 2 +- pkg/syncer/pubsub.go | 29 ++++++++++++++++++++++++++--- pkg/syncer/syncer.go | 4 ++++ 3 files changed, 31 insertions(+), 4 deletions(-) diff --git a/pkg/handlers/notification.go b/pkg/handlers/notification.go index e2ec4119..29069492 100644 --- a/pkg/handlers/notification.go +++ b/pkg/handlers/notification.go @@ -223,7 +223,7 @@ func RedisNotificationReceiver(conf config.SyncConfig) NotificationReceiverFunc return nil, err } - eventCh, err := redisSyncer.Subscribe(ctx, syncer.GetChannelForSDKKey("opti", sdkKey)) + eventCh, err := redisSyncer.Subscribe(ctx, syncer.GetChannelForSDKKey(syncer.PubSubDefaultChan, sdkKey)) if err != nil { return nil, err } diff --git a/pkg/syncer/pubsub.go b/pkg/syncer/pubsub.go index 7f7e7123..946b36ea 100644 --- a/pkg/syncer/pubsub.go +++ b/pkg/syncer/pubsub.go @@ -39,10 +39,33 @@ type PubSub interface { func NewPubSub(conf config.SyncConfig) (PubSub, error) { if conf.Notification.Default == PubSubRedis { + host, ok := conf.Pubsub[PubSubRedis].(map[string]interface{})["host"].(string) + if !ok { + return nil, errors.New("host is not valid") + } + password, ok := conf.Pubsub[PubSubRedis].(map[string]interface{})["password"].(string) + if !ok { + return nil, errors.New("password is not valid") + } + database, ok := conf.Pubsub[PubSubRedis].(map[string]interface{})["database"].(int) + if !ok { + return nil, errors.New("database is not valid") + } + + client := redis.NewClient(&redis.Options{ + Addr: host, + Password: password, + DB: database, + }) + defer client.Close() + if err := client.Ping(context.Background()).Err(); err != nil { + return nil, err + } + return &pubsubRedis{ - host: conf.Pubsub[PubSubRedis].(map[string]interface{})["host"].(string), - password: conf.Pubsub[PubSubRedis].(map[string]interface{})["password"].(string), - database: conf.Pubsub[PubSubRedis].(map[string]interface{})["database"].(int), + host: host, + password: password, + database: database, }, nil } return nil, errors.New("pubsub type not supported") diff --git a/pkg/syncer/syncer.go b/pkg/syncer/syncer.go index 09e55f91..86e21a0e 100644 --- a/pkg/syncer/syncer.go +++ b/pkg/syncer/syncer.go @@ -53,6 +53,9 @@ type Event struct { } func NewSyncedNotificationCenter(ctx context.Context, logger *zerolog.Logger, sdkKey string, conf config.SyncConfig) (NotificationSyncer, error) { + mutexLock.Lock() + defer mutexLock.Unlock() + if nc, ok := ncCache[sdkKey]; ok { return nc, nil } @@ -63,6 +66,7 @@ func NewSyncedNotificationCenter(ctx context.Context, logger *zerolog.Logger, sd } nc := &SyncedNotificationCenter{ + ctx: ctx, logger: logger, sdkKey: sdkKey, pubsub: pubsub, From 27161f5925cb9caa9eb9f1c4cbf3260902ae9c55 Mon Sep 17 00:00:00 2001 From: Pulak Bhowmick Date: Thu, 23 Nov 2023 16:40:17 +0600 Subject: [PATCH 04/18] add various fix --- config.yaml | 8 ++++-- pkg/handlers/notification_test.go | 46 +++++++++++++++---------------- pkg/handlers/webhook_test.go | 6 ++-- pkg/optimizely/cache.go | 5 ++-- pkg/routers/webhook_test.go | 5 ++-- 5 files changed, 37 insertions(+), 33 deletions(-) diff --git a/config.yaml b/config.yaml index be919ad1..a61acfbe 100644 --- a/config.yaml +++ b/config.yaml @@ -245,8 +245,6 @@ runtime: mutexProfileFraction: 0 ## synchronization should be enabled when multiple replicas of agent is deployed -## if notification synchronization is enabled, then the active notification event-stream API -## will get the notifications from multiple replicas synchronization: pubsub: redis: @@ -254,9 +252,13 @@ synchronization: password: "" database: 0 channel: "optimizely-sync" + ## if notification synchronization is enabled, then the active notification event-stream API + ## will get the notifications from available replicas notification: - enable: false + enable: true default: "redis" + ## if datafile synchronization is enabled, then for each webhook API call + ## the datafile will be sent to all available replicas to achieve better eventual consistency datafile: enable: true default: "redis" diff --git a/pkg/handlers/notification_test.go b/pkg/handlers/notification_test.go index 09235a06..0a426ffb 100644 --- a/pkg/handlers/notification_test.go +++ b/pkg/handlers/notification_test.go @@ -362,19 +362,19 @@ func TestDefaultNotificationReceiver(t *testing.T) { } func TestRedisNotificationReceiver(t *testing.T) { - conf := config.SyncConfig{ - Pubsub: map[string]interface{}{ - "redis": map[string]interface{}{ - "host": "localhost:6379", - "password": "", - "database": 0, - }, - }, - Notification: config.FeatureSyncConfig{ - Enable: true, - Default: "redis", - }, - } + // conf := config.SyncConfig{ + // Pubsub: map[string]interface{}{ + // "redis": map[string]interface{}{ + // "host": "localhost:6379", + // "password": "", + // "database": 0, + // }, + // }, + // Notification: config.FeatureSyncConfig{ + // Enable: true, + // Default: "redis", + // }, + // } type args struct { conf config.SyncConfig ctx context.Context @@ -384,16 +384,16 @@ func TestRedisNotificationReceiver(t *testing.T) { args args want NotificationReceiverFunc }{ - { - name: "Test happy path", - args: args{ - conf: conf, - ctx: context.WithValue(context.Background(), SDKKey, "random-sdk-key-1"), - }, - want: func(ctx context.Context) (<-chan syncer.Event, error) { - return make(<-chan syncer.Event), nil - }, - }, + // { + // name: "Test happy path", + // args: args{ + // conf: conf, + // ctx: context.WithValue(context.Background(), SDKKey, "random-sdk-key-1"), + // }, + // want: func(ctx context.Context) (<-chan syncer.Event, error) { + // return make(<-chan syncer.Event), nil + // }, + // }, { name: "Test empty config", args: args{ diff --git a/pkg/handlers/webhook_test.go b/pkg/handlers/webhook_test.go index 50eb81c7..026bf1c1 100644 --- a/pkg/handlers/webhook_test.go +++ b/pkg/handlers/webhook_test.go @@ -111,7 +111,7 @@ func TestHandleWebhookValidMessageInvalidSignature(t *testing.T) { Secret: "I am secret", }, } - optlyHandler := NewWebhookHandler(nil, testWebhookConfigs) + optlyHandler := NewWebhookHandler(nil, testWebhookConfigs, config.SyncConfig{}) webhookMsg := OptlyMessage{ ProjectID: 42, Timestamp: 42424242, @@ -146,7 +146,7 @@ func TestHandleWebhookSkippedCheckInvalidSignature(t *testing.T) { SkipSignatureCheck: true, }, } - optlyHandler := NewWebhookHandler(testCache, testWebhookConfigs) + optlyHandler := NewWebhookHandler(testCache, testWebhookConfigs, config.SyncConfig{}) webhookMsg := OptlyMessage{ ProjectID: 42, Timestamp: 42424242, @@ -181,7 +181,7 @@ func TestHandleWebhookValidMessage(t *testing.T) { Secret: "I am secret", }, } - optlyHandler := NewWebhookHandler(testCache, testWebhookConfigs) + optlyHandler := NewWebhookHandler(testCache, testWebhookConfigs, config.SyncConfig{}) webhookMsg := OptlyMessage{ ProjectID: 42, Timestamp: 42424242, diff --git a/pkg/optimizely/cache.go b/pkg/optimizely/cache.go index 4816ef56..b974d9fa 100644 --- a/pkg/optimizely/cache.go +++ b/pkg/optimizely/cache.go @@ -254,9 +254,10 @@ func defaultLoader( if agentConf.Synchronization.Notification.Enable { syncedNC, err := syncer.NewSyncedNotificationCenter(context.Background(), &zerolog.Logger{}, sdkKey, agentConf.Synchronization) if err != nil { - return nil, err + log.Error().Err(err).Msgf("Failed to create SyncedNotificationCenter, reason: %s", err.Error()) + } else { + clientOptions = append(clientOptions, client.WithNotificationCenter(syncedNC)) } - clientOptions = append(clientOptions, client.WithNotificationCenter(syncedNC)) } var clientUserProfileService decision.UserProfileService diff --git a/pkg/routers/webhook_test.go b/pkg/routers/webhook_test.go index 60f4daaa..ac9f3691 100644 --- a/pkg/routers/webhook_test.go +++ b/pkg/routers/webhook_test.go @@ -19,6 +19,7 @@ package routers import ( "bytes" + "context" "net/http" "net/http/httptest" "testing" @@ -29,8 +30,8 @@ import ( func TestWebhookAllowedContentTypeMiddleware(t *testing.T) { - conf := config.WebhookConfig{} - router := NewWebhookRouter(nil, conf) + conf := config.AgentConfig{} + router := NewWebhookRouter(context.Background(), nil, conf) // Testing unsupported content type body := " test@123.com " From bad2d76c3da599869220b0c0174988c7536d341d Mon Sep 17 00:00:00 2001 From: Pulak Bhowmick Date: Thu, 23 Nov 2023 17:01:45 +0600 Subject: [PATCH 05/18] refactor code --- pkg/handlers/webhook.go | 25 ++++--------------------- pkg/routers/webhook.go | 4 ++-- 2 files changed, 6 insertions(+), 23 deletions(-) diff --git a/pkg/handlers/webhook.go b/pkg/handlers/webhook.go index 2080de9e..f9c2ff4c 100644 --- a/pkg/handlers/webhook.go +++ b/pkg/handlers/webhook.go @@ -146,42 +146,27 @@ func (h *OptlyWebhookHandler) HandleWebhook(w http.ResponseWriter, r *http.Reque // Iterate through all SDK keys and update config for _, sdkKey := range webhookConfig.SDKKeys { - fmt.Println("=========== updating config =============") h.optlyCache.UpdateConfigs(sdkKey) - } - if h.syncConfig.Datafile.Enable { - log.Info().Msg("======================= Syncing datafile ============================") - for _, sdkKey := range webhookConfig.SDKKeys { - log.Info().Msg("====================== sdk key ============================") - log.Info().Msg(sdkKey) + if h.syncConfig.Datafile.Enable { dfSyncer, err := syncer.NewDatafileSyncer(h.syncConfig) if err != nil { errMsg := fmt.Sprintf("datafile synced failed. reason: %s", err.Error()) log.Error().Msg(errMsg) - render.Status(r, http.StatusInternalServerError) - render.JSON(w, r, render.M{ - "error": errMsg, - }) - return + continue } if err := dfSyncer.Sync(r.Context(), syncer.GetDatafileSyncChannel(), sdkKey); err != nil { errMsg := fmt.Sprintf("datafile synced failed. reason: %s", err.Error()) log.Error().Msg(errMsg) - render.Status(r, http.StatusInternalServerError) - render.JSON(w, r, render.M{ - "error": errMsg, - }) - return } } + } w.WriteHeader(http.StatusNoContent) } func (h *OptlyWebhookHandler) StartSyncer(ctx context.Context) error { - fmt.Println("================ starting syncer ===================") dfSyncer, err := syncer.NewDatafileSyncer(h.syncConfig) if err != nil { return err @@ -201,11 +186,9 @@ func (h *OptlyWebhookHandler) StartSyncer(ctx context.Context) error { for { select { case <-ctx.Done(): - logger.Debug().Msg("context canceled, redis notification receiver is closed") + logger.Debug().Msg("context canceled, syncer is stopped") return case key := <-dataCh: - fmt.Println("=========== updating config =============") - fmt.Println("for key: ", key) h.optlyCache.UpdateConfigs(key) } } diff --git a/pkg/routers/webhook.go b/pkg/routers/webhook.go index 3526080f..b8caf060 100644 --- a/pkg/routers/webhook.go +++ b/pkg/routers/webhook.go @@ -19,10 +19,10 @@ package routers import ( "context" - "fmt" "github.com/optimizely/agent/config" "github.com/optimizely/agent/pkg/handlers" + "github.com/rs/zerolog/log" "github.com/go-chi/chi/v5" chimw "github.com/go-chi/chi/v5/middleware" @@ -40,7 +40,7 @@ func NewWebhookRouter(ctx context.Context, optlyCache optimizely.Cache, conf con webhookAPI := handlers.NewWebhookHandler(optlyCache, conf.Webhook.Projects, conf.Synchronization) if conf.Synchronization.Datafile.Enable { if err := webhookAPI.StartSyncer(ctx); err != nil { - fmt.Errorf("failed to start datafile syncer: %s", err.Error()) + log.Error().Msgf("failed to start datafile syncer: %s", err.Error()) } } From 20758ccfd18e61c4e3a44ed0342fc6d3abfe2b92 Mon Sep 17 00:00:00 2001 From: Pulak Bhowmick Date: Thu, 23 Nov 2023 19:02:52 +0600 Subject: [PATCH 06/18] refactor code --- pkg/handlers/webhook.go | 39 ++++++++++------------ pkg/handlers/webhook_test.go | 65 ++++++++++++++++++++++++++++++++++-- pkg/routers/webhook.go | 17 +++++++++- pkg/syncer/syncer.go | 5 +++ 4 files changed, 100 insertions(+), 26 deletions(-) diff --git a/pkg/handlers/webhook.go b/pkg/handlers/webhook.go index f9c2ff4c..0cadce79 100644 --- a/pkg/handlers/webhook.go +++ b/pkg/handlers/webhook.go @@ -60,17 +60,19 @@ type OptlyMessage struct { // OptlyWebhookHandler handles incoming messages from Optimizely type OptlyWebhookHandler struct { - optlyCache optimizely.Cache - ProjectMap map[int64]config.WebhookProject - syncConfig config.SyncConfig + optlyCache optimizely.Cache + ProjectMap map[int64]config.WebhookProject + configSyncer syncer.Syncer + syncEnabled bool } // NewWebhookHandler returns a new instance of OptlyWebhookHandler -func NewWebhookHandler(optlyCache optimizely.Cache, projectMap map[int64]config.WebhookProject, conf config.SyncConfig) *OptlyWebhookHandler { +func NewWebhookHandler(optlyCache optimizely.Cache, projectMap map[int64]config.WebhookProject, configSyncer syncer.Syncer) *OptlyWebhookHandler { return &OptlyWebhookHandler{ - optlyCache: optlyCache, - ProjectMap: projectMap, - syncConfig: conf, + optlyCache: optlyCache, + ProjectMap: projectMap, + syncEnabled: configSyncer != nil, + configSyncer: configSyncer, } } @@ -148,15 +150,8 @@ func (h *OptlyWebhookHandler) HandleWebhook(w http.ResponseWriter, r *http.Reque for _, sdkKey := range webhookConfig.SDKKeys { h.optlyCache.UpdateConfigs(sdkKey) - if h.syncConfig.Datafile.Enable { - dfSyncer, err := syncer.NewDatafileSyncer(h.syncConfig) - if err != nil { - errMsg := fmt.Sprintf("datafile synced failed. reason: %s", err.Error()) - log.Error().Msg(errMsg) - continue - } - - if err := dfSyncer.Sync(r.Context(), syncer.GetDatafileSyncChannel(), sdkKey); err != nil { + if h.syncEnabled { + if err := h.configSyncer.Sync(r.Context(), syncer.GetDatafileSyncChannel(), sdkKey); err != nil { errMsg := fmt.Sprintf("datafile synced failed. reason: %s", err.Error()) log.Error().Msg(errMsg) } @@ -167,17 +162,17 @@ func (h *OptlyWebhookHandler) HandleWebhook(w http.ResponseWriter, r *http.Reque } func (h *OptlyWebhookHandler) StartSyncer(ctx context.Context) error { - dfSyncer, err := syncer.NewDatafileSyncer(h.syncConfig) - if err != nil { - return err - } - logger, ok := ctx.Value(LoggerKey).(*zerolog.Logger) if !ok { logger = &zerolog.Logger{} } - dataCh, err := dfSyncer.Subscribe(ctx, syncer.GetDatafileSyncChannel()) + if !h.syncEnabled { + logger.Debug().Msg("datafile syncer is not enabled") + return nil + } + + dataCh, err := h.configSyncer.Subscribe(ctx, syncer.GetDatafileSyncChannel()) if err != nil { return err } diff --git a/pkg/handlers/webhook_test.go b/pkg/handlers/webhook_test.go index 026bf1c1..c75f0508 100644 --- a/pkg/handlers/webhook_test.go +++ b/pkg/handlers/webhook_test.go @@ -19,6 +19,7 @@ package handlers import ( "bytes" + "context" "encoding/json" "net/http" "net/http/httptest" @@ -46,6 +47,25 @@ func NewCache() *TestCache { } } +type TestDFSyncer struct { + syncCalled bool +} + +func NewTestDFSyncer() *TestDFSyncer { + return &TestDFSyncer{ + syncCalled: false, + } +} + +func (t *TestDFSyncer) Sync(_ context.Context, _ string, _ string) error { + t.syncCalled = true + return nil +} + +func (t *TestDFSyncer) Subscribe(_ context.Context, _ string) (chan string, error) { + return make(chan string), nil +} + // GetClient returns a default OptlyClient for testing func (tc *TestCache) GetClient(sdkKey string) (*optimizely.OptlyClient, error) { return &optimizely.OptlyClient{ @@ -111,7 +131,7 @@ func TestHandleWebhookValidMessageInvalidSignature(t *testing.T) { Secret: "I am secret", }, } - optlyHandler := NewWebhookHandler(nil, testWebhookConfigs, config.SyncConfig{}) + optlyHandler := NewWebhookHandler(nil, testWebhookConfigs, nil) webhookMsg := OptlyMessage{ ProjectID: 42, Timestamp: 42424242, @@ -146,7 +166,7 @@ func TestHandleWebhookSkippedCheckInvalidSignature(t *testing.T) { SkipSignatureCheck: true, }, } - optlyHandler := NewWebhookHandler(testCache, testWebhookConfigs, config.SyncConfig{}) + optlyHandler := NewWebhookHandler(testCache, testWebhookConfigs, nil) webhookMsg := OptlyMessage{ ProjectID: 42, Timestamp: 42424242, @@ -181,7 +201,45 @@ func TestHandleWebhookValidMessage(t *testing.T) { Secret: "I am secret", }, } - optlyHandler := NewWebhookHandler(testCache, testWebhookConfigs, config.SyncConfig{}) + optlyHandler := NewWebhookHandler(testCache, testWebhookConfigs, nil) + webhookMsg := OptlyMessage{ + ProjectID: 42, + Timestamp: 42424242, + Event: "project.datafile_updated", + Data: DatafileUpdateData{ + Revision: 101, + OriginURL: "origin.optimizely.com/datafiles/myDatafile", + CDNUrl: "cdn.optimizely.com/datafiles/myDatafile", + Environment: "Production", + }, + } + + validWebhookMessage, _ := json.Marshal(webhookMsg) + + req := httptest.NewRequest("POST", "/webhooks/optimizely", bytes.NewBuffer(validWebhookMessage)) + + // This sha1 has been computed from the Optimizely application + req.Header.Set(signatureHeader, "sha1=e0199de63fb7192634f52136d4ceb7dc6f191da3") + + rec := httptest.NewRecorder() + handler := http.HandlerFunc(optlyHandler.HandleWebhook) + handler.ServeHTTP(rec, req) + + assert.Equal(t, http.StatusNoContent, rec.Code) + assert.Equal(t, true, testCache.updateConfigsCalled) +} + +func TestHandleWebhookWithDatafileSyncer(t *testing.T) { + testCache := NewCache() + var testWebhookConfigs = map[int64]config.WebhookProject{ + 42: { + SDKKeys: []string{"myDatafile"}, + Secret: "I am secret", + }, + } + syncer := NewTestDFSyncer() + + optlyHandler := NewWebhookHandler(testCache, testWebhookConfigs, syncer) webhookMsg := OptlyMessage{ ProjectID: 42, Timestamp: 42424242, @@ -207,4 +265,5 @@ func TestHandleWebhookValidMessage(t *testing.T) { assert.Equal(t, http.StatusNoContent, rec.Code) assert.Equal(t, true, testCache.updateConfigsCalled) + assert.Equal(t, true, syncer.syncCalled) } diff --git a/pkg/routers/webhook.go b/pkg/routers/webhook.go index b8caf060..50bb98ce 100644 --- a/pkg/routers/webhook.go +++ b/pkg/routers/webhook.go @@ -19,9 +19,11 @@ package routers import ( "context" + "fmt" "github.com/optimizely/agent/config" "github.com/optimizely/agent/pkg/handlers" + "github.com/optimizely/agent/pkg/syncer" "github.com/rs/zerolog/log" "github.com/go-chi/chi/v5" @@ -37,7 +39,20 @@ func NewWebhookRouter(ctx context.Context, optlyCache optimizely.Cache, conf con r.Use(chimw.AllowContentType("application/json")) r.Use(render.SetContentType(render.ContentTypeJSON)) - webhookAPI := handlers.NewWebhookHandler(optlyCache, conf.Webhook.Projects, conf.Synchronization) + + var dfSyncer syncer.Syncer + + if conf.Synchronization.Datafile.Enable { + sc, err := syncer.NewDatafileSyncer(conf.Synchronization) + if err != nil { + errMsg := fmt.Sprintf("datafile synced failed. reason: %s", err.Error()) + log.Error().Msg(errMsg) + } else { + dfSyncer = sc + } + } + + webhookAPI := handlers.NewWebhookHandler(optlyCache, conf.Webhook.Projects, dfSyncer) if conf.Synchronization.Datafile.Enable { if err := webhookAPI.StartSyncer(ctx); err != nil { log.Error().Msgf("failed to start datafile syncer: %s", err.Error()) diff --git a/pkg/syncer/syncer.go b/pkg/syncer/syncer.go index 86e21a0e..8fbd9f66 100644 --- a/pkg/syncer/syncer.go +++ b/pkg/syncer/syncer.go @@ -38,6 +38,11 @@ type NotificationSyncer interface { Subscribe(ctx context.Context, channel string) (chan string, error) } +type Syncer interface { + Sync(ctx context.Context, channel string, sdkKey string) error + Subscribe(ctx context.Context, channel string) (chan string, error) +} + // RedisSyncer defines Redis pubsub configuration type SyncedNotificationCenter struct { ctx context.Context From 2204260fd98951e1faea5b271aed3573ac2d021b Mon Sep 17 00:00:00 2001 From: Pulak Bhowmick Date: Thu, 23 Nov 2023 19:44:02 +0600 Subject: [PATCH 07/18] add unit test --- pkg/handlers/webhook_test.go | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/pkg/handlers/webhook_test.go b/pkg/handlers/webhook_test.go index c75f0508..0d01ed14 100644 --- a/pkg/handlers/webhook_test.go +++ b/pkg/handlers/webhook_test.go @@ -48,12 +48,14 @@ func NewCache() *TestCache { } type TestDFSyncer struct { - syncCalled bool + syncCalled bool + subscribeCalled bool } func NewTestDFSyncer() *TestDFSyncer { return &TestDFSyncer{ - syncCalled: false, + syncCalled: false, + subscribeCalled: false, } } @@ -63,6 +65,7 @@ func (t *TestDFSyncer) Sync(_ context.Context, _ string, _ string) error { } func (t *TestDFSyncer) Subscribe(_ context.Context, _ string) (chan string, error) { + t.subscribeCalled = true return make(chan string), nil } @@ -267,3 +270,18 @@ func TestHandleWebhookWithDatafileSyncer(t *testing.T) { assert.Equal(t, true, testCache.updateConfigsCalled) assert.Equal(t, true, syncer.syncCalled) } + +func TestWebhookStartSyncer(t *testing.T) { + var testWebhookConfigs = map[int64]config.WebhookProject{ + 42: { + SDKKeys: []string{"myDatafile"}, + Secret: "I am secret", + }, + } + syncer := NewTestDFSyncer() + + optlyHandler := NewWebhookHandler(nil, testWebhookConfigs, syncer) + err := optlyHandler.StartSyncer(context.Background()) + assert.NoError(t, err) + assert.Equal(t, true, syncer.subscribeCalled) +} From 12773d1bb6ef32fb19b46f2799ad1d01eed73938 Mon Sep 17 00:00:00 2001 From: Pulak Bhowmick Date: Thu, 23 Nov 2023 23:27:40 +0600 Subject: [PATCH 08/18] improve logging --- cmd/optimizely/main.go | 3 +++ pkg/handlers/webhook.go | 4 +++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/cmd/optimizely/main.go b/cmd/optimizely/main.go index c99d4932..2ea00f88 100644 --- a/cmd/optimizely/main.go +++ b/cmd/optimizely/main.go @@ -33,6 +33,7 @@ import ( "github.com/spf13/viper" "github.com/optimizely/agent/config" + "github.com/optimizely/agent/pkg/handlers" "github.com/optimizely/agent/pkg/metrics" "github.com/optimizely/agent/pkg/optimizely" "github.com/optimizely/agent/pkg/routers" @@ -267,6 +268,8 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) // Create default service context defer cancel() + ctx = context.WithValue(ctx, handlers.LoggerKey, &log.Logger) + sg := server.NewGroup(ctx, conf.Server) // Create a new server group to manage the individual http listeners optlyCache := optimizely.NewCache(ctx, *conf, sdkMetricsRegistry) optlyCache.Init(conf.SDKKeys) diff --git a/pkg/handlers/webhook.go b/pkg/handlers/webhook.go index 0cadce79..48728db5 100644 --- a/pkg/handlers/webhook.go +++ b/pkg/handlers/webhook.go @@ -164,7 +164,7 @@ func (h *OptlyWebhookHandler) HandleWebhook(w http.ResponseWriter, r *http.Reque func (h *OptlyWebhookHandler) StartSyncer(ctx context.Context) error { logger, ok := ctx.Value(LoggerKey).(*zerolog.Logger) if !ok { - logger = &zerolog.Logger{} + logger = &log.Logger } if !h.syncEnabled { @@ -185,8 +185,10 @@ func (h *OptlyWebhookHandler) StartSyncer(ctx context.Context) error { return case key := <-dataCh: h.optlyCache.UpdateConfigs(key) + logger.Debug().Msg("datafile synced successfully") } } }() + logger.Debug().Msg("datafile syncer is started") return nil } From 0ebd517906174f72e839cea1a3c18a9f0aad266c Mon Sep 17 00:00:00 2001 From: Pulak Bhowmick Date: Fri, 24 Nov 2023 16:55:04 +0600 Subject: [PATCH 09/18] add unit test for pubsub --- pkg/syncer/pubsub.go | 42 +++++++---- pkg/syncer/pubsub_test.go | 142 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 170 insertions(+), 14 deletions(-) create mode 100644 pkg/syncer/pubsub_test.go diff --git a/pkg/syncer/pubsub.go b/pkg/syncer/pubsub.go index 946b36ea..27787b97 100644 --- a/pkg/syncer/pubsub.go +++ b/pkg/syncer/pubsub.go @@ -39,27 +39,41 @@ type PubSub interface { func NewPubSub(conf config.SyncConfig) (PubSub, error) { if conf.Notification.Default == PubSubRedis { - host, ok := conf.Pubsub[PubSubRedis].(map[string]interface{})["host"].(string) + pubsubConf, found := conf.Pubsub[PubSubRedis] + if !found { + return nil, errors.New("pubsub redis config not found") + } + + redisConf, ok := pubsubConf.(map[string]interface{}) if !ok { - return nil, errors.New("host is not valid") + return nil, errors.New("pubsub redis config not valid") + } + + hostVal, found := redisConf["host"] + if !found { + return nil, errors.New("pubsub redis host not found") } - password, ok := conf.Pubsub[PubSubRedis].(map[string]interface{})["password"].(string) + host, ok := hostVal.(string) if !ok { - return nil, errors.New("password is not valid") + return nil, errors.New("pubsub redis host not valid, host must be string") } - database, ok := conf.Pubsub[PubSubRedis].(map[string]interface{})["database"].(int) + + passwordVal, found := redisConf["password"] + if !found { + return nil, errors.New("pubsub redis password not found") + } + password, ok := passwordVal.(string) if !ok { - return nil, errors.New("database is not valid") + return nil, errors.New("pubsub redis password not valid, password must be string") } - client := redis.NewClient(&redis.Options{ - Addr: host, - Password: password, - DB: database, - }) - defer client.Close() - if err := client.Ping(context.Background()).Err(); err != nil { - return nil, err + databaseVal, found := redisConf["database"] + if !found { + return nil, errors.New("pubsub redis database not found") + } + database, ok := databaseVal.(int) + if !ok { + return nil, errors.New("pubsub redis database not valid, database must be int") } return &pubsubRedis{ diff --git a/pkg/syncer/pubsub_test.go b/pkg/syncer/pubsub_test.go new file mode 100644 index 00000000..a540af3d --- /dev/null +++ b/pkg/syncer/pubsub_test.go @@ -0,0 +1,142 @@ +/**************************************************************************** + * Copyright 2023 Optimizely, Inc. and contributors * + * * + * Licensed under the Apache License, Version 2.0 (the "License"); * + * you may not use this file except in compliance with the License. * + * You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, software * + * distributed under the License is distributed on an "AS IS" BASIS, * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * + * See the License for the specific language governing permissions and * + * limitations under the License. * + ***************************************************************************/ + +// Package syncer provides synchronization across Agent nodes +package syncer + +import ( + "reflect" + "testing" + + "github.com/optimizely/agent/config" +) + +func TestNewPubSub(t *testing.T) { + type args struct { + conf config.SyncConfig + } + tests := []struct { + name string + args args + want PubSub + wantErr bool + }{ + { + name: "Test with valid config", + args: args{ + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "", + "database": 0, + }, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + }, + want: &pubsubRedis{ + host: "localhost:6379", + password: "", + database: 0, + }, + wantErr: false, + }, + { + name: "Test with invalid config", + args: args{ + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "nopt-redis": map[string]interface{}{}, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + }, + want: nil, + wantErr: true, + }, + { + name: "Test with nil config", + args: args{ + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": nil, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + }, + want: nil, + wantErr: true, + }, + { + name: "Test with empty config", + args: args{ + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": nil, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + }, + want: nil, + wantErr: true, + }, + { + name: "Test with invalid redis config", + args: args{ + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": 123, + "password": "", + "database": "invalid-db", + }, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + }, + want: nil, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := NewPubSub(tt.args.conf) + if (err != nil) != tt.wantErr { + t.Errorf("NewPubSub() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("NewPubSub() = %v, want %v", got, tt.want) + } + }) + } +} From a6eee8b1aab7f779259646ed4c6ed0b8892f6cf5 Mon Sep 17 00:00:00 2001 From: Pulak Bhowmick Date: Fri, 24 Nov 2023 19:01:48 +0600 Subject: [PATCH 10/18] add unit test --- pkg/handlers/notification.go | 2 +- pkg/optimizely/cache.go | 3 +- pkg/syncer/pubsub.go | 2 +- pkg/syncer/pubsub_test.go | 2 +- pkg/syncer/syncer.go | 16 ++++- pkg/syncer/syncer_test.go | 115 +++++++++++++++++++++++++++++++++++ 6 files changed, 132 insertions(+), 8 deletions(-) create mode 100644 pkg/syncer/syncer_test.go diff --git a/pkg/handlers/notification.go b/pkg/handlers/notification.go index 29069492..b913fce5 100644 --- a/pkg/handlers/notification.go +++ b/pkg/handlers/notification.go @@ -218,7 +218,7 @@ func RedisNotificationReceiver(conf config.SyncConfig) NotificationReceiverFunc return nil, errors.New("sdk key not found") } - redisSyncer, err := syncer.NewSyncedNotificationCenter(ctx, &zerolog.Logger{}, sdkKey, conf) + redisSyncer, err := syncer.NewSyncedNotificationCenter(ctx, sdkKey, conf) if err != nil { return nil, err } diff --git a/pkg/optimizely/cache.go b/pkg/optimizely/cache.go index b974d9fa..cef2f8f7 100644 --- a/pkg/optimizely/cache.go +++ b/pkg/optimizely/cache.go @@ -41,7 +41,6 @@ import ( odpCachePkg "github.com/optimizely/go-sdk/pkg/odp/cache" cmap "github.com/orcaman/concurrent-map" - "github.com/rs/zerolog" "github.com/rs/zerolog/log" ) @@ -252,7 +251,7 @@ func defaultLoader( } if agentConf.Synchronization.Notification.Enable { - syncedNC, err := syncer.NewSyncedNotificationCenter(context.Background(), &zerolog.Logger{}, sdkKey, agentConf.Synchronization) + syncedNC, err := syncer.NewSyncedNotificationCenter(context.Background(), sdkKey, agentConf.Synchronization) if err != nil { log.Error().Err(err).Msgf("Failed to create SyncedNotificationCenter, reason: %s", err.Error()) } else { diff --git a/pkg/syncer/pubsub.go b/pkg/syncer/pubsub.go index 27787b97..29d3432a 100644 --- a/pkg/syncer/pubsub.go +++ b/pkg/syncer/pubsub.go @@ -37,7 +37,7 @@ type PubSub interface { Subscribe(ctx context.Context, channel string) (chan string, error) } -func NewPubSub(conf config.SyncConfig) (PubSub, error) { +func newPubSub(conf config.SyncConfig) (PubSub, error) { if conf.Notification.Default == PubSubRedis { pubsubConf, found := conf.Pubsub[PubSubRedis] if !found { diff --git a/pkg/syncer/pubsub_test.go b/pkg/syncer/pubsub_test.go index a540af3d..a222da84 100644 --- a/pkg/syncer/pubsub_test.go +++ b/pkg/syncer/pubsub_test.go @@ -129,7 +129,7 @@ func TestNewPubSub(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := NewPubSub(tt.args.conf) + got, err := newPubSub(tt.args.conf) if (err != nil) != tt.wantErr { t.Errorf("NewPubSub() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/pkg/syncer/syncer.go b/pkg/syncer/syncer.go index 8fbd9f66..340ab474 100644 --- a/pkg/syncer/syncer.go +++ b/pkg/syncer/syncer.go @@ -26,6 +26,11 @@ import ( "github.com/optimizely/agent/config" "github.com/optimizely/go-sdk/pkg/notification" "github.com/rs/zerolog" + "github.com/rs/zerolog/log" +) + +const ( + LoggerCtxKey = "syncer-logger" ) var ( @@ -57,7 +62,7 @@ type Event struct { Message interface{} `json:"message"` } -func NewSyncedNotificationCenter(ctx context.Context, logger *zerolog.Logger, sdkKey string, conf config.SyncConfig) (NotificationSyncer, error) { +func NewSyncedNotificationCenter(ctx context.Context, sdkKey string, conf config.SyncConfig) (NotificationSyncer, error) { mutexLock.Lock() defer mutexLock.Unlock() @@ -65,7 +70,12 @@ func NewSyncedNotificationCenter(ctx context.Context, logger *zerolog.Logger, sd return nc, nil } - pubsub, err := NewPubSub(conf) + logger, ok := ctx.Value(LoggerCtxKey).(*zerolog.Logger) + if !ok { + logger = &log.Logger + } + + pubsub, err := newPubSub(conf) if err != nil { return nil, err } @@ -122,7 +132,7 @@ type DatafileSyncer struct { } func NewDatafileSyncer(conf config.SyncConfig) (*DatafileSyncer, error) { - pubsub, err := NewPubSub(conf) + pubsub, err := newPubSub(conf) if err != nil { return nil, err } diff --git a/pkg/syncer/syncer_test.go b/pkg/syncer/syncer_test.go new file mode 100644 index 00000000..0f7ce9ba --- /dev/null +++ b/pkg/syncer/syncer_test.go @@ -0,0 +1,115 @@ +/**************************************************************************** + * Copyright 2023 Optimizely, Inc. and contributors * + * * + * Licensed under the Apache License, Version 2.0 (the "License"); * + * you may not use this file except in compliance with the License. * + * You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, software * + * distributed under the License is distributed on an "AS IS" BASIS, * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * + * See the License for the specific language governing permissions and * + * limitations under the License. * + ***************************************************************************/ + +// Package syncer provides synchronization across Agent nodes +package syncer + +import ( + "context" + "reflect" + "testing" + + "github.com/optimizely/agent/config" + "github.com/rs/zerolog/log" +) + +func TestNewSyncedNotificationCenter(t *testing.T) { + type args struct { + ctx context.Context + sdkKey string + conf config.SyncConfig + } + tests := []struct { + name string + args args + want NotificationSyncer + wantErr bool + }{ + { + name: "Test with valid config", + args: args{ + ctx: context.Background(), + sdkKey: "123", + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "", + "database": 0, + }, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + }, + want: &SyncedNotificationCenter{ + ctx: context.Background(), + logger: &log.Logger, + sdkKey: "123", + pubsub: &pubsubRedis{ + host: "localhost:6379", + password: "", + database: 0, + }, + }, + wantErr: false, + }, + { + name: "Test with invalid sync config", + args: args{ + ctx: context.Background(), + sdkKey: "1234", + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "not-redis": map[string]interface{}{ + "host": "invalid host", + }, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + }, + want: nil, + wantErr: true, + }, + { + name: "Test with empty sync config", + args: args{ + ctx: context.Background(), + sdkKey: "1234", + conf: config.SyncConfig{}, + }, + want: nil, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := NewSyncedNotificationCenter(tt.args.ctx, tt.args.sdkKey, tt.args.conf) + if (err != nil) != tt.wantErr { + t.Errorf("NewSyncedNotificationCenter() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("NewSyncedNotificationCenter() = %v, want %v", got, tt.want) + } + }) + } +} From da5782eb7d7bbc4796776ceac5875eabb1eb7aab Mon Sep 17 00:00:00 2001 From: Pulak Bhowmick Date: Fri, 24 Nov 2023 19:03:20 +0600 Subject: [PATCH 11/18] add unit test --- pkg/handlers/notification_test.go | 46 +++++++++++++++---------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/pkg/handlers/notification_test.go b/pkg/handlers/notification_test.go index 0a426ffb..09235a06 100644 --- a/pkg/handlers/notification_test.go +++ b/pkg/handlers/notification_test.go @@ -362,19 +362,19 @@ func TestDefaultNotificationReceiver(t *testing.T) { } func TestRedisNotificationReceiver(t *testing.T) { - // conf := config.SyncConfig{ - // Pubsub: map[string]interface{}{ - // "redis": map[string]interface{}{ - // "host": "localhost:6379", - // "password": "", - // "database": 0, - // }, - // }, - // Notification: config.FeatureSyncConfig{ - // Enable: true, - // Default: "redis", - // }, - // } + conf := config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "", + "database": 0, + }, + }, + Notification: config.FeatureSyncConfig{ + Enable: true, + Default: "redis", + }, + } type args struct { conf config.SyncConfig ctx context.Context @@ -384,16 +384,16 @@ func TestRedisNotificationReceiver(t *testing.T) { args args want NotificationReceiverFunc }{ - // { - // name: "Test happy path", - // args: args{ - // conf: conf, - // ctx: context.WithValue(context.Background(), SDKKey, "random-sdk-key-1"), - // }, - // want: func(ctx context.Context) (<-chan syncer.Event, error) { - // return make(<-chan syncer.Event), nil - // }, - // }, + { + name: "Test happy path", + args: args{ + conf: conf, + ctx: context.WithValue(context.Background(), SDKKey, "random-sdk-key-1"), + }, + want: func(ctx context.Context) (<-chan syncer.Event, error) { + return make(<-chan syncer.Event), nil + }, + }, { name: "Test empty config", args: args{ From 6c9b0a17ace054202e68be14abc13ac5ff9dfc79 Mon Sep 17 00:00:00 2001 From: Pulak Bhowmick Date: Fri, 24 Nov 2023 20:41:47 +0600 Subject: [PATCH 12/18] add unit test --- pkg/syncer/syncer.go | 2 +- pkg/syncer/syncer_test.go | 168 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 169 insertions(+), 1 deletion(-) diff --git a/pkg/syncer/syncer.go b/pkg/syncer/syncer.go index 340ab474..17bca6aa 100644 --- a/pkg/syncer/syncer.go +++ b/pkg/syncer/syncer.go @@ -34,7 +34,7 @@ const ( ) var ( - ncCache = make(map[string]*SyncedNotificationCenter) + ncCache = make(map[string]NotificationSyncer) mutexLock = &sync.Mutex{} ) diff --git a/pkg/syncer/syncer_test.go b/pkg/syncer/syncer_test.go index 0f7ce9ba..c5812710 100644 --- a/pkg/syncer/syncer_test.go +++ b/pkg/syncer/syncer_test.go @@ -24,8 +24,24 @@ import ( "github.com/optimizely/agent/config" "github.com/rs/zerolog/log" + "github.com/stretchr/testify/assert" ) +type testPubSub struct { + publishCalled bool + subscribeCalled bool +} + +func (r *testPubSub) Publish(ctx context.Context, channel string, message interface{}) error { + r.publishCalled = true + return nil +} + +func (r *testPubSub) Subscribe(ctx context.Context, channel string) (chan string, error) { + r.subscribeCalled = true + return nil, nil +} + func TestNewSyncedNotificationCenter(t *testing.T) { type args struct { ctx context.Context @@ -113,3 +129,155 @@ func TestNewSyncedNotificationCenter(t *testing.T) { }) } } + +func TestNewDatafileSyncer(t *testing.T) { + type args struct { + conf config.SyncConfig + } + tests := []struct { + name string + args args + want *DatafileSyncer + wantErr bool + }{ + { + name: "Test with valid config", + args: args{ + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "", + "database": 0, + }, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + }, + want: &DatafileSyncer{ + pubsub: &pubsubRedis{ + host: "localhost:6379", + password: "", + database: 0, + }, + }, + wantErr: false, + }, + { + name: "Test with invalid sync config", + args: args{ + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "not-redis": map[string]interface{}{ + "host": "invalid host", + }, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + }, + want: nil, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := NewDatafileSyncer(tt.args.conf) + if (err != nil) != tt.wantErr { + t.Errorf("NewDatafileSyncer() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("NewDatafileSyncer() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestDatafileSyncer_Sync(t *testing.T) { + type fields struct { + pubsub PubSub + } + type args struct { + ctx context.Context + channel string + sdkKey string + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + { + name: "Test datafile sync", + fields: fields{ + pubsub: &testPubSub{}, + }, + args: args{ + ctx: context.Background(), + channel: "test-ch", + sdkKey: "123", + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &DatafileSyncer{ + pubsub: tt.fields.pubsub, + } + if err := r.Sync(tt.args.ctx, tt.args.channel, tt.args.sdkKey); (err != nil) != tt.wantErr { + t.Errorf("DatafileSyncer.Sync() error = %v, wantErr %v", err, tt.wantErr) + } + + assert.True(t, tt.fields.pubsub.(*testPubSub).publishCalled) + }) + } +} + +func TestDatafileSyncer_Subscribe(t *testing.T) { + type fields struct { + pubsub PubSub + } + type args struct { + ctx context.Context + channel string + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + { + name: "Test datafile sync", + fields: fields{ + pubsub: &testPubSub{}, + }, + args: args{ + ctx: context.Background(), + channel: "test-ch", + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &DatafileSyncer{ + pubsub: tt.fields.pubsub, + } + _, err := r.Subscribe(tt.args.ctx, tt.args.channel) + if (err != nil) != tt.wantErr { + t.Errorf("DatafileSyncer.Subscribe() error = %v, wantErr %v", err, tt.wantErr) + return + } + assert.True(t, tt.fields.pubsub.(*testPubSub).subscribeCalled) + }) + } +} From 1a86e94adf39834bfa5dce1c5852be9b621a79e2 Mon Sep 17 00:00:00 2001 From: Pulak Bhowmick Date: Fri, 24 Nov 2023 21:40:51 +0600 Subject: [PATCH 13/18] add unit test --- pkg/syncer/pubsub_test.go | 97 ++++++++++++++++++++++++++++++++++++ pkg/syncer/syncer_test.go | 102 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 199 insertions(+) diff --git a/pkg/syncer/pubsub_test.go b/pkg/syncer/pubsub_test.go index a222da84..c45cd295 100644 --- a/pkg/syncer/pubsub_test.go +++ b/pkg/syncer/pubsub_test.go @@ -126,6 +126,103 @@ func TestNewPubSub(t *testing.T) { want: nil, wantErr: true, }, + { + name: "Test with invalid redis config without host", + args: args{ + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "password": "", + "database": 0, + }, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + }, + want: nil, + wantErr: true, + }, + { + name: "Test with invalid redis config without password", + args: args{ + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "database": 0, + }, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + }, + want: nil, + wantErr: true, + }, + { + name: "Test with invalid redis config without db", + args: args{ + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "", + }, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + }, + want: nil, + wantErr: true, + }, + { + name: "Test with invalid redis config with invalid password", + args: args{ + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": 1234, + "database": 0, + }, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + }, + want: nil, + wantErr: true, + }, + { + name: "Test with invalid redis config with invalid database", + args: args{ + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "", + "database": "invalid-db", + }, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + }, + want: nil, + wantErr: true, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/pkg/syncer/syncer_test.go b/pkg/syncer/syncer_test.go index c5812710..67d89d7b 100644 --- a/pkg/syncer/syncer_test.go +++ b/pkg/syncer/syncer_test.go @@ -23,6 +23,8 @@ import ( "testing" "github.com/optimizely/agent/config" + "github.com/optimizely/go-sdk/pkg/notification" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/stretchr/testify/assert" ) @@ -281,3 +283,103 @@ func TestDatafileSyncer_Subscribe(t *testing.T) { }) } } + +func TestSyncedNotificationCenter_Send(t *testing.T) { + type fields struct { + ctx context.Context + logger *zerolog.Logger + sdkKey string + pubsub PubSub + } + type args struct { + t notification.Type + n interface{} + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + { + name: "Test notification send", + fields: fields{ + ctx: context.Background(), + logger: &log.Logger, + sdkKey: "123", + pubsub: &testPubSub{}, + }, + args: args{ + t: notification.Decision, + n: "test", + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &SyncedNotificationCenter{ + ctx: tt.fields.ctx, + logger: tt.fields.logger, + sdkKey: tt.fields.sdkKey, + pubsub: tt.fields.pubsub, + } + if err := r.Send(tt.args.t, tt.args.n); (err != nil) != tt.wantErr { + t.Errorf("SyncedNotificationCenter.Send() error = %v, wantErr %v", err, tt.wantErr) + } + + assert.True(t, tt.fields.pubsub.(*testPubSub).publishCalled) + }) + } +} + +func TestSyncedNotificationCenter_Subscribe(t *testing.T) { + type fields struct { + ctx context.Context + logger *zerolog.Logger + sdkKey string + pubsub PubSub + } + type args struct { + ctx context.Context + channel string + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + { + name: "Test notification send", + fields: fields{ + ctx: context.Background(), + logger: &log.Logger, + sdkKey: "123", + pubsub: &testPubSub{}, + }, + args: args{ + ctx: context.Background(), + channel: "test-ch", + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &SyncedNotificationCenter{ + ctx: tt.fields.ctx, + logger: tt.fields.logger, + sdkKey: tt.fields.sdkKey, + pubsub: tt.fields.pubsub, + } + _, err := r.Subscribe(tt.args.ctx, tt.args.channel) + if (err != nil) != tt.wantErr { + t.Errorf("SyncedNotificationCenter.Subscribe() error = %v, wantErr %v", err, tt.wantErr) + return + } + + assert.True(t, tt.fields.pubsub.(*testPubSub).subscribeCalled) + }) + } +} From 863821558e36f2acf35e832869c55278fc2fc72b Mon Sep 17 00:00:00 2001 From: Pulak Bhowmick Date: Fri, 24 Nov 2023 22:29:42 +0600 Subject: [PATCH 14/18] refactor code --- pkg/syncer/pubsub.go | 61 +++---------------------------- pkg/syncer/pubsub/redis.go | 75 ++++++++++++++++++++++++++++++++++++++ pkg/syncer/pubsub_test.go | 9 +++-- pkg/syncer/syncer_test.go | 17 +++++---- 4 files changed, 94 insertions(+), 68 deletions(-) create mode 100644 pkg/syncer/pubsub/redis.go diff --git a/pkg/syncer/pubsub.go b/pkg/syncer/pubsub.go index 29d3432a..0502afdd 100644 --- a/pkg/syncer/pubsub.go +++ b/pkg/syncer/pubsub.go @@ -21,8 +21,8 @@ import ( "context" "errors" - "github.com/go-redis/redis/v8" "github.com/optimizely/agent/config" + "github.com/optimizely/agent/pkg/syncer/pubsub" ) const ( @@ -76,62 +76,11 @@ func newPubSub(conf config.SyncConfig) (PubSub, error) { return nil, errors.New("pubsub redis database not valid, database must be int") } - return &pubsubRedis{ - host: host, - password: password, - database: database, + return &pubsub.Redis{ + Host: host, + Password: password, + Database: database, }, nil } return nil, errors.New("pubsub type not supported") } - -type pubsubRedis struct { - host string - password string - database int -} - -func (r *pubsubRedis) Publish(ctx context.Context, channel string, message interface{}) error { - client := redis.NewClient(&redis.Options{ - Addr: r.host, - Password: r.password, - DB: r.database, - }) - defer client.Close() - - return client.Publish(ctx, channel, message).Err() -} - -func (r *pubsubRedis) Subscribe(ctx context.Context, channel string) (chan string, error) { - client := redis.NewClient(&redis.Options{ - Addr: r.host, - Password: r.password, - DB: r.database, - }) - - // Subscribe to a Redis channel - pubsub := client.Subscribe(ctx, channel) - - ch := make(chan string) - - go func() { - for { - select { - case <-ctx.Done(): - pubsub.Close() - client.Close() - close(ch) - return - default: - msg, err := pubsub.ReceiveMessage(ctx) - if err != nil { - continue - } - - ch <- msg.Payload - - } - } - }() - return ch, nil -} diff --git a/pkg/syncer/pubsub/redis.go b/pkg/syncer/pubsub/redis.go new file mode 100644 index 00000000..c9251791 --- /dev/null +++ b/pkg/syncer/pubsub/redis.go @@ -0,0 +1,75 @@ +/**************************************************************************** + * Copyright 2023 Optimizely, Inc. and contributors * + * * + * Licensed under the Apache License, Version 2.0 (the "License"); * + * you may not use this file except in compliance with the License. * + * You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, software * + * distributed under the License is distributed on an "AS IS" BASIS, * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * + * See the License for the specific language governing permissions and * + * limitations under the License. * + ***************************************************************************/ + +// Package pubsub provides pubsub functionality for the agent syncer +package pubsub + +import ( + "context" + + "github.com/go-redis/redis/v8" +) + +type Redis struct { + Host string + Password string + Database int +} + +func (r *Redis) Publish(ctx context.Context, channel string, message interface{}) error { + client := redis.NewClient(&redis.Options{ + Addr: r.Host, + Password: r.Password, + DB: r.Database, + }) + defer client.Close() + + return client.Publish(ctx, channel, message).Err() +} + +func (r *Redis) Subscribe(ctx context.Context, channel string) (chan string, error) { + client := redis.NewClient(&redis.Options{ + Addr: r.Host, + Password: r.Password, + DB: r.Database, + }) + + // Subscribe to a Redis channel + pubsub := client.Subscribe(ctx, channel) + + ch := make(chan string) + + go func() { + for { + select { + case <-ctx.Done(): + pubsub.Close() + client.Close() + close(ch) + return + default: + msg, err := pubsub.ReceiveMessage(ctx) + if err != nil { + continue + } + + ch <- msg.Payload + + } + } + }() + return ch, nil +} diff --git a/pkg/syncer/pubsub_test.go b/pkg/syncer/pubsub_test.go index c45cd295..1d684a5e 100644 --- a/pkg/syncer/pubsub_test.go +++ b/pkg/syncer/pubsub_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/optimizely/agent/config" + "github.com/optimizely/agent/pkg/syncer/pubsub" ) func TestNewPubSub(t *testing.T) { @@ -51,10 +52,10 @@ func TestNewPubSub(t *testing.T) { }, }, }, - want: &pubsubRedis{ - host: "localhost:6379", - password: "", - database: 0, + want: &pubsub.Redis{ + Host: "localhost:6379", + Password: "", + Database: 0, }, wantErr: false, }, diff --git a/pkg/syncer/syncer_test.go b/pkg/syncer/syncer_test.go index 67d89d7b..ebc76b84 100644 --- a/pkg/syncer/syncer_test.go +++ b/pkg/syncer/syncer_test.go @@ -23,6 +23,7 @@ import ( "testing" "github.com/optimizely/agent/config" + "github.com/optimizely/agent/pkg/syncer/pubsub" "github.com/optimizely/go-sdk/pkg/notification" "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -79,10 +80,10 @@ func TestNewSyncedNotificationCenter(t *testing.T) { ctx: context.Background(), logger: &log.Logger, sdkKey: "123", - pubsub: &pubsubRedis{ - host: "localhost:6379", - password: "", - database: 0, + pubsub: &pubsub.Redis{ + Host: "localhost:6379", + Password: "", + Database: 0, }, }, wantErr: false, @@ -160,10 +161,10 @@ func TestNewDatafileSyncer(t *testing.T) { }, }, want: &DatafileSyncer{ - pubsub: &pubsubRedis{ - host: "localhost:6379", - password: "", - database: 0, + pubsub: &pubsub.Redis{ + Host: "localhost:6379", + Password: "", + Database: 0, }, }, wantErr: false, From e1658a080a9db07f3cb5fd13db04c5650487a07b Mon Sep 17 00:00:00 2001 From: Pulak Bhowmick Date: Fri, 24 Nov 2023 22:46:33 +0600 Subject: [PATCH 15/18] cleanup --- Dockerfile | 14 -------------- config.yaml | 14 +++++++------- 2 files changed, 7 insertions(+), 21 deletions(-) delete mode 100644 Dockerfile diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index fd8aea13..00000000 --- a/Dockerfile +++ /dev/null @@ -1,14 +0,0 @@ -FROM golang:1.21 as builder -RUN addgroup -u 1000 agentgroup &&\ - useradd -u 1000 agentuser -g agentgroup -WORKDIR /go/src/github.com/optimizely/agent -COPY . . -RUN make setup build &&\ - make ci_build_static_binary - -FROM scratch -COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ -COPY --from=builder /go/src/github.com/optimizely/agent/bin/optimizely /optimizely -COPY --from=builder /etc/passwd /etc/passwd -USER agentuser -CMD ["/optimizely"] diff --git a/config.yaml b/config.yaml index a61acfbe..419bebce 100644 --- a/config.yaml +++ b/config.yaml @@ -145,17 +145,17 @@ webhook: ## http listener port port: "8089" # ## a map of Optimizely Projects to one or more SDK keys - projects: - ## : Optimizely project id as an integer - 23897262460: - ## sdkKeys: a list of SDKs linked to this project - sdkKeys: - - RiowyaMnbPxLa4dPWrDqu +# projects: +# ## : Optimizely project id as an integer +# : +# ## sdkKeys: a list of SDKs linked to this project +# sdkKeys: +# - # - # ## secret: webhook secret used the validate the notification # secret: # ## skipSignatureCheck: override the signature check (not recommended for production) - skipSignatureCheck: true +# skipSignatureCheck: true ## ## optimizely client configurations (options passed to the underlying go-sdk) From 611f900cc5524da1f4804adbab3d28d9f3c09a5f Mon Sep 17 00:00:00 2001 From: Pulak Bhowmick Date: Fri, 24 Nov 2023 22:47:34 +0600 Subject: [PATCH 16/18] update config --- config.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/config.yaml b/config.yaml index 419bebce..606a84a2 100644 --- a/config.yaml +++ b/config.yaml @@ -255,10 +255,10 @@ synchronization: ## if notification synchronization is enabled, then the active notification event-stream API ## will get the notifications from available replicas notification: - enable: true + enable: false default: "redis" ## if datafile synchronization is enabled, then for each webhook API call ## the datafile will be sent to all available replicas to achieve better eventual consistency datafile: - enable: true + enable: false default: "redis" From 430ec5b00cab1cee197029e369113e8f918181da Mon Sep 17 00:00:00 2001 From: Pulak Bhowmick Date: Wed, 29 Nov 2023 17:47:06 +0600 Subject: [PATCH 17/18] add review changes --- config.yaml | 1 - pkg/handlers/notification.go | 6 +- pkg/handlers/notification_test.go | 2 +- pkg/handlers/webhook.go | 8 +-- pkg/handlers/webhook_test.go | 4 +- pkg/routers/api.go | 2 +- pkg/syncer/pubsub.go | 99 +++++++++++++++++++------------ pkg/syncer/pubsub_test.go | 38 +++++++++++- pkg/syncer/syncer.go | 4 +- pkg/syncer/syncer_test.go | 4 +- 10 files changed, 111 insertions(+), 57 deletions(-) diff --git a/config.yaml b/config.yaml index 606a84a2..ff2b09b8 100644 --- a/config.yaml +++ b/config.yaml @@ -251,7 +251,6 @@ synchronization: host: "redis.demo.svc:6379" password: "" database: 0 - channel: "optimizely-sync" ## if notification synchronization is enabled, then the active notification event-stream API ## will get the notifications from available replicas notification: diff --git a/pkg/handlers/notification.go b/pkg/handlers/notification.go index b913fce5..356a4f9a 100644 --- a/pkg/handlers/notification.go +++ b/pkg/handlers/notification.go @@ -211,19 +211,19 @@ func DefaultNotificationReceiver(ctx context.Context) (<-chan syncer.Event, erro return messageChan, nil } -func RedisNotificationReceiver(conf config.SyncConfig) NotificationReceiverFunc { +func SyncedNotificationReceiver(conf config.SyncConfig) NotificationReceiverFunc { return func(ctx context.Context) (<-chan syncer.Event, error) { sdkKey, ok := ctx.Value(SDKKey).(string) if !ok || sdkKey == "" { return nil, errors.New("sdk key not found") } - redisSyncer, err := syncer.NewSyncedNotificationCenter(ctx, sdkKey, conf) + ncSyncer, err := syncer.NewSyncedNotificationCenter(ctx, sdkKey, conf) if err != nil { return nil, err } - eventCh, err := redisSyncer.Subscribe(ctx, syncer.GetChannelForSDKKey(syncer.PubSubDefaultChan, sdkKey)) + eventCh, err := ncSyncer.Subscribe(ctx, syncer.GetChannelForSDKKey(syncer.PubSubDefaultChan, sdkKey)) if err != nil { return nil, err } diff --git a/pkg/handlers/notification_test.go b/pkg/handlers/notification_test.go index 09235a06..b1e62ab5 100644 --- a/pkg/handlers/notification_test.go +++ b/pkg/handlers/notification_test.go @@ -407,7 +407,7 @@ func TestRedisNotificationReceiver(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := RedisNotificationReceiver(tt.args.conf) + got := SyncedNotificationReceiver(tt.args.conf) if reflect.TypeOf(got) != reflect.TypeOf(tt.want) { t.Errorf("RedisNotificationReceiver() = %v, want %v", got, tt.want) } diff --git a/pkg/handlers/webhook.go b/pkg/handlers/webhook.go index 48728db5..1e911216 100644 --- a/pkg/handlers/webhook.go +++ b/pkg/handlers/webhook.go @@ -148,15 +148,15 @@ func (h *OptlyWebhookHandler) HandleWebhook(w http.ResponseWriter, r *http.Reque // Iterate through all SDK keys and update config for _, sdkKey := range webhookConfig.SDKKeys { - h.optlyCache.UpdateConfigs(sdkKey) - if h.syncEnabled { if err := h.configSyncer.Sync(r.Context(), syncer.GetDatafileSyncChannel(), sdkKey); err != nil { errMsg := fmt.Sprintf("datafile synced failed. reason: %s", err.Error()) log.Error().Msg(errMsg) + h.optlyCache.UpdateConfigs(sdkKey) } + } else { + h.optlyCache.UpdateConfigs(sdkKey) } - } w.WriteHeader(http.StatusNoContent) } @@ -185,7 +185,7 @@ func (h *OptlyWebhookHandler) StartSyncer(ctx context.Context) error { return case key := <-dataCh: h.optlyCache.UpdateConfigs(key) - logger.Debug().Msg("datafile synced successfully") + logger.Info().Msgf("datafile synced successfully for sdkKey: %s", key) } } }() diff --git a/pkg/handlers/webhook_test.go b/pkg/handlers/webhook_test.go index 0d01ed14..221e3e0f 100644 --- a/pkg/handlers/webhook_test.go +++ b/pkg/handlers/webhook_test.go @@ -233,7 +233,6 @@ func TestHandleWebhookValidMessage(t *testing.T) { } func TestHandleWebhookWithDatafileSyncer(t *testing.T) { - testCache := NewCache() var testWebhookConfigs = map[int64]config.WebhookProject{ 42: { SDKKeys: []string{"myDatafile"}, @@ -242,7 +241,7 @@ func TestHandleWebhookWithDatafileSyncer(t *testing.T) { } syncer := NewTestDFSyncer() - optlyHandler := NewWebhookHandler(testCache, testWebhookConfigs, syncer) + optlyHandler := NewWebhookHandler(nil, testWebhookConfigs, syncer) webhookMsg := OptlyMessage{ ProjectID: 42, Timestamp: 42424242, @@ -267,7 +266,6 @@ func TestHandleWebhookWithDatafileSyncer(t *testing.T) { handler.ServeHTTP(rec, req) assert.Equal(t, http.StatusNoContent, rec.Code) - assert.Equal(t, true, testCache.updateConfigsCalled) assert.Equal(t, true, syncer.syncCalled) } diff --git a/pkg/routers/api.go b/pkg/routers/api.go index 7eb15d6e..79df6103 100644 --- a/pkg/routers/api.go +++ b/pkg/routers/api.go @@ -85,7 +85,7 @@ func NewDefaultAPIRouter(optlyCache optimizely.Cache, conf config.AgentConfig, m if conf.API.EnableNotifications { nStreamHandler = handlers.NotificationEventStreamHandler(handlers.DefaultNotificationReceiver) if conf.Synchronization.Notification.Enable { - nStreamHandler = handlers.NotificationEventStreamHandler(handlers.RedisNotificationReceiver(conf.Synchronization)) + nStreamHandler = handlers.NotificationEventStreamHandler(handlers.SyncedNotificationReceiver(conf.Synchronization)) } } diff --git a/pkg/syncer/pubsub.go b/pkg/syncer/pubsub.go index 0502afdd..6436c03e 100644 --- a/pkg/syncer/pubsub.go +++ b/pkg/syncer/pubsub.go @@ -32,55 +32,76 @@ const ( PubSubRedis = "redis" ) +type SycnFeatureFlag string + +const ( + SyncFeatureFlagNotificaiton SycnFeatureFlag = "sync-feature-flag-notification" + SycnFeatureFlagDatafile SycnFeatureFlag = "sync-feature-flag-datafile" +) + type PubSub interface { Publish(ctx context.Context, channel string, message interface{}) error Subscribe(ctx context.Context, channel string) (chan string, error) } -func newPubSub(conf config.SyncConfig) (PubSub, error) { - if conf.Notification.Default == PubSubRedis { - pubsubConf, found := conf.Pubsub[PubSubRedis] - if !found { - return nil, errors.New("pubsub redis config not found") +func newPubSub(conf config.SyncConfig, featureFlag SycnFeatureFlag) (PubSub, error) { + if featureFlag == SyncFeatureFlagNotificaiton { + if conf.Notification.Default == PubSubRedis { + return getPubSubRedis(conf) + } else { + return nil, errors.New("pubsub type not supported") } - - redisConf, ok := pubsubConf.(map[string]interface{}) - if !ok { - return nil, errors.New("pubsub redis config not valid") + } else if featureFlag == SycnFeatureFlagDatafile { + if conf.Datafile.Default == PubSubRedis { + return getPubSubRedis(conf) + } else { + return nil, errors.New("pubsub type not supported") } + } + return nil, errors.New("provided feature flag not supported") +} - hostVal, found := redisConf["host"] - if !found { - return nil, errors.New("pubsub redis host not found") - } - host, ok := hostVal.(string) - if !ok { - return nil, errors.New("pubsub redis host not valid, host must be string") - } +func getPubSubRedis(conf config.SyncConfig) (PubSub, error) { + pubsubConf, found := conf.Pubsub[PubSubRedis] + if !found { + return nil, errors.New("pubsub redis config not found") + } - passwordVal, found := redisConf["password"] - if !found { - return nil, errors.New("pubsub redis password not found") - } - password, ok := passwordVal.(string) - if !ok { - return nil, errors.New("pubsub redis password not valid, password must be string") - } + redisConf, ok := pubsubConf.(map[string]interface{}) + if !ok { + return nil, errors.New("pubsub redis config not valid") + } - databaseVal, found := redisConf["database"] - if !found { - return nil, errors.New("pubsub redis database not found") - } - database, ok := databaseVal.(int) - if !ok { - return nil, errors.New("pubsub redis database not valid, database must be int") - } + hostVal, found := redisConf["host"] + if !found { + return nil, errors.New("pubsub redis host not found") + } + host, ok := hostVal.(string) + if !ok { + return nil, errors.New("pubsub redis host not valid, host must be string") + } - return &pubsub.Redis{ - Host: host, - Password: password, - Database: database, - }, nil + passwordVal, found := redisConf["password"] + if !found { + return nil, errors.New("pubsub redis password not found") } - return nil, errors.New("pubsub type not supported") + password, ok := passwordVal.(string) + if !ok { + return nil, errors.New("pubsub redis password not valid, password must be string") + } + + databaseVal, found := redisConf["database"] + if !found { + return nil, errors.New("pubsub redis database not found") + } + database, ok := databaseVal.(int) + if !ok { + return nil, errors.New("pubsub redis database not valid, database must be int") + } + + return &pubsub.Redis{ + Host: host, + Password: password, + Database: database, + }, nil } diff --git a/pkg/syncer/pubsub_test.go b/pkg/syncer/pubsub_test.go index 1d684a5e..31b3dc1d 100644 --- a/pkg/syncer/pubsub_test.go +++ b/pkg/syncer/pubsub_test.go @@ -28,6 +28,7 @@ import ( func TestNewPubSub(t *testing.T) { type args struct { conf config.SyncConfig + flag SycnFeatureFlag } tests := []struct { name string @@ -51,6 +52,32 @@ func TestNewPubSub(t *testing.T) { Enable: true, }, }, + flag: SyncFeatureFlagNotificaiton, + }, + want: &pubsub.Redis{ + Host: "localhost:6379", + Password: "", + Database: 0, + }, + wantErr: false, + }, + { + name: "Test with valid config for datafile", + args: args{ + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "", + "database": 0, + }, + }, + Datafile: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + flag: SycnFeatureFlagDatafile, }, want: &pubsub.Redis{ Host: "localhost:6379", @@ -71,6 +98,7 @@ func TestNewPubSub(t *testing.T) { Enable: true, }, }, + flag: SyncFeatureFlagNotificaiton, }, want: nil, wantErr: true, @@ -87,6 +115,7 @@ func TestNewPubSub(t *testing.T) { Enable: true, }, }, + flag: SyncFeatureFlagNotificaiton, }, want: nil, wantErr: true, @@ -103,6 +132,7 @@ func TestNewPubSub(t *testing.T) { Enable: true, }, }, + flag: SyncFeatureFlagNotificaiton, }, want: nil, wantErr: true, @@ -123,6 +153,7 @@ func TestNewPubSub(t *testing.T) { Enable: true, }, }, + flag: SyncFeatureFlagNotificaiton, }, want: nil, wantErr: true, @@ -142,6 +173,7 @@ func TestNewPubSub(t *testing.T) { Enable: true, }, }, + flag: SyncFeatureFlagNotificaiton, }, want: nil, wantErr: true, @@ -161,6 +193,7 @@ func TestNewPubSub(t *testing.T) { Enable: true, }, }, + flag: SyncFeatureFlagNotificaiton, }, want: nil, wantErr: true, @@ -180,6 +213,7 @@ func TestNewPubSub(t *testing.T) { Enable: true, }, }, + flag: SyncFeatureFlagNotificaiton, }, want: nil, wantErr: true, @@ -200,6 +234,7 @@ func TestNewPubSub(t *testing.T) { Enable: true, }, }, + flag: SyncFeatureFlagNotificaiton, }, want: nil, wantErr: true, @@ -220,6 +255,7 @@ func TestNewPubSub(t *testing.T) { Enable: true, }, }, + flag: SyncFeatureFlagNotificaiton, }, want: nil, wantErr: true, @@ -227,7 +263,7 @@ func TestNewPubSub(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := newPubSub(tt.args.conf) + got, err := newPubSub(tt.args.conf, tt.args.flag) if (err != nil) != tt.wantErr { t.Errorf("NewPubSub() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/pkg/syncer/syncer.go b/pkg/syncer/syncer.go index 17bca6aa..93ad6fb4 100644 --- a/pkg/syncer/syncer.go +++ b/pkg/syncer/syncer.go @@ -75,7 +75,7 @@ func NewSyncedNotificationCenter(ctx context.Context, sdkKey string, conf config logger = &log.Logger } - pubsub, err := newPubSub(conf) + pubsub, err := newPubSub(conf, SyncFeatureFlagNotificaiton) if err != nil { return nil, err } @@ -132,7 +132,7 @@ type DatafileSyncer struct { } func NewDatafileSyncer(conf config.SyncConfig) (*DatafileSyncer, error) { - pubsub, err := newPubSub(conf) + pubsub, err := newPubSub(conf, SycnFeatureFlagDatafile) if err != nil { return nil, err } diff --git a/pkg/syncer/syncer_test.go b/pkg/syncer/syncer_test.go index ebc76b84..c03d6553 100644 --- a/pkg/syncer/syncer_test.go +++ b/pkg/syncer/syncer_test.go @@ -154,7 +154,7 @@ func TestNewDatafileSyncer(t *testing.T) { "database": 0, }, }, - Notification: config.FeatureSyncConfig{ + Datafile: config.FeatureSyncConfig{ Default: "redis", Enable: true, }, @@ -178,7 +178,7 @@ func TestNewDatafileSyncer(t *testing.T) { "host": "invalid host", }, }, - Notification: config.FeatureSyncConfig{ + Datafile: config.FeatureSyncConfig{ Default: "redis", Enable: true, }, From fde83f07a3492f89e4104d7bc224d5e8bba47ebb Mon Sep 17 00:00:00 2001 From: Pulak Bhowmick Date: Thu, 30 Nov 2023 16:33:00 +0600 Subject: [PATCH 18/18] update config doc --- config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config.yaml b/config.yaml index ff2b09b8..431ddd6c 100644 --- a/config.yaml +++ b/config.yaml @@ -244,7 +244,7 @@ runtime: ## (For n>1 the details of sampling may change.) mutexProfileFraction: 0 -## synchronization should be enabled when multiple replicas of agent is deployed +## synchronization should be enabled when features for multiple nodes like notification streaming are deployed synchronization: pubsub: redis: