diff --git a/minion/service.go b/minion/service.go index 7bc430b..7ed48ac 100644 --- a/minion/service.go +++ b/minion/service.go @@ -45,8 +45,11 @@ func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metrics minionHooks := newMinionClientHooks(logger.Named("kafka_hooks"), metricsNamespace) kgoOpts := []kgo.Opt{ kgo.WithHooks(minionHooks), - kgo.ConsumeTopics("__consumer_offsets"), - kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()), + } + if cfg.ConsumerGroups.ScrapeMode == ConsumerGroupScrapeModeOffsetsTopic { + kgoOpts = append(kgoOpts, + kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()), + kgo.ConsumeTopics("__consumer_offsets")) } client, err := kafkaSvc.CreateAndTestClient(ctx, logger, kgoOpts)