Skip to content

Commit

Permalink
fix: log failed to process message
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Sep 7, 2021
1 parent e5f12ce commit ab6d38a
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 4 deletions.
4 changes: 3 additions & 1 deletion runner/sidecar/source/cron/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ func New(x dfv1.Cron, f source.Func) (source.Interface, error) {

_, err := crn.AddFunc(x.Schedule, func() {
msg := []byte(time.Now().Format(x.Layout))
_ = f(context.Background(), msg)
if err := f(context.Background(), msg); err != nil {
logger.Error(err, "failed to process message")
}
})
if err != nil {
return nil, fmt.Errorf("failed to schedule cron %q: %w", x.Schedule, err)
Expand Down
4 changes: 2 additions & 2 deletions runner/sidecar/source/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func New(ctx context.Context, secretInterface corev1.SecretInterface, clusterNam
offset = fmt.Sprintf("%v", d[x.OffsetColumn])
return nil
}); err != nil {
logger.Error(err, "failed to process data query: %w", err)
logger.Error(err, "failed to process data query")
}
}
}
Expand Down Expand Up @@ -196,7 +196,7 @@ func queryData(ctx context.Context, db *sql.DB, query, offsetColumn, offset stri
}
}
if err = f(entry); err != nil {
logger.Error(err, "failed process data: %w", err)
logger.Error(err, "failed to process message")
}
}
return nil
Expand Down
1 change: 1 addition & 0 deletions runner/sidecar/source/kafka/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func (h handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.Con
logger.Info("starting consuming claim", "partition", claim.Partition())
for msg := range claim.Messages() {
if err := h.f(sess.Context(), msg.Value); err != nil {
logger.Error(err, "failed to process message")
} else {
sess.MarkMessage(msg, "")
}
Expand Down
2 changes: 1 addition & 1 deletion runner/sidecar/source/stan/stan.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func New(ctx context.Context, secretInterface corev1.SecretInterface, clusterNam
logger.Info("subscribing to STAN queue", "source", sourceName, "queueName", queueName)
sub, err := conn.QueueSubscribe(x.Subject, queueName, func(msg *stan.Msg) {
if err := f(context.Background(), msg.Data); err != nil {
// noop
logger.Error(err, "failed to process message")
} else if err := msg.Ack(); err != nil {
logger.Error(err, "failed to ack message", "source", sourceName)
}
Expand Down

0 comments on commit ab6d38a

Please sign in to comment.