#include #include #include #include static char const * const address = "tcp://127.0.0.1:9001"; static char const send_buffer1[] = "foo"; static char const send_buffer2[] = "bar"; static char receive_buffer1[4U]; static char receive_buffer2[4U]; typedef struct { void * ctx; char const * address; } thread_data; static void * sending_thread(void * data) { thread_data const * d = data; void * const socket = zmq_socket(d->ctx, ZMQ_REQ); assert(NULL != socket); int const connect_result = zmq_connect(socket, d->address); assert(-1 != connect_result); int const send_result_should_succeed = zmq_send_const(socket, send_buffer1, sizeof(send_buffer1), 0); assert(((size_t) send_result_should_succeed) == sizeof(send_buffer1)); int const send_result_should_fail = zmq_send_const(socket, send_buffer2, sizeof(send_buffer2), 0); { int const saved_errno_value = errno; assert(-1 == send_result_should_fail); assert(EFSM == saved_errno_value); } int const disconnect_result = zmq_disconnect(socket, d->address); assert(-1 != disconnect_result); int const close_response = zmq_close(socket); assert(0 == close_response); return NULL; } static void * receiving_thread(void * data) { thread_data const * d = data; void * const socket = zmq_socket(d->ctx, ZMQ_REP); assert(NULL != socket); int const bind_result = zmq_bind(socket, d->address); assert(-1 != bind_result); { int const recv_result_should_succeed = zmq_recv(socket, receive_buffer1, sizeof(receive_buffer1), 0); assert(((size_t) recv_result_should_succeed) == sizeof(receive_buffer1)); assert(receive_buffer1[0U] == send_buffer1[0U]); assert(receive_buffer1[1U] == send_buffer1[1U]); assert(receive_buffer1[2U] == send_buffer1[2U]); assert(receive_buffer1[3U] == send_buffer1[3U]); } { int const recv_result_should_fail = zmq_recv(socket, receive_buffer2, sizeof(receive_buffer2), 0); { int const saved_errno_value = errno; assert(-1 == recv_result_should_fail); assert(EFSM == saved_errno_value); } } int const unbind_result = zmq_unbind(socket, d->address); assert(-1 != unbind_result); int const close_response = zmq_close(socket); assert(0 == close_response); return NULL; } int main(void) { int ii; void * const ctx = zmq_ctx_new(); assert(NULL != ctx); pthread_t sender; pthread_t receiver; thread_data d = { .ctx = ctx, .address = address }; ii = pthread_create(&receiver, NULL, receiving_thread, &d); assert(0 == ii); ii = pthread_create(&sender, NULL, sending_thread, &d); assert(0 == ii); ii = pthread_join(receiver, NULL); assert(0 == ii); ii = pthread_join(sender, NULL); assert(0 == ii); /* * according to docs, destroying a context can be interrupted * let's loop if that happens, until we detroy the context * or fail with some other error */ { int saved_errno_value; do { saved_errno_value = 0; int const destroy_result = zmq_ctx_destroy(ctx); if (-1 == destroy_result) { saved_errno_value = errno; } } while (EINTR == saved_errno_value); assert(0 == saved_errno_value); } return 0; }