Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,14 @@ class ComponentManagerIsolated : public rclcpp_components::ComponentManager
{
std::shared_ptr<rclcpp::Executor> executor;
std::thread thread;
std::promise<void> promise;
};

public:
~ComponentManagerIsolated()
{
if (node_wrappers_.size()) {
for (auto & executor_wrapper : dedicated_executor_wrappers_) {
executor_wrapper.second.promise.set_value();
executor_wrapper.second.executor->cancel();
executor_wrapper.second.thread.join();
cancel_executor(executor_wrapper.second);
}
node_wrappers_.clear();
}
Expand All @@ -67,8 +64,8 @@ class ComponentManagerIsolated : public rclcpp_components::ComponentManager
exec->add_node(node_wrappers_[node_id].get_node_base_interface());
executor_wrapper.executor = exec;
executor_wrapper.thread = std::thread(
[exec, cancel_token = executor_wrapper.promise.get_future()]() {
exec->spin_until_future_complete(cancel_token);
[exec]() {
exec->spin();
});
dedicated_executor_wrappers_[node_id] = std::move(executor_wrapper);
}
Expand All @@ -81,14 +78,36 @@ class ComponentManagerIsolated : public rclcpp_components::ComponentManager
{
auto executor_wrapper = dedicated_executor_wrappers_.find(node_id);
if (executor_wrapper != dedicated_executor_wrappers_.end()) {
executor_wrapper->second.promise.set_value();
executor_wrapper->second.executor->cancel();
executor_wrapper->second.thread.join();
cancel_executor(executor_wrapper->second);
dedicated_executor_wrappers_.erase(executor_wrapper);
}
}

private:
/// Stops a spinning executor avoiding race conditions.
/**
* @param executor_wrapper executor to stop and its associated thread
*/
void cancel_executor(DedicatedExecutorWrapper & executor_wrapper)
{
// We can't immediately call the cancel() API on an executor because if it is not
// already spinning, this operation will have no effect.
// We rely on the assumption that this class creates executors and then immediately makes them
// spin in a separate thread, i.e. the time gap between when the executor is created and when
// it starts to spin is small (although it's not negligible).

while (!executor_wrapper.executor->is_spinning()) {
// This is an arbitrarily small delay to avoid busy looping
rclcpp::sleep_for(std::chrono::milliseconds(1));
}

// After the while loop we are sure that the executor is now spinning, so
// the call to cancel() will work.
executor_wrapper.executor->cancel();
// Wait for the thread task to return
executor_wrapper.thread.join();
}

std::unordered_map<uint64_t, DedicatedExecutorWrapper> dedicated_executor_wrappers_;
};

Expand Down