Skip to content

Commit

Permalink
testing: remove dependency on kcl
Browse files Browse the repository at this point in the history
We previously relied on kcl to create topics, delete topics, and delete
groups. There was no reason for that.

We now use kmsg directly, which simplifies the tests and removes some
cryptic depdendency. For now, we default to 127.0.0.1:9092 for seed
brokers, but that can be overridden with KGO_SEEDS. We may add cert/sasl
support later if necessary.
  • Loading branch information
twmb committed Nov 27, 2021
1 parent 8edf934 commit d0a27f3
Showing 1 changed file with 60 additions and 36 deletions.
96 changes: 60 additions & 36 deletions pkg/kgo/helpers_test.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,41 @@
package kgo

import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"os"
"os/exec"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kmsg"
)

const testRecordLimit = 1000000

var loggerNum int64
var adm *Client

func testLogger() Logger {
num := atomic.AddInt64(&loggerNum, 1)
pfx := fmt.Sprintf("[%d] ", num)
return BasicLogger(os.Stderr, testLogLevel, func() string {
return pfx
})
func init() {
seeds := os.Getenv("KGO_SEEDS")
if seeds == "" {
seeds = "127.0.0.1:9092"
}
var err error
adm, err = NewClient(SeedBrokers(strings.Split(seeds, ",")...))
if err != nil {
panic(fmt.Sprintf("unable to create admin client: %v", err))
}
}

var loggerNum int64

var testLogLevel = func() LogLevel {
level := strings.ToLower(os.Getenv("KGO_LOG_LEVEL"))
switch level {
Expand All @@ -39,6 +47,14 @@ var testLogLevel = func() LogLevel {
return LogLevelInfo
}()

func testLogger() Logger {
num := atomic.AddInt64(&loggerNum, 1)
pfx := fmt.Sprintf("[%d] ", num)
return BasicLogger(os.Stderr, testLogLevel, func() string {
return pfx
})
}

var randsha = func() func() string {
var mu sync.Mutex
last := time.Now().UnixNano()
Expand All @@ -58,56 +74,64 @@ var randsha = func() func() string {
}
}()

var okRe = regexp.MustCompile(`\bOK\b`)

func tmpTopic(tb testing.TB) (string, func()) {
tb.Helper()
path, err := exec.LookPath("kcl")
if err != nil {
tb.Fatal("unable to find `kcl` in $PATH; cannot create temporary topics for tests")
}

topic := randsha()

cmd := exec.Command(path, "admin", "topic", "create", topic, "-p", "20")
output, err := cmd.CombinedOutput()
if err != nil {
tb.Fatalf("unable to run kcl topic create command: %v", err)
req := kmsg.NewPtrCreateTopicsRequest()
reqTopic := kmsg.NewCreateTopicsRequestTopic()
reqTopic.Topic = topic
reqTopic.NumPartitions = 20
reqTopic.ReplicationFactor = 3
req.Topics = append(req.Topics, reqTopic)

resp, err := req.RequestWith(context.Background(), adm)
if err == nil {
err = kerr.ErrorForCode(resp.Topics[0].ErrorCode)
}
if !okRe.Match(output) {
tb.Fatalf("topic create failed")
if err != nil {
tb.Fatalf("unable to create topic %q: %v", topic, err)
}

return topic, func() {
tb.Helper()

tb.Logf("deleting topic %s", topic)
cmd := exec.Command(path, "admin", "topic", "delete", topic)
output, err := cmd.CombinedOutput()
if err != nil {
tb.Fatalf("unable to run kcl topic delete command: %v", err)

req := kmsg.NewPtrDeleteTopicsRequest()
req.TopicNames = []string{topic}
reqTopic := kmsg.NewDeleteTopicsRequestTopic()
reqTopic.Topic = kmsg.StringPtr(topic)
req.Topics = append(req.Topics, reqTopic)

resp, err := req.RequestWith(context.Background(), adm)
if err == nil {
err = kerr.ErrorForCode(resp.Topics[0].ErrorCode)
}
if !okRe.Match(output) {
tb.Fatalf("topic delete failed")
if err != nil {
tb.Fatalf("unable to delete topic %q: %v", topic, err)
}
}
}

func tmpGroup(tb testing.TB) (string, func()) {
tb.Helper()
path, err := exec.LookPath("kcl")
if err != nil {
tb.Fatal("unable to find `kcl` in $PATH; cannot ensure a created group will be deleted")
}

group := randsha()

return group, func() {
tb.Helper()
tb.Logf("deleting group %s", group)
cmd := exec.Command(path, "admin", "group", "delete", group)
_, err := cmd.CombinedOutput()

req := kmsg.NewPtrDeleteGroupsRequest()
req.Groups = []string{group}
resp, err := req.RequestWith(context.Background(), adm)
if err == nil {
err = kerr.ErrorForCode(resp.Groups[0].ErrorCode)
}
if err != nil {
tb.Fatalf("unable to run kcl group delete command: %v", err)
tb.Fatalf("unable to delete group %q: %v", group, err)
}
}
}
Expand Down Expand Up @@ -305,13 +329,13 @@ out:
}

if len(allKeys) != testRecordLimit {
t.Fatalf("consumers %d: got %d keys != exp %d", level, len(allKeys), testRecordLimit)
t.Errorf("consumers %d: got %d keys != exp %d", level, len(allKeys), testRecordLimit)
}

sort.Ints(allKeys)
for i := 0; i < testRecordLimit; i++ {
if allKeys[i] != i {
t.Fatalf("consumers %d: got key %d != exp %d", level, allKeys[i], i)
t.Fatalf("consumers %d: got key %d != exp %d, first 100: %v", level, allKeys[i], i, allKeys[:100])
}
}
}
Expand Down

0 comments on commit d0a27f3

Please sign in to comment.