
feat: Redis streams as event source #1395

Closed
BulkBeing wants to merge 3 commits into argoproj:master from BulkBeing:redis-streams

Conversation

Contributor

@BulkBeing commented Oct 25, 2021

Checklist:

closes: #1369

Tested with single and multiple streams on a minikube cluster:

apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: redis-stream
spec:
  redisStream:
    example:
      hostAddress: 192.168.0.106:6379
      db: 0
      streams:
        - FOO
        - BAR

With a sensor that creates a pod:

apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: redis-stream-sensor
spec:
  template:
    serviceAccountName: argo-events-sa
  dependencies:
  - name: payload
    eventSourceName: redis-stream
    eventName: example
  triggers:
  - template:
      name: payload
      k8s:
        group: ""
        version: v1
        resource: pods
        operation: create
        source:
          resource:
            apiVersion: v1
            kind: Pod
            metadata:
              generateName: payload-
              labels:
                app: payload
            spec:
              containers:
              - name: hello
                image: alpine
                command: ["echo"]
                args: ["This is the message you sent me:\n", ""]
              restartPolicy: Never
        parameters:
          - src:
              dependencyName: payload
              dataKey: body.message
            dest: spec.containers.0.args.1
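
For reference, messages with the field layout seen in the logs below can be published to the streams from redis-cli with XADD (presumably something like this was used for the test; `*` asks Redis to auto-generate the entry ID):

```
XADD FOO * FOOId FOO-1 second-key second-val
XADD BAR * BARId BAR-1 second-key second-val
```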

Logs from pods created by sensor:

➜ kubectl -n argo-events logs -f payload-fxlfp
This is the message you sent me:
 {"stream":"BAR","message_id":"1635165767378-0","values":{"BARId":"BAR-1","second-key":"second-val"}}

➜ kubectl -n argo-events logs -f payload-xkfzs
This is the message you sent me:
 {"stream":"FOO","message_id":"1635165767376-0","values":{"FOOId":"FOO-1","second-key":"second-val"}}

From event source pod:

➜ kubectl -n argo-events logs -f redis-stream-eventsource-nnr9k-566d47dd54-lfvc4
<--- snipped --->
{"level":"info","ts":1635165767.3778944,"logger":"argo-events.eventsource","caller":"redisstream/start.go:161","msg":"received a message","eventSourceName":"redis-stream","eventSourceType":"redisStream","eventName":"example","stream":"FOO","message_id":"1635165767376-0"}
{"level":"info","ts":1635165767.37798,"logger":"argo-events.eventsource","caller":"redisstream/start.go:172","msg":"dispatching the event on the data channel...","eventSourceName":"redis-stream","eventSourceType":"redisStream","eventName":"example","stream":"FOO"}
{"level":"info","ts":1635165767.382242,"logger":"argo-events.eventsource","caller":"eventsources/eventing.go:427","msg":"succeeded to publish an event","eventSourceName":"redis-stream","eventName":"example","eventSourceType":"redisStream","eventID":"65383936383734332d666337662d343762662d623035382d373134313863326565373865"}
{"level":"info","ts":1635165767.3863325,"logger":"argo-events.eventsource","caller":"redisstream/start.go:161","msg":"received a message","eventSourceName":"redis-stream","eventSourceType":"redisStream","eventName":"example","stream":"BAR","message_id":"1635165767378-0"}
{"level":"info","ts":1635165767.3864572,"logger":"argo-events.eventsource","caller":"redisstream/start.go:172","msg":"dispatching the event on the data channel...","eventSourceName":"redis-stream","eventSourceType":"redisStream","eventName":"example","stream":"BAR"}
{"level":"info","ts":1635165767.3950868,"logger":"argo-events.eventsource","caller":"eventsources/eventing.go:427","msg":"succeeded to publish an event","eventSourceName":"redis-stream","eventName":"example","eventSourceType":"redisStream","eventID":"62353137353933352d363135392d346662642d396430622d393532376631396531353636"}

@BulkBeing marked this pull request as ready for review October 25, 2021 13:33
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
@BulkBeing changed the title "Support for Redis streams as event source" to "feat: Redis streams as event source" Oct 25, 2021
Member

@whynowy left a comment


Thanks for the implementation!

Could you also add some docs to https://github.com/argoproj/argo-events/tree/master/docs/eventsources/setup?

return errors.Wrap(err, "failed to marshal the event data, rejecting the event...")
}
log.With("channel", message.Channel).Info("dispatching th event on the data channel...")
log.With("channel", message.Channel).Info("dispatching the event on the data channel...")
Member


Thanks!


// Create a common consumer group on all streams.
// Only proceeds if all the streams are already present
consumersGroup := "argo-events-cg"
Member


  1. Can we make consumerGroup a user-provided field?
  2. For other pub/sub systems like Kafka, the consumer group can be created automatically when the client specifies it. Is Redis Stream different in that the group needs to be created explicitly? If so, I'd prefer not to create it in the code and instead assume it's already in place.

Contributor Author


Made it an optional user input in the latest commit.
Redis consumer groups are similar in functionality to Kafka's, but the implementation is different (details: https://redis.io/topics/streams-intro, search for "kafka"). A consumer group has to be created explicitly from the client side; each consumer within a group is auto-created on first use.

The main purpose of a consumer group is to split a single stream across its consumers. However, I'm only using one consumer in the group. My only reason for reading streams through a consumer group rather than directly is to resume from the last read position after pod restarts/recreations. This reduces duplicate messages when a pod restarts or is rescheduled onto another node; otherwise, on each restart, messages would have to be read from the start of the stream.

This assumes only one "redis stream event source" pod will be running at any point in time.
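
The resume behavior can be sketched with a toy Python model of a consumer group's last-delivered-ID bookkeeping (a hypothetical illustration of the Redis semantics described above, not the client code in this PR; `ToyStream` and its method names are invented):

```python
# Toy model of one Redis stream with consumer groups. Redis keeps a
# "last delivered ID" per group, so a consumer that reconnects after a
# restart and reads with ">" only sees entries added after that ID.

class ToyStream:
    def __init__(self):
        self.entries = []        # list of (entry_id, fields)
        self.group_pos = {}      # group name -> index of next entry to deliver

    def xadd(self, entry_id, fields):
        self.entries.append((entry_id, fields))

    def xgroup_create(self, group, offset="$"):
        # "$" starts the group at the current end of the stream (latest),
        # "0" starts it at the beginning.
        self.group_pos[group] = len(self.entries) if offset == "$" else 0

    def xreadgroup(self, group):
        # Reading with ">" returns only entries past the group's last
        # delivered ID, then advances it (a write on the server side,
        # which is why this only works against the master instance).
        start = self.group_pos[group]
        new = self.entries[start:]
        self.group_pos[group] = len(self.entries)
        return new

stream = ToyStream()
stream.xgroup_create("argo-events-cg", offset="$")
stream.xadd("1-0", {"FOOId": "FOO-1"})
first = stream.xreadgroup("argo-events-cg")          # delivers entry 1-0
# Simulated pod restart: a fresh consumer reads through the same group
stream.xadd("2-0", {"FOOId": "FOO-2"})
after_restart = stream.xreadgroup("argo-events-cg")  # delivers only 2-0
```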

Member


Some other questions:

  1. Does consumerGroup creation need extra permissions?
  2. Is there a limit on the number of consumer groups on the Redis Stream server side? In other words, if plenty of consumer groups exist, will that impact anything?
  3. Does a consumerGroup need to be cleaned up if no clients use it?

Using a static default consumerGroup here will cause a problem - consider the case where multiple event sources use the same stream and none of them provides a consumerGroup in the spec: they will all end up using the same consumerGroup.

My suggestions are:

  1. If nothing is provided in the spec, automatically generate the consumerGroup based on the spec (event source name, event name, etc.), and also think about whether those consumer groups need to be cleaned up (questions above);
  2. Make consumerGroup mandatory in the spec and assume it has already been created; this will simplify the logic.

I personally prefer the latter. Thoughts?

Contributor Author

@BulkBeing commented Oct 29, 2021


When using a consumer group, each read through the group is a write operation, because Redis needs to update the last delivered message ID and the pending entries list (PEL) of the specific consumer in the group. So it can only work against the master Redis instance, not replicas.

As far as I know, there are no impacts: https://redis.io/topics/streams-intro doesn't mention any, and I couldn't find anything in a quick Google search.

There is an option to destroy a consumer group: https://redis.io/commands/XGROUP. It has an additional time cost for deleting entries in the consumer group's pending entries list. From the docs: "The consumer group will be destroyed even if there are active consumers and pending messages, so make sure to call this command only when really needed". I think it's better if the user takes responsibility for deleting them. Based on these, the options can be something like this:

apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: redis-stream
spec:
  redisStream:
    example:
      hostAddress: redis.argo-events.svc:6379
      streams:
        - name: FOO
          consumerGroup: foo-argo-cg
          createConsumerGroup: true # default is false
          countPerRead: 50 # field is optional

Does this look good?

Member


Hmmm. I think I'm ok with creating the consumer group in the code for now; let's not expose createConsumerGroup.

I think you still need to consider the following:

  1. Auto-generate the consumer group if it's not provided in the spec;
  2. When you create the consumerGroup, set the offset to the latest entry instead of the beginning.
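
For point 2 above, the offset is chosen as the last argument of XGROUP CREATE; in raw Redis commands (stream and group names here are placeholders, and the two commands are alternatives):

```
# Start at the latest entry (new messages only):
XGROUP CREATE mystream argo-events-cg $
# Or start from the beginning of the stream:
XGROUP CREATE mystream argo-events-cg 0
```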

Also, since Redis cannot rebalance messages automatically when a client dies, I think it's ok to only use one consumer to process the messages. Just one question: will it be a problem if multiple clients try to connect with the same consumer? This happens during a pod restart.

You will also need to add the event source type to https://github.com/argoproj/argo-events/blob/master/pkg/apis/common/common.go#L58.

Contributor Author


Thanks for the feedback. I will continue working on this from this Friday.

Contributor Author


Auto-generating the consumer group might be a bit tricky. Since its only purpose here is to resume from the last read position after pod restarts, we need a name that stays constant across restarts. The name of the Redis stream event source deployment would be a good choice, but I'm not sure of the proper way to get it from inside the pod (the prefix of the pod's hostname might work). Should I just make this option mandatory?

If 2 pods use the same consumer group and the same consumer name to read from a Redis stream, only one of them will see a new entry. Tested this by:

XGROUP CREATE mystream mygroup $

# Ran this in two clients. Both block with the same consumer group and consumer until a new message arrives.
# Only one client sees the new message.
XREADGROUP BLOCK 0 GROUP mygroup Alice STREAMS mystream >

The event source is already added to RecreateStrategyEventSources: https://github.com/BulkBeing/argo-events/blob/e9abeb3896e235e9e53b31c8ac0ef115e333e4c3/pkg/apis/common/common.go#L72

Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
// CommonCG refers to the Redis stream consumer group that will be
// created on all redis streams. Messages are read through this group. Defaults to 'argo-events-cg'
// +optional
CommonCG string `json:"commonConsumerGroup,omitempty" protobuf:"bytes,6,opt,name=commonConsumerGroup"`
Member


Why not name it ConsumerGroup directly?

@github-actions

This Pull Request is stale because it has been open for 60 days with
no activity. It will be closed in 7 days if no further activity.

@github-actions bot added the stale label Jan 16, 2022
@github-actions bot closed this Jan 24, 2022

serafdev commented Mar 2, 2022

It seems like this code is getting there. @whynowy, do you know what is missing? @BulkBeing, does your team use this internally?

We're also interested in using Redis Streams as Event Source

@BulkBeing
Contributor Author

@serafdev I don't use it. I had made the requested changes, but didn't get any further responses from @whynowy.


serafdev commented Mar 4, 2022

Do you need a hand with this PR, @BulkBeing? Or maybe a hand reviewing it, @whynowy? Don't hesitate to ping me; I can tag in.

@whynowy reopened this Mar 4, 2022
Member

whynowy commented Mar 4, 2022

Sorry, I may have missed the changes. Reopened it. @BulkBeing - do you mind fixing the conflicts?

@BulkBeing
Contributor Author

@whynowy I'm facing issues when I rebase with master. The changes in the PR (commits and files changed) start to show other people's changes too (tried twice; it may be something wrong with how I'm doing it). May I raise a new PR with the same changes?

Member

whynowy commented Mar 8, 2022

> @whynowy I'm facing issues when I rebase with master. The changes in the PR (commits and files changed) start to show other people's changes too (tried twice; it may be something wrong with how I'm doing it). May I raise a new PR with the same changes?

Sounds good to me.


Development

Successfully merging this pull request may close these issues.

support for redis streams
