Skip to content

Commit

Permalink
refactor: rename variables
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Jun 15, 2021
1 parent 2fd2422 commit c63a715
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 2 deletions.
3 changes: 2 additions & 1 deletion runner/sidecar/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ func (h *handler) Close() error {
func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
ctx := context.Background()
for m := range claim.Messages() {
if err := h.f(ctx, m.Value); err != nil {
msg := m.Value
if err := h.f(ctx, msg); err != nil {
// noop
} else {
sess.MarkMessage(m, "")
Expand Down
4 changes: 3 additions & 1 deletion runner/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,8 @@ func connectSources(ctx context.Context, toMain func(context.Context, []byte) er
if x := source.Cron; x != nil {
_, err := crn.AddFunc(x.Schedule, func() {
ctx := context.Background()
_ = f(ctx, []byte(time.Now().Format(x.Layout))) // TODO
msg := []byte(time.Now().Format(x.Layout))
_ = f(ctx, msg)
})
if err != nil {
return fmt.Errorf("failed to schedule cron %q: %w", x.Schedule, err)
Expand Down Expand Up @@ -440,6 +441,7 @@ func connectSources(ctx context.Context, toMain func(context.Context, []byte) er
queueName := fmt.Sprintf("%s-%s-source-%s", pipelineName, stepName, sourceName)
if sub, err := sc.QueueSubscribe(x.Subject, queueName, func(msg *stan.Msg) {
ctx := context.Background()
logger.Info("message", "type", "stan", "source", sourceName, "msg", msg.Data)
if err := f(ctx, msg.Data); err != nil {
if err := msg.Ack(); err != nil {
logger.Error(err, "failed to ack message", "msg", msg)
Expand Down

0 comments on commit c63a715

Please sign in to comment.