Process flex counters requests in separate thread #483

Merged: 2 commits, Jul 17, 2019
93 changes: 90 additions & 3 deletions syncd/syncd.cpp
@@ -38,6 +38,9 @@ std::shared_ptr<swss::RedisClient> g_redisClient;
std::shared_ptr<swss::ProducerTable> getResponse;
std::shared_ptr<swss::NotificationProducer> notifications;

std::shared_ptr<std::thread> g_processFlexCounterEventThread;
volatile bool g_processFlexCounterEventThreadRun = true;

/*
* TODO: Those are hard coded values for mlnx integration for v1.0.1 they need
* to be updated.
@@ -2954,25 +2957,98 @@ void processFlexCounterGroupEvent(
}
}

std::queue<swss::KeyOpFieldsValuesTuple> g_flexCounterEventQueue;

bool tryPopFlexCounterEvent(
_Out_ swss::KeyOpFieldsValuesTuple& kco)
{
SWSS_LOG_ENTER();

std::lock_guard<std::mutex> lock(g_mutex);

if (g_flexCounterEventQueue.empty())
{
return false;
}

kco = g_flexCounterEventQueue.front();

g_flexCounterEventQueue.pop();

return true;
}

void pushFlexCounterEvent(
_In_ const swss::KeyOpFieldsValuesTuple& kco)
{
SWSS_LOG_ENTER();

std::lock_guard<std::mutex> lock(g_mutex);

g_flexCounterEventQueue.push(kco);
}

bool processFlexCounterEvent(
_In_ const swss::KeyOpFieldsValuesTuple kco);

void processFlexCounterEventThread()
{
SWSS_LOG_ENTER();

while (g_processFlexCounterEventThreadRun)
{
swss::KeyOpFieldsValuesTuple kco;

if (tryPopFlexCounterEvent(kco))
{
if (!processFlexCounterEvent(kco))
{
// event was not successfully processed, put it again to the queue

pushFlexCounterEvent(kco);

Reviewer comment (Contributor):

This may upset the event sequence in the processing-failure case, and the processing-failure case is exactly the problem this change is meant to handle.

For the flex counter table in particular, an entry typically contains a counter ID list that is expected to remain static after SET. We should not normally see two operations on the same key in the queue at once, although the possibility of such a sequence problem remains.

If we enqueue at the front and retry, we avoid the event-sequence problem; however, we may then experience head-of-line blocking until the retried event succeeds.

Author reply (Collaborator):

I don't see the problem described here. All of these counter issues exist because the events travel on the same channel and can be picked up by select independently, and as I stated at the beginning, subscribing to counters this way is a broken design. Of course you can come up with a scenario that breaks even this workaround, since you need to wait for the RIF to be created (and potentially not destroyed and created again). All of this would be solved once syncd becomes synchronous, for which I have already sent a PR.

}
}

sleep(1);

Reviewer comment (Contributor):

Can we wake up every second only when there is a processing failure, and otherwise wake up on an enqueue event? Flex counter table events should be very rare after initialization.

Author reply (@kcudnik, Jul 4, 2019):


Adding another thread and a condition variable to wait for a specific event is overkill here; even with such an event there is no guarantee it would be processed after the second select creates the RIF port, so a wait has to exist somewhere anyway. And even if counter events are rare, this is the generic case: I don't know whether orchagent will want to create or destroy the next counter after two days of running, or at any other time.

}
}

void processFlexCounterEvent(
_In_ swss::ConsumerTable &consumer)
{
SWSS_LOG_ENTER();

swss::KeyOpFieldsValuesTuple kco;

{
std::lock_guard<std::mutex> lock(g_mutex);
consumer.pop(kco);
}

// Because a flex counter event can arrive independently (e.g. for a RIF),
// it may be picked up by the select API before the actual interface has
// been created, in which case the counter subscription would fail. So we
// process each request in a dedicated thread, queue arriving events, and
// put failed events back on the queue until they are processed.

pushFlexCounterEvent(kco);
}

bool processFlexCounterEvent(
_In_ const swss::KeyOpFieldsValuesTuple kco)
{
SWSS_LOG_ENTER();

const auto &key = kfvKey(kco);
-std::string &op = kfvOp(kco);
+const std::string &op = kfvOp(kco);

std::size_t delimiter = key.find_first_of(":");
if (delimiter == std::string::npos)
{
SWSS_LOG_ERROR("Failed to parse the key %s", key.c_str());
-return;
+return true; // if key is invalid there is no need to process this event again
}

const auto groupName = key.substr(0, delimiter);
@@ -2987,7 +3063,7 @@
SWSS_LOG_WARN("port VID %s, was not found (probably port was removed/splitted) and will remove from counters now",
sai_serialize_object_id(vid).c_str());

-op = DEL_COMMAND;
+return false;
}

sai_object_type_t objectType = redis_sai_object_type_query(vid); // VID and RID will have the same object type
@@ -3132,6 +3208,8 @@ void processFlexCounterEvent(

FlexCounter::setBufferPoolCounterList(vid, rid, groupName, bufferPoolCounterIds, statsMode);
}

return true;
}

void printUsage()
@@ -3846,6 +3924,11 @@ int syncd_main(int argc, char **argv)

twd.setCallback(timerWatchdogCallback);

g_processFlexCounterEventThreadRun = true;

g_processFlexCounterEventThread = std::make_shared<std::thread>(processFlexCounterEventThread);


while(runMainLoop)
{
try
@@ -4029,6 +4112,10 @@

#endif

g_processFlexCounterEventThreadRun = false;

g_processFlexCounterEventThread->join();

FlexCounter::removeAllCounters();

{