Skip to content

Commit d195b67

Browse files
Add wait for player to be ready for playback in Player::play_next() method (ros2#814)
Signed-off-by: Michael Orlov <[email protected]>
1 parent 30db1a0 commit d195b67

File tree

3 files changed

+30
-28
lines changed

3 files changed

+30
-28
lines changed

rosbag2_transport/include/rosbag2_transport/player.hpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,13 +118,14 @@ class Player : public rclcpp::Node
118118
/// published or rclcpp context shut down.
119119
/// \note If internal player queue is starving and storage has not been completely loaded,
120120
/// this method will wait until new element will be pushed to the queue.
121-
/// \return true if Player::play() has been started, player in pause mode and successfully
122-
/// played next message, otherwise false.
121+
/// \return true if player in pause mode and successfully played next message, otherwise false.
123122
ROSBAG2_TRANSPORT_PUBLIC
124123
bool play_next();
125124

126125
protected:
127-
std::atomic<bool> playing_messages_from_queue_{false};
126+
bool is_ready_to_play_from_queue_{false};
127+
std::mutex ready_to_play_from_queue_mutex_;
128+
std::condition_variable ready_to_play_from_queue_cv_;
128129
rclcpp::Publisher<rosgraph_msgs::msg::Clock>::SharedPtr clock_publisher_;
129130
std::unordered_map<std::string, std::shared_ptr<rclcpp::GenericPublisher>> publishers_;
130131

@@ -151,7 +152,6 @@ class Player : public rclcpp::Node
151152
std::mutex skip_message_in_main_play_loop_mutex_;
152153
bool skip_message_in_main_play_loop_ RCPPUTILS_TSA_GUARDED_BY
153154
(skip_message_in_main_play_loop_mutex_) = false;
154-
std::atomic_bool is_in_play_{false};
155155

156156
rclcpp::Service<rosbag2_interfaces::srv::Pause>::SharedPtr srv_pause_;
157157
rclcpp::Service<rosbag2_interfaces::srv::Resume>::SharedPtr srv_resume_;

rosbag2_transport/src/rosbag2_transport/player.cpp

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -212,16 +212,12 @@ bool Player::is_storage_completely_loaded() const
212212

213213
void Player::play()
214214
{
215-
is_in_play_ = true;
216-
217215
float delay;
218216
if (play_options_.delay >= 0.0) {
219217
delay = play_options_.delay;
220218
} else {
221219
RCLCPP_WARN(
222-
this->get_logger(),
223-
"Invalid delay value: %f. Delay is disabled.",
224-
play_options_.delay);
220+
this->get_logger(), "Invalid delay value: %f. Delay is disabled.", play_options_.delay);
225221
delay = 0.0;
226222
}
227223

@@ -237,18 +233,28 @@ void Player::play()
237233
reader_->get_metadata().starting_time.time_since_epoch()).count();
238234
clock_->jump(starting_time);
239235

240-
storage_loading_future_ = std::async(
241-
std::launch::async,
242-
[this]() {load_storage_content();});
236+
storage_loading_future_ = std::async(std::launch::async, [this]() {load_storage_content();});
243237

244238
wait_for_filled_queue();
239+
{ // Notify play_next() that we are ready for playback
240+
std::lock_guard<std::mutex> lk(ready_to_play_from_queue_mutex_);
241+
is_ready_to_play_from_queue_ = true;
242+
ready_to_play_from_queue_cv_.notify_all();
243+
}
245244
play_messages_from_queue();
245+
{
246+
std::lock_guard<std::mutex> lk(ready_to_play_from_queue_mutex_);
247+
is_ready_to_play_from_queue_ = false;
248+
ready_to_play_from_queue_cv_.notify_all();
249+
}
246250
reader_->close();
247251
} while (rclcpp::ok() && play_options_.loop);
248252
} catch (std::runtime_error & e) {
249253
RCLCPP_ERROR(this->get_logger(), "Failed to play: %s", e.what());
250254
}
251-
is_in_play_ = false;
255+
std::lock_guard<std::mutex> lk(ready_to_play_from_queue_mutex_);
256+
is_ready_to_play_from_queue_ = false;
257+
ready_to_play_from_queue_cv_.notify_all();
252258
}
253259

254260
void Player::pause()
@@ -299,14 +305,19 @@ rosbag2_storage::SerializedBagMessageSharedPtr * Player::peek_next_message_from_
299305

300306
bool Player::play_next()
301307
{
302-
// Temporary take over playback from play_messages_from_queue()
303-
std::lock_guard<std::mutex> lk(skip_message_in_main_play_loop_mutex_);
304-
305-
if (!clock_->is_paused() || !is_in_play_) {
308+
if (!clock_->is_paused()) {
306309
return false;
307310
}
308311

312+
// Temporary take over playback from play_messages_from_queue()
313+
std::lock_guard<std::mutex> main_play_loop_lk(skip_message_in_main_play_loop_mutex_);
309314
skip_message_in_main_play_loop_ = true;
315+
// Wait for player to be ready for playback messages from queue i.e. wait for Player:play() to
316+
// be called if not yet and queue to be filled with messages.
317+
{
318+
std::unique_lock<std::mutex> lk(ready_to_play_from_queue_mutex_);
319+
ready_to_play_from_queue_cv_.wait(lk, [this] {return is_ready_to_play_from_queue_;});
320+
}
310321
rosbag2_storage::SerializedBagMessageSharedPtr * message_ptr = peek_next_message_from_queue();
311322

312323
bool next_message_published = false;
@@ -361,7 +372,6 @@ void Player::enqueue_up_to_boundary(uint64_t boundary)
361372

362373
void Player::play_messages_from_queue()
363374
{
364-
playing_messages_from_queue_ = true;
365375
// Note: We need to use message_queue_.peek() instead of message_queue_.try_dequeue(message)
366376
// to support play_next() API logic.
367377
rosbag2_storage::SerializedBagMessageSharedPtr * message_ptr = peek_next_message_from_queue();
@@ -386,7 +396,6 @@ void Player::play_messages_from_queue()
386396
message_ptr = peek_next_message_from_queue();
387397
}
388398
}
389-
playing_messages_from_queue_ = false;
390399
}
391400

392401
void Player::prepare_publishers()

rosbag2_transport/test/rosbag2_transport/test_play_next.cpp

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,8 @@ class MockPlayer : public rosbag2_transport::Player
5151

5252
void wait_for_playback_to_start()
5353
{
54-
while (!playing_messages_from_queue_) {
55-
std::this_thread::sleep_for(std::chrono::milliseconds(10));
56-
}
54+
std::unique_lock<std::mutex> lk(ready_to_play_from_queue_mutex_);
55+
ready_to_play_from_queue_cv_.wait(lk, [this] {return is_ready_to_play_from_queue_;});
5756
}
5857
};
5958

@@ -74,10 +73,8 @@ TEST_F(RosBag2PlayTestFixture, play_next_with_false_preconditions) {
7473

7574
ASSERT_FALSE(player->is_paused());
7675
ASSERT_FALSE(player->play_next());
77-
7876
player->pause();
7977
ASSERT_TRUE(player->is_paused());
80-
ASSERT_FALSE(player->play_next());
8178
}
8279

8380
TEST_F(RosBag2PlayTestFixture, play_next_playing_all_messages_without_delays) {
@@ -167,7 +164,6 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_one_by_one_messages_with_the_sa
167164
ASSERT_TRUE(player->is_paused());
168165

169166
auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();});
170-
player->wait_for_playback_to_start();
171167

172168
ASSERT_TRUE(player->is_paused());
173169
ASSERT_TRUE(player->play_next());
@@ -221,7 +217,6 @@ TEST_F(RosBag2PlayTestFixture, play_respect_messages_timing_after_play_next) {
221217
ASSERT_TRUE(player->is_paused());
222218

223219
auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();});
224-
player->wait_for_playback_to_start();
225220

226221
ASSERT_TRUE(player->is_paused());
227222
ASSERT_TRUE(player->play_next());
@@ -276,7 +271,6 @@ TEST_F(RosBag2PlayTestFixture, player_can_resume_after_play_next) {
276271
ASSERT_TRUE(player->is_paused());
277272

278273
auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();});
279-
player->wait_for_playback_to_start();
280274

281275
ASSERT_TRUE(player->is_paused());
282276
ASSERT_TRUE(player->play_next());
@@ -333,7 +327,6 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_only_filtered_topics) {
333327
ASSERT_TRUE(player->is_paused());
334328

335329
auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();});
336-
player->wait_for_playback_to_start();
337330

338331
ASSERT_TRUE(player->is_paused());
339332
ASSERT_TRUE(player->play_next());

0 commit comments

Comments
 (0)