From f6ab00e2b0dc2e95582b0cdb65ee94bf5a145739 Mon Sep 17 00:00:00 2001 From: Martin Schneppenheim Date: Mon, 16 Aug 2021 11:04:58 +0200 Subject: [PATCH] Add timeout for consumer initialization --- e2e/service.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/e2e/service.go b/e2e/service.go index f9e32bc..a53b328 100644 --- a/e2e/service.go +++ b/e2e/service.go @@ -168,11 +168,15 @@ func (s *Service) Start(ctx context.Context) error { // consumer is ready. Only if the consumer is ready we want to start the producer to ensure that we will not // miss messages because the consumer wasn't ready. initCh := make(chan bool) + s.logger.Info("initializing consumer and waiting until it has received the first messages") go s.startConsumeMessages(ctx, initCh) // Produce an init message until the consumer received at least one fetch initTicker := time.NewTicker(500 * time.Millisecond) isInitialized := false + timeoutCtx, timeoutCancel := context.WithTimeout(ctx, 60*time.Second) + defer timeoutCancel() + for !isInitialized { select { case <-initTicker.C: @@ -185,6 +189,9 @@ func (s *Service) Start(ctx context.Context) error { isInitialized = true s.logger.Info("consumer has been successfully initialized") break + case <-timeoutCtx.Done(): + s.logger.Error("failed to initialize consumer successfully") + return fmt.Errorf("failed to initialize consumer within 30s") case <-ctx.Done(): return nil }