#include #include #include #include #include static char const * const address = "tcp://127.0.0.1:9001"; static char const send_buffer1[] = "foo"; static char const send_buffer2[] = "bar"; static char receive_buffer1[4U]; static char receive_buffer2[4U]; typedef struct { char const * address; } thread_data; static void * sending_thread(thread_data *d, std::promise &sender_finished) { void * const ctx = zmq_ctx_new(); assert(NULL != ctx); void * const socket = zmq_socket(ctx,ZMQ_REQ); assert(NULL != socket); int linger = -1; zmq_setsockopt (socket, ZMQ_LINGER, &linger, sizeof (linger)); int const connect_result = zmq_connect(socket,d->address); assert(-1 != connect_result); int const send_result_should_succeed = zmq_send_const(socket,send_buffer1,sizeof(send_buffer1),0); assert(((size_t)send_result_should_succeed) == sizeof(send_buffer1)); int const send_result_should_fail = zmq_send_const(socket,send_buffer2,sizeof(send_buffer2),0); { int const saved_errno_value = errno; assert(-1 == send_result_should_fail); assert(EFSM == saved_errno_value); } int const disconnect_result = zmq_disconnect(socket,d->address); assert(-1 != disconnect_result); int const close_response = zmq_close(socket); assert(0 == close_response); sender_finished.set_value (true); /* * according to docs, destroying a context can be interrupted * let's loop if that happens, until we detroy the context * or fail with some other error */ { int saved_errno_value; do { saved_errno_value = 0; int const destroy_result = zmq_ctx_destroy(ctx); if(-1 == destroy_result) { saved_errno_value = errno; } } while(EINTR == saved_errno_value); assert(0 == saved_errno_value); } return NULL; } static void * receiving_thread(thread_data *d, std::promise &receiver_ready, std::future &sender_finished) { void * const ctx = zmq_ctx_new(); assert(NULL != ctx); void * const socket = zmq_socket(ctx,ZMQ_REP); assert(NULL != socket); int linger = -1; zmq_setsockopt (socket, ZMQ_LINGER, &linger, sizeof (linger)); int const bind_result = zmq_bind(socket,d->address); assert(-1 != bind_result); receiver_ready.set_value(true); // Signal main thread that we are ready to accept connections. { int const recv_result_should_succeed = zmq_recv(socket,receive_buffer1,sizeof(receive_buffer1),0); assert(((size_t)recv_result_should_succeed) == sizeof(receive_buffer1)); assert(receive_buffer1[0U] == send_buffer1[0U]); assert(receive_buffer1[1U] == send_buffer1[1U]); assert(receive_buffer1[2U] == send_buffer1[2U]); assert(receive_buffer1[3U] == send_buffer1[3U]); } { int const recv_result_should_fail = zmq_recv(socket,receive_buffer2,sizeof(receive_buffer2),0); { int const saved_errno_value = errno; assert(-1 == recv_result_should_fail); assert(EFSM == saved_errno_value); } } sender_finished.wait (); int const unbind_result = zmq_unbind(socket,d->address); assert(-1 != unbind_result); int const close_response = zmq_close(socket); assert(0 == close_response); /* * according to docs, destroying a context can be interrupted * let's loop if that happens, until we detroy the context * or fail with some other error */ { int saved_errno_value; do { saved_errno_value = 0; int const destroy_result = zmq_ctx_destroy(ctx); if(-1 == destroy_result) { saved_errno_value = errno; } } while(EINTR == saved_errno_value); assert(0 == saved_errno_value); } return NULL; } int main(void) { thread_data d = { address }; std::promise receiver_ready, sender_finished; std::future receiver_ready_future(receiver_ready.get_future()); std::future sender_finished_future(sender_finished.get_future ()); std::thread receiver(receiving_thread,&d,std::ref(receiver_ready),std::ref(sender_finished_future)); receiver_ready_future.wait(); // Wait for receiver thread to be ready to accept connections. std::thread sender(sending_thread,&d,std::ref(sender_finished)); sender.join(); receiver.join(); return 0; }