
Commit 1349666

move end-to-end into its own package called e2e
1 parent cc9a4e9 commit 1349666

11 files changed: +242 -162 lines

minion/config_endtoend.go renamed to e2e/config.go

+4 -4

@@ -1,27 +1,27 @@
-package minion
+package e2e
 
 import (
     "fmt"
     "time"
 )
 
-type EndToEndConfig struct {
+type Config struct {
     Enabled         bool                   `koanf:"enabled"`
     TopicManagement EndToEndTopicConfig    `koanf:"topicManagement"`
     ProbeInterval   time.Duration          `koanf:"probeInterval"`
     Producer        EndToEndProducerConfig `koanf:"producer"`
     Consumer        EndToEndConsumerConfig `koanf:"consumer"`
 }
 
-func (c *EndToEndConfig) SetDefaults() {
+func (c *Config) SetDefaults() {
     c.Enabled = false
     c.ProbeInterval = 2 * time.Second
     c.TopicManagement.SetDefaults()
     c.Producer.SetDefaults()
     c.Consumer.SetDefaults()
 }
 
-func (c *EndToEndConfig) Validate() error {
+func (c *Config) Validate() error {
 
     if !c.Enabled {
         return nil
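
For callers, the practical effect of this rename is that the former minion.EndToEndConfig is now addressed as e2e.Config. A minimal usage sketch, assuming the module path github.com/cloudhut/kminion/v2 that appears in e2e/service.go below; the surrounding wiring is illustrative and not part of this commit:

```go
package main

import (
	"fmt"

	"github.com/cloudhut/kminion/v2/e2e"
)

func main() {
	// Was minion.EndToEndConfig before this commit; now simply e2e.Config.
	var cfg e2e.Config
	cfg.SetDefaults() // per the diff: Enabled=false, ProbeInterval=2s

	// With the default Enabled=false, Validate short-circuits and returns nil,
	// as shown in the hunk above.
	if err := cfg.Validate(); err != nil {
		fmt.Println("invalid end-to-end config:", err)
		return
	}
	fmt.Println("end-to-end config is valid (probe disabled by default)")
}
```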

minion/config_endtoend_consumer.go renamed to e2e/config_consumer.go

+1 -1

@@ -1,4 +1,4 @@
-package minion
+package e2e
 
 import (
     "fmt"

minion/config_endtoend_producer.go renamed to e2e/config_producer.go

+1 -1

@@ -1,4 +1,4 @@
-package minion
+package e2e
 
 import (
     "fmt"

minion/config_endtoend_topic.go renamed to e2e/config_topic.go

+1 -1

@@ -1,4 +1,4 @@
-package minion
+package e2e
 
 import (
     "fmt"

minion/endtoend_consumer.go renamed to e2e/consumer.go

+4 -4

@@ -1,4 +1,4 @@
-package minion
+package e2e
 
 import (
     "context"
@@ -13,10 +13,10 @@ import (
 
 func (s *Service) ConsumeFromManagementTopic(ctx context.Context) error {
     client := s.kafkaSvc.Client
-    topicName := s.Cfg.EndToEnd.TopicManagement.Name
+    topicName := s.config.TopicManagement.Name
     topic := kgo.ConsumeTopics(kgo.NewOffset().AtEnd(), topicName)
     balancer := kgo.Balancers(kgo.CooperativeStickyBalancer()) // Default GroupBalancer
-    switch s.Cfg.EndToEnd.Consumer.RebalancingProtocol {
+    switch s.config.Consumer.RebalancingProtocol {
     case RoundRobin:
         balancer = kgo.Balancers(kgo.RoundRobinBalancer())
     case Range:
@@ -28,7 +28,7 @@ func (s *Service) ConsumeFromManagementTopic(ctx context.Context) error {
 
     // todo: use minionID as part of group id
     //
-    client.AssignGroup(s.Cfg.EndToEnd.Consumer.GroupId, kgo.GroupTopics(topicName), balancer, kgo.DisableAutoCommit())
+    client.AssignGroup(s.config.Consumer.GroupId, kgo.GroupTopics(topicName), balancer, kgo.DisableAutoCommit())
     s.logger.Info("Starting to consume " + topicName)
 
     for {
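
The switch above keeps kgo's cooperative-sticky balancer as the default and only overrides it for explicitly configured protocols. A self-contained sketch of that selection pattern, with local stand-ins for the RebalancingProtocol constants (they live in the consumer config, which this diff only renames) and an assumed Sticky case:

```go
package main

import (
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

// Local stand-ins for the config constants referenced in consumer.go.
type RebalancingProtocol int

const (
	CooperativeSticky RebalancingProtocol = iota
	RoundRobin
	Range
	Sticky // assumed; only RoundRobin and Range are visible in this hunk
)

// balancerFor mirrors the pattern above: cooperative-sticky unless overridden.
func balancerFor(p RebalancingProtocol) kgo.GroupBalancer {
	switch p {
	case RoundRobin:
		return kgo.RoundRobinBalancer()
	case Range:
		return kgo.RangeBalancer()
	case Sticky:
		return kgo.StickyBalancer()
	default:
		return kgo.CooperativeStickyBalancer()
	}
}

func main() {
	// consumer.go wraps the chosen balancer with kgo.Balancers(...) before
	// handing it to the consumer group assignment, as shown above.
	fmt.Printf("%T\n", balancerFor(Range))
}
```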

minion/endtoend_producer.go renamed to e2e/producer.go

+2 -2

@@ -1,4 +1,4 @@
-package minion
+package e2e
 
 import (
     "context"
@@ -17,7 +17,7 @@ type EndToEndMessage struct {
 
 func (s *Service) produceToManagementTopic(ctx context.Context) error {
 
-    topicName := s.Cfg.EndToEnd.TopicManagement.Name
+    topicName := s.config.TopicManagement.Name
 
     record, err := createEndToEndRecord(topicName, s.minionID)
     if err != nil {
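
createEndToEndRecord is referenced above but is not part of this diff. A hypothetical sketch of what such a helper could look like, based only on the facts visible in this commit (roundtrip messages carry a minionID and a creation timestamp in UTC milliseconds); the real EndToEndMessage fields may differ:

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// Hypothetical message shape; the actual EndToEndMessage is defined in
// e2e/producer.go and is not shown in this diff.
type EndToEndMessage struct {
	MinionID  string  `json:"minionID"`
	Timestamp float64 `json:"timestamp"` // creation time in UTC milliseconds
}

func createEndToEndRecord(topicName string, minionID string) (*kgo.Record, error) {
	msg := EndToEndMessage{
		MinionID:  minionID,
		Timestamp: float64(time.Now().UTC().UnixNano()) / float64(time.Millisecond),
	}
	value, err := json.Marshal(msg)
	if err != nil {
		return nil, fmt.Errorf("failed to serialize end-to-end message: %w", err)
	}
	return &kgo.Record{Topic: topicName, Value: value}, nil
}

func main() {
	// Example values only; the topic name comes from config.TopicManagement.Name
	// and the minionID is a UUID generated in NewService.
	record, err := createEndToEndRecord("kminion-end-to-end", "example-minion-id")
	if err != nil {
		panic(err)
	}
	fmt.Println(record.Topic, string(record.Value))
}
```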

e2e/service.go

+134 (new file)

@@ -0,0 +1,134 @@
+package e2e
+
+import (
+    "context"
+    "fmt"
+    "time"
+
+    "github.com/cloudhut/kminion/v2/kafka"
+    "github.com/google/uuid"
+    "github.com/prometheus/client_golang/prometheus"
+    "github.com/prometheus/client_golang/prometheus/promauto"
+    "github.com/twmb/franz-go/pkg/kgo"
+    "go.uber.org/zap"
+)
+
+type Service struct {
+    // General
+    config Config
+    logger *zap.Logger
+
+    kafkaSvc *kafka.Service // creates kafka client for us
+    client   *kgo.Client
+
+    // Service
+    minionID               string  // unique identifier, reported in metrics, in case multiple instances run at the same time
+    lastRoundtripTimestamp float64 // creation time (in utc ms) of the message that most recently passed the roundtripSla check
+
+    // Metrics
+    endToEndMessagesProduced  prometheus.Counter
+    endToEndMessagesAcked     prometheus.Counter
+    endToEndMessagesReceived  prometheus.Counter
+    endToEndMessagesCommitted prometheus.Counter
+
+    endToEndAckLatency       prometheus.Histogram
+    endToEndRoundtripLatency prometheus.Histogram
+    endToEndCommitLatency    prometheus.Histogram
+}
+
+// NewService creates a new instance of the e2e monitoring service
+func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricNamespace string) (*Service, error) {
+
+    svc := &Service{
+        config:   cfg,
+        logger:   logger,
+        kafkaSvc: kafkaSvc,
+        client:   nil,
+
+        minionID: uuid.NewString(),
+    }
+
+    makeCounter := func(name string, help string) prometheus.Counter {
+        return promauto.NewCounter(prometheus.CounterOpts{
+            Namespace: metricNamespace,
+            Subsystem: "end_to_end",
+            Name:      name,
+            Help:      help,
+        })
+    }
+    makeHistogram := func(name string, maxLatency time.Duration, help string) prometheus.Histogram {
+        return promauto.NewHistogram(prometheus.HistogramOpts{
+            Namespace: metricNamespace,
+            Subsystem: "end_to_end",
+            Name:      name,
+            Help:      help,
+            Buckets:   createHistogramBuckets(maxLatency),
+        })
+    }
+
+    // Low-level counters
+    // Users can construct alerts like "can't produce messages" themselves from these
+    svc.endToEndMessagesProduced = makeCounter("messages_produced_total", "Number of messages that kminion's end-to-end test has tried to send to kafka")
+    svc.endToEndMessagesAcked = makeCounter("messages_acked_total", "Number of messages kafka acknowledged as produced")
+    svc.endToEndMessagesReceived = makeCounter("messages_received_total", "Number of *matching* messages kminion received. Every roundtrip message has a minionID (randomly generated on startup) and a timestamp. Kminion only considers a message a match if it arrives within the configured roundtrip SLA (and it matches the minionID)")
+    svc.endToEndMessagesCommitted = makeCounter("messages_committed_total", "Number of *matching* messages kminion successfully committed as read/processed. See 'messages_received' for what 'matching' means. Kminion will commit late/mismatching messages to kafka as well, but those won't be counted in this metric.")
+
+    // Latency histograms
+    // More detailed info about how long each step took
+    // Since histograms also have an 'infinite' bucket, they can be used to detect small hiccups and "lost" messages
+    svc.endToEndAckLatency = makeHistogram("produce_latency_seconds", cfg.Producer.AckSla, "Time until we received an ack for a produced message")
+    svc.endToEndRoundtripLatency = makeHistogram("roundtrip_latency_seconds", cfg.Consumer.RoundtripSla, "Time it took between sending (producing) and receiving (consuming) a message")
+    svc.endToEndCommitLatency = makeHistogram("commit_latency_seconds", cfg.Consumer.CommitSla, "Time kafka took to respond to kminion's offset commit")
+
+    return svc, nil
+}
+
+// Start starts the service
+func (s *Service) Start(ctx context.Context) error {
+
+    if err := s.validateManagementTopic(ctx); err != nil {
+        return fmt.Errorf("could not validate end-to-end topic: %w", err)
+    }
+
+    go s.initEndToEnd(ctx)
+
+    return nil
+}
+
+// called from e2e when a message is acknowledged
+func (s *Service) onAck(partitionId int32, duration time.Duration) {
+    s.endToEndMessagesAcked.Inc()
+    s.endToEndAckLatency.Observe(duration.Seconds())
+}
+
+// called from e2e when a message completes a roundtrip (send to kafka, receive msg from kafka again)
+func (s *Service) onRoundtrip(partitionId int32, duration time.Duration) {
+    if duration > s.config.Consumer.RoundtripSla {
+        return // message is too old
+    }
+
+    // todo: track "lastRoundtripMessage"
+    // if msg.Timestamp < s.lastRoundtripTimestamp {
+    //     return // msg older than what we recently processed (out of order, should never happen)
+    // }
+
+    s.endToEndMessagesReceived.Inc()
+    s.endToEndRoundtripLatency.Observe(duration.Seconds())
+}
+
+// called from e2e when an offset commit is confirmed
+func (s *Service) onOffsetCommit(partitionId int32, duration time.Duration) {
+
+    // todo:
+    // if the commit took too long, don't count it in 'commits' but add it to the histogram?
+    // and how do we want to handle cases where we get an error??
+    // should we have another metric that tells us about failed commits? or a label on the counter?
+
+    s.endToEndCommitLatency.Observe(duration.Seconds())
+
+    if duration > s.config.Consumer.CommitSla {
+        return
+    }
+
+    s.endToEndMessagesCommitted.Inc()
+}
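
NewService relies on a createHistogramBuckets helper that is not included in this commit. A plausible sketch, assuming the intent is a set of exponentially growing bucket bounds capped at the configured SLA (so the +Inf bucket catches anything slower); the actual implementation may differ:

```go
package main

import (
	"fmt"
	"time"
)

// createHistogramBuckets returns bucket upper bounds (in seconds) that double
// from 5ms up to the given SLA, so anything slower than the SLA lands in the
// histogram's implicit +Inf bucket.
func createHistogramBuckets(maxLatency time.Duration) []float64 {
	buckets := []float64{}
	for b := 0.005; b < maxLatency.Seconds(); b *= 2 {
		buckets = append(buckets, b)
	}
	return append(buckets, maxLatency.Seconds())
}

func main() {
	fmt.Println(createHistogramBuckets(1 * time.Second))
	// Output: [0.005 0.01 0.02 0.04 0.08 0.16 0.32 0.64 1]
}
```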

minion/endtoend_topic.go renamed to e2e/topic.go

+41 -32

@@ -1,4 +1,4 @@
-package minion
+package e2e
 
 import (
     "context"
@@ -11,8 +11,10 @@ import (
 
 func (s *Service) validateManagementTopic(ctx context.Context) error {
 
-    expectedReplicationFactor := s.Cfg.EndToEnd.TopicManagement.ReplicationFactor
-    expectedNumPartitionsPerBroker := s.Cfg.EndToEnd.TopicManagement.PartitionsPerBroker
+    s.logger.Info("validating end-to-end topic...")
+
+    expectedReplicationFactor := s.config.TopicManagement.ReplicationFactor
+    expectedNumPartitionsPerBroker := s.config.TopicManagement.PartitionsPerBroker
     topicMetadata, err := s.getTopicMetadata(ctx)
     if err != nil {
         return err
@@ -43,7 +45,7 @@ func (s *Service) validateManagementTopic(ctx context.Context) error {
     // topicMetadata.Brokers will return all the available brokers from the cluster
     isNumBrokerValid := len(topicMetadata.Brokers) >= expectedReplicationFactor
     if !isNumBrokerValid {
-        return fmt.Errorf("current cluster size differs from the expected size. expected broker: %v NumOfBroker: %v", len(topicMetadata.Brokers), expectedReplicationFactor)
+        return fmt.Errorf("current cluster size differs from the expected size (based on config topicManagement.replicationFactor). expected broker: %v NumOfBroker: %v", len(topicMetadata.Brokers), expectedReplicationFactor)
     }
 
     // Check the number of Partition per broker, if it is too low create partition
@@ -55,7 +57,7 @@ func (s *Service) validateManagementTopic(ctx context.Context) error {
     assignment.Replicas = topicMetadata.Topics[0].Partitions[0].Replicas
 
     topic := kmsg.NewCreatePartitionsRequestTopic()
-    topic.Topic = s.Cfg.EndToEnd.TopicManagement.Name
+    topic.Topic = s.config.TopicManagement.Name
     topic.Count = int32(expectedNumPartitionsPerBroker) // Should be greater than current partition number
     topic.Assignment = []kmsg.CreatePartitionsRequestTopicAssignment{assignment}
 
@@ -86,17 +88,17 @@ func (s *Service) validateManagementTopic(ctx context.Context) error {
             }
         }
     }
-    assignmentInvalid := len(distinctLeaderNodes) != s.Cfg.EndToEnd.TopicManagement.ReplicationFactor
+    assignmentInvalid := len(distinctLeaderNodes) != s.config.TopicManagement.ReplicationFactor
     // Reassign Partitions on invalid assignment
     if assignmentInvalid {
         // Get the new AssignedReplicas by checking the ReplicationFactor config
-        assignedReplicas := make([]int32, s.Cfg.EndToEnd.TopicManagement.ReplicationFactor)
+        assignedReplicas := make([]int32, s.config.TopicManagement.ReplicationFactor)
         for index := range assignedReplicas {
             assignedReplicas[index] = int32(index)
         }
 
         // Generate the partition assignments from PartitionPerBroker config
-        partitions := make([]int32, s.Cfg.EndToEnd.TopicManagement.PartitionsPerBroker)
+        partitions := make([]int32, s.config.TopicManagement.PartitionsPerBroker)
         reassignedPartitions := []kmsg.AlterPartitionAssignmentsRequestTopicPartition{}
         for index := range partitions {
             rp := kmsg.NewAlterPartitionAssignmentsRequestTopicPartition()
@@ -106,7 +108,7 @@ func (s *Service) validateManagementTopic(ctx context.Context) error {
         }
 
         managamentTopicReassignment := kmsg.NewAlterPartitionAssignmentsRequestTopic()
-        managamentTopicReassignment.Topic = s.Cfg.EndToEnd.TopicManagement.Name
+        managamentTopicReassignment.Topic = s.config.TopicManagement.Name
         managamentTopicReassignment.Partitions = reassignedPartitions
 
         reassignment := kmsg.NewAlterPartitionAssignmentsRequest()
@@ -153,9 +155,9 @@ func createTopicConfig(cfgTopic EndToEndTopicConfig) []kmsg.CreateTopicsRequestT
 
 func (s *Service) createManagementTopic(ctx context.Context, topicMetadata *kmsg.MetadataResponse) error {
 
-    s.logger.Info(fmt.Sprintf("creating topic %s for EndToEnd metrics", s.Cfg.EndToEnd.TopicManagement.Name))
+    s.logger.Info(fmt.Sprintf("creating topic %s for EndToEnd metrics", s.config.TopicManagement.Name))
 
-    cfgTopic := s.Cfg.EndToEnd.TopicManagement
+    cfgTopic := s.config.TopicManagement
     topicConfigs := createTopicConfig(cfgTopic)
 
     topic := kmsg.NewCreateTopicsRequestTopic()
@@ -200,7 +202,7 @@ func (s *Service) createManagementTopic(ctx context.Context, topicMetadata *kmsg
 
 func (s *Service) getTopicMetadata(ctx context.Context) (*kmsg.MetadataResponse, error) {
 
-    cfg := s.Cfg.EndToEnd.TopicManagement
+    cfg := s.config.TopicManagement
     topicReq := kmsg.NewMetadataRequestTopic()
     topicReq.Topic = &cfg.Name
 
@@ -217,31 +219,38 @@ func (s *Service) getTopicMetadata(ctx context.Context) (*kmsg.MetadataResponse,
 
 func (s *Service) initEndToEnd(ctx context.Context) {
 
-    reconciliationInterval := s.Cfg.EndToEnd.TopicManagement.ReconciliationInterval
-    c1 := make(chan error, 1)
+    validateTopicTicker := time.NewTicker(s.config.TopicManagement.ReconciliationInterval)
+    produceTicker := time.NewTicker(s.config.ProbeInterval)
+    // stop tickers when context is cancelled
+    go func() {
+        <-ctx.Done()
+        produceTicker.Stop()
+        validateTopicTicker.Stop()
+    }()
 
-    // Run long running function on validating or reconciling that might be timeout
+    // keep checking end-to-end topic
     go func() {
-        err := s.validateManagementTopic(ctx)
-        c1 <- err
+        for range validateTopicTicker.C {
+            err := s.validateManagementTopic(ctx)
+            if err != nil {
+                s.logger.Error("failed to validate end-to-end topic: %w", zap.Error(err))
+            }
+        }
     }()
 
-    // Listen on our channel AND a timeout channel - which ever happens first.
-    select {
-    case err := <-c1:
-        s.logger.Warn("failed to validate management topic for endtoend metrics", zap.Error(err))
-        return
-    case <-time.After(reconciliationInterval):
-        s.logger.Warn("time exceeded while validating/reconciling management topic of endtoend metrics")
-        return
-    default:
-        go s.ConsumeFromManagementTopic(ctx)
-
-        t := time.NewTicker(s.Cfg.EndToEnd.ProbeInterval)
-        for range t.C {
-            s.produceToManagementTopic(ctx)
+    // start consuming topic
+    go s.ConsumeFromManagementTopic(ctx)
+
+    // start producing to topic
+    go func() {
+        for range produceTicker.C {
+            err := s.produceToManagementTopic(ctx)
+            if err != nil {
+                s.logger.Error("failed to produce to end-to-end topic: %w", zap.Error(err))
+            }
         }
-    }
+    }()
+
 }
 
 func timeNowMs() float64 {
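
Taken together, Start validates the management topic once up front, and initEndToEnd then runs the validate and produce loops on tickers that stop when the context is cancelled. A minimal wiring sketch, assuming the module path seen in service.go; how the *kafka.Service is built is outside this commit, so it is passed in here, and the "kminion" metric namespace is illustrative:

```go
package example

import (
	"context"

	"github.com/cloudhut/kminion/v2/e2e"
	"github.com/cloudhut/kminion/v2/kafka"
	"go.uber.org/zap"
)

// startEndToEnd wires up the refactored e2e service. Cancelling ctx stops the
// validation and produce tickers started by initEndToEnd.
func startEndToEnd(ctx context.Context, cfg e2e.Config, kafkaSvc *kafka.Service) error {
	logger, err := zap.NewProduction()
	if err != nil {
		return err
	}

	svc, err := e2e.NewService(cfg, logger, kafkaSvc, "kminion")
	if err != nil {
		return err
	}

	// Start validates the end-to-end topic once, then launches the
	// consume/produce/validate goroutines in the background.
	return svc.Start(ctx)
}
```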
