From eb0290a3c3359782511de03512d73b6d5d5efe62 Mon Sep 17 00:00:00 2001 From: jparisu Date: Thu, 15 Sep 2022 11:44:36 +0200 Subject: [PATCH] New Enchanced Thread Pool Implementation Signed-off-by: jparisu --- ddsrouter_cmake/cmake/test/test_target.cmake | 27 +- .../thread/connector/OneShotConnector.hpp | 51 +++ .../thread/connector/SlotConnector.hpp | 65 ++++ .../connector/impl/OneShotConnector.ipp | 75 ++++ .../thread/connector/impl/SlotConnector.ipp | 62 ++++ .../thread/manager/AsyncManager.hpp | 68 ++++ .../thread/manager/IManager.hpp | 40 ++ .../thread/manager/StdThreadPool.hpp | 86 +++++ .../thread/manager/SyncManager.hpp | 49 +++ .../thread/task/ArgsOwnedTask.hpp | 109 ++++++ .../ddsrouter_utils/thread/task/ITask.hpp | 41 +++ .../ddsrouter_utils/thread/task/OwnedTask.hpp | 54 +++ .../thread/task/ReferenceTask.hpp | 50 +++ .../thread/task/impl/ArgsOwnedTask.ipp | 52 +++ .../thread/thread/CustomThread.hpp | 47 +++ .../wait/ConsumerWaitHandler.hpp | 16 +- .../wait/DBQueueWaitHandler.hpp | 4 - .../wait/impl/ConsumerWaitHandler.ipp | 2 +- .../wait/impl/DBQueueWaitHandler.ipp | 14 +- .../src/cpp/thread/manager/AsyncManager.cpp | 75 ++++ .../src/cpp/thread/manager/StdThreadPool.cpp | 136 +++++++ .../src/cpp/thread/manager/SyncManager.cpp | 35 ++ .../src/cpp/thread/task/OwnedTask.cpp | 47 +++ .../src/cpp/thread/task/ReferenceTask.cpp | 45 +++ ddsrouter_utils/test/unittest/CMakeLists.txt | 2 +- .../test/unittest/thread/CMakeLists.txt | 18 + .../unittest/thread/connector/CMakeLists.txt | 80 ++++ .../connector/one_shot_connector_test.cpp | 346 ++++++++++++++++++ .../thread/connector/slot_connector_test.cpp | 333 +++++++++++++++++ .../unittest/thread/manager/CMakeLists.txt | 90 +++++ .../thread/manager/manager_interface_test.cpp | 152 ++++++++ .../thread/manager/std_thread_pool_test.cpp | 170 +++++++++ .../task}/CMakeLists.txt | 24 +- .../thread/task/task_interface_test.cpp | 159 ++++++++ .../thread_pool/pool/SlotThreadPool.hpp | 170 +++++++++ .../thread_pool/slot_thread_pool_test.cpp | 180 --------- .../test/unittest/thread_pool/task/Task.hpp | 45 +++ .../test/unittest/thread_pool/task/TaskId.hpp | 48 +++ .../thread_pool/thread/CustomThread.hpp | 45 +++ .../unittest/wait/DBQueueWaitHandlerTest.cpp | 4 +- 40 files changed, 2883 insertions(+), 233 deletions(-) create mode 100644 ddsrouter_utils/include/ddsrouter_utils/thread/connector/OneShotConnector.hpp create mode 100644 ddsrouter_utils/include/ddsrouter_utils/thread/connector/SlotConnector.hpp create mode 100644 ddsrouter_utils/include/ddsrouter_utils/thread/connector/impl/OneShotConnector.ipp create mode 100644 ddsrouter_utils/include/ddsrouter_utils/thread/connector/impl/SlotConnector.ipp create mode 100644 ddsrouter_utils/include/ddsrouter_utils/thread/manager/AsyncManager.hpp create mode 100644 ddsrouter_utils/include/ddsrouter_utils/thread/manager/IManager.hpp create mode 100644 ddsrouter_utils/include/ddsrouter_utils/thread/manager/StdThreadPool.hpp create mode 100644 ddsrouter_utils/include/ddsrouter_utils/thread/manager/SyncManager.hpp create mode 100644 ddsrouter_utils/include/ddsrouter_utils/thread/task/ArgsOwnedTask.hpp create mode 100644 ddsrouter_utils/include/ddsrouter_utils/thread/task/ITask.hpp create mode 100644 ddsrouter_utils/include/ddsrouter_utils/thread/task/OwnedTask.hpp create mode 100644 ddsrouter_utils/include/ddsrouter_utils/thread/task/ReferenceTask.hpp create mode 100644 ddsrouter_utils/include/ddsrouter_utils/thread/task/impl/ArgsOwnedTask.ipp create mode 100644 ddsrouter_utils/include/ddsrouter_utils/thread/thread/CustomThread.hpp create mode 100644 ddsrouter_utils/src/cpp/thread/manager/AsyncManager.cpp create mode 100644 ddsrouter_utils/src/cpp/thread/manager/StdThreadPool.cpp create mode 100644 ddsrouter_utils/src/cpp/thread/manager/SyncManager.cpp create mode 100644 ddsrouter_utils/src/cpp/thread/task/OwnedTask.cpp create mode 100644 ddsrouter_utils/src/cpp/thread/task/ReferenceTask.cpp create mode 100644 ddsrouter_utils/test/unittest/thread/CMakeLists.txt create mode 100644 ddsrouter_utils/test/unittest/thread/connector/CMakeLists.txt create mode 100644 ddsrouter_utils/test/unittest/thread/connector/one_shot_connector_test.cpp create mode 100644 ddsrouter_utils/test/unittest/thread/connector/slot_connector_test.cpp create mode 100644 ddsrouter_utils/test/unittest/thread/manager/CMakeLists.txt create mode 100644 ddsrouter_utils/test/unittest/thread/manager/manager_interface_test.cpp create mode 100644 ddsrouter_utils/test/unittest/thread/manager/std_thread_pool_test.cpp rename ddsrouter_utils/test/unittest/{thread_pool => thread/task}/CMakeLists.txt (80%) create mode 100644 ddsrouter_utils/test/unittest/thread/task/task_interface_test.cpp create mode 100644 ddsrouter_utils/test/unittest/thread_pool/pool/SlotThreadPool.hpp delete mode 100644 ddsrouter_utils/test/unittest/thread_pool/slot_thread_pool_test.cpp create mode 100644 ddsrouter_utils/test/unittest/thread_pool/task/Task.hpp create mode 100644 ddsrouter_utils/test/unittest/thread_pool/task/TaskId.hpp create mode 100644 ddsrouter_utils/test/unittest/thread_pool/thread/CustomThread.hpp diff --git a/ddsrouter_cmake/cmake/test/test_target.cmake b/ddsrouter_cmake/cmake/test/test_target.cmake index 82162512a..5645b95be 100644 --- a/ddsrouter_cmake/cmake/test/test_target.cmake +++ b/ddsrouter_cmake/cmake/test/test_target.cmake @@ -58,15 +58,24 @@ function(add_test_executable TEST_EXECUTABLE_NAME TEST_SOURCES TEST_NAME TEST_LI get_win32_path_dependencies(${TEST_EXECUTABLE_NAME} TEST_FRIENDLY_PATH) - foreach(test_name ${TEST_LIST}) - add_test(NAME ${TEST_NAME}.${test_name} - COMMAND ${TEST_EXECUTABLE_NAME} - --gtest_filter=${TEST_NAME}.${test_name}:**/${TEST_NAME}.${test_name}/**) - - if(TEST_FRIENDLY_PATH) - set_tests_properties(${TEST_NAME}.${test_name} PROPERTIES ENVIRONMENT "PATH=${TEST_FRIENDLY_PATH}") - endif(TEST_FRIENDLY_PATH) - endforeach() + if( TEST_LIST ) + # If list of tests is not empty, add each test separatly + foreach(test_name ${TEST_LIST}) + add_test(NAME ${TEST_NAME}.${test_name} + COMMAND ${TEST_EXECUTABLE_NAME} + --gtest_filter=${TEST_NAME}**.${test_name}:**/${TEST_NAME}**.${test_name}/**) + + if(TEST_FRIENDLY_PATH) + set_tests_properties(${TEST_NAME}.${test_name} PROPERTIES ENVIRONMENT "PATH=${TEST_FRIENDLY_PATH}") + endif(TEST_FRIENDLY_PATH) + endforeach() + else() + # If no tests are provided, create a single test + message(STATUS "Creating general test ${TEST_NAME}.") + add_test(NAME ${TEST_NAME} + COMMAND ${TEST_EXECUTABLE_NAME}) + endif( TEST_LIST ) + target_compile_definitions(${TEST_EXECUTABLE_NAME} PRIVATE FASTDDS_ENFORCE_LOG_INFO diff --git a/ddsrouter_utils/include/ddsrouter_utils/thread/connector/OneShotConnector.hpp b/ddsrouter_utils/include/ddsrouter_utils/thread/connector/OneShotConnector.hpp new file mode 100644 index 000000000..1e5566f7e --- /dev/null +++ b/ddsrouter_utils/include/ddsrouter_utils/thread/connector/OneShotConnector.hpp @@ -0,0 +1,51 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file OwnedTask.hpp + * + * This file contains class Task definition. + */ + +#pragma once + +#include + +#include +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +template +class OneShotConnector +{ +public: + + static void execute(IManager* tp, const std::function& callback, Args... args); + + static void execute(IManager* tp, std::function&& callback, Args... args); + +}; +using SimpleOneShotConnector = OneShotConnector<>; + +} /* namespace thread */ +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ + +// Include implementation template file +#include diff --git a/ddsrouter_utils/include/ddsrouter_utils/thread/connector/SlotConnector.hpp b/ddsrouter_utils/include/ddsrouter_utils/thread/connector/SlotConnector.hpp new file mode 100644 index 000000000..0d4366e1a --- /dev/null +++ b/ddsrouter_utils/include/ddsrouter_utils/thread/connector/SlotConnector.hpp @@ -0,0 +1,65 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file SlotConnector.hpp + * + * This file contains class SlotConnector definition. + */ + +#pragma once + +#include + +#include +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +template +class SlotConnector +{ +public: + + SlotConnector( + IManager* manager, + const std::function& callback); + + SlotConnector( + IManager* manager, + std::function&& callback); + + ~SlotConnector() = default; + + void execute(Args...); + +protected: + + IManager* manager_; + + std::function callback_; + +}; +using SimpleSlotConnector = SlotConnector<>; + +} /* namespace thread */ +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ + +// Include implementation template file +#include diff --git a/ddsrouter_utils/include/ddsrouter_utils/thread/connector/impl/OneShotConnector.ipp b/ddsrouter_utils/include/ddsrouter_utils/thread/connector/impl/OneShotConnector.ipp new file mode 100644 index 000000000..36dd5ec2b --- /dev/null +++ b/ddsrouter_utils/include/ddsrouter_utils/thread/connector/impl/OneShotConnector.ipp @@ -0,0 +1,75 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file OneShotConnector.hpp + * + * This file contains class OneShotConnector implementation. + */ + +#pragma once + +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +template +void OneShotConnector::execute( + IManager* manager, + const std::function& callback, + Args... args) +{ + manager->execute( + std::make_unique>( + callback, + args... + ) + ); +} + +template +void OneShotConnector::execute( + IManager* manager, + std::function&& callback, + Args... args) +{ + manager->execute( + std::make_unique>( + std::move(callback), + args... + ) + ); +} + +// template +// void OneShotConnector::execute( +// IManager* manager, +// std::function callback, +// Args... args) +// { +// manager->execute( +// std::make_unique>( +// callback, +// args... +// ) +// ); +// } + +} /* namespace thread */ +} /* namespace event */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ diff --git a/ddsrouter_utils/include/ddsrouter_utils/thread/connector/impl/SlotConnector.ipp b/ddsrouter_utils/include/ddsrouter_utils/thread/connector/impl/SlotConnector.ipp new file mode 100644 index 000000000..a4e2c7ad5 --- /dev/null +++ b/ddsrouter_utils/include/ddsrouter_utils/thread/connector/impl/SlotConnector.ipp @@ -0,0 +1,62 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file SlotConnector.hpp + * + * This file contains class SlotConnector implementation. + */ + +#pragma once + +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +template +SlotConnector::SlotConnector( + IManager* manager, + const std::function& callback) + : manager_(manager) + , callback_(callback) +{ +} + +template +SlotConnector::SlotConnector( + IManager* manager, + std::function&& callback) + : manager_(manager) + , callback_(std::move(callback)) +{ +} + +template +void SlotConnector::execute(Args... args) +{ + manager_->execute( + std::make_unique>( + callback_, + args... + ) + ); +} + +} /* namespace thread */ +} /* namespace event */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ diff --git a/ddsrouter_utils/include/ddsrouter_utils/thread/manager/AsyncManager.hpp b/ddsrouter_utils/include/ddsrouter_utils/thread/manager/AsyncManager.hpp new file mode 100644 index 000000000..e027e8db2 --- /dev/null +++ b/ddsrouter_utils/include/ddsrouter_utils/thread/manager/AsyncManager.hpp @@ -0,0 +1,68 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file AsyncManager.hpp + * + * This file contains class AsyncManager definition. + */ + +#pragma once + +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +using TasksCollectionType = + Atomicable< + std::vector< + std::pair< + std::unique_ptr, + std::unique_ptr>>>; + +/** + * TODO + */ +class AsyncManager : public IManager +{ +public: + + AsyncManager() = default; + + ~AsyncManager(); + + virtual void execute(std::unique_ptr&& task) override; + + void clean_threads(); + +protected: + + TasksCollectionType tasks_running_; +}; + +} /* namespace thread */ +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ diff --git a/ddsrouter_utils/include/ddsrouter_utils/thread/manager/IManager.hpp b/ddsrouter_utils/include/ddsrouter_utils/thread/manager/IManager.hpp new file mode 100644 index 000000000..a093e86f2 --- /dev/null +++ b/ddsrouter_utils/include/ddsrouter_utils/thread/manager/IManager.hpp @@ -0,0 +1,40 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file IManager.hpp + * + * This file contains class SlotThreadPool definition. + */ + +#pragma once + +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +class IManager +{ +public: + virtual ~IManager() {}; + virtual void execute(std::unique_ptr&& task) = 0; +}; + +} /* namespace thread */ +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ diff --git a/ddsrouter_utils/include/ddsrouter_utils/thread/manager/StdThreadPool.hpp b/ddsrouter_utils/include/ddsrouter_utils/thread/manager/StdThreadPool.hpp new file mode 100644 index 000000000..c3035026b --- /dev/null +++ b/ddsrouter_utils/include/ddsrouter_utils/thread/manager/StdThreadPool.hpp @@ -0,0 +1,86 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file StdThreadPool.hpp + * + * This file contains class StdThreadPool definition. + */ + +#pragma once + +#include +#include +#include + +#include +#include +#include +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +/** + * TODO + */ +class StdThreadPool : public IManager +{ +public: + + StdThreadPool( + unsigned int n_threads, + bool start_running = true); + + virtual ~StdThreadPool(); + + void start(); + + void stop(); + + virtual void execute(std::unique_ptr&& task) override; + +protected: + + void thread_routine_(); + + /** + * @brief Double Queue Wait Handler to store task ids + * + * This double queue implement methods \c produce , to add tasks to the queue, and \c consume to wait until any + * task is available, and return the next task available. + * + * It will retrieve tasks in FIFO order. + * Produce and consume methods are not reciprocally blocking. + */ + event::DBQueueWaitHandler> task_queue_; + + /** + * @brief Threads container + * + * @note \c CustomThread are used instead of \c std::thread so some extra logic could be added to threads + * in future implementation (e.g. performance info). + */ + std::vector threads_; + + const unsigned int n_threads_; + +}; + +} /* namespace thread */ +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ diff --git a/ddsrouter_utils/include/ddsrouter_utils/thread/manager/SyncManager.hpp b/ddsrouter_utils/include/ddsrouter_utils/thread/manager/SyncManager.hpp new file mode 100644 index 000000000..bc1310ed6 --- /dev/null +++ b/ddsrouter_utils/include/ddsrouter_utils/thread/manager/SyncManager.hpp @@ -0,0 +1,49 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file SyncManager.hpp + * + * This file contains class SyncManager definition. + */ + +#pragma once + +#include +#include +#include + +#include +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +/** + * TODO + */ +class SyncManager : public IManager +{ +public: + // virtual void execute(const ITask& task) override; + // virtual void execute(ITask&& task) override; + virtual void execute(std::unique_ptr&& task) override; +}; + +} /* namespace thread */ +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ diff --git a/ddsrouter_utils/include/ddsrouter_utils/thread/task/ArgsOwnedTask.hpp b/ddsrouter_utils/include/ddsrouter_utils/thread/task/ArgsOwnedTask.hpp new file mode 100644 index 000000000..4bacbc4f2 --- /dev/null +++ b/ddsrouter_utils/include/ddsrouter_utils/thread/task/ArgsOwnedTask.hpp @@ -0,0 +1,109 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file ArgsOwnedTask.hpp + * + * This file contains class Task definition. + */ + +#pragma once + +#include +#include + +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +namespace helper +{ + template + struct index {}; + + template + struct gen_seq : gen_seq {}; + + template + struct gen_seq<0, Ts...> : index {}; +} + +template +class ArgsOwnedTask : public ITask +{ +public: + + ArgsOwnedTask( + std::function callback, + const Args&... args); + + void operator()() noexcept override; + +protected: + + template + void call_internal_callback_(helper::index) + { + callback_(std::get(args_)...); + } + + std::function callback_; + std::tuple args_; + +}; + + +// template +// class ArgsOwnedTask : public ITask +// { +// public: + +// ArgsOwnedTask( +// const std::function& callback, +// const Args&... args); + +// // ArgsOwnedTask( +// // std::function&& callback, +// // Args... args); + +// virtual ~ArgsOwnedTask() = default; + +// template +// void call_(helper::index) +// { +// callback_(std::get(args_)...); +// } + +// virtual void operator()() noexcept override +// { +// call_(helper::gen_seq{}); +// } + +// protected: + +// std::function callback_; + +// std::tuple args_; +// }; + +} /* namespace thread */ +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ + +// Include implementation template file +#include diff --git a/ddsrouter_utils/include/ddsrouter_utils/thread/task/ITask.hpp b/ddsrouter_utils/include/ddsrouter_utils/thread/task/ITask.hpp new file mode 100644 index 000000000..ef11f38e6 --- /dev/null +++ b/ddsrouter_utils/include/ddsrouter_utils/thread/task/ITask.hpp @@ -0,0 +1,41 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file ITask.hpp + * + * This file contains class Task definition. + */ + +#ifndef _DDSROUTERTHREAD_THREAD_TASK_ITASK_HPP_ +#define _DDSROUTERTHREAD_THREAD_TASK_ITASK_HPP_ + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +class ITask +{ +public: + virtual ~ITask() {}; + virtual void operator()() noexcept = 0; +}; + +} /* namespace thread */ +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ + +#endif /* _DDSROUTERTHREAD_THREAD_TASK_ITASK_HPP_ */ diff --git a/ddsrouter_utils/include/ddsrouter_utils/thread/task/OwnedTask.hpp b/ddsrouter_utils/include/ddsrouter_utils/thread/task/OwnedTask.hpp new file mode 100644 index 000000000..17e12ad04 --- /dev/null +++ b/ddsrouter_utils/include/ddsrouter_utils/thread/task/OwnedTask.hpp @@ -0,0 +1,54 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file OwnedTask.hpp + * + * This file contains class Task definition. + */ + +#ifndef _DDSROUTERTHREAD_TASK_TASK_HPP_ +#define _DDSROUTERTHREAD_TASK_TASK_HPP_ + +#include + +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +class OwnedTask : public ITask +{ +public: + + OwnedTask(const std::function& callback); + + OwnedTask(std::function&& callback); + + virtual ~OwnedTask() = default; + + virtual void operator()() noexcept override; + + const std::function callback; +}; + + +} /* namespace thread */ +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ + +#endif /* _DDSROUTERTHREAD_TASK_TASK_HPP_ */ diff --git a/ddsrouter_utils/include/ddsrouter_utils/thread/task/ReferenceTask.hpp b/ddsrouter_utils/include/ddsrouter_utils/thread/task/ReferenceTask.hpp new file mode 100644 index 000000000..4b7e519af --- /dev/null +++ b/ddsrouter_utils/include/ddsrouter_utils/thread/task/ReferenceTask.hpp @@ -0,0 +1,50 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file ReferenceTask.hpp + * + * This file contains class Task definition. + */ + +#pragma once + +#include + +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +class ReferenceTask : public ITask +{ +public: + + ReferenceTask(const std::function* callback_ptr); + + virtual ~ReferenceTask() = default; + + virtual void operator()() noexcept override; + + const std::function* callback_ptr; + +}; + + +} /* namespace thread */ +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ diff --git a/ddsrouter_utils/include/ddsrouter_utils/thread/task/impl/ArgsOwnedTask.ipp b/ddsrouter_utils/include/ddsrouter_utils/thread/task/impl/ArgsOwnedTask.ipp new file mode 100644 index 000000000..f359b5e9d --- /dev/null +++ b/ddsrouter_utils/include/ddsrouter_utils/thread/task/impl/ArgsOwnedTask.ipp @@ -0,0 +1,52 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file ArgsOwnedTask.ipp + * + * This file contains class OneShotConnector implementation. + */ + +#pragma once + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +template +ArgsOwnedTask::ArgsOwnedTask( + std::function callback, + const Args&... args) + : callback_(callback) + , args_(args...) +{ +} + +template +void ArgsOwnedTask::operator()() noexcept +{ + call_internal_callback_(helper::gen_seq{}); +} + +// template +// void ArgsOwnedTask::call_internal_callback_(helper::index) +// { +// callback_(std::get(args_)...); +// } + +} /* namespace thread */ +} /* namespace event */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ diff --git a/ddsrouter_utils/include/ddsrouter_utils/thread/thread/CustomThread.hpp b/ddsrouter_utils/include/ddsrouter_utils/thread/thread/CustomThread.hpp new file mode 100644 index 000000000..359af09f4 --- /dev/null +++ b/ddsrouter_utils/include/ddsrouter_utils/thread/thread/CustomThread.hpp @@ -0,0 +1,47 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file CustomThread.hpp + * + * This file contains class CustomThread definition. + */ + +#pragma once + +#include + +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +/** + * This class represents a thread that can be executed by a Thread Pool. + * + * @note this first implementation only uses this class as a \c std::thread for simplicity. + * In future implementations, this could be a more complex class. + */ +class CustomThread : public std::thread +{ +public: + using std::thread::thread; +}; + +} /* namespace thread */ +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ diff --git a/ddsrouter_utils/include/ddsrouter_utils/wait/ConsumerWaitHandler.hpp b/ddsrouter_utils/include/ddsrouter_utils/wait/ConsumerWaitHandler.hpp index 64f8a2abc..6bc718173 100644 --- a/ddsrouter_utils/include/ddsrouter_utils/wait/ConsumerWaitHandler.hpp +++ b/ddsrouter_utils/include/ddsrouter_utils/wait/ConsumerWaitHandler.hpp @@ -74,7 +74,6 @@ class ConsumerWaitHandler : protected CounterWaitHandler void produce( T&& value); - /** * @brief Add a new value to the collection. Use copy constructor. * @@ -123,18 +122,11 @@ class ConsumerWaitHandler : protected CounterWaitHandler virtual void add_value_( T&& value) = 0; - - /** - * @brief Method that adds a new value in the collection. Use copy constructor. - * - * This method must be reimplemented in child classes specialized to the internal collection. - * - * This method is called without any mutex taken and afterwards the internal counter is increased by 1. - * - * @param value new value + /* + * NOTE: + * Function add_value_ called with const reference is not available because of weird behaviour of override methods + * in template classes. */ - virtual void add_value_( - const T& value) = 0; /** * @brief Method that gets next available value from the collection diff --git a/ddsrouter_utils/include/ddsrouter_utils/wait/DBQueueWaitHandler.hpp b/ddsrouter_utils/include/ddsrouter_utils/wait/DBQueueWaitHandler.hpp index a1add9237..15802996b 100644 --- a/ddsrouter_utils/include/ddsrouter_utils/wait/DBQueueWaitHandler.hpp +++ b/ddsrouter_utils/include/ddsrouter_utils/wait/DBQueueWaitHandler.hpp @@ -55,10 +55,6 @@ class DBQueueWaitHandler : public ConsumerWaitHandler void add_value_( T&& value) override; - //! Override of ConsumerWaitHandler method to copy a new value into the queue - void add_value_( - const T& value) override; - /** * @brief Override of \c ConsumerWaitHandler method to remove a value from the queue * diff --git a/ddsrouter_utils/include/ddsrouter_utils/wait/impl/ConsumerWaitHandler.ipp b/ddsrouter_utils/include/ddsrouter_utils/wait/impl/ConsumerWaitHandler.ipp index 1ee0bf4a5..3b3e15d3d 100644 --- a/ddsrouter_utils/include/ddsrouter_utils/wait/impl/ConsumerWaitHandler.ipp +++ b/ddsrouter_utils/include/ddsrouter_utils/wait/impl/ConsumerWaitHandler.ipp @@ -53,7 +53,7 @@ template void ConsumerWaitHandler::produce( const T& value) { - add_value_(value); + add_value_(T(value)); this->operator ++(); } diff --git a/ddsrouter_utils/include/ddsrouter_utils/wait/impl/DBQueueWaitHandler.ipp b/ddsrouter_utils/include/ddsrouter_utils/wait/impl/DBQueueWaitHandler.ipp index b540d4a99..e6bdaf35a 100644 --- a/ddsrouter_utils/include/ddsrouter_utils/wait/impl/DBQueueWaitHandler.ipp +++ b/ddsrouter_utils/include/ddsrouter_utils/wait/impl/DBQueueWaitHandler.ipp @@ -32,14 +32,6 @@ void DBQueueWaitHandler::add_value_( queue_.Push(std::move(value)); } -template -void DBQueueWaitHandler::add_value_( - const T& value) -{ - logDebug(DDSROUTER_WAIT_DBQUEUE, "Copying element to DBQueue."); - queue_.Push(value); -} - template T DBQueueWaitHandler::get_next_value_() { @@ -59,11 +51,11 @@ T DBQueueWaitHandler::get_next_value_() throw utils::InconsistencyException("Empty DBQueue, impossible to get value."); } - // TODO: Do it without copy - auto value = queue_.Front(); + // TODO: Do it with front and pop without copy + auto value = std::move(queue_.Front()); queue_.Pop(); - return value; + return std::move(value); } } /* namespace event */ diff --git a/ddsrouter_utils/src/cpp/thread/manager/AsyncManager.cpp b/ddsrouter_utils/src/cpp/thread/manager/AsyncManager.cpp new file mode 100644 index 000000000..38800ff1c --- /dev/null +++ b/ddsrouter_utils/src/cpp/thread/manager/AsyncManager.cpp @@ -0,0 +1,75 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file AsyncManager.cpp + * + */ + +#include + +#include +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +AsyncManager::~AsyncManager() +{ + logDebug(DDSROUTER_THREAD_ASYNCMANAGER, "Closing Async Manager."); + clean_threads(); + logDebug(DDSROUTER_THREAD_ASYNCMANAGER, "Async Manager closed."); +} + +void AsyncManager::execute(std::unique_ptr&& task) +{ + // Lock mutex + std::unique_lock lock(tasks_running_); + + // Get reference to task + ITask* task_reference = task.get(); + + // Create and Insert task in new index + // Being indexed in map the unique ptr will not be erased + tasks_running_.push_back( + std::make_pair( + std::make_unique( + [task_reference](){ + task_reference->operator()(); + } + ), + std::move(task) + ) + ); + + logDebug(DDSROUTER_THREAD_ASYNCMANAGER, "New thread executing task."); +} + +void AsyncManager::clean_threads() +{ + std::unique_lock lock(tasks_running_); + for (auto& task : tasks_running_) + { + task.first->join(); + logDebug(DDSROUTER_THREAD_ASYNCMANAGER, "Thread finished, removing task and thread."); + } + tasks_running_.clear(); +} + +} /* namespace thread */ +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ diff --git a/ddsrouter_utils/src/cpp/thread/manager/StdThreadPool.cpp b/ddsrouter_utils/src/cpp/thread/manager/StdThreadPool.cpp new file mode 100644 index 000000000..bd9621bc9 --- /dev/null +++ b/ddsrouter_utils/src/cpp/thread/manager/StdThreadPool.cpp @@ -0,0 +1,136 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file StdThreadPool.cpp + * + */ + +#include + +#include +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +StdThreadPool::StdThreadPool( + unsigned int n_threads, + bool start_running /* = true */) + : task_queue_(0, false) + , threads_() + , n_threads_(n_threads) +{ + if (start_running) + { + start(); + } +} + +StdThreadPool::~StdThreadPool() +{ + stop(); +} + +void StdThreadPool::start() +{ + if (!task_queue_.enabled()) + { + logDebug(DDSROUTER_STDTHREADPOOL, "Starting thread pool."); + + task_queue_.enable(); + + // Execute all threads + for (unsigned int i=0; i&& task) +{ + task_queue_.produce(std::move(task)); +} + +void StdThreadPool::thread_routine_() +{ + logDebug(DDSROUTER_STDTHREADPOOL, "Starting thread routine: " << std::this_thread::get_id() << "."); + + try + { + while (true) + { + logDebug( + DDSROUTER_STDTHREADPOOL, + "Thread: " << std::this_thread::get_id() << " free, getting new callback."); + + // Wait till there is a new task available + auto task = task_queue_.consume(); + + logDebug( + DDSROUTER_STDTHREADPOOL, + "Thread: " << std::this_thread::get_id() << " executing callback."); + + // Executing callback + task->operator()(); + + // NOTE: at this point task would not be further referenced and it will be destroyed. + } + } + catch (const utils::DisabledException& e) + { + logDebug(DDSROUTER_STDTHREADPOOL, "Stopping thread: " << std::this_thread::get_id() << "."); + } +} + +} /* namespace thread */ +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ diff --git a/ddsrouter_utils/src/cpp/thread/manager/SyncManager.cpp b/ddsrouter_utils/src/cpp/thread/manager/SyncManager.cpp new file mode 100644 index 000000000..1f85bf0d5 --- /dev/null +++ b/ddsrouter_utils/src/cpp/thread/manager/SyncManager.cpp @@ -0,0 +1,35 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file SyncManager.cpp + * + */ + +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +void SyncManager::execute(std::unique_ptr&& task) +{ + task->operator()(); +} + +} /* namespace thread */ +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ diff --git a/ddsrouter_utils/src/cpp/thread/task/OwnedTask.cpp b/ddsrouter_utils/src/cpp/thread/task/OwnedTask.cpp new file mode 100644 index 000000000..382c78242 --- /dev/null +++ b/ddsrouter_utils/src/cpp/thread/task/OwnedTask.cpp @@ -0,0 +1,47 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file OwnedTask.cpp + * + */ + +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +OwnedTask::OwnedTask(const std::function& callback) + : callback(callback) +{ + +} + +OwnedTask::OwnedTask(std::function&& callback) + : callback(std::move(callback)) +{ + +} + +void OwnedTask::operator()() noexcept +{ + callback.operator()(); +} + +} /* namespace thread */ +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ diff --git a/ddsrouter_utils/src/cpp/thread/task/ReferenceTask.cpp b/ddsrouter_utils/src/cpp/thread/task/ReferenceTask.cpp new file mode 100644 index 000000000..7aa842668 --- /dev/null +++ b/ddsrouter_utils/src/cpp/thread/task/ReferenceTask.cpp @@ -0,0 +1,45 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file ReferenceTask.cpp + * + */ + +#include +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace thread { + +ReferenceTask::ReferenceTask(const std::function* callback_ptr) + : callback_ptr(callback_ptr) +{ + if (!callback_ptr) + { + throw InitializationException(STR_ENTRY << "ReferenceTask must be initialized with a valid ptr."); + } +} + +void ReferenceTask::operator()() noexcept +{ + callback_ptr->operator()(); +} + +} /* namespace thread */ +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ diff --git a/ddsrouter_utils/test/unittest/CMakeLists.txt b/ddsrouter_utils/test/unittest/CMakeLists.txt index b9920d846..b3d669d4b 100644 --- a/ddsrouter_utils/test/unittest/CMakeLists.txt +++ b/ddsrouter_utils/test/unittest/CMakeLists.txt @@ -19,7 +19,7 @@ add_subdirectory(macros) add_subdirectory(math) add_subdirectory(memory) add_subdirectory(return_code) -add_subdirectory(thread_pool) +add_subdirectory(thread) add_subdirectory(time) add_subdirectory(utils) add_subdirectory(wait) diff --git a/ddsrouter_utils/test/unittest/thread/CMakeLists.txt b/ddsrouter_utils/test/unittest/thread/CMakeLists.txt new file mode 100644 index 000000000..6d2cd1964 --- /dev/null +++ b/ddsrouter_utils/test/unittest/thread/CMakeLists.txt @@ -0,0 +1,18 @@ +# Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Add test subdirectories +add_subdirectory(connector) +add_subdirectory(manager) +add_subdirectory(task) diff --git a/ddsrouter_utils/test/unittest/thread/connector/CMakeLists.txt b/ddsrouter_utils/test/unittest/thread/connector/CMakeLists.txt new file mode 100644 index 000000000..9858c5368 --- /dev/null +++ b/ddsrouter_utils/test/unittest/thread/connector/CMakeLists.txt @@ -0,0 +1,80 @@ + +################################### +# One Shot Connector Test +################################### + +set(TEST_NAME + OneShotConnectorTest) + +set(TEST_SOURCES + one_shot_connector_test.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/exception/Exception.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/Formatter.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/math/math.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/thread/task/OwnedTask.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/thread/manager/StdThreadPool.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/time/time_utils.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/time/Timer.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/utils.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/wait/CounterWaitHandler.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/wait/IntWaitHandler.cpp + ) + +set(TEST_LIST + one_shot_test_no_params + one_shot_test_int + one_shot_test_string + one_shot_test_bool_int_string + one_shot_test_complex_args + ) + +set(TEST_EXTRA_LIBRARIES + ${MODULE_DEPENDENCIES} + ) + +add_unittest_executable( + "${TEST_NAME}" + "${TEST_SOURCES}" + "${TEST_LIST}" + "${TEST_EXTRA_LIBRARIES}" + ) + +################################### +# Slot Connector Test +################################### + +set(TEST_NAME + SlotConnectorTest) + +set(TEST_SOURCES + slot_connector_test.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/exception/Exception.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/Formatter.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/math/math.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/thread/task/OwnedTask.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/thread/manager/StdThreadPool.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/time/time_utils.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/time/Timer.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/utils.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/wait/CounterWaitHandler.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/wait/IntWaitHandler.cpp + ) + +set(TEST_LIST + slot_test_no_params + slot_test_int + slot_test_string + slot_test_bool_int_string + slot_test_complex_args + ) + +set(TEST_EXTRA_LIBRARIES + ${MODULE_DEPENDENCIES} + ) + +add_unittest_executable( + "${TEST_NAME}" + "${TEST_SOURCES}" + "${TEST_LIST}" + "${TEST_EXTRA_LIBRARIES}" + ) diff --git a/ddsrouter_utils/test/unittest/thread/connector/one_shot_connector_test.cpp b/ddsrouter_utils/test/unittest/thread/connector/one_shot_connector_test.cpp new file mode 100644 index 000000000..944819f24 --- /dev/null +++ b/ddsrouter_utils/test/unittest/thread/connector/one_shot_connector_test.cpp @@ -0,0 +1,346 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include +#include +#include +#include +#include +#include + +using namespace eprosima::ddsrouter; + +namespace test { + +utils::Duration_ms DEFAULT_TIME_TEST = 20u; +unsigned int DEFAULT_TIME_REPETITIONS = 20u; +unsigned int DEFAULT_THREADS = 3u; + +/** + * @brief Function that increases \c counter in \c increase so can be checked that has been successfully done. + * + * @param counter Wait Handler that holds the number of increases done to the same variable + * @param increase value to increase \c counter + */ +void test_lambda_increase_waiter( + event::IntWaitHandler& counter, + const int increase = 1) +{ + std::this_thread::sleep_for(std::chrono::milliseconds(DEFAULT_TIME_TEST)); + for (int i = 0; i < increase; ++i) + { + ++counter; + } +} + +void test_lambda_increase_waiter_add_string( + event::IntWaitHandler& counter, + utils::Atomicable& bucket, + std::string string_to_add, + int increase = 1, + bool append_string = true) +{ + std::this_thread::sleep_for(std::chrono::milliseconds(DEFAULT_TIME_TEST)); + + // Lock string that will be modified + if (append_string) + { + std::unique_lock> lock(bucket); + bucket.append(string_to_add); + } + + for (int i = 0; i < increase; ++i) + { + ++counter; + } +} + +/** + * Manager type to use. + * Using \c StdThreadPool because it is the one that will be used the most. + */ +using ManagerType = utils::thread::StdThreadPool; + +utils::thread::IManager* create_manager() +{ + return new ManagerType(DEFAULT_THREADS, true); +} + +} /* namespace eprosima */ + +/** + * Construct a StdThreadPool and uses OneShotConnector to send executions without parameters + * + * STEPS: + * - Create Manager + * - Call a OneShotConnector by copying an already existing std::function + * - Call OneShotConnector N times with a new created lambda each time + * - Check that the final value is the expected + */ +TEST(OneShotConnectorTest, one_shot_test_no_params) +{ + // Waiter to check result + event::IntWaitHandler counter(0); + + // Manager object + utils::thread::IManager* manager = test::create_manager(); + + // Execute lambda by copy increasing in 1 + std::function lambda = [&counter](){ test::test_lambda_increase_waiter(counter, 1); }; + utils::thread::SimpleOneShotConnector::execute( + manager, + lambda); + + // Wait for lambda to be called required times + counter.wait_equal(1); + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), 1); + // Reset counter + counter.set_value(0); + + // Execute lambda N times by moving increasing in n + for (unsigned int i = 1; i <= test::DEFAULT_TIME_REPETITIONS; ++i) + { + utils::thread::SimpleOneShotConnector::execute( + manager, + [&counter, i](){ test::test_lambda_increase_waiter(counter, i); }); + } + + // Wait for lambda to be called required times + uint32_t target_value = utils::arithmetic_progression_sum(1, 1, test::DEFAULT_TIME_REPETITIONS); + counter.wait_equal(target_value); + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), target_value); + + // Erase Manager + delete manager; +} + +TEST(OneShotConnectorTest, one_shot_test_int) +{ + // Waiter to check result + event::IntWaitHandler counter(0); + + // Manager object + utils::thread::IManager* manager = test::create_manager(); + + // Execute lambda by moving increasing in 1 + std::function lambda_move = [&counter](int x){ test::test_lambda_increase_waiter(counter, x); }; + utils::thread::OneShotConnector::execute( + manager, + std::move(lambda_move), + 1); + + // Wait for lambda to be called required times + counter.wait_equal(1); + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), 1); + // Reset counter + counter.set_value(0); + + // Execute lambda N times by copy increasing in 1 + std::function lambda = [&counter](int x){ test::test_lambda_increase_waiter(counter, x); }; + for (unsigned int i = 1; i <= test::DEFAULT_TIME_REPETITIONS; ++i) + { + utils::thread::OneShotConnector::execute( + manager, + lambda, + static_cast(i)); + } + + // Wait for lambda to be called required times + uint32_t target_value = utils::arithmetic_progression_sum(1, 1, test::DEFAULT_TIME_REPETITIONS); + counter.wait_equal(target_value); + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), target_value); + + // Erase Manager + delete manager; +} + +TEST(OneShotConnectorTest, one_shot_test_string) +{ + // Waiter to check result + event::IntWaitHandler counter(0); + // String to check result + utils::Atomicable bucket; + + // Manager object + utils::thread::IManager* manager = test::create_manager(); + + // Execute lambda by moving increasing in 1 + std::function lambda_move = + [&counter, &bucket](std::string s){ test::test_lambda_increase_waiter_add_string(counter, bucket, s, 1); }; + utils::thread::OneShotConnector::execute( + manager, + std::move(lambda_move), + "Hello"); + + // Wait for lambda to be called required times + counter.wait_equal(1); + // Check that lambda has been called only that amount of times and the string result is the correct + ASSERT_EQ(counter.get_value(), 1); + ASSERT_EQ(bucket, "Hello"); // It does not require mutex as the modification in test has already been done + // Reset counter + counter.set_value(0); + bucket.erase(); + + // Execute lambda N times by copy increasing in 1 + std::function lambda = + [&counter, &bucket](std::string s){ test::test_lambda_increase_waiter_add_string(counter, bucket, s, 1); }; + for (unsigned int i = 1; i <= test::DEFAULT_TIME_REPETITIONS; ++i) + { + // Call execute with a character adding 'a' + i (-1 to start from 'a') + utils::thread::OneShotConnector::execute( + manager, + lambda, + std::string(1, static_cast('a' + i - 1))); + } + + // Wait for lambda to be called required times + uint32_t target_value = test::DEFAULT_TIME_REPETITIONS; + counter.wait_equal(target_value); + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), target_value); + + // Check the result string. It may not be in the order expected as the order of threads is not deterministic + // Thus, check that every char from 'a' to 'a' + N is in the string + for (char c = 'a'; c < 'a' + test::DEFAULT_TIME_REPETITIONS; ++c) + { + ASSERT_NE(bucket.find(c), std::string::npos) << c; + } + + // Erase Manager + delete manager; +} + +TEST(OneShotConnectorTest, one_shot_test_bool_int_string) +{ + // Waiter to check result + event::IntWaitHandler counter(0); + // String to check result + utils::Atomicable bucket; + + // Manager object + utils::thread::IManager* manager = test::create_manager(); + + // Execute lambda by moving increasing in 1 + std::function lambda_move = + [&counter, &bucket] + (bool b, int i, std::string s) + { test::test_lambda_increase_waiter_add_string(counter, bucket, s, i, b); }; + + utils::thread::OneShotConnector::execute( + manager, + std::move(lambda_move), + true, + 1, + "Hello"); + + // Wait for lambda to be called required times + counter.wait_equal(1); + // Check that lambda has been called only that amount of times and the string result is the correct + ASSERT_EQ(counter.get_value(), 1); + ASSERT_EQ(bucket, "Hello"); // It does not require mutex as the modification in test has already been done + // Reset counter + counter.set_value(0); + bucket.erase(); + + // Execute lambda N times by copy increasing in 1 + std::function lambda = + [&counter, &bucket] + (bool b, int i, std::string s) + { test::test_lambda_increase_waiter_add_string(counter, bucket, s, i, b); }; + + for (unsigned int i = 1; i <= test::DEFAULT_TIME_REPETITIONS; ++i) + { + // Whether it should add the char. Only add odd number chars + char c = static_cast('a' + i - 1); + bool append_char = static_cast(static_cast(c) % 2); + + // Call execute with a character adding 'a' + i (-1 to start from 'a') + utils::thread::OneShotConnector::execute( + manager, + lambda, + append_char, + i, + std::string(1, c)); + } + + // Wait for lambda to be called required times + uint32_t target_value = utils::arithmetic_progression_sum(1, 1, test::DEFAULT_TIME_REPETITIONS); + counter.wait_equal(target_value); + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), target_value); + + // Check the result string. It may not be in the order expected as the order of threads is not deterministic + // Thus, check that every char from 'a' to 'a' + N is in the string + for (char c = 'a'; c < 'a' + test::DEFAULT_TIME_REPETITIONS; ++c) + { + bool append_char = static_cast(static_cast(c) % 2); + if (append_char) + { + ASSERT_NE(bucket.find(c), std::string::npos); + } + else + { + ASSERT_EQ(bucket.find(c), std::string::npos); + } + } + + // Erase Manager + delete manager; +} + +TEST(OneShotConnectorTest, one_shot_test_complex_args) +{ + // Waiter to check result + event::IntWaitHandler counter(0); + + // Manager object + utils::thread::IManager* manager = test::create_manager(); + + // Use a function reference and already created values to call in the Pool + for (unsigned int i = 1; i <= test::DEFAULT_TIME_REPETITIONS; ++i) + { + utils::thread::OneShotConnector::execute( + manager, + test::test_lambda_increase_waiter, + counter, + static_cast(i)); + } + + // Wait for lambda to be called required times + uint32_t target_value = utils::arithmetic_progression_sum(1, 1, test::DEFAULT_TIME_REPETITIONS); + counter.wait_equal(target_value); + + // Check that lambda has been called only that amount of times and the string result is the correct + ASSERT_EQ(counter.get_value(), target_value); + + delete manager; +} + +int main( + int argc, + char** argv) +{ + // utils::Log::SetVerbosity(utils::Log::Kind::Info); + // utils::Log::Flush(); + + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/ddsrouter_utils/test/unittest/thread/connector/slot_connector_test.cpp b/ddsrouter_utils/test/unittest/thread/connector/slot_connector_test.cpp new file mode 100644 index 000000000..b49370d24 --- /dev/null +++ b/ddsrouter_utils/test/unittest/thread/connector/slot_connector_test.cpp @@ -0,0 +1,333 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include +#include +#include +#include +#include +#include + +using namespace eprosima::ddsrouter; + +namespace test { + +utils::Duration_ms DEFAULT_TIME_TEST = 20u; +unsigned int DEFAULT_TIME_REPETITIONS = 20u; +unsigned int DEFAULT_THREADS = 3u; + +/** + * @brief Function that increases \c counter in \c increase so can be checked that has been successfully done. + * + * @param counter Wait Handler that holds the number of increases done to the same variable + * @param increase value to increase \c counter + */ +void test_lambda_increase_waiter( + event::IntWaitHandler& counter, + int increase = 1) +{ + std::this_thread::sleep_for(std::chrono::milliseconds(DEFAULT_TIME_TEST)); + for (int i = 0; i < increase; ++i) + { + ++counter; + } +} + +void test_lambda_increase_waiter_add_string( + event::IntWaitHandler& counter, + utils::Atomicable& bucket, + std::string string_to_add, + int increase = 1, + bool append_string = true) +{ + std::this_thread::sleep_for(std::chrono::milliseconds(DEFAULT_TIME_TEST)); + + // Lock string that will be modified + if (append_string) + { + std::unique_lock> lock(bucket); + bucket.append(string_to_add); + } + + for (int i = 0; i < increase; ++i) + { + ++counter; + } +} + +/** + * Manager type to use. + * Using \c StdThreadPool because it is the one that will be used the most. + */ +using ManagerType = utils::thread::StdThreadPool; + +utils::thread::IManager* create_manager() +{ + return new ManagerType(DEFAULT_THREADS, true); +} + +} /* namespace eprosima */ + +/** + * Construct a StdThreadPool and uses SlotConnector to send executions without parameters + * + * STEPS: + * - Create Manager + * - Call a SlotConnector by copying an already existing std::function + * - Call SlotConnector N times with a new created lambda each time + * - Check that the final value is the expected + */ +TEST(SlotConnectorTest, slot_test_no_params) +{ + // Waiter to check result + event::IntWaitHandler counter(0); + + // Manager object + utils::thread::IManager* manager = test::create_manager(); + + // Create lambda increasing in 1 + std::function lambda = [&counter](){ test::test_lambda_increase_waiter(counter, 1); }; + // Create slot by copy + utils::thread::SimpleSlotConnector once_slot(manager, lambda); + + // Execute slot + once_slot.execute(); + + // Wait for lambda to be called required times + counter.wait_equal(1); + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), 1); + // Reset counter + counter.set_value(0); + + // Execute lambda N times by moving increasing in 1 by moving + utils::thread::SimpleSlotConnector move_slot(manager, std::move(lambda)); + for (unsigned int i = 1; i <= test::DEFAULT_TIME_REPETITIONS; ++i) + { + move_slot.execute(); + } + + // Wait for lambda to be called required times + uint32_t target_value = test::DEFAULT_TIME_REPETITIONS; + counter.wait_equal(target_value); + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), target_value); + + // Erase Manager + delete manager; +} + +TEST(SlotConnectorTest, slot_test_int) +{ + // Waiter to check result + event::IntWaitHandler counter(0); + + // Manager object + utils::thread::IManager* manager = test::create_manager(); + + // Create lambda increasing in 1 + std::function lambda = [&counter](int x){ test::test_lambda_increase_waiter(counter, x); }; + // Create slot by copy + utils::thread::SlotConnector once_slot(manager, lambda); + + // Execute slot + once_slot.execute(1); + + // Wait for lambda to be called required times + counter.wait_equal(1); + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), 1); + // Reset counter + counter.set_value(0); + + // Execute lambda N times by moving increasing in 1 by moving + utils::thread::SlotConnector move_slot(manager, std::move(lambda)); + for (unsigned int i = 1; i <= test::DEFAULT_TIME_REPETITIONS; ++i) + { + move_slot.execute(i); + } + + // Wait for lambda to be called required times + uint32_t target_value = utils::arithmetic_progression_sum(1, 1, test::DEFAULT_TIME_REPETITIONS); + counter.wait_equal(target_value); + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), target_value); + + // Erase Manager + delete manager; +} + +TEST(SlotConnectorTest, slot_test_string) +{ + // Waiter to check result + event::IntWaitHandler counter(0); + // String to check result + utils::Atomicable bucket; + + // Manager object + utils::thread::IManager* manager = test::create_manager(); + + // Create lambda increasing in 1 + std::function lambda = + [&counter, &bucket](std::string st){ test::test_lambda_increase_waiter_add_string(counter, bucket, st, 1); }; + // Create slot by copy + utils::thread::SlotConnector once_slot(manager, lambda); + + // Execute slot + once_slot.execute("Hello"); + + // Wait for lambda to be called required times + counter.wait_equal(1); + // Check that lambda has been called only that amount of times and the string result is the correct + ASSERT_EQ(counter.get_value(), 1); + ASSERT_EQ(bucket, "Hello"); // It does not require mutex as the modification in test has already been done + // Reset counter + counter.set_value(0); + bucket.erase(); + + // Execute lambda N times by moving increasing in 1 by moving + utils::thread::SlotConnector move_slot(manager, std::move(lambda)); + for (unsigned int i = 1; i <= test::DEFAULT_TIME_REPETITIONS; ++i) + { + move_slot.execute(std::string(1, static_cast('a' + i - 1))); + } + + // Wait for lambda to be called required times + uint32_t target_value = test::DEFAULT_TIME_REPETITIONS; + counter.wait_equal(target_value); + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), target_value); + + // Check the result string. It may not be in the order expected as the order of threads is not deterministic + // Thus, check that every char from 'a' to 'a' + N is in the string + for (char c = 'a'; c < 'a' + test::DEFAULT_TIME_REPETITIONS; ++c) + { + ASSERT_NE(bucket.find(c), std::string::npos) << c; + } + + // Erase Manager + delete manager; +} + +TEST(SlotConnectorTest, slot_test_bool_int_string) +{ + // Waiter to check result + event::IntWaitHandler counter(0); + // String to check result + utils::Atomicable bucket; + + // Manager object + utils::thread::IManager* manager = test::create_manager(); + + // Create lambda increasing in n + std::function lambda = + [&counter, &bucket] + (bool b, int i, std::string s) + { test::test_lambda_increase_waiter_add_string(counter, bucket, s, i, b); }; + + // Create slot by copy + utils::thread::SlotConnector once_slot(manager, lambda); + + // Execute slot + once_slot.execute(true, 1, "Hello"); + + // Wait for lambda to be called required times + counter.wait_equal(1); + // Check that lambda has been called only that amount of times and the string result is the correct + ASSERT_EQ(counter.get_value(), 1); + ASSERT_EQ(bucket, "Hello"); // It does not require mutex as the modification in test has already been done + // Reset counter + counter.set_value(0); + bucket.erase(); + + // Execute lambda N times by moving increasing in n by moving + utils::thread::SlotConnector move_slot(manager, std::move(lambda)); + for (unsigned int i = 1; i <= test::DEFAULT_TIME_REPETITIONS; ++i) + { + // Whether it should add the char. Only add odd number chars + char c = static_cast('a' + i - 1); + bool append_char = static_cast(static_cast(c) % 2); + + move_slot.execute( + append_char, + i, + std::string(1, c)); + } + + // Wait for lambda to be called required times + uint32_t target_value = utils::arithmetic_progression_sum(1, 1, test::DEFAULT_TIME_REPETITIONS); + counter.wait_equal(target_value); + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), target_value); + + // Check the result string. It may not be in the order expected as the order of threads is not deterministic + // Thus, check that every char from 'a' to 'a' + N is in the string + for (char c = 'a'; c < 'a' + test::DEFAULT_TIME_REPETITIONS; ++c) + { + bool append_char = static_cast(static_cast(c) % 2); + if (append_char) + { + ASSERT_NE(bucket.find(c), std::string::npos); + } + else + { + ASSERT_EQ(bucket.find(c), std::string::npos); + } + } + + // Erase Manager + delete manager; +} + +TEST(SlotConnectorTest, slot_test_complex_args) +{ + // Waiter to check result + event::IntWaitHandler counter(0); + + // Manager object + utils::thread::IManager* manager = test::create_manager(); + + // Use a function reference and already created values to call in the Pool + utils::thread::SlotConnector move_slot( + manager, + test::test_lambda_increase_waiter); + + for (unsigned int i = 1; i <= test::DEFAULT_TIME_REPETITIONS; ++i) + { + move_slot.execute(counter, static_cast(i)); + } + + // Wait for lambda to be called required times + uint32_t target_value = utils::arithmetic_progression_sum(1, 1, test::DEFAULT_TIME_REPETITIONS); + counter.wait_equal(target_value); + + // Check that lambda has been called only that amount of times and the string result is the correct + ASSERT_EQ(counter.get_value(), target_value); + + delete manager; +} + +int main( + int argc, + char** argv) +{ + // utils::Log::SetVerbosity(utils::Log::Kind::Info); + // utils::Log::Flush(); + + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/ddsrouter_utils/test/unittest/thread/manager/CMakeLists.txt b/ddsrouter_utils/test/unittest/thread/manager/CMakeLists.txt new file mode 100644 index 000000000..e4093cbcf --- /dev/null +++ b/ddsrouter_utils/test/unittest/thread/manager/CMakeLists.txt @@ -0,0 +1,90 @@ +# Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +################################### +# IManager Specializations Test +################################### + +set(TEST_NAME + ParametrizedThreadManagerTest) + +set(TEST_SOURCES + manager_interface_test.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/exception/Exception.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/Formatter.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/math/math.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/thread/task/OwnedTask.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/thread/manager/SyncManager.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/thread/manager/AsyncManager.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/thread/manager/StdThreadPool.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/time/time_utils.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/time/Timer.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/utils.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/wait/CounterWaitHandler.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/wait/IntWaitHandler.cpp + ) + +set(TEST_LIST + manager_execute + ) + +set(TEST_EXTRA_LIBRARIES + ${MODULE_DEPENDENCIES} + ) + +add_unittest_executable( + "${TEST_NAME}" + "${TEST_SOURCES}" + "${TEST_LIST}" + "${TEST_EXTRA_LIBRARIES}" + ) + +################################### +# StdThreadPool Test +################################### + +set(TEST_NAME + StdThreadPoolTest) + +set(TEST_SOURCES + std_thread_pool_test.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/exception/Exception.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/Formatter.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/math/math.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/thread/task/OwnedTask.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/thread/manager/StdThreadPool.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/time/time_utils.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/time/Timer.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/utils.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/wait/CounterWaitHandler.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/wait/IntWaitHandler.cpp + ) + +set(TEST_LIST + pool_1_threads_1_tasks + pool_1_threads_M_tasks + pool_N_threads_N_tasks + pool_N_threads_NM_tasks + ) + +set(TEST_EXTRA_LIBRARIES + ${MODULE_DEPENDENCIES} + ) + +add_unittest_executable( + "${TEST_NAME}" + "${TEST_SOURCES}" + "${TEST_LIST}" + "${TEST_EXTRA_LIBRARIES}" + ) diff --git a/ddsrouter_utils/test/unittest/thread/manager/manager_interface_test.cpp b/ddsrouter_utils/test/unittest/thread/manager/manager_interface_test.cpp new file mode 100644 index 000000000..ca11b8fc9 --- /dev/null +++ b/ddsrouter_utils/test/unittest/thread/manager/manager_interface_test.cpp @@ -0,0 +1,152 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace eprosima::ddsrouter; + +namespace test { + +utils::Duration_ms DEFAULT_TIME_TEST = 20u; +unsigned int DEFAULT_TIME_REPETITIONS = 20u; +unsigned int DEFAULT_THREADS = 3u; + +/** + * @brief Function that increases \c counter in \c increase so can be checked that has been successfully done. + * + * @param counter Wait Handler that holds the number of increases done to the same variable + * @param increase value to increase \c counter + */ +void test_lambda_increase_waiter( + event::IntWaitHandler& counter, + unsigned int increase = 1) +{ + std::this_thread::sleep_for(std::chrono::milliseconds(DEFAULT_TIME_TEST)); + for (unsigned int i = 0; i < increase; ++i) + { + ++counter; + } +} + +/** + * Task type to use. + * Using \c OwnedTask because it is the simpler and easier one. + */ +using TaskType = utils::thread::OwnedTask; + +template +utils::thread::IManager* create_manager_interface() +{ + return new Manager(); +} + +template <> +utils::thread::IManager* create_manager_interface() +{ + utils::thread::StdThreadPool* pool = new utils::thread::StdThreadPool(DEFAULT_THREADS, false); + pool->start(); + return pool; +} + +} /* namespace eprosima */ + +using namespace eprosima::ddsrouter; + +template +struct ThreadManagerTest : public ::testing::Test +{}; + +TYPED_TEST_SUITE_P(ThreadManagerTest); + +TYPED_TEST_P(ThreadManagerTest, manager_execute) +{ + // Waiter to check result + event::IntWaitHandler counter(0); + + // Create manager + utils::thread::IManager* manager( + test::create_manager_interface()); + + // Execute lambda increasing in 1 + { + auto lambda = [&counter](){ test::test_lambda_increase_waiter(counter, 1); }; + test::TaskType task(lambda); + manager->execute(std::make_unique(lambda)); + } + // Wait for lambda to be called required times + counter.wait_equal(1); + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), 1); + // Reset counter + counter.set_value(0); + + // Execute lambda N times by copy increasing in 1 + for (unsigned int i = 1; i <= test::DEFAULT_TIME_REPETITIONS; ++i) + { + manager->execute( + std::make_unique( + [&counter, i](){ test::test_lambda_increase_waiter(counter, i); })); + } + + // Wait for lambda to be called required times + uint32_t target_value = utils::arithmetic_progression_sum(1, 1, test::DEFAULT_TIME_REPETITIONS); + counter.wait_equal(target_value); + + // Erase Manager + delete manager; + + // Check that lambda has been called only that amount of times + ASSERT_EQ(counter.get_value(), target_value); +} + +// Register test class and test cases +REGISTER_TYPED_TEST_SUITE_P( + ThreadManagerTest, + manager_execute +); + +// Set types used in parametrization +typedef ::testing::Types< + utils::thread::SyncManager, + utils::thread::AsyncManager, + utils::thread::StdThreadPool + > CaseTypes; + +// Generate each test case for each type case +INSTANTIATE_TYPED_TEST_SUITE_P( + ParametrizedThreadManagerTest, + ThreadManagerTest, + CaseTypes); + + +int main( + int argc, + char** argv) +{ + // utils::Log::SetVerbosity(utils::Log::Kind::Info); + + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/ddsrouter_utils/test/unittest/thread/manager/std_thread_pool_test.cpp b/ddsrouter_utils/test/unittest/thread/manager/std_thread_pool_test.cpp new file mode 100644 index 000000000..85db94c17 --- /dev/null +++ b/ddsrouter_utils/test/unittest/thread/manager/std_thread_pool_test.cpp @@ -0,0 +1,170 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include + +#include +#include +#include +#include +#include +#include + +using namespace eprosima::ddsrouter; + +namespace eprosima { +namespace ddsrouter { +namespace utils { +namespace test { + +constexpr const utils::Duration_ms DEFAULT_TIME_TEST = 200u; // T +constexpr const utils::Duration_ms RESIDUAL_TIME_TEST = DEFAULT_TIME_TEST / 2u; // dT + +constexpr const uint32_t N_THREADS_IN_TEST = 10; // N +constexpr const uint32_t N_EXECUTIONS_IN_TEST = 20; // M + +void test_lambda_increase_waiter( + event::IntWaitHandler& counter, + unsigned int increase = 1) +{ + std::this_thread::sleep_for(std::chrono::milliseconds(DEFAULT_TIME_TEST)); + + for (unsigned int i = 0; i < increase; ++i) + { + ++counter; + } +} + +/** + * Task type to use. + * Using \c OwnedTask because it is the simpler and easier one. + */ +using TaskType = utils::thread::OwnedTask; + +/** + * TESTS EXPLANATION + * These tests create a StdThreadPool and execute tasks in it. + * + * Tasks: + * Tasks objects used are OwnedTask and are created in the moment to send it to execute, so they will be destroyed + * automatically when finishing the task. + * + * Task function: + * The function used waits for a time T and increases a WaitHandler value the amount of time given by parameter. + * The WaitHandler is used so the test can wait in main thread to the expected value. + * + * Parameteres: + * Two parameters are used within the tests: + * @param n_threads Number of threads + * @param m_tasks Number of repetitions (#tasks added to pool) + * + * @warning if \c m_tasks is not dividible by \c n_threads the test may not work as expected because of + * non exact division solution. + */ +void test_thread_pool_with_parameters( + unsigned int n_threads, + unsigned int m_tasks) +{ + // Create thread_pool + thread::StdThreadPool thread_pool(n_threads, false); + thread_pool.start(); + + // Create timer to know the task has been executed in the time expected + utils::Timer timer; + + // Counter Wait Handler to wait for the task to be executed and check the final value + event::IntWaitHandler waiter(0); + + // Emit N tasks n times + for (uint32_t i = 1; i <= m_tasks; ++i) + { + thread_pool.execute( + std::make_unique( + [&waiter, i] () { test::test_lambda_increase_waiter(waiter, i); } + ) + ); + } + + // Wait for counter value to be greater than 0 (so 1 task is being executed) + uint32_t target_value = utils::arithmetic_progression_sum(1, 1, m_tasks); + waiter.wait_greater_equal_than(target_value); + + auto time_elapsed = timer.elapsed(); + + // Check that the task has been executed in more than the time expected for lambda and less than expected + // time and residual; and that function has been called exactly once + double lower_time_expected = test::DEFAULT_TIME_TEST * std::floor(m_tasks / n_threads); + double higher_time_expected = test::DEFAULT_TIME_TEST * std::ceil(m_tasks / n_threads) + test::RESIDUAL_TIME_TEST; + + ASSERT_GE(time_elapsed, lower_time_expected); + ASSERT_LE(time_elapsed, higher_time_expected); + ASSERT_EQ(waiter.get_value(), target_value); + + // Thread Pool is destroyed automatically and without errors +} + +} /* namespace test */ +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ + +using namespace utils; + +/** + * Emit 1 tasks to a ThreadPool with 1 thread. + * Check that time elapsed is > T + */ +TEST(StdThreadPoolTest, pool_1_threads_1_tasks) +{ + test::test_thread_pool_with_parameters(1, 1); +} + +/** + * Emit M tasks to a ThreadPool with 1 thread. + */ +TEST(StdThreadPoolTest, pool_1_threads_M_tasks) +{ + test::test_thread_pool_with_parameters(1, test::N_EXECUTIONS_IN_TEST); +} + +/** + * Emit N tasks to a ThreadPool with N threads. + */ +TEST(StdThreadPoolTest, pool_N_threads_N_tasks) +{ + test::test_thread_pool_with_parameters(test::N_THREADS_IN_TEST, test::N_THREADS_IN_TEST); +} + +/** + * Emit M*N tasks to a ThreadPool with N threads. + */ +TEST(StdThreadPoolTest, pool_N_threads_NM_tasks) +{ + test::test_thread_pool_with_parameters( + test::N_THREADS_IN_TEST, + test::N_THREADS_IN_TEST * test::N_EXECUTIONS_IN_TEST); +} + +int main( + int argc, + char** argv) +{ + // eprosima::ddsxrouter::utils::Log::SetVerbosity(utils::Log::Kind::Info); + + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/ddsrouter_utils/test/unittest/thread_pool/CMakeLists.txt b/ddsrouter_utils/test/unittest/thread/task/CMakeLists.txt similarity index 80% rename from ddsrouter_utils/test/unittest/thread_pool/CMakeLists.txt rename to ddsrouter_utils/test/unittest/thread/task/CMakeLists.txt index 00f047678..6c8084ec8 100644 --- a/ddsrouter_utils/test/unittest/thread_pool/CMakeLists.txt +++ b/ddsrouter_utils/test/unittest/thread/task/CMakeLists.txt @@ -13,29 +13,27 @@ # limitations under the License. ################################### -# Slot Thread Pool execution Test +# ITask Specializations Test ################################### set(TEST_NAME - slot_thread_pool_test) + ParametrizedThreadTaskTest) set(TEST_SOURCES - slot_thread_pool_test.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/thread_pool/pool/SlotThreadPool.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/thread_pool/task/TaskId.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/time/Timer.cpp + task_interface_test.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/exception/Exception.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/Formatter.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/thread/task/OwnedTask.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/thread/task/ReferenceTask.cpp ${PROJECT_SOURCE_DIR}/src/cpp/time/time_utils.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/wait/IntWaitHandler.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/wait/CounterWaitHandler.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/time/Timer.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/Formatter.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/exception/Exception.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/wait/CounterWaitHandler.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/wait/IntWaitHandler.cpp ) set(TEST_LIST - pool_one_thread_one_slot - pool_one_thread_n_slots - pool_n_threads_one_slot + task_operator ) set(TEST_EXTRA_LIBRARIES diff --git a/ddsrouter_utils/test/unittest/thread/task/task_interface_test.cpp b/ddsrouter_utils/test/unittest/thread/task/task_interface_test.cpp new file mode 100644 index 000000000..270a54bde --- /dev/null +++ b/ddsrouter_utils/test/unittest/thread/task/task_interface_test.cpp @@ -0,0 +1,159 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include +#include +#include + +#include +#include +#include + +using namespace eprosima::ddsrouter; + +namespace test { + +utils::Duration_ms DEFAULT_TIME_TEST = 20u; +unsigned int DEFAULT_TIME_REPETITIONS = 20u; + +/** + * @brief Function that increases \c counter in \c increase so can be checked that has been successfully done. + * + * @param counter Wait Handler that holds the number of increases done to the same variable + * @param increase value to increase \c counter + */ +void test_lambda_increase_waiter( + event::IntWaitHandler& counter, + unsigned int increase = 1) +{ + utils::sleep_for(DEFAULT_TIME_TEST); + for (unsigned int i = 0; i < increase; ++i) + { + ++counter; + } +} + +template +utils::thread::ITask* create_task_specialization(std::function* callback); + +template <> +utils::thread::ITask* create_task_specialization( + std::function* callback) +{ + // Copy callback value inside new object + return new utils::thread::ReferenceTask(callback); +} + +template <> +utils::thread::ITask* create_task_specialization( + std::function* callback) +{ + // Copy callback value inside new object + return new utils::thread::OwnedTask(*callback); +} + +template <> +utils::thread::ITask* create_task_specialization>( + std::function* callback) +{ + // Copy callback value inside new object + return new utils::thread::ArgsOwnedTask<>(*callback); +} + +template <> +utils::thread::ITask* create_task_specialization>( + std::function* callback) +{ + // Copy callback value inside new object + return new utils::thread::ArgsOwnedTask( + [callback](int x){ callback->operator()(); }, + 1); +} + +} /* namespace test */ + +// Empty class to parametrized tests +template +struct ThreadTaskTest : public ::testing::Test +{}; +// Needed gtest macro +TYPED_TEST_SUITE_P(ThreadTaskTest); + +/** + * This tests operator() of every specialization if ITask + * + * Uses a IntWaitHandler to increase a value and at the same time wait for it to be updated to a exact value. + * It is increased from a function executed inside the ITask, and the wait for it to be updated and check the value. + * + */ +TYPED_TEST_P(ThreadTaskTest, task_operator) +{ + // Waiter to check result + event::IntWaitHandler counter(0); + + // Function object to create tasks + std::function task_function( + [&counter](){ test::test_lambda_increase_waiter(counter, 1); }); + + // Create task + utils::thread::ITask* task( + test::create_task_specialization( + &task_function)); + + // Execute lambda 1 time + task->operator()(); + counter.wait_equal(1); + counter.set_value(0); + + // Execute lambda N times + for (unsigned int i = 0; i < test::DEFAULT_TIME_REPETITIONS; ++i) + { + task->operator()(); + } + counter.wait_equal(test::DEFAULT_TIME_REPETITIONS); + + // Erase Task + delete task; +} + +// Register test class and test cases +REGISTER_TYPED_TEST_SUITE_P( + ThreadTaskTest, + task_operator +); + +// Set types used in parametrization +typedef ::testing::Types< + utils::thread::ReferenceTask, + utils::thread::OwnedTask, + utils::thread::ArgsOwnedTask<>, + utils::thread::ArgsOwnedTask + > CaseTypes; + +// Generate each test case for each type case +INSTANTIATE_TYPED_TEST_SUITE_P( + ParametrizedThreadTaskTest, + ThreadTaskTest, + CaseTypes); + +int main( + int argc, + char** argv) +{ + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/ddsrouter_utils/test/unittest/thread_pool/pool/SlotThreadPool.hpp b/ddsrouter_utils/test/unittest/thread_pool/pool/SlotThreadPool.hpp new file mode 100644 index 000000000..fa15909e4 --- /dev/null +++ b/ddsrouter_utils/test/unittest/thread_pool/pool/SlotThreadPool.hpp @@ -0,0 +1,170 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file SlotThreadPool.hpp + * + * This file contains class SlotThreadPool definition. + */ + +#ifndef _DDSROUTERTHREAD__SRC_CPP_POOL_SLOTTHREADPOOL_HPP_ +#define _DDSROUTERTHREAD__SRC_CPP_POOL_SLOTTHREADPOOL_HPP_ + +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { + +/** + * This class represents a thread pool that can register tasks inside. + * + * This is another implementation of \c ThreadPool but with the difference that if does not contain actual + * task objects, but only task ids. These ids are much more efficient than actual task objects in order to copy + * or store them. Each id identifies one and only one task. By adding an id to the queue, the thread that consumes + * it will execute the task associated, that must be previously registered. + * + * @note Qt notation is used for this implementation, so \c emit means to add a task to the queue and + * \c slot means to register a task. + * + * @note This class does not inherit from \c ThreadPool as methods and internal variables are not shared, + * even when both solve the same problem in similar ways. + */ +class SlotThreadPool +{ +public: + + /** + * @brief Construct a new Slot Thread Pool object + * + * This creates the internal threads in the pool and make them wait for tasks. + * Each thread is executed with function \c thread_routine_ . + * + * @param n_threads number of threads in the pool + */ + DDSROUTER_UTILS_DllAPI SlotThreadPool( + const uint32_t n_threads); + + /** + * @brief Destroy the Thread Pool object + * + * It disables the queue, what makes the threads to stop to finish their tasks and exit. + */ + DDSROUTER_UTILS_DllAPI ~SlotThreadPool(); + + /** + * Enable Slot Thread Pool in case it is not enabled + * Does nothing if it is already enabled + */ + DDSROUTER_UTILS_DllAPI void enable() noexcept; + + /** + * Disable Slot Thread Pool in case it is enabled + * Does nothing if it is already disabled + * + * It stops all the threads running, not allowing them to take new tasks. + * It blocks until every thread has finished executing. + * It does not remove tasks from queue. + * + * @todo this is a first approach, a new design should be taken into account to not block until threads finish + * when disabling the thread pool, but joining them afterwards. + */ + DDSROUTER_UTILS_DllAPI void disable() noexcept; + + /** + * @brief Add a task Id (that represents a registered Task) to be executed by the threads in the pool + * + * This add \c task_id to the queue, and the task identified will be executed by the threads in the pool. + * + * @pre \c task_id must identify a registered task. + * + * @param task_id task Id to be added to the queue so task identified is executed. + */ + DDSROUTER_UTILS_DllAPI void emit( + const TaskId& task_id); + + /** + * @brief Register a new task identified by a task Id. + * + * This method registers a new task that will be executed when its task Id is added to the queue. + * + * @param task_id task Id that identifies the task. + * @param task task to be registered. + */ + DDSROUTER_UTILS_DllAPI void slot( + const TaskId& task_id, + Task&& task); + +protected: + + /** + * @brief This is the function that every thread in the pool executes. + * + * This function enters an infinite loop where it \c consume an element from the queue (this means it will + * wait for an element to be added to the queue in case it is empty, and it will take one if any available). + * Once a task id is available, it will get the task refering this id and execute it + * Afterwards it will return to consume another task id. + * This will be repeated until the queue is disabled, what is communicated by a \c DisabledException . + */ + void thread_routine_(); + + unsigned int number_of_threads_; + + /** + * @brief Double Queue Wait Handler to store task ids + * + * This double queue implement methods \c produce , to add tasks to the queue, and \c consume to wait until any + * task is available, and return the next task available. + * + * It will retrieve tasks in FIFO order. + * Produce and consume methods are not reciprocally blocking. + */ + event::DBQueueWaitHandler task_queue_; + + /** + * @brief Threads container + * + * @note \c CustomThread are used instead of \c std::thread so some extra logic could be added to threads + * in future implementation (e.g. performance info). + */ + std::vector threads_; + + /** + * @brief Map of tasks indexed by their task Id. + * + * This object is protected by the \c slots_mutex_ mutex. + */ + std::map slots_; + + //! Protects access to \c slots_ . + std::mutex slots_mutex_; + + //! Whether the object is currently enabled + std::atomic enabled_; + +}; + +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ + +#endif /* _DDSROUTERTHREAD__SRC_CPP_POOL_SLOTTHREADPOOL_HPP_ */ diff --git a/ddsrouter_utils/test/unittest/thread_pool/slot_thread_pool_test.cpp b/ddsrouter_utils/test/unittest/thread_pool/slot_thread_pool_test.cpp deleted file mode 100644 index 788e1944d..000000000 --- a/ddsrouter_utils/test/unittest/thread_pool/slot_thread_pool_test.cpp +++ /dev/null @@ -1,180 +0,0 @@ -// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include - -#include -#include -#include - -#include - -namespace eprosima { -namespace ddsrouter { -namespace utils { -namespace test { - -eprosima::ddsrouter::utils::Duration_ms DEFAULT_TIME_TEST = 200u; -eprosima::ddsrouter::utils::Duration_ms RESIDUAL_TIME_TEST = DEFAULT_TIME_TEST / 2u; - -uint32_t N_THREADS_IN_TEST = 10; -uint32_t N_EXECUTIONS_IN_TEST = 5; - -void test_lambda_increase_waiter( - eprosima::ddsrouter::event::IntWaitHandler& counter, - unsigned int increase = 1) -{ - std::this_thread::sleep_for(std::chrono::milliseconds(DEFAULT_TIME_TEST)); - - for (unsigned int i = 0; i < increase; ++i) - { - ++counter; - } -} - -} /* namespace test */ -} /* namespace utils */ -} /* namespace ddsrouter */ -} /* namespace eprosima */ - -using namespace eprosima::ddsrouter::utils; - -/** - * Emit N tasks to a ThreadPool with one thread by storing slot. - */ -TEST(slot_thread_pool_test, pool_one_thread_one_slot) -{ - // Create thread_pool - SlotThreadPool thread_pool(1); - thread_pool.enable(); - - // Counter Wait Handler to wait for the task to be executed and check the final value - eprosima::ddsrouter::event::IntWaitHandler waiter(0); - - // Create slot - TaskId task_id(27); - thread_pool.slot( - task_id, - [&waiter] - () - { - test::test_lambda_increase_waiter(waiter); - } - ); - - // Emit task n times - for (uint32_t i = 0; i < test::N_EXECUTIONS_IN_TEST; ++i) - { - thread_pool.emit(task_id); - } - - // Wait for counter value to be greater than 0 (so 1 task is being executed) - waiter.wait_greater_equal_than(test::N_EXECUTIONS_IN_TEST); - - ASSERT_EQ(waiter.get_value(), test::N_EXECUTIONS_IN_TEST); -} - -/** - * Emit N tasks to a ThreadPool with one thread by storing N slots. - */ -TEST(slot_thread_pool_test, pool_one_thread_n_slots) -{ - // Create thread_pool - SlotThreadPool thread_pool(1); - thread_pool.enable(); - - // Counter Wait Handler to wait for the task to be executed and check the final value - eprosima::ddsrouter::event::IntWaitHandler waiter(0); - // Create timer to know the task has been executed in the time expected - eprosima::ddsrouter::utils::Timer timer; - - // Create slot - for (uint32_t i = 0; i < test::N_EXECUTIONS_IN_TEST; ++i) - { - TaskId task_id(i); - thread_pool.slot( - task_id, - [&waiter, &i] - () - { - test::test_lambda_increase_waiter(waiter, i); - } - ); - } - - // Emit every task 1 time - for (uint32_t i = 0; i < test::N_EXECUTIONS_IN_TEST; ++i) - { - thread_pool.emit(TaskId(i)); - } - - // Wait for counter value to be M being M = N*(N+1)/2 that is the increase value that should be achieved - waiter.wait_greater_equal_than((test::N_EXECUTIONS_IN_TEST* (test::N_EXECUTIONS_IN_TEST + 1)) / 2); - - ASSERT_EQ(waiter.get_value(), (test::N_EXECUTIONS_IN_TEST* (test::N_EXECUTIONS_IN_TEST + 1)) / 2); -} - -/** - * Emit N*T tasks to a ThreadPool with T threads by storing 1 slot. - */ -TEST(slot_thread_pool_test, pool_n_threads_one_slot) -{ - // Create thread_pool - SlotThreadPool thread_pool(test::N_THREADS_IN_TEST); - thread_pool.enable(); - - // Counter Wait Handler to wait for the task to be executed and check the final value - eprosima::ddsrouter::event::IntWaitHandler waiter(0); - // Create timer to know the task has been executed in the time expected - eprosima::ddsrouter::utils::Timer timer; - - // Create slot - TaskId task_id(27); - thread_pool.slot( - task_id, - [&waiter] - () - { - test::test_lambda_increase_waiter(waiter); - } - ); - - // Emit task n times - for (uint32_t i = 0; i < test::N_EXECUTIONS_IN_TEST* test::N_THREADS_IN_TEST; ++i) - { - thread_pool.emit(task_id); - } - - // Wait for counter value to be greater than 0 (so 1 task is being executed) - waiter.wait_greater_equal_than(test::N_EXECUTIONS_IN_TEST* test::N_THREADS_IN_TEST); - - auto time_elapsed = timer.elapsed(); - - // Check that the task has been executed in more than waiting time and less than waiting time + residual time - // and that function has been called exactly once - ASSERT_GE(time_elapsed, test::DEFAULT_TIME_TEST* test::N_EXECUTIONS_IN_TEST); - ASSERT_LE(time_elapsed, test::DEFAULT_TIME_TEST* test::N_EXECUTIONS_IN_TEST + test::RESIDUAL_TIME_TEST); - ASSERT_EQ(waiter.get_value(), test::N_EXECUTIONS_IN_TEST* test::N_THREADS_IN_TEST); -} - -int main( - int argc, - char** argv) -{ - // eprosima::ddsxrouter::utils::Log::SetVerbosity(eprosima::ddsrouter::utils::Log::Kind::Info); - - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} diff --git a/ddsrouter_utils/test/unittest/thread_pool/task/Task.hpp b/ddsrouter_utils/test/unittest/thread_pool/task/Task.hpp new file mode 100644 index 000000000..c57c1800e --- /dev/null +++ b/ddsrouter_utils/test/unittest/thread_pool/task/Task.hpp @@ -0,0 +1,45 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file Task.hpp + * + * This file contains class Task definition. + */ + +#ifndef _DDSROUTERTHREAD_TASK_TASK_HPP_ +#define _DDSROUTERTHREAD_TASK_TASK_HPP_ + +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { + +/** + * This class represents a task that can be executed by a Thread Pool. + * + * @note this first implementation only uses this class as a \c std::function for simplicity. + * In future implementations, this could be a more complex class. + */ +class Task : public std::function +{ + using std::function::function; +}; + +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ + +#endif /* _DDSROUTERTHREAD_TASK_TASK_HPP_ */ diff --git a/ddsrouter_utils/test/unittest/thread_pool/task/TaskId.hpp b/ddsrouter_utils/test/unittest/thread_pool/task/TaskId.hpp new file mode 100644 index 000000000..cf00a97e5 --- /dev/null +++ b/ddsrouter_utils/test/unittest/thread_pool/task/TaskId.hpp @@ -0,0 +1,48 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file TaskId.hpp + * + * This file contains class Task definition. + */ + +#ifndef _DDSROUTERTHREAD_TASK_TASKID_HPP_ +#define _DDSROUTERTHREAD_TASK_TASKID_HPP_ + +#include + +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { + +//! Type of the task ID. +using TaskId = unsigned int; + +/** + * @brief Get a new unique task ID. + * + * It uses a random number to generate a new ID. + * + * @return new unique TaskId + */ +DDSROUTER_UTILS_DllAPI TaskId new_unique_task_id(); + +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ + +#endif /* _DDSROUTERTHREAD_TASK_TASKID_HPP_ */ diff --git a/ddsrouter_utils/test/unittest/thread_pool/thread/CustomThread.hpp b/ddsrouter_utils/test/unittest/thread_pool/thread/CustomThread.hpp new file mode 100644 index 000000000..ec068f0f1 --- /dev/null +++ b/ddsrouter_utils/test/unittest/thread_pool/thread/CustomThread.hpp @@ -0,0 +1,45 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file CustomThread.hpp + * + * This file contains class CustomThread definition. + */ + +#ifndef _DDSROUTERTHREAD__SRC_CPP_THREAD_CUSTOMTHREAD_HPP_ +#define _DDSROUTERTHREAD__SRC_CPP_THREAD_CUSTOMTHREAD_HPP_ + +#include + +namespace eprosima { +namespace ddsrouter { +namespace utils { + +/** + * This class represents a thread that can be executed by a Thread Pool. + * + * @note this first implementation only uses this class as a \c std::thread for simplicity. + * In future implementations, this could be a more complex class. + */ +class CustomThread : public std::thread +{ + using std::thread::thread; +}; + +} /* namespace utils */ +} /* namespace ddsrouter */ +} /* namespace eprosima */ + +#endif /* _DDSROUTERTHREAD__SRC_CPP_THREAD_CUSTOMTHREAD_HPP_ */ diff --git a/ddsrouter_utils/test/unittest/wait/DBQueueWaitHandlerTest.cpp b/ddsrouter_utils/test/unittest/wait/DBQueueWaitHandlerTest.cpp index 9cfda8f17..5cdb715fb 100644 --- a/ddsrouter_utils/test/unittest/wait/DBQueueWaitHandlerTest.cpp +++ b/ddsrouter_utils/test/unittest/wait/DBQueueWaitHandlerTest.cpp @@ -89,7 +89,7 @@ TEST(DBQueueWaitHandlerTest, push_pop_one_thread_string_move) // This lvalue is moved as rvalue, so after moving it will be empty handler.produce(std::move(lvalue)); // TODO uncomment it once DBQueue supports moving values - // ASSERT_EQ(lvalue.size(), 0); + ASSERT_EQ(lvalue.size(), 0); // Getting first value std::string pop_value = handler.consume(); @@ -109,7 +109,7 @@ TEST(DBQueueWaitHandlerTest, push_pop_one_thread_string_copy) std::string lvalue("test_data"); - handler.produce(lvalue); + handler.produce(std::string(lvalue)); // Getting first value std::string pop_value = handler.consume();