From b5f1b40786819987592425c5c9aff43cf7d1ece1 Mon Sep 17 00:00:00 2001 From: Jeroen Bobbeldijk Date: Wed, 13 Oct 2021 09:35:36 +0200 Subject: [PATCH] Use Keda context in some cases Signed-off-by: Jeroen Bobbeldijk --- pkg/scalers/redis_scaler.go | 20 +++++++++----------- pkg/scalers/redis_scaler_test.go | 3 ++- pkg/scalers/redis_streams_scaler.go | 12 ++++++------ pkg/scalers/redis_streams_scaler_test.go | 3 ++- 4 files changed, 19 insertions(+), 19 deletions(-) diff --git a/pkg/scalers/redis_scaler.go b/pkg/scalers/redis_scaler.go index 37a5a529afb..afbfd5d6499 100644 --- a/pkg/scalers/redis_scaler.go +++ b/pkg/scalers/redis_scaler.go @@ -24,14 +24,12 @@ const ( defaultEnableTLS = false ) -var ctx = context.Background() - type redisAddressParser func(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) type redisScaler struct { metadata *redisMetadata closeFn func() error - getListLengthFn func() (int64, error) + getListLengthFn func(ctx context.Context) (int64, error) } type redisConnectionInfo struct { @@ -105,7 +103,7 @@ func createClusteredRedisScaler(meta *redisMetadata, script string) (Scaler, err return nil } - listLengthFn := func() (int64, error) { + listLengthFn := func(ctx context.Context) (int64, error) { cmd := client.Eval(ctx, script, []string{meta.listName}) if cmd.Err() != nil { return -1, cmd.Err() @@ -135,7 +133,7 @@ func createSentinelRedisScaler(meta *redisMetadata, script string) (Scaler, erro return nil } - listLengthFn := func() (int64, error) { + listLengthFn := func(ctx context.Context) (int64, error) { cmd := client.Eval(ctx, script, []string{meta.listName}) if cmd.Err() != nil { return -1, cmd.Err() @@ -165,7 +163,7 @@ func createRedisScaler(meta *redisMetadata, script string) (Scaler, error) { return nil } - listLengthFn := func() (int64, error) { + listLengthFn := func(ctx context.Context) (int64, error) { cmd := client.Eval(ctx, script, []string{meta.listName}) if cmd.Err() != nil { return -1, cmd.Err() @@ -219,7 +217,7 @@ func parseRedisMetadata(config *ScalerConfig, parserFn redisAddressParser) (*red // IsActive checks if there is any element in the Redis list func (s *redisScaler) IsActive(ctx context.Context) (bool, error) { - length, err := s.getListLengthFn() + length, err := s.getListLengthFn(ctx) if err != nil { redisLog.Error(err, "error") @@ -253,7 +251,7 @@ func (s *redisScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { // GetMetrics connects to Redis and finds the length of the list func (s *redisScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { - listLen, err := s.getListLengthFn() + listLen, err := s.getListLengthFn(ctx) if err != nil { redisLog.Error(err, "error getting list length") @@ -482,7 +480,7 @@ func getRedisClusterClient(info redisConnectionInfo) (*redis.ClusterClient, erro // confirm if connected c := redis.NewClusterClient(options) - err := c.Ping(ctx).Err() + err := c.Ping(context.Background()).Err() if err != nil { return nil, err } @@ -507,7 +505,7 @@ func getRedisSentinelClient(info redisConnectionInfo, dbIndex int) (*redis.Clien // confirm if connected c := redis.NewFailoverClient(options) - err := c.Ping(ctx).Err() + err := c.Ping(context.Background()).Err() if err != nil { return nil, err } @@ -529,7 +527,7 @@ func getRedisClient(info redisConnectionInfo, dbIndex int) (*redis.Client, error // confirm if connected c := redis.NewClient(options) - err := c.Ping(ctx).Err() + err := c.Ping(context.Background()).Err() if err != nil { return nil, err } diff --git a/pkg/scalers/redis_scaler_test.go b/pkg/scalers/redis_scaler_test.go index 6b298702ea8..27aecb1d1c7 100644 --- a/pkg/scalers/redis_scaler_test.go +++ b/pkg/scalers/redis_scaler_test.go @@ -1,6 +1,7 @@ package scalers import ( + "context" "errors" "testing" @@ -79,7 +80,7 @@ func TestRedisGetMetricSpecForScaling(t *testing.T) { t.Fatal("Could not parse metadata:", err) } closeFn := func() error { return nil } - lengthFn := func() (int64, error) { return -1, nil } + lengthFn := func(ctx context.Context) (int64, error) { return -1, nil } mockRedisScaler := redisScaler{ meta, closeFn, diff --git a/pkg/scalers/redis_streams_scaler.go b/pkg/scalers/redis_streams_scaler.go index 5bea2aa0487..d74db5a10d5 100644 --- a/pkg/scalers/redis_streams_scaler.go +++ b/pkg/scalers/redis_streams_scaler.go @@ -33,7 +33,7 @@ const ( type redisStreamsScaler struct { metadata *redisStreamsMetadata closeFn func() error - getPendingEntriesCountFn func() (int64, error) + getPendingEntriesCountFn func(ctx context.Context) (int64, error) } type redisStreamsMetadata struct { @@ -82,7 +82,7 @@ func createClusteredRedisStreamsScaler(meta *redisStreamsMetadata) (Scaler, erro return nil } - pendingEntriesCountFn := func() (int64, error) { + pendingEntriesCountFn := func(ctx context.Context) (int64, error) { pendingEntries, err := client.XPending(ctx, meta.streamName, meta.consumerGroupName).Result() if err != nil { return -1, err @@ -111,7 +111,7 @@ func createSentinelRedisStreamsScaler(meta *redisStreamsMetadata) (Scaler, error return nil } - pendingEntriesCountFn := func() (int64, error) { + pendingEntriesCountFn := func(ctx context.Context) (int64, error) { pendingEntries, err := client.XPending(ctx, meta.streamName, meta.consumerGroupName).Result() if err != nil { return -1, err @@ -140,7 +140,7 @@ func createRedisStreamsScaler(meta *redisStreamsMetadata) (Scaler, error) { return nil } - pendingEntriesCountFn := func() (int64, error) { + pendingEntriesCountFn := func(ctx context.Context) (int64, error) { pendingEntries, err := client.XPending(ctx, meta.streamName, meta.consumerGroupName).Result() if err != nil { return -1, err @@ -201,7 +201,7 @@ func parseRedisStreamsMetadata(config *ScalerConfig, parseFn redisAddressParser) // IsActive checks if there are pending entries in the 'Pending Entries List' for consumer group of a stream func (s *redisStreamsScaler) IsActive(ctx context.Context) (bool, error) { - count, err := s.getPendingEntriesCountFn() + count, err := s.getPendingEntriesCountFn(ctx) if err != nil { redisStreamsLog.Error(err, "error") @@ -233,7 +233,7 @@ func (s *redisStreamsScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { // GetMetrics fetches the number of pending entries for a consumer group in a stream func (s *redisStreamsScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { - pendingEntriesCount, err := s.getPendingEntriesCountFn() + pendingEntriesCount, err := s.getPendingEntriesCountFn(ctx) if err != nil { redisStreamsLog.Error(err, "error fetching pending entries count") diff --git a/pkg/scalers/redis_streams_scaler_test.go b/pkg/scalers/redis_streams_scaler_test.go index 603c2492fbd..704ff95a59d 100644 --- a/pkg/scalers/redis_streams_scaler_test.go +++ b/pkg/scalers/redis_streams_scaler_test.go @@ -1,6 +1,7 @@ package scalers import ( + "context" "errors" "strconv" "testing" @@ -141,7 +142,7 @@ func TestRedisStreamsGetMetricSpecForScaling(t *testing.T) { t.Fatal("Could not parse metadata:", err) } closeFn := func() error { return nil } - getPendingEntriesCountFn := func() (int64, error) { return -1, nil } + getPendingEntriesCountFn := func(ctx context.Context) (int64, error) { return -1, nil } mockRedisStreamsScaler := redisStreamsScaler{meta, closeFn, getPendingEntriesCountFn} metricSpec := mockRedisStreamsScaler.GetMetricSpecForScaling()