Skip to content

Commit

Permalink
chore(executor/kafka): upgrade sarama version (#277)
Browse files Browse the repository at this point in the history
allow set kafka version

Signed-off-by: Yvonnick Esnault <[email protected]>
  • Loading branch information
yesnault authored Oct 9, 2020
1 parent a103112 commit 259eb6b
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 76 deletions.
6 changes: 4 additions & 2 deletions executors/kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ In your yaml file, you can use:
- with_sasl_handshaked optional
- user optional
- password optional
- kafka_version optional, defaut is 0.10.2.0

- client_type mandator: producer or consumer

Expand All @@ -27,6 +28,7 @@ In your yaml file, you can use:
# for producer client type:
- messages
- messages_file


```

Expand All @@ -47,7 +49,7 @@ testcases:
user: "{{.kafkaUser}}"
password: "{{.kafkaPwd}}"
addrs:
- "{{.kafkaHost}}:{{kafkaPort}}"
- "{{.kafkaHost}}:{{.kafkaPort}}"
messages:
- topic: test-topic
value: '{"hello":"bar"}'
Expand All @@ -62,7 +64,7 @@ testcases:
messageLimit: 1
groupID: venom
addrs:
- "{{.kafkaHost}}:{{kafkaPort}}"
- "{{.kafkaHost}}:{{.kafkaPort}}"
topics:
- test-topic
assertions:
Expand Down
178 changes: 116 additions & 62 deletions executors/kafka/executor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
Expand All @@ -10,7 +11,6 @@ import (
"time"

"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
"github.com/mitchellh/mapstructure"
"github.com/ovh/venom"
"github.com/ovh/venom/executors"
Expand Down Expand Up @@ -66,6 +66,9 @@ type Executor struct {

//MessagesFile represents the messages into the file sended by producer (messages field would be ignored)
MessagesFile string `json:"messages_file,omitempty" yaml:"messages_file,omitempty"`

// Kafka version, default is 0.10.2.0
KafkaVersion string `json:"kafka_version,omitempty" yaml:"kafka_version,omitempty"`
}

// Result represents a step result.
Expand Down Expand Up @@ -126,22 +129,20 @@ func (Executor) Run(testCaseContext venom.TestCaseContext, l venom.Logger, step
}

func (e Executor) produceMessages(workdir string) error {

if len(e.Messages) == 0 && e.MessagesFile == "" {
return fmt.Errorf("At least messages or messagesFile property must be setted")
}

producerCfg := sarama.NewConfig()
producerCfg.Net.TLS.Enable = e.WithTLS // Enable TLS anyway
producerCfg.Net.SASL.Enable = e.WithSASL
producerCfg.Net.SASL.User = e.User
producerCfg.Net.SASL.Password = e.Password
producerCfg.Producer.RequiredAcks = sarama.WaitForLocal
producerCfg.Producer.Retry.Max = 10
producerCfg.Net.DialTimeout = 5 * time.Second
producerCfg.Producer.Return.Successes = true
producerCfg.Producer.Return.Errors = true
sp, err := sarama.NewSyncProducer(e.Addrs, producerCfg)
config, err := e.getKafkaConfig()
if err != nil {
return err
}

config.Producer.RequiredAcks = sarama.WaitForLocal
config.Producer.Retry.Max = 10
config.Producer.Return.Successes = true

sp, err := sarama.NewSyncProducer(e.Addrs, config)
if err != nil {
return err
}
Expand Down Expand Up @@ -182,67 +183,120 @@ func (e Executor) consumeMessages(l venom.Logger) ([]Message, []interface{}, err
return nil, nil, fmt.Errorf("You must provide topics")
}

consumerConfig := cluster.NewConfig()
consumerConfig.Net.TLS.Enable = e.WithTLS
consumerConfig.Net.SASL.Enable = e.WithSASL
consumerConfig.Net.SASL.User = e.User
consumerConfig.Net.SASL.Password = e.Password

config, err := e.getKafkaConfig()
if err != nil {
return nil, nil, err
}
if strings.TrimSpace(e.InitialOffset) == "oldest" {
consumerConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Offsets.Initial = sarama.OffsetOldest
}

consumer, err := cluster.NewConsumer(e.Addrs, e.GroupID, e.Topics, consumerConfig)
consumerGroup, err := sarama.NewConsumerGroup(e.Addrs, e.GroupID, config)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("error instanciate consumer err:%s", err)
}
defer consumer.Close()

timeout := time.Duration(e.Timeout) * time.Millisecond
ctx := context.Background()
ctx, _ = context.WithTimeout(ctx, time.Duration(e.Timeout)*time.Millisecond)

messages := []Message{}
messagesJSON := []interface{}{}
// Track errors
go func() {
for err := range consumerGroup.Errors() {
l.Errorf("error on consume:%s", err)
}
}()

h := &handler{
messages: []Message{},
messagesJSON: []interface{}{},
markOffset: e.MarkOffset,
messageLimit: e.MessageLimit,
logger: l,
}

reading:
for {
select {
case message := <-consumer.Messages():
messages = append(messages, Message{
Topic: message.Topic,
Value: string(message.Value),
})
messageJSONArray := []MessageJSON{}
if err := json.Unmarshal(message.Value, &messageJSONArray); err != nil {
messageJSONMap := map[string]interface{}{}
if err2 := json.Unmarshal(message.Value, &messageJSONMap); err2 == nil {
messagesJSON = append(messagesJSON, MessageJSON{
Topic: message.Topic,
Value: messageJSONMap,
})
} else {
messagesJSON = append(messagesJSON, MessageJSON{
Topic: message.Topic,
Value: string(message.Value),
})
}
if err := consumerGroup.Consume(ctx, e.Topics, h); err != nil {
l.Errorf("error on consume:%s", err)
}

return h.messages, h.messagesJSON, nil
}

func (e Executor) getKafkaConfig() (*sarama.Config, error) {
config := sarama.NewConfig()
config.Net.TLS.Enable = e.WithTLS
config.Net.SASL.Enable = e.WithSASL
config.Net.SASL.User = e.User
config.Net.SASL.Password = e.Password
config.Consumer.Return.Errors = true
config.Net.DialTimeout = 10 * time.Second

if e.KafkaVersion != "" {
kafkaVersion, err := sarama.ParseKafkaVersion(e.KafkaVersion)
if err != nil {
return config, fmt.Errorf("error parsing Kafka version %v err:%s", kafkaVersion, err)
}
config.Version = kafkaVersion
} else {
config.Version = sarama.V0_10_2_0
}

return config, nil
}

// handler represents a Sarama consumer group consumer
type handler struct {
messages []Message
messagesJSON []interface{}
markOffset bool
messageLimit int
logger venom.Logger
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (h *handler) Setup(s sarama.ConsumerGroupSession) error {
return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (h *handler) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (h *handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
h.messages = append(h.messages, Message{
Topic: message.Topic,
Value: string(message.Value),
})
messageJSONArray := []MessageJSON{}
if err := json.Unmarshal(message.Value, &messageJSONArray); err != nil {
messageJSONMap := map[string]interface{}{}
if err2 := json.Unmarshal(message.Value, &messageJSONMap); err2 == nil {
h.messagesJSON = append(h.messagesJSON, MessageJSON{
Topic: message.Topic,
Value: messageJSONMap,
})
} else {
messagesJSON = append(messagesJSON, MessageJSON{
h.messagesJSON = append(h.messagesJSON, MessageJSON{
Topic: message.Topic,
Value: messageJSONArray,
Value: string(message.Value),
})
}
if e.MarkOffset {
consumer.MarkOffset(message, "")
}
if e.MessageLimit > 0 && len(messages) >= e.MessageLimit {
break reading
}
case <-time.After(timeout):
l.Infof("Timeout reached")
break reading
} else {
h.messagesJSON = append(h.messagesJSON, MessageJSON{
Topic: message.Topic,
Value: messageJSONArray,
})
}
if h.markOffset {
session.MarkMessage(message, "")
}
if h.messageLimit > 0 && len(h.messages) >= h.messageLimit {
h.logger.Infof("message limit reached")
return nil
}
session.MarkMessage(message, "delivered")
}

return messages, messagesJSON, nil

return nil
}
12 changes: 3 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,20 @@ module github.com/ovh/venom
go 1.13

require (
github.com/Shopify/sarama v1.18.0
github.com/Shopify/sarama v1.27.1
github.com/Shopify/toxiproxy v2.1.4+incompatible // indirect
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d
github.com/bsm/sarama-cluster v2.1.15+incompatible
github.com/eapache/go-resiliency v1.1.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/fatih/color v1.6.0
github.com/frankban/quicktest v1.5.0 // indirect
github.com/fsamin/go-dump v1.0.8
github.com/fullstorydev/grpcurl v1.4.0
github.com/garyburd/redigo v1.6.0
github.com/go-sql-driver/mysql v1.4.1
github.com/go-testfixtures/testfixtures/v3 v3.1.1
github.com/gobuffalo/packr v1.30.1 // indirect
github.com/golang/protobuf v1.3.1
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/google/go-github v15.0.0+incompatible
github.com/google/go-querystring v0.0.0-20170111101155-53e6ce116135 // indirect
github.com/hashicorp/hcl v1.0.0
Expand All @@ -38,9 +35,7 @@ require (
github.com/onsi/ginkgo v1.10.2 // indirect
github.com/onsi/gomega v1.7.0 // indirect
github.com/ovh/go-ovh v0.0.0-20180328085145-498310cd1182
github.com/pierrec/lz4 v2.3.0+incompatible // indirect
github.com/pkg/errors v0.8.1-0.20180311214515-816c9085562c
github.com/rcrowley/go-metrics v0.0.0-20180503174638-e2704e165165 // indirect
github.com/rubenv/sql-migrate v0.0.0-20180217203553-081fe17d19ff
github.com/sclevine/agouti v3.0.1-0.20180306165625-6ada53bb069e+incompatible
github.com/sirupsen/logrus v1.4.2
Expand All @@ -49,11 +44,10 @@ require (
github.com/spf13/cobra v0.0.5
github.com/spf13/viper v1.4.0 // indirect
github.com/streadway/amqp v0.0.0-20200108173154-1c71cc93ed71
github.com/stretchr/testify v1.3.0
github.com/stretchr/testify v1.6.1
github.com/yesnault/go-imap v0.0.0-20160710142244-eb9bbb66bd7b
github.com/ziutek/mymysql v1.5.4 // indirect
golang.org/x/crypto v0.0.0-20190621222207-cc06ce4a13d4
golang.org/x/text v0.3.2 // indirect
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a
google.golang.org/grpc v1.21.0
gopkg.in/gorp.v1 v1.7.1 // indirect
gopkg.in/ini.v1 v1.34.0 // indirect
Expand Down
Loading

0 comments on commit 259eb6b

Please sign in to comment.