Skip to content

Commit

Permalink
Suppress FDB events if notifier queue exceeds limit (sonic-net#334)
Browse files Browse the repository at this point in the history
* Suppress FDB events if notifier queue exceeds limit

* Revert space change

* Address review comments

* Fix a build check

* Updated comments, changed to NOTICE and log every 1000 events

* Updated dropped count using a 32 bit counter
  • Loading branch information
prsunny authored and lguohan committed Aug 1, 2018
1 parent c2c5e7c commit 9380373
Showing 1 changed file with 87 additions and 29 deletions.
116 changes: 87 additions & 29 deletions syncd/syncd_notifications.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,86 @@ void processNotification(
// that some notiffication arrived

std::condition_variable cv;
std::mutex queue_mutex;
std::queue<swss::KeyOpFieldsValuesTuple> ntf_queue;

class ntf_queue_t
{
public:
bool enqueue(swss::KeyOpFieldsValuesTuple msg);
bool tryDequeue(swss::KeyOpFieldsValuesTuple& msg);
size_t queueStats()
{
SWSS_LOG_ENTER();

std::lock_guard<std::mutex> lock(queue_mutex);

return ntf_queue.size();
}

private:
std::mutex queue_mutex;
std::queue<swss::KeyOpFieldsValuesTuple> ntf_queue;

/*
* Value based on typical L2 deployment with 256k MAC entries and
* some extra buffer for other events like port-state, q-deadlock etc
*/
const size_t limit = 300000;
};

static std::unique_ptr<ntf_queue_t> ntf_queue_hdlr;

bool ntf_queue_t::tryDequeue(
_Out_ swss::KeyOpFieldsValuesTuple &item)
{
std::lock_guard<std::mutex> lock(queue_mutex);

SWSS_LOG_ENTER();

if (ntf_queue.empty())
{
return false;
}

item = ntf_queue.front();

ntf_queue.pop();

return true;
}

bool ntf_queue_t::enqueue(
_In_ swss::KeyOpFieldsValuesTuple item)
{
SWSS_LOG_ENTER();

std::string notification = kfvKey(item);

/*
* If the queue exceeds the limit, then drop all further FDB events
* This is a temporary solution to handle high memory usage by syncd and the
* ntf-Q keeps growing. The permanent solution would be to make this stateful
* so that only the *latest* event is published.
*/
if (queueStats() < limit || notification != "fdb_event")
{
std::lock_guard<std::mutex> lock(queue_mutex);

ntf_queue.push(item);
return true;
}

static uint32_t log_count = 0;

if (!(log_count % 1000))
{
SWSS_LOG_NOTICE("Too many messages in queue (%ld), dropped %ld FDB events!",
queueStats(), (log_count+1));
}

++log_count;

return false;
}

void enqueue_notification(
_In_ std::string op,
Expand All @@ -475,13 +553,10 @@ void enqueue_notification(

swss::KeyOpFieldsValuesTuple item(op, data, entry);

// this is notification context, so we need to protect queue

std::lock_guard<std::mutex> lock(queue_mutex);

ntf_queue.push(item);

cv.notify_all();
if(ntf_queue_hdlr->enqueue(item))
{
cv.notify_all();
}
}

void enqueue_notification(
Expand Down Expand Up @@ -568,29 +643,12 @@ volatile bool runThread;
std::mutex ntf_mutex;
std::unique_lock<std::mutex> ulock(ntf_mutex);

bool tryDequeue(
_Out_ swss::KeyOpFieldsValuesTuple &item)
{
std::lock_guard<std::mutex> lock(queue_mutex);

SWSS_LOG_ENTER();

if (ntf_queue.empty())
{
return false;
}

item = ntf_queue.front();

ntf_queue.pop();

return true;
}

void ntf_process_function()
{
SWSS_LOG_ENTER();

ntf_queue_hdlr = std::unique_ptr<ntf_queue_t>(new ntf_queue_t);

while (runThread)
{
cv.wait(ulock);
Expand All @@ -602,7 +660,7 @@ void ntf_process_function()

swss::KeyOpFieldsValuesTuple item;

while (tryDequeue(item))
while (ntf_queue_hdlr->tryDequeue(item))
{
processNotification(item);
}
Expand Down

0 comments on commit 9380373

Please sign in to comment.