Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion rclpy/rclpy/impl/_rclpy_pybind11.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ from rclpy.impl import service_introspection as service_introspection
from rclpy.node import Node as RCLPyNode
from rclpy.parameter import Parameter
from rclpy.subscription import MessageInfo
from rclpy.subscription_content_filter_options import ContentFilterOptions
from rclpy.task import Future
from rclpy.task import Task
from rclpy.type_support import (Action, FeedbackMessage, FeedbackT, GetResultServiceRequest,
Expand Down Expand Up @@ -600,7 +601,8 @@ class Timer(Destroyable):
class Subscription(Destroyable, Generic[MsgT]):

def __init__(self, node: Node, pymsg_type: type[MsgT], topic: str,
pyqos_profile: rmw_qos_profile_t) -> None: ...
pyqos_profile: rmw_qos_profile_t,
content_filter_options: Optional[ContentFilterOptions] = None) -> None: ...

@property
def pointer(self) -> int:
Expand All @@ -624,6 +626,15 @@ class Subscription(Destroyable, Generic[MsgT]):
def clear_on_new_message_callback(self) -> None:
"""Clear the on new message callback function for the subscription."""

def is_cft_enabled(self) -> bool:
"""Check if content filtering is enabled for this subscription."""

def set_content_filter(self, filter_expression: str, expression_parameters: list[str]) -> None:
"""Set the filter expression and expression parameters for the subscription."""

def get_content_filter(self) -> ContentFilterOptions:
"""Get the filter expression and expression parameters for the subscription."""


class rcl_time_point_t:

Expand Down
14 changes: 10 additions & 4 deletions rclpy/rclpy/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
from rclpy.subscription import GenericSubscriptionCallback
from rclpy.subscription import Subscription
from rclpy.subscription import SubscriptionCallbackUnion
from rclpy.subscription_content_filter_options import ContentFilterOptions
from rclpy.time_source import TimeSource
from rclpy.timer import Rate
from rclpy.timer import Timer
Expand Down Expand Up @@ -1642,7 +1643,8 @@ def create_subscription(
callback_group: Optional[CallbackGroup] = None,
event_callbacks: Optional[SubscriptionEventCallbacks] = None,
qos_overriding_options: Optional[QoSOverridingOptions] = None,
raw: Literal[True]
raw: Literal[True],
content_filter_options: Optional[ContentFilterOptions] = None
) -> Subscription[MsgT]: ...

@overload
Expand All @@ -1656,7 +1658,8 @@ def create_subscription(
callback_group: Optional[CallbackGroup] = None,
event_callbacks: Optional[SubscriptionEventCallbacks] = None,
qos_overriding_options: Optional[QoSOverridingOptions] = None,
raw: bool = False
raw: bool = False,
content_filter_options: Optional[ContentFilterOptions] = None
) -> Subscription[MsgT]: ...

def create_subscription(
Expand All @@ -1669,7 +1672,8 @@ def create_subscription(
callback_group: Optional[CallbackGroup] = None,
event_callbacks: Optional[SubscriptionEventCallbacks] = None,
qos_overriding_options: Optional[QoSOverridingOptions] = None,
raw: bool = False
raw: bool = False,
content_filter_options: Optional[ContentFilterOptions] = None
) -> Subscription[MsgT]:
"""
Create a new subscription.
Expand All @@ -1687,6 +1691,7 @@ def create_subscription(
:param event_callbacks: User-defined callbacks for middleware events.
:param raw: If ``True``, then received messages will be stored in raw binary
representation.
:param content_filter_options: The filter expression and parameters for content filtering.
"""
qos_profile = self._validate_qos_or_depth_parameter(qos_profile)

Expand Down Expand Up @@ -1714,7 +1719,8 @@ def create_subscription(
try:
with self.handle:
subscription_object = _rclpy.Subscription(
self.handle, msg_type, topic, qos_profile.get_c_qos_profile())
self.handle, msg_type, topic, qos_profile.get_c_qos_profile(),
content_filter_options)
except ValueError:
failed = True
if failed:
Expand Down
32 changes: 32 additions & 0 deletions rclpy/rclpy/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from rclpy.event_handler import SubscriptionEventCallbacks
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
from rclpy.qos import QoSProfile
from rclpy.subscription_content_filter_options import ContentFilterOptions
from rclpy.type_support import MsgT
from typing_extensions import TypeAlias

Expand All @@ -48,6 +49,9 @@ class MessageInfo(TypedDict):
publisher_gid: Optional[PublisherGID]


# Re-export exception defined in _rclpy C extension.
RCLError = _rclpy.RCLError

# Left to support Legacy TypeVars.
MsgType = TypeVar('MsgType')

Expand Down Expand Up @@ -200,6 +204,34 @@ def logger_name(self) -> str:
with self.handle:
return self.__subscription.get_logger_name()

@property
def is_cft_enabled(self) -> bool:
"""Check if content filtering is enabled for the subscription."""
with self.handle:
return self.__subscription.is_cft_enabled()

def set_content_filter(self, filter_expression: str, expression_parameters: list[str]) -> None:
"""
Set the filter expression and expression parameters for the subscription.

:param filter_expression: The filter expression to set.
:param expression_parameters: The expression parameters to set.
:raises: RCLError if internal error occurred when calling the rcl function.
"""
with self.handle:
self.__subscription.set_content_filter(filter_expression, expression_parameters)

def get_content_filter(self) -> ContentFilterOptions:
"""
Get the filter expression and expression parameters for the subscription.

:return: ContentFilterOptions object containing the filter expression and expression
parameters.
:raises: RCLError if internal error occurred when calling the rcl function.
"""
with self.handle:
return self.__subscription.get_content_filter()

def __enter__(self) -> 'Subscription[MsgT]':
return self

Expand Down
22 changes: 22 additions & 0 deletions rclpy/rclpy/subscription_content_filter_options.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Copyright 2025 Sony Group Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import NamedTuple


class ContentFilterOptions(NamedTuple):
"""Options to configure content filtered topic in the subscription."""

filter_expression: str
expression_parameters: list[str]
140 changes: 137 additions & 3 deletions rclpy/src/rclpy/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <pybind11/pybind11.h>
#include <pybind11/stl.h>

#include <rcl/error_handling.h>
#include <rcl/node.h>
Expand All @@ -21,10 +22,14 @@
#include <rosidl_runtime_c/message_type_support_struct.h>
#include <rmw/types.h>

#include <cstddef>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

#include <rcpputils/scope_exit.hpp>

#include "exceptions.hpp"
#include "node.hpp"
Expand All @@ -39,9 +44,25 @@ namespace rclpy
{
using events_executor::RclEventCallbackTrampoline;

namespace
{
std::vector<const char *>
get_c_vector_string(const std::vector<std::string> & strings_in)
{
std::vector<const char *> cstrings;
cstrings.reserve(strings_in.size());

for (size_t i = 0; i < strings_in.size(); ++i) {
cstrings.push_back(strings_in[i].c_str());
}

return cstrings;
}
} // namespace

Subscription::Subscription(
Node & node, py::object pymsg_type, std::string topic,
py::object pyqos_profile)
py::object pyqos_profile, py::object content_filter_options)
: node_(node)
{
auto msg_type = static_cast<rosidl_message_type_support_t *>(
Expand Down Expand Up @@ -75,6 +96,24 @@ Subscription::Subscription(

*rcl_subscription_ = rcl_get_zero_initialized_subscription();

std::string filter_expression;
std::vector<std::string> expression_parameters;
if (!content_filter_options.is_none()) {
filter_expression = content_filter_options.attr("filter_expression").cast<std::string>();
expression_parameters =
content_filter_options.attr("expression_parameters").cast<std::vector<std::string>>();
std::vector<const char *> cstrings =
get_c_vector_string(expression_parameters);
rcl_ret_t ret = rcl_subscription_options_set_content_filter_options(
filter_expression.c_str(),
cstrings.size(),
cstrings.data(),
&subscription_ops);
if (RCL_RET_OK != ret) {
throw rclpy::RCLError("Failed to set content_filter_options");
}
}

rcl_ret_t ret = rcl_subscription_init(
rcl_subscription_.get(), node_.rcl_ptr(), msg_type,
topic.c_str(), &subscription_ops);
Expand Down Expand Up @@ -237,11 +276,98 @@ Subscription::clear_on_new_message_callback()
}
}

bool
Subscription::is_cft_enabled() const
{
return rcl_subscription_is_cft_enabled(rcl_subscription_.get());
}

void
Subscription::set_content_filter(
const std::string & filter_expression,
const std::vector<std::string> & expression_parameters)
{
rcl_subscription_content_filter_options_t options =
rcl_get_zero_initialized_subscription_content_filter_options();
std::vector<const char *> cstrings = get_c_vector_string(expression_parameters);
rcl_ret_t ret = rcl_subscription_content_filter_options_init(
rcl_subscription_.get(),
filter_expression.c_str(),
cstrings.size(),
cstrings.data(),
&options);
if (RCL_RET_OK != ret) {
throw RCLError("Failed to init subscription content_filtered_topic option");
}

RCPPUTILS_SCOPE_EXIT(
{
rcl_ret_t ret = rcl_subscription_content_filter_options_fini(
rcl_subscription_.get(), &options);
if (RCL_RET_OK != ret) {
throw RCLError(
"Failed to fini subscription content_filtered_topic option: " +
std::string(rcl_get_error_string().str));
}
});

ret = rcl_subscription_set_content_filter(
rcl_subscription_.get(),
&options);
if (RCL_RET_OK != ret) {
throw RCLError("Failed to set cft expression parameters");
}
}

py::object
Subscription::get_content_filter() const
{
rcl_subscription_content_filter_options_t options =
rcl_get_zero_initialized_subscription_content_filter_options();

rcl_ret_t ret = rcl_subscription_get_content_filter(
rcl_subscription_.get(),
&options);
if (RCL_RET_OK != ret) {
throw RCLError("Failed to get cft expression parameters");
}

RCPPUTILS_SCOPE_EXIT(
{
rcl_ret_t ret = rcl_subscription_content_filter_options_fini(
rcl_subscription_.get(), &options);
if (RCL_RET_OK != ret) {
throw RCLError(
"Failed to fini subscription content_filtered_topic option: " +
std::string(rcl_get_error_string().str));
}
});

rmw_subscription_content_filter_options_t & content_filter_options =
options.rmw_subscription_content_filter_options;
std::vector<std::string> expression_parameters;
for (size_t i = 0; i < content_filter_options.expression_parameters.size; ++i) {
expression_parameters.push_back(content_filter_options.expression_parameters.data[i]);
}

py::object content_filter_options_class =
py::module_::import("rclpy.subscription_content_filter_options").attr("ContentFilterOptions");

return content_filter_options_class(
std::string(content_filter_options.filter_expression),
expression_parameters);
}

void
define_subscription(py::object module)
{
py::class_<Subscription, Destroyable, std::shared_ptr<Subscription>>(module, "Subscription")
.def(py::init<Node &, py::object, std::string, py::object>())
.def(py::init<Node &, py::object, std::string, py::object, py::object>(),
py::arg("node"),
py::arg("msg_type"),
py::arg("topic"),
py::arg("qos_profile"),
py::arg("content_filter_options") = py::none())
.def_property_readonly(
"pointer", [](const Subscription & subscription) {
return reinterpret_cast<size_t>(subscription.rcl_ptr());
Expand All @@ -262,6 +388,14 @@ define_subscription(py::object module)
.def(
"set_on_new_message_callback", &Subscription::set_on_new_message_callback,
py::arg("callback"))
.def("clear_on_new_message_callback", &Subscription::clear_on_new_message_callback);
.def("clear_on_new_message_callback", &Subscription::clear_on_new_message_callback)
.def("is_cft_enabled", &Subscription::is_cft_enabled,
"Check if content filtering is enabled for this subscription.")
.def(
"set_content_filter", &Subscription::set_content_filter,
"Set the filter expression and expression parameters for the subscription.")
.def(
"get_content_filter", &Subscription::get_content_filter,
"Get the filter expression and expression parameters for the subscription.");
}
} // namespace rclpy
Loading