Skip to content

Commit

Permalink
Merge pull request zeromq#2603 from bluca/xpub_manual_subs
Browse files Browse the repository at this point in the history
Problem: XPUB_MANUAL subscriptions not removed on peer term
  • Loading branch information
somdoron authored Jun 22, 2017
2 parents 2d83acc + 3536c4b commit f5da4b1
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 5 deletions.
12 changes: 12 additions & 0 deletions src/xpub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "pipe.hpp"
#include "err.hpp"
#include "msg.hpp"
#include "macros.hpp"

zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_),
Expand Down Expand Up @@ -203,13 +204,24 @@ int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
return 0;
}

static void stub (unsigned char *data_, size_t size_, void *arg_)
{
LIBZMQ_UNUSED(data_);
LIBZMQ_UNUSED(size_);
LIBZMQ_UNUSED(arg_);
}

void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
{
if (manual)
{
// Remove the pipe from the trie and send corresponding manual
// unsubscriptions upstream.
manual_subscriptions.rm (pipe_, send_unsubscription, this, false);
// Remove pipe without actually sending the message as it was taken
// care of by the manual call above. subscriptions is the real mtrie,
// so the pipe must be removed from there or it will be left over.
subscriptions.rm (pipe_, stub, NULL, false);
}
else
{
Expand Down
127 changes: 122 additions & 5 deletions tests/test_xpub_manual.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,10 @@ int test_basic()
// Create a publisher
void *pub = zmq_socket (ctx, ZMQ_XPUB);
assert (pub);
int rc = zmq_bind (pub, "inproc://soname");
assert (rc == 0);

// set pub socket options
int manual = 1;
rc = zmq_setsockopt(pub, ZMQ_XPUB_MANUAL, &manual, 4);
int rc = zmq_setsockopt(pub, ZMQ_XPUB_MANUAL, &manual, 4);
assert (rc == 0);
rc = zmq_bind (pub, "inproc://soname");
assert (rc == 0);

// Create a subscriber
Expand Down Expand Up @@ -468,13 +466,132 @@ int test_missing_subscriptions(void)
}


int test_unsubscribe_cleanup (void)
{
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];

void *ctx = zmq_ctx_new ();
assert (ctx);

// Create a publisher
void *pub = zmq_socket (ctx, ZMQ_XPUB);
assert (pub);
int manual = 1;
int rc = zmq_setsockopt (pub, ZMQ_XPUB_MANUAL, &manual, 4);
assert (rc == 0);
rc = zmq_bind (pub, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (pub, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);

// Create a subscriber
void *sub = zmq_socket (ctx, ZMQ_XSUB);
assert (sub);
rc = zmq_connect (sub, my_endpoint);
assert (rc == 0);

// Subscribe for A
char subscription[2] = { 1, 'A'};
rc = zmq_send_const (sub, subscription, 2, 0);
assert (rc == 2);

char buffer[2];

// Receive subscriptions from subscriber
rc = zmq_recv(pub, buffer, 2, 0);
assert (rc == 2);
assert (buffer[0] == 1);
assert (buffer[1] == 'A');
rc = zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XA", 2);
assert (rc == 0);

// send 2 messages
rc = zmq_send_const (pub, "XA", 2, 0);
assert (rc == 2);
rc = zmq_send_const (pub, "XB", 2, 0);
assert (rc == 2);

// receive the single message
rc = zmq_recv (sub, buffer, 2, 0);
assert (rc == 2);
assert (buffer[0] == 'X');
assert (buffer[1] == 'A');

// should be nothing left in the queue
rc = zmq_recv (sub, buffer, 2, ZMQ_DONTWAIT);
assert (rc == -1);

// close the socket
rc = zmq_close (sub);
assert (rc == 0);

// closing the socket will result in an unsubscribe event
rc = zmq_recv (pub, buffer, 2, 0);
assert (rc == 2);
assert (buffer[0] == 0);
assert (buffer[1] == 'A');

// this doesn't really do anything
// there is no last_pipe set it will just fail silently
rc = zmq_setsockopt (pub, ZMQ_UNSUBSCRIBE, "XA", 2);
assert (rc == 0);

// reconnect
sub = zmq_socket (ctx, ZMQ_XSUB);
rc = zmq_connect (sub, my_endpoint);
assert (rc == 0);

// send a subscription for B
subscription[0] = 1;
subscription[1] = 'B';
rc = zmq_send (sub, subscription, 2, 0);
assert (rc == 2);

// receive the subscription, overwrite it to XB
rc = zmq_recv (pub, buffer, 2, 0);
assert (rc == 2);
assert (buffer[0] == 1);
assert(buffer[1] == 'B');
rc = zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XB", 2);
assert (rc == 0);

// send 2 messages
rc = zmq_send_const (pub, "XA", 2, 0);
assert (rc == 2);
rc = zmq_send_const (pub, "XB", 2, 0);
assert (rc == 2);

// receive the single message
rc = zmq_recv (sub, buffer, 2, 0);
assert (rc == 2);
assert (buffer[0] == 'X');
assert (buffer[1] == 'B'); // this assertion will fail

// should be nothing left in the queue
rc = zmq_recv (sub, buffer, 2, ZMQ_DONTWAIT);
assert (rc == -1);

// Clean up.
rc = zmq_close (pub);
assert (rc == 0);
rc = zmq_close (sub);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);

return 0 ;
}


int main(void)
{
setup_test_environment ();
test_basic ();
test_unsubscribe_manual ();
test_xpub_proxy_unsubscribe_on_disconnect ();
test_missing_subscriptions ();
test_unsubscribe_cleanup ();

return 0;
}

0 comments on commit f5da4b1

Please sign in to comment.