From af82d4ba9ec997a9589194721dac2d0bfaa3c61f Mon Sep 17 00:00:00 2001 From: Jeroen Bobbeldijk Date: Mon, 11 Oct 2021 15:07:21 +0200 Subject: [PATCH 1/6] Improve Redis support Signed-off-by: Jeroen Bobbeldijk --- go.mod | 4 +- go.sum | 9 +- pkg/scalers/redis_scaler.go | 197 +++++++- pkg/scalers/redis_scaler_test.go | 524 ++++++++++++++++++++- pkg/scalers/redis_streams_scaler.go | 45 +- pkg/scalers/redis_streams_scaler_test.go | 576 ++++++++++++++++++++++- pkg/scaling/scale_handler.go | 14 +- 7 files changed, 1338 insertions(+), 31 deletions(-) diff --git a/go.mod b/go.mod index 8d7e63ca0c7..c32da004017 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/denisenkom/go-mssqldb v0.10.0 github.com/go-logr/logr v0.4.0 github.com/go-playground/assert/v2 v2.0.1 - github.com/go-redis/redis v6.15.9+incompatible + github.com/go-redis/redis/v8 v8.11.4 github.com/go-sql-driver/mysql v1.6.0 github.com/golang/mock v1.6.0 github.com/golang/protobuf v1.5.2 @@ -43,7 +43,7 @@ require ( google.golang.org/api v0.56.0 google.golang.org/genproto v0.0.0-20210831024726-fe130286e0e2 google.golang.org/grpc v1.40.0 - google.golang.org/protobuf v1.27.1 // indirect + google.golang.org/protobuf v1.27.1 k8s.io/api v0.22.1 k8s.io/apimachinery v0.22.1 k8s.io/apiserver v0.22.1 diff --git a/go.sum b/go.sum index dd88554dc81..9e33d22992e 100644 --- a/go.sum +++ b/go.sum @@ -184,8 +184,9 @@ github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6 github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= -github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= +github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= @@ -243,6 +244,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm github.com/dgryski/go-gk v0.0.0-20140819190930-201884a44051/go.mod h1:qm+vckxRlDt0aOla0RYJJVeqHZlWfOm2UIxHaqPB46E= github.com/dgryski/go-gk v0.0.0-20200319235926-a69029f61654/go.mod h1:qm+vckxRlDt0aOla0RYJJVeqHZlWfOm2UIxHaqPB46E= github.com/dgryski/go-lttb v0.0.0-20180810165845-318fcdf10a77/go.mod h1:Va5MyIzkU0rAM92tn3hb3Anb7oz7KcnixF49+2wOMe4= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/dimchansky/utfbom v1.1.0/go.mod h1:rO41eb7gLfo8SF1jd9F8HplJm1Fewwi4mQvIirEdv+8= github.com/dimchansky/utfbom v1.1.1 h1:vV6w1AhK4VMnhBno/TPVCoK9U/LP0PkLCS9tbxHdi/U= @@ -350,8 +353,8 @@ github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD87 github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY= github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI= -github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= -github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= +github.com/go-redis/redis/v8 v8.11.4 h1:kHoYkfZP6+pe04aFTnhDH6GDROa5yJdHJVNxV3F46Tg= +github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= diff --git a/pkg/scalers/redis_scaler.go b/pkg/scalers/redis_scaler.go index 8ae551482bc..03092afa985 100644 --- a/pkg/scalers/redis_scaler.go +++ b/pkg/scalers/redis_scaler.go @@ -7,7 +7,7 @@ import ( "strconv" "strings" - "github.com/go-redis/redis" + "github.com/go-redis/redis/v8" v2beta2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -24,6 +24,8 @@ const ( defaultEnableTLS = false ) +var ctx = context.Background() + type redisAddressParser func(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) type redisScaler struct { @@ -33,11 +35,15 @@ type redisScaler struct { } type redisConnectionInfo struct { - addresses []string - password string - hosts []string - ports []string - enableTLS bool + addresses []string + username string + password string + sentinelUsername string + sentinelPassword string + sentinelMaster string + hosts []string + ports []string + enableTLS bool } type redisMetadata struct { @@ -50,7 +56,7 @@ type redisMetadata struct { var redisLog = logf.Log.WithName("redis_scaler") // NewRedisScaler creates a new redisScaler -func NewRedisScaler(isClustered bool, config *ScalerConfig) (Scaler, error) { +func NewRedisScaler(isClustered, isSentinel bool, config *ScalerConfig) (Scaler, error) { luaScript := ` local listName = KEYS[1] local listType = redis.call('type', listName).ok @@ -70,7 +76,14 @@ func NewRedisScaler(isClustered bool, config *ScalerConfig) (Scaler, error) { return nil, fmt.Errorf("error parsing redis metadata: %s", err) } return createClusteredRedisScaler(meta, luaScript) + } else if isSentinel { + meta, err := parseRedisMetadata(config, parseRedisSentinelAddress) + if err != nil { + return nil, fmt.Errorf("error parsing redis metadata: %s", err) + } + return createSentinelRedisScaler(meta, luaScript) } + meta, err := parseRedisMetadata(config, parseRedisAddress) if err != nil { return nil, fmt.Errorf("error parsing redis metadata: %s", err) @@ -93,7 +106,37 @@ func createClusteredRedisScaler(meta *redisMetadata, script string) (Scaler, err } listLengthFn := func() (int64, error) { - cmd := client.Eval(script, []string{meta.listName}) + cmd := client.Eval(ctx, script, []string{meta.listName}) + if cmd.Err() != nil { + return -1, cmd.Err() + } + + return cmd.Int64() + } + + return &redisScaler{ + metadata: meta, + closeFn: closeFn, + getListLengthFn: listLengthFn, + }, nil +} + +func createSentinelRedisScaler(meta *redisMetadata, script string) (Scaler, error) { + client, err := getRedisSentinelClient(meta.connectionInfo, meta.databaseIndex) + if err != nil { + return nil, fmt.Errorf("connection to redis sentinel failed: %s", err) + } + + closeFn := func() error { + if err := client.Close(); err != nil { + redisLog.Error(err, "error closing redis client") + return err + } + return nil + } + + listLengthFn := func() (int64, error) { + cmd := client.Eval(ctx, script, []string{meta.listName}) if cmd.Err() != nil { return -1, cmd.Err() } @@ -123,7 +166,7 @@ func createRedisScaler(meta *redisMetadata, script string) (Scaler, error) { } listLengthFn := func() (int64, error) { - cmd := client.Eval(script, []string{meta.listName}) + cmd := client.Eval(ctx, script, []string{meta.listName}) if cmd.Err() != nil { return -1, cmd.Err() } @@ -263,6 +306,14 @@ func parseRedisAddress(metadata, resolvedEnv, authParams map[string]string) (red return info, fmt.Errorf("no address or host given. address should be in the format of host:port or you should set the host/port values") } + if authParams["username"] != "" { + info.username = authParams["username"] + } else if metadata["username"] != "" { + info.username = metadata["username"] + } else if metadata["usernameFromEnv"] != "" { + info.username = resolvedEnv[metadata["usernameFromEnv"]] + } + if authParams["password"] != "" { info.password = authParams["password"] } else if metadata["passwordFromEnv"] != "" { @@ -323,12 +374,110 @@ func parseRedisClusterAddress(metadata, resolvedEnv, authParams map[string]strin return info, fmt.Errorf("no addresses or hosts given. address should be a comma separated list of host:port or set the host/port values") } + if authParams["username"] != "" { + info.username = authParams["username"] + } else if metadata["username"] != "" { + info.username = metadata["username"] + } else if metadata["usernameFromEnv"] != "" { + info.username = resolvedEnv[metadata["usernameFromEnv"]] + } + + if authParams["password"] != "" { + info.password = authParams["password"] + } else if metadata["passwordFromEnv"] != "" { + info.password = resolvedEnv[metadata["passwordFromEnv"]] + } + + info.enableTLS = defaultEnableTLS + if val, ok := metadata["enableTLS"]; ok { + tls, err := strconv.ParseBool(val) + if err != nil { + return info, fmt.Errorf("enableTLS parsing error %s", err.Error()) + } + info.enableTLS = tls + } + + return info, nil +} + +func parseRedisSentinelAddress(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) { + info := redisConnectionInfo{} + switch { + case authParams["addresses"] != "": + info.addresses = splitAndTrim(authParams["addresses"]) + case metadata["addresses"] != "": + info.addresses = splitAndTrim(metadata["addresses"]) + case metadata["addressesFromEnv"] != "": + info.addresses = splitAndTrim(resolvedEnv[metadata["addressesFromEnv"]]) + default: + switch { + case authParams["hosts"] != "": + info.hosts = splitAndTrim(authParams["hosts"]) + case metadata["hosts"] != "": + info.hosts = splitAndTrim(metadata["hosts"]) + case metadata["hostsFromEnv"] != "": + info.hosts = splitAndTrim(resolvedEnv[metadata["hostsFromEnv"]]) + } + + switch { + case authParams["ports"] != "": + info.ports = splitAndTrim(authParams["ports"]) + case metadata["ports"] != "": + info.ports = splitAndTrim(metadata["ports"]) + case metadata["portsFromEnv"] != "": + info.ports = splitAndTrim(resolvedEnv[metadata["portsFromEnv"]]) + } + + if len(info.hosts) != 0 && len(info.ports) != 0 { + if len(info.hosts) != len(info.ports) { + return info, fmt.Errorf("not enough hosts or ports given. number of hosts should be equal to the number of ports") + } + for i := range info.hosts { + info.addresses = append(info.addresses, fmt.Sprintf("%s:%s", info.hosts[i], info.ports[i])) + } + } + } + + if len(info.addresses) == 0 { + return info, fmt.Errorf("no addresses or hosts given. address should be a comma separated list of host:port or set the host/port values") + } + + if authParams["username"] != "" { + info.username = authParams["username"] + } else if metadata["username"] != "" { + info.username = metadata["username"] + } else if metadata["usernameFromEnv"] != "" { + info.username = resolvedEnv[metadata["usernameFromEnv"]] + } + if authParams["password"] != "" { info.password = authParams["password"] } else if metadata["passwordFromEnv"] != "" { info.password = resolvedEnv[metadata["passwordFromEnv"]] } + if authParams["sentinelUsername"] != "" { + info.sentinelUsername = authParams["sentinelUsername"] + } else if metadata["sentinelUsername"] != "" { + info.sentinelUsername = metadata["sentinelUsername"] + } else if metadata["sentinelUsernameFromEnv"] != "" { + info.sentinelUsername = resolvedEnv[metadata["sentinelUsernameFromEnv"]] + } + + if authParams["sentinelPassword"] != "" { + info.sentinelPassword = authParams["sentinelPassword"] + } else if metadata["sentinelPasswordFromEnv"] != "" { + info.sentinelPassword = resolvedEnv[metadata["sentinelPasswordFromEnv"]] + } + + if authParams["sentinelMaster"] != "" { + info.sentinelMaster = authParams["sentinelMaster"] + } else if metadata["sentinelMaster"] != "" { + info.sentinelMaster = metadata["sentinelMaster"] + } else if metadata["sentinelMasterFromEnv"] != "" { + info.sentinelMaster = resolvedEnv[metadata["sentinelMasterFromEnv"]] + } + info.enableTLS = defaultEnableTLS if val, ok := metadata["enableTLS"]; ok { tls, err := strconv.ParseBool(val) @@ -344,6 +493,7 @@ func parseRedisClusterAddress(metadata, resolvedEnv, authParams map[string]strin func getRedisClusterClient(info redisConnectionInfo) (*redis.ClusterClient, error) { options := &redis.ClusterOptions{ Addrs: info.addresses, + Username: info.username, Password: info.password, } if info.enableTLS { @@ -354,7 +504,31 @@ func getRedisClusterClient(info redisConnectionInfo) (*redis.ClusterClient, erro // confirm if connected c := redis.NewClusterClient(options) - err := c.Ping().Err() + err := c.Ping(ctx).Err() + if err != nil { + return nil, err + } + return c, nil +} + +func getRedisSentinelClient(info redisConnectionInfo, dbIndex int) (*redis.Client, error) { + options := &redis.FailoverOptions{ + Username: info.username, + Password: info.password, + DB: dbIndex, + SentinelAddrs: info.addresses, + SentinelPassword: info.sentinelPassword, + MasterName: info.sentinelMaster, + } + if info.enableTLS { + options.TLSConfig = &tls.Config{ + InsecureSkipVerify: info.enableTLS, + } + } + + // confirm if connected + c := redis.NewFailoverClient(options) + err := c.Ping(ctx).Err() if err != nil { return nil, err } @@ -364,6 +538,7 @@ func getRedisClusterClient(info redisConnectionInfo) (*redis.ClusterClient, erro func getRedisClient(info redisConnectionInfo, dbIndex int) (*redis.Client, error) { options := &redis.Options{ Addr: info.addresses[0], + Username: info.username, Password: info.password, DB: dbIndex, } @@ -375,7 +550,7 @@ func getRedisClient(info redisConnectionInfo, dbIndex int) (*redis.Client, error // confirm if connected c := redis.NewClient(options) - err := c.Ping().Err() + err := c.Ping(ctx).Err() if err != nil { return nil, err } diff --git a/pkg/scalers/redis_scaler_test.go b/pkg/scalers/redis_scaler_test.go index aacae87a03e..6b298702ea8 100644 --- a/pkg/scalers/redis_scaler_test.go +++ b/pkg/scalers/redis_scaler_test.go @@ -8,9 +8,13 @@ import ( ) var testRedisResolvedEnv = map[string]string{ - "REDIS_HOST": "none", - "REDIS_PORT": "6379", - "REDIS_PASSWORD": "none", + "REDIS_HOST": "none", + "REDIS_PORT": "6379", + "REDIS_USERNAME": "none", + "REDIS_PASSWORD": "none", + "REDIS_SENTINEL_MASTER": "none", + "REDIS_SENTINEL_USERNAME": "none", + "REDIS_SENTINEL_PASSWORD": "none", } type parseRedisMetadataTestData struct { @@ -171,6 +175,115 @@ func TestParseRedisClusterMetadata(t *testing.T) { }, wantErr: nil, }, + { + name: "username given in authParams", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + }, + authParams: map[string]string{ + "username": "username", + }, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + username: "username", + }, + }, + wantErr: nil, + }, + { + name: "username given in metadata", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + "username": "username", + }, + authParams: map[string]string{}, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + username: "username", + }, + }, + wantErr: nil, + }, + { + name: "username given in metadata from env", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + "usernameFromEnv": "REDIS_USERNAME", + }, + authParams: map[string]string{}, + resolvedEnv: testRedisResolvedEnv, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + username: "none", + }, + }, + wantErr: nil, + }, + { + name: "password given in authParams", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + }, + authParams: map[string]string{ + "password": "password", + }, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + password: "password", + }, + }, + wantErr: nil, + }, + { + name: "password given in metadata from env", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + "passwordFromEnv": "REDIS_PASSWORD", + }, + authParams: map[string]string{}, + resolvedEnv: testRedisResolvedEnv, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + password: "none", + }, + }, + wantErr: nil, + }, } for _, testCase := range cases { @@ -191,3 +304,408 @@ func TestParseRedisClusterMetadata(t *testing.T) { }) } } + +func TestParseRedisSentinelMetadata(t *testing.T) { + cases := []struct { + name string + metadata map[string]string + resolvedEnv map[string]string + authParams map[string]string + wantMeta *redisMetadata + wantErr error + }{ + { + name: "empty metadata", + wantMeta: nil, + wantErr: errors.New("no addresses or hosts given. address should be a comma separated list of host:port or set the host/port values"), + }, + { + name: "unequal number of hosts/ports", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2", + }, + wantMeta: nil, + wantErr: errors.New("not enough hosts or ports given. number of hosts should be equal to the number of ports"), + }, + { + name: "no list name", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listLength": "5", + }, + wantMeta: nil, + wantErr: errors.New("no list name given"), + }, + { + name: "invalid list length", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + "listLength": "invalid", + }, + wantMeta: nil, + wantErr: errors.New("list length parsing error"), + }, + { + name: "address is defined in auth params", + metadata: map[string]string{ + "listName": "mylist", + }, + authParams: map[string]string{ + "addresses": ":7001, :7002", + }, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{":7001", ":7002"}, + }, + }, + wantErr: nil, + }, + { + name: "hosts and ports given in auth params", + metadata: map[string]string{ + "listName": "mylist", + }, + authParams: map[string]string{ + "hosts": " a, b, c ", + "ports": "1, 2, 3", + }, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + }, + }, + wantErr: nil, + }, + { + name: "hosts and ports given in auth params", + metadata: map[string]string{ + "listName": "mylist", + }, + authParams: map[string]string{ + "hosts": " a, b, c ", + "ports": "1, 2, 3", + }, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + }, + }, + wantErr: nil, + }, + { + name: "username given in authParams", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + }, + authParams: map[string]string{ + "username": "username", + }, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + username: "username", + }, + }, + wantErr: nil, + }, + { + name: "username given in metadata", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + "username": "username", + }, + authParams: map[string]string{}, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + username: "username", + }, + }, + wantErr: nil, + }, + { + name: "username given in metadata from env", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + "usernameFromEnv": "REDIS_USERNAME", + }, + authParams: map[string]string{}, + resolvedEnv: testRedisResolvedEnv, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + username: "none", + }, + }, + wantErr: nil, + }, + { + name: "password given in authParams", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + }, + authParams: map[string]string{ + "password": "password", + }, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + password: "password", + }, + }, + wantErr: nil, + }, + { + name: "password given in metadata from env", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + "passwordFromEnv": "REDIS_PASSWORD", + }, + authParams: map[string]string{}, + resolvedEnv: testRedisResolvedEnv, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + password: "none", + }, + }, + wantErr: nil, + }, + { + name: "sentinelUsername given in authParams", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + }, + authParams: map[string]string{ + "sentinelUsername": "sentinelUsername", + }, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelUsername: "sentinelUsername", + }, + }, + wantErr: nil, + }, + { + name: "sentinelUsername given in metadata", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + "sentinelUsername": "sentinelUsername", + }, + authParams: map[string]string{}, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelUsername: "sentinelUsername", + }, + }, + wantErr: nil, + }, + { + name: "sentinelUsername given in metadata from env", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + "sentinelUsernameFromEnv": "REDIS_SENTINEL_USERNAME", + }, + authParams: map[string]string{}, + resolvedEnv: testRedisResolvedEnv, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelUsername: "none", + }, + }, + wantErr: nil, + }, + { + name: "sentinelPassword given in authParams", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + }, + authParams: map[string]string{ + "sentinelPassword": "sentinelPassword", + }, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelPassword: "sentinelPassword", + }, + }, + wantErr: nil, + }, + { + name: "sentinelPassword given in metadata from env", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + "sentinelPasswordFromEnv": "REDIS_SENTINEL_PASSWORD", + }, + authParams: map[string]string{}, + resolvedEnv: testRedisResolvedEnv, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelPassword: "none", + }, + }, + wantErr: nil, + }, + { + name: "sentinelMaster given in authParams", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + }, + authParams: map[string]string{ + "sentinelMaster": "sentinelMaster", + }, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelMaster: "sentinelMaster", + }, + }, + wantErr: nil, + }, + { + name: "sentinelMaster given in metadata", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + "sentinelMaster": "sentinelMaster", + }, + authParams: map[string]string{}, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelMaster: "sentinelMaster", + }, + }, + wantErr: nil, + }, + { + name: "sentinelMaster given in metadata from env", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + "sentinelMasterFromEnv": "REDIS_SENTINEL_MASTER", + }, + authParams: map[string]string{}, + resolvedEnv: testRedisResolvedEnv, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelMaster: "none", + }, + }, + wantErr: nil, + }, + } + + for _, testCase := range cases { + c := testCase + t.Run(c.name, func(t *testing.T) { + config := &ScalerConfig{ + TriggerMetadata: c.metadata, + ResolvedEnv: c.resolvedEnv, + AuthParams: c.authParams, + } + meta, err := parseRedisMetadata(config, parseRedisSentinelAddress) + if c.wantErr != nil { + assert.Contains(t, err.Error(), c.wantErr.Error()) + } else { + assert.NoError(t, err) + } + assert.Equal(t, c.wantMeta, meta) + }) + } +} diff --git a/pkg/scalers/redis_streams_scaler.go b/pkg/scalers/redis_streams_scaler.go index 930effb3082..66c5a3205f9 100644 --- a/pkg/scalers/redis_streams_scaler.go +++ b/pkg/scalers/redis_streams_scaler.go @@ -24,7 +24,11 @@ const ( pendingEntriesCountMetadata = "pendingEntriesCount" streamNameMetadata = "stream" consumerGroupNameMetadata = "consumerGroup" + usernameMetadata = "username" passwordMetadata = "password" + sentinelUsernameMetadata = "sentinelUsername" + sentinelPasswordMetadata = "sentinelPassword" + sentinelMasterMetadata = "sentinelMaster" databaseIndexMetadata = "databaseIndex" enableTLSMetadata = "enableTLS" ) @@ -46,13 +50,19 @@ type redisStreamsMetadata struct { var redisStreamsLog = logf.Log.WithName("redis_streams_scaler") // NewRedisStreamsScaler creates a new redisStreamsScaler -func NewRedisStreamsScaler(isClustered bool, config *ScalerConfig) (Scaler, error) { +func NewRedisStreamsScaler(isClustered, isSentinel bool, config *ScalerConfig) (Scaler, error) { if isClustered { meta, err := parseRedisStreamsMetadata(config, parseRedisClusterAddress) if err != nil { return nil, fmt.Errorf("error parsing redis streams metadata: %s", err) } return createClusteredRedisStreamsScaler(meta) + } else if isSentinel { + meta, err := parseRedisStreamsMetadata(config, parseRedisSentinelAddress) + if err != nil { + return nil, fmt.Errorf("error parsing redis streams metadata: %s", err) + } + return createSentinelRedisStreamsScaler(meta) } meta, err := parseRedisStreamsMetadata(config, parseRedisAddress) if err != nil { @@ -76,7 +86,36 @@ func createClusteredRedisStreamsScaler(meta *redisStreamsMetadata) (Scaler, erro } pendingEntriesCountFn := func() (int64, error) { - pendingEntries, err := client.XPending(meta.streamName, meta.consumerGroupName).Result() + pendingEntries, err := client.XPending(ctx, meta.streamName, meta.consumerGroupName).Result() + if err != nil { + return -1, err + } + return pendingEntries.Count, nil + } + + return &redisStreamsScaler{ + metadata: meta, + closeFn: closeFn, + getPendingEntriesCountFn: pendingEntriesCountFn, + }, nil +} + +func createSentinelRedisStreamsScaler(meta *redisStreamsMetadata) (Scaler, error) { + client, err := getRedisSentinelClient(meta.connectionInfo, meta.databaseIndex) + if err != nil { + return nil, fmt.Errorf("connection to redis sentinel failed: %s", err) + } + + closeFn := func() error { + if err := client.Close(); err != nil { + redisStreamsLog.Error(err, "error closing redis client") + return err + } + return nil + } + + pendingEntriesCountFn := func() (int64, error) { + pendingEntries, err := client.XPending(ctx, meta.streamName, meta.consumerGroupName).Result() if err != nil { return -1, err } @@ -105,7 +144,7 @@ func createRedisStreamsScaler(meta *redisStreamsMetadata) (Scaler, error) { } pendingEntriesCountFn := func() (int64, error) { - pendingEntries, err := client.XPending(meta.streamName, meta.consumerGroupName).Result() + pendingEntries, err := client.XPending(ctx, meta.streamName, meta.consumerGroupName).Result() if err != nil { return -1, err } diff --git a/pkg/scalers/redis_streams_scaler_test.go b/pkg/scalers/redis_streams_scaler_test.go index 187d01bf89a..603c2492fbd 100644 --- a/pkg/scalers/redis_streams_scaler_test.go +++ b/pkg/scalers/redis_streams_scaler_test.go @@ -16,14 +16,15 @@ func TestParseRedisStreamsMetadata(t *testing.T) { authParams map[string]string } - authParams := map[string]string{"password": "foobarred"} + authParams := map[string]string{"username": "foobarred", "password": "foobarred"} testCases := []testCase{ { name: "with address", - metadata: map[string]string{"stream": "my-stream", "consumerGroup": "my-stream-consumer-group", "pendingEntriesCount": "5", "addressFromEnv": "REDIS_SERVICE", "passwordFromEnv": "REDIS_PASSWORD", "databaseIndex": "0", "enableTLS": "true"}, + metadata: map[string]string{"stream": "my-stream", "consumerGroup": "my-stream-consumer-group", "pendingEntriesCount": "5", "addressFromEnv": "REDIS_SERVICE", "usernameFromEnv": "REDIS_USERNAME", "passwordFromEnv": "REDIS_PASSWORD", "databaseIndex": "0", "enableTLS": "true"}, resolvedEnv: map[string]string{ "REDIS_SERVICE": "myredis:6379", + "REDIS_USERNAME": "foobarred", "REDIS_PASSWORD": "foobarred", }, authParams: nil, @@ -31,10 +32,11 @@ func TestParseRedisStreamsMetadata(t *testing.T) { { name: "with host and port", - metadata: map[string]string{"stream": "my-stream", "consumerGroup": "my-stream-consumer-group", "pendingEntriesCount": "15", "hostFromEnv": "REDIS_HOST", "port": "REDIS_PORT", "passwordFromEnv": "REDIS_PASSWORD", "databaseIndex": "0", "enableTLS": "false"}, + metadata: map[string]string{"stream": "my-stream", "consumerGroup": "my-stream-consumer-group", "pendingEntriesCount": "15", "hostFromEnv": "REDIS_HOST", "port": "REDIS_PORT", "usernameFromEnv": "REDIS_USERNAME", "passwordFromEnv": "REDIS_PASSWORD", "databaseIndex": "0", "enableTLS": "false"}, resolvedEnv: map[string]string{ "REDIS_HOST": "myredis", "REDIS_PORT": "6379", + "REDIS_USERNAME": "foobarred", "REDIS_PASSWORD": "foobarred", }, authParams: authParams, @@ -51,11 +53,14 @@ func TestParseRedisStreamsMetadata(t *testing.T) { assert.Equal(t, strconv.Itoa(m.targetPendingEntriesCount), tc.metadata[pendingEntriesCountMetadata]) if authParams != nil { // if authParam is used + assert.Equal(t, m.connectionInfo.username, authParams[usernameMetadata]) assert.Equal(t, m.connectionInfo.password, authParams[passwordMetadata]) } else { - // if metadata is used to pass password env var name + // if metadata is used to pass credentials' env var names + assert.Equal(t, m.connectionInfo.username, tc.resolvedEnv[tc.metadata[usernameMetadata]]) assert.Equal(t, m.connectionInfo.password, tc.resolvedEnv[tc.metadata[passwordMetadata]]) } + assert.Equal(t, strconv.Itoa(m.databaseIndex), tc.metadata[databaseIndexMetadata]) b, err := strconv.ParseBool(tc.metadata[enableTLSMetadata]) assert.Nil(t, err) @@ -243,6 +248,130 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { }, wantErr: nil, }, + { + name: "username given in authParams", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + }, + authParams: map[string]string{ + "username": "username", + }, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + username: "username", + }, + }, + wantErr: nil, + }, + { + name: "username given in metadata", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + "username": "username", + }, + authParams: map[string]string{}, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + username: "username", + }, + }, + wantErr: nil, + }, + { + name: "username given in metadata from env", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + "usernameFromEnv": "REDIS_USERNAME", + }, + authParams: map[string]string{}, + resolvedEnv: testRedisResolvedEnv, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + username: "none", + }, + }, + wantErr: nil, + }, + { + name: "password given in authParams", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + }, + authParams: map[string]string{ + "password": "password", + }, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + password: "password", + }, + }, + wantErr: nil, + }, + { + name: "password given in metadata from env", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + "passwordFromEnv": "REDIS_PASSWORD", + }, + authParams: map[string]string{}, + resolvedEnv: testRedisResolvedEnv, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + password: "none", + }, + }, + wantErr: nil, + }, } for _, testCase := range cases { @@ -263,3 +392,442 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { }) } } + +func TestParseRedisSentinelStreamsMetadata(t *testing.T) { + cases := []struct { + name string + metadata map[string]string + resolvedEnv map[string]string + authParams map[string]string + wantMeta *redisStreamsMetadata + wantErr error + }{ + { + name: "empty metadata", + wantMeta: nil, + wantErr: errors.New("no addresses or hosts given. address should be a comma separated list of host:port or set the host/port values"), + }, + { + name: "unequal number of hosts/ports", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2", + }, + wantMeta: nil, + wantErr: errors.New("not enough hosts or ports given. number of hosts should be equal to the number of ports"), + }, + { + name: "no stream name", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "pendingEntriesCount": "5", + }, + wantMeta: nil, + wantErr: errors.New("missing redis stream name"), + }, + { + name: "missing pending entries count", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + }, + wantMeta: nil, + wantErr: errors.New("missing pending entries count"), + }, + { + name: "invalid pending entries count", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "pendingEntriesCount": "invalid", + }, + wantMeta: nil, + wantErr: errors.New("error parsing pending entries count"), + }, + { + name: "address is defined in auth params", + metadata: map[string]string{ + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + }, + authParams: map[string]string{ + "addresses": ":7001, :7002", + }, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{":7001", ":7002"}, + }, + }, + wantErr: nil, + }, + { + name: "hosts and ports given in auth params", + metadata: map[string]string{ + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + }, + authParams: map[string]string{ + "hosts": " a, b, c ", + "ports": "1, 2, 3", + }, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + }, + }, + wantErr: nil, + }, + { + name: "username given in authParams", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + }, + authParams: map[string]string{ + "username": "username", + }, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + username: "username", + }, + }, + wantErr: nil, + }, + { + name: "username given in metadata", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + "username": "username", + }, + authParams: map[string]string{}, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + username: "username", + }, + }, + wantErr: nil, + }, + { + name: "username given in metadata from env", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + "usernameFromEnv": "REDIS_USERNAME", + }, + authParams: map[string]string{}, + resolvedEnv: testRedisResolvedEnv, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + username: "none", + }, + }, + wantErr: nil, + }, + { + name: "password given in authParams", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + }, + authParams: map[string]string{ + "password": "password", + }, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + password: "password", + }, + }, + wantErr: nil, + }, + { + name: "password given in metadata from env", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + "passwordFromEnv": "REDIS_PASSWORD", + }, + authParams: map[string]string{}, + resolvedEnv: testRedisResolvedEnv, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + password: "none", + }, + }, + wantErr: nil, + }, + { + name: "sentinelUsername given in authParams", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + }, + authParams: map[string]string{ + "sentinelUsername": "sentinelUsername", + }, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelUsername: "sentinelUsername", + }, + }, + wantErr: nil, + }, + { + name: "sentinelUsername given in metadata", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + "sentinelUsername": "sentinelUsername", + }, + authParams: map[string]string{}, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelUsername: "sentinelUsername", + }, + }, + wantErr: nil, + }, + { + name: "sentinelUsername given in metadata from env", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + "sentinelUsernameFromEnv": "REDIS_SENTINEL_USERNAME", + }, + authParams: map[string]string{}, + resolvedEnv: testRedisResolvedEnv, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelUsername: "none", + }, + }, + wantErr: nil, + }, + { + name: "sentinelPassword given in authParams", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + }, + authParams: map[string]string{ + "sentinelPassword": "sentinelPassword", + }, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelPassword: "sentinelPassword", + }, + }, + wantErr: nil, + }, + { + name: "sentinelPassword given in metadata from env", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + "sentinelPasswordFromEnv": "REDIS_SENTINEL_PASSWORD", + }, + authParams: map[string]string{}, + resolvedEnv: testRedisResolvedEnv, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelPassword: "none", + }, + }, + wantErr: nil, + }, + { + name: "sentinelMaster given in authParams", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + }, + authParams: map[string]string{ + "sentinelMaster": "sentinelMaster", + }, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelMaster: "sentinelMaster", + }, + }, + wantErr: nil, + }, + { + name: "sentinelMaster given in metadata", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + "sentinelMaster": "sentinelMaster", + }, + authParams: map[string]string{}, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelMaster: "sentinelMaster", + }, + }, + wantErr: nil, + }, + { + name: "sentinelMaster given in metadata from env", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + "sentinelMasterFromEnv": "REDIS_SENTINEL_MASTER", + }, + authParams: map[string]string{}, + resolvedEnv: testRedisResolvedEnv, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelMaster: "none", + }, + }, + wantErr: nil, + }, + } + + for _, testCase := range cases { + c := testCase + t.Run(c.name, func(t *testing.T) { + config := &ScalerConfig{ + TriggerMetadata: c.metadata, + ResolvedEnv: c.resolvedEnv, + AuthParams: c.authParams, + } + meta, err := parseRedisStreamsMetadata(config, parseRedisSentinelAddress) + if c.wantErr != nil { + assert.Contains(t, err.Error(), c.wantErr.Error()) + } else { + assert.NoError(t, err) + } + assert.Equal(t, c.wantMeta, meta) + }) + } +} diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index e2da33b7c0d..b2e69b63421 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -379,13 +379,17 @@ func buildScaler(client client.Client, triggerType string, config *scalers.Scale case "rabbitmq": return scalers.NewRabbitMQScaler(config) case "redis": - return scalers.NewRedisScaler(false, config) + return scalers.NewRedisScaler(false, false, config) case "redis-cluster": - return scalers.NewRedisScaler(true, config) - case "redis-cluster-streams": - return scalers.NewRedisStreamsScaler(true, config) + return scalers.NewRedisScaler(true, false, config) + case "redis-sentinel": + return scalers.NewRedisScaler(false, true, config) case "redis-streams": - return scalers.NewRedisStreamsScaler(false, config) + return scalers.NewRedisStreamsScaler(false, false, config) + case "redis-cluster-streams": + return scalers.NewRedisStreamsScaler(true, false, config) + case "redis-sentinel-streams": + return scalers.NewRedisStreamsScaler(false, true, config) case "selenium-grid": return scalers.NewSeleniumGridScaler(config) case "solace-event-queue": From ef84416490571693890f6072228d4bf94273f5a5 Mon Sep 17 00:00:00 2001 From: Jeroen Bobbeldijk Date: Mon, 11 Oct 2021 15:13:38 +0200 Subject: [PATCH 2/6] Update changelog Signed-off-by: Jeroen Bobbeldijk --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d706e8321f9..01893699558 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ - TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX)) - ScaledJob: introduce MultipleScalersCalculation ([#2016](https://github.com/kedacore/keda/pull/2016)) - Add Graphite Scaler ([#1628](https://github.com/kedacore/keda/pull/2092)) +- Improve Redis Scaler, upgrade library, add username and Sentinel support ([#2181](https://github.com/kedacore/keda/pull/2181)) ### Improvements From bf437d7971df18bb19e0eafb94b9ec891b90cd89 Mon Sep 17 00:00:00 2001 From: Jeroen Bobbeldijk Date: Mon, 11 Oct 2021 16:18:57 +0200 Subject: [PATCH 3/6] Fix scalers sorting, pass sentinelUsername in connection settings, change if statements to switch statements Signed-off-by: Jeroen Bobbeldijk --- pkg/scalers/redis_scaler.go | 89 +++++++++++------------------ pkg/scalers/redis_streams_scaler.go | 3 - pkg/scaling/scale_handler.go | 8 +-- 3 files changed, 38 insertions(+), 62 deletions(-) diff --git a/pkg/scalers/redis_scaler.go b/pkg/scalers/redis_scaler.go index 03092afa985..37a5a529afb 100644 --- a/pkg/scalers/redis_scaler.go +++ b/pkg/scalers/redis_scaler.go @@ -306,11 +306,12 @@ func parseRedisAddress(metadata, resolvedEnv, authParams map[string]string) (red return info, fmt.Errorf("no address or host given. address should be in the format of host:port or you should set the host/port values") } - if authParams["username"] != "" { + switch { + case authParams["username"] != "": info.username = authParams["username"] - } else if metadata["username"] != "" { + case metadata["username"] != "": info.username = metadata["username"] - } else if metadata["usernameFromEnv"] != "" { + case metadata["usernameFromEnv"] != "": info.username = resolvedEnv[metadata["usernameFromEnv"]] } @@ -332,7 +333,7 @@ func parseRedisAddress(metadata, resolvedEnv, authParams map[string]string) (red return info, nil } -func parseRedisClusterAddress(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) { +func parseRedisMultipleAddress(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) { info := redisConnectionInfo{} switch { case authParams["addresses"] != "": @@ -374,11 +375,21 @@ func parseRedisClusterAddress(metadata, resolvedEnv, authParams map[string]strin return info, fmt.Errorf("no addresses or hosts given. address should be a comma separated list of host:port or set the host/port values") } - if authParams["username"] != "" { + return info, nil +} + +func parseRedisClusterAddress(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) { + info, err := parseRedisMultipleAddress(metadata, resolvedEnv, authParams) + if err != nil { + return info, err + } + + switch { + case authParams["username"] != "": info.username = authParams["username"] - } else if metadata["username"] != "" { + case metadata["username"] != "": info.username = metadata["username"] - } else if metadata["usernameFromEnv"] != "" { + case metadata["usernameFromEnv"] != "": info.username = resolvedEnv[metadata["usernameFromEnv"]] } @@ -401,52 +412,17 @@ func parseRedisClusterAddress(metadata, resolvedEnv, authParams map[string]strin } func parseRedisSentinelAddress(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) { - info := redisConnectionInfo{} - switch { - case authParams["addresses"] != "": - info.addresses = splitAndTrim(authParams["addresses"]) - case metadata["addresses"] != "": - info.addresses = splitAndTrim(metadata["addresses"]) - case metadata["addressesFromEnv"] != "": - info.addresses = splitAndTrim(resolvedEnv[metadata["addressesFromEnv"]]) - default: - switch { - case authParams["hosts"] != "": - info.hosts = splitAndTrim(authParams["hosts"]) - case metadata["hosts"] != "": - info.hosts = splitAndTrim(metadata["hosts"]) - case metadata["hostsFromEnv"] != "": - info.hosts = splitAndTrim(resolvedEnv[metadata["hostsFromEnv"]]) - } - - switch { - case authParams["ports"] != "": - info.ports = splitAndTrim(authParams["ports"]) - case metadata["ports"] != "": - info.ports = splitAndTrim(metadata["ports"]) - case metadata["portsFromEnv"] != "": - info.ports = splitAndTrim(resolvedEnv[metadata["portsFromEnv"]]) - } - - if len(info.hosts) != 0 && len(info.ports) != 0 { - if len(info.hosts) != len(info.ports) { - return info, fmt.Errorf("not enough hosts or ports given. number of hosts should be equal to the number of ports") - } - for i := range info.hosts { - info.addresses = append(info.addresses, fmt.Sprintf("%s:%s", info.hosts[i], info.ports[i])) - } - } - } - - if len(info.addresses) == 0 { - return info, fmt.Errorf("no addresses or hosts given. address should be a comma separated list of host:port or set the host/port values") + info, err := parseRedisMultipleAddress(metadata, resolvedEnv, authParams) + if err != nil { + return info, err } - if authParams["username"] != "" { + switch { + case authParams["username"] != "": info.username = authParams["username"] - } else if metadata["username"] != "" { + case metadata["username"] != "": info.username = metadata["username"] - } else if metadata["usernameFromEnv"] != "" { + case metadata["usernameFromEnv"] != "": info.username = resolvedEnv[metadata["usernameFromEnv"]] } @@ -456,11 +432,12 @@ func parseRedisSentinelAddress(metadata, resolvedEnv, authParams map[string]stri info.password = resolvedEnv[metadata["passwordFromEnv"]] } - if authParams["sentinelUsername"] != "" { + switch { + case authParams["sentinelUsername"] != "": info.sentinelUsername = authParams["sentinelUsername"] - } else if metadata["sentinelUsername"] != "" { + case metadata["sentinelUsername"] != "": info.sentinelUsername = metadata["sentinelUsername"] - } else if metadata["sentinelUsernameFromEnv"] != "" { + case metadata["sentinelUsernameFromEnv"] != "": info.sentinelUsername = resolvedEnv[metadata["sentinelUsernameFromEnv"]] } @@ -470,11 +447,12 @@ func parseRedisSentinelAddress(metadata, resolvedEnv, authParams map[string]stri info.sentinelPassword = resolvedEnv[metadata["sentinelPasswordFromEnv"]] } - if authParams["sentinelMaster"] != "" { + switch { + case authParams["sentinelMaster"] != "": info.sentinelMaster = authParams["sentinelMaster"] - } else if metadata["sentinelMaster"] != "" { + case metadata["sentinelMaster"] != "": info.sentinelMaster = metadata["sentinelMaster"] - } else if metadata["sentinelMasterFromEnv"] != "" { + case metadata["sentinelMasterFromEnv"] != "": info.sentinelMaster = resolvedEnv[metadata["sentinelMasterFromEnv"]] } @@ -517,6 +495,7 @@ func getRedisSentinelClient(info redisConnectionInfo, dbIndex int) (*redis.Clien Password: info.password, DB: dbIndex, SentinelAddrs: info.addresses, + SentinelUsername: info.sentinelUsername, SentinelPassword: info.sentinelPassword, MasterName: info.sentinelMaster, } diff --git a/pkg/scalers/redis_streams_scaler.go b/pkg/scalers/redis_streams_scaler.go index 66c5a3205f9..5bea2aa0487 100644 --- a/pkg/scalers/redis_streams_scaler.go +++ b/pkg/scalers/redis_streams_scaler.go @@ -26,9 +26,6 @@ const ( consumerGroupNameMetadata = "consumerGroup" usernameMetadata = "username" passwordMetadata = "password" - sentinelUsernameMetadata = "sentinelUsername" - sentinelPasswordMetadata = "sentinelPassword" - sentinelMasterMetadata = "sentinelMaster" databaseIndexMetadata = "databaseIndex" enableTLSMetadata = "enableTLS" ) diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index b2e69b63421..9b6e522b171 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -382,14 +382,14 @@ func buildScaler(client client.Client, triggerType string, config *scalers.Scale return scalers.NewRedisScaler(false, false, config) case "redis-cluster": return scalers.NewRedisScaler(true, false, config) - case "redis-sentinel": - return scalers.NewRedisScaler(false, true, config) - case "redis-streams": - return scalers.NewRedisStreamsScaler(false, false, config) case "redis-cluster-streams": return scalers.NewRedisStreamsScaler(true, false, config) + case "redis-sentinel": + return scalers.NewRedisScaler(false, true, config) case "redis-sentinel-streams": return scalers.NewRedisStreamsScaler(false, true, config) + case "redis-streams": + return scalers.NewRedisStreamsScaler(false, false, config) case "selenium-grid": return scalers.NewSeleniumGridScaler(config) case "solace-event-queue": From db96f64b33ca263c2ebd5cc04a84fddbc80795ec Mon Sep 17 00:00:00 2001 From: Jeroen Bobbeldijk Date: Tue, 12 Oct 2021 13:17:25 +0200 Subject: [PATCH 4/6] Add e2e tests for sentinel Signed-off-by: Jeroen Bobbeldijk --- tests/scalers/redis-sentinel-lists.test.ts | 567 +++++++++++++++++++ tests/scalers/redis-sentinel-streams.test.ts | 227 ++++++++ 2 files changed, 794 insertions(+) create mode 100644 tests/scalers/redis-sentinel-lists.test.ts create mode 100644 tests/scalers/redis-sentinel-streams.test.ts diff --git a/tests/scalers/redis-sentinel-lists.test.ts b/tests/scalers/redis-sentinel-lists.test.ts new file mode 100644 index 00000000000..b3b082d1b51 --- /dev/null +++ b/tests/scalers/redis-sentinel-lists.test.ts @@ -0,0 +1,567 @@ +import test from 'ava' +import * as sh from 'shelljs' +import * as tmp from 'tmp' +import * as fs from 'fs' +import {waitForRollout} from "./helpers"; + +const redisNamespace = 'redis-sentinel' +const redisService = 'redis-sentinel' +const testNamespace = 'redis-sentinel-lists-test' +const redisStatefulSetName = 'redis-sentinel-node' +const redisSentinelName = 'redis-sentinel' +const redisSentinelMasterName = 'mymaster' +const redisPassword = 'my-password' +let redisHost = '' +const redisPort = 26379 +let redisAddress = '' +const listNameForHostPortRef = 'my-test-list-host-port-ref' +const listNameForAddressRef = 'my-test-list-address-ref' +const listNameForHostPortTriggerAuth = 'my-test-list-host-port-trigger' +const redisWorkerHostPortRefDeploymentName = 'redis-worker-test-hostport' +const redisWorkerAddressRefDeploymentName = 'redis-worker-test-address' +const redisWorkerHostPortRefTriggerAuthDeploymentName = 'redis-worker-test-hostport-triggerauth' +const itemsToWrite = 200 +const deploymentContainerImage = 'goku321/redis-cluster-list:v1.7' +const writeJobNameForHostPortRef = 'redis-writer-host-port-ref' +const writeJobNameForAddressRef = 'redis-writer-address-ref' +const writeJobNameForHostPortInTriggerAuth = 'redis-writer-host-port-trigger-auth' + +test.before(t => { + // Deploy Redis sentinel. + sh.exec(`kubectl create namespace ${redisNamespace}`) + sh.exec(`helm repo add bitnami https://charts.bitnami.com/bitnami`) + + let sentinelStatus = sh.exec(`helm install --timeout 600s ${redisSentinelName} --namespace ${redisNamespace} --set "sentinel.enabled=true" --set "global.redis.password=${redisPassword}" bitnami/redis`).code + t.is(0, + sentinelStatus, + 'creating a Redis sentinel setup should work.' + ) + + // Wait for Redis sentinel to be ready. + t.is(0, waitForRollout('statefulset', redisStatefulSetName, redisNamespace)) + + // Get Redis sentinel address. + redisHost = sh.exec(`kubectl get svc ${redisService} -n ${redisNamespace} -o jsonpath='{.spec.clusterIP}'`) + redisAddress = `${redisHost}:${redisPort}` + + // Create test namespace. + sh.exec(`kubectl create namespace ${testNamespace}`) + + const triggerAuthTmpFile = tmp.fileSync() + const base64Password = Buffer.from(redisPassword).toString('base64') + fs.writeFileSync(triggerAuthTmpFile.name, scaledObjectTriggerAuthYaml.replace('{{REDIS_PASSWORD}}', base64Password).replace('{{REDIS_SENTINEL_PASSWORD}}', base64Password)) + + t.is( + 0, + sh.exec(`kubectl apply -f ${triggerAuthTmpFile.name} --namespace ${testNamespace}`).code, + 'creating trigger auth should work..' + ) + + const triggerAuthHostPortTmpFile = tmp.fileSync() + + fs.writeFileSync(triggerAuthHostPortTmpFile.name, + scaledObjectTriggerAuthHostPortYaml.replace('{{REDIS_PASSWORD}}', base64Password) + .replace('{{REDIS_SENTINEL_PASSWORD}}', base64Password) + .replace('{{REDIS_SENTINEL_MASTER}}', Buffer.from(redisSentinelMasterName).toString('base64')) + .replace('{{REDIS_HOSTS}}', Buffer.from(redisHost).toString('base64')) + .replace('{{REDIS_PORTS}}', Buffer.from(redisPort.toString()).toString('base64')) + ) + + t.is( + 0, + sh.exec(`kubectl apply -f ${triggerAuthHostPortTmpFile.name} --namespace ${testNamespace}`).code, + 'creating trigger auth with host port should work..' + ) + + // Create a deployment with host and port. + const deploymentHostPortRefTmpFile = tmp.fileSync() + + fs.writeFileSync(deploymentHostPortRefTmpFile.name, redisListDeployHostPortYaml.replace(/{{REDIS_PASSWORD}}/g, redisPassword) + .replace(/{{REDIS_SENTINEL_PASSWORD}}/g, redisPassword) + .replace(/{{REDIS_SENTINEL_MASTER}}/g, redisSentinelMasterName) + .replace(/{{REDIS_HOSTS}}/g, redisHost) + .replace(/{{REDIS_PORTS}}/g, redisPort.toString()) + .replace(/{{LIST_NAME}}/g, listNameForHostPortRef) + .replace(/{{DEPLOYMENT_NAME}}/g, redisWorkerHostPortRefDeploymentName) + .replace(/{{CONTAINER_IMAGE}}/g, deploymentContainerImage) + ) + + t.is( + 0, + sh.exec(`kubectl apply -f ${deploymentHostPortRefTmpFile.name} --namespace ${testNamespace}`).code, + 'creating a deployment using redis host and port envs should work..' + ) + + const deploymentAddressRefTmpFile = tmp.fileSync() + + fs.writeFileSync(deploymentAddressRefTmpFile.name, redisListDeployAddressYaml.replace(/{{REDIS_PASSWORD}}/g, redisPassword) + .replace(/{{REDIS_SENTINEL_PASSWORD}}/g, redisPassword) + .replace(/{{REDIS_SENTINEL_MASTER}}/g, redisSentinelMasterName) + .replace(/{{REDIS_ADDRESSES}}/g, redisAddress) + .replace(/{{LIST_NAME}}/g, listNameForAddressRef) + .replace(/{{DEPLOYMENT_NAME}}/g, redisWorkerAddressRefDeploymentName) + .replace(/{{CONTAINER_IMAGE}}/g, deploymentContainerImage) + ) + + t.is( + 0, + sh.exec(`kubectl apply -f ${deploymentAddressRefTmpFile.name} --namespace ${testNamespace}`).code, + 'creating a deployment using redis address var should work..' + ) + + + const deploymentHostPortRefTriggerAuthTmpFile = tmp.fileSync() + + fs.writeFileSync(deploymentHostPortRefTriggerAuthTmpFile.name, redisListDeployHostPortInTriggerAuhYaml.replace(/{{REDIS_PASSWORD}}/g, redisPassword) + .replace(/{{REDIS_SENTINEL_PASSWORD}}/g, redisPassword) + .replace(/{{REDIS_SENTINEL_MASTER}}/g, redisSentinelMasterName) + .replace(/{{REDIS_HOSTS}}/g, redisHost) + .replace(/{{REDIS_PORTS}}/g, redisPort.toString()) + .replace(/{{LIST_NAME}}/g, listNameForHostPortTriggerAuth) + .replace(/{{DEPLOYMENT_NAME}}/g, redisWorkerHostPortRefTriggerAuthDeploymentName) + .replace(/{{CONTAINER_IMAGE}}/g, deploymentContainerImage) + ) + + t.is( + 0, + sh.exec(`kubectl apply -f ${deploymentHostPortRefTriggerAuthTmpFile.name} --namespace ${testNamespace}`).code, + 'creating a deployment using redis host port in trigger auth should work..' + ) +}) + +test.serial('Deployment for redis host and port env vars should have 0 replica on start', t => { + + const replicaCount = sh.exec( + `kubectl get deployment/${redisWorkerHostPortRefDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.is(replicaCount, '0', 'replica count should start out as 0') +}) + + +test.serial(`Deployment using redis host port env vars should max and scale to 5 with ${itemsToWrite} items written to list and back to 0`, t => { + runWriteJob(t, writeJobNameForHostPortRef, listNameForHostPortRef) + + let replicaCount = '0' + for (let i = 0; i < 30 && replicaCount !== '5'; i++) { + replicaCount = sh.exec( + `kubectl get deployment/${redisWorkerHostPortRefDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.log('(scale up) replica count is:' + replicaCount) + if (replicaCount !== '5') { + sh.exec('sleep 3s') + } + } + + t.is('5', replicaCount, 'Replica count should be 5 within 60 seconds') + + for (let i = 0; i < 12 && replicaCount !== '0'; i++) { + replicaCount = sh.exec( + `kubectl get deployment/${redisWorkerHostPortRefDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.log('(scale down) replica count is:' + replicaCount) + if (replicaCount !== '0') { + sh.exec('sleep 10s') + } + } + + t.is('0', replicaCount, 'Replica count should be 0 within 2 minutes') +}) + +test.serial('Deployment for redis address env var should have 0 replica on start', t => { + + const replicaCount = sh.exec( + `kubectl get deployment/${redisWorkerAddressRefDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.is(replicaCount, '0', 'replica count should start out as 0') +}) + + + +test.serial(`Deployment using redis address env var should max and scale to 5 with ${itemsToWrite} items written to list and back to 0`, t => { + + runWriteJob(t, writeJobNameForAddressRef, listNameForAddressRef) + + let replicaCount = '0' + for (let i = 0; i < 30 && replicaCount !== '5'; i++) { + replicaCount = sh.exec( + `kubectl get deployment/${redisWorkerAddressRefDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.log('(scale up) replica count is:' + replicaCount) + if (replicaCount !== '5') { + sh.exec('sleep 3s') + } + } + + t.is('5', replicaCount, 'Replica count should be 5 within 60 seconds') + + for (let i = 0; i < 12 && replicaCount !== '0'; i++) { + replicaCount = sh.exec( + `kubectl get deployment/${redisWorkerAddressRefDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.log('(scale down) replica count is:' + replicaCount) + if (replicaCount !== '0') { + sh.exec('sleep 10s') + } + } + + t.is('0', replicaCount, 'Replica count should be 0 within 2 minutes') +}) + + +test.serial('Deployment for redis host and port in the trigger auth should have 0 replica on start', t => { + + const replicaCount = sh.exec( + `kubectl get deployment/${redisWorkerHostPortRefTriggerAuthDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.is(replicaCount, '0', 'replica count should start out as 0') +}) + + +test.serial(`Deployment using redis host port in triggerAuth should max and scale to 5 with ${itemsToWrite} items written to list and back to 0`, t => { + + runWriteJob(t, writeJobNameForHostPortInTriggerAuth, listNameForHostPortTriggerAuth) + + let replicaCount = '0' + for (let i = 0; i < 30 && replicaCount !== '5'; i++) { + replicaCount = sh.exec( + `kubectl get deployment/${redisWorkerHostPortRefTriggerAuthDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.log('(scale up) replica count is:' + replicaCount) + if (replicaCount !== '5') { + sh.exec('sleep 3s') + } + } + + t.is('5', replicaCount, 'Replica count should be 5 within 60 seconds') + + for (let i = 0; i < 12 && replicaCount !== '0'; i++) { + replicaCount = sh.exec( + `kubectl get deployment/${redisWorkerHostPortRefTriggerAuthDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.log('(scale down) replica count is:' + replicaCount) + if (replicaCount !== '0') { + sh.exec('sleep 10s') + } + } + + t.is('0', replicaCount, 'Replica count should be 0 within 2 minutes') +}) + + +test.after.always.cb('clean up deployment', t => { + const resources = [ + `job/${writeJobNameForHostPortRef}`, + `job/${writeJobNameForAddressRef}`, + `job/${writeJobNameForHostPortInTriggerAuth}`, + `scaledobject.keda.sh/${redisWorkerHostPortRefDeploymentName}`, + `scaledobject.keda.sh/${redisWorkerAddressRefDeploymentName}`, + `scaledobject.keda.sh/${redisWorkerHostPortRefTriggerAuthDeploymentName}`, + 'triggerauthentication.keda.sh/keda-redis-sentinel-list-triggerauth', + 'triggerauthentication.keda.sh/keda-redis-sentinel-list-triggerauth-host-port', + `deployment/${redisWorkerAddressRefDeploymentName}`, + `deployment/${redisWorkerHostPortRefTriggerAuthDeploymentName}`, + `deployment/${redisWorkerHostPortRefDeploymentName}`, + 'secret/redis-password', + ] + + for (const resource of resources) { + sh.exec(`kubectl delete ${resource} --namespace ${testNamespace}`) + } + sh.exec(`kubectl delete namespace ${testNamespace}`) + + sh.exec(`helm delete ${redisSentinelName} --namespace ${redisNamespace}`) + sh.exec(`kubectl delete namespace ${redisNamespace}`) + t.end() +}) + +function runWriteJob(t, jobName, listName) { + // write to list + const tmpFile = tmp.fileSync() + fs.writeFileSync(tmpFile.name, writeJobYaml.replace('{{REDIS_ADDRESSES}}', redisAddress).replace('{{REDIS_PASSWORD}}', redisPassword) + .replace('{{REDIS_SENTINEL_PASSWORD}}', redisPassword) + .replace('{{REDIS_SENTINEL_MASTER}}', redisSentinelMasterName) + .replace('{{LIST_NAME}}', listName) + .replace('{{NUMBER_OF_ITEMS_TO_WRITE}}', itemsToWrite.toString()) + .replace('{{CONTAINER_IMAGE}}', deploymentContainerImage) + .replace('{{JOB_NAME}}', jobName) + ) + + t.is( + 0, + sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${testNamespace}`).code, + 'list writer job should apply.' + ) + + // wait for the write job to complete + for (let i = 0; i < 20; i++) { + const succeeded = sh.exec(`kubectl get job ${writeJobNameForHostPortRef} --namespace ${testNamespace} -o jsonpath='{.items[0].status.succeeded}'`).stdout + if (succeeded == '1') { + break + } + sh.exec('sleep 1s') + } +} + +const redisListDeployHostPortYaml = `apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{DEPLOYMENT_NAME}} + labels: + app: {{DEPLOYMENT_NAME}} +spec: + replicas: 0 + selector: + matchLabels: + app: {{DEPLOYMENT_NAME}} + template: + metadata: + labels: + app: {{DEPLOYMENT_NAME}} + spec: + containers: + - name: redis-worker + image: {{CONTAINER_IMAGE}} + imagePullPolicy: IfNotPresent + command: ["./main"] + args: ["read"] + env: + - name: REDIS_HOSTS + value: {{REDIS_HOSTS}} + - name: REDIS_PORTS + value: "{{REDIS_PORTS}}" + - name: LIST_NAME + value: {{LIST_NAME}} + - name: REDIS_PASSWORD + value: {{REDIS_PASSWORD}} + - name: REDIS_SENTINEL_PASSWORD + value: {{REDIS_SENTINEL_PASSWORD}} + - name: REDIS_SENTINEL_MASTER + value: {{REDIS_SENTINEL_MASTER}} + - name: READ_PROCESS_TIME + value: "500" +--- +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{DEPLOYMENT_NAME}} +spec: + scaleTargetRef: + name: {{DEPLOYMENT_NAME}} + pollingInterval: 5 + cooldownPeriod: 30 + minReplicaCount: 0 + maxReplicaCount: 5 + triggers: + - type: redis-sentinel + metadata: + hostsFromEnv: REDIS_HOSTS + portsFromEnv: REDIS_PORTS + listName: {{LIST_NAME}} + listLength: "5" + sentinelMaster: {{REDIS_SENTINEL_MASTER}} + authenticationRef: + name: keda-redis-sentinel-list-triggerauth +` + + +const redisListDeployAddressYaml = `apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{DEPLOYMENT_NAME}} + labels: + app: {{DEPLOYMENT_NAME}} +spec: + replicas: 0 + selector: + matchLabels: + app: {{DEPLOYMENT_NAME}} + template: + metadata: + labels: + app: {{DEPLOYMENT_NAME}} + spec: + containers: + - name: redis-worker + image: {{CONTAINER_IMAGE}} + imagePullPolicy: IfNotPresent + command: ["./main"] + args: ["read"] + env: + - name: REDIS_ADDRESSES + value: {{REDIS_ADDRESSES}} + - name: LIST_NAME + value: {{LIST_NAME}} + - name: REDIS_PASSWORD + value: {{REDIS_PASSWORD}} + - name: REDIS_SENTINEL_PASSWORD + value: {{REDIS_SENTINEL_PASSWORD}} + - name: REDIS_SENTINEL_MASTER + value: {{REDIS_SENTINEL_MASTER}} + - name: READ_PROCESS_TIME + value: "500" +--- +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{DEPLOYMENT_NAME}} +spec: + scaleTargetRef: + name: {{DEPLOYMENT_NAME}} + pollingInterval: 5 + cooldownPeriod: 30 + minReplicaCount: 0 + maxReplicaCount: 5 + triggers: + - type: redis-sentinel + metadata: + addressesFromEnv: REDIS_ADDRESSES + listName: {{LIST_NAME}} + listLength: "5" + sentinelMaster: {{REDIS_SENTINEL_MASTER}} + authenticationRef: + name: keda-redis-sentinel-list-triggerauth +` + +const redisListDeployHostPortInTriggerAuhYaml = `apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{DEPLOYMENT_NAME}} + labels: + app: {{DEPLOYMENT_NAME}} +spec: + replicas: 0 + selector: + matchLabels: + app: {{DEPLOYMENT_NAME}} + template: + metadata: + labels: + app: {{DEPLOYMENT_NAME}} + spec: + containers: + - name: redis-worker + image: {{CONTAINER_IMAGE}} + imagePullPolicy: IfNotPresent + command: ["./main"] + args: ["read"] + env: + - name: REDIS_HOSTS + value: {{REDIS_HOSTS}} + - name: REDIS_PORTS + value: "{{REDIS_PORTS}}" + - name: LIST_NAME + value: {{LIST_NAME}} + - name: REDIS_PASSWORD + value: {{REDIS_PASSWORD}} + - name: REDIS_SENTINEL_PASSWORD + value: {{REDIS_SENTINEL_PASSWORD}} + - name: REDIS_SENTINEL_MASTER + value: {{REDIS_SENTINEL_MASTER}} + - name: READ_PROCESS_TIME + value: "500" +--- +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{DEPLOYMENT_NAME}} +spec: + scaleTargetRef: + name: {{DEPLOYMENT_NAME}} + pollingInterval: 5 + cooldownPeriod: 30 + minReplicaCount: 0 + maxReplicaCount: 5 + triggers: + - type: redis-sentinel + metadata: + listName: {{LIST_NAME}} + listLength: "5" + sentinelMaster: {{REDIS_SENTINEL_MASTER}} + authenticationRef: + name: keda-redis-sentinel-list-triggerauth-host-port +` + +const scaledObjectTriggerAuthHostPortYaml = `apiVersion: v1 +kind: Secret +metadata: + name: redis-config +type: Opaque +data: + password: {{REDIS_PASSWORD}} + sentinelPassword: {{REDIS_SENTINEL_PASSWORD}} + redisHost: {{REDIS_HOSTS}} + redisPort: {{REDIS_PORTS}} +--- +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: keda-redis-sentinel-list-triggerauth-host-port +spec: + secretTargetRef: + - parameter: password + name: redis-config + key: password + - parameter: sentinelPassword + name: redis-config + key: sentinelPassword + - parameter: hosts + name: redis-config + key: redisHost + - parameter: ports + name: redis-config + key: redisPort +` + +const scaledObjectTriggerAuthYaml = `apiVersion: v1 +kind: Secret +metadata: + name: redis-password +type: Opaque +data: + password: {{REDIS_PASSWORD}} + sentinelPassword: {{REDIS_SENTINEL_PASSWORD}} +--- +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: keda-redis-sentinel-list-triggerauth +spec: + secretTargetRef: + - parameter: password + name: redis-password + key: password + - parameter: sentinelPassword + name: redis-password + key: sentinelPassword +` + + +const writeJobYaml = `apiVersion: batch/v1 +kind: Job +metadata: + name: {{JOB_NAME}} +spec: + template: + spec: + containers: + - name: redis + image: {{CONTAINER_IMAGE}} + imagePullPolicy: IfNotPresent + command: ["./main"] + env: + - name: REDIS_ADDRESSES + value: {{REDIS_ADDRESSES}} + - name: REDIS_PASSWORD + value: {{REDIS_PASSWORD}} + - name: REDIS_SENTINEL_PASSWORD + value: {{REDIS_SENTINEL_PASSWORD}} + - name: REDIS_SENTINEL_MASTER + value: {{REDIS_SENTINEL_MASTER}} + - name: LIST_NAME + value: {{LIST_NAME}} + - name: NO_LIST_ITEMS_TO_WRITE + value: "{{NUMBER_OF_ITEMS_TO_WRITE}}" + args: ["write"] + restartPolicy: Never + backoffLimit: 4 +` diff --git a/tests/scalers/redis-sentinel-streams.test.ts b/tests/scalers/redis-sentinel-streams.test.ts new file mode 100644 index 00000000000..57149bc11d0 --- /dev/null +++ b/tests/scalers/redis-sentinel-streams.test.ts @@ -0,0 +1,227 @@ +import test from 'ava' +import * as sh from 'shelljs' +import * as tmp from 'tmp' +import * as fs from 'fs' +import {waitForDeploymentReplicaCount, waitForRollout} from "./helpers"; + +const redisNamespace = 'redis-sentinel-streams' +const redisSentinelName = 'redis-sentinel-streams' +const redisSentinelMasterName = 'mymaster' +const redisStatefulSetName = 'redis-sentinel-streams-node' +const redisService = 'redis-sentinel-streams' +const testNamespace = 'redis-sentinel-streams-test' +const redisPassword = 'foobared' +let redisHost = '' +const numMessages = 100 + +test.before(t => { + // Deploy Redis Sentinel. + sh.exec(`kubectl create namespace ${redisNamespace}`) + sh.exec(`helm repo add bitnami https://charts.bitnami.com/bitnami`) + + let sentinelStatus = sh.exec(`helm install --timeout 600s ${redisSentinelName} --namespace ${redisNamespace} --set "sentinel.enabled=true" --set "global.redis.password=${redisPassword}" bitnami/redis`).code + t.is(0, + sentinelStatus, + 'creating a Redis Sentinel setup should work.' + ) + + // Wait for Redis Sentinel to be ready. + let exitCode = waitForRollout('statefulset', redisStatefulSetName, redisNamespace) + t.is(0, exitCode, 'expected rollout status for redis to finish successfully') + + // Get Redis Sentinel address. + redisHost = sh.exec(`kubectl get svc ${redisService} -n ${redisNamespace} -o jsonpath='{.spec.clusterIP}'`) + + // Create test namespace. + sh.exec(`kubectl create namespace ${testNamespace}`) + + // Deploy streams consumer app, scaled object etc. + const tmpFile = tmp.fileSync() + const base64Password = Buffer.from(redisPassword).toString('base64') + + fs.writeFileSync(tmpFile.name, redisStreamsDeployYaml.replace('{{REDIS_PASSWORD}}', base64Password).replace('{{REDIS_SENTINEL_PASSWORD}}', base64Password).replace('{{REDIS_SENTINEL_MASTER}}', redisSentinelMasterName).replace('{{REDIS_HOSTS}}', redisHost)) + t.is( + 0, + sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${testNamespace}`).code, + 'creating a deployment should work..' + ) +}) + +test.serial('Deployment should have 1 replica on start', t => { + + const replicaCount = sh.exec( + `kubectl get deployment/redis-streams-consumer --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.is(replicaCount, '1', 'replica count should start out as 1') +}) + +test.serial(`Deployment should scale to 5 with ${numMessages} messages and back to 1`, async t => { + // Publish messages to redis streams. + const tmpFile = tmp.fileSync() + fs.writeFileSync(tmpFile.name, producerDeployYaml.replace('{{NUM_MESSAGES}}', numMessages.toString()) + .replace('{{REDIS_SENTINEL_MASTER}}', redisSentinelMasterName) + .replace('{{REDIS_HOSTS}}', redisHost)) + t.is( + 0, + sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${testNamespace}`).code, + 'producer job should apply.' + ) + + // Wait for producer job to finish. + for (let i = 0; i < 40; i++) { + const succeeded = sh.exec(`kubectl get job --namespace ${testNamespace} -o jsonpath='{.items[0].status.succeeded}'`).stdout + if (succeeded == '1') { + break + } + sh.exec('sleep 1s') + } + // With messages published, the consumer deployment should start receiving the messages. + t.true(await waitForDeploymentReplicaCount(5, 'redis-streams-consumer', testNamespace, 30, 3000), 'Replica count should be 5 within 60 seconds') + t.true(await waitForDeploymentReplicaCount(1, 'redis-streams-consumer', testNamespace, 60, 10000), 'Replica count should be 1 within 10 minutes') +}) + + + +test.after.always.cb('clean up deployment', t => { + const resources = [ + 'scaledobject.keda.sh/redis-streams-scaledobject', + 'triggerauthentications.keda.sh/keda-redis-stream-triggerauth', + 'secret/redis-password', + 'deployment/redis-streams-consumer', + 'job/redis-streams-producer', + ] + + for (const resource of resources) { + sh.exec(`kubectl delete ${resource} --namespace ${testNamespace}`) + } + sh.exec(`kubectl delete namespace ${testNamespace}`) + + sh.exec(`helm delete ${redisSentinelName} --namespace ${redisNamespace}`) + sh.exec(`kubectl delete namespace ${redisNamespace}`) + t.end() +}) + +const redisStreamsDeployYaml = `apiVersion: v1 +kind: Secret +metadata: + name: redis-password +type: Opaque +data: + password: {{REDIS_PASSWORD}} + sentinelPassword: {{REDIS_SENTINEL_PASSWORD}} +--- +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: keda-redis-stream-triggerauth +spec: + secretTargetRef: + - parameter: password + name: redis-password + key: password + - parameter: sentinelPassword + name: redis-password + key: sentinelPassword +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: redis-streams-consumer +spec: + replicas: 1 + selector: + matchLabels: + app: redis-streams-consumer + template: + metadata: + labels: + app: redis-streams-consumer + spec: + containers: + - name: redis-streams-consumer + image: goku321/redis-cluster-streams:v2.5 + command: ["./main"] + args: ["consumer"] + imagePullPolicy: Always + env: + - name: REDIS_HOSTS + value: {{REDIS_HOSTS}} + - name: REDIS_PORTS + value: "26379" + - name: REDIS_STREAM_NAME + value: my-stream + - name: REDIS_STREAM_CONSUMER_GROUP_NAME + value: consumer-group-1 + - name: REDIS_SENTINEL_MASTER + value: {{REDIS_SENTINEL_MASTER}} + - name: REDIS_PASSWORD + valueFrom: + secretKeyRef: + name: redis-password + key: password + - name: REDIS_SENTINEL_PASSWORD + valueFrom: + secretKeyRef: + name: redis-password + key: sentinelPassword +--- +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: redis-streams-scaledobject +spec: + scaleTargetRef: + name: redis-streams-consumer + pollingInterval: 5 + cooldownPeriod: 10 + minReplicaCount: 1 + maxReplicaCount: 5 + triggers: + - type: redis-sentinel-streams + metadata: + hostsFromEnv: REDIS_HOSTS + portsFromEnv: REDIS_PORTS + sentinelPasswordFromEnv: REDIS_SENTINEL_MASTER + stream: my-stream + consumerGroup: consumer-group-1 + pendingEntriesCount: "10" + authenticationRef: + name: keda-redis-stream-triggerauth +` + +const producerDeployYaml = `apiVersion: batch/v1 +kind: Job +metadata: + name: redis-streams-producer +spec: + template: + spec: + containers: + - name: producer + image: goku321/redis-cluster-streams:v2.5 + command: ["./main"] + args: ["producer"] + imagePullPolicy: Always + env: + - name: REDIS_HOSTS + value: {{REDIS_HOSTS}} + - name: REDIS_PORTS + value: "26379" + - name: REDIS_STREAM_NAME + value: my-stream + - name: NUM_MESSAGES + value: "{{NUM_MESSAGES}}" + - name: REDIS_SENTINEL_MASTER + value: "{{REDIS_SENTINEL_MASTER}}" + - name: REDIS_PASSWORD + valueFrom: + secretKeyRef: + name: redis-password + key: password + - name: REDIS_SENTINEL_PASSWORD + valueFrom: + secretKeyRef: + name: redis-password + key: sentinelPassword + restartPolicy: Never +` From b5f1b40786819987592425c5c9aff43cf7d1ece1 Mon Sep 17 00:00:00 2001 From: Jeroen Bobbeldijk Date: Wed, 13 Oct 2021 09:35:36 +0200 Subject: [PATCH 5/6] Use Keda context in some cases Signed-off-by: Jeroen Bobbeldijk --- pkg/scalers/redis_scaler.go | 20 +++++++++----------- pkg/scalers/redis_scaler_test.go | 3 ++- pkg/scalers/redis_streams_scaler.go | 12 ++++++------ pkg/scalers/redis_streams_scaler_test.go | 3 ++- 4 files changed, 19 insertions(+), 19 deletions(-) diff --git a/pkg/scalers/redis_scaler.go b/pkg/scalers/redis_scaler.go index 37a5a529afb..afbfd5d6499 100644 --- a/pkg/scalers/redis_scaler.go +++ b/pkg/scalers/redis_scaler.go @@ -24,14 +24,12 @@ const ( defaultEnableTLS = false ) -var ctx = context.Background() - type redisAddressParser func(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) type redisScaler struct { metadata *redisMetadata closeFn func() error - getListLengthFn func() (int64, error) + getListLengthFn func(ctx context.Context) (int64, error) } type redisConnectionInfo struct { @@ -105,7 +103,7 @@ func createClusteredRedisScaler(meta *redisMetadata, script string) (Scaler, err return nil } - listLengthFn := func() (int64, error) { + listLengthFn := func(ctx context.Context) (int64, error) { cmd := client.Eval(ctx, script, []string{meta.listName}) if cmd.Err() != nil { return -1, cmd.Err() @@ -135,7 +133,7 @@ func createSentinelRedisScaler(meta *redisMetadata, script string) (Scaler, erro return nil } - listLengthFn := func() (int64, error) { + listLengthFn := func(ctx context.Context) (int64, error) { cmd := client.Eval(ctx, script, []string{meta.listName}) if cmd.Err() != nil { return -1, cmd.Err() @@ -165,7 +163,7 @@ func createRedisScaler(meta *redisMetadata, script string) (Scaler, error) { return nil } - listLengthFn := func() (int64, error) { + listLengthFn := func(ctx context.Context) (int64, error) { cmd := client.Eval(ctx, script, []string{meta.listName}) if cmd.Err() != nil { return -1, cmd.Err() @@ -219,7 +217,7 @@ func parseRedisMetadata(config *ScalerConfig, parserFn redisAddressParser) (*red // IsActive checks if there is any element in the Redis list func (s *redisScaler) IsActive(ctx context.Context) (bool, error) { - length, err := s.getListLengthFn() + length, err := s.getListLengthFn(ctx) if err != nil { redisLog.Error(err, "error") @@ -253,7 +251,7 @@ func (s *redisScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { // GetMetrics connects to Redis and finds the length of the list func (s *redisScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { - listLen, err := s.getListLengthFn() + listLen, err := s.getListLengthFn(ctx) if err != nil { redisLog.Error(err, "error getting list length") @@ -482,7 +480,7 @@ func getRedisClusterClient(info redisConnectionInfo) (*redis.ClusterClient, erro // confirm if connected c := redis.NewClusterClient(options) - err := c.Ping(ctx).Err() + err := c.Ping(context.Background()).Err() if err != nil { return nil, err } @@ -507,7 +505,7 @@ func getRedisSentinelClient(info redisConnectionInfo, dbIndex int) (*redis.Clien // confirm if connected c := redis.NewFailoverClient(options) - err := c.Ping(ctx).Err() + err := c.Ping(context.Background()).Err() if err != nil { return nil, err } @@ -529,7 +527,7 @@ func getRedisClient(info redisConnectionInfo, dbIndex int) (*redis.Client, error // confirm if connected c := redis.NewClient(options) - err := c.Ping(ctx).Err() + err := c.Ping(context.Background()).Err() if err != nil { return nil, err } diff --git a/pkg/scalers/redis_scaler_test.go b/pkg/scalers/redis_scaler_test.go index 6b298702ea8..27aecb1d1c7 100644 --- a/pkg/scalers/redis_scaler_test.go +++ b/pkg/scalers/redis_scaler_test.go @@ -1,6 +1,7 @@ package scalers import ( + "context" "errors" "testing" @@ -79,7 +80,7 @@ func TestRedisGetMetricSpecForScaling(t *testing.T) { t.Fatal("Could not parse metadata:", err) } closeFn := func() error { return nil } - lengthFn := func() (int64, error) { return -1, nil } + lengthFn := func(ctx context.Context) (int64, error) { return -1, nil } mockRedisScaler := redisScaler{ meta, closeFn, diff --git a/pkg/scalers/redis_streams_scaler.go b/pkg/scalers/redis_streams_scaler.go index 5bea2aa0487..d74db5a10d5 100644 --- a/pkg/scalers/redis_streams_scaler.go +++ b/pkg/scalers/redis_streams_scaler.go @@ -33,7 +33,7 @@ const ( type redisStreamsScaler struct { metadata *redisStreamsMetadata closeFn func() error - getPendingEntriesCountFn func() (int64, error) + getPendingEntriesCountFn func(ctx context.Context) (int64, error) } type redisStreamsMetadata struct { @@ -82,7 +82,7 @@ func createClusteredRedisStreamsScaler(meta *redisStreamsMetadata) (Scaler, erro return nil } - pendingEntriesCountFn := func() (int64, error) { + pendingEntriesCountFn := func(ctx context.Context) (int64, error) { pendingEntries, err := client.XPending(ctx, meta.streamName, meta.consumerGroupName).Result() if err != nil { return -1, err @@ -111,7 +111,7 @@ func createSentinelRedisStreamsScaler(meta *redisStreamsMetadata) (Scaler, error return nil } - pendingEntriesCountFn := func() (int64, error) { + pendingEntriesCountFn := func(ctx context.Context) (int64, error) { pendingEntries, err := client.XPending(ctx, meta.streamName, meta.consumerGroupName).Result() if err != nil { return -1, err @@ -140,7 +140,7 @@ func createRedisStreamsScaler(meta *redisStreamsMetadata) (Scaler, error) { return nil } - pendingEntriesCountFn := func() (int64, error) { + pendingEntriesCountFn := func(ctx context.Context) (int64, error) { pendingEntries, err := client.XPending(ctx, meta.streamName, meta.consumerGroupName).Result() if err != nil { return -1, err @@ -201,7 +201,7 @@ func parseRedisStreamsMetadata(config *ScalerConfig, parseFn redisAddressParser) // IsActive checks if there are pending entries in the 'Pending Entries List' for consumer group of a stream func (s *redisStreamsScaler) IsActive(ctx context.Context) (bool, error) { - count, err := s.getPendingEntriesCountFn() + count, err := s.getPendingEntriesCountFn(ctx) if err != nil { redisStreamsLog.Error(err, "error") @@ -233,7 +233,7 @@ func (s *redisStreamsScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { // GetMetrics fetches the number of pending entries for a consumer group in a stream func (s *redisStreamsScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { - pendingEntriesCount, err := s.getPendingEntriesCountFn() + pendingEntriesCount, err := s.getPendingEntriesCountFn(ctx) if err != nil { redisStreamsLog.Error(err, "error fetching pending entries count") diff --git a/pkg/scalers/redis_streams_scaler_test.go b/pkg/scalers/redis_streams_scaler_test.go index 603c2492fbd..704ff95a59d 100644 --- a/pkg/scalers/redis_streams_scaler_test.go +++ b/pkg/scalers/redis_streams_scaler_test.go @@ -1,6 +1,7 @@ package scalers import ( + "context" "errors" "strconv" "testing" @@ -141,7 +142,7 @@ func TestRedisStreamsGetMetricSpecForScaling(t *testing.T) { t.Fatal("Could not parse metadata:", err) } closeFn := func() error { return nil } - getPendingEntriesCountFn := func() (int64, error) { return -1, nil } + getPendingEntriesCountFn := func(ctx context.Context) (int64, error) { return -1, nil } mockRedisStreamsScaler := redisStreamsScaler{meta, closeFn, getPendingEntriesCountFn} metricSpec := mockRedisStreamsScaler.GetMetricSpecForScaling() From 599fd0946eff77c011929eaa0ef05a53d4b96831 Mon Sep 17 00:00:00 2001 From: Jeroen Bobbeldijk Date: Mon, 1 Nov 2021 14:05:37 +0100 Subject: [PATCH 6/6] Use proper images for e2e tests Signed-off-by: Jeroen Bobbeldijk --- tests/scalers/redis-sentinel-lists.test.ts | 2 +- tests/scalers/redis-sentinel-streams.test.ts | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/scalers/redis-sentinel-lists.test.ts b/tests/scalers/redis-sentinel-lists.test.ts index b3b082d1b51..cc95430222d 100644 --- a/tests/scalers/redis-sentinel-lists.test.ts +++ b/tests/scalers/redis-sentinel-lists.test.ts @@ -21,7 +21,7 @@ const redisWorkerHostPortRefDeploymentName = 'redis-worker-test-hostport' const redisWorkerAddressRefDeploymentName = 'redis-worker-test-address' const redisWorkerHostPortRefTriggerAuthDeploymentName = 'redis-worker-test-hostport-triggerauth' const itemsToWrite = 200 -const deploymentContainerImage = 'goku321/redis-cluster-list:v1.7' +const deploymentContainerImage = 'ghcr.io/kedacore/tests-redis-sentinel-lists' const writeJobNameForHostPortRef = 'redis-writer-host-port-ref' const writeJobNameForAddressRef = 'redis-writer-address-ref' const writeJobNameForHostPortInTriggerAuth = 'redis-writer-host-port-trigger-auth' diff --git a/tests/scalers/redis-sentinel-streams.test.ts b/tests/scalers/redis-sentinel-streams.test.ts index 57149bc11d0..61e43173803 100644 --- a/tests/scalers/redis-sentinel-streams.test.ts +++ b/tests/scalers/redis-sentinel-streams.test.ts @@ -139,7 +139,7 @@ spec: spec: containers: - name: redis-streams-consumer - image: goku321/redis-cluster-streams:v2.5 + image: ghcr.io/kedacore/tests-redis-sentinel-streams command: ["./main"] args: ["consumer"] imagePullPolicy: Always @@ -198,7 +198,7 @@ spec: spec: containers: - name: producer - image: goku321/redis-cluster-streams:v2.5 + image: ghcr.io/kedacore/tests-redis-sentinel-streams command: ["./main"] args: ["producer"] imagePullPolicy: Always