From 4676126e6e46812f452412017717c54c4ebf0b57 Mon Sep 17 00:00:00 2001 From: Jing Liu Date: Mon, 12 Jul 2021 13:18:22 -0700 Subject: [PATCH] Soak test for go client (#617) * Soak test: Soak test for go client. Measure performance for go clients using datadog. --- .travis.yml | 2 +- soaktest/README.md | 40 ++ soaktest/datadog.go | 137 ++++ soaktest/soakclient/soakclient.go | 364 ++++++++++ .../soakclient_transaction.go | 619 ++++++++++++++++++ soaktest/soaktest.go | 270 ++++++++ 6 files changed, 1431 insertions(+), 1 deletion(-) create mode 100644 soaktest/README.md create mode 100644 soaktest/datadog.go create mode 100644 soaktest/soakclient/soakclient.go create mode 100644 soaktest/soakclient_transaction/soakclient_transaction.go create mode 100644 soaktest/soaktest.go diff --git a/.travis.yml b/.travis.yml index 0ccd920ad..eb94cd239 100644 --- a/.travis.yml +++ b/.travis.yml @@ -80,7 +80,7 @@ install: script: # should be replaced with golangci-lint - - if [[ -f .do_lint ]]; then golint -set_exit_status ./examples/... ./kafka/... ./kafkatest/... ; fi + - if [[ -f .do_lint ]]; then golint -set_exit_status ./examples/... ./kafka/... ./kafkatest/... ./soaktest/... ; fi - for dir in kafka ; do (cd $dir && go test -timeout 180s -v ${BUILD_TYPE} ./...) ; done - go-kafkacat --help - library-version diff --git a/soaktest/README.md b/soaktest/README.md new file mode 100644 index 000000000..803d28f2c --- /dev/null +++ b/soaktest/README.md @@ -0,0 +1,40 @@ +# Information for soak test for go client +The soak test contains 2 applications: +soakclient.go is the first application: it creates Producer-1 to keep sending messages to the input-topic. +It also creates Consumer-2 to receive messages from the output-topic. + +soakclient_transaction.go is the second application: it creates Consumer-1 to receive messages from the input-topic. +It also creates the transactional producer Producer-2 to send the received messages to the output-topic. 
+The InitTransactions API registers the producer's transactional.id after the producer is created. +The producer sends the messages to the output-topic. +The transaction is committed after every 100 messages or every 1 second, whichever comes first. + +# Instructions for configuration and running + +# Build soaktest API + $ cd soaktest + $ go mod init soaktest + $ go mod tidy + $ go install soaktest + +# Build and run soakclient.go + $ cd soaktest/soakclient + $ go build + $ ./soakclient -broker <..> -inputTopic <..> -outTopic <..> -groupID <..> -inputTopicPartitionsNum <..> -outTopicPartitionsNum <..> -replicationFactor <..> -ccloudAPIKey <..> -ccloudAPISecret <..> + +# Build and run soakclient_transaction.go + $ cd soaktest/soakclient_transaction + $ go build + $ ./soakclient_transaction -broker <..> -inputTopic <..> -outTopic <..> -outTopicPartitionsNum <..> -groupID <..> -transactionID <..> -ccloudAPIKey <..> -ccloudAPISecret <..> + +### Default values used when a flag is not given on the command line + inputTopic: "inputTopic" + outTopic: "outTopic" + groupID: "groupID" + inputTopicPartitionsNum: 4 + outTopicPartitionsNum: 4 (soakclient), 1 (soakclient_transaction) + replicationFactor: 1 + transactionID: "transactionID" + +# Recommended instance type and size +At least 4 CPUs and 8 GB volume size diff --git a/soaktest/datadog.go b/soaktest/datadog.go new file mode 100644 index 000000000..bbcdc6281 --- /dev/null +++ b/soaktest/datadog.go @@ -0,0 +1,137 @@ +package soaktest + +/** + * Copyright 2021 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+import (
+	"github.com/DataDog/datadog-go/statsd"
+	"github.com/shirou/gopsutil/process"
+	"os"
+	"sync"
+	"syscall"
+	"time"
+)
+
+// datadogHost is the address of the local statsd agent and ddPfx is
+// prepended to every metric name submitted from this file.
+const datadogHost = "127.0.0.1:8125"
+const ddPfx = "kafka.client.soak.go."
+
+// lastRusage and lastRusageTime hold the previous sample taken by
+// GetRusage so that the next call can report deltas.
+var lastRusage syscall.Rusage
+var lastRusageTime time.Time
+
+// Metric name fragments combined with the caller supplied metricsName.
+const memoryRss = "memory.rss."
+const cpuUser = "cpu.user"
+const cpuSystem = "cpu.system"
+const memoryRssMax = "memory.rss.max"
+
+// NOTE(review): the errors returned by statsd.New and process.NewProcess
+// are never inspected in this file; confirm a failed initialisation is
+// acceptable for the soak test.
+var client, err = statsd.New(datadogHost)
+var proc, _ = process.NewProcess(int32(os.Getpid()))
+
+// DatadogIncrement submits a counter metric named ddPfx+metricsName and
+// increments it by incrval (truncated to an integer). When tags is nil
+// the default "environment:dev" tag is used.
+func DatadogIncrement(metricsName string, incrval float64, tags []string) {
+	incrTags := []string{"environment:dev"}
+	if tags != nil {
+		incrTags = tags
+	}
+	// Incr() always adds 1 and takes a sample rate as its last argument,
+	// so the increment value has to be passed through Count() instead.
+	// The result is identical for the usual incrval of 1.
+	// Metrics are best effort: the returned error is ignored on purpose.
+	client.Count(ddPfx+metricsName, int64(incrval), incrTags, 1)
+}
+
+// DatadogGauge submits a gauge metric named ddPfx+metricsName with the
+// value val. When tags is nil the default "environment:dev" tag is used.
+func DatadogGauge(metricsName string,
+	val float64,
+	tags []string) {
+	gaugeTags := []string{"environment:dev"}
+	if tags != nil {
+		gaugeTags = tags
+	}
+	// Metrics are best effort: the returned error is ignored on purpose.
+	client.Gauge(ddPfx+metricsName, val, gaugeTags, 1)
+}
+
+// calcRusageDeltas reports user CPU usage, system CPU usage (both as a
+// percentage of the elapsed wall clock time) and max rss memory.
+// curr and prev are two consecutive resource usage samples and elapsed
+// is the wall clock time between them in milliseconds.
+func calcRusageDeltas(metricsName string,
+	curr, prev syscall.Rusage,
+	elapsed float64) {
+	if elapsed <= 0 {
+		// No time has passed between the samples, a percentage
+		// cannot be computed.
+		return
+	}
+
+	// Timeval.Nano() combines Sec and Usec, so the delta keeps sub-second
+	// precision. Convert it to milliseconds to match the unit of elapsed.
+	nsPerMs := float64(time.Millisecond)
+
+	// User CPU %
+	userMs := float64(curr.Utime.Nano()-prev.Utime.Nano()) / nsPerMs
+	userCPU := userMs / elapsed * 100.0
+	DatadogGauge(metricsName+cpuUser, userCPU, nil)
+
+	// System CPU %
+	sysMs := float64(curr.Stime.Nano()-prev.Stime.Nano()) / nsPerMs
+	sysCPU := sysMs / elapsed * 100.0
+	DatadogGauge(metricsName+cpuSystem, sysCPU, nil)
+
+	// Max RSS memory (monotonic)
+	// NOTE(review): assumes Maxrss is reported in KiB, as on Linux.
+	maxRss := float64(curr.Maxrss) / 1024.0
+	DatadogGauge(metricsName+memoryRssMax, maxRss, nil)
+
+	InfoLogger.Printf("User CPU: %f, System CPU: %f, MaxRSS %f MiB\n",
+		userCPU, sysCPU, maxRss)
+
+}
+
+// GetRusageMetrics is the entrance for resource calculation.
+// It calls GetRusage() every 10 seconds until doneChan is signalled or
+// closed. If GetRusage() fails the error is sent to errorChan and the
+// function returns. wg.Done() is called when the function returns.
+func GetRusageMetrics(metricsName string,
+	wg *sync.WaitGroup,
+	doneChan chan bool,
+	errorChan chan error) {
+	defer wg.Done()
+	ticker := time.NewTicker(10000 * time.Millisecond)
+	// Release the ticker when the loop ends.
+	defer ticker.Stop()
+	run := true
+	for run {
+		select {
+		case <-doneChan:
+			run = false
+		case <-ticker.C:
+			err := GetRusage(metricsName)
+			if err != nil {
+				ErrorLogger.Printf("Failed to get resource usage %s\n", err)
+				errorChan <- err
+				return
+			}
+		}
+	}
+}
+
+// GetRusage samples the resource usage of the current process.
+// From the second call on it reports the CPU usage since the previous
+// call and the max rss memory, and on every call it reports the current
+// RSS memory in MiB.
+func GetRusage(metricsName string) error {
+	var ru syscall.Rusage
+	if err := syscall.Getrusage(syscall.RUSAGE_SELF, &ru); err != nil {
+		ErrorLogger.Printf("Error: unable to gather resource usage data: %s\n",
+			err)
+		return err
+	}
+
+	now := time.Now()
+	if !lastRusageTime.IsZero() {
+		// elapsed is passed in milliseconds
+		calcRusageDeltas(metricsName,
+			ru,
+			lastRusage,
+			float64(now.Sub(lastRusageTime)/time.Millisecond))
+	}
+	lastRusage = ru
+	lastRusageTime = now
+
+	// Current RSS memory
+	memoryInfo, err := proc.MemoryInfo()
+	if err != nil {
+		ErrorLogger.Printf("Error: unable to gather memory info: %s\n", err)
+		return err
+	}
+	rss := float64(memoryInfo.RSS) / (1024.0 * 1024.0)
+	// NOTE(review): this metric is named memoryRss+metricsName while the
+	// other metrics are named metricsName+suffix - confirm this is intended.
+	DatadogGauge(memoryRss+metricsName, rss, nil)
+	return nil
+}
diff --git a/soaktest/soakclient/soakclient.go b/soaktest/soakclient/soakclient.go
new file mode 100644
index 000000000..c99ff92db
--- /dev/null
+++ b/soaktest/soakclient/soakclient.go
@@ -0,0 +1,364 @@
+package main
+
+/**
+ * Copyright 2021 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"os"
+	"os/signal"
+	"soaktest"
+	"sync"
+	"syscall"
+	"time"
+
+	"github.com/confluentinc/confluent-kafka-go/kafka"
+)
+
+// endToEnd is the metric name prefix passed to soaktest.GetRusageMetrics.
+const endToEnd = "end.to.end."
+
+// rate is the number of messages produced per second.
+const rate = 2
+
+// producerType and consumerType are passed to the soaktest status
+// printers when the application exits.
+const producerType = "InputProducer"
+const consumerType = "OutputConsumer"
+
+// flushTimeoutMs is how long, in milliseconds, the producer waits for
+// outstanding messages to be delivered before it is closed.
+const flushTimeoutMs = 30 * 1000
+
+// main creates the input and output topics, starts the resource usage
+// reporter, the producer and the consumer, and then waits for a
+// termination signal or for the first error reported by a worker.
+func main() {
+
+	broker := flag.String("broker", "", "bootstrap servers")
+	inputTopic := flag.String("inputTopic", "inputTopic",
+		"producer will deliver messages to this topic")
+	outTopic := flag.String("outTopic", "outTopic",
+		"consumer will consume messages from this topic")
+	groupID := flag.String("groupID", "groupID",
+		"the group consumer will join")
+	inputTopicPartitionsNum := flag.Int("inputTopicPartitionsNum", 4,
+		"inputTopic partition number")
+	outTopicPartitionsNum := flag.Int("outTopicPartitionsNum", 4,
+		"outTopic partition number")
+	replicationFactor := flag.Int("replicationFactor", 1, "topic replication")
+	ccloudAPIKey := flag.String("ccloudAPIKey", "", "sasl username")
+	ccloudAPISecret := flag.String("ccloudAPISecret", "", "sasl password")
+
+	flag.Parse()
+
+	soaktest.InitLogFiles(fmt.Sprintf("../log/soakclient_%s.log",
+		time.Now().Format("2006-01-02")))
+	soaktest.InfoLogger.Printf("Starting the application...\n")
+
+	num, version := kafka.LibraryVersion()
+	soaktest.InfoLogger.Printf("num = %d, librdkafka %q\n", num, version)
+
+	var wg sync.WaitGroup
+	doneChan := make(chan bool, 1)
+	errorChan := make(chan error, 1)
+
+	wg.Add(1)
+	go soaktest.GetRusageMetrics(endToEnd, &wg, doneChan, errorChan)
+
+	// Topic creation must finish within 30 seconds.
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	newTopics := []kafka.TopicSpecification{
+		{Topic: *inputTopic,
+			NumPartitions:     *inputTopicPartitionsNum,
+			ReplicationFactor: *replicationFactor},
+		{Topic: *outTopic,
+			NumPartitions:     *outTopicPartitionsNum,
+			ReplicationFactor: *replicationFactor}}
+
+	err := CreateTopic(ctx, newTopics, *broker, *ccloudAPIKey, *ccloudAPISecret)
+	if err != nil {
+		soaktest.ErrorLogger.Printf("Failed to create topics: %s\n", err)
+		close(doneChan)
+		os.Exit(1)
+	}
+
+	wg.Add(1)
+	go producer(*inputTopic, *broker, *ccloudAPIKey,
+		*ccloudAPISecret, &wg, doneChan, errorChan,
+		uint64(*inputTopicPartitionsNum))
+
+	wg.Add(1)
+	go consumer(*outTopic, *broker, *groupID, *ccloudAPIKey,
+		*ccloudAPISecret, &wg, doneChan, errorChan)
+
+	sigChan := make(chan os.Signal, 1)
+	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+
+	select {
+	case sig := <-sigChan:
+		soaktest.InfoLogger.Printf("Signal caught: %v\n", sig)
+	case err := <-errorChan:
+		soaktest.ErrorLogger.Printf("Error caught: %s\n", err)
+	}
+
+	// Ask all workers to stop.
+	close(doneChan)
+
+	// Workers may still report an error while they shut down, so
+	// errorChan can only be closed once every worker has returned:
+	// closing it earlier would make a late sender panic. Keep draining
+	// it in the meantime so that no worker blocks on a full channel.
+	go func() {
+		wg.Wait()
+		close(errorChan)
+	}()
+	for err := range errorChan {
+		soaktest.ErrorLogger.Printf("Error caught: %s\n", err)
+	}
+
+	soaktest.PrintConsumerStatus(consumerType)
+	soaktest.PrintProducerStatus(producerType)
+}
+
+// CreateTopic creates the topics if they don't exist.
+// A topic that already exists is not treated as an error. The first
+// topic that fails for any other reason makes the function return an
+// error. ctx limits how long the creation may take.
+func CreateTopic(ctx context.Context,
+	topics []kafka.TopicSpecification,
+	broker, ccloudAPIKey, ccloudAPISecret string) error {
+	conf := kafka.ConfigMap{
+		"bootstrap.servers": broker,
+		"sasl.mechanisms":   "PLAIN",
+		"security.protocol": "SASL_SSL",
+		"sasl.username":     ccloudAPIKey,
+		"sasl.password":     ccloudAPISecret}
+	adminClient, err := kafka.NewAdminClient(&conf)
+	if err != nil {
+		soaktest.ErrorLogger.Printf("Failed to create AdminClient: %s\n", err)
+		return err
+	}
+	// The admin client is only needed for the duration of this call.
+	defer adminClient.Close()
+
+	result, err := adminClient.CreateTopics(ctx, topics)
+	if err != nil {
+		soaktest.ErrorLogger.Printf("Failed to create new topics: %s\n", err)
+		return err
+	}
+
+	for _, res := range result {
+		switch res.Error.Code() {
+		case kafka.ErrTopicAlreadyExists:
+			// Expected on every run but the first one, not an error.
+			soaktest.InfoLogger.Printf("Failed to create topic %s: %s\n",
+				res.Topic, res.Error)
+
+		case kafka.ErrNoError:
+			soaktest.InfoLogger.Printf("Succeed to create topic %s\n",
+				res.Topic)
+
+		default:
+			return fmt.Errorf("failed to create topic %s: %s",
+				res.Topic, res.Error)
+		}
+	}
+	return nil
+}
+
+// terminateProducer waits for all messages in the Producer queue to
+// be delivered, closes the producer and passes err to the error channel
+func terminateProducer(p *kafka.Producer, err error, errorChan chan error) {
+	// Flush() takes its timeout in milliseconds.
+	remaining := p.Flush(flushTimeoutMs)
+	soaktest.InfoLogger.Printf("producer: %d message(s) remaining in queue "+
+		"after flush()\n", remaining)
+	p.Close()
+	errorChan <- err
+}
+
+// terminateConsumer closes consumer and passes err to the error channel
+func terminateConsumer(c *kafka.Consumer, err error, errorChan chan error) {
+	c.Close()
+	errorChan <- err
+}
+
+// producer produces messages to input topic at the configured rate until
+// termChan is signalled or closed.
+// The message key cycles through [0, partitionNum) and every message
+// carries the send time and a per-key sequence number ("msgid") as
+// headers. Errors are reported on errorChan and wg.Done() is called when
+// the function returns.
+func producer(inputTopic, broker, ccloudAPIKey, ccloudAPISecret string,
+	wg *sync.WaitGroup, termChan chan bool, errorChan chan error,
+	partitionNum uint64) {
+	defer wg.Done()
+
+	// The key is calculated modulo partitionNum, which would panic for 0.
+	if partitionNum == 0 {
+		err := fmt.Errorf("partitionNum must be greater than 0")
+		soaktest.ErrorLogger.Printf("Failed to create producer %s\n", err)
+		errorChan <- err
+		return
+	}
+
+	p, err := kafka.NewProducer(&kafka.ConfigMap{
+		"bootstrap.servers":      broker,
+		"statistics.interval.ms": 5000,
+		"sasl.mechanisms":        "PLAIN",
+		"security.protocol":      "SASL_SSL",
+		"sasl.username":          ccloudAPIKey,
+		"sasl.password":          ccloudAPISecret})
+	if err != nil {
+		soaktest.ErrorLogger.Printf("Failed to create producer %s\n", err)
+		errorChan <- err
+		return
+	}
+
+	run := true
+	doneChan := make(chan bool)
+	tags := []string{fmt.Sprintf("topic:%s", inputTopic)}
+	partitionToMsgIDMap := make(map[uint64]uint64)
+
+	// Serve delivery reports, statistics and errors emitted by the
+	// producer until the produce loop below signals doneChan.
+	go func() {
+		doTerm := false
+		for !doTerm {
+			select {
+			case e := <-p.Events():
+				switch ev := e.(type) {
+				case *kafka.Message:
+					// Message delivery report
+					soaktest.ProducerDeliveryCheck(ev)
+
+				case *kafka.Stats:
+					// Use a local error: the outer err belongs to
+					// the producing goroutine.
+					err := soaktest.HandleStatsEvent(ev, inputTopic)
+					if err != nil {
+						soaktest.ErrorLogger.Printf("Failed to "+
+							"HandleStatsEvent: %s\n", err)
+						errorChan <- err
+					}
+
+				case kafka.Error:
+					// kafka.Errors should generally be
+					// considered informational, the client
+					// will try to automatically recover.
+					soaktest.ProducerErrCnt++
+					soaktest.DatadogIncrement(soaktest.ProducerError, 1, tags)
+					soaktest.ErrorLogger.Printf("kafka.Error: %v: %v for "+
+						"producer %v\n", ev.Code(), ev, p)
+
+				default:
+					soaktest.WarningLogger.Printf("Ignored event: %v for "+
+						"producer %v\n", ev, p)
+				}
+			case <-doneChan:
+				doTerm = true
+			}
+		}
+		close(doneChan)
+	}()
+
+	value := "Hello Go!"
+
+	// Interval between two messages in milliseconds.
+	sleepIntvl := 1.0 / rate * 1000
+	ticker := time.NewTicker(time.Millisecond * time.Duration(sleepIntvl))
+	defer ticker.Stop()
+
+	for run {
+		select {
+		case <-termChan:
+			doneChan <- true
+			run = false
+		case <-ticker.C:
+			encodingTime, err := time.Now().GobEncode()
+			if err != nil {
+				soaktest.ErrorLogger.Printf("Failed to get MarshalText "+
+					"%s\n", err)
+				doneChan <- true
+				terminateProducer(p, err, errorChan)
+				return
+			}
+			key := soaktest.ProduceMsgCnt % partitionNum
+			msgid := partitionToMsgIDMap[key] + 1
+			err = p.Produce(&kafka.Message{
+				TopicPartition: kafka.TopicPartition{
+					Topic:     &inputTopic,
+					Partition: kafka.PartitionAny},
+				Key:   soaktest.ConvertUint64ToByteArray(key),
+				Value: []byte(value),
+				Headers: []kafka.Header{{Key: "time",
+					Value: encodingTime},
+					{Key: "msgid",
+						Value: soaktest.ConvertUint64ToByteArray(msgid)}},
+			}, nil)
+			if err != nil {
+				soaktest.ErrorLogger.Printf("Failed to Produce message to "+
+					"inputTopic %s, %s\n", inputTopic, err)
+				soaktest.DatadogIncrement(soaktest.FailedToProduceMsg, 1, tags)
+			} else {
+				// Only advance the sequence number when the message
+				// has been accepted by the producer.
+				partitionToMsgIDMap[key] = msgid
+				soaktest.ProduceMsgCnt++
+			}
+		}
+	}
+	// Flush() takes its timeout in milliseconds.
+	remaining := p.Flush(flushTimeoutMs)
+	p.Close()
+	soaktest.InfoLogger.Printf("producer: %d message(s) remaining in queue "+
+		"after flush()\n", remaining)
+}
+
+// consumer receives and verifies messages from output topic until
+// termChan is signalled or closed. Errors are reported on errorChan and
+// wg.Done() is called when the function returns.
+func consumer(topic, broker, groupID, ccloudAPIKey, ccloudAPISecret string,
+	wg *sync.WaitGroup, termChan chan bool, errorChan chan error) {
+	// Must be deferred: calling Done() right away lets main() finish
+	// before the consumer has been closed.
+	defer wg.Done()
+	hwmarks := make(map[string]uint64)
+	c, err := kafka.NewConsumer(&kafka.ConfigMap{
+		"bootstrap.servers":      broker,
+		"group.id":               groupID,
+		"auto.offset.reset":      "earliest",
+		"statistics.interval.ms": 5000,
+		"sasl.mechanisms":        "PLAIN",
+		"security.protocol":      "SASL_SSL",
+		"sasl.username":          ccloudAPIKey,
+		"sasl.password":          ccloudAPISecret,
+	})
+	if err != nil {
+		soaktest.ErrorLogger.Printf("Failed to create consumer: %s\n", err)
+		errorChan <- err
+		return
+	}
+
+	err = c.Subscribe(topic, nil)
+
+	if err != nil {
+		soaktest.ErrorLogger.Printf("Failed to Subscribe to topic: %s with "+
+			"error %s\n", topic, err)
+		terminateConsumer(c, err, errorChan)
+		return
+	}
+
+	run := true
+	tags := []string{fmt.Sprintf("topic:%s", topic)}
+
+	for run {
+		select {
+		case <-termChan:
+			run = false
+		default:
+			ev := c.Poll(100)
+			if ev == nil {
+				continue
+			}
+
+			switch e := ev.(type) {
+			case *kafka.Message:
+				soaktest.HandleMessage(e, hwmarks)
+
+			case *kafka.Stats:
+				err = soaktest.HandleStatsEvent(e, topic)
+				if err != nil {
+					soaktest.ErrorLogger.Printf("Failed to "+
+						"HandleStatsEvent: %s\n", err)
+					terminateConsumer(c, err, errorChan)
+					return
+				}
+
+			case kafka.Error:
+				// kafka.Errors should generally be
+				// considered informational, the client
+				// will try to automatically recover.
+				soaktest.ConsumerErrCnt++
+				soaktest.DatadogIncrement(soaktest.ConsumerError, 1, tags)
+				soaktest.ErrorLogger.Printf("kafka.Error: %v: %v for "+
+					"consumer %v\n", e.Code(), e, c)
+
+			default:
+				soaktest.WarningLogger.Printf("Ignored %v for consumer %v\n",
+					e, c)
+			}
+
+		}
+	}
+	c.Close()
+	soaktest.InfoLogger.Printf("Consumer closed\n")
+}
diff --git a/soaktest/soakclient_transaction/soakclient_transaction.go b/soaktest/soakclient_transaction/soakclient_transaction.go
new file mode 100644
index 000000000..0341027b2
--- /dev/null
+++ b/soaktest/soakclient_transaction/soakclient_transaction.go
@@ -0,0 +1,619 @@
+package main
+
+/**
+ * Copyright 2021 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"os"
+	"os/signal"
+	"soaktest"
+	"sync"
+	"syscall"
+	"time"
+
+	"github.com/confluentinc/confluent-kafka-go/kafka"
+)
+
+// Metric names submitted through soaktest.DatadogIncrement by
+// commitTransaction for the outcome of every commit and abort attempt.
+const producerTransactionCommitSucceed = "producer.transaction.commit.succeed"
+const producerTransactionCommitFailed = "producer.transaction.commit.failed"
+const producerTransactionAbortSucceed = "producer.transaction.abort.succeed"
+const producerTransactionAbortFailed = "producer.transaction.abort.failed"
+
+// transaction is the metric name passed to soaktest.GetRusageMetrics
+// by main.
+const transaction = "transaction."
+ +const producerType = "TxnProducer" +const consumerType = "TxnConsumer" + +var msgMissCntForInput uint64 +var msgDupCntForInput uint64 +var retryNum = 3 + +func main() { + + broker := flag.String("broker", "", "bootstrap servers") + inputTopic := flag.String("inputTopic", "inputTopic", + "producer will deliver messages to this topic") + outTopic := flag.String("outTopic", "outTopic", + "consumer will consume messages from this topic") + outTopicPartitionsNum := flag.Int("outTopicPartitionsNum", 1, + "outTopic partition number") + groupID := flag.String("groupID", "groupID", + "the group consumer will join") + transactionID := flag.String("transactionID", "transactionID", + "transaction id") + ccloudAPIKey := flag.String("ccloudAPIKey", "", "sasl username") + ccloudAPISecret := flag.String("ccloudAPISecret", "", "sasl password") + + flag.Parse() + + soaktest.InitLogFiles(fmt.Sprintf("../log/soakclient_transaction_%s.log", + time.Now().Format("2006-01-02"))) + soaktest.InfoLogger.Printf("Starting the application...") + + num, version := kafka.LibraryVersion() + soaktest.InfoLogger.Printf("num = %d, librdkafka %q\n", num, version) + + var wg sync.WaitGroup + doneChan := make(chan bool, 1) + errorChan := make(chan error, 1) + + wg.Add(1) + go soaktest.GetRusageMetrics(transaction, &wg, doneChan, errorChan) + + wg.Add(1) + go verifyProducerTransaction(*broker, *inputTopic, *outTopic, *groupID, + *transactionID, *ccloudAPIKey, *ccloudAPISecret, &wg, doneChan, + errorChan, uint64(*outTopicPartitionsNum)) + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + select { + case sig := <-sigChan: + soaktest.InfoLogger.Printf("Signal caught: %v", sig) + case err := <-errorChan: + soaktest.ErrorLogger.Printf("Error caught: %v", err) + } + + close(doneChan) + close(errorChan) + for err := range errorChan { + soaktest.ErrorLogger.Printf("Error caught: %s\n", err) + } + wg.Wait() + soaktest.PrintConsumerStatus(consumerType) + 
soaktest.PrintProducerStatus(producerType) +} + +// getConsumerPosition gets consumer position according to partition id +func getConsumerPosition(consumer *kafka.Consumer, partition int32, + topic string) (kafka.TopicPartition, error) { + position, err := consumer.Position([]kafka.TopicPartition{{ + Topic: &topic, Partition: partition}}) + if err != nil { + soaktest.ErrorLogger.Printf("Failed to get position %s\n", err) + return kafka.TopicPartition{}, err + } else if len(position) == 0 { + err = fmt.Errorf("the position doesn't contain any element") + return kafka.TopicPartition{}, err + } + return position[0], nil +} + +// getAllPartitionPositions get offsets for all partitions +func getAllPartitionPositions(positionMap map[int32]kafka.TopicPartition) []kafka.TopicPartition { + partitionPosition := make([]kafka.TopicPartition, 0, len(positionMap)) + + for _, v := range positionMap { + partitionPosition = append(partitionPosition, v) + } + return partitionPosition +} + +// terminate closes producer and consumer, and also pass error to the +// error channel +func terminate(c *kafka.Consumer, p *kafka.Producer, err error, + errorChan chan error) { + c.Close() + p.Close() + errorChan <- err +} + +// verifyProducerTransaction receivev messages from input topic, +// then produce it the output topic with transaction producer. 
+// Commit transaction every 100 messages or every 1 second which every +// comes first +func verifyProducerTransaction(broker, inputTopic, outTopic, groupID, + transactionID, ccloudAPIKey, ccloudAPISecret string, wg *sync.WaitGroup, + termChan chan bool, errorChan chan error, outTopicPartitionsNum uint64) { + defer wg.Done() + transactionCommitNum := 1 + hwmarks := make(map[string]uint64) + hwmarksLastCommitted := make(map[string]uint64) + partitionPositionMap := make(map[int32]kafka.TopicPartition) + partitionToMsgIDMap := make(map[uint64]uint64) + partitionToMsgIDMapLastCommitted := make(map[uint64]uint64) + ticker := time.NewTicker(1000 * time.Millisecond) + + c, err := kafka.NewConsumer(&kafka.ConfigMap{ + "bootstrap.servers": broker, + "group.id": groupID, + "go.events.channel.enable": true, + "statistics.interval.ms": 5000, + "sasl.mechanisms": "PLAIN", + "security.protocol": "SASL_SSL", + "sasl.username": ccloudAPIKey, + "sasl.password": ccloudAPISecret}) + + if err != nil { + soaktest.ErrorLogger.Printf("Failed to create consumer: %s\n", err) + errorChan <- err + return + } + + err = c.Subscribe(inputTopic, nil) + if err != nil { + soaktest.ErrorLogger.Printf("Failed to subscribe to topic: %s with "+ + "error %s\n", inputTopic, err) + c.Close() + errorChan <- err + return + } + + p, err := kafka.NewProducer(&kafka.ConfigMap{ + "bootstrap.servers": broker, + "transactional.id": transactionID, + "statistics.interval.ms": 5000, + "sasl.mechanisms": "PLAIN", + "security.protocol": "SASL_SSL", + "sasl.username": ccloudAPIKey, + "sasl.password": ccloudAPISecret, + }) + if err != nil { + soaktest.ErrorLogger.Printf("Failed to create transactional "+ + "producer: %s\n", err) + c.Close() + errorChan <- err + return + } + + maxDuration, err := time.ParseDuration("15s") + if err != nil { + soaktest.ErrorLogger.Printf("Failed to parse Duration %v\n", err) + terminate(c, p, err, errorChan) + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 
maxDuration) + defer cancel() + + // If InitTransactions succeed, will continue to BeginTransaction + // If InitTransactions failed with err.(kafka.Error).IsRetriable(), + // sleep 3 seconds then retry + // If failed with other errors, return the fatal error + for i := 0; i < retryNum; i++ { + err = p.InitTransactions(ctx) + if err != nil { + soaktest.ErrorLogger.Printf("InitTransactions() failed: %s\n", err) + if err.(kafka.Error).IsRetriable() { + if i == retryNum-1 { + soaktest.ErrorLogger.Printf("InitTransactions() failed "+ + "after %d times retries: %s\n", retryNum, err) + terminate(c, p, err, errorChan) + return + } + time.Sleep(3 * time.Second) + continue + } else { + soaktest.ErrorLogger.Printf("InitTransactions() failed: %s\n", + err) + terminate(c, p, err, errorChan) + return + } + } else { + break + } + } + + //Start producer transaction. + err = p.BeginTransaction() + if err != nil { + soaktest.ErrorLogger.Printf("Failed to begin transaction %s\n", err) + terminate(c, p, err, errorChan) + return + } + + producerTags := []string{fmt.Sprintf("topic:%s", outTopic)} + consumerTags := []string{fmt.Sprintf("topic:%s", inputTopic)} + + run := true + committed := false + for run == true { + select { + case <-termChan: + run = false + if soaktest.ProduceMsgCnt != 0 && !committed { + transactionCommitNum, err = commitTransaction( + getAllPartitionPositions(partitionPositionMap), + p, + c, + transactionCommitNum, + hwmarks, + hwmarksLastCommitted, + partitionToMsgIDMap, + partitionToMsgIDMapLastCommitted) + } + case <-ticker.C: + if soaktest.ProduceMsgCnt != 0 && !committed { + transactionCommitNum, err = commitTransaction( + getAllPartitionPositions(partitionPositionMap), + p, + c, + transactionCommitNum, + hwmarks, + hwmarksLastCommitted, + partitionToMsgIDMap, + partitionToMsgIDMapLastCommitted) + if err != nil { + soaktest.ErrorLogger.Printf("Failed to commit "+ + "transaction: %v\n", err) + terminate(c, p, err, errorChan) + return + } + committed = true + 
p.BeginTransaction() + } + case ev := <-p.Events(): + // Producer delivery report + switch e := ev.(type) { + case *kafka.Message: + soaktest.ProducerDeliveryCheck(e) + + case *kafka.Stats: + err = soaktest.HandleStatsEvent(e, outTopic) + if err != nil { + soaktest.ErrorLogger.Printf("Failed to handle stats "+ + "event for producer %v\n", err) + terminate(c, p, err, errorChan) + return + } + + case kafka.Error: + // kafka.Errors should generally be + // considered informational, the client + // will try to automatically recover. + soaktest.ProducerErrCnt++ + soaktest.DatadogIncrement(soaktest.ProducerError, + 1, producerTags) + soaktest.ErrorLogger.Printf("kafka.Error: %v: %v for "+ + "producer %v\n", e.Code(), e, p) + + default: + soaktest.WarningLogger.Printf("Ignored event: %s for "+ + "producer %v\n", ev, p) + } + + default: + var ev = c.Poll(100) + if ev == nil { + continue + } + switch e := ev.(type) { + case *kafka.Message: + if !soaktest.HandleMessage(e, hwmarks) { + soaktest.ErrorLogger.Printf("Message receive failed, " + + "skip producing message\n") + continue + } + key := soaktest.ProduceMsgCnt % outTopicPartitionsNum + msgid := partitionToMsgIDMap[key] + 1 + err = produceMessage(e, p, msgid, key, outTopic) + if err != nil { + soaktest.ErrorLogger.Printf("Failed to Produce message "+ + "to outTopic %s, %s\n", outTopic, err) + continue + } + partitionToMsgIDMap[key] = msgid + soaktest.ProduceMsgCnt++ + position, err := getConsumerPosition( + c, + e.TopicPartition.Partition, + inputTopic) + if err != nil { + soaktest.ErrorLogger.Printf("Failed to get consumer "+ + "position: %v\n", err) + terminate(c, p, err, errorChan) + return + } + partitionPositionMap[e.TopicPartition.Partition] = position + if soaktest.ProduceMsgCnt%100 == 0 && !committed { + transactionCommitNum, err = commitTransaction( + getAllPartitionPositions(partitionPositionMap), + p, + c, + transactionCommitNum, + hwmarks, + hwmarksLastCommitted, + partitionToMsgIDMap, + 
partitionToMsgIDMapLastCommitted) + if err != nil { + soaktest.ErrorLogger.Printf("Failed to commit "+ + "transaction %s\n", err) + terminate(c, p, err, errorChan) + return + } + committed = true + p.BeginTransaction() + } else { + committed = false + } + + case *kafka.Stats: + err = soaktest.HandleStatsEvent(e, inputTopic) + if err != nil { + soaktest.ErrorLogger.Printf("Failed to handle stats "+ + "event for consumer %v\n", err) + terminate(c, p, err, errorChan) + return + } + + case kafka.Error: + // kafka.Errors should generally be + // considered informational, the client + // will try to automatically recover. + soaktest.ConsumerErrCnt++ + soaktest.DatadogIncrement(soaktest.ConsumerError, + 1, consumerTags) + soaktest.ErrorLogger.Printf("kafka.Error: %v: %v for "+ + "consumer %v\n", e.Code(), e, c) + + default: + soaktest.WarningLogger.Printf("Ignored %v for consumer %v\n", + e, c) + } + } + } + p.Close() + c.Close() +} + +// produceMessage produces messages to output topic, +// the producer is transaction producer +func produceMessage(e *kafka.Message, + p *kafka.Producer, + msgid, key uint64, + outTopic string) error { + encodingTime, err := time.Now().GobEncode() + if err != nil { + soaktest.ErrorLogger.Printf("Failed to encode current time %v\n", err) + return err + } + + tags := []string{fmt.Sprintf("topic:%s", outTopic)} + err = p.Produce(&kafka.Message{ + TopicPartition: kafka.TopicPartition{ + Topic: &outTopic, + Partition: kafka.PartitionAny}, + Value: []byte(string(e.Value)), + Key: soaktest.ConvertUint64ToByteArray(key), + Headers: []kafka.Header{{Key: "time", + Value: encodingTime}, + {Key: "msgid", + Value: soaktest.ConvertUint64ToByteArray(msgid)}}, + }, nil) + if err != nil { + soaktest.ErrorLogger.Printf("Failed to Produce message to "+ + "outTopic %s, %s\n", outTopic, err) + soaktest.DatadogIncrement(soaktest.FailedToProduceMsg, 1, tags) + } + return err +} + +// commitTransaction commits transaction to output topic. 
+// Abort transaction once every 100 commit transactions +func commitTransaction(partitionPosition []kafka.TopicPartition, + p *kafka.Producer, + c *kafka.Consumer, + transactionCommitNum int, + hwmarks, hwmarksLastCommitted map[string]uint64, + partitionToMsgIDMap, partitionToMsgIDMapLastCommitted map[uint64]uint64) (commitNum int, err error) { + soaktest.InfoLogger.Printf("=== Committing transaction===\n") + + cgmd, err := c.GetConsumerGroupMetadata() + if err != nil { + soaktest.ErrorLogger.Printf("Failed to get Consumer Group "+ + "Metadata %v\n", err) + return transactionCommitNum, err + } + + // If SendOffsetsToTransaction succeed, will continue to commit + // or abort transaction + // If SendOffsetsToTransaction failed with err.(kafka.Error).IsRetriable(), + // sleep 3 seconds then retry + // If SendOffsetsToTransaction failed with err.(kafka.Error).TxnRequiresAbort(), + // AbortTransaction and return (transactionCommitNum, err) + // If failed with other errors, return transactionCommitNum and the + // fatal error + for i := 0; i < retryNum; i++ { + err = p.SendOffsetsToTransaction(nil, partitionPosition, cgmd) + if err != nil { + soaktest.ErrorLogger.Printf("SendOffsetsToTransaction() "+ + "failed: %s\n", err) + if err.(kafka.Error).IsRetriable() { + if i == retryNum-1 { + soaktest.ErrorLogger.Printf("SendOffsetsToTransaction() "+ + "failed with max retry %d times: %s\n", retryNum, err) + return transactionCommitNum, err + } + time.Sleep(3 * time.Second) + continue + } else if err.(kafka.Error).TxnRequiresAbort() { + err = p.AbortTransaction(nil) + if err != nil { + soaktest.ErrorLogger.Printf("AbortTransaction() "+ + "failed: %s\n", err) + return transactionCommitNum, err + } + rewindConsumerPosition(c) + return transactionCommitNum, nil + } else { + return transactionCommitNum, err + } + } else { + break + } + } + + if transactionCommitNum%100 != 0 { + // If CommitTransaction succeed, transactionCommitNum + 1 and return + // If CommitTransaction failed 
with err.(kafka.Error).IsRetriable(), + // sleep 3 seconds then retry + // If CommitTransaction failed with + // err.(kafka.Error).TxnRequiresAbort(), + // AbortTransaction and return (transactionCommitNum, err) + // If failed with other errors, return transactionCommitNum and the + // fatal error + for i := 0; i < retryNum; i++ { + err = p.CommitTransaction(nil) + if err != nil { + if i == 0 { + soaktest.DatadogIncrement( + producerTransactionCommitFailed, 1, nil) + } + soaktest.ErrorLogger.Printf("CommitTransaction() failed: %s\n", + err) + if err.(kafka.Error).IsRetriable() { + if i == retryNum-1 { + soaktest.ErrorLogger.Printf("CommitTransaction() "+ + "failed with max retry %d times: %s\n", + retryNum, err) + return transactionCommitNum, err + } + time.Sleep(3 * time.Second) + continue + } else if err.(kafka.Error).TxnRequiresAbort() { + err = p.AbortTransaction(nil) + if err != nil { + soaktest.ErrorLogger.Printf("AbortTransaction() "+ + "failed: %s\n", err) + return transactionCommitNum, err + } + err = rewindConsumerPosition(c) + if err != nil { + soaktest.ErrorLogger.Printf("rewindConsumerPosition()"+ + " failed: %s\n", err) + } + return transactionCommitNum, nil + } else { + return transactionCommitNum, err + } + } else { + for k, v := range hwmarks { + hwmarksLastCommitted[k] = v + } + for k, v := range partitionToMsgIDMap { + partitionToMsgIDMapLastCommitted[k] = v + } + soaktest.DatadogIncrement( + producerTransactionCommitSucceed, 1, nil) + transactionCommitNum++ + return transactionCommitNum, nil + } + } + } else { + // If AbortTransaction succeed, transactionCommitNum = 1 and return + // If AbortTransaction failed with err.(kafka.Error).IsRetriable(), + // sleep 3 seconds then retry + // If failed with other errors, return transactionCommitNum and the + // fatal error + for i := 0; i < retryNum; i++ { + err = p.AbortTransaction(nil) + if err != nil { + if i == 0 { + soaktest.DatadogIncrement( + producerTransactionAbortFailed, 1, nil) + } + 
soaktest.ErrorLogger.Printf("AbortTransaction() failed: %s\n", + err) + if err.(kafka.Error).IsRetriable() { + if i == retryNum-1 { + soaktest.ErrorLogger.Printf("AbortTransaction() "+ + "failed with max retry %d times: %s\n", retryNum, err) + return transactionCommitNum, err + } + time.Sleep(3 * time.Second) + continue + } else { + soaktest.ErrorLogger.Printf("AbortTransaction() "+ + "failed: %s\n", err) + return transactionCommitNum, err + } + } else { + soaktest.DatadogIncrement( + producerTransactionAbortSucceed, + 1, + nil) + transactionCommitNum = 1 + err = rewindConsumerPosition(c) + if err != nil { + soaktest.ErrorLogger.Printf("rewindConsumerPosition() "+ + "failed: %s\n", err) + } + // If AbortTransaction() happens, rewind the msgid to the + // last committed msid per partition + for k, v := range hwmarksLastCommitted { + hwmarks[k] = v + } + for k, v := range partitionToMsgIDMapLastCommitted { + partitionToMsgIDMap[k] = v + } + return transactionCommitNum, err + } + } + } + + return transactionCommitNum, err +} + +// rewindConsumerPosition Rewinds consumer's position to the +// pre-transaction offset +func rewindConsumerPosition(c *kafka.Consumer) error { + assignment, err := c.Assignment() + if err != nil { + soaktest.ErrorLogger.Printf("Assignment() failed: %s\n", err) + return err + } + + committed, err := c.Committed(assignment, 30*1000 /* 30s */) + if err != nil { + soaktest.ErrorLogger.Printf("Committed() failed: %s\n", err) + return err + } + + for _, tp := range committed { + if tp.Offset < 0 { + tp.Offset = kafka.OffsetBeginning + } + err := c.Seek(tp, 1) + if err != nil { + soaktest.ErrorLogger.Printf("Seek() failed: %s\n", err) + return err + } + } + return nil +} diff --git a/soaktest/soaktest.go b/soaktest/soaktest.go new file mode 100644 index 000000000..21ae5f75e --- /dev/null +++ b/soaktest/soaktest.go @@ -0,0 +1,270 @@ +package soaktest + +/** + * Copyright 2021 Confluent Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import ( + "encoding/binary" + "encoding/json" + "fmt" + "log" + "os" + "path/filepath" + "strconv" + "time" + + "github.com/confluentinc/confluent-kafka-go/kafka" +) + +var drErrCnt uint64 +var drCnt uint64 +var producerErrorCbCnt uint64 +var msgDupCntForOutput uint64 +var msgMissCntForOutput uint64 +var consumerMsgCnt uint64 +var consumerMsgErrs uint64 + +// ConsumerErrCnt counts the number of consumer client-level errors +var ConsumerErrCnt uint64 + +// ProducerErrCnt counts the number of producer client-level errors +var ProducerErrCnt uint64 + +// ProduceMsgCnt is msg counter for producer +var ProduceMsgCnt uint64 + +// WarningLogger logs warning level logs to the log file +var WarningLogger *log.Logger + +// InfoLogger logs info level logs to the log file +var InfoLogger *log.Logger + +// ErrorLogger logs error level logs to the log file +var ErrorLogger *log.Logger + +// ConsumerError combines the metrics name for consumer client-level errors +const ConsumerError = "consumer.err" + +// ProducerError combines the metrics name for producer client-level errors +const ProducerError = "producer.err" + +// FailedToProduceMsg combines the metrics name if failed to produce messages +const FailedToProduceMsg = "failed.to.produce.message" + +var producerDr = "producer.dr" +var producerDrErr = "producer.dr.err" + +const consumerConsumeMsg = "consumer.consume.msg" +const consumerReceiveError = 
"consumer.receive.err" +const latency = "latency" +const consumerConsumeDupMSG = "consumer.consume.dup.msg" +const consumerConsumeMissMSG = "consumer.consume.miss.msg" +const brokerRttP99 = "broker.rtt.p99" +const brokerRttAvg = "broker.rtt.avg" + +// InitLogFiles initials the log folder and file, if the folder or file +// doesn't exist, create one, otherwise open it directly +func InitLogFiles(filename string) { + path := filepath.Dir(filename) + if _, err := os.Stat(path); os.IsNotExist(err) { + os.Mkdir(path, os.ModePerm) + } + if err != nil { + log.Fatal(err) + } + + file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, + 0644) + if err != nil { + log.Fatal(err) + } + + InfoLogger = log.New(file, "INFO: ", log.Ldate|log.Ltime|log.Lshortfile) + WarningLogger = log.New(file, "WARNING: ", + log.Ldate|log.Ltime|log.Lshortfile) + ErrorLogger = log.New(file, "ERROR: ", log.Ldate|log.Ltime|log.Lshortfile) +} + +// rttStats extracts broker rtt statistics from the raw map, +// monitors broker avg and p99 using datadog +func rttStats(raw map[string]interface{}, topic string) error { + for _, broker := range raw["brokers"].(map[string]interface{}) { + var avg float64 + var p99 float64 + var nodeid int + + topparsValue := broker.(map[string]interface{})["toppars"] + if topparsValue == nil { + continue + } + + rttValue := broker.(map[string]interface{})["rtt"] + avg, ok := rttValue.(map[string]interface{})["avg"].(float64) + if !ok { + err := fmt.Errorf("failed to convert avg to float64 %v", ok) + return err + } + + p99, ok = rttValue.(map[string]interface{})["p99"].(float64) + if !ok { + err := fmt.Errorf("failed to convert p99 to float64 %v", ok) + return err + } + + nid, ok := broker.(map[string]interface{})["nodeid"].(float64) + if !ok { + err := fmt.Errorf("failed to convert nodeid to float64 %v", ok) + return err + } + nodeid = int(nid) + + tags := []string{fmt.Sprintf("broker:%d", nodeid), + fmt.Sprintf("type:%s", raw["type"])} + + 
DatadogGauge(brokerRttP99, p99/1000000.0, tags) + DatadogGauge(brokerRttAvg, avg/1000000.0, tags) + } + return nil +} + +// HandleStatsEvent converts Stats events to map +func HandleStatsEvent(e *kafka.Stats, topic string) error { + var raw map[string]interface{} + err := json.Unmarshal([]byte(e.String()), &raw) + if err != nil { + ErrorLogger.Printf("Json unmarshall error: %s\n", err) + return err + } + + err = rttStats(raw, topic) + if err != nil { + ErrorLogger.Printf("Failed to calculate broker rtt statistics: %s", + err) + return err + } + return nil +} + +// ProducerDeliveryCheck handles delivery report for producer +func ProducerDeliveryCheck(e *kafka.Message) { + tags := []string{fmt.Sprintf("topic:%s, partition:%d", + *e.TopicPartition.Topic, e.TopicPartition.Partition)} + if e.TopicPartition.Error != nil { + drErrCnt++ + DatadogIncrement(producerDrErr, 1, tags) + ErrorLogger.Printf("Delivery failed: %v\n", e.TopicPartition) + } else { + drCnt++ + DatadogIncrement(producerDr, 1, tags) + if drCnt%1000 == 0 { + InfoLogger.Printf("Delivered message to topic "+ + "%s [%d] at offset %d\n", *e.TopicPartition.Topic, + e.TopicPartition.Partition, e.TopicPartition.Offset) + } + } +} + +// PrintConsumerStatus prints the information for the consumer +func PrintConsumerStatus(consumer string) { + InfoLogger.Printf("%s: %d messages consumed, %d duplicates, "+ + "%d missed, %d message errors, %d consumer client-level errors\n", + consumer, consumerMsgCnt, msgDupCntForOutput, msgMissCntForOutput, + consumerMsgErrs, ConsumerErrCnt) +} + +// PrintProducerStatus prints the information for the producer +func PrintProducerStatus(producer string) { + InfoLogger.Printf("%s: %d messages produced, %d delivered, %d "+ + "failed to deliver, %d producer client-level errors\n", + producer, ProduceMsgCnt, drCnt, drErrCnt, ProducerErrCnt) +} + +// HandleMessage handles received messages, monitors latency and +// verifies messages +func HandleMessage(e *kafka.Message, + hwmarks 
map[string]uint64) bool { + tags := []string{fmt.Sprintf("topic:%s", *e.TopicPartition.Topic), + fmt.Sprintf("partition:%s", *e.TopicPartition.Topic+"_"+ + strconv.FormatInt(int64(e.TopicPartition.Partition), 10))} + if e.TopicPartition.Error != nil { + consumerMsgErrs++ + DatadogIncrement(consumerReceiveError, 1, tags) + ErrorLogger.Printf("Consumer received message on TopicPartition: %s "+ + "failed with error: %s\n", e.TopicPartition, + e.TopicPartition.Error) + return false + } + consumerMsgCnt++ + DatadogIncrement(consumerConsumeMsg, 1, tags) + if consumerMsgCnt%1000 == 0 { + InfoLogger.Printf("Consumer received message on TopicPartition: "+ + "%s, Headers: %s, values: %s\n", e.TopicPartition, + e.Headers, string(e.Value)) + } + + if e.Headers != nil { + for _, hdr := range e.Headers { + if hdr.Key == "time" { + var timestamp time.Time + timestamp.GobDecode(hdr.Value) + DatadogGauge(latency, + time.Now().Sub(timestamp).Seconds(), + tags) + } else if hdr.Key == "msgid" { + msgid := binary.LittleEndian.Uint64(hdr.Value) + verifyMessage(e, hwmarks, msgid) + } + } + } + return true +} + +// verifyMessage verifies if there isn't any duplicate or +// lost messages from consumer side +func verifyMessage(e *kafka.Message, + hwmarks map[string]uint64, msgid uint64) { + tags := []string{fmt.Sprintf("topic:%s", *e.TopicPartition.Topic), + fmt.Sprintf("partition:%d", e.TopicPartition.Partition)} + hwkey := fmt.Sprintf("%s--%d", + *e.TopicPartition.Topic, + e.TopicPartition.Partition) + hw := hwmarks[hwkey] + if hw > 0 { + if msgid <= hw { + ErrorLogger.Printf("Consumer: Old or duplicate message %s [%d] "+ + "at offset %d with msgid %d (headers %s): wanted msgid > %d\n", + *e.TopicPartition.Topic, e.TopicPartition.Partition, + e.TopicPartition.Offset, msgid, e.Headers, hw) + msgDupCntForOutput += (hw + 1) - msgid + DatadogIncrement(consumerConsumeDupMSG, 1, tags) + } else if msgid > hw+1 { + ErrorLogger.Printf("Consumer: Lost messages, now at %s [%d] at "+ + "offset %d 
with msgid %d (headers %s): expected msgid %d+1\n", + *e.TopicPartition.Topic, e.TopicPartition.Partition, + e.TopicPartition.Offset, msgid, e.Headers, hw) + msgMissCntForOutput += msgid - (hw + 1) + DatadogIncrement(consumerConsumeMissMSG, 1, tags) + } + } + hwmarks[hwkey] = msgid +} + +// ConvertUint64ToByteArray converts the unit64 type to []byte +func ConvertUint64ToByteArray(num uint64) []byte { + byteArray := make([]byte, 8) + binary.LittleEndian.PutUint64(byteArray, num) + return byteArray +}