Skip to content

Commit

Permalink
[aclorch]: Use the Observer class to listen to port changes (#689)
Browse files Browse the repository at this point in the history
The previous implementation was using state database to check LAG
creation/removal. However, state database only reflects the changes
in the kernel.

This pull request changes the logic so that LAG changes will directly
notify the AclOrch to update pending ports per table. This simplifies
the logic in AclOrch and make it reliable on port change events.

Right now, only LAG creation is support, which aligns the legacy codes.
An extra pull request will be provided to ensure both LAG creation/removal
will be supported.

The unit test is provided to cover the two scenarios:
1. creating ACL table before creating LAG
2. creating LAG before creating ACL table

Signed-off-by: Shu0T1an ChenG <[email protected]>
  • Loading branch information
Shuotian Cheng authored Nov 17, 2018
1 parent d7d887a commit 067928d
Show file tree
Hide file tree
Showing 6 changed files with 257 additions and 148 deletions.
238 changes: 97 additions & 141 deletions orchagent/aclorch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1054,7 +1054,7 @@ bool AclTable::validate()
// Control plane ACLs are handled by a separate process
if (type == ACL_TABLE_UNKNOWN || type == ACL_TABLE_CTRLPLANE) return false;
if (stage == ACL_STAGE_UNKNOWN) return false;
if (portSet.empty()) return false;
if (portSet.empty() && pendingPortSet.empty()) return false;

return true;
}
Expand Down Expand Up @@ -1177,6 +1177,50 @@ bool AclTable::create()
return status == SAI_STATUS_SUCCESS;
}

void AclTable::update(SubjectType type, void *cntx)
{
SWSS_LOG_ENTER();

// Only interested in port change
if (type != SUBJECT_TYPE_PORT_CHANGE)
{
return;
}

PortUpdate *update = static_cast<PortUpdate *>(cntx);

Port &port = update->port;
if (update->add)
{
if (pendingPortSet.find(port.m_alias) != pendingPortSet.end())
{
sai_object_id_t bind_port_id;
if (gPortsOrch->getAclBindPortId(port.m_alias, bind_port_id))
{
link(bind_port_id);
bind(bind_port_id);

pendingPortSet.erase(port.m_alias);
portSet.emplace(port.m_alias);

SWSS_LOG_NOTICE("Bound port %s to ACL table %s",
port.m_alias.c_str(), id.c_str());
}
else
{
SWSS_LOG_ERROR("Failed to get port %s bind port ID",
port.m_alias.c_str());
return;
}
}
}
else
{
// TODO: deal with port removal scenario
}
}

// TODO: make bind/unbind symmetric
bool AclTable::bind(sai_object_id_t portOid)
{
SWSS_LOG_ENTER();
Expand Down Expand Up @@ -1737,7 +1781,9 @@ void AclOrch::init(vector<TableConnector>& connectors, PortsOrch *portOrch, Mirr
throw "AclOrch initialization failure";
}

// Attach observers
m_mirrorOrch->attach(this);
gPortsOrch->attach(this);

// Should be initialized last to guaranty that object is
// initialized before thread start.
Expand Down Expand Up @@ -1798,18 +1844,30 @@ void AclOrch::update(SubjectType type, void *cntx)
{
SWSS_LOG_ENTER();

if (type != SUBJECT_TYPE_MIRROR_SESSION_CHANGE && type != SUBJECT_TYPE_INT_SESSION_CHANGE)
if (type != SUBJECT_TYPE_MIRROR_SESSION_CHANGE &&
type != SUBJECT_TYPE_INT_SESSION_CHANGE &&
type != SUBJECT_TYPE_PORT_CHANGE)
{
SWSS_LOG_WARN("Received unwanted change update %d", type);
return;
}

unique_lock<mutex> lock(m_countersMutex);

for (const auto& table : m_AclTables)
// ACL table deals with port change
// ACL rule deals with mirror session change and int session change
for (auto& table : m_AclTables)
{
for (auto& rule : table.second.rules)
if (type == SUBJECT_TYPE_PORT_CHANGE)
{
rule.second->update(type, cntx);
table.second.update(type, cntx);
}
else
{
for (auto& rule : table.second.rules)
{
rule.second->update(type, cntx);
}
}
}
}
Expand All @@ -1835,11 +1893,6 @@ void AclOrch::doTask(Consumer &consumer)
unique_lock<mutex> lock(m_countersMutex);
doAclRuleTask(consumer);
}
else if (table_name == STATE_LAG_TABLE_NAME)
{
unique_lock<mutex> lock(m_countersMutex);
doAclTablePortUpdateTask(consumer);
}
else
{
SWSS_LOG_ERROR("Invalid table %s", table_name.c_str());
Expand All @@ -1857,20 +1910,22 @@ bool AclOrch::addAclTable(AclTable &newTable, string table_id)
/* If ACL table exists, remove the table first.*/
if (!removeAclTable(table_id))
{
SWSS_LOG_ERROR("Fail to remove the exsiting ACL table %s when try to add the new one.", table_id.c_str());
SWSS_LOG_ERROR("Failed to remove exsiting ACL table %s before adding the new one",
table_id.c_str());
return false;
}
}

if (createBindAclTable(newTable, table_oid))
{
m_AclTables[table_oid] = newTable;
SWSS_LOG_NOTICE("Successfully created ACL table %s, oid: %lX", newTable.description.c_str(), table_oid);
SWSS_LOG_NOTICE("Created ACL table %s oid:%lx",
newTable.id.c_str(), table_oid);
return true;
}
else
{
SWSS_LOG_ERROR("Failed to create table %s", table_id.c_str());
SWSS_LOG_ERROR("Failed to create ACL table %s", table_id.c_str());
return false;
}
}
Expand Down Expand Up @@ -1968,20 +2023,18 @@ void AclOrch::doAclTableTask(Consumer &consumer)
{
if (!processAclTableType(attr_value, newTable.type))
{
SWSS_LOG_ERROR("Failed to process table type for table %s", table_id.c_str());
SWSS_LOG_ERROR("Failed to process ACL table %s type",
table_id.c_str());
bAllAttributesOk = false;
break;
}
}
else if (attr_name == TABLE_PORTS)
{
bool suc = processPorts(newTable, attr_value, [&](sai_object_id_t portOid) {
newTable.link(portOid);
});

if (!suc)
if (!processAclTablePorts(attr_value, newTable))
{
SWSS_LOG_ERROR("Failed to process table ports for table %s", table_id.c_str());
SWSS_LOG_ERROR("Failed to process ACL table %s ports",
table_id.c_str());
bAllAttributesOk = false;
break;
}
Expand All @@ -1990,7 +2043,8 @@ void AclOrch::doAclTableTask(Consumer &consumer)
{
if (!processAclTableStage(attr_value, newTable.stage))
{
SWSS_LOG_ERROR("Failed to process table stage for table %s", table_id.c_str());
SWSS_LOG_ERROR("Failed to process ACL table %s stage",
table_id.c_str());
bAllAttributesOk = false;
break;
}
Expand All @@ -2013,7 +2067,8 @@ void AclOrch::doAclTableTask(Consumer &consumer)
else
{
it = consumer.m_toSync.erase(it);
SWSS_LOG_ERROR("Failed to create ACL table. Table configuration is invalid");
SWSS_LOG_ERROR("Failed to create ACL table %s, invalid configuration",
table_id.c_str());
}
}
else if (op == DEL_COMMAND)
Expand Down Expand Up @@ -2120,140 +2175,41 @@ void AclOrch::doAclRuleTask(Consumer &consumer)
}
}

void AclOrch::doAclTablePortUpdateTask(Consumer &consumer)
bool AclOrch::processAclTablePorts(string portList, AclTable &aclTable)
{
SWSS_LOG_ENTER();

auto it = consumer.m_toSync.begin();
while (it != consumer.m_toSync.end())
{
KeyOpFieldsValuesTuple t = it->second;
string key = kfvKey(t);
size_t found = key.find(consumer.getConsumerTable()->getTableNameSeparator().c_str());
string port_alias = key.substr(0, found);
string op = kfvOp(t);

SWSS_LOG_INFO("doAclTablePortUpdateTask: OP: %s, port_alias: %s", op.c_str(), port_alias.c_str());

if (op == SET_COMMAND)
{
for (auto itmap : m_AclTables)
{
auto table = itmap.second;
if (table.pendingPortSet.find(port_alias) != table.pendingPortSet.end())
{
SWSS_LOG_INFO("found the port: %s in ACL table: %s pending port list, bind it to ACL table.", port_alias.c_str(), table.description.c_str());

bool suc = processPendingPort(table, port_alias, [&](sai_object_id_t portOid) {
table.link(portOid);
});
auto port_list = tokenize(portList, ',');
set<string> ports(port_list.begin(), port_list.end());

if (!suc)
{
SWSS_LOG_ERROR("Failed to bind the ACL table: %s to port: %s", table.description.c_str(), port_alias.c_str());
}
else
{
table.pendingPortSet.erase(port_alias);
SWSS_LOG_DEBUG("port: %s bound to ACL table table: %s, remove it from pending list", port_alias.c_str(), table.description.c_str());
}
}
}
}
else if (op == DEL_COMMAND)
{
for (auto itmap : m_AclTables)
{
auto table = itmap.second;
if (table.portSet.find(port_alias) != table.portSet.end())
{
/*TODO: update the ACL table after port/lag deleted*/
table.pendingPortSet.emplace(port_alias);
SWSS_LOG_INFO("Add deleted port: %s to the pending list of ACL table: %s", port_alias.c_str(), table.description.c_str());
}
}
}
else
{
SWSS_LOG_ERROR("Unknown operation type %s", op.c_str());
}
it = consumer.m_toSync.erase(it);
}
}

bool AclOrch::processPorts(AclTable &aclTable, string portsList, std::function<void (sai_object_id_t)> inserter)
{
SWSS_LOG_ENTER();

vector<string> strList;

SWSS_LOG_DEBUG("Processing ACL table port list %s", portsList.c_str());

split(portsList, strList, ',');

set<string> strSet(strList.begin(), strList.end());
aclTable.portSet = strSet;

if (strList.size() != strSet.size())
{
SWSS_LOG_ERROR("Failed to process port list. Duplicate port entry");
return false;
}

if (strList.empty())
// TODO: Support adding ports afterwards
if (ports.empty())
{
SWSS_LOG_ERROR("Failed to process port list. List is empty");
SWSS_LOG_ERROR("Failed to process empty port list");
return false;
}

for (const auto& alias : strList)
for (auto alias : ports)
{
sai_object_id_t port_id;
Port port;
if (!gPortsOrch->getPort(alias, port))
{
SWSS_LOG_INFO("Port %s not configured yet, add it to ACL table %s pending list", alias.c_str(), aclTable.description.c_str());
SWSS_LOG_INFO("Add unready port %s to pending list for ACL table %s",
alias.c_str(), aclTable.id.c_str());
aclTable.pendingPortSet.emplace(alias);
continue;
}

if (gPortsOrch->getAclBindPortId(alias, port_id))
sai_object_id_t bind_port_id;
if (!gPortsOrch->getAclBindPortId(alias, bind_port_id))
{
inserter(port_id);
}
else
{
return false;
SWSS_LOG_ERROR("Failed to get port %s bind port ID for ACL table %s",
alias.c_str(), aclTable.id.c_str());
continue;
}
}

return true;
}

bool AclOrch::processPendingPort(AclTable &aclTable, string portAlias, std::function<void (sai_object_id_t)> inserter)
{
SWSS_LOG_ENTER();

SWSS_LOG_DEBUG("Processing ACL table port %s", portAlias.c_str());

sai_object_id_t port_id;

Port port;
if (!gPortsOrch->getPort(portAlias, port))
{
SWSS_LOG_INFO("Port %s not configured yet, add it to ACL table %s pending list", portAlias.c_str(), aclTable.description.c_str());
aclTable.pendingPortSet.insert(portAlias);
return true;
}

if (gPortsOrch->getAclBindPortId(portAlias, port_id))
{
inserter(port_id);
aclTable.bind(port_id);
}
else
{
return false;
aclTable.link(bind_port_id);
aclTable.portSet.emplace(alias);
}

return true;
Expand Down Expand Up @@ -2292,8 +2248,6 @@ bool AclOrch::processAclTableStage(string stage, acl_stage_type_t &acl_stage)
return true;
}



sai_object_id_t AclOrch::getTableById(string table_id)
{
SWSS_LOG_ENTER();
Expand All @@ -2320,7 +2274,8 @@ bool AclOrch::createBindAclTable(AclTable &aclTable, sai_object_id_t &table_oid)
sai_status_t status = bindAclTable(table_oid, aclTable);
if (status != SAI_STATUS_SUCCESS)
{
SWSS_LOG_ERROR("Failed to bind table %s to ports", aclTable.description.c_str());
SWSS_LOG_ERROR("Failed to bind table %s to ports",
aclTable.id.c_str());
return false;
}
return true;
Expand All @@ -2333,7 +2288,8 @@ sai_status_t AclOrch::deleteUnbindAclTable(sai_object_id_t table_oid)

if ((status = bindAclTable(table_oid, m_AclTables[table_oid], false)) != SAI_STATUS_SUCCESS)
{
SWSS_LOG_ERROR("Failed to unbind table %s", m_AclTables[table_oid].description.c_str());
SWSS_LOG_ERROR("Failed to unbind table %s",
m_AclTables[table_oid].id.c_str());
return status;
}

Expand Down
Loading

0 comments on commit 067928d

Please sign in to comment.