Skip to content

Commit

Permalink
Wait for a fully initialized consumer before starting the producer
Browse files Browse the repository at this point in the history
  • Loading branch information
weeco committed Aug 13, 2021
1 parent 326337f commit 7e882f2
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 30 deletions.
48 changes: 19 additions & 29 deletions e2e/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,30 @@ import (
"go.uber.org/zap"
)

func (s *Service) startConsumeMessages(ctx context.Context) {
func (s *Service) startConsumeMessages(ctx context.Context, initializedCh chan<- bool) {
client := s.client
s.logger.Info("Starting to consume end-to-end topic",
zap.String("topic_name", s.config.TopicManagement.Name),
zap.String("group_id", s.groupId))

isInitialized := false
for {
select {
case <-ctx.Done():
return
default:
fetches := client.PollFetches(ctx)
receiveTimestamp := time.Now()

// Log all errors and continue afterwards as we might get errors and still have some fetch results
errors := fetches.Errors()
for _, err := range errors {
s.logger.Error("kafka fetch error",
zap.String("topic", err.Topic),
zap.Int32("partition", err.Partition),
zap.Error(err.Err))
}
fetches := client.PollFetches(ctx)
if !isInitialized {
initializedCh <- true
isInitialized = true
}

// Process messages
fetches.EachRecord(func(record *kgo.Record) {
s.processMessage(record, receiveTimestamp)
})
// Log all errors and continue afterwards as we might get errors and still have some fetch results
errors := fetches.Errors()
for _, err := range errors {
s.logger.Error("kafka fetch error",
zap.String("topic", err.Topic),
zap.Int32("partition", err.Partition),
zap.Error(err.Err))
}

fetches.EachRecord(s.processMessage)
}
}

Expand Down Expand Up @@ -69,24 +65,18 @@ func (s *Service) commitOffsets(ctx context.Context) {
// - deserializes the message
// - checks if it is from us, or from another kminion process running somewhere else
// - hands it off to the service, which then reports metrics on it
func (s *Service) processMessage(record *kgo.Record, receiveTimestamp time.Time) {
func (s *Service) processMessage(record *kgo.Record) {
var msg EndToEndMessage
if jerr := json.Unmarshal(record.Value, &msg); jerr != nil {
s.logger.Error("failed to unmarshal message value", zap.Error(jerr))
return // maybe older version
}

if msg.MinionID != s.minionID {
return // not from us
}

// restore partition, which was not serialized
// restore partition, which is not serialized
msg.partition = int(record.Partition)

created := msg.creationTime()
latency := receiveTimestamp.Sub(created)

s.onRoundtrip(record.Partition, latency)

// notify the tracker that the message arrived
s.messageTracker.onMessageArrived(&msg)
}
9 changes: 8 additions & 1 deletion e2e/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,14 @@ func (s *Service) Start(ctx context.Context) error {

// finally start everything else (producing, consuming, continous validation, consumer group tracking)
go s.startReconciliation(ctx)
go s.startConsumeMessages(ctx)

// Start consumer and wait until we've received a response for the first poll which would indicate that the
// consumer is ready. Only if the consumer is ready we want to start the producer to ensure that we will not
// miss messages because the consumer wasn't ready.
initCh := make(chan bool)
go s.startConsumeMessages(ctx, initCh)
<-initCh

go s.startProducer(ctx)

// keep track of groups, delete old unused groups
Expand Down

0 comments on commit 7e882f2

Please sign in to comment.