// isoblue-uservice.js
"use strict";
// Packages
var fs = require("fs");
var avro = require("avsc");
var kafka = require("node-rdkafka");
var oada = require("@oada/oada-cache").default;
// Logging
var logInfo = require("debug")("isoblue-data-uservice:info");
var logError = require("debug")("isoblue-data-uservice:error");
// Parameters
// Runtime configuration. Values without a `||` fallback are taken straight
// from the environment and are `undefined` when the variable is not set.
const topics = ["remote", "debug"]; // Kafka topics
const kafka_group_id = process.env.KAFKA_GROUP_ID || "isoblue-gps"; // Kafka group ID
const domain = process.env.OADA_URL; // OADA server URL
// NOTE(review): "abc" is used whenever OADA_TOKEN is unset or empty —
// presumably a development placeholder; confirm it is not relied on in production.
const token = process.env.OADA_TOKEN || "abc"; // OADA token
// Kafka broker address, passed as "metadata.broker.list" to the consumer.
const kafka_broker = process.env.KAFKA_BROKER;
// avro
// Avro types used to decode message values. The schema files are read
// synchronously at load time; the "./schema/..." paths are resolved by
// fs.readFileSync relative to the process working directory.
const type_gps = avro.Type.forSchema(
JSON.parse(fs.readFileSync("./schema/gps.avsc")),
);
const type_hb = avro.Type.forSchema(
JSON.parse(fs.readFileSync("./schema/d_hb.avsc")),
);
// Resource tree passed as the `tree` option of every `conn.put` call in this
// file. Its nesting mirrors the paths written by the message handlers:
//   /bookmarks/isoblue/device-index/<device>/<dataset>/day-index/<day>/hour-index/<hour>
// Each level carries the `_type` (content type) for that level.
// NOTE(review): "*" presumably acts as a wildcard for any key at that level,
// and the tree presumably lets oada-cache create missing intermediate
// resources — confirm against the oada-cache documentation.
var tree = {
bookmarks: {
_type: "application/vnd.oada.bookmarks.1+json",
_rev: 0,
isoblue: {
_type: "application/vnd.oada.isoblue.1+json",
_rev: 0,
"device-index": {
// one entry per device ID
"*": {
_type: "application/vnd.oada.isoblue.device.1+json",
_rev: 0,
// one entry per dataset name (the handlers write "location" and "heartbeat")
"*": {
_type: "application/vnd.oada.isoblue.dataset.1+json",
_rev: 0,
"day-index": {
// one entry per day (the handlers use YYYY-MM-DD)
"*": {
_type: "application/vnd.oada.isoblue.day.1+json",
_rev: 0,
"hour-index": {
// one entry per hour (the handlers use HH:00)
// NOTE(review): unlike every other level, this one has no `_rev` —
// confirm whether that is intentional.
"*": {
_type: "application/vnd.oada.isoblue.hour.1+json",
},
},
},
},
},
},
},
},
},
};
// Options handed to `oada.connect()` when the service starts.
// `domain` and `token` come from the environment-derived parameters.
// NOTE(review): `websocket: false` and `cache: false` presumably select plain
// HTTP requests and disable oada-cache's local cache — confirm against the
// oada-cache documentation.
var connectionArgs = {
websocket: false,
domain,
token,
cache: false,
};
/**
 * Process one GPS message from Kafka and store its position fix in OADA.
 *
 * The message key is split on ":" and its second field is used as the
 * ISOBlue ID. The value is decoded with the `type_gps` Avro type; only
 * records whose `object_name` is "TPV" are stored. The TPV `time` field is
 * treated as seconds since the Unix epoch and selects the UTC day/hour
 * bucket the fix is written to:
 *
 *   /bookmarks/isoblue/device-index/<id>/location/day-index/<YYYY-MM-DD>/hour-index/<HH>:00
 *
 * Messages that cannot be stored (no ID in the key, undecodable value,
 * missing or invalid timestamp) are logged and dropped rather than throwing,
 * so one bad message cannot take down the consumer callback.
 *
 * @param {object} conn - OADA connection; must expose
 *   `put({ tree, path, data })` returning a promise.
 * @param {object} m - Kafka message with `key` and `value` buffers.
 * @returns {void} The PUT is fire-and-forget; a rejected PUT is only logged.
 */
function handleGPSmsg(conn, m) {
  // Key layout: "<kind>:<isoblueId>"
  const isoblueId = m.key == null ? undefined : m.key.toString().split(":")[1];
  if (!isoblueId) {
    // Without an ID the path would contain the literal text "undefined".
    logError("Dropping GPS message without an ISOBlue ID in its key.");
    return;
  }

  let gps_datum;
  try {
    gps_datum = type_gps.fromBuffer(m.value); // deserialize
  } catch (err) {
    logError("Failed to deserialize a GPS message.");
    logError(err);
    return;
  }

  if (gps_datum.gps.object_name !== "TPV") {
    return;
  }

  // read fields
  const { time: genTime, lat, lon: lng } = gps_datum.gps.object.tpv_record;
  logInfo(`Rcvd GPS: t=${genTime} lat=${lat} lon=${lng}`);

  // Derive the UTC day/hour bucket from the generation timestamp.
  // toISOString() throws RangeError on an invalid Date, and a null time would
  // silently land in 1970, so both cases are rejected up front.
  const ts = new Date(genTime * 1000);
  if (genTime == null || Number.isNaN(ts.getTime())) {
    logError(`Dropping GPS message with invalid timestamp: ${genTime}`);
    return;
  }
  const [date, clock] = ts.toISOString().split("T");
  const hour = `${clock.split(":")[0]}:00`;

  // One entry per generation timestamp inside the hour resource.
  const data = {
    "sec-index": {
      [genTime]: { lat, lng },
    },
  };

  const path =
    `/bookmarks/isoblue/device-index/${isoblueId}/location/day-index/${date}/` +
    `hour-index/${hour}`;

  conn.put({ tree, path, data }).catch(err => {
    logError(err);
  });
}
/**
 * Process one heartbeat message from Kafka and store it in OADA.
 *
 * The message key is split on ":" and its second field is used as the
 * ISOBlue ID. The value is decoded with the `type_hb` Avro type. The
 * `timestamp` field is treated as seconds since the Unix epoch and selects
 * the UTC day/hour bucket the heartbeat is written to:
 *
 *   /bookmarks/isoblue/device-index/<id>/heartbeat/day-index/<YYYY-MM-DD>/hour-index/<HH>:00
 *
 * The stored entry holds the generation time, the time this service received
 * the message, the cellular/wifi signal readings and the two LED statuses.
 *
 * Messages that cannot be stored (no ID in the key, undecodable value,
 * missing or invalid timestamp) are logged and dropped rather than throwing,
 * so one bad message cannot take down the consumer callback.
 *
 * @param {object} conn - OADA connection; must expose
 *   `put({ tree, path, data })` returning a promise.
 * @param {object} m - Kafka message with `key` and `value` buffers.
 * @returns {void} The PUT is fire-and-forget; a rejected PUT is only logged.
 */
function handleHBmsg(conn, m) {
  // Key layout: "<kind>:<isoblueId>"
  const isoblueId = m.key == null ? undefined : m.key.toString().split(":")[1];
  if (!isoblueId) {
    // Without an ID the path would contain the literal text "undefined".
    logError("Dropping heartbeat message without an ISOBlue ID in its key.");
    return;
  }

  let hb_datum;
  try {
    hb_datum = type_hb.fromBuffer(m.value); // deserialize
  } catch (err) {
    logError("Failed to deserialize a heartbeat message.");
    logError(err);
    return;
  }

  // read each field
  const {
    timestamp: genTime,
    cellns: cellrssi,
    wifins: wifirssi,
    statled,
    netled,
  } = hb_datum;
  logInfo(`Rcvd HB: t=${genTime}`);

  // Derive the UTC day/hour bucket from the generation timestamp.
  // toISOString() throws RangeError on an invalid Date, and a null timestamp
  // would silently land in 1970, so both cases are rejected up front.
  const ts = new Date(genTime * 1000);
  if (genTime == null || Number.isNaN(ts.getTime())) {
    logError(`Dropping heartbeat message with invalid timestamp: ${genTime}`);
    return;
  }
  const [date, clock] = ts.toISOString().split("T");
  const hour = `${clock.split(":")[0]}:00`;

  // Receive time, in seconds like genTime (Date.now() is in milliseconds).
  const recTime = Date.now() / 1000;

  // One entry per generation timestamp inside the hour resource.
  const data = {
    "sec-index": {
      [genTime]: {
        genTime,
        recTime,
        interfaces: [
          { type: "cellular", rssi: cellrssi },
          { type: "wifi", rssi: wifirssi },
        ],
        ledStatuses: [
          { name: "net", status: netled },
          { name: "stat", status: statled },
        ],
      },
    },
  };

  const path =
    `/bookmarks/isoblue/device-index/${isoblueId}/heartbeat/day-index/${date}/` +
    `hour-index/${hour}`;

  conn.put({ tree, path, data }).catch(err => {
    logError(err);
  });
}
// Service entry point: connect to OADA, then start a Kafka consumer that
// routes every received message to the GPS or heartbeat handler according to
// the prefix of its key ("gps:<id>" / "hb:<id>").
oada
  .connect(connectionArgs)
  .then(conn => {
    logInfo("Connected to OADA server.");

    const consumer = new kafka.KafkaConsumer(
      {
        //debug: "all",
        "group.id": kafka_group_id,
        "metadata.broker.list": kafka_broker,
        // Offsets are never committed in this file, so together with
        // "auto.offset.reset": "latest" below a (re)started consumer with no
        // stored offset begins at the newest messages.
        "enable.auto.commit": false,
      },
      {
        "auto.offset.reset": "latest",
      },
      { encoding: "buffer" },
    );

    // log debug messages
    consumer.on("event.log", log => {
      logInfo(log);
    });

    // log error messages
    consumer.on("event.error", err => {
      logError("Error from consumer");
      logError(err);
    });

    // Once connected, subscribe and start consuming in flowing mode
    // (messages are delivered through the "data" event).
    consumer.on("ready", args => {
      logInfo("Kafka ready.");
      logInfo(args);
      consumer.subscribe(topics);
      consumer.consume();
    });

    // handle data
    consumer.on("data", m => {
      // Kafka messages may carry no key; those cannot be routed.
      if (m.key == null) {
        return;
      }
      const kind = m.key.toString().split(":")[0];
      try {
        if (kind === "gps") {
          handleGPSmsg(conn, m);
        } else if (kind === "hb") {
          handleHBmsg(conn, m);
        }
      } catch (err) {
        // A single malformed message must not escape the event listener
        // and bring the whole service down.
        logError(`Failed to handle "${kind}" message`);
        logError(err);
      }
    });

    // connect
    consumer.connect();
  })
  .catch(err => {
    // Covers both a failed OADA connection and a failure while setting up
    // the consumer; without this the rejection would go unhandled.
    logError("Failed to start the ISOBlue data service.");
    logError(err);
    process.exitCode = 1; // still report failure to the process supervisor
  });