Skip to content

Commit 902db73

Browse files
committed
Combined pull/pub modules and added a req module to give out object data.
1 parent 25bc6ab commit 902db73

File tree

7 files changed

+960
-264
lines changed

7 files changed

+960
-264
lines changed

mods/common.c

+271-55
Original file line numberDiff line numberDiff line change
@@ -11,81 +11,297 @@
1111
#include "naginclude/nagios.h"
1212
#include "naginclude/objects.h"
1313
#include "naginclude/broker.h"
14+
#include "naginclude/skiplist.h"
1415
#include <zmq.h>
1516
#include "json.h"
17+
#include "jansson.h"
1618

17-
int setup_zmq(char * args, int type,
18-
void ** ctxo, void ** socko) {
19-
void * ctx = NULL, *sock = NULL;
20-
char * bindto = NULL, *connectto = NULL;
21-
int numthreads = 1, hwm = 0;
22-
char * name, *val;
23-
24-
while(*args != '\0') {
25-
name = args;
26-
while(*args != ',' && *args != '\0') {
27-
if(*args == '=') {
28-
*args = '\0';
29-
val = args + 1;
30-
}
31-
args++;
32-
}
33-
*args = '\0';
34-
if(strcmp(name, "bind") == 0)
35-
bindto = val;
36-
else if(strcmp(name, "connect") == 0)
37-
connectto = val;
38-
else if(strcmp(name, "iothreads") == 0)
39-
numthreads = atoi(val);
40-
else if(strcmp(name, "hwm") == 0)
41-
hwm = atoi(val);
19+
NEB_API_VERSION(CURRENT_NEB_API_VERSION)
20+
static skiplist * lock_skiplist;
21+
static void * nagmq_handle = NULL;
22+
static pthread_t threadid;
23+
static pthread_cond_t init_cond = PTHREAD_COND_INITIALIZER;
24+
static pthread_mutex_t recv_loop_mutex = PTHREAD_MUTEX_INITIALIZER;
25+
void * zmq_ctx;
26+
json_t * config;
27+
28+
struct lock_skip_obj {
29+
char * host_name;
30+
char * service_description;
31+
pthread_mutex_t lock;
32+
};
33+
34+
int lock_obj_compare(void * ar, void * br) {
35+
struct lock_skip_obj *a = ar, *b = br;
36+
int r;
37+
if(a->service_description &&
38+
(r = strcmp(a->service_description, b->service_description)) != 0)
39+
return r;
40+
if((r = strcmp(a->host_name, b->host_name)) != 0)
41+
return r;
42+
return 0;
43+
}
44+
45+
void lock_obj(char * hostname, char * service) {
46+
if(!lock_skiplist)
47+
return;
48+
struct lock_skip_obj test = { hostname, service, PTHREAD_MUTEX_INITIALIZER };
49+
struct lock_skip_obj * lock = skiplist_find_first(lock_skiplist, &test, NULL);
50+
if(lock == NULL) {
51+
lock = malloc(sizeof(struct lock_skip_obj));
52+
lock->host_name = strdup(hostname);
53+
if(service)
54+
lock->service_description = strdup(service);
55+
else
56+
lock->service_description = NULL;
57+
pthread_mutex_init(&lock->lock, NULL);
58+
skiplist_insert(lock_skiplist, lock);
4259
}
4360

44-
if(!connectto && !bindto) {
45-
syslog(LOG_ERR, "Neither a connection or bind url was supplied to ZMQ");
46-
return -1;
61+
pthread_mutex_lock(&lock->lock);
62+
}
63+
64+
void unlock_obj(char * hostname, char * service) {
65+
if(!lock_skiplist)
66+
return;
67+
struct lock_skip_obj test = { hostname, service, PTHREAD_MUTEX_INITIALIZER };
68+
struct lock_skip_obj * lock = skiplist_find_first(lock_skiplist, &test, NULL);
69+
if(lock == NULL) {
70+
lock = malloc(sizeof(struct lock_skip_obj));
71+
lock->host_name = strdup(hostname);
72+
if(service)
73+
lock->service_description = strdup(service);
74+
else
75+
lock->service_description = NULL;
76+
pthread_mutex_init(&lock->lock, NULL);
77+
skiplist_insert(lock_skiplist, lock);
4778
}
4879

49-
ctx = zmq_init(numthreads);
50-
if(ctx == NULL) {
51-
syslog(LOG_ERR, "Error initialzing ZMQ context: %s",
52-
zmq_strerror(errno));
53-
return -1;
80+
pthread_mutex_unlock(&lock->lock);
81+
}
82+
83+
int nebmodule_deinit(int flags, int reason) {
84+
neb_deregister_module_callbacks(nagmq_handle);
85+
if(config)
86+
json_decref(config);
87+
struct lock_skip_obj * lock;
88+
while((lock = skiplist_pop(lock_skiplist)) != NULL) {
89+
if(lock->service_description)
90+
free(lock->service_description);
91+
free(lock->host_name);
92+
pthread_mutex_destroy(&lock->lock);
93+
free(lock);
5494
}
95+
skiplist_free(&lock_skiplist);
96+
return 0;
97+
}
98+
99+
int handle_pubstartup();
100+
void process_payload(struct payload * payload);
55101

56-
sock = zmq_socket(ctx, type);
102+
void * getsock(char * forwhat, int type) {
103+
json_t *connect = NULL, *bind = NULL;
104+
int hwm = 0;
105+
106+
if(json_unpack(config, "{ s?: { s?:o s?:o s?:i } }",
107+
forwhat, "connect", &connect, "bind", &bind, "hwm", &hwm) != 0) {
108+
syslog(LOG_ERR, "Parameter error while creating socket for %s",
109+
forwhat);
110+
return NULL;
111+
}
112+
113+
if(!connect && !bind)
114+
return NULL;
115+
116+
void * sock = zmq_socket(zmq_ctx, type);
57117
if(sock == NULL) {
58-
syslog(LOG_ERR, "Error creating ZMQ socket: %s",
59-
zmq_strerror(errno));
60-
zmq_term(ctx);
61-
return -1;
118+
syslog(LOG_ERR, "Error creating socket for %s: %s",
119+
forwhat, zmq_strerror(errno));
120+
return NULL;
62121
}
63122

64-
if(hwm > 0 && zmq_setsockopt(sock, ZMQ_HWM, &hwm, sizeof(hwm)) != 0) {
65-
syslog(LOG_ERR, "Error setting HWM to %d: %s",
66-
hwm, zmq_strerror(errno));
123+
if(hwm > 0 &&
124+
zmq_setsockopt(sock, ZMQ_HWM, &hwm, sizeof(hwm)) != 0) {
125+
syslog(LOG_ERR, "Error setting HWM for %s: %s",
126+
forwhat, zmq_strerror(errno));
67127
zmq_close(sock);
68-
zmq_term(ctx);
128+
return NULL;
129+
}
130+
131+
if(connect) {
132+
if(json_is_string(connect) && zmq_connect(sock,
133+
json_string_value(connect)) != 0) {
134+
syslog(LOG_ERR, "Error connecting %s to %s: %s",
135+
forwhat, json_string_value(connect), zmq_strerror(errno));
136+
zmq_close(sock);
137+
return NULL;
138+
} else if(json_is_array(connect)) {
139+
size_t i;
140+
for(i = 0; i < json_array_size(connect); i++) {
141+
json_t * target = json_array_get(connect, i);
142+
if(zmq_connect(sock, json_string_value(target)) != 0) {
143+
syslog(LOG_ERR, "Error connecting %s to %s: %s",
144+
forwhat, json_string_value(target), zmq_strerror(errno));
145+
zmq_close(sock);
146+
return NULL;
147+
}
148+
}
149+
}
150+
}
151+
152+
if(bind) {
153+
if(json_is_string(bind) && zmq_bind(sock,
154+
json_string_value(bind)) != 0) {
155+
syslog(LOG_ERR, "Error binding %s to %s: %s",
156+
forwhat, bind, zmq_strerror(errno));
157+
zmq_close(sock);
158+
return NULL;
159+
} else if(json_is_array(bind)) {
160+
size_t i;
161+
for(i = 0; i < json_array_size(bind); i++) {
162+
json_t * target = json_array_get(bind, i);
163+
if(zmq_bind(sock, json_string_value(target)) != 0) {
164+
syslog(LOG_ERR, "Error binding %s to %s: %s",
165+
forwhat, json_string_value(target), zmq_strerror(errno));
166+
zmq_close(sock);
167+
return NULL;
168+
}
169+
}
170+
}
171+
}
172+
173+
return sock;
174+
}
175+
176+
void process_pull_msg(void * sock);
177+
void process_req_msg(void * sock);
178+
179+
void * recv_loop(void * parg) {
180+
void * pullsock, * reqsock;
181+
int enablepull = 0, enablereq = 0;
182+
zmq_pollitem_t pollables[2];
183+
int npollables = 0;
184+
185+
if(json_unpack(config, "{ s?:{ s:b } s?:{ s:b } }",
186+
"pull", "enable", &enablepull,
187+
"reply", "enable", &enablereq) != 0) {
188+
syslog(LOG_ERR, "Parameter error while starting NagMQ");
189+
return NULL;
190+
}
191+
192+
if(enablepull) {
193+
pullsock = getsock("pull", ZMQ_PULL);
194+
if(!pullsock)
195+
return NULL;
196+
pollables[npollables].socket = pullsock;
197+
pollables[npollables++].events = ZMQ_POLLIN;
198+
}
199+
200+
if(enablereq) {
201+
reqsock = getsock("reply", ZMQ_REP);
202+
if(!reqsock) {
203+
if(pullsock)
204+
zmq_close(pullsock);
205+
return NULL;
206+
}
207+
pollables[npollables].socket = reqsock;
208+
pollables[npollables++].events = ZMQ_POLLIN;
209+
}
210+
211+
while(zmq_poll(pollables, npollables, -1) > -1) {
212+
int events;
213+
size_t size = sizeof(events);
214+
zmq_getsockopt(pullsock, ZMQ_EVENTS, &events, &size);
215+
if(events == ZMQ_POLLIN)
216+
process_pull_msg(pullsock);
217+
zmq_getsockopt(reqsock, ZMQ_EVENTS, &events, &size);
218+
if(events == ZMQ_POLLIN)
219+
process_req_msg(reqsock);
220+
}
221+
return NULL;
222+
}
223+
224+
extern void * pubext;
225+
226+
int handle_startup(int which, void * obj) {
227+
struct nebstruct_process_struct *ps = (struct nebstruct_process_struct *)obj;
228+
struct payload * payload;
229+
int numthreads = 1, enablepub = 0, enablepull = 0, enablereq = 0;
230+
231+
if(json_unpack(config, "{ s?:i, s?:{ s:b } s?:{ s:b } s?:{ s:b } }",
232+
"iothreads", &numthreads, "publish", "enable", &enablepub,
233+
"pull", "enable", &enablepull, "reply", "enable", &enablereq) != 0) {
234+
syslog(LOG_ERR, "Parameter error while starting NagMQ");
69235
return -1;
70236
}
71237

72-
if(connectto && zmq_connect(sock, connectto) < 0) {
73-
syslog(LOG_ERR, "Error connecting socket to %s: %s",
74-
connectto, zmq_strerror(errno));
75-
zmq_close(sock);
76-
zmq_term(ctx);
77-
return -1;
238+
if (ps->type == NEBTYPE_PROCESS_EVENTLOOPSTART) {
239+
if(!enablepub && !enablepull && !enablereq)
240+
return 0;
241+
242+
zmq_ctx = zmq_init(numthreads);
243+
if(zmq_ctx == NULL) {
244+
syslog(LOG_ERR, "Error initialzing ZMQ: %s",
245+
zmq_strerror(errno));
246+
return -1;
247+
}
248+
249+
if(enablepub && handle_pubstartup() < 0)
250+
return -1;
251+
252+
if(!enablepull && !enablereq)
253+
return 0;
254+
255+
pthread_mutex_lock(&recv_loop_mutex);
256+
if(pthread_create(&threadid, NULL, recv_loop, NULL) < 0) {
257+
syslog(LOG_ERR, "Error creating ZMQ recv loop: %m");
258+
return -1;
259+
}
260+
pthread_cond_wait(&init_cond, &recv_loop_mutex);
261+
pthread_detach(threadid);
262+
263+
if(enablepub) {
264+
payload = payload_new();
265+
payload_new_string(payload, "type", "eventloopstart");
266+
payload_new_timestamp(payload, "timestamp", &ps->timestamp);
267+
payload_finalize(payload);
268+
process_payload(payload);
269+
}
270+
} else if(ps->type == NEBTYPE_PROCESS_EVENTLOOPEND) {
271+
if(enablepub) {
272+
payload = payload_new();
273+
payload_new_string(payload, "type", "eventloopend");
274+
payload_new_timestamp(payload, "timestamp", &ps->timestamp);
275+
payload_finalize(payload);
276+
process_payload(payload);
277+
}
278+
279+
zmq_close(pubext);
280+
zmq_term(zmq_ctx);
281+
pthread_join(threadid, NULL);
78282
}
283+
return 0;
284+
}
79285

80-
if(bindto && zmq_bind(sock, bindto) < 0) {
81-
syslog(LOG_ERR, "Error binding socket to %s: %s",
82-
bindto, zmq_strerror(errno));
83-
zmq_close(sock);
84-
zmq_term(ctx);
286+
int nebmodule_init(int flags, char * localargs, nebmodule * handle) {
287+
json_error_t loaderr;
288+
neb_set_module_info(handle, NEBMODULE_MODINFO_TITLE, "nagmq subscriber");
289+
neb_set_module_info(handle, NEBMODULE_MODINFO_AUTHOR, "Jonathan Reams");
290+
neb_set_module_info(handle, NEBMODULE_MODINFO_VERSION, "0.8");
291+
neb_set_module_info(handle, NEBMODULE_MODINFO_LICENSE, "Apache v2");
292+
neb_set_module_info(handle, NEBMODULE_MODINFO_DESC,
293+
"Subscribes to Nagios data on 0MQ");
294+
295+
config = json_load_file(localargs, 0, &loaderr);
296+
if(config == NULL) {
297+
syslog(LOG_ERR, "Error loading NagMQ config: %s (at %d:%d)",
298+
loaderr.text, loaderr.line, loaderr.column);
85299
return -1;
86300
}
87-
*ctxo = ctx;
88-
*socko = sock;
301+
302+
neb_register_callback(NEBCALLBACK_PROCESS_DATA, handle,
303+
0, handle_startup);
304+
89305
return 0;
90306
}
91307

mods/json.c

+25-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ struct payload * payload_new() {
2323
}
2424

2525
void payload_add_key(struct payload * po, char * key) {
26+
if(key == NULL)
27+
return;
2628
size_t keylen = strlen(key);
2729
adjust_payload_len(po, keylen + sizeof("\"\": "));
2830
po->bufused += sprintf(po->json_buf + po->bufused,
@@ -121,7 +123,29 @@ void payload_new_timestamp(struct payload * po,
121123
po->bufused -= 2;
122124
po->bufused += sprintf(po->json_buf + (po->bufused),
123125
" }, ");
124-
}
126+
}
127+
128+
void payload_start_array(struct payload * po, char * key) {
129+
payload_add_key(po, key);
130+
adjust_payload_len(po, sizeof("[ "));
131+
po->bufused += sprintf(po->json_buf + po->bufused, "[ ");
132+
}
133+
134+
void payload_end_array(struct payload * po) {
135+
adjust_payload_len(po, sizeof(", "));
136+
sprintf(po->json_buf + po->bufused - 2, " ], ");
137+
}
138+
139+
void payload_start_object(struct payload * po, char * key) {
140+
payload_add_key(po, key);
141+
adjust_payload_len(po, sizeof("{ "));
142+
po->bufused += sprintf(po->json_buf + po->bufused, "{ ");
143+
}
144+
145+
void payload_end_object(struct payload * po) {
146+
adjust_payload_len(po, sizeof(", "));
147+
sprintf(po->json_buf + po->bufused - 2, " }, ");
148+
}
125149

126150
void payload_finalize(struct payload * po) {
127151
sprintf(po->json_buf + po->bufused - 2, " }");

0 commit comments

Comments
 (0)