4
4
"context"
5
5
"sync"
6
6
7
+ nc "github.com/nats-io/nats.go"
7
8
"go.infratographer.com/permissions-api/internal/query"
8
9
"go.infratographer.com/permissions-api/internal/types"
9
10
"go.infratographer.com/x/events"
@@ -26,6 +27,7 @@ type Subscriber struct {
26
27
changeChannels []<- chan * message.Message
27
28
logger * zap.SugaredLogger
28
29
subscriber * events.Subscriber
30
+ subOpts []nc.SubOpt
29
31
qe query.Engine
30
32
}
31
33
@@ -39,25 +41,33 @@ func WithLogger(l *zap.SugaredLogger) SubscriberOption {
39
41
}
40
42
}
41
43
42
- // NewSubscriber creates a new Subscriber
43
- func NewSubscriber (ctx context.Context , cfg events.SubscriberConfig , engine query.Engine , opts ... SubscriberOption ) (* Subscriber , error ) {
44
- sub , err := events .NewSubscriber (cfg )
45
- if err != nil {
46
- return nil , err
44
+ // WithNatsSubOpts sets the logger for the Subscriber
45
+ func WithNatsSubOpts (options ... nc.SubOpt ) SubscriberOption {
46
+ return func (s * Subscriber ) {
47
+ s .subOpts = append (s .subOpts , options ... )
47
48
}
49
+ }
48
50
51
+ // NewSubscriber creates a new Subscriber
52
+ func NewSubscriber (ctx context.Context , cfg events.SubscriberConfig , engine query.Engine , opts ... SubscriberOption ) (* Subscriber , error ) {
49
53
s := & Subscriber {
50
- ctx : ctx ,
51
- logger : zap .NewNop ().Sugar (),
52
- subscriber : sub ,
53
- qe : engine ,
54
+ ctx : ctx ,
55
+ logger : zap .NewNop ().Sugar (),
56
+ qe : engine ,
54
57
}
55
58
56
59
for _ , opt := range opts {
57
60
opt (s )
58
61
}
59
62
60
- s .logger .Debugw ("subscriber configuration" , cfg )
63
+ sub , err := events .NewSubscriber (cfg , s .subOpts ... )
64
+ if err != nil {
65
+ return nil , err
66
+ }
67
+
68
+ s .subscriber = sub
69
+
70
+ s .logger .Debugw ("subscriber configuration" , "config" , cfg )
61
71
62
72
return s , nil
63
73
}
0 commit comments