Skip to content

Commit

Permalink
feat: Allow scaling using redis stream length (#4277) (#4390)
Browse files Browse the repository at this point in the history
Co-authored-by: Jorge Turrado Ferrero <[email protected]>
Co-authored-by: jmadajczak <[email protected]>
  • Loading branch information
3 people authored Mar 28, 2023
1 parent 04e5742 commit 388bf37
Show file tree
Hide file tree
Showing 9 changed files with 984 additions and 97 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
### New

- **CPU/Memory scaler**: Add support for scale to zero if there are multiple triggers([#4269](https://github.com/kedacore/keda/issues/4269))
- TODO ([#XXX](https://github.com/kedacore/keda/issue/XXX))
- **Redis Scalers**: Allow scaling using redis stream length ([#4277](https://github.com/kedacore/keda/issues/4277))
- **General:** Introduce new Solr Scaler ([#4234](https://github.com/kedacore/keda/issues/4234))

### Improvements
Expand Down
135 changes: 84 additions & 51 deletions pkg/scalers/redis_streams_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,21 @@ import (
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

type scaleFactor int8

const (
xPendingFactor scaleFactor = iota + 1
xLengthFactor
)

const (
// defaults
defaultTargetPendingEntriesCount = 5
defaultDBIndex = 0
defaultDBIndex = 0
defaultTargetEntries = 5

// metadata names
pendingEntriesCountMetadata = "pendingEntriesCount"
streamLengthMetadata = "streamLength"
streamNameMetadata = "stream"
consumerGroupNameMetadata = "consumerGroup"
usernameMetadata = "username"
Expand All @@ -30,15 +38,17 @@ const (
)

type redisStreamsScaler struct {
metricType v2.MetricTargetType
metadata *redisStreamsMetadata
closeFn func() error
getPendingEntriesCountFn func(ctx context.Context) (int64, error)
logger logr.Logger
metricType v2.MetricTargetType
metadata *redisStreamsMetadata
closeFn func() error
getEntriesCountFn func(ctx context.Context) (int64, error)
logger logr.Logger
}

type redisStreamsMetadata struct {
scaleFactor scaleFactor
targetPendingEntriesCount int64
targetStreamLength int64
streamName string
consumerGroupName string
databaseIndex int
Expand Down Expand Up @@ -89,21 +99,15 @@ func createClusteredRedisStreamsScaler(ctx context.Context, meta *redisStreamsMe
return nil
}

pendingEntriesCountFn := func(ctx context.Context) (int64, error) {
pendingEntries, err := client.XPending(ctx, meta.streamName, meta.consumerGroupName).Result()
if err != nil {
return -1, err
}
return pendingEntries.Count, nil
}
entriesCountFn, err := createEntriesCountFn(client, meta)

return &redisStreamsScaler{
metricType: metricType,
metadata: meta,
closeFn: closeFn,
getPendingEntriesCountFn: pendingEntriesCountFn,
logger: logger,
}, nil
metricType: metricType,
metadata: meta,
closeFn: closeFn,
getEntriesCountFn: entriesCountFn,
logger: logger,
}, err
}

func createSentinelRedisStreamsScaler(ctx context.Context, meta *redisStreamsMetadata, metricType v2.MetricTargetType, logger logr.Logger) (Scaler, error) {
Expand Down Expand Up @@ -133,27 +137,42 @@ func createScaler(client *redis.Client, meta *redisStreamsMetadata, metricType v
return nil
}

pendingEntriesCountFn := func(ctx context.Context) (int64, error) {
pendingEntries, err := client.XPending(ctx, meta.streamName, meta.consumerGroupName).Result()
if err != nil {
return -1, err
}
return pendingEntries.Count, nil
}
entriesCountFn, err := createEntriesCountFn(client, meta)

return &redisStreamsScaler{
metricType: metricType,
metadata: meta,
closeFn: closeFn,
getPendingEntriesCountFn: pendingEntriesCountFn,
logger: logger,
}, nil
metricType: metricType,
metadata: meta,
closeFn: closeFn,
getEntriesCountFn: entriesCountFn,
logger: logger,
}, err
}

var (
// ErrRedisMissingPendingEntriesCount is returned when "pendingEntriesCount" is missing.
ErrRedisMissingPendingEntriesCount = errors.New("missing pending entries count")
func createEntriesCountFn(client redis.Cmdable, meta *redisStreamsMetadata) (entriesCountFn func(ctx context.Context) (int64, error), err error) {
switch meta.scaleFactor {
case xPendingFactor:
entriesCountFn = func(ctx context.Context) (int64, error) {
pendingEntries, err := client.XPending(ctx, meta.streamName, meta.consumerGroupName).Result()
if err != nil {
return -1, err
}
return pendingEntries.Count, nil
}
case xLengthFactor:
entriesCountFn = func(ctx context.Context) (int64, error) {
entriesLength, err := client.XLen(ctx, meta.streamName).Result()
if err != nil {
return -1, err
}
return entriesLength, nil
}
default:
err = fmt.Errorf("unrecognized scale factor %v", meta.scaleFactor)
}
return
}

var (
// ErrRedisMissingStreamName is returned when "stream" is missing.
ErrRedisMissingStreamName = errors.New("missing redis stream name")
)
Expand Down Expand Up @@ -185,18 +204,6 @@ func parseRedisStreamsMetadata(config *ScalerConfig, parseFn redisAddressParser)
meta.connectionInfo.unsafeSsl = parsedVal
}

meta.targetPendingEntriesCount = defaultTargetPendingEntriesCount

if val, ok := config.TriggerMetadata[pendingEntriesCountMetadata]; ok {
pendingEntriesCount, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing pending entries count: %w", err)
}
meta.targetPendingEntriesCount = pendingEntriesCount
} else {
return nil, ErrRedisMissingPendingEntriesCount
}

if val, ok := config.TriggerMetadata[streamNameMetadata]; ok {
meta.streamName = val
} else {
Expand All @@ -205,8 +212,25 @@ func parseRedisStreamsMetadata(config *ScalerConfig, parseFn redisAddressParser)

if val, ok := config.TriggerMetadata[consumerGroupNameMetadata]; ok {
meta.consumerGroupName = val
meta.scaleFactor = xPendingFactor
meta.targetPendingEntriesCount = defaultTargetEntries
if val, ok := config.TriggerMetadata[pendingEntriesCountMetadata]; ok {
pendingEntriesCount, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing pending entries count: %w", err)
}
meta.targetPendingEntriesCount = pendingEntriesCount
}
} else {
return nil, fmt.Errorf("missing redis stream consumer group name")
meta.scaleFactor = xLengthFactor
meta.targetStreamLength = defaultTargetEntries
if val, ok := config.TriggerMetadata[streamLengthMetadata]; ok {
streamLength, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing stream length: %w", err)
}
meta.targetStreamLength = streamLength
}
}

meta.databaseIndex = defaultDBIndex
Expand All @@ -228,19 +252,28 @@ func (s *redisStreamsScaler) Close(context.Context) error {

// GetMetricSpecForScaling returns the metric spec for the HPA
func (s *redisStreamsScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
var metricValue int64

switch s.metadata.scaleFactor {
case xPendingFactor:
metricValue = s.metadata.targetPendingEntriesCount
case xLengthFactor:
metricValue = s.metadata.targetStreamLength
}

externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("redis-streams-%s", s.metadata.streamName))),
},
Target: GetMetricTarget(s.metricType, s.metadata.targetPendingEntriesCount),
Target: GetMetricTarget(s.metricType, metricValue),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2.MetricSpec{metricSpec}
}

// GetMetricsAndActivity fetches the number of pending entries for a consumer group in a stream
func (s *redisStreamsScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
pendingEntriesCount, err := s.getPendingEntriesCountFn(ctx)
pendingEntriesCount, err := s.getEntriesCountFn(ctx)

if err != nil {
s.logger.Error(err, "error fetching pending entries count")
Expand Down
Loading

0 comments on commit 388bf37

Please sign in to comment.