From de968fa10dc2783c3cb2315910bb6e76f7a58a5c Mon Sep 17 00:00:00 2001 From: mauza Date: Fri, 11 Nov 2022 14:23:25 -0700 Subject: [PATCH 1/7] added async publisher --- pkg/kafka/publisher_async.go | 152 +++++++++++++++++++++++++++++++++++ 1 file changed, 152 insertions(+) create mode 100644 pkg/kafka/publisher_async.go diff --git a/pkg/kafka/publisher_async.go b/pkg/kafka/publisher_async.go new file mode 100644 index 0000000..a77b801 --- /dev/null +++ b/pkg/kafka/publisher_async.go @@ -0,0 +1,152 @@ +package kafka + +import ( + "fmt" + "go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama" + "time" + + "github.com/Shopify/sarama" + "github.com/pkg/errors" + + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill/message" +) + +type PublisherAsync struct { + config PublisherConfig + producer sarama.AsyncProducer + logger watermill.LoggerAdapter + + closed bool +} + +// NewAsyncPublisher creates a new Kafka PublisherAsync. +func NewAsyncPublisher( + config PublisherConfig, + logger watermill.LoggerAdapter, +) (*PublisherAsync, error) { + config.setDefaults() + + if err := config.Validate(); err != nil { + return nil, err + } + + if logger == nil { + logger = watermill.NopLogger{} + } + + producer, err := sarama.NewAsyncProducer(config.Brokers, config.OverwriteSaramaConfig) + if err != nil { + return nil, errors.Wrap(err, "cannot create Kafka producer") + } + + if config.OTELEnabled { + producer = otelsarama.WrapAsyncProducer(config.OverwriteSaramaConfig, producer) + } + + // Handle errors and successes + go func() { + for msg := range producer.Successes() { + logFields := make(watermill.LogFields, 1) + logFields["topic"] = msg.Topic + logger.Debug(fmt.Sprintf("publish success: %s - %d - %d", msg.Topic, msg.Partition, msg.Offset), logFields) + } + }() + go func() { + for msg := range producer.Errors() { + logFields := make(watermill.LogFields, 1) + logFields["topic"] = msg.Msg.Topic + logger.Error(msg.Error(), msg.Err, 
logFields) + } + }() + + return &PublisherAsync{ + config: config, + producer: producer, + logger: logger, + }, nil +} + +type PublisherAsyncConfig struct { + // Kafka brokers list. + Brokers []string + + // Marshaler is used to marshal messages from Watermill format into Kafka format. + Marshaler Marshaler + + // OverwriteSaramaConfig holds additional sarama settings. + OverwriteSaramaConfig *sarama.Config + + // If true then each sent message will be wrapped with Opentelemetry tracing, provided by otelsarama. + OTELEnabled bool +} + +func (c *PublisherAsyncConfig) setDefaults() { + if c.OverwriteSaramaConfig == nil { + c.OverwriteSaramaConfig = DefaultSaramaAsyncPublisherConfig() + } +} + +func (c PublisherAsyncConfig) Validate() error { + if len(c.Brokers) == 0 { + return errors.New("missing brokers") + } + if c.Marshaler == nil { + return errors.New("missing marshaler") + } + + return nil +} + +func DefaultSaramaAsyncPublisherConfig() *sarama.Config { + config := sarama.NewConfig() + + config.Producer.Retry.Max = 10 + config.Producer.Return.Successes = true + config.Producer.Return.Errors = true + config.Version = sarama.V1_0_0_0 + config.Metadata.Retry.Backoff = time.Second * 2 + config.ClientID = "watermill" + + return config +} + +// Publish publishes message to Kafka. +// +// Publish is blocking and wait for ack from Kafka. +// When one of messages delivery fails - function is interrupted. 
+func (p *PublisherAsync) Publish(topic string, msgs ...*message.Message) error { + if p.closed { + return errors.New("publisher closed") + } + + logFields := make(watermill.LogFields, 2) + logFields["topic"] = topic + + for _, msg := range msgs { + logFields["message_uuid"] = msg.UUID + p.logger.Trace("Sending message to Kafka", logFields) + + kafkaMsg, err := p.config.Marshaler.Marshal(topic, msg) + if err != nil { + return errors.Wrapf(err, "cannot marshal message %s", msg.UUID) + } + + p.producer.Input() <- kafkaMsg + } + + return nil +} + +func (p *PublisherAsync) Close() error { + if p.closed { + return nil + } + p.closed = true + + if err := p.producer.Close(); err != nil { + return errors.Wrap(err, "cannot close Kafka producer") + } + + return nil +} From c01c6259ce9131c9f8c02e517cd975b3a79336ca Mon Sep 17 00:00:00 2001 From: mauza Date: Mon, 14 Nov 2022 15:18:29 -0700 Subject: [PATCH 2/7] added errors and successes channels --- pkg/kafka/publisher_async.go | 33 ++++++++++----------------------- 1 file changed, 10 insertions(+), 23 deletions(-) diff --git a/pkg/kafka/publisher_async.go b/pkg/kafka/publisher_async.go index a77b801..9049f7f 100644 --- a/pkg/kafka/publisher_async.go +++ b/pkg/kafka/publisher_async.go @@ -1,7 +1,6 @@ package kafka import ( - "fmt" "go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama" "time" @@ -13,9 +12,11 @@ import ( ) type PublisherAsync struct { - config PublisherConfig - producer sarama.AsyncProducer - logger watermill.LoggerAdapter + config PublisherConfig + producer sarama.AsyncProducer + logger watermill.LoggerAdapter + errorsChan <-chan *sarama.ProducerError + successesChan <-chan *sarama.ProducerMessage closed bool } @@ -44,26 +45,12 @@ func NewAsyncPublisher( producer = otelsarama.WrapAsyncProducer(config.OverwriteSaramaConfig, producer) } - // Handle errors and successes - go func() { - for msg := range producer.Successes() { - logFields := make(watermill.LogFields, 1) - 
logFields["topic"] = msg.Topic - logger.Debug(fmt.Sprintf("publish success: %s - %d - %d", msg.Topic, msg.Partition, msg.Offset), logFields) - } - }() - go func() { - for msg := range producer.Errors() { - logFields := make(watermill.LogFields, 1) - logFields["topic"] = msg.Msg.Topic - logger.Error(msg.Error(), msg.Err, logFields) - } - }() - return &PublisherAsync{ - config: config, - producer: producer, - logger: logger, + config: config, + producer: producer, + logger: logger, + errorsChan: producer.Errors(), + successesChan: producer.Successes(), }, nil } From b9404b44921c99485c630ef2fa440a3d4450d667 Mon Sep 17 00:00:00 2001 From: mauza Date: Mon, 14 Nov 2022 15:24:41 -0700 Subject: [PATCH 3/7] clean up producer config creation --- pkg/kafka/publisher_async.go | 52 +++++------------------------------- 1 file changed, 6 insertions(+), 46 deletions(-) diff --git a/pkg/kafka/publisher_async.go b/pkg/kafka/publisher_async.go index 9049f7f..153a05f 100644 --- a/pkg/kafka/publisher_async.go +++ b/pkg/kafka/publisher_async.go @@ -1,11 +1,9 @@ package kafka import ( - "go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama" - "time" - "github.com/Shopify/sarama" "github.com/pkg/errors" + "go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" @@ -26,7 +24,7 @@ func NewAsyncPublisher( config PublisherConfig, logger watermill.LoggerAdapter, ) (*PublisherAsync, error) { - config.setDefaults() + config.setAsyncDefaults() if err := config.Validate(); err != nil { return nil, err @@ -54,48 +52,10 @@ func NewAsyncPublisher( }, nil } -type PublisherAsyncConfig struct { - // Kafka brokers list. - Brokers []string - - // Marshaler is used to marshal messages from Watermill format into Kafka format. - Marshaler Marshaler - - // OverwriteSaramaConfig holds additional sarama settings. 
- OverwriteSaramaConfig *sarama.Config - - // If true then each sent message will be wrapped with Opentelemetry tracing, provided by otelsarama. - OTELEnabled bool -} - -func (c *PublisherAsyncConfig) setDefaults() { - if c.OverwriteSaramaConfig == nil { - c.OverwriteSaramaConfig = DefaultSaramaAsyncPublisherConfig() - } -} - -func (c PublisherAsyncConfig) Validate() error { - if len(c.Brokers) == 0 { - return errors.New("missing brokers") - } - if c.Marshaler == nil { - return errors.New("missing marshaler") - } - - return nil -} - -func DefaultSaramaAsyncPublisherConfig() *sarama.Config { - config := sarama.NewConfig() - - config.Producer.Retry.Max = 10 - config.Producer.Return.Successes = true - config.Producer.Return.Errors = true - config.Version = sarama.V1_0_0_0 - config.Metadata.Retry.Backoff = time.Second * 2 - config.ClientID = "watermill" - - return config +func (c *PublisherConfig) setAsyncDefaults() { + c.setDefaults() + c.OverwriteSaramaConfig.Producer.Return.Successes = true + c.OverwriteSaramaConfig.Producer.Return.Errors = true } // Publish publishes message to Kafka. 
From 4677fb592a6b3916f7fdb7db91a59cffb25e5346 Mon Sep 17 00:00:00 2001 From: mauza Date: Mon, 14 Nov 2022 15:58:09 -0700 Subject: [PATCH 4/7] added access to channels --- pkg/kafka/publisher.go | 4 ++-- pkg/kafka/publisher_async.go | 8 ++++++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/kafka/publisher.go b/pkg/kafka/publisher.go index cd08736..3f3d807 100644 --- a/pkg/kafka/publisher.go +++ b/pkg/kafka/publisher.go @@ -66,7 +66,7 @@ type PublisherConfig struct { func (c *PublisherConfig) setDefaults() { if c.OverwriteSaramaConfig == nil { - c.OverwriteSaramaConfig = DefaultSaramaSyncPublisherConfig() + c.OverwriteSaramaConfig = DefaultSaramaPublisherConfig() } } @@ -81,7 +81,7 @@ func (c PublisherConfig) Validate() error { return nil } -func DefaultSaramaSyncPublisherConfig() *sarama.Config { +func DefaultSaramaPublisherConfig() *sarama.Config { config := sarama.NewConfig() config.Producer.Retry.Max = 10 diff --git a/pkg/kafka/publisher_async.go b/pkg/kafka/publisher_async.go index 153a05f..e244379 100644 --- a/pkg/kafka/publisher_async.go +++ b/pkg/kafka/publisher_async.go @@ -97,3 +97,11 @@ func (p *PublisherAsync) Close() error { return nil } + +func (p *PublisherAsync) Errors() <-chan *sarama.ProducerError { + return p.errorsChan +} + +func (p *PublisherAsync) Successes() <-chan *sarama.ProducerMessage { + return p.successesChan +} From 92bb8c4a52b351a2244af8f145957514d1890ae7 Mon Sep 17 00:00:00 2001 From: mauza Date: Mon, 14 Nov 2022 16:43:42 -0700 Subject: [PATCH 5/7] changed config back --- pkg/kafka/publisher.go | 4 ++-- pkg/kafka/publisher_async.go | 20 +++++++++++++++++--- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/pkg/kafka/publisher.go b/pkg/kafka/publisher.go index 3f3d807..cd08736 100644 --- a/pkg/kafka/publisher.go +++ b/pkg/kafka/publisher.go @@ -66,7 +66,7 @@ type PublisherConfig struct { func (c *PublisherConfig) setDefaults() { if c.OverwriteSaramaConfig == nil { - c.OverwriteSaramaConfig = 
DefaultSaramaPublisherConfig() + c.OverwriteSaramaConfig = DefaultSaramaSyncPublisherConfig() } } @@ -81,7 +81,7 @@ func (c PublisherConfig) Validate() error { return nil } -func DefaultSaramaPublisherConfig() *sarama.Config { +func DefaultSaramaSyncPublisherConfig() *sarama.Config { config := sarama.NewConfig() config.Producer.Retry.Max = 10 diff --git a/pkg/kafka/publisher_async.go b/pkg/kafka/publisher_async.go index e244379..52263cd 100644 --- a/pkg/kafka/publisher_async.go +++ b/pkg/kafka/publisher_async.go @@ -4,6 +4,7 @@ import ( "github.com/Shopify/sarama" "github.com/pkg/errors" "go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama" + "time" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" @@ -53,9 +54,22 @@ func NewAsyncPublisher( } func (c *PublisherConfig) setAsyncDefaults() { - c.setDefaults() - c.OverwriteSaramaConfig.Producer.Return.Successes = true - c.OverwriteSaramaConfig.Producer.Return.Errors = true + if c.OverwriteSaramaConfig == nil { + c.OverwriteSaramaConfig = DefaultSaramaAsyncPublisherConfig() + } +} + +func DefaultSaramaAsyncPublisherConfig() *sarama.Config { + config := sarama.NewConfig() + + config.Producer.Retry.Max = 10 + config.Producer.Return.Successes = true + config.Producer.Return.Errors = true + config.Version = sarama.V1_0_0_0 + config.Metadata.Retry.Backoff = time.Second * 2 + config.ClientID = "watermill" + + return config } // Publish publishes message to Kafka. 
From d9109c8ee02d5e88fe7d6977e0693e3e6e9318ea Mon Sep 17 00:00:00 2001 From: mauza Date: Tue, 15 Nov 2022 13:33:22 -0700 Subject: [PATCH 6/7] added some tests --- pkg/kafka/publisher_async.go | 6 +- pkg/kafka/pubsub_async_test.go | 200 +++++++++++++++++++++++++++++++++ 2 files changed, 203 insertions(+), 3 deletions(-) create mode 100644 pkg/kafka/pubsub_async_test.go diff --git a/pkg/kafka/publisher_async.go b/pkg/kafka/publisher_async.go index 52263cd..bb2edd5 100644 --- a/pkg/kafka/publisher_async.go +++ b/pkg/kafka/publisher_async.go @@ -72,10 +72,10 @@ func DefaultSaramaAsyncPublisherConfig() *sarama.Config { return config } -// Publish publishes message to Kafka. +// Publish publishes message(s) to Kafka. // -// Publish is blocking and wait for ack from Kafka. -// When one of messages delivery fails - function is interrupted. +// Publish is not blocking. +// Make sure you are reading from the Errors and Successes channels. func (p *PublisherAsync) Publish(topic string, msgs ...*message.Message) error { if p.closed { return errors.New("publisher closed") diff --git a/pkg/kafka/pubsub_async_test.go b/pkg/kafka/pubsub_async_test.go new file mode 100644 index 0000000..78e5322 --- /dev/null +++ b/pkg/kafka/pubsub_async_test.go @@ -0,0 +1,200 @@ +package kafka_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/Shopify/sarama" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/ThreeDotsLabs/watermill/message/subscriber" + "github.com/ThreeDotsLabs/watermill/pubsub/tests" +) + +func newAsyncPubSub(t *testing.T, marshaler kafka.MarshalerUnmarshaler, consumerGroup string) (*kafka.PublisherAsync, *kafka.Subscriber) { + logger := watermill.NewStdLogger(true, true) + + var err error + var publisher *kafka.PublisherAsync + + retriesLeft := 5 + for { + publishConfig := 
kafka.DefaultSaramaAsyncPublisherConfig() + publishConfig.Producer.Return.Successes = false + publishConfig.Producer.Return.Errors = false + publisher, err = kafka.NewAsyncPublisher(kafka.PublisherConfig{ + Brokers: kafkaBrokers(), + Marshaler: marshaler, + OverwriteSaramaConfig: publishConfig, + }, logger) + if err == nil || retriesLeft == 0 { + break + } + + retriesLeft-- + fmt.Printf("cannot create kafka Publisher: %s, retrying (%d retries left)", err, retriesLeft) + time.Sleep(time.Second * 2) + } + require.NoError(t, err) + + saramaConfig := kafka.DefaultSaramaSubscriberConfig() + saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest + + saramaConfig.Admin.Timeout = time.Second * 30 + saramaConfig.Producer.RequiredAcks = sarama.WaitForAll + saramaConfig.ChannelBufferSize = 10240 + saramaConfig.Consumer.Group.Heartbeat.Interval = time.Millisecond * 500 + saramaConfig.Consumer.Group.Rebalance.Timeout = time.Second * 3 + + var subscriber *kafka.Subscriber + + retriesLeft = 5 + for { + subscriber, err = kafka.NewSubscriber( + kafka.SubscriberConfig{ + Brokers: kafkaBrokers(), + Unmarshaler: marshaler, + OverwriteSaramaConfig: saramaConfig, + ConsumerGroup: consumerGroup, + InitializeTopicDetails: &sarama.TopicDetail{ + NumPartitions: 8, + ReplicationFactor: 1, + }, + }, + logger, + ) + if err == nil || retriesLeft == 0 { + break + } + + retriesLeft-- + fmt.Printf("cannot create kafka Subscriber: %s, retrying (%d retries left)", err, retriesLeft) + time.Sleep(time.Second * 2) + } + + require.NoError(t, err) + + return publisher, subscriber +} + +func createPubSubWithConsumerGrupAsync(t *testing.T, consumerGroup string) (message.Publisher, message.Subscriber) { + return newAsyncPubSub(t, kafka.DefaultMarshaler{}, consumerGroup) +} + +func createPubSubAsync(t *testing.T) (message.Publisher, message.Subscriber) { + return createPubSubWithConsumerGrupAsync(t, "test") +} + +func createPartitionedPubSubAsync(t *testing.T) (message.Publisher, message.Subscriber) { + 
return newAsyncPubSub(t, kafka.NewWithPartitioningMarshaler(generatePartitionKey), "test") +} + +func createNoGroupPubSubAsync(t *testing.T) (message.Publisher, message.Subscriber) { + return newPubSub(t, kafka.DefaultMarshaler{}, "") +} + +func TestPublishSubscribeAsync(t *testing.T) { + features := tests.Features{ + ConsumerGroups: true, + ExactlyOnceDelivery: false, + GuaranteedOrder: false, + Persistent: true, + } + + tests.TestPubSub( + t, + features, + createPubSubAsync, + createPubSubWithConsumerGrupAsync, + ) +} + +func TestPublishSubscribeAsync_ordered(t *testing.T) { + if testing.Short() { + t.Skip("skipping long tests") + } + + tests.TestPubSub( + t, + tests.Features{ + ConsumerGroups: true, + ExactlyOnceDelivery: false, + GuaranteedOrder: true, + Persistent: true, + }, + createPartitionedPubSubAsync, + createPubSubWithConsumerGrupAsync, + ) +} + +func TestNoGroupSubscriberAsync(t *testing.T) { + if testing.Short() { + t.Skip("skipping long tests") + } + + tests.TestPubSub( + t, + tests.Features{ + ConsumerGroups: false, + ExactlyOnceDelivery: false, + GuaranteedOrder: false, + Persistent: true, + NewSubscriberReceivesOldMessages: true, + }, + createNoGroupPubSubAsync, + nil, + ) +} + +func TestCtxValuesAsync(t *testing.T) { + pub, sub := newAsyncPubSub(t, kafka.DefaultMarshaler{}, "") + topicName := "topic_" + watermill.NewUUID() + + var messagesToPublish []*message.Message + + for i := 0; i < 20; i++ { + id := watermill.NewUUID() + messagesToPublish = append(messagesToPublish, message.NewMessage(id, nil)) + } + err := pub.Publish(topicName, messagesToPublish...) 
+ require.NoError(t, err, "cannot publish message") + + messages, err := sub.Subscribe(context.Background(), topicName) + require.NoError(t, err) + + receivedMessages, all := subscriber.BulkReadWithDeduplication(messages, len(messagesToPublish), time.Second*10) + require.True(t, all) + + expectedPartitionsOffsets := map[int32]int64{} + for _, msg := range receivedMessages { + partition, ok := kafka.MessagePartitionFromCtx(msg.Context()) + assert.True(t, ok) + + messagePartitionOffset, ok := kafka.MessagePartitionOffsetFromCtx(msg.Context()) + assert.True(t, ok) + + kafkaMsgTimestamp, ok := kafka.MessageTimestampFromCtx(msg.Context()) + assert.True(t, ok) + assert.NotZero(t, kafkaMsgTimestamp) + + if expectedPartitionsOffsets[partition] <= messagePartitionOffset { + // kafka partition offset is offset of the last message + 1 + expectedPartitionsOffsets[partition] = messagePartitionOffset + 1 + } + } + assert.NotEmpty(t, expectedPartitionsOffsets) + + offsets, err := sub.PartitionOffset(topicName) + require.NoError(t, err) + assert.NotEmpty(t, offsets) + + assert.EqualValues(t, expectedPartitionsOffsets, offsets) + + require.NoError(t, pub.Close()) +} From ce5c0d13574143d0fe0438281f12f09efa8c20ca Mon Sep 17 00:00:00 2001 From: mauza Date: Tue, 15 Nov 2022 14:06:17 -0700 Subject: [PATCH 7/7] changed module --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 504a6ba..4e504d9 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/ThreeDotsLabs/watermill-kafka/v2 +module weavelab.xyz/watermill-kafka require ( github.com/Shopify/sarama v1.32.0