Skip to content

Commit

Permalink
feat: Unique Kafka Consumer GroupID (#251)
Browse files Browse the repository at this point in the history
  • Loading branch information
sarabala1979 authored Aug 19, 2021
1 parent 9800b44 commit 53a319d
Show file tree
Hide file tree
Showing 12 changed files with 41 additions and 15 deletions.
2 changes: 1 addition & 1 deletion examples/git/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ module github.com/argoproj-labs/argo-dataflow/examples/git

go 1.16

require github.com/argoproj-labs/argo-dataflow v0.0.90
require github.com/argoproj-labs/argo-dataflow v0.0.93
4 changes: 2 additions & 2 deletions examples/git/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
github.com/antonmedv/expr v1.8.9/go.mod h1:5qsM3oLGDND7sDmQGDXHkYfkjYMUX14qsgqmHhwGEk8=
github.com/argoproj-labs/argo-dataflow v0.0.90 h1:wbCzUSi3bZxeQudGCQi6Bxjv54bkxIXvPsLUoae3DZI=
github.com/argoproj-labs/argo-dataflow v0.0.90/go.mod h1:HOdmu21+Xx59R28S8+e+wMx+/ApHWNtglPqLvgLRI2E=
github.com/argoproj-labs/argo-dataflow v0.0.93 h1:yx6rscLQua7/QfVa80TkfQ6utkSdxOvqqWvFMdeOp74=
github.com/argoproj-labs/argo-dataflow v0.0.93/go.mod h1:HOdmu21+Xx59R28S8+e+wMx+/ApHWNtglPqLvgLRI2E=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
Expand Down
2 changes: 1 addition & 1 deletion runner/sidecar/source/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func New(ctx context.Context, secretInterface corev1.SecretInterface, clusterNam

var offset string
remark := fmt.Sprintf("%s.%s.%s.%s.sources.%s", clusterName, namespace, pipelineName, stepName, sourceName)
uid := sharedutil.MustHash(remark)
uid := sharedutil.GetSourceUID(clusterName, namespace, pipelineName, stepName, sourceName)
offset, err = getOffsetFromDB(ctx, db, uid)
if err != nil {
if err == sql.ErrNoRows {
Expand Down
11 changes: 5 additions & 6 deletions runner/sidecar/source/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type kafkaSource struct {
topic string
}

func New(ctx context.Context, secretInterface corev1.SecretInterface, clusterName, namespace, pipelineName, stepName, sourceName string, x dfv1.KafkaSource, f source.Func) (source.Interface, error) {
func New(ctx context.Context, secretInterface corev1.SecretInterface, consumerGroupID, sourceName string, x dfv1.KafkaSource, f source.Func) (source.Interface, error) {
config, err := kafka.GetConfig(ctx, secretInterface, x.Kafka.KafkaConfig)
if err != nil {
return nil, err
Expand All @@ -37,10 +37,9 @@ func New(ctx context.Context, secretInterface corev1.SecretInterface, clusterNam
if x.StartOffset == "First" {
config.Consumer.Offsets.Initial = sarama.OffsetOldest
}
// This ID can be up to 255 characters in length, and can include the following characters: a-z, A-Z, 0-9, . (dot), _ (underscore), and - (dash).
groupID := sharedutil.MustHash(fmt.Sprintf("%s.%s.%s.%s.sources.%s", clusterName, namespace, pipelineName, stepName, sourceName))
logger.Info("Kafka consumer group ID", "groupID", groupID)
consumerGroup, err := sarama.NewConsumerGroup(x.Brokers, groupID, config)

logger.Info("Kafka consumer group ID", "consumerGroupID", consumerGroupID)
consumerGroup, err := sarama.NewConsumerGroup(x.Brokers, consumerGroupID, config)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -71,7 +70,7 @@ func New(ctx context.Context, secretInterface corev1.SecretInterface, clusterNam
client,
consumerGroup,
adminClient,
groupID,
consumerGroupID,
x.Topic,
}, nil
}
Expand Down
2 changes: 1 addition & 1 deletion runner/sidecar/source/stan/stan.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func New(ctx context.Context, secretInterface corev1.SecretInterface, clusterNam

// https://docs.nats.io/developing-with-nats-streaming/queues
var sub stan.Subscription
queueName := sharedutil.MustHash(fmt.Sprintf("%s.%s.%s.%s.sources.%s", clusterName, namespace, pipelineName, stepName, sourceName))
queueName := sharedutil.GetSourceUID(clusterName, namespace, pipelineName, stepName, sourceName)
subFunc := func() (stan.Subscription, error) {
logger.Info("subscribing to STAN queue", "source", sourceName, "queueName", queueName)
sub, err := conn.QueueSubscribe(x.Subject, queueName, func(msg *stan.Msg) {
Expand Down
3 changes: 2 additions & 1 deletion runner/sidecar/sources.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ func connectSources(ctx context.Context, toMain func(context.Context, []byte) er
sources[sourceName] = y
}
} else if x := s.Kafka; x != nil {
if y, err := kafkasource.New(ctx, secretInterface, clusterName, namespace, pipelineName, stepName, sourceName, *x, f); err != nil {
groupID := sharedutil.GetSourceUID(clusterName, namespace, pipelineName, stepName, sourceName)
if y, err := kafkasource.New(ctx, secretInterface, groupID, sourceName, *x, f); err != nil {
return err
} else {
sources[sourceName] = y
Expand Down
2 changes: 1 addition & 1 deletion runtimes/golang1-16/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ module github.com/argoproj-labs/argo-dataflow/runtimes/golang1-16

go 1.16

require github.com/argoproj-labs/argo-dataflow v0.0.90
require github.com/argoproj-labs/argo-dataflow v0.0.93
4 changes: 2 additions & 2 deletions runtimes/golang1-16/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
github.com/antonmedv/expr v1.8.9/go.mod h1:5qsM3oLGDND7sDmQGDXHkYfkjYMUX14qsgqmHhwGEk8=
github.com/argoproj-labs/argo-dataflow v0.0.90 h1:wbCzUSi3bZxeQudGCQi6Bxjv54bkxIXvPsLUoae3DZI=
github.com/argoproj-labs/argo-dataflow v0.0.90/go.mod h1:HOdmu21+Xx59R28S8+e+wMx+/ApHWNtglPqLvgLRI2E=
github.com/argoproj-labs/argo-dataflow v0.0.93 h1:yx6rscLQua7/QfVa80TkfQ6utkSdxOvqqWvFMdeOp74=
github.com/argoproj-labs/argo-dataflow v0.0.93/go.mod h1:HOdmu21+Xx59R28S8+e+wMx+/ApHWNtglPqLvgLRI2E=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
Expand Down
1 change: 1 addition & 0 deletions shared/util/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ func MustHash(v interface{}) string {
return MustHash([]byte(MustJSON(v)))
}
}

1 change: 1 addition & 0 deletions shared/util/hash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ import (
func TestMustHash(t *testing.T) {
assert.Equal(t, "2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae", MustHash([]byte("foo")))
}

12 changes: 12 additions & 0 deletions shared/util/uid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package util

import (
"fmt"

"k8s.io/utils/strings"
)

func GetSourceUID(clusterName, namespace, pipelineName, stepName, sourceName string) string {
hash := MustHash(fmt.Sprintf("%s.%s.%s.%s.sources.%s", clusterName, namespace, pipelineName, stepName, sourceName))
return fmt.Sprintf("dataflow-%s-%s-%s-%s-%s-%s", strings.ShortenString(clusterName, 3), strings.ShortenString(namespace, 3), strings.ShortenString(pipelineName, 3), strings.ShortenString(stepName, 3), strings.ShortenString(sourceName, 3), hash)
}
12 changes: 12 additions & 0 deletions shared/util/uid_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package util

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestGetSourceUID(t *testing.T) {
uniqueID := GetSourceUID("cluster", "default", "pipeline", "stepName", "source")
assert.Equal(t, "dataflow-clu-def-pip-ste-sou-7c07c91b03ebf978f5dda8b77130662e016493600b8ca4e6ffe12ec5183e3d25", uniqueID)
}

0 comments on commit 53a319d

Please sign in to comment.