Skip to content

Commit

Permalink
chore: fix ever increasing idle connections
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 committed Apr 19, 2024
1 parent 5c15c7e commit 92622ae
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 40 deletions.
6 changes: 3 additions & 3 deletions app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
return err
}

transformerFeaturesService := transformer.NewFeaturesService(ctx, transformer.FeaturesServiceConfig{
PollInterval: config.GetDuration("Transformer.pollInterval", 1, time.Second),
transformerFeaturesService := transformer.NewFeaturesService(ctx, config, transformer.FeaturesServiceOptions{
PollInterval: config.GetDuration("Transformer.pollInterval", 10, time.Second),
TransformerURL: config.GetString("DEST_TRANSFORM_URL", "http://localhost:9090"),
FeaturesRetryMaxAttempts: 10,
})
Expand Down Expand Up @@ -242,7 +242,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)

defer func() {
for _, enricher := range enrichers {
enricher.Close()
_ = enricher.Close()
}
}()

Expand Down
4 changes: 2 additions & 2 deletions app/apphandlers/gatewayAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options)
if err != nil {
return err
}
transformerFeaturesService := transformer.NewFeaturesService(ctx, transformer.FeaturesServiceConfig{
PollInterval: config.GetDuration("Transformer.pollInterval", 1, time.Second),
transformerFeaturesService := transformer.NewFeaturesService(ctx, config, transformer.FeaturesServiceOptions{
PollInterval: config.GetDuration("Transformer.pollInterval", 10, time.Second),
TransformerURL: config.GetString("DEST_TRANSFORM_URL", "http://localhost:9090"),
FeaturesRetryMaxAttempts: 10,
})
Expand Down
23 changes: 12 additions & 11 deletions app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func (a *processorApp) Setup() error {
}

func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options) error {
config := config.Default
if !a.setupDone {
return fmt.Errorf("processor service cannot start, database is not setup")
}
Expand All @@ -105,7 +106,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options

reporting := a.app.Features().Reporting.Setup(ctx, backendconfig.DefaultBackendConfig)
defer reporting.Stop()
syncer := reporting.DatabaseSyncer(types.SyncerConfig{ConnInfo: misc.GetConnectionString(config.Default, "reporting")})
syncer := reporting.DatabaseSyncer(types.SyncerConfig{ConnInfo: misc.GetConnectionString(config, "reporting")})
g.Go(misc.WithBugsnag(func() error {
syncer()
return nil
Expand Down Expand Up @@ -136,8 +137,8 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
return err
}

transformerFeaturesService := transformer.NewFeaturesService(ctx, transformer.FeaturesServiceConfig{
PollInterval: config.GetDuration("Transformer.pollInterval", 1, time.Second),
transformerFeaturesService := transformer.NewFeaturesService(ctx, config, transformer.FeaturesServiceOptions{
PollInterval: config.GetDuration("Transformer.pollInterval", 10, time.Second),
TransformerURL: config.GetString("DEST_TRANSFORM_URL", "http://localhost:9090"),
FeaturesRetryMaxAttempts: 10,
})
Expand Down Expand Up @@ -210,14 +211,14 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options

var schemaForwarder schema_forwarder.Forwarder
if config.GetBool("EventSchemas2.enabled", false) {
client, err := pulsar.NewClient(config.Default)
client, err := pulsar.NewClient(config)
if err != nil {
return err
}
defer client.Close()
schemaForwarder = schema_forwarder.NewForwarder(terminalErrFn, schemaDB, &client, backendconfig.DefaultBackendConfig, logger.NewLogger().Child("jobs_forwarder"), config.Default, stats.Default)
schemaForwarder = schema_forwarder.NewForwarder(terminalErrFn, schemaDB, &client, backendconfig.DefaultBackendConfig, logger.NewLogger().Child("jobs_forwarder"), config, stats.Default)
} else {
schemaForwarder = schema_forwarder.NewAbortingForwarder(terminalErrFn, schemaDB, logger.NewLogger().Child("jobs_forwarder"), config.Default, stats.Default)
schemaForwarder = schema_forwarder.NewAbortingForwarder(terminalErrFn, schemaDB, logger.NewLogger().Child("jobs_forwarder"), config, stats.Default)
}

modeProvider, err := resolveModeProvider(a.log, deploymentType)
Expand All @@ -227,18 +228,18 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options

adaptiveLimit := payload.SetupAdaptiveLimiter(ctx, g)

enrichers, err := setupPipelineEnrichers(config.Default, a.log, stats.Default)
enrichers, err := setupPipelineEnrichers(config, a.log, stats.Default)
if err != nil {
return fmt.Errorf("setting up pipeline enrichers: %w", err)
}

defer func() {
for _, enricher := range enrichers {
enricher.Close()
_ = enricher.Close()
}
}()

drainConfigManager, err := drain_config.NewDrainConfigManager(config.Default, a.log.Child("drain-config"))
drainConfigManager, err := drain_config.NewDrainConfigManager(config, a.log.Child("drain-config"))
if err != nil {
return fmt.Errorf("drain config manager setup: %v", err)
}
Expand Down Expand Up @@ -270,7 +271,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
enrichers,
proc.WithAdaptiveLimit(adaptiveLimit),
)
throttlerFactory, err := throttler.NewFactory(config.Default, stats.Default)
throttlerFactory, err := throttler.NewFactory(config, stats.Default)
if err != nil {
return fmt.Errorf("failed to create throttler factory: %w", err)
}
Expand Down Expand Up @@ -314,7 +315,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
Archiver: archiver.New(
archivalDB,
fileUploaderProvider,
config.Default,
config,
stats.Default,
archiver.WithAdaptiveLimit(adaptiveLimit),
),
Expand Down
9 changes: 5 additions & 4 deletions regulation-worker/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ func main() {
}

func Run(ctx context.Context) error {
config := config.Default
config.Set("Diagnostics.enableDiagnostics", false)

stats.Default = stats.NewStats(config.Default, logger.Default, svcMetric.Instance,
stats.Default = stats.NewStats(config, logger.Default, svcMetric.Instance,
stats.WithServiceName("regulation-worker"),
)
if err := stats.Default.Start(ctx, rruntime.GoRoutineFactory); err != nil {
Expand Down Expand Up @@ -92,7 +93,7 @@ func Run(ctx context.Context) error {
// setting up oauth
OAuth := oauth.NewOAuthErrorHandler(backendconfig.DefaultBackendConfig, oauth.WithRudderFlow(oauth.RudderFlow_Delete))

apiManagerHttpClient := createHTTPClient(config.Default, httpTimeout, oauthV2Enabled)
apiManagerHttpClient := createHTTPClient(config, httpTimeout, oauthV2Enabled)

svc := service.JobSvc{
API: &client.JobAPI{
Expand All @@ -113,8 +114,8 @@ func Run(ctx context.Context) error {
OAuth: OAuth,
IsOAuthV2Enabled: oauthV2Enabled,
MaxOAuthRefreshRetryAttempts: config.GetInt("RegulationWorker.oauth.maxRefreshRetryAttempts", 1),
TransformerFeaturesService: transformer.NewFeaturesService(ctx, transformer.FeaturesServiceConfig{
PollInterval: config.GetDuration("Transformer.pollInterval", 1, time.Second),
TransformerFeaturesService: transformer.NewFeaturesService(ctx, config, transformer.FeaturesServiceOptions{
PollInterval: config.GetDuration("Transformer.pollInterval", 10, time.Second),
TransformerURL: config.GetString("DEST_TRANSFORM_URL", "http://localhost:9090"),
FeaturesRetryMaxAttempts: 10,
}),
Expand Down
17 changes: 14 additions & 3 deletions services/transformer/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ package transformer
import (
"context"
"encoding/json"
"net/http"
"time"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/rruntime"
)
Expand All @@ -16,7 +18,7 @@ const (
V1 = "v1"
)

type FeaturesServiceConfig struct {
type FeaturesServiceOptions struct {
PollInterval time.Duration
TransformerURL string
FeaturesRetryMaxAttempts int
Expand All @@ -38,12 +40,21 @@ var defaultTransformerFeatures = `{
"regulations": ["AM"],
}`

func NewFeaturesService(ctx context.Context, config FeaturesServiceConfig) FeaturesService {
func NewFeaturesService(ctx context.Context, config *config.Config, featConfig FeaturesServiceOptions) FeaturesService {
handler := &featuresService{
features: json.RawMessage(defaultTransformerFeatures),
logger: logger.NewLogger().Child("transformer-features"),
waitChan: make(chan struct{}),
config: config,
options: featConfig,
client: &http.Client{
Transport: &http.Transport{
DisableKeepAlives: config.GetBool("Transformer.Client.disableKeepAlives", true),
MaxConnsPerHost: config.GetInt("Transformer.Client.maxHTTPConnections", 100),
MaxIdleConnsPerHost: config.GetInt("Transformer.Client.maxHTTPIdleConnections", 10),
IdleConnTimeout: config.GetDuration("Transformer.Client.maxIdleConnDuration", 30, time.Second),
},
Timeout: config.GetDuration("HttpClient.processor.timeout", 30, time.Second),
},
}

rruntime.Go(func() { handler.syncTransformerFeatureJson(ctx) })
Expand Down
20 changes: 9 additions & 11 deletions services/transformer/features_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ import (
"github.com/samber/lo"
"github.com/tidwall/gjson"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/utils/httputil"
)

type featuresService struct {
logger logger.Logger
waitChan chan struct{}
config FeaturesServiceConfig
options FeaturesServiceOptions
features json.RawMessage
client *http.Client
}

func (t *featuresService) SourceTransformerVersion() string {
Expand Down Expand Up @@ -59,22 +59,22 @@ func (t *featuresService) Wait() chan struct{} {

func (t *featuresService) syncTransformerFeatureJson(ctx context.Context) {
var initDone bool
t.logger.Infof("Fetching transformer features from %s", t.config.TransformerURL)
t.logger.Infof("Fetching transformer features from %s", t.options.TransformerURL)
for {
var downloaded bool
for i := 0; i < t.config.FeaturesRetryMaxAttempts; i++ {
for i := 0; i < t.options.FeaturesRetryMaxAttempts; i++ {

if ctx.Err() != nil {
return
}

retry := t.makeFeaturesFetchCall()
if retry {
t.logger.Infof("Fetched transformer features from %s (retry: %v)", t.config.TransformerURL, retry)
t.logger.Infof("Fetched transformer features from %s (retry: %v)", t.options.TransformerURL, retry)
select {
case <-ctx.Done():
return
case <-time.After(200 * time.Millisecond):
case <-time.After(2 * time.Millisecond):
continue
}
}
Expand All @@ -90,21 +90,19 @@ func (t *featuresService) syncTransformerFeatureJson(ctx context.Context) {
select {
case <-ctx.Done():
return
case <-time.After(t.config.PollInterval):
case <-time.After(t.options.PollInterval):
}
}
}

func (t *featuresService) makeFeaturesFetchCall() bool {
url := t.config.TransformerURL + "/features"
url := t.options.TransformerURL + "/features"
req, err := http.NewRequest("GET", url, bytes.NewReader([]byte{}))
if err != nil {
t.logger.Error("error creating request - ", err)
return true
}
tr := &http.Transport{}
client := &http.Client{Transport: tr, Timeout: config.GetDuration("HttpClient.processor.timeout", 30, time.Second)}
res, err := client.Do(req)
res, err := t.client.Do(req)
if err != nil {
t.logger.Error("error sending request - ", err)
return true
Expand Down
14 changes: 8 additions & 6 deletions services/transformer/features_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/rudderlabs/rudder-go-kit/config"

"github.com/rudderlabs/rudder-go-kit/logger"
)

Expand All @@ -19,7 +21,7 @@ var _ = Describe("Transformer features", func() {
handler := &featuresService{
logger: logger.NewLogger(),
waitChan: make(chan struct{}),
config: FeaturesServiceConfig{
options: FeaturesServiceOptions{
PollInterval: time.Duration(1),
FeaturesRetryMaxAttempts: 1,
},
Expand All @@ -40,7 +42,7 @@ var _ = Describe("Transformer features", func() {
features: json.RawMessage(defaultTransformerFeatures),
logger: logger.NewLogger(),
waitChan: make(chan struct{}),
config: FeaturesServiceConfig{
options: FeaturesServiceOptions{
PollInterval: time.Duration(1),
FeaturesRetryMaxAttempts: 1,
},
Expand All @@ -54,7 +56,7 @@ var _ = Describe("Transformer features", func() {
features: json.RawMessage(defaultTransformerFeatures),
logger: logger.NewLogger(),
waitChan: make(chan struct{}),
config: FeaturesServiceConfig{
options: FeaturesServiceOptions{
PollInterval: time.Duration(1),
FeaturesRetryMaxAttempts: 1,
},
Expand All @@ -68,7 +70,7 @@ var _ = Describe("Transformer features", func() {
features: json.RawMessage(defaultTransformerFeatures),
logger: logger.NewLogger(),
waitChan: make(chan struct{}),
config: FeaturesServiceConfig{
options: FeaturesServiceOptions{
PollInterval: time.Duration(1),
FeaturesRetryMaxAttempts: 1,
},
Expand All @@ -87,7 +89,7 @@ var _ = Describe("Transformer features", func() {
http.Error(w, "not found error", http.StatusNotFound)
}))

handler := NewFeaturesService(context.TODO(), FeaturesServiceConfig{
handler := NewFeaturesService(context.TODO(), config.Default, FeaturesServiceOptions{
PollInterval: time.Duration(1),
TransformerURL: transformerServer.URL,
FeaturesRetryMaxAttempts: 1,
Expand Down Expand Up @@ -116,7 +118,7 @@ var _ = Describe("Transformer features", func() {
_, _ = w.Write([]byte(mockTransformerResp))
}))

handler := NewFeaturesService(context.TODO(), FeaturesServiceConfig{
handler := NewFeaturesService(context.TODO(), config.Default, FeaturesServiceOptions{
PollInterval: time.Duration(1),
TransformerURL: transformerServer.URL,
FeaturesRetryMaxAttempts: 1,
Expand Down

0 comments on commit 92622ae

Please sign in to comment.