Skip to content

feat: Redis streams as event source#1744

Merged
whynowy merged 6 commits intoargoproj:masterfrom
BulkBeing:redis-streams-2
Mar 23, 2022
Merged

feat: Redis streams as event source#1744
whynowy merged 6 commits intoargoproj:masterfrom
BulkBeing:redis-streams-2

Conversation

@BulkBeing
Copy link
Copy Markdown
Contributor

Checklist:

closes: #1369
previous discussion: #1395

Messages from the stream are read using the Redis consumer group. The main reason for using consumer group is to resume from the last read upon pod restarts. A common consumer group (defaults to "argo-events-cg") is created (if not already exists) on all specified streams. When using consumer group, each read through a consumer group is a write operation, because Redis needs to update the last retrieved message id and the pending entries list(PEL) of that specific user in the consumer group. So it can only work with the master Redis instance and not replicas (https://redis.io/topics/streams-intro).

Redis stream event source expects all the streams to be present on the Redis server. This event source only starts pulling messages from the streams when all of the specified streams exist on the Redis server. On the initial setup, the consumer group is created on all the specified streams to start reading from the latest message (not necessarily the beginning of the stream). On subsequent setups (the consumer group already exists on the streams) or during pod restarts, messages are pulled from the last unacknowledged message in the stream.

The consumer group is never deleted automatically. If you want a completely fresh setup again, you must delete the consumer group from the streams.

Tested with single and multiple streams on minikube cluster:

apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: redis-stream
spec:
  redisStream:
    example:
      hostAddress: 192.168.134.128:6379
      db: 0
      maxMsgCountPerRead: 50
      consumerGroup: argo-events-cg
      streams:
        - FOO
        - BAR

With pod creation as sensor:

apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: redis-stream-sensor
spec:
  template:
    serviceAccountName: argo-events-sa
  dependencies:
  - name: payload
    eventSourceName: redis-stream
    eventName: example
  triggers:
  - template:
      name: payload
      k8s:
        group: ""
        version: v1
        resource: pods
        operation: create
        source:
          resource:
            apiVersion: v1
            kind: Pod
            metadata:
              generateName: payload-
              labels:
                app: payload
            spec:
              containers:
              - name: hello
                image: alpine
                command: ["echo"]
                args: ["This is the message you sent me:\n", ""]
              restartPolicy: Never
        parameters:
          - src:
              dependencyName: payload
              dataKey: values
            dest: spec.containers.0.args.1

Logs from pods created by sensor:

➜ kubectl -n argo-events logs -f payload-fxlfp
This is the message you sent me:
 {"key-1":"val-1","key-2":"val-2"}

From event source pod:

➜ kubectl -n argo-events logs -f redis-stream-eventsource-nnr9k-566d47dd54-lfvc4
<--- snipped --->
{"level":"info","ts":1647501137.1609244,"logger":"argo-events.eventsource","caller":"redisStream/start.go:188","msg":"received a message","eventSourceName":"redis-stream","eventSourceType":"redisStream","eventName":"example","stream":"FOO","message_id":"1647501137156-0"}
{"level":"info","ts":1647501137.161182,"logger":"argo-events.eventsource","caller":"redisStream/start.go:199","msg":"dispatching the event on the data channel...","eventSourceName":"redis-stream","eventSourceType":"redisStream","eventName":"example","stream":"FOO"}
{"level":"info","ts":1647501137.1682003,"logger":"argo-events.eventsource","caller":"eventsources/eventing.go:523","msg":"succeeded to publish an event","eventSourceName":"redis-stream","eventName":"example","eventSourceType":"redisStream","eventID":"35303362356435612d393963312d343865662d613335302d323737356137653838383138"}

From sensor pod:

➜ kubectl -n argo-events logs -f redis-stream-sensor-sensor-tf6bg-79466bddd8-9rljc
<--- snipped --->
{"level":"info","ts":1647501224.617144,"logger":"argo-events.sensor","caller":"standard-k8s/standard-k8s.go:159","msg":"creating the object...","sensorName":"redis-stream-sensor","triggerName":"payload","triggerType":"Kubernetes"}
{"level":"info","ts":1647501224.6328142,"logger":"argo-events.sensor","caller":"sensors/listener.go:415","msg":"successfully processed the trigger","sensorName":"redis-stream-sensor","triggerName":"payload","triggerType":"Kubernetes","triggeredBy":["payload"],"triggeredByEvents":["37376230353662622d656631322d346665342d396465382d366462636362356661623366"]}

Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
@BulkBeing
Copy link
Copy Markdown
Contributor Author

BulkBeing commented Mar 17, 2022

@whynowy The golangci-lint run --fix --verbose --concurrency 4 --timeout 5m finished in 56 seconds on my local VM. Is it possible for you to re-trigger it ?

@whynowy
Copy link
Copy Markdown
Member

whynowy commented Mar 21, 2022

@whynowy The golangci-lint run --fix --verbose --concurrency 4 --timeout 5m finished in 56 seconds on my local VM. Is it possible for you to re-trigger it ?

Ignore the failure in the CI.

Copy link
Copy Markdown
Member

@whynowy whynowy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM overall, couple of enhancements can be done with following up PRs. I'll merge it after fixing the minor log issue.

if err.Error() != "BUSYGROUP Consumer Group name already exists" {
return errors.Wrapf(err, "creating consumer group %s for stream %s on host %s for event source %s", consumersGroup, stream, redisEventSource.HostAddress, el.GetEventName())
}
log.Infof("Consumer group %s already exists", stream)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe you want to log the consumer group together with stream?

el.Metrics.EventProcessingFailed(el.GetEventSourceName(), el.GetEventName())
continue
}
if err := client.XAck(ctx, entry.Stream, consumersGroup, message.ID).Err(); err != nil {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since doing a batch read, this could be enhanced to XAck a list of message.ID.

Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
@BulkBeing
Copy link
Copy Markdown
Contributor Author

@whynowy Made the suggested changes and tested the functionality on minikube.

if err == redis.Nil {
continue
}
return errors.Wrapf(err, "reading streams %s using XREADGROUP", strings.Join(redisEventSource.Streams, ", "))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we just log the error and continue? For example, if there's a network connection issue, and it fails to read, we should let it go and retry right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. Made the change.

Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
@whynowy whynowy merged commit 93cd06d into argoproj:master Mar 23, 2022
@whynowy
Copy link
Copy Markdown
Member

whynowy commented Mar 23, 2022

@BulkBeing - thanks for getting this added!

juliev0 pushed a commit to juliev0/argo-events that referenced this pull request Mar 29, 2022
* Support for Redis streams as event source

Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
juliev0 pushed a commit to juliev0/argo-events that referenced this pull request Mar 29, 2022
* Support for Redis streams as event source

Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

support for redis streams

2 participants