diff --git a/app/apphandlers/embeddedAppHandler.go b/app/apphandlers/embeddedAppHandler.go index 420ddce3a5c..4e46f48beaa 100644 --- a/app/apphandlers/embeddedAppHandler.go +++ b/app/apphandlers/embeddedAppHandler.go @@ -127,8 +127,8 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) return err } - transformerFeaturesService := transformer.NewFeaturesService(ctx, transformer.FeaturesServiceConfig{ - PollInterval: config.GetDuration("Transformer.pollInterval", 1, time.Second), + transformerFeaturesService := transformer.NewFeaturesService(ctx, config, transformer.FeaturesServiceConfig{ + PollInterval: config.GetDuration("Transformer.pollInterval", 10, time.Second), TransformerURL: config.GetString("DEST_TRANSFORM_URL", "http://localhost:9090"), FeaturesRetryMaxAttempts: 10, }) @@ -242,7 +242,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) defer func() { for _, enricher := range enrichers { - enricher.Close() + _ = enricher.Close() } }() diff --git a/app/apphandlers/gatewayAppHandler.go b/app/apphandlers/gatewayAppHandler.go index 7a7eb2e39ac..aab6d64b5b4 100644 --- a/app/apphandlers/gatewayAppHandler.go +++ b/app/apphandlers/gatewayAppHandler.go @@ -121,8 +121,8 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options) if err != nil { return err } - transformerFeaturesService := transformer.NewFeaturesService(ctx, transformer.FeaturesServiceConfig{ - PollInterval: config.GetDuration("Transformer.pollInterval", 1, time.Second), + transformerFeaturesService := transformer.NewFeaturesService(ctx, config, transformer.FeaturesServiceConfig{ + PollInterval: config.GetDuration("Transformer.pollInterval", 10, time.Second), TransformerURL: config.GetString("DEST_TRANSFORM_URL", "http://localhost:9090"), FeaturesRetryMaxAttempts: 10, }) diff --git a/app/apphandlers/processorAppHandler.go b/app/apphandlers/processorAppHandler.go index af6adb2d1a2..e1e4c2ea5db 100644 --- a/app/apphandlers/processorAppHandler.go +++ b/app/apphandlers/processorAppHandler.go @@ -89,6 +89,7 @@ func (a *processorApp) Setup() error { } func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options) error { + config := config.Default if !a.setupDone { return fmt.Errorf("processor service cannot start, database is not setup") } @@ -105,7 +106,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options reporting := a.app.Features().Reporting.Setup(ctx, backendconfig.DefaultBackendConfig) defer reporting.Stop() - syncer := reporting.DatabaseSyncer(types.SyncerConfig{ConnInfo: misc.GetConnectionString(config.Default, "reporting")}) + syncer := reporting.DatabaseSyncer(types.SyncerConfig{ConnInfo: misc.GetConnectionString(config, "reporting")}) g.Go(misc.WithBugsnag(func() error { syncer() return nil @@ -136,8 +137,8 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options return err } - transformerFeaturesService := transformer.NewFeaturesService(ctx, transformer.FeaturesServiceConfig{ - PollInterval: config.GetDuration("Transformer.pollInterval", 1, time.Second), + transformerFeaturesService := transformer.NewFeaturesService(ctx, config, transformer.FeaturesServiceConfig{ + PollInterval: config.GetDuration("Transformer.pollInterval", 10, time.Second), TransformerURL: config.GetString("DEST_TRANSFORM_URL", "http://localhost:9090"), FeaturesRetryMaxAttempts: 10, }) @@ -210,14 +211,14 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options var schemaForwarder schema_forwarder.Forwarder if config.GetBool("EventSchemas2.enabled", false) { - client, err := pulsar.NewClient(config.Default) + client, err := pulsar.NewClient(config) if err != nil { return err } defer client.Close() - schemaForwarder = schema_forwarder.NewForwarder(terminalErrFn, schemaDB, &client, backendconfig.DefaultBackendConfig, logger.NewLogger().Child("jobs_forwarder"), config.Default, stats.Default) + schemaForwarder = schema_forwarder.NewForwarder(terminalErrFn, schemaDB, &client, backendconfig.DefaultBackendConfig, logger.NewLogger().Child("jobs_forwarder"), config, stats.Default) } else { - schemaForwarder = schema_forwarder.NewAbortingForwarder(terminalErrFn, schemaDB, logger.NewLogger().Child("jobs_forwarder"), config.Default, stats.Default) + schemaForwarder = schema_forwarder.NewAbortingForwarder(terminalErrFn, schemaDB, logger.NewLogger().Child("jobs_forwarder"), config, stats.Default) } modeProvider, err := resolveModeProvider(a.log, deploymentType) @@ -227,18 +228,18 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options adaptiveLimit := payload.SetupAdaptiveLimiter(ctx, g) - enrichers, err := setupPipelineEnrichers(config.Default, a.log, stats.Default) + enrichers, err := setupPipelineEnrichers(config, a.log, stats.Default) if err != nil { return fmt.Errorf("setting up pipeline enrichers: %w", err) } defer func() { for _, enricher := range enrichers { - enricher.Close() + _ = enricher.Close() } }() - drainConfigManager, err := drain_config.NewDrainConfigManager(config.Default, a.log.Child("drain-config")) + drainConfigManager, err := drain_config.NewDrainConfigManager(config, a.log.Child("drain-config")) if err != nil { return fmt.Errorf("drain config manager setup: %v", err) } @@ -270,7 +271,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options enrichers, proc.WithAdaptiveLimit(adaptiveLimit), ) - throttlerFactory, err := throttler.NewFactory(config.Default, stats.Default) + throttlerFactory, err := throttler.NewFactory(config, stats.Default) if err != nil { return fmt.Errorf("failed to create throttler factory: %w", err) } @@ -314,7 +315,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options Archiver: archiver.New( archivalDB, fileUploaderProvider, - config.Default, + config, stats.Default, archiver.WithAdaptiveLimit(adaptiveLimit), ), diff --git a/regulation-worker/cmd/main.go b/regulation-worker/cmd/main.go index 6ef023cfbeb..cd76900fc87 100644 --- a/regulation-worker/cmd/main.go +++ b/regulation-worker/cmd/main.go @@ -54,9 +54,10 @@ func main() { } func Run(ctx context.Context) error { + config := config.Default config.Set("Diagnostics.enableDiagnostics", false) - stats.Default = stats.NewStats(config.Default, logger.Default, svcMetric.Instance, + stats.Default = stats.NewStats(config, logger.Default, svcMetric.Instance, stats.WithServiceName("regulation-worker"), ) if err := stats.Default.Start(ctx, rruntime.GoRoutineFactory); err != nil { @@ -92,7 +93,7 @@ func Run(ctx context.Context) error { // setting up oauth OAuth := oauth.NewOAuthErrorHandler(backendconfig.DefaultBackendConfig, oauth.WithRudderFlow(oauth.RudderFlow_Delete)) - apiManagerHttpClient := createHTTPClient(config.Default, httpTimeout, oauthV2Enabled) + apiManagerHttpClient := createHTTPClient(config, httpTimeout, oauthV2Enabled) svc := service.JobSvc{ API: &client.JobAPI{ @@ -113,8 +114,8 @@ func Run(ctx context.Context) error { OAuth: OAuth, IsOAuthV2Enabled: oauthV2Enabled, MaxOAuthRefreshRetryAttempts: config.GetInt("RegulationWorker.oauth.maxRefreshRetryAttempts", 1), - TransformerFeaturesService: transformer.NewFeaturesService(ctx, transformer.FeaturesServiceConfig{ - PollInterval: config.GetDuration("Transformer.pollInterval", 1, time.Second), + TransformerFeaturesService: transformer.NewFeaturesService(ctx, config, transformer.FeaturesServiceConfig{ + PollInterval: config.GetDuration("Transformer.pollInterval", 10, time.Second), TransformerURL: config.GetString("DEST_TRANSFORM_URL", "http://localhost:9090"), FeaturesRetryMaxAttempts: 10, }), diff --git a/services/transformer/features.go b/services/transformer/features.go index ef5f7b9d4a1..34a3f604233 100644 --- a/services/transformer/features.go +++ b/services/transformer/features.go @@ -5,8 +5,10 @@ package transformer import ( "context" "encoding/json" + "net/http" "time" + "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-server/rruntime" ) @@ -38,12 +40,21 @@ var defaultTransformerFeatures = `{ "regulations": ["AM"], }` -func NewFeaturesService(ctx context.Context, config FeaturesServiceConfig) FeaturesService { +func NewFeaturesService(ctx context.Context, config *config.Config, featConfig FeaturesServiceConfig) FeaturesService { handler := &featuresService{ features: json.RawMessage(defaultTransformerFeatures), logger: logger.NewLogger().Child("transformer-features"), waitChan: make(chan struct{}), - config: config, + options: featConfig, + client: &http.Client{ + Transport: &http.Transport{ + DisableKeepAlives: config.GetBool("Transformer.Client.disableKeepAlives", true), + MaxConnsPerHost: config.GetInt("Transformer.Client.maxHTTPConnections", 100), + MaxIdleConnsPerHost: config.GetInt("Transformer.Client.maxHTTPIdleConnections", 10), + IdleConnTimeout: config.GetDuration("Transformer.Client.maxIdleConnDuration", 30, time.Second), + }, + Timeout: config.GetDuration("HttpClient.processor.timeout", 30, time.Second), + }, } rruntime.Go(func() { handler.syncTransformerFeatureJson(ctx) }) diff --git a/services/transformer/features_impl.go b/services/transformer/features_impl.go index 45a08fa623f..759bd03dc20 100644 --- a/services/transformer/features_impl.go +++ b/services/transformer/features_impl.go @@ -11,7 +11,6 @@ import ( "github.com/samber/lo" "github.com/tidwall/gjson" - "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-server/utils/httputil" ) @@ -19,8 +18,9 @@ import ( type featuresService struct { logger logger.Logger waitChan chan struct{} - config FeaturesServiceConfig + options FeaturesServiceConfig features json.RawMessage + client *http.Client } func (t *featuresService) SourceTransformerVersion() string { @@ -59,10 +59,10 @@ func (t *featuresService) Wait() chan struct{} { func (t *featuresService) syncTransformerFeatureJson(ctx context.Context) { var initDone bool - t.logger.Infof("Fetching transformer features from %s", t.config.TransformerURL) + t.logger.Infof("Fetching transformer features from %s", t.options.TransformerURL) for { var downloaded bool - for i := 0; i < t.config.FeaturesRetryMaxAttempts; i++ { + for i := 0; i < t.options.FeaturesRetryMaxAttempts; i++ { if ctx.Err() != nil { return @@ -70,11 +70,11 @@ func (t *featuresService) syncTransformerFeatureJson(ctx context.Context) { retry := t.makeFeaturesFetchCall() if retry { - t.logger.Infof("Fetched transformer features from %s (retry: %v)", t.config.TransformerURL, retry) + t.logger.Infof("Fetched transformer features from %s (retry: %v)", t.options.TransformerURL, retry) select { case <-ctx.Done(): return - case <-time.After(200 * time.Millisecond): + case <-time.After(2 * time.Millisecond): continue } } @@ -90,21 +90,19 @@ func (t *featuresService) syncTransformerFeatureJson(ctx context.Context) { select { case <-ctx.Done(): return - case <-time.After(t.config.PollInterval): + case <-time.After(t.options.PollInterval): } } } func (t *featuresService) makeFeaturesFetchCall() bool { - url := t.config.TransformerURL + "/features" + url := t.options.TransformerURL + "/features" req, err := http.NewRequest("GET", url, bytes.NewReader([]byte{})) if err != nil { t.logger.Error("error creating request - ", err) return true } - tr := &http.Transport{} - client := &http.Client{Transport: tr, Timeout: config.GetDuration("HttpClient.processor.timeout", 30, time.Second)} - res, err := client.Do(req) + res, err := t.client.Do(req) if err != nil { t.logger.Error("error sending request - ", err) return true diff --git a/services/transformer/features_impl_test.go b/services/transformer/features_impl_test.go index 53ee69ebeda..bed86ef6e15 100644 --- a/services/transformer/features_impl_test.go +++ b/services/transformer/features_impl_test.go @@ -10,6 +10,8 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/rudderlabs/rudder-go-kit/config" + "github.com/rudderlabs/rudder-go-kit/logger" ) @@ -19,7 +21,7 @@ var _ = Describe("Transformer features", func() { handler := &featuresService{ logger: logger.NewLogger(), waitChan: make(chan struct{}), - config: FeaturesServiceConfig{ + options: FeaturesServiceConfig{ PollInterval: time.Duration(1), FeaturesRetryMaxAttempts: 1, }, @@ -40,7 +42,7 @@ var _ = Describe("Transformer features", func() { features: json.RawMessage(defaultTransformerFeatures), logger: logger.NewLogger(), waitChan: make(chan struct{}), - config: FeaturesServiceConfig{ + options: FeaturesServiceConfig{ PollInterval: time.Duration(1), FeaturesRetryMaxAttempts: 1, }, @@ -54,7 +56,7 @@ var _ = Describe("Transformer features", func() { features: json.RawMessage(defaultTransformerFeatures), logger: logger.NewLogger(), waitChan: make(chan struct{}), - config: FeaturesServiceConfig{ + options: FeaturesServiceConfig{ PollInterval: time.Duration(1), FeaturesRetryMaxAttempts: 1, }, @@ -68,7 +70,7 @@ var _ = Describe("Transformer features", func() { features: json.RawMessage(defaultTransformerFeatures), logger: logger.NewLogger(), waitChan: make(chan struct{}), - config: FeaturesServiceConfig{ + options: FeaturesServiceConfig{ PollInterval: time.Duration(1), FeaturesRetryMaxAttempts: 1, }, @@ -87,7 +89,7 @@ var _ = Describe("Transformer features", func() { http.Error(w, "not found error", http.StatusNotFound) })) - handler := NewFeaturesService(context.TODO(), FeaturesServiceConfig{ + handler := NewFeaturesService(context.TODO(), config.Default, FeaturesServiceConfig{ PollInterval: time.Duration(1), TransformerURL: transformerServer.URL, FeaturesRetryMaxAttempts: 1, @@ -116,7 +118,7 @@ var _ = Describe("Transformer features", func() { _, _ = w.Write([]byte(mockTransformerResp)) })) - handler := NewFeaturesService(context.TODO(), FeaturesServiceConfig{ + handler := NewFeaturesService(context.TODO(), config.Default, FeaturesServiceConfig{ PollInterval: time.Duration(1), TransformerURL: transformerServer.URL, FeaturesRetryMaxAttempts: 1,