Skip to content

Commit 946f91e

Browse files
committed
Adding ZeroMQ 4 support
This build verifies that everything compiles okay against ZeroMQ 4, and adds the security features that come with it. Both the executor (mqexec) and the event broker module now support curve authentication/encryption. Authentication is done via an SSH authorized_keys-style file configured at runtime.
1 parent e0431c4 commit 946f91e

File tree

7 files changed

+382
-13
lines changed

7 files changed

+382
-13
lines changed

dnxmq/mqbroker.c

+4-4
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ void parse_sock_directive(json_t * arg, zmq_pollitem_t * pollable, int * noblock
9191
char *type;
9292
#if ZMQ_VERSION_MAJOR == 2
9393
int64_t hwm = 0, swap = 0, affinity = 0;
94-
#elif ZMQ_VERSION_MAJOR == 3
94+
#elif ZMQ_VERSION_MAJOR >= 3
9595
int sndhwm = -1, rcvhwm = -1, maxmsgsize = 0, backlog = 0;
9696
int64_t affinity = 0;
9797
json_t * accept_filters = NULL;
@@ -103,7 +103,7 @@ void parse_sock_directive(json_t * arg, zmq_pollitem_t * pollable, int * noblock
103103
"type", &type, "connect", &connect, "bind", &bind,
104104
"subscribe", &subscribe, "hwm", &hwm, "swap", &swap,
105105
"affinity", &affinity, "noblock", noblock) != 0)
106-
#elif ZMQ_VERSION_MAJOR == 3
106+
#elif ZMQ_VERSION_MAJOR >= 3
107107
if(json_unpack(arg, "{s:s s?:o s?:o s?:o s?i s?i s?i s?b s?i s?o s?i}",
108108
"type", &type, "connect", &connect, "bind", &bind,
109109
"subscribe", &subscribe, "sndhwm", &sndhwm, "rcvhwm", &rcvhwm,
@@ -153,7 +153,7 @@ void parse_sock_directive(json_t * arg, zmq_pollitem_t * pollable, int * noblock
153153
zmq_setsockopt(sock, ZMQ_HWM, &hwm, sizeof(hwm));
154154
if(swap > 0)
155155
zmq_setsockopt(sock, ZMQ_SWAP, &swap, sizeof(swap));
156-
#elif ZMQ_VERSION_MAJOR == 3
156+
#elif ZMQ_VERSION_MAJOR >= 3
157157
if(sndhwm > -1)
158158
zmq_setsockopt(sock, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm));
159159
if(rcvhwm > -1)
@@ -199,7 +199,7 @@ void do_forward(void * in, void *out, void *mon, int noblock, int monnoblock) {
199199
int rc;
200200
#if ZMQ_VERSION_MAJOR == 2
201201
int64_t rcvmore;
202-
#elif ZMQ_VERSION_MAJOR == 3
202+
#elif ZMQ_VERSION_MAJOR >= 3
203203
int rcvmore;
204204
#endif
205205
size_t size = sizeof(rcvmore);

dnxmq/mqexec.c

+54-3
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ ev_io pullio;
4343
char * rootpath = NULL, *unprivpath = NULL;
4444
size_t rootpathlen = 0, unprivpathlen = 0;
4545
uid_t runas = 0;
46+
#if ZMQ_VERSION_MAJOR > 3
47+
char * curve_private = NULL, *curve_public = NULL, *curve_server = NULL;
48+
#endif
4649

4750
struct child_job {
4851
json_t * input;
@@ -512,7 +515,7 @@ void recv_job_cb(struct ev_loop * loop, ev_io * i, int event) {
512515
zmq_msg_t inmsg;
513516
#if ZMQ_VERSION_MAJOR == 2
514517
int64_t rcvmore = 0;
515-
#elif ZMQ_VERSION_MAJOR == 3
518+
#elif ZMQ_VERSION_MAJOR >= 3
516519
int rcvmore = 0;
517520
#endif
518521
size_t rms = sizeof(rcvmore);
@@ -549,13 +552,35 @@ void parse_sock_directive(void * socket, json_t * arg, int bind) {
549552
zmq_bind(socket, json_string_value(arg));
550553
else
551554
zmq_connect(socket, json_string_value(arg));
555+
#if ZMQ_VERSION_MAJOR > 3
556+
if(curve_private) {
557+
zmq_setsockopt(socket, ZMQ_CURVE_SECRETKEY,
558+
curve_private, strlen(curve_private));
559+
zmq_setsockopt(socket, ZMQ_CURVE_PUBLICKEY,
560+
curve_public, strlen(curve_public));
561+
zmq_setsockopt(socket, ZMQ_CURVE_SERVERKEY,
562+
curve_server, strlen(curve_server));
563+
}
564+
#endif
552565
} else if(json_is_object(arg)) {
553566
char * addr = NULL;
554567
json_t * subscribe = NULL;
555568
if(json_unpack(arg, "{s:s s?:b s?:o}",
556569
"address", &addr, "bind", &bind, "subscribe",
557570
&subscribe) != 0)
558571
return;
572+
573+
#if ZMQ_VERSION_MAJOR > 3
574+
if(curve_private) {
575+
zmq_setsockopt(socket, ZMQ_CURVE_SECRETKEY,
576+
curve_private, strlen(curve_private));
577+
zmq_setsockopt(socket, ZMQ_CURVE_PUBLICKEY,
578+
curve_public, strlen(curve_public));
579+
zmq_setsockopt(socket, ZMQ_CURVE_SERVERKEY,
580+
curve_server, strlen(curve_server));
581+
}
582+
#endif
583+
559584
if(bind)
560585
zmq_bind(socket, addr);
561586
else
@@ -610,6 +635,7 @@ int main(int argc, char ** argv) {
610635
json_error_t config_err;
611636
char ch, *configobj = "executor", *tmprootpath = NULL,
612637
*tmpunprivpath = NULL, *tmpunprivuser = NULL;
638+
json_error_t jsonerr;
613639

614640
while((ch = getopt(argc, argv, "vsdhc:")) != -1) {
615641
switch(ch) {
@@ -657,15 +683,31 @@ int main(int argc, char ** argv) {
657683
exit(1);
658684
}
659685

660-
if(json_unpack(config, "{s:{s?:o s:o s?i s?b s?b s?:o s?o s?s s?s s?s}}",
686+
#if ZMQ_VERSION_MAJOR < 4
687+
if(json_unpack_ex(config, &jsonerr, 0,
688+
"{s:{s?:o s:o s?i s?b s?b s?:o s?o s?s s?s s?s}}",
661689
configobj, "jobs", &jobs, "results", &results,
662690
"iothreads", &iothreads, "verbose", &verbose,
663691
"syslog", &usesyslog, "filter", &filter,
664692
"publisher", &publisher, "rootpath", &tmprootpath,
665693
"unprivpath", &tmpunprivpath, "unprivuser", &tmpunprivuser) != 0) {
666-
logit(ERR, "Error getting config");
694+
logit(ERR, "Error getting config %s", jsonerr.text);
667695
exit(-1);
668696
}
697+
#else
698+
if(json_unpack_ex(config, &jsonerr, 0,
699+
"{s:{s?:o s:o s?i s?b s?b s?:o s?o s?s s?s s?s s?{s:s s:s s:s}}}",
700+
configobj, "jobs", &jobs, "results", &results,
701+
"iothreads", &iothreads, "verbose", &verbose,
702+
"syslog", &usesyslog, "filter", &filter,
703+
"publisher", &publisher, "rootpath", &tmprootpath,
704+
"unprivpath", &tmpunprivpath, "unprivuser", &tmpunprivuser,
705+
"curve", "publickey", &curve_public, "privatekey", &curve_private,
706+
"serverkey", &curve_server) != 0) {
707+
logit(ERR, "Error getting config: %s", jsonerr.text);
708+
exit(-1);
709+
}
710+
#endif
669711

670712
parse_filter(filter,0);
671713

@@ -697,6 +739,15 @@ int main(int argc, char ** argv) {
697739
runas = pwdent->pw_uid;
698740
}
699741

742+
#if ZMQ_VERSION_MAJOR > 3
743+
if(curve_public)
744+
curve_public = strdup(curve_public);
745+
if(curve_private)
746+
curve_private = strdup(curve_private);
747+
if(curve_server)
748+
curve_server = strdup(curve_server);
749+
#endif
750+
700751
zmqctx = zmq_init(iothreads);
701752
if(zmqctx == NULL)
702753
exit(-1);

0 commit comments

Comments
 (0)