-
Notifications
You must be signed in to change notification settings - Fork 0
/
forwarder.go
103 lines (82 loc) · 3.03 KB
/
forwarder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package outbox
import (
"context"
"fmt"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/nikolayk812/pgx-outbox/types"
)
// Forwarder reads unpublished messages from the outbox table, publishes them and then marks them as published.
//
// It is recommended to run a single Forwarder instance per outbox table, e.g. in a Kubernetes cronjob,
// or at least to isolate different Forwarder instances acting on the same outbox table
// by using different filters in outbox.Reader.
type Forwarder interface {
// Forward performs one read-publish-acknowledge pass and reports how many messages
// were handled in the returned types.ForwardStats.
// In the implementation in this file, limit is handed unchanged to Reader.Read.
Forward(ctx context.Context, limit int) (types.ForwardStats, error)
}
// forwarder is the Forwarder implementation returned by NewForwarder.
type forwarder struct {
// reader supplies the messages to forward (Read) and records them as handled (Ack).
reader Reader
// publisher delivers each message that was read.
publisher Publisher
// filter is validated in NewForwarder; it is not read by Forward in this file.
// NOTE(review): presumably populated by a ForwardOption — confirm against the option definitions.
filter types.MessageFilter
}
// NewForwarder builds a Forwarder on top of the given reader and publisher.
//
// It returns ErrReaderNil when reader is nil and ErrPublisherNil when publisher is nil.
// The options are applied in the order given; nil options are ignored.
// After all options have been applied the resulting filter is validated, and a
// validation failure is returned wrapped with the "filter.Validate" prefix.
func NewForwarder(reader Reader, publisher Publisher, opts ...ForwardOption) (Forwarder, error) {
	if reader == nil {
		return nil, ErrReaderNil
	}

	if publisher == nil {
		return nil, ErrPublisherNil
	}

	f := &forwarder{
		reader:    reader,
		publisher: publisher,
	}

	for _, opt := range opts {
		// Calling a nil function value panics; skip it so that a conditionally
		// built (and therefore possibly nil) option cannot crash the constructor.
		if opt == nil {
			continue
		}

		opt(f)
	}

	if err := f.filter.Validate(); err != nil {
		return nil, fmt.Errorf("filter.Validate: %w", err)
	}

	return f, nil
}
// NewForwarderFromPool is a convenience constructor: it creates a Reader for the
// given table and pool via NewReader and then delegates to NewForwarder with the
// supplied publisher and options.
//
// An error from either step is returned wrapped with the name of the failing
// constructor ("NewReader" or "NewForwarder").
func NewForwarderFromPool(table string, pool *pgxpool.Pool, publisher Publisher, opts ...ForwardOption) (Forwarder, error) {
	r, readerErr := NewReader(table, pool)
	if readerErr != nil {
		return nil, fmt.Errorf("NewReader: %w", readerErr)
	}

	// Named fwd rather than forwarder so the local does not shadow the forwarder type.
	fwd, fwdErr := NewForwarder(r, publisher, opts...)
	if fwdErr != nil {
		return nil, fmt.Errorf("NewForwarder: %w", fwdErr)
	}

	return fwd, nil
}
// Forward runs a single read-publish-acknowledge pass.
//
// It asks the reader for messages (passing limit through), publishes them one by one
// in the order they were returned, and finally acknowledges the whole batch through the reader.
// The returned stats hold the number of messages read, published and acknowledged.
//
// If publishing any message fails, Forward returns the error immediately together with the
// stats gathered so far. The messages published before the failure are not acknowledged,
// so they will be published again on the next run.
//
// A message that can never be published therefore blocks the forwarder from making progress.
// The forwarder's progress (e.g. when it runs in a cronjob) should be monitored, and action
// should be taken when it stalls, e.g. removing the poison message from the outbox table manually.
func (f *forwarder) Forward(ctx context.Context, limit int) (types.ForwardStats, error) {
	var stats types.ForwardStats

	batch, err := f.reader.Read(ctx, limit)
	switch {
	case err != nil:
		return stats, fmt.Errorf("reader.Read: %w", err)
	case len(batch) == 0:
		// Nothing to forward: report all-zero stats.
		return stats, nil
	}

	stats.Read = len(batch)

	for i := range batch {
		msg := batch[i]

		if pubErr := f.publisher.Publish(ctx, msg); pubErr != nil {
			// Stop at the first failure; stats.Published counts only the successes so far.
			return stats, fmt.Errorf("publisher.Publish index[%d] topic[%s] id[%d]: %w", i, msg.Topic, msg.ID, pubErr)
		}

		stats.Published++
	}

	ids := types.Messages(batch).IDs()

	// If acknowledging fails, the whole batch will be published again on the next run.
	acked, err := f.reader.Ack(ctx, ids)
	if err != nil {
		return stats, fmt.Errorf("reader.Ack count[%d]: %w", len(ids), err)
	}

	stats.Acked = acked

	return stats, nil
}