@@ -144,7 +144,7 @@ server_t<request_container_t, request_info_t>::server_t(
144
144
uint32_t request_timeout,
145
145
const health_check_matcher_t & health_check_matcher,
146
146
const std::string& health_check_response)
147
- : client(context, ZMQ_STREAM), proxy(context, ZMQ_DEALER), loopback(context, ZMQ_SUB ),
147
+ : client(context, ZMQ_STREAM), proxy(context, ZMQ_DEALER), loopback(context, ZMQ_PULL ),
148
148
interrupt (context, ZMQ_PUB), log(log), max_request_size(max_request_size),
149
149
request_timeout(request_timeout), request_id(0 ), health_check_matcher(health_check_matcher),
150
150
health_check_response(health_check_response.size(), health_check_response.data()) {
@@ -158,9 +158,7 @@ server_t<request_container_t, request_info_t>::server_t(
158
158
proxy.setsockopt (ZMQ_SNDHWM, &disabled, sizeof (disabled));
159
159
proxy.connect (proxy_endpoint.c_str ());
160
160
161
- // TODO: consider making this into a pull socket so we dont lose any results due to timing
162
161
loopback.setsockopt (ZMQ_RCVHWM, &disabled, sizeof (disabled));
163
- loopback.setsockopt (ZMQ_SUBSCRIBE, " " , 0 );
164
162
loopback.bind (result_endpoint.c_str ());
165
163
166
164
interrupt.setsockopt (ZMQ_SNDHWM, &disabled, sizeof (disabled));
@@ -468,7 +466,7 @@ worker_t::worker_t(zmq::context_t& context,
468
466
const cleanup_function_t & cleanup_function,
469
467
const std::string& heart_beat)
470
468
: upstream_proxy(context, ZMQ_DEALER), downstream_proxy(context, ZMQ_DEALER),
471
- loopback (context, ZMQ_PUB ), interrupt(context, ZMQ_SUB), work_function(work_function),
469
+ loopback (context, ZMQ_PUSH ), interrupt(context, ZMQ_SUB), work_function(work_function),
472
470
cleanup_function(cleanup_function), heart_beat_interval(5000 ), heart_beat(heart_beat),
473
471
job(std::numeric_limits<decltype(job)>::max()) {
474
472
0 commit comments