Skip to content

Commit b4a08c5

Browse files
committed
more refactoring, seperate client hooks for each package
1 parent 1349666 commit b4a08c5

18 files changed

+232
-109
lines changed

e2e/client_hooks.go

+93
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package e2e
2+
3+
import (
4+
"net"
5+
"time"
6+
7+
"github.com/prometheus/client_golang/prometheus"
8+
"github.com/prometheus/client_golang/prometheus/promauto"
9+
"github.com/twmb/franz-go/pkg/kgo"
10+
"go.uber.org/zap"
11+
)
12+
13+
// in e2e we only use client hooks for logging connect/disconnect messages
14+
type clientHooks struct {
15+
logger *zap.Logger
16+
17+
requestSentCount prometheus.Counter
18+
bytesSent prometheus.Counter
19+
20+
requestsReceivedCount prometheus.Counter
21+
bytesReceived prometheus.Counter
22+
}
23+
24+
func newEndToEndClientHooks(logger *zap.Logger, metricsNamespace string) *clientHooks {
25+
requestSentCount := promauto.NewCounter(prometheus.CounterOpts{
26+
Namespace: metricsNamespace,
27+
Subsystem: "kafka",
28+
Name: "requests_sent_total"})
29+
bytesSent := promauto.NewCounter(prometheus.CounterOpts{
30+
Namespace: metricsNamespace,
31+
Subsystem: "kafka",
32+
Name: "sent_bytes",
33+
})
34+
35+
requestsReceivedCount := promauto.NewCounter(prometheus.CounterOpts{
36+
Namespace: metricsNamespace,
37+
Subsystem: "kafka",
38+
Name: "requests_received_total"})
39+
bytesReceived := promauto.NewCounter(prometheus.CounterOpts{
40+
Namespace: metricsNamespace,
41+
Subsystem: "kafka",
42+
Name: "received_bytes",
43+
})
44+
45+
return &clientHooks{
46+
logger: logger,
47+
48+
requestSentCount: requestSentCount,
49+
bytesSent: bytesSent,
50+
51+
requestsReceivedCount: requestsReceivedCount,
52+
bytesReceived: bytesReceived,
53+
}
54+
}
55+
56+
func (c clientHooks) OnConnect(meta kgo.BrokerMetadata, dialDur time.Duration, _ net.Conn, err error) {
57+
if err != nil {
58+
c.logger.Debug("kafka connection failed", zap.String("broker_host", meta.Host), zap.Error(err))
59+
return
60+
}
61+
c.logger.Debug("kafka connection succeeded",
62+
zap.String("host", meta.Host),
63+
zap.Duration("dial_duration", dialDur))
64+
}
65+
66+
func (c clientHooks) OnDisconnect(meta kgo.BrokerMetadata, _ net.Conn) {
67+
c.logger.Debug("kafka broker disconnected",
68+
zap.String("host", meta.Host))
69+
}
70+
71+
// OnRead is passed the broker metadata, the key for the response that
72+
// was read, the number of bytes read, how long the Client waited
73+
// before reading the response, how long it took to read the response,
74+
// and any error.
75+
//
76+
// The bytes written does not count any tls overhead.
77+
// OnRead is called after a read from a broker.
78+
func (c clientHooks) OnRead(_ kgo.BrokerMetadata, _ int16, bytesRead int, _, _ time.Duration, _ error) {
79+
c.requestsReceivedCount.Inc()
80+
c.bytesReceived.Add(float64(bytesRead))
81+
}
82+
83+
// OnWrite is passed the broker metadata, the key for the request that
84+
// was written, the number of bytes written, how long the request
85+
// waited before being written, how long it took to write the request,
86+
// and any error.
87+
//
88+
// The bytes written does not count any tls overhead.
89+
// OnWrite is called after a write to a broker.
90+
func (c clientHooks) OnWrite(_ kgo.BrokerMetadata, _ int16, bytesWritten int, _, _ time.Duration, _ error) {
91+
c.requestSentCount.Inc()
92+
c.bytesSent.Add(float64(bytesWritten))
93+
}

e2e/consumer.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
)
1313

1414
func (s *Service) ConsumeFromManagementTopic(ctx context.Context) error {
15-
client := s.kafkaSvc.Client
15+
client := s.client
1616
topicName := s.config.TopicManagement.Name
1717
topic := kgo.ConsumeTopics(kgo.NewOffset().AtEnd(), topicName)
1818
balancer := kgo.Balancers(kgo.CooperativeStickyBalancer()) // Default GroupBalancer

e2e/producer.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ package e2e
33
import (
44
"context"
55
"encoding/json"
6-
"fmt"
76
"time"
87

98
"github.com/google/uuid"
109
"github.com/twmb/franz-go/pkg/kgo"
10+
"go.uber.org/zap"
1111
)
1212

1313
type EndToEndMessage struct {
@@ -32,13 +32,13 @@ func (s *Service) produceToManagementTopic(ctx context.Context) error {
3232
startTime := timeNowMs()
3333
s.endToEndMessagesProduced.Inc()
3434

35-
err = s.kafkaSvc.Client.Produce(ctx, record, func(r *kgo.Record, err error) {
35+
err = s.client.Produce(ctx, record, func(r *kgo.Record, err error) {
3636
endTime := timeNowMs()
3737
ackDurationMs := endTime - startTime
3838
ackDuration := time.Duration(ackDurationMs) * time.Millisecond
3939

4040
if err != nil {
41-
fmt.Printf("record had a produce error: %v\n", err)
41+
s.logger.Error("error producing record: %w", zap.Error(err))
4242
} else {
4343
s.onAck(r.Partition, ackDuration)
4444
}

e2e/service.go

+30-3
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,18 @@ type Service struct {
3737
}
3838

3939
// NewService creates a new instance of the e2e moinitoring service (wow)
40-
func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricNamespace string) (*Service, error) {
40+
func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricNamespace string, ctx context.Context) (*Service, error) {
41+
42+
client, err := createKafkaClient(cfg, logger, kafkaSvc, ctx)
43+
if err != nil {
44+
return nil, fmt.Errorf("failed to create kafka client for e2e: %w", err)
45+
}
4146

4247
svc := &Service{
4348
config: cfg,
44-
logger: logger,
49+
logger: logger.With(zap.String("source", "end_to_end")),
4550
kafkaSvc: kafkaSvc,
46-
client: nil,
51+
client: client,
4752

4853
minionID: uuid.NewString(),
4954
}
@@ -83,6 +88,28 @@ func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricN
8388
return svc, nil
8489
}
8590

91+
func createKafkaClient(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, ctx context.Context) (*kgo.Client, error) {
92+
93+
// Add RequiredAcks, as options can't be altered later
94+
kgoOpts := []kgo.Opt{}
95+
if cfg.Enabled {
96+
ack := kgo.AllISRAcks()
97+
if cfg.Producer.RequiredAcks == 1 {
98+
ack = kgo.LeaderAck()
99+
kgoOpts = append(kgoOpts, kgo.DisableIdempotentWrite())
100+
}
101+
kgoOpts = append(kgoOpts, kgo.RequiredAcks(ack))
102+
}
103+
104+
// Prepare hooks
105+
hooksChildLogger := logger.With(zap.String("source", "end_to_end"))
106+
e2eHooks := newEndToEndClientHooks(hooksChildLogger, "")
107+
kgoOpts = append(kgoOpts, kgo.WithHooks(e2eHooks))
108+
109+
// Create kafka service and check if client can successfully connect to Kafka cluster
110+
return kafkaSvc.CreateAndTestClient(logger, kgoOpts, ctx)
111+
}
112+
86113
// Start starts the service (wow)
87114
func (s *Service) Start(ctx context.Context) error {
88115

e2e/topic.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (s *Service) validateManagementTopic(ctx context.Context) error {
6363

6464
create := kmsg.NewCreatePartitionsRequest()
6565
create.Topics = []kmsg.CreatePartitionsRequestTopic{topic}
66-
_, err := create.RequestWith(ctx, s.kafkaSvc.Client)
66+
_, err := create.RequestWith(ctx, s.client)
6767
if err != nil {
6868
return fmt.Errorf("failed to do kmsg request on creating partitions: %w", err)
6969
}
@@ -114,7 +114,7 @@ func (s *Service) validateManagementTopic(ctx context.Context) error {
114114
reassignment := kmsg.NewAlterPartitionAssignmentsRequest()
115115
reassignment.Topics = []kmsg.AlterPartitionAssignmentsRequestTopic{managamentTopicReassignment}
116116

117-
_, err := reassignment.RequestWith(ctx, s.kafkaSvc.Client)
117+
_, err := reassignment.RequestWith(ctx, s.client)
118118
if err != nil {
119119
return fmt.Errorf("failed to do kmsg request on topic reassignment: %w", err)
120120
}
@@ -187,7 +187,7 @@ func (s *Service) createManagementTopic(ctx context.Context, topicMetadata *kmsg
187187
req := kmsg.NewCreateTopicsRequest()
188188
req.Topics = []kmsg.CreateTopicsRequestTopic{topic}
189189

190-
res, err := req.RequestWith(ctx, s.kafkaSvc.Client)
190+
res, err := req.RequestWith(ctx, s.client)
191191
// Sometimes it won't throw Error, but the Error will be abstracted to res.Topics[0].ErrorMessage
192192
if res.Topics[0].ErrorMessage != nil {
193193
return fmt.Errorf("failed to create topic: %s", *res.Topics[0].ErrorMessage)
@@ -209,7 +209,7 @@ func (s *Service) getTopicMetadata(ctx context.Context) (*kmsg.MetadataResponse,
209209
req := kmsg.NewMetadataRequest()
210210
req.Topics = []kmsg.MetadataRequestTopic{topicReq}
211211

212-
res, err := req.RequestWith(ctx, s.kafkaSvc.Client)
212+
res, err := req.RequestWith(ctx, s.client)
213213
if err != nil {
214214
return nil, fmt.Errorf("failed to request metadata: %w", err)
215215
}

kafka/client_config_helper.go

+3-5
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ import (
2424

2525
// NewKgoConfig creates a new Config for the Kafka Client as exposed by the franz-go library.
2626
// If TLS certificates can't be read an error will be returned.
27-
func NewKgoConfig(cfg Config, logger *zap.Logger, hooks kgo.Hook) ([]kgo.Opt, error) {
27+
// logger is only used to print warnings about TLS.
28+
func NewKgoConfig(cfg Config, logger *zap.Logger) ([]kgo.Opt, error) {
2829
opts := []kgo.Opt{
2930
kgo.SeedBrokers(cfg.Brokers...),
3031
kgo.MaxVersions(kversion.V2_7_0()),
@@ -35,13 +36,10 @@ func NewKgoConfig(cfg Config, logger *zap.Logger, hooks kgo.Hook) ([]kgo.Opt, er
3536

3637
// Create Logger
3738
kgoLogger := KgoZapLogger{
38-
logger: logger.With(zap.String("source", "kafka_client")).Sugar(),
39+
logger: logger.Sugar(),
3940
}
4041
opts = append(opts, kgo.WithLogger(kgoLogger))
4142

42-
// Attach hooks
43-
opts = append(opts, kgo.WithHooks(hooks))
44-
4543
// Add Rack Awareness if configured
4644
if cfg.RackID != "" {
4745
opts = append(opts, kgo.Rack(cfg.RackID))

kafka/service.go

+29-19
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"strings"
7+
"time"
78

89
"github.com/twmb/franz-go/pkg/kerr"
910
"github.com/twmb/franz-go/pkg/kgo"
@@ -14,52 +15,61 @@ import (
1415

1516
type Service struct {
1617
cfg Config
17-
Client *kgo.Client
1818
logger *zap.Logger
1919
}
2020

21-
func NewService(cfg Config, logger *zap.Logger, opts []kgo.Opt) (*Service, error) {
22-
// Create Kafka Client
23-
hooksChildLogger := logger.With(zap.String("source", "kafka_client_hooks"))
24-
clientHooks := newClientHooks(hooksChildLogger, "")
25-
26-
kgoOpts, err := NewKgoConfig(cfg, logger, clientHooks)
27-
for _, opt := range opts {
28-
kgoOpts = append(kgoOpts, opt)
21+
func NewService(cfg Config, logger *zap.Logger) *Service {
22+
return &Service{
23+
cfg: cfg,
24+
logger: logger.With(zap.String("source", "kafka_service")),
2925
}
26+
}
27+
28+
// Create a client with the services default settings
29+
// logger: will be used to log connections, errors, warnings about tls config, ...
30+
func (s *Service) CreateAndTestClient(logger *zap.Logger, opts []kgo.Opt, ctx context.Context) (*kgo.Client, error) {
31+
// Config with default options
32+
kgoOpts, err := NewKgoConfig(s.cfg, logger)
3033
if err != nil {
3134
return nil, fmt.Errorf("failed to create a valid kafka Client config: %w", err)
3235
}
36+
// Append user (the service calling this method) provided options
37+
kgoOpts = append(kgoOpts, opts...)
3338

34-
kafkaClient, err := kgo.NewClient(kgoOpts...)
39+
// Create kafka client
40+
client, err := kgo.NewClient(kgoOpts...)
3541
if err != nil {
3642
return nil, fmt.Errorf("failed to create kafka Client: %w", err)
3743
}
3844

39-
return &Service{
40-
cfg: cfg,
41-
Client: kafkaClient,
42-
logger: logger,
43-
}, nil
45+
// Test connection
46+
connectCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
47+
defer cancel()
48+
err = s.testConnection(client, connectCtx)
49+
if err != nil {
50+
logger.Fatal("failed to test connectivity to Kafka cluster", zap.Error(err))
51+
}
52+
53+
return client, nil
4454
}
4555

46-
// TestConnection tries to fetch Broker metadata and prints some information if connection succeeds. An error will be
56+
// testConnection tries to fetch Broker metadata and prints some information if connection succeeds. An error will be
4757
// returned if connecting fails.
48-
func (s *Service) TestConnection(ctx context.Context) error {
58+
func (s *Service) testConnection(client *kgo.Client, ctx context.Context) error {
4959
s.logger.Info("connecting to Kafka seed brokers, trying to fetch cluster metadata",
5060
zap.String("seed_brokers", strings.Join(s.cfg.Brokers, ",")))
5161

5262
req := kmsg.MetadataRequest{
5363
Topics: nil,
5464
}
55-
res, err := req.RequestWith(ctx, s.Client)
65+
res, err := req.RequestWith(ctx, client)
5666
if err != nil {
5767
return fmt.Errorf("failed to request metadata: %w", err)
5868
}
5969

6070
// Request versions in order to guess Kafka Cluster version
6171
versionsReq := kmsg.NewApiVersionsRequest()
62-
versionsRes, err := versionsReq.RequestWith(ctx, s.Client)
72+
versionsRes, err := versionsReq.RequestWith(ctx, client)
6373
if err != nil {
6474
return fmt.Errorf("failed to request api versions: %w", err)
6575
}

0 commit comments

Comments
 (0)