Skip to content

Commit

Permalink
fix: report sink errors
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed May 14, 2021
1 parent 1fbfbd1 commit 98917eb
Showing 1 changed file with 8 additions and 1 deletion.
9 changes: 8 additions & 1 deletion runner/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,11 @@ func connectSink() (func([]byte) error, error) {
toSinks = append(toSinks, func(m []byte) error {
withLock(func() { sinkStatues.Set(sink.Name, replica, printable(m)) })
debug.Info("◷ → stan", "subject", s.Subject, "m", printable(m))
return sc.Publish(s.Subject, m)
err := sc.Publish(s.Subject, m)
if err != nil {
withLock(func() {sinkStatues.IncErrors(sink.Name, replica, err)})
}
return err
})
} else if k := sink.Kafka; k != nil {
logger.Info("connecting sink", "type", "kafka", "brokers", k.Brokers, "topic", k.Topic, "version", k.Version)
Expand All @@ -514,6 +518,9 @@ func connectSink() (func([]byte) error, error) {
Topic: k.Topic,
Value: sarama.ByteEncoder(m),
})
if err != nil {
withLock(func() { sinkStatues.IncErrors(sink.Name, replica, err) })
}
return err
})
} else if l := sink.Log; l != nil {
Expand Down

0 comments on commit 98917eb

Please sign in to comment.