Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 21 additions & 13 deletions ros2bag/ros2bag/verb/play.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
from ros2bag.verb import VerbExtension
from ros2cli.node import NODE_NAME_PREFIX
from rosbag2_py import get_registered_readers
from rosbag2_py import Player
from rosbag2_py import PlayOptions
from rosbag2_py import StorageOptions
import yaml


Expand All @@ -46,7 +49,7 @@ def add_arguments(self, parser, cli_name): # noqa: D102
'-r', '--rate', type=check_positive_float, default=1.0,
help='rate at which to play back messages. Valid range > 0.0.')
parser.add_argument(
'--topics', type=str, default='', nargs='+',
'--topics', type=str, default=[], nargs='+',
help='topics to replay, separated by space. If none specified, all topics will be '
'replayed.')
parser.add_argument(
Expand Down Expand Up @@ -83,20 +86,25 @@ def main(self, *, args): # noqa: D102
if args.storage_config_file:
storage_config_file = args.storage_config_file.name

# NOTE(hidmic): in merged install workspaces on Windows, Python entrypoint lookups
# combined with constrained environments (as imposed by colcon test)
# may result in DLL loading failures when attempting to import a C
# extension. Therefore, do not import rosbag2_transport at the module
# level but on demand, right before first use.
from rosbag2_transport import rosbag2_transport_py
rosbag2_transport_py.play(
topic_remapping = ['--ros-args']
for remap_rule in args.remap:
topic_remapping.append('--remap')
topic_remapping.append(remap_rule)

storage_options = StorageOptions(
uri=args.bag_file,
storage_id=args.storage,
node_prefix=NODE_NAME_PREFIX,
storage_config_uri=storage_config_file,
)
play_options = PlayOptions(
read_ahead_queue_size=args.read_ahead_queue_size,
node_prefix=NODE_NAME_PREFIX,
rate=args.rate,
topics=args.topics,
qos_profile_overrides=qos_profile_overrides,
topics_to_filter=args.topics,
topic_qos_profile_overrides=qos_profile_overrides,
loop=args.loop,
topic_remapping=args.remap,
storage_config_file=storage_config_file)
topic_remapping_options=topic_remapping,
)

player = Player()
player.play(storage_options, play_options)
13 changes: 13 additions & 0 deletions rosbag2_py/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ find_package(ament_cmake_ros REQUIRED)
find_package(rosbag2_compression REQUIRED)
find_package(rosbag2_cpp REQUIRED)
find_package(rosbag2_storage REQUIRED)
find_package(rosbag2_transport REQUIRED)

# Find python before pybind11
find_package(python_cmake_module REQUIRED)
Expand Down Expand Up @@ -106,13 +107,25 @@ ament_target_dependencies(_info PUBLIC
)
clean_windows_flags(_info)

pybind11_add_module(_transport SHARED
src/rosbag2_py/_transport.cpp
)
ament_target_dependencies(_transport PUBLIC
"rosbag2_compression"
"rosbag2_cpp"
"rosbag2_storage"
"rosbag2_transport"
)
clean_windows_flags(_transport)

# Install cython modules as sub-modules of the project
install(
TARGETS
_reader
_storage
_writer
_info
_transport
DESTINATION "${PYTHON_INSTALL_DIR}/${PROJECT_NAME}"
)

Expand Down
1 change: 1 addition & 0 deletions rosbag2_py/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
<depend>rosbag2_compression</depend>
<depend>rosbag2_cpp</depend>
<depend>rosbag2_storage</depend>
<depend>rosbag2_transport</depend>

<exec_depend>rpyutils</exec_depend>

Expand Down
6 changes: 6 additions & 0 deletions rosbag2_py/rosbag2_py/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
from rosbag2_py._info import (
Info,
)
from rosbag2_py._transport import (
Player,
PlayOptions,
)

__all__ = [
'ConverterOptions',
Expand All @@ -54,4 +58,6 @@
'TopicInformation',
'BagMetadata',
'Info',
'Player',
'PlayOptions',
]
10 changes: 7 additions & 3 deletions rosbag2_py/src/rosbag2_py/_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@ PYBIND11_MODULE(_storage, m) {

pybind11::class_<rosbag2_storage::StorageOptions>(m, "StorageOptions")
.def(
pybind11::init<std::string, std::string, uint64_t, uint64_t, uint64_t>(),
pybind11::init<std::string, std::string, uint64_t, uint64_t, uint64_t, std::string>(),
pybind11::arg("uri"),
pybind11::arg("storage_id"),
pybind11::arg("max_bagfile_size") = 0,
pybind11::arg("max_bagfile_duration") = 0,
pybind11::arg("max_cache_size") = 0)
pybind11::arg("max_cache_size") = 0,
pybind11::arg("storage_config_uri") = "")
.def_readwrite("uri", &rosbag2_storage::StorageOptions::uri)
.def_readwrite("storage_id", &rosbag2_storage::StorageOptions::storage_id)
.def_readwrite(
Expand All @@ -58,7 +59,10 @@ PYBIND11_MODULE(_storage, m) {
&rosbag2_storage::StorageOptions::max_bagfile_duration)
.def_readwrite(
"max_cache_size",
&rosbag2_storage::StorageOptions::max_cache_size);
&rosbag2_storage::StorageOptions::max_cache_size)
.def_readwrite(
"storage_config_uri",
&rosbag2_storage::StorageOptions::storage_config_uri);

pybind11::class_<rosbag2_storage::StorageFilter>(m, "StorageFilter")
.def(
Expand Down
113 changes: 113 additions & 0 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "rosbag2_compression/compression_options.hpp"
#include "rosbag2_compression/sequential_compression_reader.hpp"
#include "rosbag2_compression/sequential_compression_writer.hpp"
#include "rosbag2_cpp/reader.hpp"
#include "rosbag2_cpp/readers/sequential_reader.hpp"
#include "rosbag2_cpp/writer.hpp"
#include "rosbag2_cpp/writers/sequential_writer.hpp"
#include "rosbag2_storage/storage_options.hpp"
#include "rosbag2_transport/play_options.hpp"
#include "rosbag2_transport/rosbag2_transport.hpp"

#include "./pybind11.hpp"

using PlayOptions = rosbag2_transport::PlayOptions;
using Rosbag2Transport = rosbag2_transport::Rosbag2Transport;

namespace rosbag2_py
{

class Player
{
public:
Player() = default;
virtual ~Player() = default;

void play(
const rosbag2_storage::StorageOptions & storage_options,
const rosbag2_transport::PlayOptions & play_options)
{
auto writer = std::make_shared<rosbag2_cpp::Writer>(
std::make_unique<rosbag2_cpp::writers::SequentialWriter>());
std::shared_ptr<rosbag2_cpp::Reader> reader = nullptr;
// Determine whether to build compression or regular reader
{
rosbag2_storage::MetadataIo metadata_io{};
rosbag2_storage::BagMetadata metadata{};
if (metadata_io.metadata_file_exists(storage_options.uri)) {
metadata = metadata_io.read_metadata(storage_options.uri);
if (!metadata.compression_format.empty()) {
reader = std::make_shared<rosbag2_cpp::Reader>(
std::make_unique<rosbag2_compression::SequentialCompressionReader>());
}
}
if (reader == nullptr) {
reader = std::make_shared<rosbag2_cpp::Reader>(
std::make_unique<rosbag2_cpp::readers::SequentialReader>());
}
}

Rosbag2Transport impl(reader, writer);
impl.init();
impl.play(storage_options, play_options);
impl.shutdown();
}
};

} // namespace rosbag2_py

PYBIND11_MODULE(_transport, m) {
m.doc() = "Python wrapper of the rosbag2_transport API";

pybind11::class_<PlayOptions>(m, "PlayOptions")
.def(
pybind11::init<
size_t,
std::string,
float,
std::vector<std::string>,
std::unordered_map<std::string, rclcpp::QoS>,
bool,
std::vector<std::string>
>(),
pybind11::arg("read_ahead_queue_size"),
pybind11::arg("node_prefix") = "",
pybind11::arg("rate") = 1.0,
pybind11::arg("topics_to_filter") = std::vector<std::string>{},
pybind11::arg("topic_qos_profile_overrides") = std::unordered_map<std::string, rclcpp::QoS>{},
pybind11::arg("loop") = false,
pybind11::arg("topic_remapping_options") = std::vector<std::string>{}
)
.def_readwrite("read_ahead_queue_size", &PlayOptions::read_ahead_queue_size)
.def_readwrite("node_prefix", &PlayOptions::node_prefix)
.def_readwrite("rate", &PlayOptions::rate)
.def_readwrite("topics_to_filter", &PlayOptions::topics_to_filter)
.def_readwrite("topic_qos_profile_overrides", &PlayOptions::topic_qos_profile_overrides)
.def_readwrite("loop", &PlayOptions::loop)
.def_readwrite("topic_remapping_options", &PlayOptions::topic_remapping_options)
;

pybind11::class_<rosbag2_py::Player>(m, "Player")
.def(pybind11::init())
.def("play", &rosbag2_py::Player::play)
;
}
125 changes: 0 additions & 125 deletions rosbag2_transport/src/rosbag2_transport/rosbag2_transport_python.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,127 +229,6 @@ rosbag2_transport_record(PyObject * Py_UNUSED(self), PyObject * args, PyObject *
Py_RETURN_NONE;
}

std::vector<std::string> get_topic_remapping_options(PyObject * topic_remapping)
{
std::vector<std::string> topic_remapping_options = {"--ros-args"};
if (topic_remapping) {
PyObject * topic_remapping_iterator = PyObject_GetIter(topic_remapping);
if (topic_remapping_iterator) {
PyObject * topic;
while ((topic = PyIter_Next(topic_remapping_iterator))) {
topic_remapping_options.emplace_back("--remap");
topic_remapping_options.emplace_back(PyUnicode_AsUTF8(topic));

Py_DECREF(topic);
}
Py_DECREF(topic_remapping_iterator);
}
}
return topic_remapping_options;
}

static PyObject *
rosbag2_transport_play(PyObject * Py_UNUSED(self), PyObject * args, PyObject * kwargs)
{
rosbag2_transport::PlayOptions play_options{};
rosbag2_storage::StorageOptions storage_options{};

static const char * kwlist[] = {
"uri",
"storage_id",
"node_prefix",
"read_ahead_queue_size",
"rate",
"topics",
"qos_profile_overrides",
"loop",
"topic_remapping",
"storage_config_file",
nullptr
};

char * uri;
char * storage_id;
char * node_prefix;
size_t read_ahead_queue_size;
float rate;
PyObject * topics = nullptr;
PyObject * qos_profile_overrides{nullptr};
bool loop = false;
PyObject * topic_remapping = nullptr;
char * storage_config_file = nullptr;
if (!PyArg_ParseTupleAndKeywords(
args, kwargs, "sss|kfOObOs", const_cast<char **>(kwlist),
&uri,
&storage_id,
&node_prefix,
&read_ahead_queue_size,
&rate,
&topics,
&qos_profile_overrides,
&loop,
&topic_remapping,
&storage_config_file))
{
return nullptr;
}

storage_options.uri = std::string(uri);
storage_options.storage_id = std::string(storage_id);
storage_options.storage_config_uri = std::string(storage_config_file);

play_options.node_prefix = std::string(node_prefix);
play_options.read_ahead_queue_size = read_ahead_queue_size;
play_options.rate = rate;
play_options.loop = loop;

if (topics) {
PyObject * topic_iterator = PyObject_GetIter(topics);
if (topic_iterator != nullptr) {
PyObject * topic = nullptr;
while ((topic = PyIter_Next(topic_iterator))) {
play_options.topics_to_filter.emplace_back(PyUnicode_AsUTF8(topic));

Py_DECREF(topic);
}
Py_DECREF(topic_iterator);
}
}

auto topic_qos_overrides = PyObject_AsTopicQoSMap(qos_profile_overrides);
play_options.topic_qos_profile_overrides = topic_qos_overrides;

play_options.topic_remapping_options = get_topic_remapping_options(topic_remapping);

rosbag2_storage::MetadataIo metadata_io{};
rosbag2_storage::BagMetadata metadata{};
// Specify defaults
std::shared_ptr<rosbag2_cpp::Reader> reader;
auto writer = std::make_shared<rosbag2_cpp::Writer>(
std::make_unique<rosbag2_cpp::writers::SequentialWriter>());
// Change reader based on metadata options
if (metadata_io.metadata_file_exists(storage_options.uri)) {
metadata = metadata_io.read_metadata(storage_options.uri);
if (metadata.compression_format == "zstd") {
reader = std::make_shared<rosbag2_cpp::Reader>(
std::make_unique<rosbag2_compression::SequentialCompressionReader>());
} else {
reader = std::make_shared<rosbag2_cpp::Reader>(
std::make_unique<rosbag2_cpp::readers::SequentialReader>());
}
} else {
reader = std::make_shared<rosbag2_cpp::Reader>(
std::make_unique<rosbag2_cpp::readers::SequentialReader>());
}

rosbag2_transport::Rosbag2Transport transport(reader, writer);
transport.init();
transport.play(storage_options, play_options);
transport.shutdown();

Py_RETURN_NONE;
}

/// Define the public methods of this module
#if __GNUC__ >= 8
# pragma GCC diagnostic push
Expand All @@ -360,10 +239,6 @@ static PyMethodDef rosbag2_transport_methods[] = {
"record", reinterpret_cast<PyCFunction>(rosbag2_transport_record), METH_VARARGS | METH_KEYWORDS,
"Record to bag"
},
{
"play", reinterpret_cast<PyCFunction>(rosbag2_transport_play), METH_VARARGS | METH_KEYWORDS,
"Play bag"
},
{nullptr, nullptr, 0, nullptr} /* sentinel */
};
#if __GNUC__ >= 8
Expand Down