Skip to content

Commit

Permalink
fix: do not Kafka commit error message
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Jul 26, 2021
1 parent 8076a04 commit 9e17d95
Showing 1 changed file with 4 additions and 2 deletions.
6 changes: 4 additions & 2 deletions runner/sidecar/source/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ func (handler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (handler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
_ = h.f(context.Background(), msg.Value)
sess.MarkMessage(msg, "")
if err := h.f(context.Background(), msg.Value); err != nil {
} else {
sess.MarkMessage(msg, "")
}
h.i++
if h.i%dfv1.CommitN == 0 {
sess.Commit()
Expand Down

0 comments on commit 9e17d95

Please sign in to comment.