Skip to content

Commit

Permalink
zmq: fix exit hanging problem (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
ymjiang authored Oct 14, 2019
1 parent 92e012f commit 01f5730
Showing 1 changed file with 5 additions and 3 deletions.
8 changes: 5 additions & 3 deletions src/zmq_van.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class ZMQVan : public Van {
// join all threads
should_stop_ = true;
for (auto t : thread_list_) t->join();
PS_VLOG(1) << my_node_.ShortDebugString() << " all threads joined and destroyed";
// close sockets
int linger = 0;
int rc = zmq_setsockopt(receiver_, ZMQ_LINGER, &linger, sizeof(linger));
Expand Down Expand Up @@ -285,7 +286,7 @@ class ZMQVan : public Van {
CHECK(socket);
LOG(INFO) << "Start ZMQ recv thread";

while (!should_stop_) {
while (true) {
ZmqBufferContext *buf_ctx = new ZmqBufferContext();

for (int i = 0;; ++i) {
Expand All @@ -295,7 +296,7 @@ class ZMQVan : public Van {
std::lock_guard<std::mutex> lk(mu_);
// the zmq_msg_recv should be non-blocking, otherwise deadlock will happen
int tag = ZMQ_DONTWAIT;
if (zmq_msg_recv(zmsg, socket, tag) != -1) break;
if (should_stop_ || zmq_msg_recv(zmsg, socket, tag) != -1) break;
if (errno == EINTR) {
std::cout << "interrupted";
continue;
Expand All @@ -305,6 +306,7 @@ class ZMQVan : public Van {
CHECK(0) << "failed to receive message. errno: " << errno << " "
<< zmq_strerror(errno);
}
if (should_stop_) break;
char* buf = CHECK_NOTNULL((char*)zmq_msg_data(zmsg));
size_t size = zmq_msg_size(zmsg);

Expand All @@ -327,7 +329,7 @@ class ZMQVan : public Van {
if (!more) break;
}
} // for

if (should_stop_) break;
recv_buffers_.Push(*buf_ctx);
} // while
}
Expand Down

0 comments on commit 01f5730

Please sign in to comment.