Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions flowstate_ros_bridge/flowstate_ros_bridge.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,7 @@ message FlowstateRosBridgeConfig {
string flowstate_zenoh_router_address = 7;
optional string external_zenoh_router_address = 8;
repeated string bridge_plugins = 9;
string executive_deadline_seconds = 10;
string executive_update_rate_millis = 11;
string executive_page_size = 12;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,7 @@
"flowstate_ros_bridge::WorldBridge",
"flowstate_ros_bridge::ExecutiveBridge"
],
executive_deadline_seconds: "5"
executive_update_rate_millis: "1000"
executive_page_size: "100"
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ class Executive : public std::enable_shared_from_this<Executive> {
* @param update_rate_millis The frequency in milliseconds at which the
* Executive performs periodic tasks (e.g., status checks, updates). Defaults
* to 1000 milliseconds (1 second).
* @param page_size The number of items to retrieve per page when listing
* resources. Defaults to 100.
*
* @note The validity or reachability of the provided addresses is typically
* checked upon attempting connection, not necessarily within this
Expand All @@ -85,7 +87,7 @@ class Executive : public std::enable_shared_from_this<Executive> {
const std::string &skill_registry_address,
const std::string &solution_service_address,
std::size_t deadline_seconds = 5,
std::size_t update_rate_millis = 1000);
std::size_t update_rate_millis = 1000, std::size_t page_size = 100);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed this with @luca-della-vedova who suggested relying on next_page_token to recursively get all pages. We change the arg to std::optional<std::size_t> with default nullopt.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually upon further thought I feel we should remove the parameter altogether and instead:

  • In the clear_and_delete_operations here, always iterate through all operations. The function's API promises to delete all currently running operations, deleting only one page goes against that.
  • In the behavior_trees here, either do the same as the above or make it a function parameter with default value, i.e.:
// Current
absl::StatusOr<std::vector<BehaviorTree>> behavior_trees() const;
// After, return the first `n` or all the behavior trees.
absl::StatusOr<std::vector<BehaviorTree>> behavior_trees(std::optional<std::size_t> n = std::nullopt) const;

But still my recommendation would still be to remove it altogether since it's not clear what the ordering of the behavior trees returned would be, and "first n" might mean anything from "alphabetical order", "last edit order", "creation order", non-guaranteed order, and unless we can guarantee a fixed ordering users should probably not rely on it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Yadunund This is an implementation of the approach where we always get all running operations and behavior trees 4142ba3


// Establish connections with various services.
absl::Status connect();
Expand Down Expand Up @@ -152,6 +154,7 @@ class Executive : public std::enable_shared_from_this<Executive> {
std::string solution_service_address_;
std::size_t deadline_seconds_;
std::size_t update_rate_millis_;
std::size_t page_size_;
bool connected_;
std::shared_ptr<ExecutiveService::Stub> executive_stub_;
std::unique_ptr<SkillRegistry::Stub> skill_registry_stub_;
Expand Down
2 changes: 1 addition & 1 deletion flowstate_ros_bridge/src/bridges/world_bridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ absl::Status WorldBridge::Data::SendObjectVisualizationMessages(
}
LOG(INFO) << "Total gltf size: " << total_gltf_size << " bytes";

if (!array_msg.markers.empty()){
if (!array_msg.markers.empty()) {
workcell_markers_pub_->publish(array_msg);
}

Expand Down
28 changes: 14 additions & 14 deletions flowstate_ros_bridge/src/executive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,13 @@ Executive::Executive(const std::string& executive_service_address,
const std::string& skill_registry_address,
const std::string& solution_service_address,
std::size_t deadline_seconds,
std::size_t update_rate_millis)
std::size_t update_rate_millis, std::size_t page_size)
: executive_service_address_(std::move(executive_service_address)),
skill_registry_address_(std::move(skill_registry_address)),
solution_service_address_(std::move(solution_service_address)),
deadline_seconds_(deadline_seconds),
update_rate_millis_(update_rate_millis),
page_size_(page_size),
connected_(false),
current_process_(nullptr) {
// Do nothing.
Expand Down Expand Up @@ -261,6 +262,7 @@ void Executive::clear_and_delete_operations() {
// First list active operations.
auto client_context = make_client_context(this->deadline_seconds_);
google::longrunning::ListOperationsRequest list_request;
list_request.set_page_size(page_size_);
google::longrunning::ListOperationsResponse list_response;
auto status = executive_stub_->ListOperations(
client_context.get(), std::move(list_request), &list_response);
Expand Down Expand Up @@ -307,6 +309,7 @@ auto Executive::behavior_trees() const
client_context->set_deadline(std::chrono::system_clock::now() +
std::chrono::seconds(deadline_seconds_));
intrinsic_proto::solution::v1::ListBehaviorTreesRequest request;
request.set_page_size(page_size_);
intrinsic_proto::solution::v1::ListBehaviorTreesResponse response;
INTR_RETURN_IF_ERROR(
intrinsic::ToAbslStatus(solution_stub_->ListBehaviorTrees(
Expand Down Expand Up @@ -337,12 +340,10 @@ auto Executive::ProcessHandle::make(
std::shared_ptr<ExecutiveService::Stub> executive_stub,
std::size_t deadline_seconds, google::longrunning::Operation operation,
ProcessFeedbackCallback feedback_cb, ProcessCompletedCallback completed_cb,
std::size_t update_interval_millis)
-> ProcessHandlePtr {
auto handle = std::shared_ptr<ProcessHandle>(
new ProcessHandle(std::move(executive_stub), std::move(deadline_seconds),
std::move(operation), std::move(feedback_cb),
std::move(completed_cb)));
std::size_t update_interval_millis) -> ProcessHandlePtr {
auto handle = std::shared_ptr<ProcessHandle>(new ProcessHandle(
std::move(executive_stub), std::move(deadline_seconds),
std::move(operation), std::move(feedback_cb), std::move(completed_cb)));

// Spawn a thread to monitor lifecycle of the process.
handle->update_thread_ =
Expand Down Expand Up @@ -370,9 +371,8 @@ auto Executive::ProcessHandle::make(
handle->feedback_cb_(false, absl::UnavailableError(ss.str()));
}

handle->feedback_cb_(
handle->current_operation_.done(),
intrinsic::ToAbslStatus(status));
handle->feedback_cb_(handle->current_operation_.done(),
intrinsic::ToAbslStatus(status));

if (handle->current_operation_.done()) {
// The operation may be done because it succeeded or errored
Expand Down Expand Up @@ -487,10 +487,10 @@ absl::StatusOr<Executive::ProcessHandlePtr> Executive::start(
start_client_context.get(), std::move(start_request),
&current_operation)));

current_process_ = ProcessHandle::make(
executive_stub_, deadline_seconds_, std::move(current_operation),
std::move(feedback_cb), std::move(completed_cb),
this->update_rate_millis_);
current_process_ =
ProcessHandle::make(executive_stub_, deadline_seconds_,
std::move(current_operation), std::move(feedback_cb),
std::move(completed_cb), this->update_rate_millis_);

return current_process_;
}
Expand Down
16 changes: 15 additions & 1 deletion flowstate_ros_bridge/src/flowstate_ros_bridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ constexpr const char* kWorldAddressParamName = "world_service_address";
constexpr const char* kGeometryAddressParamName = "geometry_service_address";
constexpr const char* kServiceTunnelParamName = "service_tunnel";
constexpr const char* kBridgePluginParamName = "bridge_plugins";
constexpr const char* kExecutiveDeadlineParamName =
"executive_deadline_seconds";
constexpr const char* kExecutiveUpdateRateMillisParamName =
"executive_update_rate_millis";
constexpr const char* kExecutivePageSizeParamName = "executive_page_size";

///=============================================================================
FlowstateROSBridge::FlowstateROSBridge(const rclcpp::NodeOptions& options)
Expand All @@ -56,6 +61,10 @@ FlowstateROSBridge::FlowstateROSBridge(const rclcpp::NodeOptions& options)
service_tunnel.empty()
? "executive.app-intrinsic-app-chart.svc.cluster.local:8080"
: service_tunnel);
this->declare_parameter(kExecutiveDeadlineParamName, 5);
this->declare_parameter(kExecutiveUpdateRateMillisParamName, 1000);
this->declare_parameter(kExecutivePageSizeParamName, 100);

this->declare_parameter(
kSkillAddressParamName,
service_tunnel.empty()
Expand Down Expand Up @@ -94,7 +103,12 @@ FlowstateROSBridge::FlowstateROSBridge(const rclcpp::NodeOptions& options)
this->executive_ = std::make_shared<Executive>(
this->get_parameter(kExecutiveAddressParamName).get_value<std::string>(),
this->get_parameter(kSkillAddressParamName).get_value<std::string>(),
this->get_parameter(kSolutionAddressParamName).get_value<std::string>());
this->get_parameter(kSolutionAddressParamName).get_value<std::string>(),
this->get_parameter(kExecutiveDeadlineParamName).get_value<std::size_t>(),
this->get_parameter(kExecutiveUpdateRateMillisParamName)
.get_value<std::size_t>(),
this->get_parameter(kExecutivePageSizeParamName)
.get_value<std::size_t>());

// Initialize the world client.
this->world_ = std::make_shared<World>(
Expand Down
5 changes: 5 additions & 0 deletions flowstate_ros_bridge/src/flowstate_ros_bridge_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ int main(int argc, char* argv[]) {
// Get parameters from config
params.emplace_back("executive_service_address",
ros_config.executive_service_address());
params.emplace_back("executive_deadline_seconds",
ros_config.executive_deadline_seconds());
params.emplace_back("executive_update_rate_millis",
ros_config.executive_update_rate_millis());
params.emplace_back("executive_page_size", ros_config.executive_page_size());
params.emplace_back("skill_registry_address",
ros_config.skill_registry_address());
params.emplace_back("solution_service_address",
Expand Down
Loading