From 29762d638e3f43e4b674d5ae1aea08e2149c93ab Mon Sep 17 00:00:00 2001 From: Igor Kulkov Date: Wed, 4 May 2022 16:23:52 +0300 Subject: [PATCH 1/3] implement oauth secret reload --- configurator/backend/go.mod | 1 + configurator/backend/go.sum | 2 + go.mod | 2 + go.sum | 2 + server/drivers/base/config.go | 7 +- server/drivers/base/types.go | 18 ++-- server/drivers/google_ads/config.go | 11 +-- server/go.mod | 3 +- server/go.sum | 2 + server/handlers/airbyte.go | 24 +++--- server/handlers/sources.go | 16 ++-- server/main.go | 12 +++ server/oauth/config_service.go | 74 ++++++++++++++++ server/oauth/oauth.go | 24 ++++++ server/oauth/oauth_fields.go | 48 ----------- server/oauth/reloadable_service.go | 129 ++++++++++++++++++++++++++++ 16 files changed, 287 insertions(+), 88 deletions(-) create mode 100644 server/oauth/config_service.go create mode 100644 server/oauth/oauth.go delete mode 100644 server/oauth/oauth_fields.go create mode 100644 server/oauth/reloadable_service.go diff --git a/configurator/backend/go.mod b/configurator/backend/go.mod index 467793eb9..f98fe3d0f 100644 --- a/configurator/backend/go.mod +++ b/configurator/backend/go.mod @@ -71,6 +71,7 @@ require ( github.com/go-playground/universal-translator v0.18.0 // indirect github.com/go-redsync/redsync/v4 v4.3.0 // indirect github.com/go-sql-driver/mysql v1.6.0 // indirect + github.com/gobeam/stringy v0.0.5 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.2 // indirect diff --git a/configurator/backend/go.sum b/configurator/backend/go.sum index 21d03505e..c42e1b081 100644 --- a/configurator/backend/go.sum +++ b/configurator/backend/go.sum @@ -459,6 +459,8 @@ github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LB github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gobeam/stringy v0.0.5 h1:TvxQGSAqr/qF0SBVxa8Q67WWIo7bCWS0bM101WOd52g= +github.com/gobeam/stringy v0.0.5/go.mod h1:W3620X9dJHf2FSZF5fRnWekHcHQjwmCz8ZQ2d1qloqE= github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= github.com/goccy/go-json v0.9.6/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/godbus/dbus v0.0.0-20151105175453-c7fdd8b5cd55/go.mod h1:/YcGZj5zSblfDWMMoOzV4fas9FZnQYTkDnsGvmh2Grw= diff --git a/go.mod b/go.mod index 4d698288b..f559d5a4b 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,5 @@ module github.com/jitsucom/jitsu go 1.17 + +require github.com/gobeam/stringy v0.0.5 // indirect diff --git a/go.sum b/go.sum index e69de29bb..bde0caef9 100644 --- a/go.sum +++ b/go.sum @@ -0,0 +1,2 @@ +github.com/gobeam/stringy v0.0.5 h1:TvxQGSAqr/qF0SBVxa8Q67WWIo7bCWS0bM101WOd52g= +github.com/gobeam/stringy v0.0.5/go.mod h1:W3620X9dJHf2FSZF5fRnWekHcHQjwmCz8ZQ2d1qloqE= diff --git a/server/drivers/base/config.go b/server/drivers/base/config.go index eafeb1436..074def774 100644 --- a/server/drivers/base/config.go +++ b/server/drivers/base/config.go @@ -8,7 +8,6 @@ import ( "github.com/jitsucom/jitsu/server/logging" "github.com/jitsucom/jitsu/server/oauth" "github.com/jitsucom/jitsu/server/timestamp" - "github.com/spf13/viper" ) const ConfigSignatureSuffix = "_JITSU_config" @@ -100,14 +99,14 @@ func StreamIdentifier(namespace, name string) string { } func FillPreconfiguredOauth(sourceType string, config interface{}) { - oathFields, ok := oauth.Fields[sourceType] + oathFields, ok := oauth.Get(sourceType) if ok { sourceConnectorConfig, ok := config.(map[string]interface{}) if ok { for k, v := range oathFields { cf, ok := sourceConnectorConfig[k] - if (!ok || cf == "") && viper.GetString(v) != "" { - sourceConnectorConfig[k] = viper.GetString(v) + if (!ok || cf == "") && v.Provided { + sourceConnectorConfig[k] = v.Value } } } diff --git a/server/drivers/base/types.go b/server/drivers/base/types.go index d345f8eb7..f7bcbe29d 100644 --- a/server/drivers/base/types.go +++ b/server/drivers/base/types.go @@ -5,14 +5,14 @@ import ( "encoding/json" "errors" "fmt" - "github.com/jitsucom/jitsu/server/logging" - "github.com/jitsucom/jitsu/server/oauth" - "github.com/jitsucom/jitsu/server/schema" - "github.com/spf13/viper" "io" "io/ioutil" "strings" "time" + + "github.com/jitsucom/jitsu/server/logging" + "github.com/jitsucom/jitsu/server/oauth" + "github.com/jitsucom/jitsu/server/schema" ) const ( @@ -78,13 +78,13 @@ func (gac *GoogleAuthConfig) FillPreconfiguredOauth(sourceType string) { if gac == nil || gac.Type != GoogleOAuthAuthorizationType { return } - oathFields, ok := oauth.Fields[sourceType] + oauthConfig, ok := oauth.Get(sourceType) if ok { - if clientId, ok := oathFields["client_id"]; gac.ClientID == "" && ok { - gac.ClientID = viper.GetString(clientId) + if clientId, ok := oauthConfig["client_id"]; gac.ClientID == "" && ok { + gac.ClientID = clientId.Value } - if clientSecret, ok := oathFields["client_secret"]; gac.ClientSecret == "" && ok { - gac.ClientSecret = viper.GetString(clientSecret) + if clientSecret, ok := oauthConfig["client_secret"]; gac.ClientSecret == "" && ok { + gac.ClientSecret = clientSecret.Value } } } diff --git a/server/drivers/google_ads/config.go b/server/drivers/google_ads/config.go index 6e5992efd..883f3db53 100644 --- a/server/drivers/google_ads/config.go +++ b/server/drivers/google_ads/config.go @@ -2,12 +2,13 @@ package google_ads import ( "errors" + "strconv" + "time" + "github.com/jitsucom/jitsu/server/adapters" "github.com/jitsucom/jitsu/server/drivers/base" "github.com/jitsucom/jitsu/server/oauth" "github.com/spf13/viper" - "strconv" - "time" ) //googleAdsHTTPConfiguration contains default amplitude HTTP timeouts/retry/delays,etc @@ -32,10 +33,10 @@ type GoogleAdsCollectionConfig struct { } func (gac *GoogleAdsConfig) FillPreconfiguredOauth(sourceType string) { - oathFields, ok := oauth.Fields[sourceType] + oauthConfig, ok := oauth.Get(sourceType) if ok { - if developerToken, ok := oathFields["developer_token"]; gac.DeveloperToken == "" && ok { - gac.DeveloperToken = viper.GetString(developerToken) + if developerToken, ok := oauthConfig["developer_token"]; gac.DeveloperToken == "" && ok { + gac.DeveloperToken = developerToken.Value } //backward compatibility with previous versions config if gac.DeveloperToken == "" && viper.GetString("google-ads.developer-token") != "" { diff --git a/server/go.mod b/server/go.mod index 0e542fb36..3b4261a28 100644 --- a/server/go.mod +++ b/server/go.mod @@ -18,7 +18,7 @@ require ( github.com/go-redsync/redsync/v4 v4.3.0 github.com/go-sql-driver/mysql v1.6.0 github.com/gomodule/redigo v1.8.2 - github.com/google/go-cmp v0.5.6 // indirect + github.com/google/go-cmp v0.5.6 github.com/google/go-github/v32 v32.1.0 github.com/google/go-querystring v1.1.0 // indirect github.com/google/martian v2.1.0+incompatible @@ -69,6 +69,7 @@ require ( ) require ( + github.com/gobeam/stringy v0.0.5 github.com/hashicorp/golang-lru v0.5.4 github.com/joomcode/errorx v1.1.0 ) diff --git a/server/go.sum b/server/go.sum index 02afd990e..dafca7525 100644 --- a/server/go.sum +++ b/server/go.sum @@ -369,6 +369,8 @@ github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LB github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gobeam/stringy v0.0.5 h1:TvxQGSAqr/qF0SBVxa8Q67WWIo7bCWS0bM101WOd52g= +github.com/gobeam/stringy v0.0.5/go.mod h1:W3620X9dJHf2FSZF5fRnWekHcHQjwmCz8ZQ2d1qloqE= github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= github.com/godbus/dbus v0.0.0-20151105175453-c7fdd8b5cd55/go.mod h1:/YcGZj5zSblfDWMMoOzV4fas9FZnQYTkDnsGvmh2Grw= github.com/godbus/dbus v0.0.0-20180201030542-885f9cc04c9c/go.mod h1:/YcGZj5zSblfDWMMoOzV4fas9FZnQYTkDnsGvmh2Grw= diff --git a/server/handlers/airbyte.go b/server/handlers/airbyte.go index 3ebbd5412..05a62554c 100644 --- a/server/handlers/airbyte.go +++ b/server/handlers/airbyte.go @@ -5,6 +5,11 @@ import ( "encoding/json" "errors" "fmt" + "io/ioutil" + "net/http" + "sort" + "time" + "github.com/gin-gonic/gin" "github.com/jitsucom/jitsu/server/airbyte" "github.com/jitsucom/jitsu/server/drivers/base" @@ -13,12 +18,6 @@ import ( "github.com/jitsucom/jitsu/server/oauth" "github.com/jitsucom/jitsu/server/runner" "github.com/jitsucom/jitsu/server/utils" - "github.com/spf13/viper" - "io/ioutil" - "net/http" - "sort" - "strings" - "time" ) const ( @@ -123,7 +122,7 @@ func (ah *AirbyteHandler) SpecHandler(c *gin.Context) { } func enrichOathFields(dockerImage string, spec interface{}) { - oathFields, ok := oauth.Fields[dockerImage] + oauthConfig, ok := oauth.Get(dockerImage) if ok { props, err := utils.ExtractObject(spec, "connectionSpecification", "properties") if err != nil { @@ -143,7 +142,7 @@ func enrichOathFields(dockerImage string, spec interface{}) { return } provided := make(map[string]bool) - for k, v := range oathFields { + for k, v := range oauthConfig { pr, ok := propsMap[k] if !ok { continue @@ -153,11 +152,10 @@ func enrichOathFields(dockerImage string, spec interface{}) { logging.Errorf("cannot convert property %s to map[string]interface{} from: %T", k, pr) continue } - prov := viper.GetString(v) != "" - prMap["env_name"] = strings.ReplaceAll(strings.ToUpper(v), ".", "_") - prMap["yaml_path"] = v - prMap["provided"] = prov - provided[k] = prov + prMap["env_name"] = v.EnvName + prMap["yaml_path"] = v.YAMLPath + prMap["provided"] = v.Provided + provided[k] = v.Provided } newReq := make([]interface{}, 0, len(required)-len(provided)) for _, v := range required { diff --git a/server/handlers/sources.go b/server/handlers/sources.go index 0acfc2aae..cfe7e6a93 100644 --- a/server/handlers/sources.go +++ b/server/handlers/sources.go @@ -2,6 +2,9 @@ package handlers import ( "fmt" + "net/http" + "strings" + "github.com/gin-gonic/gin" "github.com/hashicorp/go-multierror" "github.com/jitsucom/jitsu/server/adapters" @@ -15,9 +18,6 @@ import ( "github.com/jitsucom/jitsu/server/runner" "github.com/jitsucom/jitsu/server/schema" "github.com/jitsucom/jitsu/server/sources" - "github.com/spf13/viper" - "net/http" - "strings" ) //ClearCacheRequest is a dto for ClearCache endpoint @@ -172,13 +172,13 @@ func (sh *SourcesHandler) TestSourcesHandler(c *gin.Context) { func (sh *SourcesHandler) OauthFields(c *gin.Context) { res := make(map[string]interface{}) sourceType := c.Param("sourceType") - oathFields, ok := oauth.Fields[sourceType] + oauthConfig, ok := oauth.Get(sourceType) if ok { - for k, v := range oathFields { + for k, v := range oauthConfig { fieldObject := make(map[string]interface{}) - fieldObject["env_name"] = strings.ReplaceAll(strings.ToUpper(v), ".", "_") - fieldObject["yaml_path"] = v - fieldObject["provided"] = viper.GetString(v) != "" + fieldObject["env_name"] = v.EnvName + fieldObject["yaml_path"] = v.YAMLPath + fieldObject["provided"] = v.Provided res[k] = fieldObject } } diff --git a/server/main.go b/server/main.go index 46cff46f4..be0007f8c 100644 --- a/server/main.go +++ b/server/main.go @@ -18,6 +18,7 @@ import ( "syscall" "time" + "github.com/jitsucom/jitsu/server/oauth" "github.com/jitsucom/jitsu/server/script/node" "github.com/jitsucom/jitsu/server/templates" @@ -220,6 +221,17 @@ func main() { notifications.SystemErrorf("Panic:\n%s\n%s", value, string(debug.Stack())) } + if oauthSecretsURL := viper.GetString("oauth_secrets"); oauthSecretsURL != "" { + oauthService, err := oauth.NewReloadableService(oauthSecretsURL) + if err != nil { + logging.Errorf("Failed to load OAuth secrets from %s, falling back to YAML configuration: %v", + oauthSecretsURL, err) + } else { + appconfig.Instance.ScheduleClosing(oauthService) + oauth.Set(oauthService) + } + } + clusterID := metaStorage.GetOrCreateClusterID(uuid.New()) systemInfo := runtime.GetInfo() telemetry.EnrichSystemInfo(clusterID, systemInfo) diff --git a/server/oauth/config_service.go b/server/oauth/config_service.go new file mode 100644 index 000000000..0974527d0 --- /dev/null +++ b/server/oauth/config_service.go @@ -0,0 +1,74 @@ +package oauth + +import ( + "strings" + + "github.com/spf13/viper" +) + +var Fields = map[string]map[string]string{ + "source-github": { + "client_id": "github.client_id", + "client_secret": "github.client_secret", + }, + "source-bing-ads": { + "client_id": "bing_ads.client_id", + "client_secret": "bing_ads.client_secret", + "developer_token": "bing_ads.developer_token", + }, + "tap-adroll": { + "client_id": "adroll.client_id", + "client_secret": "adroll.client_secret", + }, + "google_analytics": { + "client_id": "google_analytics.client_id", + "client_secret": "google_analytics.client_secret", + }, + "google_ads": { + "client_id": "google_ads.client_id", + "client_secret": "google_ads.client_secret", + "developer_token": "google_ads.developer_token", + }, + "google_play": { + "client_id": "google_play.client_id", + "client_secret": "google_play.client_secret", + }, + "tap-google-sheets": { + "client_id": "google_sheets.client_id", + "client_secret": "google_sheets.client_secret", + }, + "firebase": { + "client_id": "firebase.client_id", + "client_secret": "firebase.client_secret", + }, + "tap-helpscout": { + "client_id": "helpscout.client_id", + "client_secret": "helpscout.client_secret", + }, + "tap-xero": { + "client_id": "xero.client_id", + "client_secret": "xero.client_secret", + }, +} + +type ConfigService struct{} + +func (s *ConfigService) Get(id string) (Secrets, bool) { + fields, ok := Fields[id] + if !ok { + return nil, false + } + + secret := make(Secrets) + for key, configKey := range fields { + value := viper.GetString(configKey) + secret[key] = SecretValue{ + Value: value, + EnvName: strings.ReplaceAll(strings.ToUpper(value), ".", "_"), + YAMLPath: configKey, + Provided: value != "", + } + } + + return secret, true +} diff --git a/server/oauth/oauth.go b/server/oauth/oauth.go new file mode 100644 index 000000000..bc662fdfe --- /dev/null +++ b/server/oauth/oauth.go @@ -0,0 +1,24 @@ +package oauth + +type SecretValue struct { + Value string + EnvName string + YAMLPath string + Provided bool +} + +type Secrets = map[string]SecretValue + +type Interface interface { + Get(id string) (Secrets, bool) +} + +var instance Interface = new(ConfigService) + +func Set(service Interface) { + instance = service +} + +func Get(id string) (Secrets, bool) { + return instance.Get(id) +} diff --git a/server/oauth/oauth_fields.go b/server/oauth/oauth_fields.go deleted file mode 100644 index fba7810a8..000000000 --- a/server/oauth/oauth_fields.go +++ /dev/null @@ -1,48 +0,0 @@ -package oauth - -var ( - Fields = map[string]map[string]string{ - "source-github": { - "client_id": "github.client_id", - "client_secret": "github.client_secret", - }, - "source-bing-ads": { - "client_id": "bing_ads.client_id", - "client_secret": "bing_ads.client_secret", - "developer_token": "bing_ads.developer_token", - }, - "tap-adroll": { - "client_id": "adroll.client_id", - "client_secret": "adroll.client_secret", - }, - "google_analytics": { - "client_id": "google_analytics.client_id", - "client_secret": "google_analytics.client_secret", - }, - "google_ads": { - "client_id": "google_ads.client_id", - "client_secret": "google_ads.client_secret", - "developer_token": "google_ads.developer_token", - }, - "google_play": { - "client_id": "google_play.client_id", - "client_secret": "google_play.client_secret", - }, - "tap-google-sheets": { - "client_id": "google_sheets.client_id", - "client_secret": "google_sheets.client_secret", - }, - "firebase": { - "client_id": "firebase.client_id", - "client_secret": "firebase.client_secret", - }, - "tap-helpscout": { - "client_id": "helpscout.client_id", - "client_secret": "helpscout.client_secret", - }, - "tap-xero": { - "client_id": "xero.client_id", - "client_secret": "xero.client_secret", - }, - } -) diff --git a/server/oauth/reloadable_service.go b/server/oauth/reloadable_service.go new file mode 100644 index 000000000..7057b1321 --- /dev/null +++ b/server/oauth/reloadable_service.go @@ -0,0 +1,129 @@ +package oauth + +import ( + "context" + "net/http" + "net/url" + "sync" + "time" + + "github.com/carlmjohnson/requests" + "github.com/gobeam/stringy" + "github.com/jitsucom/jitsu/server/logging" + "github.com/jitsucom/jitsu/server/safego" + "github.com/pkg/errors" +) + +type Credentials struct { + ID string `json:"id"` + Driver string `json:"driver"` + Secrets map[string]string `json:"secrets"` +} + +type ReloadableService struct { + url string + token string + secrets map[string]Secrets + mu sync.RWMutex + cancel func() + work sync.WaitGroup +} + +func NewReloadableService(rawURL string) (*ReloadableService, error) { + url, err := url.Parse(rawURL) + if err != nil { + return nil, errors.Wrapf(err, "parse oauth secrets url %s", rawURL) + } + + var token string + if user := url.User; user != nil { + token, _ = user.Password() + } + + ctx, cancel := context.WithCancel(context.Background()) + s := &ReloadableService{ + url: rawURL, + token: token, + secrets: make(map[string]Secrets), + cancel: cancel, + } + + if err := s.refreshSecrets(ctx); err != nil { + cancel() + return nil, errors.Wrap(err, "refresh oauth secrets") + } + + s.work.Add(1) + safego.Run(func() { + ticker := time.NewTicker(time.Minute) + defer func() { + ticker.Stop() + cancel() + s.work.Done() + }() + + for { + select { + case <-ticker.C: + if err := s.refreshSecrets(ctx); err != nil { + logging.Errorf("Failed to refresh OAuth secrets: %s", err) + continue + } + + logging.Debugf("Refreshed OAuth secrets successfully") + + case <-ctx.Done(): + return + } + } + }) + + return s, nil +} + +func (s *ReloadableService) refreshSecrets(ctx context.Context) error { + var resp []Credentials + if err := requests.URL(s.url). + Bearer(s.token). + CheckStatus(http.StatusOK). + ToJSON(&resp). + Fetch(ctx); err != nil { + return err + } + + newSecrets := make(map[string]Secrets) + for _, cred := range resp { + if _, ok := newSecrets[cred.ID]; ok { + return errors.Errorf("duplicate oauth provider ID %s", cred.ID) + } + + secret := make(Secrets) + for key, value := range cred.Secrets { + secret[stringy.New(key).SnakeCase().Get()] = SecretValue{ + Value: value, + Provided: true, + } + } + + newSecrets[cred.ID] = secret + } + + s.mu.Lock() + defer s.mu.Unlock() + s.secrets = newSecrets + + return nil +} + +func (s *ReloadableService) Get(id string) (Secrets, bool) { + s.mu.RLock() + defer s.mu.RUnlock() + secret, ok := s.secrets[id] + return secret, ok +} + +func (s *ReloadableService) Close() error { + s.cancel() + s.work.Wait() + return nil +} From 8080581191f3c5cc4638c215d8ef624c4b7cc144 Mon Sep 17 00:00:00 2001 From: Igor Kulkov Date: Wed, 4 May 2022 16:30:01 +0300 Subject: [PATCH 2/3] fix --- server/oauth/config_service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/oauth/config_service.go b/server/oauth/config_service.go index 0974527d0..2cb44f1c8 100644 --- a/server/oauth/config_service.go +++ b/server/oauth/config_service.go @@ -64,7 +64,7 @@ func (s *ConfigService) Get(id string) (Secrets, bool) { value := viper.GetString(configKey) secret[key] = SecretValue{ Value: value, - EnvName: strings.ReplaceAll(strings.ToUpper(value), ".", "_"), + EnvName: strings.ReplaceAll(strings.ToUpper(configKey), ".", "_"), YAMLPath: configKey, Provided: value != "", } From 85bf19f3b75bff4079ed1d6555a6baeb9cd84cd9 Mon Sep 17 00:00:00 2001 From: Igor Kulkov Date: Wed, 4 May 2022 17:46:41 +0300 Subject: [PATCH 3/3] fix snake-case conversion --- server/oauth/reloadable_service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/oauth/reloadable_service.go b/server/oauth/reloadable_service.go index 7057b1321..77e20f6b8 100644 --- a/server/oauth/reloadable_service.go +++ b/server/oauth/reloadable_service.go @@ -99,7 +99,7 @@ func (s *ReloadableService) refreshSecrets(ctx context.Context) error { secret := make(Secrets) for key, value := range cred.Secrets { - secret[stringy.New(key).SnakeCase().Get()] = SecretValue{ + secret[stringy.New(key).SnakeCase().ToLower()] = SecretValue{ Value: value, Provided: true, }