Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "rosbag2_storage/metadata_io.hpp"
#include "rosbag2_storage/storage_factory.hpp"
#include "rosbag2_storage/storage_factory_interface.hpp"
#include "rosbag2_storage/storage_filter.hpp"
#include "rosbag2_storage/storage_interfaces/read_only_interface.hpp"

// This is necessary because of using stl types here. It is completely safe, because
Expand Down Expand Up @@ -68,6 +69,8 @@ class ROSBAG2_CPP_PUBLIC SequentialReader

std::vector<rosbag2_storage::TopicMetadata> get_all_topics_and_types() override;

void set_filter(const rosbag2_storage::StorageFilter & storage_filter);
Comment thread
mabelzhang marked this conversation as resolved.
Outdated

/**
* Ask whether there is another database file to read from the list of relative
* file paths.
Expand Down
6 changes: 6 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,12 @@ std::vector<rosbag2_storage::TopicMetadata> SequentialReader::get_all_topics_and
throw std::runtime_error("Bag is not open. Call open() before reading.");
}

void SequentialReader::set_filter(
const rosbag2_storage::StorageFilter & storage_filter)
{
storage_->set_filter(storage_filter);
}

bool SequentialReader::has_next_file() const
{
return current_file_iterator_ + 1 != file_paths_.end();
Expand Down
2 changes: 2 additions & 0 deletions rosbag2_cpp/test/rosbag2_cpp/mock_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "rosbag2_storage/bag_metadata.hpp"
#include "rosbag2_storage/serialized_bag_message.hpp"
#include "rosbag2_storage/storage_filter.hpp"
#include "rosbag2_storage/storage_interfaces/read_write_interface.hpp"
#include "rosbag2_storage/topic_metadata.hpp"

Expand All @@ -37,6 +38,7 @@ class MockStorage : public rosbag2_storage::storage_interfaces::ReadWriteInterfa
MOCK_METHOD1(write, void(std::shared_ptr<const rosbag2_storage::SerializedBagMessage>));
MOCK_METHOD0(get_all_topics_and_types, std::vector<rosbag2_storage::TopicMetadata>());
MOCK_METHOD0(get_metadata, rosbag2_storage::BagMetadata());
MOCK_METHOD1(set_filter, void(const rosbag2_storage::StorageFilter &));
MOCK_CONST_METHOD0(get_bagfile_size, uint64_t());
MOCK_CONST_METHOD0(get_relative_file_path, std::string());
MOCK_CONST_METHOD0(get_storage_identifier, std::string());
Expand Down
31 changes: 31 additions & 0 deletions rosbag2_storage/include/rosbag2_storage/storage_filter.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2020 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_STORAGE__STORAGE_FILTER_HPP_
#define ROSBAG2_STORAGE__STORAGE_FILTER_HPP_

#include <string>
#include <vector>

namespace rosbag2_storage
{

struct StorageFilter
{
std::vector<std::string> topics;
Comment thread
mabelzhang marked this conversation as resolved.
};

} // namespace rosbag2_storage

#endif // ROSBAG2_STORAGE__STORAGE_FILTER_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <string>

#include "rosbag2_storage/storage_filter.hpp"
#include "rosbag2_storage/storage_interfaces/base_info_interface.hpp"
#include "rosbag2_storage/storage_interfaces/base_io_interface.hpp"
#include "rosbag2_storage/storage_interfaces/base_read_interface.hpp"
Expand All @@ -38,6 +39,8 @@ class ROSBAG2_STORAGE_PUBLIC ReadOnlyInterface
uint64_t get_bagfile_size() const override = 0;

std::string get_storage_identifier() const override = 0;

virtual void set_filter(const StorageFilter & storage_filter) = 0;
};

} // namespace storage_interfaces
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <string>

#include "rosbag2_storage/storage_filter.hpp"
#include "rosbag2_storage/storage_interfaces/read_only_interface.hpp"
#include "rosbag2_storage/storage_interfaces/base_write_interface.hpp"
#include "rosbag2_storage/visibility_control.hpp"
Expand All @@ -39,6 +40,8 @@ class ROSBAG2_STORAGE_PUBLIC ReadWriteInterface
std::string get_storage_identifier() const override = 0;

virtual uint64_t get_minimum_split_file_size() const = 0;

void set_filter(const StorageFilter & storage_filter) override = 0;
};

} // namespace storage_interfaces
Expand Down
6 changes: 6 additions & 0 deletions rosbag2_storage/test/rosbag2_storage/test_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,10 @@ uint64_t TestPlugin::get_minimum_split_file_size() const
return test_constants::MIN_SPLIT_FILE_SIZE;
}

void TestPlugin::set_filter(
const rosbag2_storage::StorageFilter & /*storage_filter*/)
{
std::cout << "\nsetting storage filter\n";
}

PLUGINLIB_EXPORT_CLASS(TestPlugin, rosbag2_storage::storage_interfaces::ReadWriteInterface)
2 changes: 2 additions & 0 deletions rosbag2_storage/test/rosbag2_storage/test_plugin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class TestPlugin : public rosbag2_storage::storage_interfaces::ReadWriteInterfac
std::string get_storage_identifier() const override;

uint64_t get_minimum_split_file_size() const override;

void set_filter(const rosbag2_storage::StorageFilter & storage_filter) override;
};

#endif // ROSBAG2_STORAGE__TEST_PLUGIN_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,10 @@ std::string TestReadOnlyPlugin::get_storage_identifier() const
return test_constants::READ_ONLY_PLUGIN_IDENTIFIER;
}

void TestReadOnlyPlugin::set_filter(
const rosbag2_storage::StorageFilter & /*storage_filter*/)
{
std::cout << "\nsetting storage filter\n";
}

PLUGINLIB_EXPORT_CLASS(TestReadOnlyPlugin, rosbag2_storage::storage_interfaces::ReadOnlyInterface)
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <string>
#include <vector>

#include "rosbag2_storage/storage_filter.hpp"
#include "rosbag2_storage/storage_interfaces/read_only_interface.hpp"

class TestReadOnlyPlugin : public rosbag2_storage::storage_interfaces::ReadOnlyInterface
Expand All @@ -41,6 +42,8 @@ class TestReadOnlyPlugin : public rosbag2_storage::storage_interfaces::ReadOnlyI
uint64_t get_bagfile_size() const override;

std::string get_storage_identifier() const override;

void set_filter(const rosbag2_storage::StorageFilter & storage_filter) override;
};

#endif // ROSBAG2_STORAGE__TEST_READ_ONLY_PLUGIN_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "rcutils/types.h"
#include "rosbag2_storage/storage_interfaces/read_write_interface.hpp"
#include "rosbag2_storage/serialized_bag_message.hpp"
#include "rosbag2_storage/storage_filter.hpp"
#include "rosbag2_storage/topic_metadata.hpp"
#include "rosbag2_storage_default_plugins/sqlite/sqlite_wrapper.hpp"
#include "rosbag2_storage_default_plugins/visibility_control.hpp"
Expand Down Expand Up @@ -72,6 +73,8 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage

uint64_t get_minimum_split_file_size() const override;

void set_filter(const rosbag2_storage::StorageFilter & storage_filter) override;

private:
void initialize();
void prepare_for_writing();
Expand All @@ -90,6 +93,7 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage
std::unordered_map<std::string, int> topics_;
std::vector<rosbag2_storage::TopicMetadata> all_topics_and_types_;
std::string relative_path_;
rosbag2_storage::StorageFilter storage_filter_ {};
Comment thread
Karsten1987 marked this conversation as resolved.
};

} // namespace rosbag2_storage_plugins
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ void SqliteStorage::open(

ROSBAG2_STORAGE_DEFAULT_PLUGINS_LOG_INFO_STREAM(
"Opened database '" << relative_path_ << "' for " << to_string(io_flag) << ".");

storage_filter_.topics.clear();
}

void SqliteStorage::write(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> message)
Expand Down Expand Up @@ -211,10 +213,27 @@ void SqliteStorage::prepare_for_writing()

void SqliteStorage::prepare_for_reading()
{
read_statement_ = database_->prepare_statement(
"SELECT data, timestamp, topics.name "
"FROM messages JOIN topics ON messages.topic_id = topics.id "
"ORDER BY messages.timestamp;");
if (!storage_filter_.topics.empty()) {
// Construct string for selected topics
std::string topic_list{""};
for (auto & topic : storage_filter_.topics) {
topic_list += "'" + topic + "'";
if (&topic != &storage_filter_.topics.back()) {
topic_list += ",";
}
}

read_statement_ = database_->prepare_statement(
"SELECT data, timestamp, topics.name "
"FROM messages JOIN topics ON messages.topic_id = topics.id "
"WHERE topics.name IN (" + topic_list + ")"
Comment thread
Karsten1987 marked this conversation as resolved.
"ORDER BY messages.timestamp;");
} else {
read_statement_ = database_->prepare_statement(
"SELECT data, timestamp, topics.name "
"FROM messages JOIN topics ON messages.topic_id = topics.id "
"ORDER BY messages.timestamp;");
}
message_result_ = read_statement_->execute_query<
std::shared_ptr<rcutils_uint8_array_t>, rcutils_time_point_value_t, std::string>();
current_message_row_ = message_result_.begin();
Expand Down Expand Up @@ -292,6 +311,12 @@ rosbag2_storage::BagMetadata SqliteStorage::get_metadata()
return metadata;
}

void SqliteStorage::set_filter(
const rosbag2_storage::StorageFilter & storage_filter)
{
storage_filter_ = storage_filter;
}

} // namespace rosbag2_storage_plugins

#include "pluginlib/class_list_macros.hpp" // NOLINT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#include <gmock/gmock.h>

#include <rosbag2_storage/storage_filter.hpp>

Comment thread
mabelzhang marked this conversation as resolved.
Outdated
#include <memory>
#include <string>
#include <tuple>
Expand Down Expand Up @@ -111,6 +113,33 @@ TEST_F(StorageTestFixture, get_next_returns_messages_in_timestamp_order) {
EXPECT_FALSE(readable_storage->has_next());
}

TEST_F(StorageTestFixture, read_next_returns_filtered_messages) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would appreciate having this test actually happening in rosbag2_cpp so it can be applied to more than just SQLite3.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had trouble creating the test. Can you suggest a test file in rosbag2_cpp that I can look at as an example of how to put messages into a reader? I tried adding it in rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp, but I'm not able to get a valid object returned from read_next() - I get null pointers. Maybe because the object is a MockStorage?

The other problem is since the actual filtering is only implemented in sqlite3 right now, we wouldn't be able to get the right filtering behavior from a SequentialReader using MockStorage, right?

What I have done locally is I added a new class to test_sequential_reader.cpp, changed the reader_ object from a Reader to a SequentialReader, since the set_filter() is only in SequentialReader right now. If we want it to be in Reader, it also needs to be in BaseReadInterface and rosbag2_compression::SequentialCompressionReader. Do we want to define set_filter() beyond SequentialReader?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess you have two options here. Either you modify the mock functions in a way to take the filter logic into account or you shift the test over to rosbag2_tests where we have a fully set up architecture. This also runs the sqlite3 plugin and still gives you the opportunity to test the c++ API.

Do you think it makes sense to apply the filter to each individual reader instance? Then we should add the two functions (set_filter()/reset_filter()). I would assume that these filters are generic enough (filter by topic etc) to apply for each reader.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I spent some time putting the tests in rosbag2_cpp and rosbag2_tests. I did both halfway, and I think I finally figured out what the tests in these 3 packages do.

In the rosbag2_cpp tests, test_sequential_reader.cpp is intentionally using a mock object in mock_storage.cpp so that it tests the functions in the SequentialReader only, just to verify that reader_ calls functions inside storage_. The tests here are independent of the Storage object.

So with re/set_filter(), in the current setup, all that should be tested in rosbag2_cpp is that reader_.set_filter() will call storage_.set_filter(), and no checks on whether the returned objects are the correct topic names. That can be added - I have this locally.

The tests in rosbag2_storage test the actual function of the storage. So set_filter() here should be tested with the actual returned messages' topic names. This is the present state of the PR. I think we should keep these tests, because there's nowhere else that does unit test on the actual functionality of the Storage objects.

The tests in rosbag2_tests execute the command line, which would require implementing the topic filter in rosbag2_transport. That has not been implemented yet. It would require enough changes that maybe it should be in a separate PR? With rosbag2_transport tests, test_play.cpp uses a mock_sequential_reader.cpp mock object, which has its functions implemented. This can be updated with set_filter() to mock the functionality of the storage - I have this locally.

So my question is what we want to go into this PR - I have a mess of both routes on my local computer, with new tests in all 3 packages, and I haven't pushed anything. I think both the rosbag2_storage and rosbag2_cpp tests should be kept. In that case, I would just add the few lines to rosbag2_cpp tests as mentioned above to this PR, and leave this test in rosbag2_storage as well.

If we also want to add end-to-end tests and implement a new flag on rosbag2_transport, I can put that in a new branch and open a new PR. Otherwise I would just get rid of those changes. That option did not exist in ROS 1 bag playback and I don't know what flag we want to make it - maybe like ROS 1 bag record where -O is for the file name, and a subsequent list is the selective topics.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with basically everything you said here.

I would still appreciate some basic tests in rosbag2_cpp for things like what happens if you call the set_filter function twice in a row or test for the exception to be thrown if the storage is not initialized. That shouldn't take too much time, I think.

The functionality can be tested in the sqlite3 implementation of it (I think that's what you meant with rosbag2_storage, but I think it's indeed the default storage plugin).
But I would also assume we could get around rosbag2_transport or the ros2cli in the system tests (rosbag2_tests) by directly calling the rosbag2_cpp API. But I am okay with putting this in a separate PR to address once the option is exposed through the command line interface. I would ask you though in this case to open a new ticket for it to keep track of the outstanding work. It's up to you, really, which way you want to go. I am okay with both.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. I added basic tests in rosbag2_cpp in 5e09b58.

I tried making a new test in rosbag2_tests and calling the SequentialReader::open() in rosbag2_cpp to open the bag in rosbag2_tests/resources/cdr_test. For some reason it doesn't open:

[22.820s] 4: [ERROR] [1585629564.565606326] [rosbag2_storage]: Could not open 'cdr_test.db3' with 'sqlite3'. Error: Failed to read from bag: File 'cdr_test.db3' does not exist!
[22.820s] 4: [ERROR] [1585629564.565750830] [rosbag2_storage]: Could not load/open plugin with storage id 'sqlite3'.

In fact, I cannot even open cdr_test and wrong_rmw_test with ros2 bag play on the command line. I get the same error:

$ ros2 bag play cdr_test
[ERROR] [rosbag2_storage]: Could not open 'cdr_test.db3' with 'sqlite3'. Error: Failed to read from bag: File 'cdr_test.db3' does not exist!
[ERROR] [rosbag2_storage]: Could not load/open plugin with storage id 'sqlite3'.
[ERROR] [rosbag2_transport]: Failed to play: No storage could be initialized. Abort

But ros2 bag info cdr_test works.
Is this expected? If so, then I won't be able to call Reader directly from rosbag2_tests. I will open a new ticket to expose the topic selection to rosbag2_transport, and then test using the existing test_rosbag2_play_end_to_end.cpp.

std::vector<std::tuple<std::string, int64_t, std::string, std::string, std::string>>
string_messages =
{std::make_tuple("topic1 message", 1, "topic1", "", ""),
std::make_tuple("topic2 message", 2, "topic2", "", ""),
std::make_tuple("topic3 message", 3, "topic3", "", "")};

write_messages_to_sqlite(string_messages);
std::unique_ptr<rosbag2_storage::storage_interfaces::ReadOnlyInterface> readable_storage =
std::make_unique<rosbag2_storage_plugins::SqliteStorage>();

auto db_filename = (rcpputils::fs::path(temporary_dir_path_) / "rosbag.db3").string();
readable_storage->open(db_filename);

rosbag2_storage::StorageFilter storage_filter;
storage_filter.topics.push_back("topic2");
storage_filter.topics.push_back("topic3");
readable_storage->set_filter(storage_filter);

EXPECT_TRUE(readable_storage->has_next());
auto first_message = readable_storage->read_next();
EXPECT_THAT(first_message->topic_name, Eq("topic2"));
EXPECT_TRUE(readable_storage->has_next());
auto second_message = readable_storage->read_next();
EXPECT_THAT(second_message->topic_name, Eq("topic3"));
Comment thread
mabelzhang marked this conversation as resolved.
}

TEST_F(StorageTestFixture, get_all_topics_and_types_returns_the_correct_vector) {
std::unique_ptr<rosbag2_storage::storage_interfaces::ReadWriteInterface> writable_storage =
std::make_unique<rosbag2_storage_plugins::SqliteStorage>();
Expand Down