Skip to content

Commit

Permalink
common: Adding ConsumerTable class
Browse files Browse the repository at this point in the history
The ConsumerTable implements the consumer side of the notification channel.
Opens a duplicated socket to the Redis DB and listens to notification
messages made by the ProducerClass ("PUBLISH" command).

The ConsumerTable implements the Selectable interface. Once a change had been
made, it clears (by reading) the PUBLISH message and notify user.

Usage:
        ConsumerTable c(&db, "TEST");
        Select cs;

        cs.addSelectable(&cs);
        while ((ret = cs.select(&tmpcs, &tmpfd)) == Select::OBJECT)
        {
                c.pop(kco);
                // Work with KEY, OP and Value
                ...
        }

The peek() command is used to read values without checking or poping the
notification channel (Used to invoke "HGETALL" command directly).
Usage:
        ConsumerTable c(&db, "TEST");
        vector<FieldValueTuple> v;
        if (!c.peek("TEST", v))
        {
               cout << "not exists" << endl;
               return 0;
        }

        for(auto i : v)
        {
               cout << fvField(i) << " = " << fvValue(i) << endl;
        }

Signed-off-by: Elad Raz <[email protected]>
  • Loading branch information
eladraz authored and Shuotian Cheng committed Mar 7, 2016
1 parent ebb7957 commit c5449c9
Show file tree
Hide file tree
Showing 3 changed files with 216 additions and 1 deletion.
3 changes: 2 additions & 1 deletion common/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ libswsscommon_la_SOURCES = \
table.cpp \
json.cpp \
producertable.cpp \
select.cpp
select.cpp \
consumertable.cpp

libswsscommon_la_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON)
libswsscommon_la_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON)
Expand Down
172 changes: 172 additions & 0 deletions common/consumertable.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
#include "common/redisreply.h"
#include "common/consumertable.h"
#include "common/json.h"
#include <stdio.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <iostream>
#include <system_error>

/* The database is already alive and kicking, no need for more than a second */
#define SUBSCRIBE_TIMEOUT (1000)

using namespace std;

namespace swss {

ConsumerTable::ConsumerTable(DBConnector *db, string tableName) :
Table(db, tableName),
m_subscribe(NULL),
m_queueLength(0)
{
bool again = true;

while (again)
{
try
{
RedisReply watch(m_db, string("WATCH ") + getKeyQueueTableName(), REDIS_REPLY_STATUS);
watch.checkStatusOK();
multi();
enqueue(string("LLEN ") + getKeyQueueTableName(), REDIS_REPLY_INTEGER);
subsribe();
enqueue(string("LLEN ") + getKeyQueueTableName(), REDIS_REPLY_INTEGER);
exec();
again = false;
}
catch (...)
{
delete m_subscribe;
}
}

m_queueLength = queueResultsFront()->integer;
/* No need for that since we have WATCH gurantee on the transaction */
}

ConsumerTable::~ConsumerTable()
{
delete m_subscribe;
}

static string pop_front(queue<RedisReply *> &q)
{
string ret(q.front()->getContext()->str);
delete q.front();
q.pop();
return ret;
}

void ConsumerTable::pop(KeyOpFieldsValuesTuple &kco)
{
string rpop_key("RPOP ");
string rpop_value = rpop_key;
string rpop_op = rpop_key;

multi();

rpop_key += getKeyQueueTableName();
enqueue(rpop_key, REDIS_REPLY_STRING);

rpop_op += getOpQueueTableName();
enqueue(rpop_op, REDIS_REPLY_STRING);

rpop_value += getValueQueueTableName();
enqueue(rpop_value, REDIS_REPLY_STRING);

exec();

vector<FieldValueTuple> fieldsValues;
string key = pop_front(m_results);
string op = pop_front(m_results);
JSon::readJson(pop_front(m_results), fieldsValues);

kco = std::make_tuple(key, op, fieldsValues);
}

bool ConsumerTable::peek(std::string key, vector<FieldValueTuple> &values)
{
string hgetall_key("HGETALL ");
hgetall_key += getKeyName(key);

RedisReply r(m_db, hgetall_key, REDIS_REPLY_ARRAY);
redisReply *reply = r.getContext();
values.clear();

if (!reply->elements)
return false;

if (reply->elements & 1)
throw system_error(make_error_code(errc::address_not_available),
"Unable to connect netlink socket");

for (unsigned int i = 0; i < reply->elements; i += 2)
values.push_back(make_tuple(reply->element[i]->str,
reply->element[i + 1]->str));

return true;
}

void ConsumerTable::addFd(fd_set *fd)
{
FD_SET(m_subscribe->getContext()->fd, fd);
}

int ConsumerTable::readCache()
{
redisReply *reply = NULL;

/* Read the messages in queue before subsribe command execute */
if (m_queueLength) {
m_queueLength--;
return ConsumerTable::DATA;
}

if (redisGetReplyFromReader(m_subscribe->getContext(),
(void**)&reply) != REDIS_OK)
{
return Selectable::ERROR;
} else if (reply != NULL)
{
freeReplyObject(reply);
return Selectable::DATA;
}

return Selectable::NODATA;
}

void ConsumerTable::readMe()
{
redisReply *reply = NULL;

if (redisGetReply(m_subscribe->getContext(), (void**)&reply) != REDIS_OK)
throw "Unable to read redis reply";

freeReplyObject(reply);
}

bool ConsumerTable::isMe(fd_set *fd)
{
return FD_ISSET(m_subscribe->getContext()->fd, fd);
}

void ConsumerTable::subsribe()
{
/* Create new new context to DB */
if (m_db->getContext()->connection_type == REDIS_CONN_TCP)
m_subscribe = new DBConnector(m_db->getDB(),
m_db->getContext()->tcp.host,
m_db->getContext()->tcp.port,
SUBSCRIBE_TIMEOUT);
else
m_subscribe = new DBConnector(m_db->getDB(),
m_db->getContext()->unix_sock.path,
SUBSCRIBE_TIMEOUT);
/* Send SUBSCRIBE #channel command */
string s("SUBSCRIBE ");
s+= getChannelTableName();
RedisReply r(m_subscribe, s, REDIS_REPLY_ARRAY);
}

}
42 changes: 42 additions & 0 deletions common/consumertable.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#ifndef __CONSUMERTABLE__
#define __CONSUMERTABLE__

#include <string>
#include <vector>
#include <limits>
#include <hiredis/hiredis.h>
#include "common/dbconnector.h"
#include "common/table.h"
#include "common/selectable.h"

namespace swss {

class ConsumerTable : public Table, public Selectable
{
public:
ConsumerTable(DBConnector *db, std::string tableName);
virtual ~ConsumerTable();

/* Get a singlesubsribe channel rpop */
void pop(KeyOpFieldsValuesTuple &kco);

/* Get a key content without poping it from the notificaiton list */
/* return false if the key doesn't exists */
bool peek(std::string key, std::vector<FieldValueTuple> &values);

virtual void addFd(fd_set *fd);
virtual bool isMe(fd_set *fd);
virtual int readCache();
virtual void readMe();

private:
/* Create a new redisContext, SELECT DB and SUBSRIBE */
void subsribe();

DBConnector *m_subscribe;
unsigned int m_queueLength;
};

}

#endif

0 comments on commit c5449c9

Please sign in to comment.