Skip to content

Commit

Permalink
Consume from newest offset
Browse files Browse the repository at this point in the history
  • Loading branch information
weeco committed Aug 13, 2021
1 parent f6b7b3f commit 326337f
Showing 1 changed file with 2 additions and 11 deletions.
13 changes: 2 additions & 11 deletions e2e/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k
kgo.ConsumerGroup(groupID),
kgo.ConsumeTopics(cfg.TopicManagement.Name),
kgo.Balancers(kgo.CooperativeStickyBalancer()),
kgo.DisableAutoCommit())
kgo.DisableAutoCommit(),
kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()))

// Prepare hooks
hooks := newEndToEndClientHooks(logger)
Expand Down Expand Up @@ -213,16 +214,6 @@ func (s *Service) startOffsetCommits(ctx context.Context) {

}

// called from e2e when a message completes a roundtrip (send to kafka, receive msg from kafka again)
func (s *Service) onRoundtrip(partitionId int32, duration time.Duration) {
if duration > s.config.Consumer.RoundtripSla {
return // message is too old
}

s.messagesReceived.Inc()
s.endToEndRoundtripLatency.WithLabelValues(fmt.Sprintf("%v", partitionId)).Observe(duration.Seconds())
}

// called from e2e when an offset commit is confirmed
func (s *Service) onOffsetCommit(brokerId int32, duration time.Duration) {

Expand Down

0 comments on commit 326337f

Please sign in to comment.