Skip to content

Commit

Permalink
feat: mark all Kafka messages
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed May 27, 2021
1 parent dcee8f5 commit 53c4d04
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 8 deletions.
7 changes: 2 additions & 5 deletions runner/sidecar/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,8 @@ func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.Co
for m := range claim.Messages() {
h.partition = m.Partition
h.offset = m.Offset
if err := h.f(m.Value); err != nil {
// noop
} else {
sess.MarkMessage(m, "")
}
_ = h.f(m.Value) // TODO we should provide a way to deal with errors here, e.g. retries
sess.MarkMessage(m, "")
}
return nil
}
6 changes: 3 additions & 3 deletions runner/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func connectSources(ctx context.Context, toMain func([]byte) error) error {
})
for i := 0; i < int(x.Parallel); i++ {
if sub, err := sc.QueueSubscribe(x.Subject, fmt.Sprintf("%s-%s", pipelineName, spec.Name), func(m *stan.Msg) {
_ = f(m.Data)
_ = f(m.Data) // TODO we should decide what to do with errors here, currently we ignore them
}, stan.DurableName(clientID)); err != nil {
return fmt.Errorf("failed to subscribe: %w", err)
} else {
Expand Down Expand Up @@ -382,7 +382,7 @@ func connectSources(ctx context.Context, toMain func([]byte) error) error {
msg, err := ioutil.ReadAll(r.Body)
if err != nil {
logger.Error(err, "⚠ http →")
w.WriteHeader(500)
w.WriteHeader(400)
_, _ = w.Write([]byte(err.Error()))
return
}
Expand Down Expand Up @@ -539,7 +539,7 @@ func connectOut(toSink func([]byte) error) {
data, err := ioutil.ReadAll(r.Body)
if err != nil {
logger.Error(err, "failed to read message body from main via HTTP")
w.WriteHeader(500)
w.WriteHeader(400)
_, _ = w.Write([]byte(err.Error()))
return
}
Expand Down

0 comments on commit 53c4d04

Please sign in to comment.