@@ -18,44 +18,44 @@ const (
18
18
)
19
19
20
20
// Dispatcher processes the ingress and dispatching of scheduled messages.
21
- type Dispatcher struct {
21
+ type Dispatcher [ T any ] struct {
22
22
state dispatcherState
23
23
maxMessages int
24
24
25
- pq priorityQueue
26
- nextMessage * ScheduledMessage
27
- delayer delayer
25
+ pq priorityQueue [ T ]
26
+ nextMessage * ScheduledMessage [ T ]
27
+ delayer delayer [ T ]
28
28
29
29
delayerIdleChannel chan bool
30
- dispatchChannel chan interface {}
31
- ingressChannel chan * ScheduledMessage
30
+ dispatchChannel chan T
31
+ ingressChannel chan * ScheduledMessage [ T ]
32
32
shutdown chan error
33
33
stopProcess chan bool
34
34
35
35
mutex * sync.Mutex
36
36
}
37
37
38
38
// NewDispatcher creates a new instance of a Dispatcher.
39
- func NewDispatcher (config * DispatcherConfig ) (* Dispatcher , error ) {
39
+ func NewDispatcher [ T any ] (config * DispatcherConfig ) (* Dispatcher [ T ] , error ) {
40
40
if config .MaxMessages <= 0 {
41
41
return nil , errors .New ("MaxMessages should be greater than 0" )
42
42
}
43
43
44
44
newIdleChannel := make (chan bool , 1 )
45
- newDispatchChannel := make (chan interface {} , config .DispatchChannelSize )
46
- newPq := priorityQueue {
47
- items : make ([]* item , 0 ),
45
+ newDispatchChannel := make (chan T , config .DispatchChannelSize )
46
+ newPq := priorityQueue [ T ] {
47
+ items : make ([]* item [ T ] , 0 ),
48
48
maintainOrder : config .GuaranteeOrder ,
49
49
}
50
50
51
51
heap .Init (& newPq )
52
- return & Dispatcher {
52
+ return & Dispatcher [ T ] {
53
53
pq : newPq ,
54
54
maxMessages : config .MaxMessages ,
55
- delayer : newDelay (newDispatchChannel , newIdleChannel ),
55
+ delayer : newDelay [ T ] (newDispatchChannel , newIdleChannel ),
56
56
delayerIdleChannel : newIdleChannel ,
57
57
dispatchChannel : newDispatchChannel ,
58
- ingressChannel : make (chan * ScheduledMessage , config .IngressChannelSize ),
58
+ ingressChannel : make (chan * ScheduledMessage [ T ] , config .IngressChannelSize ),
59
59
shutdown : make (chan error ),
60
60
stopProcess : make (chan bool ),
61
61
mutex : & sync.Mutex {},
@@ -67,7 +67,7 @@ func NewDispatcher(config *DispatcherConfig) (*Dispatcher, error) {
67
67
//
68
68
// If drainImmediately is true, then all messages will be dispatched immediately regardless of the schedule set. Order
69
69
// can be lost if new messages are still being ingested.
70
- func (d * Dispatcher ) Shutdown (ctx context.Context , drainImmediately bool ) error {
70
+ func (d * Dispatcher [ T ] ) Shutdown (ctx context.Context , drainImmediately bool ) error {
71
71
if d .state == shutdown || d .state == shutdownAndDrain {
72
72
return errors .New ("shutdown has already happened" )
73
73
}
@@ -109,7 +109,7 @@ func (d *Dispatcher) Shutdown(ctx context.Context, drainImmediately bool) error
109
109
}
110
110
111
111
// Start initializes the processing of scheduled messages and blocks.
112
- func (d * Dispatcher ) Start () error {
112
+ func (d * Dispatcher [ T ] ) Start () error {
113
113
d .mutex .Lock ()
114
114
if d .state == shutdown || d .state == shutdownAndDrain {
115
115
return errors .New ("dispatcher is already running and shutting/shut down" )
@@ -124,7 +124,7 @@ func (d *Dispatcher) Start() error {
124
124
}
125
125
126
126
// Pause updates the state of the Dispatcher to stop processing messages and will close the main process loop.
127
- func (d * Dispatcher ) Pause () error {
127
+ func (d * Dispatcher [ T ] ) Pause () error {
128
128
d .mutex .Lock ()
129
129
if d .state == shutdown || d .state == shutdownAndDrain {
130
130
return errors .New ("dispatcher is shutting/shut down and cannot be paused" )
@@ -141,7 +141,7 @@ func (d *Dispatcher) Pause() error {
141
141
142
142
// Resume updates the state of the Dispatcher to start processing messages and starts the timer for the last message
143
143
// being processed and blocks.
144
- func (d * Dispatcher ) Resume () error {
144
+ func (d * Dispatcher [ T ] ) Resume () error {
145
145
d .mutex .Lock ()
146
146
if d .state == shutdown || d .state == shutdownAndDrain {
147
147
return errors .New ("dispatcher is shutting/shut down" )
@@ -159,7 +159,7 @@ func (d *Dispatcher) Resume() error {
159
159
}
160
160
161
161
// process handles the processing of scheduled messages.
162
- func (d * Dispatcher ) process () {
162
+ func (d * Dispatcher [ T ] ) process () {
163
163
for {
164
164
select {
165
165
case <- d .stopProcess :
@@ -180,7 +180,7 @@ func (d *Dispatcher) process() {
180
180
}
181
181
182
182
// handleShutdown drains the heap.
183
- func (d * Dispatcher ) handleShutdownAndDrain () {
183
+ func (d * Dispatcher [ T ] ) handleShutdownAndDrain () {
184
184
if d .state == shutdownAndDrain {
185
185
d .delayer .stop (true )
186
186
if len (d .delayerIdleChannel ) > 0 {
@@ -192,7 +192,7 @@ func (d *Dispatcher) handleShutdownAndDrain() {
192
192
193
193
// handlePriorityQueue checks whether the heap is full and will Pop the next message if present and when the delayer is
194
194
// idle.
195
- func (d * Dispatcher ) handlePriorityQueue () (cont bool ) {
195
+ func (d * Dispatcher [ T ] ) handlePriorityQueue () (cont bool ) {
196
196
// check if we've exceeded the maximum messages to store in the heap
197
197
if d .pq .Len () >= d .maxMessages {
198
198
if len (d .delayerIdleChannel ) > 0 {
@@ -210,7 +210,7 @@ func (d *Dispatcher) handlePriorityQueue() (cont bool) {
210
210
211
211
// handleIngress checks for new messages off the ingress channel and will either dispatch if `shutdownAndDrain`, replace
212
212
// the current delayer message or add to the heap.
213
- func (d * Dispatcher ) handleIngress () {
213
+ func (d * Dispatcher [ T ] ) handleIngress () {
214
214
if len (d .ingressChannel ) > 0 {
215
215
if msg , ok := <- d .ingressChannel ; ok {
216
216
if d .state == shutdownAndDrain {
@@ -232,26 +232,26 @@ func (d *Dispatcher) handleIngress() {
232
232
}
233
233
}
234
234
235
- func (d * Dispatcher ) waitNextMessage () {
236
- msg := heap .Pop (& d .pq ).(* ScheduledMessage )
235
+ func (d * Dispatcher [ T ] ) waitNextMessage () {
236
+ msg := heap .Pop (& d .pq ).(* ScheduledMessage [ T ] )
237
237
d .nextMessage = msg
238
238
d .delayer .wait (msg )
239
239
}
240
240
241
- func (d * Dispatcher ) drainHeap () {
241
+ func (d * Dispatcher [ T ] ) drainHeap () {
242
242
for d .pq .Len () > 0 {
243
- msg := heap .Pop (& d .pq ).(* ScheduledMessage )
243
+ msg := heap .Pop (& d .pq ).(* ScheduledMessage [ T ] )
244
244
// dispatch the message immediately
245
245
d .dispatchChannel <- msg .Message
246
246
}
247
247
}
248
248
249
249
// IngressChannel returns the send-only channel of type `ScheduledMessage`.
250
- func (d * Dispatcher ) IngressChannel () chan <- * ScheduledMessage {
250
+ func (d * Dispatcher [ T ] ) IngressChannel () chan <- * ScheduledMessage [ T ] {
251
251
return d .ingressChannel
252
252
}
253
253
254
- // DispatchChannel returns a receive-only channel of type `interface{} `.
255
- func (d * Dispatcher ) DispatchChannel () <- chan interface {} {
254
+ // DispatchChannel returns a receive-only channel of type `T `.
255
+ func (d * Dispatcher [ T ] ) DispatchChannel () <- chan T {
256
256
return d .dispatchChannel
257
257
}
0 commit comments