@@ -3,7 +3,6 @@ package main
3
3
import (
4
4
"crypto/tls"
5
5
"crypto/x509"
6
- "encoding/json"
7
6
"errors"
8
7
"fmt"
9
8
"io"
@@ -12,10 +11,12 @@ import (
12
11
"net"
13
12
"path/filepath"
14
13
"strconv"
14
+ "strings"
15
15
"time"
16
16
17
17
"github.com/Shopify/sarama"
18
18
"github.com/golang/protobuf/proto"
19
+ "github.com/nileshsimaria/jtimon/dialout"
19
20
gnmi_dialout "github.com/nileshsimaria/jtimon/gnmi/dialout"
20
21
"github.com/nileshsimaria/jtimon/gnmi/gnmi"
21
22
@@ -52,25 +53,19 @@ type rpcInfoT struct {
52
53
rpc string
53
54
rpcId int8
54
55
running bool
55
- cfgChannel chan * dialOutConfigT
56
- config * dialOutConfigT
56
+ cfgChannel chan * dialout. DialOutRequest
57
+ config * dialout. DialOutRequest
57
58
jctx * JCtx // Currently inherited from device, so use this handle as read-only
58
59
59
60
device * deviceInfoT
60
61
}
61
62
62
- type dialOutConfigT struct {
63
- Device string `json:"device"`
64
- RpcType string `json:"rpc-type"`
65
- Paths []PathsConfig `json:"paths"`
66
- }
67
-
68
63
func newDialOutServer (rpcs []string ) * dialoutServerT {
69
64
s := & dialoutServerT {devices : map [string ]* deviceInfoT {}}
70
65
71
66
// Create kafka Client
72
67
kafkaCfg := sarama .NewConfig ()
73
- kafkaClient , err := sarama .NewClient ([] string { * kafkaBroker } , kafkaCfg )
68
+ kafkaClient , err := sarama .NewClient (strings . Split ( * kafkaBroker , "," ) , kafkaCfg )
74
69
if err != nil {
75
70
log .Fatalf ("Not able to connect to Kafka broker at %v: %v" , * kafkaBroker , err )
76
71
}
@@ -109,7 +104,7 @@ func newDialOutServer(rpcs []string) *dialoutServerT {
109
104
110
105
func createRpc (device * deviceInfoT , name string , id int ) (* rpcInfoT , error ) {
111
106
// Sarama's default channel size for consuming messages is 256
112
- cfgChannel := make (chan * dialOutConfigT , 512 )
107
+ cfgChannel := make (chan * dialout. DialOutRequest , 512 )
113
108
return & rpcInfoT {rpc : name , rpcId : int8 (id ), cfgChannel : cfgChannel , jctx : device .jctx , device : device }, nil
114
109
}
115
110
@@ -217,15 +212,15 @@ func (s *dialoutServerT) DialOutSubscriber(stream gnmi_dialout.Subscriber_DialOu
217
212
218
213
i := 0
219
214
length := len (rpc .cfgChannel )
220
- var dialOutCfg * dialOutConfigT
215
+ var cfgReq * dialout. DialOutRequest
221
216
// Drain n - 1 message to get latest config
222
217
for i < (length - 1 ) {
223
218
<- rpc .cfgChannel
224
219
i ++
225
220
}
226
221
log .Printf ("Waiting for config.." )
227
- dialOutCfg = <- rpc .cfgChannel
228
- log .Printf ("Read config.. : %v" , * dialOutCfg )
222
+ cfgReq = <- rpc .cfgChannel
223
+ log .Printf ("Read config.. : %v" , * cfgReq )
229
224
230
225
req := & gnmi.SubscribeRequest {}
231
226
req = & gnmi.SubscribeRequest {Request : & gnmi.SubscribeRequest_Subscribe {
@@ -235,7 +230,7 @@ func (s *dialoutServerT) DialOutSubscriber(stream gnmi_dialout.Subscriber_DialOu
235
230
},
236
231
}}
237
232
subReq := req .Request .(* gnmi.SubscribeRequest_Subscribe )
238
- subReq .Subscribe .Subscription , err = xPathsTognmiSubscription (dialOutCfg .Paths )
233
+ subReq .Subscribe .Subscription , err = xPathsTognmiSubscription (nil , cfgReq .Paths )
239
234
if err != nil {
240
235
//log.Printf("Host: %v, Invalid path config: %s", cn, cfg)
241
236
jLog (s .jctx , fmt .Sprintf ("Host: %v, Invalid path config: %s" , cn , "foo...." ))
@@ -247,7 +242,7 @@ func (s *dialoutServerT) DialOutSubscriber(stream gnmi_dialout.Subscriber_DialOu
247
242
248
243
for {
249
244
select {
250
- case dialOutCfg = <- rpc .cfgChannel :
245
+ case cfgReq = <- rpc .cfgChannel :
251
246
removeRpcFromDevice (rpc )
252
247
stream .Context ().Done ()
253
248
case producerError := <- (* s .dataProducer ).Errors ():
@@ -268,13 +263,16 @@ func (s *dialoutServerT) DialOutSubscriber(stream gnmi_dialout.Subscriber_DialOu
268
263
continue
269
264
}
270
265
271
- payload , err := proto .Marshal (rspFromDevice )
266
+ // TODO: Vivek Take data topic from cmd line
267
+ var dialOutRsp dialout.DialOutResponse
268
+ dialOutRsp .Device = cn
269
+ dialOutRsp .DialOutContext = cfgReq .DialOutContext
270
+ dialOutRsp .Response = append (dialOutRsp .Response , rspFromDevice )
271
+ payload , err := proto .Marshal (& dialOutRsp )
272
272
if err != nil {
273
273
log .Printf ("Marshalling failed for %s, len: %v\n " , rspString , len (rspString ))
274
274
continue
275
275
}
276
-
277
- // TODO: Vivek Take data topic from cmd line
278
276
(* s .dataProducer ).Input () <- & sarama.ProducerMessage {Topic : "gnmi-data" , Key : sarama .ByteEncoder (cn ), Value : sarama .ByteEncoder (payload )}
279
277
}
280
278
}
@@ -295,8 +293,8 @@ func consumePartition(server *dialoutServerT, topic string, partition int32, off
295
293
var tmpDeviceName string
296
294
for msg := range partitionConsumer .Messages () {
297
295
log .Printf ("topic: %v, partition: %v, offset: %v, msg key: %v, msg val: %v" , topic , partition , offset , string (msg .Key ), string (msg .Value ))
298
- var dialOutCfg dialOutConfigT
299
- err = json .Unmarshal (msg .Value , & dialOutCfg )
296
+ var dialOutCfg dialout. DialOutRequest
297
+ err = proto .Unmarshal (msg .Value , & dialOutCfg )
300
298
log .Printf ("dialOutCfg: %v" , dialOutCfg .Paths [0 ].Path )
301
299
if err != nil {
302
300
jLog (jctx , fmt .Sprintf ("Unmarshalling dialout config failed, ignoring" ))
0 commit comments