Skip to content

Commit

Permalink
Add timeout for consumer initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
weeco committed Aug 16, 2021
1 parent 6f76df2 commit f6ab00e
Showing 1 changed file with 7 additions and 0 deletions.
7 changes: 7 additions & 0 deletions e2e/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,15 @@ func (s *Service) Start(ctx context.Context) error {
// consumer is ready. Only if the consumer is ready we want to start the producer to ensure that we will not
// miss messages because the consumer wasn't ready.
initCh := make(chan bool)
s.logger.Info("initializing consumer and waiting until it has received the first messages")
go s.startConsumeMessages(ctx, initCh)

// Produce an init message until the consumer received at least one fetch
initTicker := time.NewTicker(500 * time.Millisecond)
isInitialized := false
timeoutCtx, timeoutCancel := context.WithTimeout(ctx, 60*time.Second)
defer timeoutCancel()

for !isInitialized {
select {
case <-initTicker.C:
Expand All @@ -185,6 +189,9 @@ func (s *Service) Start(ctx context.Context) error {
isInitialized = true
s.logger.Info("consumer has been successfully initialized")
break
case <-timeoutCtx.Done():
s.logger.Error("failed to initialize consumer successfully")
return fmt.Errorf("failed to initialize consumer within 30s")
case <-ctx.Done():
return nil
}
Expand Down

0 comments on commit f6ab00e

Please sign in to comment.