Skip to content

Commit

Permalink
Merge pull request #514 from twmb/kfake
Browse files Browse the repository at this point in the history
kfake: add SeedTopics, TLS options
  • Loading branch information
twmb authored Jul 12, 2023
2 parents f4d7ca4 + e55dd7f commit c1bb2be
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 6 deletions.
30 changes: 26 additions & 4 deletions pkg/kfake/cluster.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kfake

import (
"crypto/tls"
"errors"
"fmt"
"math/rand"
Expand Down Expand Up @@ -144,7 +145,7 @@ func NewCluster(opts ...Opt) (c *Cluster, err error) {
if len(cfg.ports) > 0 {
port = cfg.ports[i]
}
ln, err := newListener(port)
ln, err := newListener(port, c.cfg.tls)
if err != nil {
c.Close()
return nil, err
Expand All @@ -160,6 +161,20 @@ func NewCluster(opts ...Opt) (c *Cluster, err error) {
}
c.controller = c.bs[len(c.bs)-1]
go c.run()

seedTopics := make(map[string]int32)
for _, sts := range cfg.seedTopics {
p := sts.p
if p < 1 {
p = int32(cfg.defaultNumParts)
}
for _, t := range sts.ts {
seedTopics[t] = p
}
}
for t, p := range seedTopics {
c.data.mkt(t, int(p), -1, nil)
}
return c, nil
}

Expand All @@ -185,8 +200,15 @@ func (c *Cluster) Close() {
}
}

func newListener(port int) (net.Listener, error) {
return net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port))
func newListener(port int, tc *tls.Config) (net.Listener, error) {
l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port))
if err != nil {
return nil, err
}
if tc != nil {
l = tls.NewListener(l, tc)
}
return l, nil
}

func (b *broker) listen() {
Expand Down Expand Up @@ -491,7 +513,7 @@ func (c *Cluster) AddNode(nodeID int32, port int) (int32, int, error) {
port = 0
}
var ln net.Listener
if ln, err = newListener(port); err != nil {
if ln, err = newListener(port, c.cfg.tls); err != nil {
return
}
_, strPort, _ := net.SplitHostPort(ln.Addr().String())
Expand Down
31 changes: 29 additions & 2 deletions pkg/kfake/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package kfake

import "time"
import (
"crypto/tls"
"time"
)

// Opt is an option to configure a client.
type Opt interface {
Expand All @@ -11,19 +14,26 @@ type opt struct{ fn func(*cfg) }

func (opt opt) apply(cfg *cfg) { opt.fn(cfg) }

type seedTopics struct {
p int32
ts []string
}

type cfg struct {
nbrokers int
ports []int
logger Logger
clusterID string
allowAutoTopic bool
defaultNumParts int
seedTopics []seedTopics

minSessionTimeout time.Duration
maxSessionTimeout time.Duration

enableSASL bool
sasls map[struct{ m, u string }]string // cleared after client initialization
tls *tls.Config
}

// NumBrokers sets the number of brokers to start in the fake cluster.
Expand Down Expand Up @@ -54,7 +64,8 @@ func AllowAutoTopicCreation() Opt {
}

// DefaultNumPartitions sets the number of partitions to create by default for
// auto created topics / CreateTopics with -1 partitions.
// auto created topics / CreateTopics with -1 partitions, overriding the
// default of 10.
func DefaultNumPartitions(n int) Opt {
return opt{func(cfg *cfg) { cfg.defaultNumParts = n }}
}
Expand Down Expand Up @@ -86,3 +97,19 @@ func EnableSASL() Opt {
func Superuser(method, user, pass string) Opt {
return opt{func(cfg *cfg) { cfg.sasls[struct{ m, u string }{method, user}] = pass }}
}

// TLS enables TLS for the cluster, using the provided TLS config for
// listening.
func TLS(c *tls.Config) Opt {
return opt{func(cfg *cfg) { cfg.tls = c }}
}

// SeedTopics provides topics to create by default in the cluster. Each topic
// will use the given partitions and use the default internal replication
// factor. If you use a non-positive number for partitions, [DefaultNumPartitions]
// is used. This option can be provided multiple times if you want to seed
// topics with different partition counts. If a topic is provided in multiple
// options, the last specification wins.
func SeedTopics(partitions int32, ts ...string) Opt {
return opt{func(cfg *cfg) { cfg.seedTopics = append(cfg.seedTopics, seedTopics{partitions, ts}) }}
}
1 change: 1 addition & 0 deletions pkg/kfake/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
func main() {
c, err := kfake.NewCluster(
kfake.Ports(9092, 9093, 9094),
kfake.SeedTopics(-1, "foo"),
)
if err != nil {
panic(err)
Expand Down

0 comments on commit c1bb2be

Please sign in to comment.