Skip to content

Commit

Permalink
Only apply content filter to ALIVE changes (#4835)
Browse files Browse the repository at this point in the history
* Refs #20815: Add regression test

Signed-off-by: eduponz <[email protected]>

* Refs #20815: Only apply filter to ALIVE changes

Signed-off-by: eduponz <[email protected]>

* Refs #20815: Rename change_is_relevant_for_filter argument

Signed-off-by: eduponz <[email protected]>

* Refs #20815: Refactor test

Signed-off-by: eduponz <[email protected]>

* Refs #20815: Cast loop index to uint16_t for assigning it to the key field

Signed-off-by: eduponz <[email protected]>

---------

Signed-off-by: eduponz <[email protected]>
  • Loading branch information
EduPonz authored May 31, 2024
1 parent df90943 commit 863b990
Show file tree
Hide file tree
Showing 11 changed files with 326 additions and 42 deletions.
1 change: 1 addition & 0 deletions src/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ set(${PROJECT_NAME}_source_files
rtps/participant/RTPSParticipant.cpp
rtps/participant/RTPSParticipantImpl.cpp
rtps/persistence/PersistenceFactory.cpp
rtps/reader/reader_utils.cpp
rtps/reader/RTPSReader.cpp
rtps/reader/StatefulPersistentReader.cpp
rtps/reader/StatefulReader.cpp
Expand Down
14 changes: 9 additions & 5 deletions src/cpp/fastdds/publisher/filtering/ReaderFilterCollection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,17 @@ class ReaderFilterCollection
// Copy the signature
std::copy(entry.filter_signature.begin(), entry.filter_signature.end(), signature);

// Evaluate filter and update filtered_out_readers
bool filter_result = entry.filter->evaluate(change.serializedPayload, info, it->first);
if (!filter_result)
// Only evaluate filter on ALIVE changes, as UNREGISTERED and DISPOSED are always relevant
bool filter_result = true;
if (fastrtps::rtps::ALIVE == change.kind)
{
change.filtered_out_readers.emplace_back(it->first);
// Evaluate filter and update filtered_out_readers
filter_result = entry.filter->evaluate(change.serializedPayload, info, it->first);
if (!filter_result)
{
change.filtered_out_readers.emplace_back(it->first);
}
}

return filter_result;
};

Expand Down
11 changes: 6 additions & 5 deletions src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
#include <fastdds/rtps/history/ReaderHistory.h>
#include <fastdds/rtps/reader/ReaderListener.h>

#include "reader_utils.hpp"
#include "rtps/RTPSDomainImpl.hpp"
#include <rtps/builtin/BuiltinProtocols.h>
#include <rtps/builtin/liveliness/WLP.h>
#include <rtps/DataSharing/DataSharingListener.hpp>
Expand All @@ -37,9 +39,6 @@
#include <rtps/participant/RTPSParticipantImpl.h>
#include <rtps/reader/WriterProxy.h>
#include <rtps/writer/LivelinessManager.hpp>

#include "rtps/RTPSDomainImpl.hpp"

#ifdef FASTDDS_STATISTICS
#include <statistics/types/monitorservice_types.hpp>
#endif // FASTDDS_STATISTICS
Expand Down Expand Up @@ -600,14 +599,15 @@ bool StatefulReader::processDataMsg(
return false;
}

if (data_filter_ && !data_filter_->is_relevant(*change, m_guid))
if (!fastdds::rtps::change_is_relevant_for_filter(*change, m_guid, data_filter_))
{
if (pWP)
{
pWP->irrelevant_change_set(change->sequenceNumber);
NotifyChanges(pWP);
send_ack_if_datasharing(this, mp_history, pWP, change->sequenceNumber);
}
// Change was filtered out, so there isn't anything else to do
return true;
}

Expand Down Expand Up @@ -783,7 +783,8 @@ bool StatefulReader::processDataFragMsg(

// Temporarilly assign the inline qos while evaluating the data filter
work_change->inline_qos = incomingChange->inline_qos;
bool filtered_out = data_filter_ && !data_filter_->is_relevant(*work_change, m_guid);
bool filtered_out =
!fastdds::rtps::change_is_relevant_for_filter(*work_change, m_guid, data_filter_);
work_change->inline_qos = SerializedPayload_t();

if (filtered_out)
Expand Down
11 changes: 6 additions & 5 deletions src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,15 @@
#include <fastdds/rtps/history/ReaderHistory.h>
#include <fastdds/rtps/reader/ReaderListener.h>

#include "reader_utils.hpp"
#include "rtps/RTPSDomainImpl.hpp"
#include <rtps/builtin/BuiltinProtocols.h>
#include <rtps/builtin/liveliness/WLP.h>
#include <rtps/DataSharing/DataSharingListener.hpp>
#include <rtps/DataSharing/ReaderPool.hpp>
#include <rtps/participant/RTPSParticipantImpl.h>
#include <rtps/reader/StatelessReader.hpp>
#include <rtps/writer/LivelinessManager.hpp>

#include "rtps/RTPSDomainImpl.hpp"

#ifdef FASTDDS_STATISTICS
#include <statistics/types/monitorservice_types.hpp>
#endif // FASTDDS_STATISTICS
Expand Down Expand Up @@ -593,9 +592,10 @@ bool StatelessReader::processDataMsg(
return false;
}

if (data_filter_ && !data_filter_->is_relevant(*change, m_guid))
if (!fastdds::rtps::change_is_relevant_for_filter(*change, m_guid, data_filter_))
{
update_last_notified(change->writerGUID, change->sequenceNumber);
// Change was filtered out, so there isn't anything else to do
return true;
}

Expand Down Expand Up @@ -808,7 +808,8 @@ bool StatelessReader::processDataFragMsg(
{
// Temporarilly assign the inline qos while evaluating the data filter
change_completed->inline_qos = incomingChange->inline_qos;
bool filtered_out = data_filter_ && !data_filter_->is_relevant(*change_completed, m_guid);
bool filtered_out = !fastdds::rtps::change_is_relevant_for_filter(*change_completed, m_guid,
data_filter_);
change_completed->inline_qos = SerializedPayload_t();

if (filtered_out)
Expand Down
45 changes: 45 additions & 0 deletions src/cpp/rtps/reader/reader_utils.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* @file reader_utils.cpp
*/

#include "reader_utils.hpp"

#include <fastdds/rtps/common/ChangeKind_t.hpp>

namespace eprosima {
namespace fastdds {
namespace rtps {

bool change_is_relevant_for_filter(
const CacheChange& change,
const GUID& reader_guid,
const IReaderDataFilter* filter)
{
bool ret = true;

// Only evaluate filter on ALIVE changes, as UNREGISTERED and DISPOSED are always relevant
if ((nullptr != filter) && (fastrtps::rtps::ALIVE == change.kind) && (!filter->is_relevant(change, reader_guid)))
{
ret = false;
}

return ret;
}

} // namespace rtps
} // namespace fastdds
} // namespace eprosima
53 changes: 53 additions & 0 deletions src/cpp/rtps/reader/reader_utils.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* @file reader_utils.hpp
*/

#ifndef _FASTDDS_RTPS_READER_READERUTILS_H_
#define _FASTDDS_RTPS_READER_READERUTILS_H_

#include <fastdds/rtps/common/CacheChange.h>
#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/interfaces/IReaderDataFilter.hpp>
#include <fastdds/rtps/common/ChangeKind_t.hpp>

namespace eprosima {
namespace fastdds {
namespace rtps {

using CacheChange = fastrtps::rtps::CacheChange_t;
using GUID = fastrtps::rtps::GUID_t;

/**
* @brief Check if a change is relevant for a reader.
*
* @param change The CacheChange_t to be evaluated.
* @param reader_guid Reader's GUID_t.
* @param filter The IReaderDataFilter to be used.
*
* @return true if relevant, false otherwise.
*/
bool change_is_relevant_for_filter(
const CacheChange& change,
const GUID& reader_guid,
const IReaderDataFilter* filter);

} // namespace rtps
} // namespace fastdds
} // namespace eprosima


#endif // _FASTDDS_RTPS_READER_READERUTILS_H_
Loading

0 comments on commit 863b990

Please sign in to comment.