Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem: XPUB_MANUAL subscriptions not removed on peer term #2603

Merged
merged 2 commits into from
Jun 22, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/xpub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "pipe.hpp"
#include "err.hpp"
#include "msg.hpp"
#include "macros.hpp"

zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_),
Expand Down Expand Up @@ -203,13 +204,24 @@ int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
return 0;
}

static void stub (unsigned char *data_, size_t size_, void *arg_)
{
LIBZMQ_UNUSED(data_);
LIBZMQ_UNUSED(size_);
LIBZMQ_UNUSED(arg_);
}

void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
{
if (manual)
{
// Remove the pipe from the trie and send corresponding manual
// unsubscriptions upstream.
manual_subscriptions.rm (pipe_, send_unsubscription, this, false);
// Remove pipe without actually sending the message as it was taken
// care of by the manual call above. subscriptions is the real mtrie,
// so the pipe must be removed from there or it will be left over.
subscriptions.rm (pipe_, stub, NULL, false);
}
else
{
Expand Down
127 changes: 122 additions & 5 deletions tests/test_xpub_manual.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,10 @@ int test_basic()
// Create a publisher
void *pub = zmq_socket (ctx, ZMQ_XPUB);
assert (pub);
int rc = zmq_bind (pub, "inproc://soname");
assert (rc == 0);

// set pub socket options
int manual = 1;
rc = zmq_setsockopt(pub, ZMQ_XPUB_MANUAL, &manual, 4);
int rc = zmq_setsockopt(pub, ZMQ_XPUB_MANUAL, &manual, 4);
assert (rc == 0);
rc = zmq_bind (pub, "inproc://soname");
assert (rc == 0);

// Create a subscriber
Expand Down Expand Up @@ -468,13 +466,132 @@ int test_missing_subscriptions(void)
}


int test_unsubscribe_cleanup (void)
{
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];

void *ctx = zmq_ctx_new ();
assert (ctx);

// Create a publisher
void *pub = zmq_socket (ctx, ZMQ_XPUB);
assert (pub);
int manual = 1;
int rc = zmq_setsockopt (pub, ZMQ_XPUB_MANUAL, &manual, 4);
assert (rc == 0);
rc = zmq_bind (pub, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (pub, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);

// Create a subscriber
void *sub = zmq_socket (ctx, ZMQ_XSUB);
assert (sub);
rc = zmq_connect (sub, my_endpoint);
assert (rc == 0);

// Subscribe for A
char subscription[2] = { 1, 'A'};
rc = zmq_send_const (sub, subscription, 2, 0);
assert (rc == 2);

char buffer[2];

// Receive subscriptions from subscriber
rc = zmq_recv(pub, buffer, 2, 0);
assert (rc == 2);
assert (buffer[0] == 1);
assert (buffer[1] == 'A');
rc = zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XA", 2);
assert (rc == 0);

// send 2 messages
rc = zmq_send_const (pub, "XA", 2, 0);
assert (rc == 2);
rc = zmq_send_const (pub, "XB", 2, 0);
assert (rc == 2);

// receive the single message
rc = zmq_recv (sub, buffer, 2, 0);
assert (rc == 2);
assert (buffer[0] == 'X');
assert (buffer[1] == 'A');

// should be nothing left in the queue
rc = zmq_recv (sub, buffer, 2, ZMQ_DONTWAIT);
assert (rc == -1);

// close the socket
rc = zmq_close (sub);
assert (rc == 0);

// closing the socket will result in an unsubscribe event
rc = zmq_recv (pub, buffer, 2, 0);
assert (rc == 2);
assert (buffer[0] == 0);
assert (buffer[1] == 'A');

// this doesn't really do anything
// there is no last_pipe set it will just fail silently
rc = zmq_setsockopt (pub, ZMQ_UNSUBSCRIBE, "XA", 2);
assert (rc == 0);

// reconnect
sub = zmq_socket (ctx, ZMQ_XSUB);
rc = zmq_connect (sub, my_endpoint);
assert (rc == 0);

// send a subscription for B
subscription[0] = 1;
subscription[1] = 'B';
rc = zmq_send (sub, subscription, 2, 0);
assert (rc == 2);

// receive the subscription, overwrite it to XB
rc = zmq_recv (pub, buffer, 2, 0);
assert (rc == 2);
assert (buffer[0] == 1);
assert(buffer[1] == 'B');
rc = zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XB", 2);
assert (rc == 0);

// send 2 messages
rc = zmq_send_const (pub, "XA", 2, 0);
assert (rc == 2);
rc = zmq_send_const (pub, "XB", 2, 0);
assert (rc == 2);

// receive the single message
rc = zmq_recv (sub, buffer, 2, 0);
assert (rc == 2);
assert (buffer[0] == 'X');
assert (buffer[1] == 'B'); // this assertion will fail

// should be nothing left in the queue
rc = zmq_recv (sub, buffer, 2, ZMQ_DONTWAIT);
assert (rc == -1);

// Clean up.
rc = zmq_close (pub);
assert (rc == 0);
rc = zmq_close (sub);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);

return 0 ;
}


int main(void)
{
setup_test_environment ();
test_basic ();
test_unsubscribe_manual ();
test_xpub_proxy_unsubscribe_on_disconnect ();
test_missing_subscriptions ();
test_unsubscribe_cleanup ();

return 0;
}