Skip to content

Commit 739ce80

Browse files
[flowstate_ros_bridge] Make page_size and other params configurable in ExecutiveBridge (#72)
* Make page_size and other params configurable Signed-off-by: Yadunund <[email protected]> * Change to integer types Signed-off-by: Luca Della Vedova <[email protected]> * Add support for multiple pages, remove page param Signed-off-by: Luca Della Vedova <[email protected]> --------- Signed-off-by: Yadunund <[email protected]> Signed-off-by: Luca Della Vedova <[email protected]> Co-authored-by: Luca Della Vedova <[email protected]>
1 parent 1262119 commit 739ce80

File tree

6 files changed

+56
-26
lines changed

6 files changed

+56
-26
lines changed

flowstate_ros_bridge/flowstate_ros_bridge.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,6 @@ message FlowstateRosBridgeConfig {
1212
string flowstate_zenoh_router_address = 7;
1313
optional string external_zenoh_router_address = 8;
1414
repeated string bridge_plugins = 9;
15+
int64 executive_deadline_seconds = 10;
16+
int64 executive_update_rate_millis = 11;
1517
}

flowstate_ros_bridge/flowstate_ros_bridge_default_config.pbtxt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,6 @@
1414
"flowstate_ros_bridge::WorldBridge",
1515
"flowstate_ros_bridge::ExecutiveBridge"
1616
],
17+
executive_deadline_seconds: 5
18+
executive_update_rate_millis: 1000
1719
}

flowstate_ros_bridge/src/bridges/world_bridge.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ absl::Status WorldBridge::Data::SendObjectVisualizationMessages(
267267
}
268268
LOG(INFO) << "Total gltf size: " << total_gltf_size << " bytes";
269269

270-
if (!array_msg.markers.empty()){
270+
if (!array_msg.markers.empty()) {
271271
workcell_markers_pub_->publish(array_msg);
272272
}
273273

flowstate_ros_bridge/src/executive.cpp

Lines changed: 36 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ using GetSkillsResponse = intrinsic_proto::skills::GetSkillsResponse;
5454

5555
static constexpr std::string PROCESS_PARAMS_KEY = "parameters";
5656
static constexpr std::string RESOURCE_PARAMS_KEY = "resources";
57+
static constexpr std::size_t kListBehaviorTreesPageSize = 50;
58+
static constexpr std::size_t kListOperationsPageSize = 100;
5759

5860
///=============================================================================
5961
Executive::Executive(const std::string& executive_service_address,
@@ -261,18 +263,26 @@ void Executive::clear_and_delete_operations() {
261263
// First list active operations.
262264
auto client_context = make_client_context(this->deadline_seconds_);
263265
google::longrunning::ListOperationsRequest list_request;
264-
google::longrunning::ListOperationsResponse list_response;
265-
auto status = executive_stub_->ListOperations(
266-
client_context.get(), std::move(list_request), &list_response);
267-
if (status.ok()) {
266+
list_request.set_page_size(kListOperationsPageSize);
267+
do {
268+
google::longrunning::ListOperationsResponse list_response;
269+
const auto status = executive_stub_->ListOperations(
270+
client_context.get(), list_request, &list_response);
271+
272+
if (!status.ok()) {
273+
LOG(INFO) << "Error while listing operations.";
274+
return;
275+
}
268276
for (const auto& operation : list_response.operations()) {
269277
// TODO(Yadunund): If we want this bridge to not override
270278
// all the operations started by users from the frontend, we might
271279
// want to attach active operations to this bridge and monitor its
272280
// state until completion before clearing.
273281
delete_operation(operation.name(), executive_stub_, deadline_seconds_);
274282
}
283+
list_request.set_page_token(list_response.next_page_token());
275284
}
285+
while (!list_request.page_token().empty());
276286
}
277287

278288
///=============================================================================
@@ -307,15 +317,20 @@ auto Executive::behavior_trees() const
307317
client_context->set_deadline(std::chrono::system_clock::now() +
308318
std::chrono::seconds(deadline_seconds_));
309319
intrinsic_proto::solution::v1::ListBehaviorTreesRequest request;
310-
intrinsic_proto::solution::v1::ListBehaviorTreesResponse response;
311-
INTR_RETURN_IF_ERROR(
312-
intrinsic::ToAbslStatus(solution_stub_->ListBehaviorTrees(
313-
client_context.get(), request, &response)));
314-
320+
request.set_page_size(kListBehaviorTreesPageSize);
315321
std::vector<BehaviorTree> bts = {};
316-
for (auto& bt : *response.mutable_behavior_trees()) {
317-
bts.emplace_back(std::move(bt));
322+
do {
323+
intrinsic_proto::solution::v1::ListBehaviorTreesResponse response;
324+
INTR_RETURN_IF_ERROR(
325+
intrinsic::ToAbslStatus(solution_stub_->ListBehaviorTrees(
326+
client_context.get(), request, &response)));
327+
328+
for (auto& bt : *response.mutable_behavior_trees()) {
329+
bts.emplace_back(std::move(bt));
330+
}
331+
request.set_page_token(response.next_page_token());
318332
}
333+
while (!request.page_token().empty());
319334
return bts;
320335
}
321336

@@ -337,12 +352,10 @@ auto Executive::ProcessHandle::make(
337352
std::shared_ptr<ExecutiveService::Stub> executive_stub,
338353
std::size_t deadline_seconds, google::longrunning::Operation operation,
339354
ProcessFeedbackCallback feedback_cb, ProcessCompletedCallback completed_cb,
340-
std::size_t update_interval_millis)
341-
-> ProcessHandlePtr {
342-
auto handle = std::shared_ptr<ProcessHandle>(
343-
new ProcessHandle(std::move(executive_stub), std::move(deadline_seconds),
344-
std::move(operation), std::move(feedback_cb),
345-
std::move(completed_cb)));
355+
std::size_t update_interval_millis) -> ProcessHandlePtr {
356+
auto handle = std::shared_ptr<ProcessHandle>(new ProcessHandle(
357+
std::move(executive_stub), std::move(deadline_seconds),
358+
std::move(operation), std::move(feedback_cb), std::move(completed_cb)));
346359

347360
// Spawn a thread to monitor lifecycle of the process.
348361
handle->update_thread_ =
@@ -370,9 +383,8 @@ auto Executive::ProcessHandle::make(
370383
handle->feedback_cb_(false, absl::UnavailableError(ss.str()));
371384
}
372385

373-
handle->feedback_cb_(
374-
handle->current_operation_.done(),
375-
intrinsic::ToAbslStatus(status));
386+
handle->feedback_cb_(handle->current_operation_.done(),
387+
intrinsic::ToAbslStatus(status));
376388

377389
if (handle->current_operation_.done()) {
378390
// The operation may be done because it succeeded or errored
@@ -487,10 +499,10 @@ absl::StatusOr<Executive::ProcessHandlePtr> Executive::start(
487499
start_client_context.get(), std::move(start_request),
488500
&current_operation)));
489501

490-
current_process_ = ProcessHandle::make(
491-
executive_stub_, deadline_seconds_, std::move(current_operation),
492-
std::move(feedback_cb), std::move(completed_cb),
493-
this->update_rate_millis_);
502+
current_process_ =
503+
ProcessHandle::make(executive_stub_, deadline_seconds_,
504+
std::move(current_operation), std::move(feedback_cb),
505+
std::move(completed_cb), this->update_rate_millis_);
494506

495507
return current_process_;
496508
}

flowstate_ros_bridge/src/flowstate_ros_bridge.cpp

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ constexpr const char* kWorldAddressParamName = "world_service_address";
4040
constexpr const char* kGeometryAddressParamName = "geometry_service_address";
4141
constexpr const char* kServiceTunnelParamName = "service_tunnel";
4242
constexpr const char* kBridgePluginParamName = "bridge_plugins";
43+
constexpr const char* kExecutiveDeadlineParamName =
44+
"executive_deadline_seconds";
45+
constexpr const char* kExecutiveUpdateRateMillisParamName =
46+
"executive_update_rate_millis";
4347

4448
///=============================================================================
4549
FlowstateROSBridge::FlowstateROSBridge(const rclcpp::NodeOptions& options)
@@ -56,6 +60,9 @@ FlowstateROSBridge::FlowstateROSBridge(const rclcpp::NodeOptions& options)
5660
service_tunnel.empty()
5761
? "executive.app-intrinsic-app-chart.svc.cluster.local:8080"
5862
: service_tunnel);
63+
this->declare_parameter(kExecutiveDeadlineParamName, 5);
64+
this->declare_parameter(kExecutiveUpdateRateMillisParamName, 1000);
65+
5966
this->declare_parameter(
6067
kSkillAddressParamName,
6168
service_tunnel.empty()
@@ -94,7 +101,10 @@ FlowstateROSBridge::FlowstateROSBridge(const rclcpp::NodeOptions& options)
94101
this->executive_ = std::make_shared<Executive>(
95102
this->get_parameter(kExecutiveAddressParamName).get_value<std::string>(),
96103
this->get_parameter(kSkillAddressParamName).get_value<std::string>(),
97-
this->get_parameter(kSolutionAddressParamName).get_value<std::string>());
104+
this->get_parameter(kSolutionAddressParamName).get_value<std::string>(),
105+
this->get_parameter(kExecutiveDeadlineParamName).get_value<std::size_t>(),
106+
this->get_parameter(kExecutiveUpdateRateMillisParamName)
107+
.get_value<std::size_t>());
98108

99109
// Initialize the world client.
100110
this->world_ = std::make_shared<World>(

flowstate_ros_bridge/src/flowstate_ros_bridge_main.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ int main(int argc, char* argv[]) {
5959
// Get parameters from config
6060
params.emplace_back("executive_service_address",
6161
ros_config.executive_service_address());
62+
params.emplace_back("executive_deadline_seconds",
63+
ros_config.executive_deadline_seconds());
64+
params.emplace_back("executive_update_rate_millis",
65+
ros_config.executive_update_rate_millis());
6266
params.emplace_back("skill_registry_address",
6367
ros_config.skill_registry_address());
6468
params.emplace_back("solution_service_address",

0 commit comments

Comments
 (0)