Skip to content

Commit 322c078

Browse files
authored
allow multiple acks to be returned from connector (#1912)
1 parent f92354e commit 322c078

File tree

1 file changed

+22
-12
lines changed

1 file changed

+22
-12
lines changed

Diff for: pkg/lifecycle/stream/destination_acker.go

+22-12
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"context"
2020
"sync"
2121

22+
"github.com/conduitio/conduit/pkg/connector"
2223
"github.com/conduitio/conduit/pkg/foundation/cerrors"
2324
"github.com/conduitio/conduit/pkg/foundation/log"
2425
"github.com/gammazero/deque"
@@ -122,6 +123,8 @@ func (n *DestinationAckerNode) worker(
122123
errChan <- err
123124
}
124125

126+
var acks []connector.DestinationAck
127+
125128
defer close(errChan)
126129
for range signalChan {
127130
// signal is received when a new message is in the queue
@@ -136,22 +139,29 @@ func (n *DestinationAckerNode) worker(
136139
msg := n.queue.PopFront()
137140
n.m.Unlock()
138141

139-
acks, err := n.Destination.Ack(ctx)
140-
if err != nil {
141-
handleError(msg, cerrors.Errorf("error while fetching acks: %w", err))
142-
return
143-
}
144-
for _, ack := range acks {
145-
if !bytes.Equal(msg.Record.Position, ack.Position) {
146-
handleError(msg, cerrors.Errorf("received unexpected ack, expected position %q but got %q", msg.Record.Position, ack.Position))
147-
return
148-
}
149-
err = n.handleAck(msg, ack.Error)
142+
if len(acks) == 0 {
143+
// Ack can return multiple acks, store them and check the position
144+
// for the current message
145+
var err error
146+
acks, err = n.Destination.Ack(ctx)
150147
if err != nil {
151-
errChan <- err
148+
handleError(msg, cerrors.Errorf("error while fetching acks: %w", err))
152149
return
153150
}
154151
}
152+
153+
ack := acks[0]
154+
acks = acks[1:]
155+
156+
if !bytes.Equal(msg.Record.Position, ack.Position) {
157+
handleError(msg, cerrors.Errorf("received unexpected ack, expected position %q but got %q", msg.Record.Position, ack.Position))
158+
return
159+
}
160+
err := n.handleAck(msg, ack.Error)
161+
if err != nil {
162+
errChan <- err
163+
return
164+
}
155165
}
156166
}
157167
}

0 commit comments

Comments
 (0)