Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FSSDK-9631] feat: add datafile syncer to synchronize datafile across agent nodes for webhook API #405

Merged
merged 18 commits into from
Dec 5, 2023
8 changes: 6 additions & 2 deletions cmd/optimizely/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/spf13/viper"

"github.com/optimizely/agent/config"
"github.com/optimizely/agent/pkg/handlers"
"github.com/optimizely/agent/pkg/metrics"
"github.com/optimizely/agent/pkg/optimizely"
"github.com/optimizely/agent/pkg/routers"
Expand Down Expand Up @@ -266,7 +267,10 @@ func main() {
sdkMetricsRegistry := optimizely.NewRegistry(agentMetricsRegistry)

ctx, cancel := context.WithCancel(context.Background()) // Create default service context
sg := server.NewGroup(ctx, conf.Server) // Create a new server group to manage the individual http listeners
defer cancel()
ctx = context.WithValue(ctx, handlers.LoggerKey, &log.Logger)

sg := server.NewGroup(ctx, conf.Server) // Create a new server group to manage the individual http listeners
optlyCache := optimizely.NewCache(ctx, *conf, sdkMetricsRegistry)
optlyCache.Init(conf.SDKKeys)

Expand All @@ -286,7 +290,7 @@ func main() {

log.Info().Str("version", conf.Version).Msg("Starting services.")
sg.GoListenAndServe("api", conf.API.Port, apiRouter)
sg.GoListenAndServe("webhook", conf.Webhook.Port, routers.NewWebhookRouter(optlyCache, conf.Webhook))
sg.GoListenAndServe("webhook", conf.Webhook.Port, routers.NewWebhookRouter(ctx, optlyCache, *conf))
mikechu-optimizely marked this conversation as resolved.
Show resolved Hide resolved
sg.GoListenAndServe("admin", conf.Admin.Port, adminRouter) // Admin should be added last.

// wait for server group to shutdown
Expand Down
12 changes: 8 additions & 4 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -244,16 +244,20 @@ runtime:
## (For n>1 the details of sampling may change.)
mutexProfileFraction: 0

## synchronization should be enabled when multiple replicas of agent is deployed
## if notification synchronization is enabled, then the active notification event-stream API
## will get the notifications from multiple replicas
## synchronization should be enabled when features for multiple nodes like notification streaming are deployed
synchronization:
pubsub:
redis:
host: "redis.demo.svc:6379"
password: ""
database: 0
channel: "optimizely-sync"
## if notification synchronization is enabled, then the active notification event-stream API
## will get the notifications from available replicas
notification:
enable: false
default: "redis"
## if datafile synchronization is enabled, then for each webhook API call
## the datafile will be sent to all available replicas to achieve better eventual consistency
datafile:
enable: false
default: "redis"
9 changes: 5 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func NewDefaultConfig() *AgentConfig {
"channel": "optimizely-notifications",
},
},
Notification: NotificationConfig{
Notification: FeatureSyncConfig{
Enable: false,
Default: "redis",
},
Expand Down Expand Up @@ -167,11 +167,12 @@ type AgentConfig struct {
// SyncConfig contains Synchronization configuration for the multiple Agent nodes
type SyncConfig struct {
Pubsub map[string]interface{} `json:"pubsub"`
Notification NotificationConfig `json:"notification"`
Notification FeatureSyncConfig `json:"notification"`
Datafile FeatureSyncConfig `json:"datafile"`
}

// NotificationConfig contains Notification Synchronization configuration for the multiple Agent nodes
type NotificationConfig struct {
// FeatureSyncConfig contains Notification Synchronization configuration for the multiple Agent nodes
type FeatureSyncConfig struct {
mikechu-optimizely marked this conversation as resolved.
Show resolved Hide resolved
Enable bool `json:"enable"`
Default string `json:"default"`
}
Expand Down
30 changes: 9 additions & 21 deletions pkg/handlers/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"net/http"
"strings"

"github.com/go-redis/redis/v8"
"github.com/optimizely/agent/config"
"github.com/optimizely/agent/pkg/middleware"
"github.com/optimizely/agent/pkg/syncer"
Expand Down Expand Up @@ -212,26 +211,22 @@ func DefaultNotificationReceiver(ctx context.Context) (<-chan syncer.Event, erro
return messageChan, nil
}

func RedisNotificationReceiver(conf config.SyncConfig) NotificationReceiverFunc {
func SyncedNotificationReceiver(conf config.SyncConfig) NotificationReceiverFunc {
return func(ctx context.Context) (<-chan syncer.Event, error) {
sdkKey, ok := ctx.Value(SDKKey).(string)
if !ok || sdkKey == "" {
return nil, errors.New("sdk key not found")
}

redisSyncer, err := syncer.NewRedisSyncer(&zerolog.Logger{}, conf, sdkKey)
ncSyncer, err := syncer.NewSyncedNotificationCenter(ctx, sdkKey, conf)
if err != nil {
return nil, err
}

client := redis.NewClient(&redis.Options{
Addr: redisSyncer.Host,
Password: redisSyncer.Password,
DB: redisSyncer.Database,
})

// Subscribe to a Redis channel
pubsub := client.Subscribe(ctx, syncer.GetChannelForSDKKey(redisSyncer.Channel, sdkKey))
eventCh, err := ncSyncer.Subscribe(ctx, syncer.GetChannelForSDKKey(syncer.PubSubDefaultChan, sdkKey))
if err != nil {
return nil, err
}

dataChan := make(chan syncer.Event)

Expand All @@ -244,19 +239,12 @@ func RedisNotificationReceiver(conf config.SyncConfig) NotificationReceiverFunc
for {
select {
case <-ctx.Done():
client.Close()
pubsub.Close()
close(dataChan)
logger.Debug().Msg("context canceled, redis notification receiver is closed")
return
default:
mikechu-optimizely marked this conversation as resolved.
Show resolved Hide resolved
msg, err := pubsub.ReceiveMessage(ctx)
if err != nil {
logger.Err(err).Msg("failed to receive message from redis")
continue
}

case msg := <-eventCh:
var event syncer.Event
if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil {
if err := json.Unmarshal([]byte(msg), &event); err != nil {
logger.Err(err).Msg("failed to unmarshal redis message")
continue
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/handlers/notification_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (suite *NotificationTestSuite) TestTrackAndProjectConfigWithSynchronization
"database": 0,
},
},
Notification: config.NotificationConfig{
Notification: config.FeatureSyncConfig{
Enable: true,
Default: "redis",
},
Expand Down Expand Up @@ -370,7 +370,7 @@ func TestRedisNotificationReceiver(t *testing.T) {
"database": 0,
},
},
Notification: config.NotificationConfig{
Notification: config.FeatureSyncConfig{
Enable: true,
Default: "redis",
},
Expand Down Expand Up @@ -407,7 +407,7 @@ func TestRedisNotificationReceiver(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := RedisNotificationReceiver(tt.args.conf)
got := SyncedNotificationReceiver(tt.args.conf)
if reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
t.Errorf("RedisNotificationReceiver() = %v, want %v", got, tt.want)
}
Expand Down
60 changes: 54 additions & 6 deletions pkg/handlers/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,25 @@
package handlers

import (
"context"
"crypto/hmac"
"crypto/sha1"
"crypto/subtle"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"

"github.com/optimizely/agent/config"

"github.com/go-chi/render"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"

"github.com/optimizely/agent/pkg/optimizely"
"github.com/optimizely/agent/pkg/syncer"
)

const signatureHeader = "X-Hub-Signature"
Expand All @@ -56,15 +60,19 @@ type OptlyMessage struct {

// OptlyWebhookHandler handles incoming messages from Optimizely
type OptlyWebhookHandler struct {
optlyCache optimizely.Cache
ProjectMap map[int64]config.WebhookProject
optlyCache optimizely.Cache
ProjectMap map[int64]config.WebhookProject
configSyncer syncer.Syncer
syncEnabled bool
}

// NewWebhookHandler returns a new instance of OptlyWebhookHandler
func NewWebhookHandler(optlyCache optimizely.Cache, projectMap map[int64]config.WebhookProject) *OptlyWebhookHandler {
func NewWebhookHandler(optlyCache optimizely.Cache, projectMap map[int64]config.WebhookProject, configSyncer syncer.Syncer) *OptlyWebhookHandler {
return &OptlyWebhookHandler{
optlyCache: optlyCache,
ProjectMap: projectMap,
optlyCache: optlyCache,
ProjectMap: projectMap,
syncEnabled: configSyncer != nil,
configSyncer: configSyncer,
}
}

Expand Down Expand Up @@ -140,7 +148,47 @@ func (h *OptlyWebhookHandler) HandleWebhook(w http.ResponseWriter, r *http.Reque

// Iterate through all SDK keys and update config
for _, sdkKey := range webhookConfig.SDKKeys {
h.optlyCache.UpdateConfigs(sdkKey)
if h.syncEnabled {
if err := h.configSyncer.Sync(r.Context(), syncer.GetDatafileSyncChannel(), sdkKey); err != nil {
errMsg := fmt.Sprintf("datafile synced failed. reason: %s", err.Error())
log.Error().Msg(errMsg)
h.optlyCache.UpdateConfigs(sdkKey)
pulak-opti marked this conversation as resolved.
Show resolved Hide resolved
}
} else {
h.optlyCache.UpdateConfigs(sdkKey)
}
}
w.WriteHeader(http.StatusNoContent)
}

func (h *OptlyWebhookHandler) StartSyncer(ctx context.Context) error {
logger, ok := ctx.Value(LoggerKey).(*zerolog.Logger)
if !ok {
logger = &log.Logger
}

if !h.syncEnabled {
logger.Debug().Msg("datafile syncer is not enabled")
return nil
}

dataCh, err := h.configSyncer.Subscribe(ctx, syncer.GetDatafileSyncChannel())
if err != nil {
return err
}

go func() {
for {
select {
case <-ctx.Done():
logger.Debug().Msg("context canceled, syncer is stopped")
return
case key := <-dataCh:
h.optlyCache.UpdateConfigs(key)
logger.Info().Msgf("datafile synced successfully for sdkKey: %s", key)
}
mikechu-optimizely marked this conversation as resolved.
Show resolved Hide resolved
}
}()
logger.Debug().Msg("datafile syncer is started")
return nil
}
81 changes: 78 additions & 3 deletions pkg/handlers/webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package handlers

import (
"bytes"
"context"
"encoding/json"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -46,6 +47,28 @@ func NewCache() *TestCache {
}
}

type TestDFSyncer struct {
syncCalled bool
subscribeCalled bool
}

func NewTestDFSyncer() *TestDFSyncer {
return &TestDFSyncer{
syncCalled: false,
subscribeCalled: false,
}
}

func (t *TestDFSyncer) Sync(_ context.Context, _ string, _ string) error {
t.syncCalled = true
return nil
}

func (t *TestDFSyncer) Subscribe(_ context.Context, _ string) (chan string, error) {
t.subscribeCalled = true
return make(chan string), nil
}

// GetClient returns a default OptlyClient for testing
func (tc *TestCache) GetClient(sdkKey string) (*optimizely.OptlyClient, error) {
return &optimizely.OptlyClient{
Expand Down Expand Up @@ -111,7 +134,7 @@ func TestHandleWebhookValidMessageInvalidSignature(t *testing.T) {
Secret: "I am secret",
},
}
optlyHandler := NewWebhookHandler(nil, testWebhookConfigs)
optlyHandler := NewWebhookHandler(nil, testWebhookConfigs, nil)
mikechu-optimizely marked this conversation as resolved.
Show resolved Hide resolved
webhookMsg := OptlyMessage{
ProjectID: 42,
Timestamp: 42424242,
Expand Down Expand Up @@ -146,7 +169,7 @@ func TestHandleWebhookSkippedCheckInvalidSignature(t *testing.T) {
SkipSignatureCheck: true,
},
}
optlyHandler := NewWebhookHandler(testCache, testWebhookConfigs)
optlyHandler := NewWebhookHandler(testCache, testWebhookConfigs, nil)
webhookMsg := OptlyMessage{
ProjectID: 42,
Timestamp: 42424242,
Expand Down Expand Up @@ -181,7 +204,7 @@ func TestHandleWebhookValidMessage(t *testing.T) {
Secret: "I am secret",
},
}
optlyHandler := NewWebhookHandler(testCache, testWebhookConfigs)
optlyHandler := NewWebhookHandler(testCache, testWebhookConfigs, nil)
webhookMsg := OptlyMessage{
ProjectID: 42,
Timestamp: 42424242,
Expand All @@ -208,3 +231,55 @@ func TestHandleWebhookValidMessage(t *testing.T) {
assert.Equal(t, http.StatusNoContent, rec.Code)
assert.Equal(t, true, testCache.updateConfigsCalled)
}

func TestHandleWebhookWithDatafileSyncer(t *testing.T) {
var testWebhookConfigs = map[int64]config.WebhookProject{
42: {
SDKKeys: []string{"myDatafile"},
Secret: "I am secret",
},
}
syncer := NewTestDFSyncer()

optlyHandler := NewWebhookHandler(nil, testWebhookConfigs, syncer)
webhookMsg := OptlyMessage{
ProjectID: 42,
Timestamp: 42424242,
Event: "project.datafile_updated",
Data: DatafileUpdateData{
Revision: 101,
OriginURL: "origin.optimizely.com/datafiles/myDatafile",
CDNUrl: "cdn.optimizely.com/datafiles/myDatafile",
Environment: "Production",
},
}

validWebhookMessage, _ := json.Marshal(webhookMsg)

req := httptest.NewRequest("POST", "/webhooks/optimizely", bytes.NewBuffer(validWebhookMessage))

// This sha1 has been computed from the Optimizely application
req.Header.Set(signatureHeader, "sha1=e0199de63fb7192634f52136d4ceb7dc6f191da3")

rec := httptest.NewRecorder()
handler := http.HandlerFunc(optlyHandler.HandleWebhook)
handler.ServeHTTP(rec, req)

assert.Equal(t, http.StatusNoContent, rec.Code)
assert.Equal(t, true, syncer.syncCalled)
}

func TestWebhookStartSyncer(t *testing.T) {
var testWebhookConfigs = map[int64]config.WebhookProject{
42: {
mikechu-optimizely marked this conversation as resolved.
Show resolved Hide resolved
SDKKeys: []string{"myDatafile"},
Secret: "I am secret",
},
}
syncer := NewTestDFSyncer()

optlyHandler := NewWebhookHandler(nil, testWebhookConfigs, syncer)
err := optlyHandler.StartSyncer(context.Background())
assert.NoError(t, err)
assert.Equal(t, true, syncer.subscribeCalled)
}
Loading