Skip to content

Commit

Permalink
fix: delay start-up until Kafka is ready
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Jul 2, 2021
1 parent 015d160 commit 4e1d2ff
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 2 deletions.
8 changes: 7 additions & 1 deletion runner/sidecar/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,13 @@ import (
"github.com/Shopify/sarama"
)

func newHandler(f func(ctx context.Context, msg []byte) error) *handler {
return &handler{f: f}
}

type handler struct {
f func(context.Context, []byte) error
f func(context.Context, []byte) error
started bool
}

func (h *handler) Setup(sarama.ConsumerGroupSession) error {
Expand All @@ -22,6 +27,7 @@ func (h *handler) Close() error {
}

func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
h.started = true
for m := range claim.Messages() {
msg := m.Value
if err := h.f(context.Background(), msg); err != nil {
Expand Down
5 changes: 4 additions & 1 deletion runner/sidecar/sources.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func connectKafkaSource(ctx context.Context, x *dfv1.Kafka, sourceName string, f
logger.Info("closing kafka consumer group", "source", sourceName)
return group.Close()
})
handler := &handler{f: f}
handler := newHandler(f)
go wait.JitterUntil(func() {
if err := group.Consume(ctx, []string{x.Topic}, handler); err != nil {
logger.Error(err, "failed to create kafka consumer")
Expand All @@ -147,6 +147,9 @@ func connectKafkaSource(ctx context.Context, x *dfv1.Kafka, sourceName string, f
logger.Info("closing kafka handler", "source", sourceName)
return handler.Close()
})
for ; !handler.started; {
time.Sleep(time.Second)
}
if leadReplica() {
registerKafkaSetPendingHook(x, sourceName, client, config, groupName)
}
Expand Down

0 comments on commit 4e1d2ff

Please sign in to comment.