Skip to content

Commit

Permalink
ZMQ based state tables
Browse files Browse the repository at this point in the history
  • Loading branch information
liuh-80 committed Nov 22, 2022
1 parent 3801611 commit 8b7499c
Show file tree
Hide file tree
Showing 10 changed files with 255 additions and 304 deletions.
2 changes: 1 addition & 1 deletion BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ cc_library(
includes = [
"common",
],
linkopts = ["-lpthread -lhiredis -lnl-genl-3 -lnl-nf-3 -lnl-route-3 -lnl-3 -lzmq -lboost_serialization -luuid -lyang"],
linkopts = ["-lpthread -lhiredis -lnl-genl-3 -lnl-nf-3 -lnl-route-3 -lnl-3 -lzmq -lboost_serialization -luuid -lyang -lrt"],
visibility = ["//visibility:public"],
)

Expand Down
4 changes: 2 additions & 2 deletions common/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ libswsscommon_la_SOURCES = \
json.cpp \
producertable.cpp \
producerstatetable.cpp \
shmproducerstatetable.cpp \
zmqproducerstatetable.cpp \
rediscommand.cpp \
redistran.cpp \
redisselect.cpp \
Expand All @@ -52,7 +52,7 @@ libswsscommon_la_SOURCES = \
consumertable.cpp \
consumertablebase.cpp \
consumerstatetable.cpp \
shmconsumerstatetable.cpp \
zmqconsumerstatetable.cpp \
ipaddress.cpp \
ipprefix.cpp \
ipaddresses.cpp \
Expand Down
173 changes: 0 additions & 173 deletions common/shmconsumerstatetable.cpp

This file was deleted.

127 changes: 127 additions & 0 deletions common/zmqconsumerstatetable.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
#include <string>
#include <deque>
#include <limits>
#include <hiredis/hiredis.h>
#include "dbconnector.h"
#include "table.h"
#include "selectable.h"
#include "selectableevent.h"
#include "redisselect.h"
#include "redisapi.h"
#include "zmqconsumerstatetable.h"
#include "json.h"

#include <zmq.h>

using namespace std;

namespace swss {

ZmqConsumerStateTable::ZmqConsumerStateTable(DBConnector *db, const std::string &tableName, const std::string& endpoint, int popBatchSize, int pri)
: Selectable(pri)
, m_db(db)
, m_tableName(tableName)
, m_endpoint(endpoint)
{
m_tableSeparator = TableBase::gettableSeparator(db->getDbId());
m_runThread = true;
m_mqPollThread = std::make_shared<std::thread>(&ZmqConsumerStateTable::mqPollThread, this);
}

ZmqConsumerStateTable::~ZmqConsumerStateTable()
{
m_runThread = false;
m_mqPollThread->join();
}

void ZmqConsumerStateTable::mqPollThread()
{
SWSS_LOG_ENTER();
SWSS_LOG_NOTICE("mqPollThread begin");
std::vector<uint8_t> buffer;
buffer.resize(MQ_RESPONSE_BUFFER_SIZE);

// Producer/Consumer state table are n:m mapping, so need use PUSH/PUUL pattern http://api.zeromq.org/master:zmq-socket
void* context = zmq_ctx_new();;
void* socket = zmq_socket(context, ZMQ_PULL);
int rc = zmq_bind(socket, m_endpoint.c_str());
if (rc != 0)
{
SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d",
m_endpoint.c_str(),
zmq_errno());
}

while (m_runThread)
{
// receive message
rc = zmq_recv(socket, buffer.data(), MQ_RESPONSE_BUFFER_SIZE, ZMQ_DONTWAIT);
if (rc < 0
&& (zmq_errno() == EINTR || zmq_errno() == EAGAIN))
{
continue;
}

if (rc < 0)
{
SWSS_LOG_THROW("zmq_recv failed, zmqerrno: %d", zmq_errno());
}

if (rc >= MQ_RESPONSE_BUFFER_SIZE)
{
SWSS_LOG_THROW("zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED",
MQ_RESPONSE_BUFFER_SIZE,
rc);
}

buffer.at(rc) = 0; // make sure that we end string with zero before parse

{
std::lock_guard<std::mutex> lock(m_dataQueueMutex);
m_dataQueue.push((char*)buffer.data());
}

m_selectableEvent.notify(); // will release epoll
}

zmq_close(socket);
zmq_ctx_destroy(context);

SWSS_LOG_NOTICE("end");
}

/* Get multiple pop elements */
void ZmqConsumerStateTable::pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string& /*prefix*/)
{
if (m_dataQueue.empty())
{
return;
}

// For new data append to m_dataQueue during pops, will not be include in result.
auto count = m_dataQueue.size();
vkco.clear();
vkco.resize(count);
for (size_t ie = 0; ie < count; ie++)
{
auto& kco = vkco[ie];
auto& values = kfvFieldsValues(kco);
values.clear();

swss::JSon::readJson(m_dataQueue.front(), values);

// set key and OP
swss::FieldValueTuple fvt = values.at(0);
kfvKey(kco) = fvField(fvt);
kfvOp(kco) = fvValue(fvt);

values.erase(values.begin());

{
std::lock_guard<std::mutex> lock(m_dataQueueMutex);
m_dataQueue.pop();
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,14 @@

namespace swss {

class ShmConsumerStateTable : public Selectable
class ZmqConsumerStateTable : public Selectable
{
public:
/* The default value of pop batch size is 128 */
static constexpr int DEFAULT_POP_BATCH_SIZE = 128;

ShmConsumerStateTable(DBConnector *db, const std::string &tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0);
~ShmConsumerStateTable();

static bool TryRemoveShmQueue(const std::string &queueName);
static std::string GetQueueName(const std::string &dbName, const std::string &tableName);
ZmqConsumerStateTable(DBConnector *db, const std::string &tableName, const std::string& endpoint, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0);
~ZmqConsumerStateTable();

/* Get multiple pop elements */
void pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string &prefix = EMPTY_PREFIX);
Expand Down Expand Up @@ -85,8 +82,6 @@ class ShmConsumerStateTable : public Selectable
private:
void mqPollThread();

std::string m_queueName;

volatile bool m_runThread;

std::shared_ptr<std::thread> m_mqPollThread;
Expand All @@ -100,6 +95,8 @@ class ShmConsumerStateTable : public Selectable
DBConnector *m_db;

std::string m_tableName;

std::string m_endpoint;

std::string m_tableSeparator;
};
Expand Down
Loading

0 comments on commit 8b7499c

Please sign in to comment.