Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FSSDK-9631] feat: add datafile syncer to synchronize datafile across agent nodes for webhook API #405

Merged
merged 18 commits into from
Dec 5, 2023
1 change: 0 additions & 1 deletion config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,6 @@ synchronization:
host: "redis.demo.svc:6379"
password: ""
database: 0
channel: "optimizely-sync"
## if notification synchronization is enabled, then the active notification event-stream API
## will get the notifications from available replicas
notification:
Expand Down
6 changes: 3 additions & 3 deletions pkg/handlers/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,19 +211,19 @@ func DefaultNotificationReceiver(ctx context.Context) (<-chan syncer.Event, erro
return messageChan, nil
}

func RedisNotificationReceiver(conf config.SyncConfig) NotificationReceiverFunc {
func SyncedNotificationReceiver(conf config.SyncConfig) NotificationReceiverFunc {
return func(ctx context.Context) (<-chan syncer.Event, error) {
sdkKey, ok := ctx.Value(SDKKey).(string)
if !ok || sdkKey == "" {
return nil, errors.New("sdk key not found")
}

redisSyncer, err := syncer.NewSyncedNotificationCenter(ctx, sdkKey, conf)
ncSyncer, err := syncer.NewSyncedNotificationCenter(ctx, sdkKey, conf)
if err != nil {
return nil, err
}

eventCh, err := redisSyncer.Subscribe(ctx, syncer.GetChannelForSDKKey(syncer.PubSubDefaultChan, sdkKey))
eventCh, err := ncSyncer.Subscribe(ctx, syncer.GetChannelForSDKKey(syncer.PubSubDefaultChan, sdkKey))
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/handlers/notification_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ func TestRedisNotificationReceiver(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := RedisNotificationReceiver(tt.args.conf)
got := SyncedNotificationReceiver(tt.args.conf)
if reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
t.Errorf("RedisNotificationReceiver() = %v, want %v", got, tt.want)
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/handlers/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,15 @@ func (h *OptlyWebhookHandler) HandleWebhook(w http.ResponseWriter, r *http.Reque

// Iterate through all SDK keys and update config
for _, sdkKey := range webhookConfig.SDKKeys {
h.optlyCache.UpdateConfigs(sdkKey)

if h.syncEnabled {
if err := h.configSyncer.Sync(r.Context(), syncer.GetDatafileSyncChannel(), sdkKey); err != nil {
errMsg := fmt.Sprintf("datafile synced failed. reason: %s", err.Error())
log.Error().Msg(errMsg)
h.optlyCache.UpdateConfigs(sdkKey)
pulak-opti marked this conversation as resolved.
Show resolved Hide resolved
}
} else {
h.optlyCache.UpdateConfigs(sdkKey)
}

}
w.WriteHeader(http.StatusNoContent)
}
Expand Down Expand Up @@ -185,7 +185,7 @@ func (h *OptlyWebhookHandler) StartSyncer(ctx context.Context) error {
return
case key := <-dataCh:
h.optlyCache.UpdateConfigs(key)
logger.Debug().Msg("datafile synced successfully")
logger.Info().Msgf("datafile synced successfully for sdkKey: %s", key)
}
mikechu-optimizely marked this conversation as resolved.
Show resolved Hide resolved
}
}()
Expand Down
4 changes: 1 addition & 3 deletions pkg/handlers/webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,6 @@ func TestHandleWebhookValidMessage(t *testing.T) {
}

func TestHandleWebhookWithDatafileSyncer(t *testing.T) {
testCache := NewCache()
var testWebhookConfigs = map[int64]config.WebhookProject{
42: {
SDKKeys: []string{"myDatafile"},
Expand All @@ -242,7 +241,7 @@ func TestHandleWebhookWithDatafileSyncer(t *testing.T) {
}
syncer := NewTestDFSyncer()

optlyHandler := NewWebhookHandler(testCache, testWebhookConfigs, syncer)
optlyHandler := NewWebhookHandler(nil, testWebhookConfigs, syncer)
webhookMsg := OptlyMessage{
ProjectID: 42,
Timestamp: 42424242,
Expand All @@ -267,7 +266,6 @@ func TestHandleWebhookWithDatafileSyncer(t *testing.T) {
handler.ServeHTTP(rec, req)

assert.Equal(t, http.StatusNoContent, rec.Code)
assert.Equal(t, true, testCache.updateConfigsCalled)
assert.Equal(t, true, syncer.syncCalled)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/routers/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func NewDefaultAPIRouter(optlyCache optimizely.Cache, conf config.AgentConfig, m
if conf.API.EnableNotifications {
nStreamHandler = handlers.NotificationEventStreamHandler(handlers.DefaultNotificationReceiver)
if conf.Synchronization.Notification.Enable {
nStreamHandler = handlers.NotificationEventStreamHandler(handlers.RedisNotificationReceiver(conf.Synchronization))
nStreamHandler = handlers.NotificationEventStreamHandler(handlers.SyncedNotificationReceiver(conf.Synchronization))
}
}

Expand Down
99 changes: 60 additions & 39 deletions pkg/syncer/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,55 +32,76 @@ const (
PubSubRedis = "redis"
)

type SycnFeatureFlag string

const (
SyncFeatureFlagNotificaiton SycnFeatureFlag = "sync-feature-flag-notification"
SycnFeatureFlagDatafile SycnFeatureFlag = "sync-feature-flag-datafile"
)

type PubSub interface {
Publish(ctx context.Context, channel string, message interface{}) error
Subscribe(ctx context.Context, channel string) (chan string, error)
}

func newPubSub(conf config.SyncConfig) (PubSub, error) {
if conf.Notification.Default == PubSubRedis {
pubsubConf, found := conf.Pubsub[PubSubRedis]
if !found {
return nil, errors.New("pubsub redis config not found")
func newPubSub(conf config.SyncConfig, featureFlag SycnFeatureFlag) (PubSub, error) {
if featureFlag == SyncFeatureFlagNotificaiton {
if conf.Notification.Default == PubSubRedis {
return getPubSubRedis(conf)
} else {
return nil, errors.New("pubsub type not supported")
}

redisConf, ok := pubsubConf.(map[string]interface{})
if !ok {
return nil, errors.New("pubsub redis config not valid")
} else if featureFlag == SycnFeatureFlagDatafile {
if conf.Datafile.Default == PubSubRedis {
return getPubSubRedis(conf)
} else {
return nil, errors.New("pubsub type not supported")
}
}
return nil, errors.New("provided feature flag not supported")
}

hostVal, found := redisConf["host"]
if !found {
return nil, errors.New("pubsub redis host not found")
}
host, ok := hostVal.(string)
if !ok {
return nil, errors.New("pubsub redis host not valid, host must be string")
}
func getPubSubRedis(conf config.SyncConfig) (PubSub, error) {
pubsubConf, found := conf.Pubsub[PubSubRedis]
if !found {
return nil, errors.New("pubsub redis config not found")
}

passwordVal, found := redisConf["password"]
if !found {
return nil, errors.New("pubsub redis password not found")
}
password, ok := passwordVal.(string)
if !ok {
return nil, errors.New("pubsub redis password not valid, password must be string")
}
redisConf, ok := pubsubConf.(map[string]interface{})
if !ok {
return nil, errors.New("pubsub redis config not valid")
}

databaseVal, found := redisConf["database"]
if !found {
return nil, errors.New("pubsub redis database not found")
}
database, ok := databaseVal.(int)
if !ok {
return nil, errors.New("pubsub redis database not valid, database must be int")
}
hostVal, found := redisConf["host"]
if !found {
return nil, errors.New("pubsub redis host not found")
}
host, ok := hostVal.(string)
if !ok {
return nil, errors.New("pubsub redis host not valid, host must be string")
}

return &pubsub.Redis{
Host: host,
Password: password,
Database: database,
}, nil
passwordVal, found := redisConf["password"]
if !found {
return nil, errors.New("pubsub redis password not found")
}
return nil, errors.New("pubsub type not supported")
password, ok := passwordVal.(string)
if !ok {
return nil, errors.New("pubsub redis password not valid, password must be string")
}

databaseVal, found := redisConf["database"]
if !found {
return nil, errors.New("pubsub redis database not found")
}
database, ok := databaseVal.(int)
if !ok {
return nil, errors.New("pubsub redis database not valid, database must be int")
}

return &pubsub.Redis{
Host: host,
Password: password,
Database: database,
}, nil
}
38 changes: 37 additions & 1 deletion pkg/syncer/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
func TestNewPubSub(t *testing.T) {
type args struct {
conf config.SyncConfig
flag SycnFeatureFlag
}
tests := []struct {
name string
Expand All @@ -51,6 +52,32 @@ func TestNewPubSub(t *testing.T) {
Enable: true,
},
},
flag: SyncFeatureFlagNotificaiton,
},
want: &pubsub.Redis{
Host: "localhost:6379",
Password: "",
Database: 0,
},
wantErr: false,
},
{
name: "Test with valid config for datafile",
args: args{
conf: config.SyncConfig{
Pubsub: map[string]interface{}{
"redis": map[string]interface{}{
"host": "localhost:6379",
"password": "",
"database": 0,
},
},
Datafile: config.FeatureSyncConfig{
Default: "redis",
Enable: true,
},
},
flag: SycnFeatureFlagDatafile,
},
want: &pubsub.Redis{
Host: "localhost:6379",
Expand All @@ -71,6 +98,7 @@ func TestNewPubSub(t *testing.T) {
Enable: true,
},
},
flag: SyncFeatureFlagNotificaiton,
},
want: nil,
wantErr: true,
Expand All @@ -87,6 +115,7 @@ func TestNewPubSub(t *testing.T) {
Enable: true,
},
},
flag: SyncFeatureFlagNotificaiton,
},
want: nil,
wantErr: true,
Expand All @@ -103,6 +132,7 @@ func TestNewPubSub(t *testing.T) {
Enable: true,
},
},
flag: SyncFeatureFlagNotificaiton,
},
want: nil,
wantErr: true,
Expand All @@ -123,6 +153,7 @@ func TestNewPubSub(t *testing.T) {
Enable: true,
},
},
flag: SyncFeatureFlagNotificaiton,
},
want: nil,
wantErr: true,
Expand All @@ -142,6 +173,7 @@ func TestNewPubSub(t *testing.T) {
Enable: true,
},
},
flag: SyncFeatureFlagNotificaiton,
},
want: nil,
wantErr: true,
Expand All @@ -161,6 +193,7 @@ func TestNewPubSub(t *testing.T) {
Enable: true,
},
},
flag: SyncFeatureFlagNotificaiton,
},
want: nil,
wantErr: true,
Expand All @@ -180,6 +213,7 @@ func TestNewPubSub(t *testing.T) {
Enable: true,
},
},
flag: SyncFeatureFlagNotificaiton,
},
want: nil,
wantErr: true,
Expand All @@ -200,6 +234,7 @@ func TestNewPubSub(t *testing.T) {
Enable: true,
},
},
flag: SyncFeatureFlagNotificaiton,
},
want: nil,
wantErr: true,
Expand All @@ -220,14 +255,15 @@ func TestNewPubSub(t *testing.T) {
Enable: true,
},
},
flag: SyncFeatureFlagNotificaiton,
},
want: nil,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := newPubSub(tt.args.conf)
got, err := newPubSub(tt.args.conf, tt.args.flag)
if (err != nil) != tt.wantErr {
t.Errorf("NewPubSub() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
4 changes: 2 additions & 2 deletions pkg/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func NewSyncedNotificationCenter(ctx context.Context, sdkKey string, conf config
logger = &log.Logger
}

pubsub, err := newPubSub(conf)
pubsub, err := newPubSub(conf, SyncFeatureFlagNotificaiton)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -132,7 +132,7 @@ type DatafileSyncer struct {
}

func NewDatafileSyncer(conf config.SyncConfig) (*DatafileSyncer, error) {
pubsub, err := newPubSub(conf)
pubsub, err := newPubSub(conf, SycnFeatureFlagDatafile)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func TestNewDatafileSyncer(t *testing.T) {
"database": 0,
},
},
Notification: config.FeatureSyncConfig{
Datafile: config.FeatureSyncConfig{
Default: "redis",
Enable: true,
},
Expand All @@ -178,7 +178,7 @@ func TestNewDatafileSyncer(t *testing.T) {
"host": "invalid host",
},
},
Notification: config.FeatureSyncConfig{
Datafile: config.FeatureSyncConfig{
Default: "redis",
Enable: true,
},
Expand Down