Skip to content

Commit

Permalink
Merge pull request #166 from Scandiravian/allow-no-crash-on-initial-connect-failure
Browse files Browse the repository at this point in the history

Allow retry on failure during initial connection test
  • Loading branch information
weeco authored Aug 30, 2022
2 parents 5a5c788 + 7ac093a commit 020e30e
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 7 deletions.
3 changes: 3 additions & 0 deletions charts/kminion/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ kminion:
# username: ""
# password: ""
# realm: ""
# # Whether to retry the initial test connection to Kafka. If false, KMinion exits with code 1 on the
# # first failed attempt; if true, it keeps retrying (every 5 seconds) until the connection succeeds.
# retryInitConnection: false
#
# minion:
# consumerGroups:
Expand Down
2 changes: 2 additions & 0 deletions kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ type Config struct {

TLS TLSConfig `koanf:"tls"`
SASL SASLConfig `koanf:"sasl"`

RetryInitConnection bool `koanf:"retryInitConnection"`
}

func (c *Config) SetDefaults() {
Expand Down
24 changes: 17 additions & 7 deletions kafka/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,18 @@ func (s *Service) CreateAndTestClient(ctx context.Context, l *zap.Logger, opts [
}

// Test connection
connectCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
err = s.testConnection(client, connectCtx)
if err != nil {
return nil, fmt.Errorf("failed to test connectivity to Kafka cluster %w", err)
for {
err = s.testConnection(client, ctx)
if err == nil {
break
}

if !s.cfg.RetryInitConnection {
return nil, fmt.Errorf("failed to test connectivity to Kafka cluster %w", err)
}

logger.Warn("failed to test connectivity to Kafka cluster, retrying in 5 seconds", zap.Error(err))
time.Sleep(time.Second * 5)
}

return client, nil
Expand All @@ -61,17 +68,20 @@ func (s *Service) Brokers() []string {
// testConnection tries to fetch Broker metadata and prints some information if the connection succeeds. Each call is
// bounded by a 15-second timeout derived from ctx. An error will be returned if connecting fails.
func (s *Service) testConnection(client *kgo.Client, ctx context.Context) error {
connectCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()

req := kmsg.MetadataRequest{
Topics: nil,
}
res, err := req.RequestWith(ctx, client)
res, err := req.RequestWith(connectCtx, client)
if err != nil {
return fmt.Errorf("failed to request metadata: %w", err)
}

// Request versions in order to guess Kafka Cluster version
versionsReq := kmsg.NewApiVersionsRequest()
versionsRes, err := versionsReq.RequestWith(ctx, client)
versionsRes, err := versionsReq.RequestWith(connectCtx, client)
if err != nil {
return fmt.Errorf("failed to request api versions: %w", err)
}
Expand Down

0 comments on commit 020e30e

Please sign in to comment.