feat: Autoscale for Redis Streams Source (#726)
Signed-off-by: Julie Vogelman <[email protected]>
juliev0 authored May 8, 2023
1 parent 4e1a56a commit c8aafc7
Showing 4 changed files with 26 additions and 4 deletions.
1 change: 1 addition & 0 deletions docs/user-guide/reference/autoscaling.md
@@ -15,6 +15,7 @@ Numaflow is able to run with both `Horizontal Pod Autoscaling` and `Vertical Pod
Numaflow provides `0 - N` autoscaling capability out of the box; it is available for all `UDF` and `Sink` vertices, as well as the following `Source` vertices.

- Kafka
- Redis Streams

Numaflow autoscaling is enabled by default; there are some parameters that can be tuned to achieve better results.
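
For illustration, a vertex-level `scale` block might look roughly like the sketch below. This is a hedged sketch only: the field names (`min`, `max`, `lookbackSeconds`, `zeroReplicaSleepSeconds`) are assumptions based on the Numaflow vertex autoscaling settings, not content taken from this commit.

```yaml
# Partial Pipeline spec fragment; illustrative values, assumed field names.
spec:
  vertices:
    - name: in
      scale:
        min: 0                       # assumed: allow scaling down to zero pods
        max: 5                       # assumed: upper bound on replicas
        lookbackSeconds: 120         # assumed: window used to evaluate recent load
        zeroReplicaSleepSeconds: 120 # assumed: how long to stay at zero before re-checking
```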

8 changes: 5 additions & 3 deletions docs/user-guide/sources/redis-source.md
@@ -2,6 +2,8 @@

A Redis Streams source is used to ingest messages from [Redis Streams](https://redis.io/docs/data-types/streams-tutorial/).

It is recommended to use this with Redis version 7.0 or later; autoscaling relies on consumer group lag information that older Redis servers do not report.

## Example:

```yaml
@@ -18,9 +20,9 @@ spec:
```
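
The body of the example is collapsed in this diff view. Purely as an illustration, a pipeline with a `redisStreams` source might look roughly like the sketch below; the field names (`url`, `stream`, `consumerGroup`, `readFromBeginning`) are assumptions based on the RedisStreamsSource API linked below, and every value is a placeholder.

```yaml
# Illustrative sketch only -- not the collapsed example from the repository.
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: redis-streams-source-pipeline
spec:
  vertices:
    - name: in
      source:
        redisStreams:
          url: redis:6379          # placeholder Redis address
          stream: test-stream      # placeholder stream name
          consumerGroup: my-group  # placeholder consumer group
          readFromBeginning: true  # assumed field: start from the beginning of the stream
    - name: out
      sink:
        log: {}
  edges:
    - from: in
      to: out
```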

Please see [API](https://github.com/numaproj/numaflow/blob/main/docs/APIs.md#redisstreamssource) for details on how to optionally do the following:
- Define TLS
- Define username/password
- Connect to Redis Sentinel
* Define TLS
* Define username/password
* Connect to Redis Sentinel

# Published message
Incoming messages may have a single Key/Value pair or multiple pairs. In either case, the published message's Keys are the incoming key(s), and its Payload is the JSON serialization of the map of keys to values.
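
As a rough illustration of that mapping (not code from this commit), the sketch below serializes a made-up two-field stream entry the way the paragraph describes; the field names and values are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Hypothetical Redis Streams entry, e.g. added with:
	//   XADD test-stream * humidity 44 temperature 65
	entry := map[string]string{"humidity": "44", "temperature": "65"}

	// Payload: the JSON serialization of the key/value map.
	payload, err := json.Marshal(entry)
	if err != nil {
		panic(err)
	}

	// Keys: the incoming field names (order is not significant here).
	keys := make([]string, 0, len(entry))
	for k := range entry {
		keys = append(keys, k)
	}

	fmt.Println("Keys:", keys)               // e.g. [humidity temperature]
	fmt.Println("Payload:", string(payload)) // {"humidity":"44","temperature":"65"}
}
```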
2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/vertex_types.go
@@ -100,7 +100,7 @@ func (v Vertex) Scalable() bool {
	}
	if v.IsASource() {
		src := v.Spec.Source
		if src.Kafka != nil {
		if src.Kafka != nil || src.RedisStreams != nil {
			return true
		}
	}
19 changes: 19 additions & 0 deletions pkg/shared/clients/redis/redis_reader.go
@@ -73,6 +73,7 @@ func (br *RedisStreamsRead) Read(_ context.Context, count int64) ([]*isb.ReadMes
	var messages = make([]*isb.ReadMessage, 0, count)
	var xstreams []redis.XStream
	var err error

	// start with 0-0 if CheckBackLog is true
	labels := map[string]string{"buffer": br.GetName()}
	if br.Options.CheckBackLog {
@@ -154,6 +155,24 @@ func (br *RedisStreamsRead) Ack(_ context.Context, offsets []isb.Offset) []error

func (br *RedisStreamsRead) NoAck(_ context.Context, _ []isb.Offset) {}

func (br *RedisStreamsRead) Pending(_ context.Context) (int64, error) {
	// try calling XINFO GROUPS <stream> and look for 'Lag' key.
	// For Redis Server < v7.0, this always returns 0; therefore it's recommended to use >= v7.0

	result := br.Client.XInfoGroups(RedisContext, br.Stream)
	groups, err := result.Result()
	if err != nil {
		return isb.PendingNotAvailable, fmt.Errorf("error calling XInfoGroups: %v", err)
	}
	// find our ConsumerGroup
	for _, group := range groups {
		if group.Name == br.Group {
			return group.Lag, nil
		}
	}
	return isb.PendingNotAvailable, fmt.Errorf("ConsumerGroup %q not found in XInfoGroups result %+v", br.Group, groups)
}
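
For context on the `Lag` field used above, here is a small standalone sketch (not part of this commit) of the same consumer-group lookup against a local Redis, assuming the go-redis v9 client, where `XInfoGroup` exposes `Lag`; the address and stream name are placeholders.

```go
package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) // placeholder address

	// XINFO GROUPS <stream>: on Redis >= 7.0 each group reports its lag,
	// i.e. how many entries have not yet been delivered to the group.
	groups, err := rdb.XInfoGroups(ctx, "test-stream").Result() // placeholder stream
	if err != nil {
		panic(err)
	}
	for _, g := range groups {
		// On servers older than 7.0 the lag field is absent and shows up here as 0.
		fmt.Printf("group=%s pending=%d lag=%d\n", g.Name, g.Pending, g.Lag)
	}
}
```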

// processXReadResult is used to process the results of XREADGROUP
func (br *RedisStreamsRead) processXReadResult(startIndex string, count int64) ([]redis.XStream, error) {
	result := br.Client.XReadGroup(RedisContext, &redis.XReadGroupArgs{
