25
25
package nats
26
26
27
27
import (
28
+ "errors"
28
29
"net"
29
30
"strconv"
30
31
"sync"
31
32
"time"
32
33
33
34
"github.com/flowchartsman/retry"
34
35
"github.com/nats-io/nats.go"
35
- "github.com/nats-io/nats.go/encoders/protobuf"
36
36
"go.uber.org/atomic"
37
+ "google.golang.org/protobuf/proto"
37
38
38
39
"github.com/tochemey/goakt/v2/discovery"
39
- internalpb "github.com/tochemey/goakt/v2/internal/internalpb"
40
+ "github.com/tochemey/goakt/v2/internal/internalpb"
40
41
"github.com/tochemey/goakt/v2/log"
41
42
)
42
43
@@ -49,7 +50,7 @@ type Discovery struct {
49
50
registered * atomic.Bool
50
51
51
52
// define the nats connection
52
- natsConnection * nats.EncodedConn
53
+ connection * nats.Conn
53
54
// define a slice of subscriptions
54
55
subscriptions []* nats.Subscription
55
56
@@ -132,12 +133,8 @@ func (d *Discovery) Initialize() error {
132
133
return nil
133
134
})
134
135
135
- // create the NATs connection encoder
136
- encodedConn , err := nats .NewEncodedConn (connection , protobuf .PROTOBUF_ENCODER )
137
- if err != nil {
138
- return err
139
- }
140
- d .natsConnection = encodedConn
136
+ // create the NATs connection
137
+ d .connection = connection
141
138
d .initialized = atomic .NewBool (true )
142
139
return nil
143
140
}
@@ -152,33 +149,41 @@ func (d *Discovery) Register() error {
152
149
}
153
150
154
151
// create the subscription handler
155
- subscriptionHandler := func (_ , reply string , msg * internalpb.NatsMessage ) {
156
- switch msg .GetMessageType () {
152
+ handler := func (msg * nats.Msg ) {
153
+ message := new (internalpb.NatsMessage )
154
+ if err := proto .Unmarshal (msg .Data , message ); err != nil {
155
+ // TODO: need to read more and see how to propagate the error
156
+ d .connection .Opts .AsyncErrorCB (d .connection , msg .Sub , errors .New ("nats: Got an error trying to unmarshal: " + err .Error ()))
157
+ return
158
+ }
159
+
160
+ switch message .GetMessageType () {
157
161
case internalpb .NatsMessageType_NATS_MESSAGE_TYPE_DEREGISTER :
158
162
d .logger .Infof ("received an de-registration request from peer[name=%s, host=%s, port=%d]" ,
159
- msg .GetName (), msg .GetHost (), msg .GetPort ())
163
+ message .GetName (), message .GetHost (), message .GetPort ())
160
164
case internalpb .NatsMessageType_NATS_MESSAGE_TYPE_REGISTER :
161
165
d .logger .Infof ("received an registration request from peer[name=%s, host=%s, port=%d]" ,
162
- msg .GetName (), msg .GetHost (), msg .GetPort ())
166
+ message .GetName (), message .GetHost (), message .GetPort ())
163
167
case internalpb .NatsMessageType_NATS_MESSAGE_TYPE_REQUEST :
164
168
d .logger .Infof ("received an identification request from peer[name=%s, host=%s, port=%d]" ,
165
- msg .GetName (), msg .GetHost (), msg .GetPort ())
169
+ message .GetName (), message .GetHost (), message .GetPort ())
166
170
167
- replyMessage := & internalpb.NatsMessage {
171
+ response := & internalpb.NatsMessage {
168
172
Host : d .hostNode .Host ,
169
173
Port : int32 (d .hostNode .GossipPort ),
170
174
Name : d .hostNode .Name ,
171
175
MessageType : internalpb .NatsMessageType_NATS_MESSAGE_TYPE_RESPONSE ,
172
176
}
173
177
174
- if err := d .natsConnection .Publish (reply , replyMessage ); err != nil {
178
+ bytea , _ := proto .Marshal (response )
179
+ if err := d .connection .Publish (msg .Reply , bytea ); err != nil {
175
180
d .logger .Errorf ("failed to reply for identification request from peer[name=%s, host=%s, port=%d]" ,
176
- msg .GetName (), msg .GetHost (), msg .GetPort ())
181
+ message .GetName (), message .GetHost (), message .GetPort ())
177
182
}
178
183
}
179
184
}
180
185
181
- subscription , err := d .natsConnection .Subscribe (d .config .NatsSubject , subscriptionHandler )
186
+ subscription , err := d .connection .Subscribe (d .config .NatsSubject , handler )
182
187
if err != nil {
183
188
return err
184
189
}
@@ -215,14 +220,17 @@ func (d *Discovery) Deregister() error {
215
220
}
216
221
217
222
// send the de-registration message to notify peers
218
- if d .natsConnection != nil {
223
+ if d .connection != nil {
219
224
// send a message to deregister stating we are out
220
- return d . natsConnection . Publish ( d . config . NatsSubject , & internalpb.NatsMessage {
225
+ message := & internalpb.NatsMessage {
221
226
Host : d .hostNode .Host ,
222
227
Port : int32 (d .hostNode .GossipPort ),
223
228
Name : d .hostNode .Name ,
224
229
MessageType : internalpb .NatsMessageType_NATS_MESSAGE_TYPE_DEREGISTER ,
225
- })
230
+ }
231
+
232
+ bytea , _ := proto .Marshal (message )
233
+ return d .connection .Publish (d .config .NatsSubject , bytea )
226
234
}
227
235
228
236
d .registered .Store (false )
@@ -246,37 +254,45 @@ func (d *Discovery) DiscoverPeers() ([]string, error) {
246
254
// report their presence.
247
255
// collect as many responses as possible in the given timeout.
248
256
inbox := nats .NewInbox ()
249
- recv := make (chan * internalpb. NatsMessage )
257
+ recv := make (chan * nats. Msg , 1 )
250
258
251
259
// bind to receive messages
252
- sub , err := d .natsConnection . BindRecvChan (inbox , recv )
260
+ sub , err := d .connection . ChanSubscribe (inbox , recv )
253
261
if err != nil {
254
262
return nil , err
255
263
}
256
264
257
- if err = d . natsConnection . PublishRequest ( d . config . NatsSubject , inbox , & internalpb.NatsMessage {
265
+ request := & internalpb.NatsMessage {
258
266
Host : d .hostNode .Host ,
259
267
Port : int32 (d .hostNode .GossipPort ),
260
268
Name : d .hostNode .Name ,
261
269
MessageType : internalpb .NatsMessageType_NATS_MESSAGE_TYPE_REQUEST ,
262
- }); err != nil {
270
+ }
271
+
272
+ bytea , _ := proto .Marshal (request )
273
+ if err = d .connection .PublishRequest (d .config .NatsSubject , inbox , bytea ); err != nil {
263
274
return nil , err
264
275
}
265
276
266
277
var peers []string
267
278
timeout := time .After (d .config .Timeout )
268
279
me := d .hostNode .GossipAddress ()
269
-
270
280
for {
271
281
select {
272
- case m , ok := <- recv :
282
+ case msg , ok := <- recv :
273
283
if ! ok {
274
284
// Subscription is closed
275
285
return peers , nil
276
286
}
277
287
288
+ message := new (internalpb.NatsMessage )
289
+ if err := proto .Unmarshal (msg .Data , message ); err != nil {
290
+ d .logger .Errorf ("failed to unmarshal nats message: %v" , err )
291
+ return nil , err
292
+ }
293
+
278
294
// get the found peer address
279
- addr := net .JoinHostPort (m .GetHost (), strconv .Itoa (int (m .GetPort ())))
295
+ addr := net .JoinHostPort (message .GetHost (), strconv .Itoa (int (message .GetPort ())))
280
296
if addr == me {
281
297
continue
282
298
}
@@ -297,10 +313,10 @@ func (d *Discovery) Close() error {
297
313
298
314
d .initialized .Store (false )
299
315
300
- if d .natsConnection != nil {
316
+ if d .connection != nil {
301
317
defer func () {
302
- d .natsConnection .Close ()
303
- d .natsConnection = nil
318
+ d .connection .Close ()
319
+ d .connection = nil
304
320
}()
305
321
306
322
for _ , subscription := range d .subscriptions {
@@ -313,7 +329,7 @@ func (d *Discovery) Close() error {
313
329
}
314
330
}
315
331
316
- return d .natsConnection .Flush ()
332
+ return d .connection .Flush ()
317
333
}
318
334
return nil
319
335
}
0 commit comments