Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,8 @@ async def _execute_timer(self, tmr, _):
await await_or_execute(tmr.callback)

def _take_subscription(self, sub):
with sub.handle as capsule:
msg_info = _rclpy.rclpy_take(capsule, sub.msg_type, sub.raw)
with sub.handle:
msg_info = sub.handle.take_message(sub.msg_type, sub.raw)
if msg_info is not None:
return msg_info[0]
return None
Expand Down Expand Up @@ -512,10 +512,11 @@ def _wait_for_ready_callbacks(
# Construct a wait set
wait_set = _rclpy.rclpy_get_zero_initialized_wait_set()
with ExitStack() as context_stack:
sub_capsules = []
sub_handles = []
for sub in subscriptions:
try:
sub_capsules.append(context_stack.enter_context(sub.handle))
context_stack.enter_context(sub.handle)
sub_handles.append(sub.handle)
except InvalidHandle:
entity_count.num_subscriptions -= 1

Expand Down Expand Up @@ -562,8 +563,8 @@ def _wait_for_ready_callbacks(
context_capsule)

_rclpy.rclpy_wait_set_clear_entities(wait_set)
for sub_capsule in sub_capsules:
_rclpy.rclpy_wait_set_add_entity('subscription', wait_set, sub_capsule)
for sub_handle in sub_handles:
_rclpy.rclpy_wait_set_add_subscription(wait_set, sub_handle)
for cli_handle in client_handles:
_rclpy.rclpy_wait_set_add_client(wait_set, cli_handle)
for srv_capsule in service_handles:
Expand Down
14 changes: 6 additions & 8 deletions rclpy/rclpy/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -1257,21 +1257,20 @@ def create_publisher(
check_is_valid_msg_type(msg_type)
try:
with self.handle as node_capsule:
publisher_capsule = _rclpy.rclpy_create_publisher(
publisher_object = _rclpy.Publisher(
node_capsule, msg_type, topic, qos_profile.get_c_qos_profile())
except ValueError:
failed = True
if failed:
self._validate_topic_or_service_name(topic)

publisher_handle = Handle(publisher_capsule)
try:
publisher = Publisher(
publisher_handle, msg_type, topic, qos_profile,
publisher_object, msg_type, topic, qos_profile,
event_callbacks=event_callbacks or PublisherEventCallbacks(),
callback_group=callback_group)
except Exception:
publisher_handle.destroy()
publisher_object.destroy_when_not_in_use()
raise
self.__publishers.append(publisher)
self._wake_executor()
Expand Down Expand Up @@ -1335,21 +1334,20 @@ def create_subscription(
check_is_valid_msg_type(msg_type)
try:
with self.handle as capsule:
subscription_capsule = _rclpy.rclpy_create_subscription(
subscription_object = _rclpy.Subscription(
capsule, msg_type, topic, qos_profile.get_c_qos_profile())
except ValueError:
failed = True
if failed:
self._validate_topic_or_service_name(topic)

subscription_handle = Handle(subscription_capsule)
try:
subscription = Subscription(
subscription_handle, msg_type,
subscription_object, msg_type,
topic, callback, callback_group, qos_profile, raw,
event_callbacks=event_callbacks or SubscriptionEventCallbacks())
except Exception:
subscription_handle.destroy()
subscription_object.destroy_when_not_in_use()
raise
self.__subscriptions.append(subscription)
callback_group.add_entity(subscription)
Expand Down
31 changes: 15 additions & 16 deletions rclpy/rclpy/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from typing import TypeVar, Union

from rclpy.callback_groups import CallbackGroup
from rclpy.handle import Handle
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
from rclpy.qos import QoSProfile
from rclpy.qos_event import PublisherEventCallbacks
Expand All @@ -28,7 +27,7 @@ class Publisher:

def __init__(
self,
publisher_handle: Handle,
publisher_impl: _rclpy.Publisher,
msg_type: MsgType,
topic: str,
qos_profile: QoSProfile,
Expand All @@ -44,18 +43,18 @@ def __init__(
A publisher is used as a primary means of communication in a ROS system by publishing
messages on a ROS topic.

:param publisher_handle: Capsule pointing to the underlying ``rcl_publisher_t`` object.
:param publisher_impl: Publisher wrapping the underlying ``rcl_publisher_t`` object.
:param msg_type: The type of ROS messages the publisher will publish.
:param topic: The name of the topic the publisher will publish to.
:param qos_profile: The quality of service profile to apply to the publisher.
"""
self.__handle = publisher_handle
self.__publisher = publisher_impl
self.msg_type = msg_type
self.topic = topic
self.qos_profile = qos_profile

self.event_handlers: QoSEventHandler = event_callbacks.create_event_handlers(
callback_group, publisher_handle, topic)
callback_group, publisher_impl, topic)

def publish(self, msg: Union[MsgType, bytes]) -> None:
"""
Expand All @@ -65,32 +64,32 @@ def publish(self, msg: Union[MsgType, bytes]) -> None:
:raises: TypeError if the type of the passed message isn't an instance
of the provided type when the publisher was constructed.
"""
with self.handle as capsule:
with self.handle:
if isinstance(msg, self.msg_type):
_rclpy.rclpy_publish(capsule, msg)
self.__publisher.publish(msg)
elif isinstance(msg, bytes):
_rclpy.rclpy_publish_raw(capsule, msg)
self.__publisher.publish_raw(msg)
else:
raise TypeError('Expected {}, got {}'.format(self.msg_type, type(msg)))

def get_subscription_count(self) -> int:
"""Get the amount of subscribers that this publisher has."""
with self.handle as capsule:
return _rclpy.rclpy_publisher_get_subscription_count(capsule)
with self.handle:
return self.__publisher.get_subscription_count()

@property
def topic_name(self) -> str:
with self.handle as capsule:
return _rclpy.rclpy_publisher_get_topic_name(capsule)
with self.handle:
return self.__publisher.get_topic_name()

@property
def handle(self):
return self.__handle
return self.__publisher

def destroy(self):
for handler in self.event_handlers:
handler.destroy()
self.handle.destroy()
self.__publisher.destroy_when_not_in_use()

def assert_liveliness(self) -> None:
"""
Expand All @@ -99,5 +98,5 @@ def assert_liveliness(self) -> None:
If the QoS Liveliness policy is set to MANUAL_BY_TOPIC, the
application must call this at least as often as ``QoSProfile.liveliness_lease_duration``.
"""
with self.handle as capsule:
_rclpy.rclpy_assert_liveliness(capsule)
with self.handle:
_rclpy.rclpy_assert_liveliness(self.handle)
38 changes: 19 additions & 19 deletions rclpy/rclpy/qos_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import rclpy
from rclpy.callback_groups import CallbackGroup
from rclpy.handle import Handle
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
from rclpy.logging import get_logger
from rclpy.qos import qos_policy_name_from_kind
Expand Down Expand Up @@ -70,15 +69,16 @@ def __init__(
callback_group: CallbackGroup,
callback: Callable,
event_type: IntEnum,
parent_handle: Handle,
parent_impl,
):
# Waitable init adds self to callback_group
super().__init__(callback_group)
self.event_type = event_type
self.callback = callback

with parent_handle as parent_capsule:
self.__event = _rclpy.QoSEvent(parent_capsule, event_type)
with parent_impl:
self.__event = _rclpy.QoSEvent(parent_impl, event_type)

self._ready_to_take_data = False
self._event_index = None

Expand Down Expand Up @@ -150,25 +150,25 @@ def __init__(
self.use_default_callbacks = use_default_callbacks

def create_event_handlers(
self, callback_group: CallbackGroup, subscription_handle: Handle, topic_name: str,
self, callback_group: CallbackGroup, subscription: _rclpy.Subscription, topic_name: str,
) -> List[QoSEventHandler]:
with subscription_handle as subscription_capsule:
logger = get_logger(_rclpy.rclpy_get_subscription_logger_name(subscription_capsule))
with subscription:
logger = get_logger(subscription.get_logger_name())

event_handlers = []
if self.deadline:
event_handlers.append(QoSEventHandler(
callback_group=callback_group,
callback=self.deadline,
event_type=QoSSubscriptionEventType.RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED,
parent_handle=subscription_handle))
parent_impl=subscription))

if self.incompatible_qos:
event_handlers.append(QoSEventHandler(
callback_group=callback_group,
callback=self.incompatible_qos,
event_type=QoSSubscriptionEventType.RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS,
parent_handle=subscription_handle))
parent_impl=subscription))
elif self.use_default_callbacks:
# Register default callback when not specified
try:
Expand All @@ -184,7 +184,7 @@ def _default_incompatible_qos_callback(event):
callback_group=callback_group,
callback=_default_incompatible_qos_callback,
event_type=event_type,
parent_handle=subscription_handle))
parent_impl=subscription))

except UnsupportedEventTypeError:
pass
Expand All @@ -194,14 +194,14 @@ def _default_incompatible_qos_callback(event):
callback_group=callback_group,
callback=self.liveliness,
event_type=QoSSubscriptionEventType.RCL_SUBSCRIPTION_LIVELINESS_CHANGED,
parent_handle=subscription_handle))
parent_impl=subscription))

if self.message_lost:
event_handlers.append(QoSEventHandler(
callback_group=callback_group,
callback=self.message_lost,
event_type=QoSSubscriptionEventType.RCL_SUBSCRIPTION_MESSAGE_LOST,
parent_handle=subscription_handle))
parent_impl=subscription))

return event_handlers

Expand Down Expand Up @@ -235,32 +235,32 @@ def __init__(
self.use_default_callbacks = use_default_callbacks

def create_event_handlers(
self, callback_group: CallbackGroup, publisher_handle: Handle, topic_name: str,
self, callback_group: CallbackGroup, publisher: _rclpy.Publisher, topic_name: str,
) -> List[QoSEventHandler]:
with publisher_handle as publisher_capsule:
logger = get_logger(_rclpy.rclpy_get_publisher_logger_name(publisher_capsule))
with publisher:
logger = get_logger(publisher.get_logger_name())

event_handlers = []
if self.deadline:
event_handlers.append(QoSEventHandler(
callback_group=callback_group,
callback=self.deadline,
event_type=QoSPublisherEventType.RCL_PUBLISHER_OFFERED_DEADLINE_MISSED,
parent_handle=publisher_handle))
parent_impl=publisher))

if self.liveliness:
event_handlers.append(QoSEventHandler(
callback_group=callback_group,
callback=self.liveliness,
event_type=QoSPublisherEventType.RCL_PUBLISHER_LIVELINESS_LOST,
parent_handle=publisher_handle))
parent_impl=publisher))

if self.incompatible_qos:
event_handlers.append(QoSEventHandler(
callback_group=callback_group,
callback=self.incompatible_qos,
event_type=QoSPublisherEventType.RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS,
parent_handle=publisher_handle))
parent_impl=publisher))
elif self.use_default_callbacks:
# Register default callback when not specified
try:
Expand All @@ -275,7 +275,7 @@ def _default_incompatible_qos_callback(event):
callback_group=callback_group,
callback=_default_incompatible_qos_callback,
event_type=QoSPublisherEventType.RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS,
parent_handle=publisher_handle))
parent_impl=publisher))

except UnsupportedEventTypeError:
pass
Expand Down
19 changes: 9 additions & 10 deletions rclpy/rclpy/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from typing import TypeVar

from rclpy.callback_groups import CallbackGroup
from rclpy.handle import Handle
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
from rclpy.qos import QoSProfile
from rclpy.qos_event import QoSEventHandler
Expand All @@ -31,7 +30,7 @@ class Subscription:

def __init__(
self,
subscription_handle: Handle,
subscription_impl: _rclpy.Subscription,
msg_type: MsgType,
topic: str,
callback: Callable,
Expand All @@ -46,8 +45,8 @@ def __init__(
.. warning:: Users should not create a subscription with this constructor, instead they
should call :meth:`.Node.create_subscription`.

:param subscription_handle: :class:`Handle` wrapping the underlying ``rcl_subscription_t``
object.
:param subscription_impl: :class:`Subscription` wrapping the underlying
``rcl_subscription_t`` object.
:param msg_type: The type of ROS messages the subscription will subscribe to.
:param topic: The name of the topic the subscription will subscribe to.
:param callback: A user-defined callback function that is called when a message is
Expand All @@ -58,7 +57,7 @@ def __init__(
:param raw: If ``True``, then received messages will be stored in raw binary
representation.
"""
self.__handle = subscription_handle
self.__subscription = subscription_impl
self.msg_type = msg_type
self.topic = topic
self.callback = callback
Expand All @@ -69,18 +68,18 @@ def __init__(
self.raw = raw

self.event_handlers: QoSEventHandler = event_callbacks.create_event_handlers(
callback_group, subscription_handle, topic)
callback_group, subscription_impl, topic)

@property
def handle(self):
return self.__handle
return self.__subscription

def destroy(self):
for handler in self.event_handlers:
handler.destroy()
self.handle.destroy()
self.handle.destroy_when_not_in_use()

@property
def topic_name(self):
with self.handle as h:
return _rclpy.rclpy_get_subscription_topic_name(h)
with self.handle:
return self.__subscription.get_topic_name()
Loading