diff --git a/runner/sidecar/source/cron/cron.go b/runner/sidecar/source/cron/cron.go index bfbb0c91..5d75cbc7 100644 --- a/runner/sidecar/source/cron/cron.go +++ b/runner/sidecar/source/cron/cron.go @@ -32,7 +32,9 @@ func New(x dfv1.Cron, f source.Func) (source.Interface, error) { _, err := crn.AddFunc(x.Schedule, func() { msg := []byte(time.Now().Format(x.Layout)) - _ = f(context.Background(), msg) + if err := f(context.Background(), msg); err != nil { + logger.Error(err, "failed to process message") + } }) if err != nil { return nil, fmt.Errorf("failed to schedule cron %q: %w", x.Schedule, err) diff --git a/runner/sidecar/source/db/db.go b/runner/sidecar/source/db/db.go index f6069b27..3cfa9592 100644 --- a/runner/sidecar/source/db/db.go +++ b/runner/sidecar/source/db/db.go @@ -80,7 +80,7 @@ func New(ctx context.Context, secretInterface corev1.SecretInterface, clusterNam offset = fmt.Sprintf("%v", d[x.OffsetColumn]) return nil }); err != nil { - logger.Error(err, "failed to process data query: %w", err) + logger.Error(err, "failed to process data query") } } } @@ -196,7 +196,7 @@ func queryData(ctx context.Context, db *sql.DB, query, offsetColumn, offset stri } } if err = f(entry); err != nil { - logger.Error(err, "failed process data: %w", err) + logger.Error(err, "failed to process message") } } return nil diff --git a/runner/sidecar/source/kafka/handler.go b/runner/sidecar/source/kafka/handler.go index fd3ef0cf..2a2b7d70 100644 --- a/runner/sidecar/source/kafka/handler.go +++ b/runner/sidecar/source/kafka/handler.go @@ -26,6 +26,7 @@ func (h handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.Con logger.Info("starting consuming claim", "partition", claim.Partition()) for msg := range claim.Messages() { if err := h.f(sess.Context(), msg.Value); err != nil { + logger.Error(err, "failed to process message") } else { sess.MarkMessage(msg, "") } diff --git a/runner/sidecar/source/stan/stan.go b/runner/sidecar/source/stan/stan.go index 9ac3b6be..ec088c60 100644 --- a/runner/sidecar/source/stan/stan.go +++ b/runner/sidecar/source/stan/stan.go @@ -54,7 +54,7 @@ func New(ctx context.Context, secretInterface corev1.SecretInterface, clusterNam logger.Info("subscribing to STAN queue", "source", sourceName, "queueName", queueName) sub, err := conn.QueueSubscribe(x.Subject, queueName, func(msg *stan.Msg) { if err := f(context.Background(), msg.Data); err != nil { - // noop + logger.Error(err, "failed to process message") } else if err := msg.Ack(); err != nil { logger.Error(err, "failed to ack message", "source", sourceName) }