Skip to content

Commit

Permalink
Add test code and Dockerfile(s) for redis cluster scaler (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
Deepak Sah authored Oct 25, 2021
1 parent c552363 commit ff2aca8
Show file tree
Hide file tree
Showing 6 changed files with 395 additions and 0 deletions.
10 changes: 10 additions & 0 deletions e2e/images/redis-cluster/Dockerfile.lists
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
FROM golang

WORKDIR /cmd/lists

COPY ./cmd/lists .
COPY go.mod go.sum ./

RUN go build -o main .

CMD [ "./main" ]
10 changes: 10 additions & 0 deletions e2e/images/redis-cluster/Dockerfile.streams
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
FROM golang

WORKDIR /cmd/streams

COPY ./cmd/streams .
COPY go.mod go.sum ./

RUN go build -o main .

CMD [ "./main" ]
116 changes: 116 additions & 0 deletions e2e/images/redis-cluster/cmd/lists/lists.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package main

import (
"context"
"fmt"
"log"
"os"
"strconv"
"strings"
"time"

redis "github.com/go-redis/redis/v8"
)

func splitAndTrim(s, sep, toTrim string) []string {
x := strings.Split(s, sep)
for i := range x {
x[i] = strings.Trim(x[i], toTrim)
}
return x
}

func parseAddress() []string {
addrString := os.Getenv("REDIS_ADDRESSES")
if len(addrString) != 0 {
return splitAndTrim(addrString, ",", " ")
}
hostString := os.Getenv("REDIS_HOSTS")
portString := os.Getenv("REDIS_PORTS")
hosts := splitAndTrim(hostString, ",", " ")
ports := splitAndTrim(portString, ",", " ")
addrs := []string{}
if len(hosts) != len(ports) {
return addrs
}
for i := range hosts {
addrs = append(addrs, fmt.Sprintf("%s:%s", hosts[i], ports[i]))
}
return addrs
}

func writeToRedisList() error {
addrs := parseAddress()
pass := os.Getenv("REDIS_PASSWORD")
opts := redis.ClusterOptions{
Addrs: addrs,
Password: pass,
}
client := redis.NewClusterClient(&opts)
list := os.Getenv("LIST_NAME")
itemCount, err := strconv.ParseInt(os.Getenv("NO_LIST_ITEMS_TO_WRITE"), 10, 32)
if err != nil {
return fmt.Errorf("number of items to write should be a number: %s", err.Error())
}

for i := 0; i < int(itemCount); i++ {
x := client.LPush(context.Background(), list, i)
if x.Err() != nil {
return fmt.Errorf("failed to write to redis list: %s", x.Err().Error())
}
time.Sleep(time.Millisecond * 100)
}
return nil
}

func readFromRedisList() error {
addrs := parseAddress()
pass := os.Getenv("REDIS_PASSWORD")
opts := redis.ClusterOptions{
Addrs: addrs,
Password: pass,
}
client := redis.NewClusterClient(&opts)
list := os.Getenv("LIST_NAME")

waitTime, err := strconv.ParseInt(os.Getenv("READ_PROCESS_TIME"), 10, 32)
if err != nil {
return fmt.Errorf("read process time should be a number: %s", err.Error())
}

for {
len, err := client.LLen(context.Background(), list).Result()
if err != nil {
return err
}
if len > 0 {
x := client.LPop(context.Background(), list)
if x.Err() != nil {
return fmt.Errorf("failed to read from redis list: %s", x.Err().Error())
}
}
time.Sleep(time.Millisecond * time.Duration(waitTime))
}
}

func main() {
action := ""
if len(os.Args) > 0 {
action = os.Args[1]
}
if action == "write" {
err := writeToRedisList()
if err != nil {
log.Fatalf("write to redis list failed: %v\n", err)
}
log.Println("write to redis list is successful")
} else if action == "read" {
err := readFromRedisList()
if err != nil {
log.Fatalf("read from redis list failed: %v\n", err)
}
log.Println("read from redis list is successful")
} else {
log.Printf("unknown action: %s\n", action)
}
}
157 changes: 157 additions & 0 deletions e2e/images/redis-cluster/cmd/streams/streams.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package main

import (
"context"
"fmt"
"log"
"os"
"strconv"
"strings"
"time"

redis "github.com/go-redis/redis/v8"
)

func splitAndTrim(s, sep, toTrim string) []string {
x := strings.Split(s, sep)
for i := range x {
x[i] = strings.Trim(x[i], toTrim)
}
return x
}

func parseAddress() []string {
addrString := os.Getenv("REDIS_ADDRESSES")
if len(addrString) != 0 {
return splitAndTrim(addrString, ",", " ")
}
hostString := os.Getenv("REDIS_HOSTS")
portString := os.Getenv("REDIS_PORTS")
hosts := splitAndTrim(hostString, ",", " ")
ports := splitAndTrim(portString, ",", " ")
addrs := []string{}
if len(hosts) != len(ports) {
return addrs
}
for i := range hosts {
addrs = append(addrs, fmt.Sprintf("%s:%s", hosts[i], ports[i]))
}
return addrs
}

func checkPendingEntries(c *redis.ClusterClient, stream, consumer string, ids *[]string) {
for {
pe, err := c.XPending(context.Background(), stream, consumer).Result()
if err != nil {
log.Printf("failed to get pending entries: %s", err.Error())
time.Sleep(2 * time.Second)
continue
}

if pe.Count == 100 {
// wait for other consumers to read pending entries.
time.Sleep(20 * time.Second)
log.Printf("ACKing %d entries...\n", len(*ids))
c.XAck(context.Background(), stream, consumer, *ids...)
return
}
time.Sleep(2 * time.Second)
}
}

func redisStreamConsumer() error {
addrs := parseAddress()
pass := os.Getenv("REDIS_PASSWORD")
opts := redis.ClusterOptions{
Addrs: addrs,
Password: pass,
}
client := redis.NewClusterClient(&opts)
stream := os.Getenv("REDIS_STREAM_NAME")

consumerGroup := os.Getenv("REDIS_STREAM_CONSUMER_GROUP_NAME")
_, err := client.XGroupCreate(context.Background(), stream, consumerGroup, "0").Result()
if err != nil && !strings.Contains(err.Error(), "Consumer Group name already exists") {
return fmt.Errorf("failed to create consumer group: %s", err.Error())
}

pendingEntries := []string{}
go checkPendingEntries(client, stream, consumerGroup, &pendingEntries)

msgCount := 0
for {
length, err := client.XLen(context.Background(), stream).Result()
if err != nil {
return err
}
if length > 0 {
x := client.XReadGroup(context.Background(), &redis.XReadGroupArgs{
Group: consumerGroup,
Consumer: "damn-you",
Streams: []string{stream, ">"},
Count: 1,
Block: 0,
})
if x.Err() != nil {
return fmt.Errorf("failed to create consumer group to redis stream: %s", x.Err().Error())
}

res, err := x.Result()
if err != nil {
return fmt.Errorf("failed to read from redis stream: %v", err)
}

msgCount++
log.Printf("read %d messages from stream\n", msgCount)
pendingEntries = append(pendingEntries, res[0].Messages[0].ID)
}
time.Sleep(500 * time.Millisecond)
}
}

func redisStreamProducer() error {
addrs := parseAddress()
pass := os.Getenv("REDIS_PASSWORD")
opts := redis.ClusterOptions{
Addrs: addrs,
Password: pass,
}
client := redis.NewClusterClient(&opts)
stream := os.Getenv("REDIS_STREAM_NAME")

count, err := strconv.ParseInt(os.Getenv("NUM_MESSAGES"), 10, 32)
if err != nil {
return fmt.Errorf("number of items to write should be a number: %s", err.Error())
}

for i := 0; i < int(count); i++ {
x := client.XAdd(context.Background(), &redis.XAddArgs{
Stream: stream,
Values: map[string]interface{}{"key": "value"},
})
if x.Err() != nil {
return fmt.Errorf("failed to write to redis stream: %s", x.Err().Error())
}
}
return nil
}

func main() {
mode := ""
if len(os.Args) > 0 {
mode = os.Args[1]
}
if mode == "consumer" {
if err := redisStreamConsumer(); err != nil {
log.Fatalf("read from redis stream failed: %v\n", err)
}
log.Println("read from redis stream is successful")
} else if mode == "producer" {
if err := redisStreamProducer(); err != nil {
log.Fatalf("write to redis stream failed: %v\n", err)
}
log.Println("write to redis stream is successful")
} else {
log.Printf("unknown mode: %s\n", mode)
}
}
5 changes: 5 additions & 0 deletions e2e/images/redis-cluster/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module github.com/kedacore/test-tools/e2e/images/redis-cluster

go 1.16

require github.com/go-redis/redis/v8 v8.11.4
Loading

0 comments on commit ff2aca8

Please sign in to comment.