From c5449c9e5a04d66cb3049e6da7325a6e4aaa0937 Mon Sep 17 00:00:00 2001 From: Elad Raz Date: Sun, 6 Mar 2016 16:36:38 +0200 Subject: [PATCH] common: Adding ConsumerTable class The ConsumerTable implements the consumer side of the notification channel. Opens a duplicated socket to the Redis DB and listens to notification messages made by the ProducerClass ("PUBLISH" command). The ConsumerTable implements the Selectable interface. Once a change had been made, it clears (by reading) the PUBLISH message and notify user. Usage: ConsumerTable c(&db, "TEST"); Select cs; cs.addSelectable(&cs); while ((ret = cs.select(&tmpcs, &tmpfd)) == Select::OBJECT) { c.pop(kco); // Work with KEY, OP and Value ... } The peek() command is used to read values without checking or poping the notification channel (Used to invoke "HGETALL" command directly). Usage: ConsumerTable c(&db, "TEST"); vector v; if (!c.peek("TEST", v)) { cout << "not exists" << endl; return 0; } for(auto i : v) { cout << fvField(i) << " = " << fvValue(i) << endl; } Signed-off-by: Elad Raz --- common/Makefile.am | 3 +- common/consumertable.cpp | 172 +++++++++++++++++++++++++++++++++++++++ common/consumertable.h | 42 ++++++++++ 3 files changed, 216 insertions(+), 1 deletion(-) create mode 100644 common/consumertable.cpp create mode 100644 common/consumertable.h diff --git a/common/Makefile.am b/common/Makefile.am index 6ef2a95742..841116e07c 100644 --- a/common/Makefile.am +++ b/common/Makefile.am @@ -15,7 +15,8 @@ libswsscommon_la_SOURCES = \ table.cpp \ json.cpp \ producertable.cpp \ - select.cpp + select.cpp \ + consumertable.cpp libswsscommon_la_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) libswsscommon_la_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) diff --git a/common/consumertable.cpp b/common/consumertable.cpp new file mode 100644 index 0000000000..4935371d44 --- /dev/null +++ b/common/consumertable.cpp @@ -0,0 +1,172 @@ +#include "common/redisreply.h" +#include "common/consumertable.h" +#include "common/json.h" +#include +#include +#include +#include +#include +#include + +/* The database is already alive and kicking, no need for more than a second */ +#define SUBSCRIBE_TIMEOUT (1000) + +using namespace std; + +namespace swss { + +ConsumerTable::ConsumerTable(DBConnector *db, string tableName) : + Table(db, tableName), + m_subscribe(NULL), + m_queueLength(0) +{ + bool again = true; + + while (again) + { + try + { + RedisReply watch(m_db, string("WATCH ") + getKeyQueueTableName(), REDIS_REPLY_STATUS); + watch.checkStatusOK(); + multi(); + enqueue(string("LLEN ") + getKeyQueueTableName(), REDIS_REPLY_INTEGER); + subsribe(); + enqueue(string("LLEN ") + getKeyQueueTableName(), REDIS_REPLY_INTEGER); + exec(); + again = false; + } + catch (...) + { + delete m_subscribe; + } + } + + m_queueLength = queueResultsFront()->integer; + /* No need for that since we have WATCH gurantee on the transaction */ +} + +ConsumerTable::~ConsumerTable() +{ + delete m_subscribe; +} + +static string pop_front(queue &q) +{ + string ret(q.front()->getContext()->str); + delete q.front(); + q.pop(); + return ret; +} + +void ConsumerTable::pop(KeyOpFieldsValuesTuple &kco) +{ + string rpop_key("RPOP "); + string rpop_value = rpop_key; + string rpop_op = rpop_key; + + multi(); + + rpop_key += getKeyQueueTableName(); + enqueue(rpop_key, REDIS_REPLY_STRING); + + rpop_op += getOpQueueTableName(); + enqueue(rpop_op, REDIS_REPLY_STRING); + + rpop_value += getValueQueueTableName(); + enqueue(rpop_value, REDIS_REPLY_STRING); + + exec(); + + vector fieldsValues; + string key = pop_front(m_results); + string op = pop_front(m_results); + JSon::readJson(pop_front(m_results), fieldsValues); + + kco = std::make_tuple(key, op, fieldsValues); +} + +bool ConsumerTable::peek(std::string key, vector &values) +{ + string hgetall_key("HGETALL "); + hgetall_key += getKeyName(key); + + RedisReply r(m_db, hgetall_key, REDIS_REPLY_ARRAY); + redisReply *reply = r.getContext(); + values.clear(); + + if (!reply->elements) + return false; + + if (reply->elements & 1) + throw system_error(make_error_code(errc::address_not_available), + "Unable to connect netlink socket"); + + for (unsigned int i = 0; i < reply->elements; i += 2) + values.push_back(make_tuple(reply->element[i]->str, + reply->element[i + 1]->str)); + + return true; +} + +void ConsumerTable::addFd(fd_set *fd) +{ + FD_SET(m_subscribe->getContext()->fd, fd); +} + +int ConsumerTable::readCache() +{ + redisReply *reply = NULL; + + /* Read the messages in queue before subsribe command execute */ + if (m_queueLength) { + m_queueLength--; + return ConsumerTable::DATA; + } + + if (redisGetReplyFromReader(m_subscribe->getContext(), + (void**)&reply) != REDIS_OK) + { + return Selectable::ERROR; + } else if (reply != NULL) + { + freeReplyObject(reply); + return Selectable::DATA; + } + + return Selectable::NODATA; +} + +void ConsumerTable::readMe() +{ + redisReply *reply = NULL; + + if (redisGetReply(m_subscribe->getContext(), (void**)&reply) != REDIS_OK) + throw "Unable to read redis reply"; + + freeReplyObject(reply); +} + +bool ConsumerTable::isMe(fd_set *fd) +{ + return FD_ISSET(m_subscribe->getContext()->fd, fd); +} + +void ConsumerTable::subsribe() +{ + /* Create new new context to DB */ + if (m_db->getContext()->connection_type == REDIS_CONN_TCP) + m_subscribe = new DBConnector(m_db->getDB(), + m_db->getContext()->tcp.host, + m_db->getContext()->tcp.port, + SUBSCRIBE_TIMEOUT); + else + m_subscribe = new DBConnector(m_db->getDB(), + m_db->getContext()->unix_sock.path, + SUBSCRIBE_TIMEOUT); + /* Send SUBSCRIBE #channel command */ + string s("SUBSCRIBE "); + s+= getChannelTableName(); + RedisReply r(m_subscribe, s, REDIS_REPLY_ARRAY); +} + +} diff --git a/common/consumertable.h b/common/consumertable.h new file mode 100644 index 0000000000..385aea3ccb --- /dev/null +++ b/common/consumertable.h @@ -0,0 +1,42 @@ +#ifndef __CONSUMERTABLE__ +#define __CONSUMERTABLE__ + +#include +#include +#include +#include +#include "common/dbconnector.h" +#include "common/table.h" +#include "common/selectable.h" + +namespace swss { + +class ConsumerTable : public Table, public Selectable +{ +public: + ConsumerTable(DBConnector *db, std::string tableName); + virtual ~ConsumerTable(); + + /* Get a singlesubsribe channel rpop */ + void pop(KeyOpFieldsValuesTuple &kco); + + /* Get a key content without poping it from the notificaiton list */ + /* return false if the key doesn't exists */ + bool peek(std::string key, std::vector &values); + + virtual void addFd(fd_set *fd); + virtual bool isMe(fd_set *fd); + virtual int readCache(); + virtual void readMe(); + +private: + /* Create a new redisContext, SELECT DB and SUBSRIBE */ + void subsribe(); + + DBConnector *m_subscribe; + unsigned int m_queueLength; +}; + +} + +#endif