
Commit e36eb71

each kminion instance uses its own consumer group for end-to-end now so the instances don't "steal" messages from each other
1 parent d5a116c commit e36eb71
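
For illustration (not part of the commit itself), this is how the per-instance group id ends up being built: the configured prefix plus the instance's unique minion ID. The format string matches the change in e2e/consumer.go below; the minion ID shown here is made up.

package main

import "fmt"

func main() {
    groupIdPrefix := "kminion-end-to-end" // default set in e2e/config_consumer.go
    minionID := "3f8b7c9a-example"        // placeholder; each running kminion instance has its own unique ID

    // With one group per instance, no instance "steals" messages that another
    // instance produced and is still waiting to consume again.
    groupId := fmt.Sprintf("%v-%v", groupIdPrefix, minionID)
    fmt.Println(groupId) // kminion-end-to-end-3f8b7c9a-example
}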

File tree

6 files changed: +106 -41 lines

e2e/config_consumer.go

+2 -2

@@ -13,15 +13,15 @@ const (
 )
 
 type EndToEndConsumerConfig struct {
-    GroupId             string `koanf:"groupId"`
+    GroupIdPrefix       string `koanf:"groupIdPrefix"`
     RebalancingProtocol string `koanf:"rebalancingProtocol"`
 
     RoundtripSla time.Duration `koanf:"roundtripSla"`
    CommitSla    time.Duration `koanf:"commitSla"`
 }
 
 func (c *EndToEndConsumerConfig) SetDefaults() {
-    c.GroupId = "kminion-end-to-end"
+    c.GroupIdPrefix = "kminion-end-to-end"
     c.RebalancingProtocol = "cooperativeSticky"
     c.RoundtripSla = 20 * time.Second
     c.CommitSla = 10 * time.Second // no idea what to use as a good default value

e2e/consumer.go

+23 -20

@@ -7,7 +7,6 @@ import (
     "time"
 
     "github.com/twmb/franz-go/pkg/kgo"
-    "github.com/twmb/franz-go/pkg/kmsg"
     "go.uber.org/zap"
 )
 
@@ -26,17 +25,17 @@ func (s *Service) ConsumeFromManagementTopic(ctx context.Context) error {
     }
     client.AssignPartitions(topic)
 
-    // todo: use minionID as part of group id
-    //
-    client.AssignGroup(s.config.Consumer.GroupId, kgo.GroupTopics(topicName), balancer, kgo.DisableAutoCommit())
-    s.logger.Info("Starting to consume " + topicName)
+    // Create a consumer group with the prefix
+    groupId := fmt.Sprintf("%v-%v", s.config.Consumer.GroupIdPrefix, s.minionID)
+    client.AssignGroup(groupId, kgo.GroupTopics(topicName), balancer, kgo.DisableAutoCommit())
+    s.logger.Info("Starting to consume end-to-end", zap.String("topicName", topicName), zap.String("groupId", groupId))
 
     for {
         select {
         case <-ctx.Done():
             return nil
         default:
-            fetches := client.PollRecords(ctx, 10)
+            fetches := client.PollFetches(ctx)
             errors := fetches.Errors()
             for _, err := range errors {
                 // Log all errors and continue afterwards as we might get errors and still have some fetch results
@@ -66,24 +65,28 @@ func (s *Service) ConsumeFromManagementTopic(ctx context.Context) error {
             // Commit offsets for processed messages
             // todo: the normal way to commit offsets with franz-go is pretty good, but in our special case
             // we want to do it manually, seperately for each partition, so we can track how long it took
-            if uncommittedOffset := client.UncommittedOffsets(); uncommittedOffset != nil {
 
-                startCommitTimestamp := timeNowMs()
+            // todo: use findGroupCoordinatorID
+            // maybe ask travis about return value, we want to know what coordinator the offsets was committed to
+            // kminion probably already exposed coordinator for every group
 
-                client.CommitOffsets(ctx, uncommittedOffset, func(_ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) {
-                    // got commit response
-                    if err != nil {
-                        s.logger.Error(fmt.Sprintf("record had an error on commit: %v\n", err))
-                        return
-                    }
+            // if uncommittedOffset := client.UncommittedOffsets(); uncommittedOffset != nil {
 
-                    latencyMs := timeNowMs() - startCommitTimestamp
-                    commitLatency := time.Duration(latencyMs * float64(time.Millisecond))
+            //     startCommitTimestamp := timeNowMs()
 
-                    // todo: partitionID
-                    s.onOffsetCommit(0, commitLatency)
-                })
-            }
+            //     client.CommitOffsets(ctx, uncommittedOffset, func(_ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) {
+            //         // got commit response
+            //         if err != nil {
+            //             s.logger.Error(fmt.Sprintf("record had an error on commit: %v\n", err))
+            //             return
+            //         }
+
+            //         latencyMs := timeNowMs() - startCommitTimestamp
+            //         commitLatency := time.Duration(latencyMs * float64(time.Millisecond))
+
+            //         s.onOffsetCommit(commitLatency)
+            //     })
+            // }
         }
     }
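
The commented-out commit path above leaves a todo about learning which coordinator an offset commit went to. Below is a rough sketch of one way that lookup could be issued with franz-go's kmsg package; the helper itself is hypothetical, and the request/response field names (CoordinatorKey, CoordinatorType, NodeID) are assumptions that should be checked against the pinned franz-go version.

package e2e

import (
    "context"
    "fmt"

    "github.com/twmb/franz-go/pkg/kgo"
    "github.com/twmb/franz-go/pkg/kmsg"
)

// findGroupCoordinator is a sketch: it asks Kafka which broker coordinates the
// given consumer group, so the broker id could later be attached as the
// "groupCoordinator" label when calling s.onOffsetCommit.
func findGroupCoordinator(ctx context.Context, client *kgo.Client, groupId string) (int32, error) {
    var req kmsg.FindCoordinatorRequest
    req.Default()
    req.CoordinatorKey = groupId // the group whose coordinator we want
    req.CoordinatorType = 0      // 0 = consumer group, 1 = transactional id

    res, err := client.Request(ctx, &req)
    if err != nil {
        return -1, fmt.Errorf("FindCoordinator request failed: %w", err)
    }

    coordinator, ok := res.(*kmsg.FindCoordinatorResponse)
    if !ok {
        return -1, fmt.Errorf("unexpected response type %T", res)
    }
    return coordinator.NodeID, nil
}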

e2e/producer.go

+9 -3

@@ -11,8 +11,9 @@ import (
 )
 
 type EndToEndMessage struct {
-    MinionID  string  `json:"minionID"`
-    Timestamp float64 `json:"timestamp"`
+    MinionID  string  `json:"minionID"`  // unique for each running kminion instance
+    MessageID string  `json:"messageID"` // unique for each message
+    Timestamp float64 `json:"timestamp"` // when the message was created, unix milliseconds
 }
 
 func (s *Service) produceToManagementTopic(ctx context.Context) error {
@@ -32,6 +33,8 @@ func (s *Service) produceToManagementTopic(ctx context.Context) error {
     startTime := timeNowMs()
     s.endToEndMessagesProduced.Inc()
 
+    s.logger.Info("producing message...", zap.Any("record", record))
+
     err = s.client.Produce(ctx, record, func(r *kgo.Record, err error) {
         endTime := timeNowMs()
         ackDurationMs := endTime - startTime
@@ -56,8 +59,11 @@
 func createEndToEndRecord(topicName string, minionID string) (*kgo.Record, error) {
 
     timestamp := timeNowMs()
+    msgId := uuid.NewString()
+
     message := EndToEndMessage{
         MinionID:  minionID,
+        MessageID: msgId,
         Timestamp: timestamp,
     }
     mjson, err := json.Marshal(message)
@@ -66,7 +72,7 @@ func createEndToEndRecord(topicName string, minionID string) (*kgo.Record, error
     }
     record := &kgo.Record{
         Topic: topicName,
-        Key:   []byte(uuid.NewString()),
+        // Key: []byte(msgId),
         Value: []byte(mjson),
     }
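
As a quick reference for what ends up on the wire, this sketch marshals an EndToEndMessage with the new MessageID field exactly as the producer code above does; the IDs and timestamp are made-up example values.

package main

import (
    "encoding/json"
    "fmt"
)

// Mirrors the struct from e2e/producer.go above.
type EndToEndMessage struct {
    MinionID  string  `json:"minionID"`  // unique for each running kminion instance
    MessageID string  `json:"messageID"` // unique for each message
    Timestamp float64 `json:"timestamp"` // when the message was created, unix milliseconds
}

func main() {
    msg := EndToEndMessage{
        MinionID:  "3f8b7c9a-example", // placeholder minion ID
        MessageID: "0c1d2e3f-example", // placeholder message ID
        Timestamp: 1617181920123,      // example unix-millisecond timestamp
    }
    out, _ := json.Marshal(msg)
    fmt.Println(string(out))
    // {"minionID":"3f8b7c9a-example","messageID":"0c1d2e3f-example","timestamp":1617181920123}
}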

e2e/service.go

+13 -13

@@ -31,9 +31,9 @@ type Service struct {
     endToEndMessagesReceived  prometheus.Counter
     endToEndMessagesCommitted prometheus.Counter
 
-    endToEndAckLatency       prometheus.Histogram
-    endToEndRoundtripLatency prometheus.Histogram
-    endToEndCommitLatency    prometheus.Histogram
+    endToEndAckLatency       *prometheus.HistogramVec
+    endToEndRoundtripLatency *prometheus.HistogramVec
+    endToEndCommitLatency    *prometheus.HistogramVec
 }
 
 // NewService creates a new instance of the e2e moinitoring service (wow)
@@ -61,14 +61,14 @@ func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricN
             Help:      help,
         })
     }
-    makeHistogram := func(name string, maxLatency time.Duration, help string) prometheus.Histogram {
-        return promauto.NewHistogram(prometheus.HistogramOpts{
+    makeHistogramVec := func(name string, maxLatency time.Duration, labelNames []string, help string) *prometheus.HistogramVec {
+        return promauto.NewHistogramVec(prometheus.HistogramOpts{
             Namespace: metricNamespace,
             Subsystem: "end_to_end",
             Name:      name,
             Help:      help,
             Buckets:   createHistogramBuckets(maxLatency),
-        })
+        }, labelNames)
     }
 
     // Low-level info
@@ -81,9 +81,9 @@ func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricN
     // Latency Histograms
     // More detailed info about how long stuff took
     // Since histograms also have an 'infinite' bucket, they can be used to detect small hickups "lost" messages
-    svc.endToEndAckLatency = makeHistogram("produce_latency_seconds", cfg.Producer.AckSla, "Time until we received an ack for a produced message")
-    svc.endToEndRoundtripLatency = makeHistogram("roundtrip_latency_seconds", cfg.Consumer.RoundtripSla, "Time it took between sending (producing) and receiving (consuming) a message")
-    svc.endToEndCommitLatency = makeHistogram("commit_latency_seconds", cfg.Consumer.CommitSla, "Time kafka took to respond to kminion's offset commit")
+    svc.endToEndAckLatency = makeHistogramVec("produce_latency_seconds", cfg.Producer.AckSla, []string{"partitionId"}, "Time until we received an ack for a produced message")
+    svc.endToEndRoundtripLatency = makeHistogramVec("roundtrip_latency_seconds", cfg.Consumer.RoundtripSla, []string{"partitionId"}, "Time it took between sending (producing) and receiving (consuming) a message")
+    svc.endToEndCommitLatency = makeHistogramVec("commit_latency_seconds", cfg.Consumer.CommitSla, []string{"groupCoordinator"}, "Time kafka took to respond to kminion's offset commit")
 
     return svc, nil
 }
@@ -124,7 +124,7 @@ func (s *Service) Start(ctx context.Context) error {
 // called from e2e when a message is acknowledged
 func (s *Service) onAck(partitionId int32, duration time.Duration) {
     s.endToEndMessagesAcked.Inc()
-    s.endToEndAckLatency.Observe(duration.Seconds())
+    s.endToEndAckLatency.WithLabelValues(string(partitionId)).Observe(duration.Seconds())
 }
 
 // called from e2e when a message completes a roundtrip (send to kafka, receive msg from kafka again)
@@ -139,18 +139,18 @@ func (s *Service) onRoundtrip(partitionId int32, duration time.Duration) {
     // }
 
     s.endToEndMessagesReceived.Inc()
-    s.endToEndRoundtripLatency.Observe(duration.Seconds())
+    s.endToEndRoundtripLatency.WithLabelValues(string(partitionId)).Observe(duration.Seconds())
 }
 
 // called from e2e when an offset commit is confirmed
-func (s *Service) onOffsetCommit(partitionId int32, duration time.Duration) {
+func (s *Service) onOffsetCommit(duration time.Duration, groupCoordinator string) {
 
     // todo:
     // if the commit took too long, don't count it in 'commits' but add it to the histogram?
     // and how do we want to handle cases where we get an error??
     // should we have another metric that tells us about failed commits? or a label on the counter?
 
-    s.endToEndCommitLatency.Observe(duration.Seconds())
+    s.endToEndCommitLatency.WithLabelValues(groupCoordinator).Observe(duration.Seconds())
 
     if duration > s.config.Consumer.CommitSla {
         return
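
A side note on the label values used above: in Go, string(partitionId) converts the int32 to the rune with that code point rather than its decimal text. The sketch below is an editorial illustration only, not part of the commit; it shows a decimal conversion with strconv for use as a Prometheus label value.

package main

import (
    "fmt"
    "strconv"
)

// partitionLabel renders a partition id as a decimal string, which is the
// usual form for a Prometheus label value.
func partitionLabel(partitionId int32) string {
    return strconv.Itoa(int(partitionId))
}

func main() {
    fmt.Println(partitionLabel(0))  // "0"
    fmt.Println(partitionLabel(65)) // "65"; string(int32(65)) would be "A"
}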

e2e/topic.go

+54

@@ -3,6 +3,7 @@ package e2e
 import (
     "context"
     "fmt"
+    "strings"
     "time"
 
     "github.com/twmb/franz-go/pkg/kmsg"
@@ -217,15 +218,59 @@ func (s *Service) getTopicMetadata(ctx context.Context) (*kmsg.MetadataResponse,
     return res, nil
 }
 
+func (s *Service) checkAndDeleteOldConsumerGroups(ctx context.Context) error {
+    var groupsRq kmsg.ListGroupsRequest
+    groupsRq.Default()
+    groupsRq.StatesFilter = []string{"Empty"}
+
+    s.logger.Info("checking for empty consumer groups with kminion prefix...")
+
+    shardedResponse := s.client.RequestSharded(ctx, &groupsRq)
+    errorCount := 0
+
+    matchingGroups := make([]string, 0, 10)
+
+    for _, responseShard := range shardedResponse {
+        if responseShard.Err != nil {
+            errorCount++
+            s.logger.Error("error in response to ListGroupsRequest", zap.Error(responseShard.Err))
+            continue
+        }
+
+        r, ok := responseShard.Resp.(*kmsg.ListGroupsResponse)
+        if !ok {
+            s.logger.Error("cannot cast responseShard.Resp to kmsg.ListGroupsResponse")
+            errorCount++
+            continue
+        }
+
+        for _, group := range r.Groups {
+            name := group.Group
+            if strings.HasPrefix(name, s.config.Consumer.GroupIdPrefix) {
+                matchingGroups = append(matchingGroups, name)
+            }
+        }
+    }
+
+    s.logger.Info(fmt.Sprintf("found %v matching consumer groups", len(matchingGroups)))
+    for i, name := range matchingGroups {
+        s.logger.Info(fmt.Sprintf("consumerGroups %v: %v", i, name))
+    }
+
+    return nil
+}
+
 func (s *Service) initEndToEnd(ctx context.Context) {
 
     validateTopicTicker := time.NewTicker(s.config.TopicManagement.ReconciliationInterval)
     produceTicker := time.NewTicker(s.config.ProbeInterval)
+    deleteOldGroupsTicker := time.NewTicker(5 * time.Second)
     // stop tickers when context is cancelled
     go func() {
         <-ctx.Done()
         produceTicker.Stop()
         validateTopicTicker.Stop()
+        deleteOldGroupsTicker.Stop()
     }()
 
     // keep checking end-to-end topic
@@ -238,6 +283,15 @@ func (s *Service) initEndToEnd(ctx context.Context) {
         }
     }()
 
+    // look for old consumer groups and delete them
+    go func() {
+        for range deleteOldGroupsTicker.C {
+            err := s.checkAndDeleteOldConsumerGroups(ctx)
+            if err != nil {
+                s.logger.Error("failed to check for old consumer groups: %w", zap.Error(err))
+            }
+        }
+    }()
     // start consuming topic
     go s.ConsumeFromManagementTopic(ctx)
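
So far checkAndDeleteOldConsumerGroups only lists the matching groups; the deletion half is still outstanding. Below is a rough sketch of how it might look, reusing the sharded-request pattern from the function above. The kmsg.DeleteGroupsRequest and DeleteGroupsResponse field names (Groups, Group, ErrorCode) are assumptions that should be verified against the pinned franz-go version.

package e2e

import (
    "context"
    "fmt"

    "github.com/twmb/franz-go/pkg/kgo"
    "github.com/twmb/franz-go/pkg/kmsg"
    "go.uber.org/zap"
)

// deleteConsumerGroups is a sketch: it deletes the (empty) consumer groups that
// matched the kminion prefix, logging any group that could not be deleted.
func deleteConsumerGroups(ctx context.Context, client *kgo.Client, logger *zap.Logger, groups []string) {
    if len(groups) == 0 {
        return
    }

    var deleteRq kmsg.DeleteGroupsRequest
    deleteRq.Default()
    deleteRq.Groups = groups // assumed field name for the group ids to delete

    for _, responseShard := range client.RequestSharded(ctx, &deleteRq) {
        if responseShard.Err != nil {
            logger.Error("error in response to DeleteGroupsRequest", zap.Error(responseShard.Err))
            continue
        }

        r, ok := responseShard.Resp.(*kmsg.DeleteGroupsResponse)
        if !ok {
            logger.Error("cannot cast responseShard.Resp to kmsg.DeleteGroupsResponse")
            continue
        }

        for _, g := range r.Groups {
            if g.ErrorCode != 0 {
                logger.Warn(fmt.Sprintf("could not delete consumer group %v, error code %v", g.Group, g.ErrorCode))
            }
        }
    }
}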

main.go

+5 -3

@@ -64,9 +64,11 @@ func main() {
     if err != nil {
         logger.Fatal("failed to setup minion service", zap.Error(err))
     }
-    err = minionSvc.Start(ctx)
-    if err != nil {
-        logger.Fatal("failed to start minion service", zap.Error(err))
+    if false {
+        err = minionSvc.Start(ctx)
+        if err != nil {
+            logger.Fatal("failed to start minion service", zap.Error(err))
+        }
     }
 
     // Create end to end testing service
