Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions ros2bag/ros2bag/verb/play.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@
import yaml


def positive_float(arg: str) -> float:
value = float(arg)
if value <= 0:
raise ValueError(f'Value {value} is less than or equal to zero.')
return value


class PlayVerb(VerbExtension):
"""Play back ROS data from a bag."""

Expand Down Expand Up @@ -71,6 +78,10 @@ def add_arguments(self, parser, cli_name): # noqa: D102
' pragmas: [\"<setting_name>\" = <setting_value>]'
'Note that applicable settings are limited to read-only for ros2 bag play.'
'For a list of sqlite3 settings, refer to sqlite3 documentation')
parser.add_argument(
'--clock', type=positive_float, nargs='?', const=40, default=0,
help='Publish to /clock at a specific frequency in Hz, to act as a ROS Time Source. '
'Value must be positive. Defaults to not publishing.')

def main(self, *, args): # noqa: D102
qos_profile_overrides = {} # Specify a valid default
Expand Down Expand Up @@ -104,6 +115,7 @@ def main(self, *, args): # noqa: D102
play_options.topic_qos_profile_overrides = qos_profile_overrides
play_options.loop = args.loop
play_options.topic_remapping_options = topic_remapping
play_options.clock_publish_frequency = args.clock

player = Player()
player.play(storage_options, play_options)
1 change: 1 addition & 0 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ PYBIND11_MODULE(_transport, m) {
&PlayOptions::setTopicQoSProfileOverrides)
.def_readwrite("loop", &PlayOptions::loop)
.def_readwrite("topic_remapping_options", &PlayOptions::topic_remapping_options)
.def_readwrite("clock_publish_frequency", &PlayOptions::clock_publish_frequency)
;

py::class_<RecordOptions>(m, "RecordOptions")
Expand Down
6 changes: 6 additions & 0 deletions rosbag2_transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ function(create_tests_for_rmw_implementation)
${SKIP_TEST}
LINK_LIBS rosbag2_transport
AMENT_DEPS rosbag2_cpp rosbag2_storage test_msgs rosbag2_test_common)

rosbag2_transport_add_gmock(test_play_publish_clock
test/rosbag2_transport/test_play_publish_clock.cpp
${SKIP_TEST}
LINK_LIBS rosbag2_transport
AMENT_DEPS rosbag2_cpp rosbag2_storage test_msgs rosbag2_test_common)
endfunction()

if(BUILD_TESTING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ struct PlayOptions
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides = {};
bool loop = false;
std::vector<std::string> topic_remapping_options = {};

// Rate in Hz at which to publish to /clock.
// 0 (or negative) means that no publisher will be created
double clock_publish_frequency = 0.0;
};

} // namespace rosbag2_transport
Expand Down
16 changes: 16 additions & 0 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,22 @@ void Player::prepare_clock(const PlayOptions & options, rcutils_time_point_value
{
double rate = options.rate > 0.0 ? options.rate : 1.0;
clock_ = std::make_unique<rosbag2_cpp::TimeControllerClock>(starting_time, rate);

// Create /clock publisher
if (options.clock_publish_frequency > 0.f) {
const auto publish_period = std::chrono::nanoseconds(
static_cast<uint64_t>(RCUTILS_S_TO_NS(1) / options.clock_publish_frequency));
// NOTE: PlayerClock does not own this publisher because rosbag2_cpp
// should not own transport-based functionality
clock_publisher_ = transport_node_->create_publisher<rosgraph_msgs::msg::Clock>(
"/clock", rclcpp::ClockQoS());
clock_publish_timer_ = transport_node_->create_wall_timer(
publish_period, [this]() {
auto msg = rosgraph_msgs::msg::Clock();
msg.clock = rclcpp::Time(clock_->now());
clock_publisher_->publish(msg);
});
}
}

} // namespace rosbag2_transport
4 changes: 4 additions & 0 deletions rosbag2_transport/src/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
#include "moodycamel/readerwriterqueue.h"

#include "rclcpp/node.hpp"
#include "rclcpp/publisher.hpp"
#include "rclcpp/qos.hpp"

#include "rosbag2_cpp/clocks/player_clock.hpp"
#include "rosbag2_storage/serialized_bag_message.hpp"
#include "rosbag2_transport/play_options.hpp"
#include "rosgraph_msgs/msg/clock.hpp"

namespace rosbag2_cpp
{
Expand Down Expand Up @@ -70,6 +72,8 @@ class Player
std::unordered_map<std::string, std::shared_ptr<rclcpp::GenericPublisher>> publishers_;
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides_;
std::unique_ptr<rosbag2_cpp::PlayerClock> clock_;
rclcpp::Publisher<rosgraph_msgs::msg::Clock>::SharedPtr clock_publisher_;
std::shared_ptr<rclcpp::TimerBase> clock_publish_timer_;
};

} // namespace rosbag2_transport
Expand Down
16 changes: 14 additions & 2 deletions rosbag2_transport/src/rosbag2_transport/rosbag2_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "rclcpp/rclcpp.hpp"

#include "rcpputils/scope_exit.hpp"
#include "rcutils/time.h"

#include "rosbag2_cpp/reader.hpp"
Expand Down Expand Up @@ -89,9 +90,20 @@ std::shared_ptr<rclcpp::Node> Rosbag2Transport::setup_node(
void Rosbag2Transport::play(
const rosbag2_storage::StorageOptions & storage_options, const PlayOptions & play_options)
{
auto transport_node =
setup_node(play_options.node_prefix, play_options.topic_remapping_options);
rclcpp::executors::SingleThreadedExecutor exec;
exec.add_node(transport_node);
auto spin_thread = std::thread(
[&exec]() {
exec.spin();
});
auto exit = rcpputils::scope_exit(
[&exec, &spin_thread]() {
exec.cancel();
spin_thread.join();
});
try {
auto transport_node =
setup_node(play_options.node_prefix, play_options.topic_remapping_options);
Player player(reader_, transport_node);
do {
reader_->open(storage_options, {"", rmw_get_serialization_format()});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <memory>

#include "rosbag2_test_common/subscription_manager.hpp"
#include "rosbag2_transport_test_fixture.hpp"

class RosBag2PlayTestFixture : public Rosbag2TransportTestFixture
Expand Down
115 changes: 115 additions & 0 deletions rosbag2_transport/test/rosbag2_transport/test_play_publish_clock.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <gmock/gmock.h>

#include <memory>
#include <utility>
#include <vector>

#include "rosbag2_transport/rosbag2_transport.hpp"
#include "rosgraph_msgs/msg/clock.hpp"
#include "test_msgs/message_fixtures.hpp"

#include "rosbag2_play_test_fixture.hpp"

using namespace ::testing; // NOLINT

namespace
{
rcutils_duration_value_t period_for_frequency(double frequency)
{
return static_cast<rcutils_duration_value_t>(RCUTILS_S_TO_NS(1) / frequency);
}
} // namespace

class ClockPublishFixture : public RosBag2PlayTestFixture
{
public:
ClockPublishFixture()
: RosBag2PlayTestFixture()
{
// Fake bag setup
auto topic_types = std::vector<rosbag2_storage::TopicMetadata>{
{"topic1", "test_msgs/BasicTypes", "", ""},
};

std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages;
for (size_t i = 0; i < messages_to_play_; i++) {
auto message = get_messages_basic_types()[0];
message->int32_value = static_cast<int32_t>(i);
messages.push_back(
serialize_test_message("topic1", milliseconds_between_messages_ * i, message));
}

// Player setup
auto prepared_mock_reader = std::make_unique<MockSequentialReader>();
prepared_mock_reader->prepare(messages, topic_types);
reader_ = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader));

sub_->add_subscription<rosgraph_msgs::msg::Clock>(
"/clock", expected_clock_messages_, rclcpp::ClockQoS());
}

void run_test()
{
auto await_received_messages = sub_->spin_subscriptions();
rosbag2_transport::Rosbag2Transport rosbag2_transport(reader_, writer_);
rosbag2_transport.play(storage_options_, play_options_);
await_received_messages.get();

// Check that we got enough messages
auto received_clock = sub_->get_received_messages<rosgraph_msgs::msg::Clock>("/clock");
EXPECT_THAT(received_clock, SizeIs(Ge(expected_clock_messages_)));

// Check time deltas between messages
const auto expect_clock_delta =
period_for_frequency(play_options_.clock_publish_frequency) * play_options_.rate;
const auto allowed_error = static_cast<rcutils_duration_value_t>(
expect_clock_delta * error_tolerance_);
// On Windows, publishing seems to take time to "warm up", ignore the first couple messages
const auto start_message = 2;
for (size_t i = start_message; i < expected_clock_messages_ - 1; i++) {
auto current = rclcpp::Time(received_clock[i]->clock).nanoseconds();
auto next = rclcpp::Time(received_clock[i + 1]->clock).nanoseconds();
auto delta = next - current;
auto error = std::abs(delta - expect_clock_delta);
EXPECT_LE(error, allowed_error) << "Message was too far from next: " << i;
}
}

// Number of bag messages to publish
size_t messages_to_play_ = 10;
const int64_t milliseconds_between_messages_ = 50;

// Amount of error allowed between expected and actual clock deltas (expected to be much smaller)
const double error_tolerance_ = 0.3;

// Wait for just a few clock messages, we're checking the time between them, not the total count
size_t expected_clock_messages_ = 6;
};

TEST_F(ClockPublishFixture, clock_is_published_at_chosen_frequency)
{
play_options_.clock_publish_frequency = 20;
run_test();
}

TEST_F(ClockPublishFixture, clock_respects_playback_rate)
{
play_options_.clock_publish_frequency = 20;
play_options_.rate = 0.5;
messages_to_play_ = 5;
run_test();
}