diff --git a/runtime/bindings/python/src/openvino/__init__.py b/runtime/bindings/python/src/openvino/__init__.py
index 56e7605c38699a..aea37b7c58cff4 100644
--- a/runtime/bindings/python/src/openvino/__init__.py
+++ b/runtime/bindings/python/src/openvino/__init__.py
@@ -36,7 +36,7 @@
 from openvino.pyopenvino import DataPtr
 from openvino.pyopenvino import TensorDesc
 from openvino.pyopenvino import get_version
-#from openvino.pyopenvino import InferQueue
+from openvino.pyopenvino import AsyncInferQueue
 from openvino.pyopenvino import InferRequest  # TODO: move to ie_api?
 from openvino.pyopenvino import Blob
 from openvino.pyopenvino import PreProcessInfo
@@ -83,5 +83,5 @@
 # Patching InferRequest
 InferRequest.infer = infer
 InferRequest.start_async = start_async
-# Patching InferQueue
-#InferQueue.async_infer = async_infer
+# Patching AsyncInferQueue
+AsyncInferQueue.start_async = start_async
diff --git a/runtime/bindings/python/src/openvino/ie_api.py b/runtime/bindings/python/src/openvino/ie_api.py
index 9199ccc1a273db..4314398feac618 100644
--- a/runtime/bindings/python/src/openvino/ie_api.py
+++ b/runtime/bindings/python/src/openvino/ie_api.py
@@ -3,7 +3,7 @@
 
 import numpy as np
 import copy
-from typing import List
+from typing import List, Union
 
 from openvino.pyopenvino import TBlobFloat32
 from openvino.pyopenvino import TBlobFloat64
@@ -17,6 +17,7 @@
 from openvino.pyopenvino import TBlobUint8
 from openvino.pyopenvino import TensorDesc
 from openvino.pyopenvino import InferRequest
+from openvino.pyopenvino import AsyncInferQueue
 from openvino.pyopenvino import ExecutableNetwork
 from openvino.pyopenvino import Tensor
 
@@ -57,7 +58,7 @@ def infer_new_request(exec_net: ExecutableNetwork, inputs: dict = None) -> List[
     return [copy.deepcopy(tensor.data) for tensor in res]
 
 # flake8: noqa: D102
-def start_async(request: InferRequest, inputs: dict = {}, userdata: dict = None) -> None:  # type: ignore
+def start_async(request: Union[InferRequest, AsyncInferQueue], inputs: dict = {}, userdata: dict = None) -> None:  # type: ignore
     request._start_async(inputs=normalize_inputs(inputs), userdata=userdata)
 
 # flake8: noqa: C901
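Because the patched `start_async` is now shared by `InferRequest` and `AsyncInferQueue`, both objects accept the same call shape: a dict of inputs plus optional `userdata`. A minimal sketch of the two call sites (model path, device, and input shape are placeholders, not part of this patch):

```python
import numpy as np
from openvino import Core, AsyncInferQueue

core = Core()
func = core.read_model("model.xml")                # hypothetical model path
exec_net = core.compile_model(func, "CPU")
img = np.zeros((1, 3, 32, 32), dtype=np.float32)   # placeholder input

# Single request: the patched start_async normalizes inputs,
# then delegates to the native _start_async.
request = exec_net.create_infer_request()
request.start_async({0: img})
request.wait()

# Queue: the same entry point, with userdata stored per submission.
queue = AsyncInferQueue(exec_net, jobs=2)
queue.start_async({0: img}, userdata=0)
queue.wait_all()
```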
diff --git a/runtime/bindings/python/src/pyopenvino/core/async_infer_queue.cpp b/runtime/bindings/python/src/pyopenvino/core/async_infer_queue.cpp
new file mode 100644
index 00000000000000..8c68958a1ef70a
--- /dev/null
+++ b/runtime/bindings/python/src/pyopenvino/core/async_infer_queue.cpp
@@ -0,0 +1,205 @@
+// Copyright (C) 2021 Intel Corporation
+// SPDX-License-Identifier: Apache-2.0
+
+#include "pyopenvino/core/async_infer_queue.hpp"
+
+#include <pybind11/functional.h>
+#include <pybind11/pybind11.h>
+#include <pybind11/stl.h>
+
+#include <chrono>
+#include <condition_variable>
+#include <mutex>
+#include <queue>
+#include <string>
+#include <vector>
+
+#include "pyopenvino/core/common.hpp"
+#include "pyopenvino/core/infer_request.hpp"
+
+namespace py = pybind11;
+
+class AsyncInferQueue {
+public:
+    AsyncInferQueue(std::vector<InferRequestWrapper> requests,
+                    std::queue<size_t> idle_handles,
+                    std::vector<py::object> user_ids)
+        : _requests(requests),
+          _idle_handles(idle_handles),
+          _user_ids(user_ids) {
+        this->set_default_callbacks();
+    }
+
+    ~AsyncInferQueue() {
+        _requests.clear();
+    }
+
+    bool _is_ready() {
+        py::gil_scoped_release release;
+        std::unique_lock<std::mutex> lock(_mutex);
+        _cv.wait(lock, [this] {
+            return !(_idle_handles.empty());
+        });
+
+        return !(_idle_handles.empty());
+    }
+
+    size_t get_idle_request_id() {
+        // Wait for any of _idle_handles
+        py::gil_scoped_release release;
+        std::unique_lock<std::mutex> lock(_mutex);
+        _cv.wait(lock, [this] {
+            return !(_idle_handles.empty());
+        });
+
+        return _idle_handles.front();
+    }
+
+    void wait_all() {
+        // Wait for all requests to return via callback, thus updating
+        // _idle_handles so it matches the size of requests
+        py::gil_scoped_release release;
+        std::unique_lock<std::mutex> lock(_mutex);
+        _cv.wait(lock, [this] {
+            return _idle_handles.size() == _requests.size();
+        });
+    }
+
+    void set_default_callbacks() {
+        for (size_t handle = 0; handle < _requests.size(); handle++) {
+            _requests[handle]._request.set_callback([this, handle /* ... */](std::exception_ptr exception_ptr) {
+                _requests[handle]._end_time = Time::now();
+                // Add idle handle to queue
+                _idle_handles.push(handle);
+                // Notify locks in get_idle_request_id() or wait_all() functions
+                _cv.notify_one();
+            });
+        }
+    }
+
+    void set_custom_callbacks(py::function f_callback) {
+        for (size_t handle = 0; handle < _requests.size(); handle++) {
+            _requests[handle]._request.set_callback([this, f_callback, handle](std::exception_ptr exception_ptr) {
+                _requests[handle]._end_time = Time::now();
+                try {
+                    if (exception_ptr) {
+                        std::rethrow_exception(exception_ptr);
+                    }
+                } catch (const std::exception& e) {
+                    throw ov::Exception(e.what());
+                }
+                // Acquire GIL, execute Python function
+                py::gil_scoped_acquire acquire;
+                f_callback(_requests[handle], _user_ids[handle]);
+                // Add idle handle to queue
+                _idle_handles.push(handle);
+                // Notify locks in get_idle_request_id() or wait_all() functions
+                _cv.notify_one();
+            });
+        }
+    }
+
+    std::vector<InferRequestWrapper> _requests;
+    std::queue<size_t> _idle_handles;
+    std::vector<py::object> _user_ids;  // user ID can be any Python object
+    std::mutex _mutex;
+    std::condition_variable _cv;
+};
+
+void regclass_AsyncInferQueue(py::module m) {
+    py::class_<AsyncInferQueue, std::shared_ptr<AsyncInferQueue>> cls(m, "AsyncInferQueue");
+
+    cls.def(py::init([](ov::runtime::ExecutableNetwork& net, size_t jobs) {
+                if (jobs == 0) {
+                    jobs = (size_t)Common::get_optimal_number_of_requests(net);
+                }
+
+                std::vector<InferRequestWrapper> requests;
+                std::queue<size_t> idle_handles;
+                std::vector<py::object> user_ids(jobs);
+
+                for (size_t handle = 0; handle < jobs; handle++) {
+                    auto request = InferRequestWrapper(net.create_infer_request());
+                    // Get Inputs and Outputs info from executable network
+                    request._inputs = net.inputs();
+                    request._outputs = net.outputs();
+
+                    requests.push_back(request);
+                    idle_handles.push(handle);
+                }
+
+                return new AsyncInferQueue(requests, idle_handles, user_ids);
+            }),
+            py::arg("network"),
+            py::arg("jobs") = 0);
+
+    cls.def(
+        "_start_async",
+        [](AsyncInferQueue& self, const py::dict inputs, py::object userdata) {
+            // get_idle_request_id() blocks the queue until there is at least
+            // one idle (free to use) InferRequest
+            auto handle = self.get_idle_request_id();
+            self._idle_handles.pop();
+            // Set new inputs label/id from user
+            self._user_ids[handle] = userdata;
+            // Update inputs if there are any
+            if (!inputs.empty()) {
+                if (py::isinstance<py::str>(inputs.begin()->first)) {
+                    auto inputs_map = Common::cast_to_tensor_name_map(inputs);
+                    for (auto&& input : inputs_map) {
+                        self._requests[handle]._request.set_tensor(input.first, input.second);
+                    }
+                } else if (py::isinstance<py::int_>(inputs.begin()->first)) {
+                    auto inputs_map = Common::cast_to_tensor_index_map(inputs);
+                    for (auto&& input : inputs_map) {
+                        self._requests[handle]._request.set_input_tensor(input.first, input.second);
+                    }
+                }
+            }
+            // Now GIL can be released - we are NOT working with Python objects in this block
+            {
+                py::gil_scoped_release release;
+                self._requests[handle]._start_time = Time::now();
+                // Start InferRequest in asynchronous mode
+                self._requests[handle]._request.start_async();
+            }
+        },
+        py::arg("inputs"),
+        py::arg("userdata"));
+
+    cls.def("is_ready", [](AsyncInferQueue& self) {
+        return self._is_ready();
+    });
+
+    cls.def("wait_all", [](AsyncInferQueue& self) {
+        return self.wait_all();
+    });
+
+    cls.def("get_idle_request_id", [](AsyncInferQueue& self) {
+        return self.get_idle_request_id();
+    });
+
+    cls.def("set_callback", [](AsyncInferQueue& self, py::function f_callback) {
+        self.set_custom_callbacks(f_callback);
+    });
+
+    cls.def("__len__", [](AsyncInferQueue& self) {
+        return self._requests.size();
+    });
+
+    cls.def(
+        "__iter__",
+        [](AsyncInferQueue& self) {
+            return py::make_iterator(self._requests.begin(), self._requests.end());
+        },
+        py::keep_alive<0, 1>()); /* Keep queue alive while iterator is used */
+
+    cls.def("__getitem__", [](AsyncInferQueue& self, size_t i) {
+        return self._requests[i];
+    });
+
+    cls.def_property_readonly("userdata", [](AsyncInferQueue& self) {
+        return self._user_ids;
+    });
+}
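The binding keeps GIL discipline on the C++ side: `_start_async` blocks inside `get_idle_request_id()` with the GIL released, and a custom callback re-acquires the GIL only for the duration of the user function. A sketch of the resulting Python-side contract, assuming `exec_net` was compiled as above — the callback receives the finished request wrapper plus the userdata given at submission:

```python
import numpy as np

infer_queue = AsyncInferQueue(exec_net, jobs=4)
results = []

def completion_callback(request, userdata):
    # Runs on the plugin's callback thread with the GIL held; keep it short.
    results.append((userdata, request.latency))

infer_queue.set_callback(completion_callback)
for frame_id in range(16):
    # Blocks only while every request in the pool is busy.
    infer_queue.start_async({0: np.zeros((1, 3, 32, 32), np.float32)}, frame_id)
infer_queue.wait_all()  # returns once all handles are idle again
```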
py::arg("userdata")); + + cls.def("is_ready", [](AsyncInferQueue& self) { + return self._is_ready(); + }); + + cls.def("wait_all", [](AsyncInferQueue& self) { + return self.wait_all(); + }); + + cls.def("get_idle_request_id", [](AsyncInferQueue& self) { + return self.get_idle_request_id(); + }); + + cls.def("set_callback", [](AsyncInferQueue& self, py::function f_callback) { + self.set_custom_callbacks(f_callback); + }); + + cls.def("__len__", [](AsyncInferQueue& self) { + return self._requests.size(); + }); + + cls.def( + "__iter__", + [](AsyncInferQueue& self) { + return py::make_iterator(self._requests.begin(), self._requests.end()); + }, + py::keep_alive<0, 1>()); /* Keep set alive while iterator is used */ + + cls.def("__getitem__", [](AsyncInferQueue& self, size_t i) { + return self._requests[i]; + }); + + cls.def_property_readonly("userdata", [](AsyncInferQueue& self) { + return self._user_ids; + }); +} diff --git a/runtime/bindings/python/src/pyopenvino/core/ie_infer_queue.hpp b/runtime/bindings/python/src/pyopenvino/core/async_infer_queue.hpp similarity index 77% rename from runtime/bindings/python/src/pyopenvino/core/ie_infer_queue.hpp rename to runtime/bindings/python/src/pyopenvino/core/async_infer_queue.hpp index 23aa72fd072496..3ed1122ba126d7 100644 --- a/runtime/bindings/python/src/pyopenvino/core/ie_infer_queue.hpp +++ b/runtime/bindings/python/src/pyopenvino/core/async_infer_queue.hpp @@ -7,4 +7,4 @@ namespace py = pybind11; -void regclass_InferQueue(py::module m); +void regclass_AsyncInferQueue(py::module m); diff --git a/runtime/bindings/python/src/pyopenvino/core/common.cpp b/runtime/bindings/python/src/pyopenvino/core/common.cpp index b42f4d14419594..8a15aaf6b92598 100644 --- a/runtime/bindings/python/src/pyopenvino/core/common.cpp +++ b/runtime/bindings/python/src/pyopenvino/core/common.cpp @@ -321,13 +321,13 @@ void set_request_blobs(InferenceEngine::InferRequest& request, const py::dict& d } } -uint32_t get_optimal_number_of_requests(const InferenceEngine::ExecutableNetwork& actual) { +uint32_t get_optimal_number_of_requests(const ov::runtime::ExecutableNetwork& actual) { try { - auto parameter_value = actual.GetMetric(METRIC_KEY(SUPPORTED_METRICS)); + auto parameter_value = actual.get_metric(METRIC_KEY(SUPPORTED_METRICS)); auto supported_metrics = parameter_value.as>(); const std::string key = METRIC_KEY(OPTIMAL_NUMBER_OF_INFER_REQUESTS); if (std::find(supported_metrics.begin(), supported_metrics.end(), key) != supported_metrics.end()) { - parameter_value = actual.GetMetric(key); + parameter_value = actual.get_metric(key); if (parameter_value.is()) return parameter_value.as(); else diff --git a/runtime/bindings/python/src/pyopenvino/core/common.hpp b/runtime/bindings/python/src/pyopenvino/core/common.hpp index d4be9bd2a77995..867330640f3cf2 100644 --- a/runtime/bindings/python/src/pyopenvino/core/common.hpp +++ b/runtime/bindings/python/src/pyopenvino/core/common.hpp @@ -15,6 +15,7 @@ #include "Python.h" #include "ie_common.h" #include "openvino/runtime/tensor.hpp" +#include "openvino/runtime/executable_network.hpp" #include "pyopenvino/core/containers.hpp" namespace py = pybind11; @@ -60,5 +61,5 @@ namespace Common void set_request_blobs(InferenceEngine::InferRequest& request, const py::dict& dictonary); - uint32_t get_optimal_number_of_requests(const InferenceEngine::ExecutableNetwork& actual); + uint32_t get_optimal_number_of_requests(const ov::runtime::ExecutableNetwork& actual); }; // namespace Common diff --git 
diff --git a/runtime/bindings/python/src/pyopenvino/core/ie_infer_queue.cpp b/runtime/bindings/python/src/pyopenvino/core/ie_infer_queue.cpp
deleted file mode 100644
index e80cd33105f01b..00000000000000
--- a/runtime/bindings/python/src/pyopenvino/core/ie_infer_queue.cpp
+++ /dev/null
@@ -1,228 +0,0 @@
-// Copyright (C) 2021 Intel Corporation
-// SPDX-License-Identifier: Apache-2.0
-
-#include "pyopenvino/core/ie_infer_queue.hpp"
-
-#include <pybind11/functional.h>
-#include <pybind11/pybind11.h>
-#include <pybind11/stl.h>
-
-#include <chrono>
-#include <condition_variable>
-#include <cstdlib>
-#include <map>
-#include <memory>
-#include <mutex>
-#include <queue>
-#include <string>
-#include <vector>
-
-#include "pyopenvino/core/common.hpp"
-#include "pyopenvino/core/infer_request.hpp"
-
-#define INVALID_ID -1
-
-namespace py = pybind11;
-
-class InferQueue {
-public:
-    InferQueue(std::vector<InferRequestWrapper> requests,
-               std::queue<size_t> idle_handles,
-               std::vector<py::object> user_ids)
-        : _requests(requests),
-          _idle_handles(idle_handles),
-          _user_ids(user_ids) {
-        this->setDefaultCallbacks();
-        _last_id = -1;
-    }
-
-    ~InferQueue() {
-        _requests.clear();
-    }
-
-    bool _is_ready() {
-        py::gil_scoped_release release;
-        std::unique_lock<std::mutex> lock(_mutex);
-        _cv.wait(lock, [this] {
-            return !(_idle_handles.empty());
-        });
-
-        return !(_idle_handles.empty());
-    }
-
-    py::dict _getIdleRequestInfo() {
-        py::gil_scoped_release release;
-        std::unique_lock<std::mutex> lock(_mutex);
-        _cv.wait(lock, [this] {
-            return !(_idle_handles.empty());
-        });
-
-        size_t request_id = _idle_handles.front();
-
-        py::dict request_info = py::dict();
-        request_info["id"] = request_id;
-        // request_info["status"] = true; // TODO
-
-        return request_info;
-    }
-
-    size_t getIdleRequestId() {
-        // Wait for any of _idle_handles
-        py::gil_scoped_release release;
-        std::unique_lock<std::mutex> lock(_mutex);
-        _cv.wait(lock, [this] {
-            return !(_idle_handles.empty());
-        });
-
-        size_t idle_request_id = _idle_handles.front();
-        _idle_handles.pop();
-
-        return idle_request_id;
-    }
-
-    std::vector<bool> waitAll() {
-        // Wait for all requests to return with callback thus updating
-        // _idle_handles so it matches the size of requests
-        py::gil_scoped_release release;
-        std::unique_lock<std::mutex> lock(_mutex);
-        _cv.wait(lock, [this] {
-            return _idle_handles.size() == _requests.size();
-        });
-
-        std::vector<bool> statuses;
-
-        for (size_t handle = 0; handle < _requests.size(); handle++) {
-            statuses.push_back(_requests[handle]._request.wait_for(std::chrono::milliseconds(0)));
-        }
-
-        return statuses;
-    }
-
-    void setDefaultCallbacks() {
-        for (size_t handle = 0; handle < _requests.size(); handle++) {
-            _requests[handle]._request.set_callback([this, handle /* ... */](std::exception_ptr exception_ptr) {
-                _requests[handle]._end_time = Time::now();
-                // Add idle handle to queue
-                _idle_handles.push(handle);
-                // Notify locks in getIdleRequestId() or waitAll() functions
-                _cv.notify_one();
-            });
-        }
-    }
-
-    void setCustomCallbacks(py::function f_callback) {
-        for (size_t handle = 0; handle < _requests.size(); handle++) {
-            _requests[handle]._request.set_callback([this, f_callback, handle](std::exception_ptr exception_ptr) {
-                _requests[handle]._end_time = Time::now();
-                try {
-                    if (exception_ptr) {
-                        std::rethrow_exception(exception_ptr);
-                    }
-                } catch (const std::exception& e) {
-                    IE_THROW() << "Caught exception: " << e.what();
-                }
-                // Acquire GIL, execute Python function
-                py::gil_scoped_acquire acquire;
-                f_callback(_requests[handle], _user_ids[handle]);
-                // Add idle handle to queue
-                _idle_handles.push(handle);
-                // Notify locks in getIdleRequestId() or waitAll() functions
-                _cv.notify_one();
-            });
-        }
-    }
-
-    std::vector<InferRequestWrapper> _requests;
-    std::queue<size_t> _idle_handles;
-    std::vector<py::object> _user_ids;  // user ID can be any Python object
-    size_t _last_id;
-    std::mutex _mutex;
-    std::condition_variable _cv;
-};
-
-// void regclass_InferQueue(py::module m) {
-//     py::class_<InferQueue, std::shared_ptr<InferQueue>> cls(m, "InferQueue");
-
-//     cls.def(py::init([](InferenceEngine::ExecutableNetwork& net, size_t jobs) {
-//                 if (jobs == 0) {
-//                     const InferenceEngine::ExecutableNetwork& _net = net;
-//                     jobs = (size_t)Common::get_optimal_number_of_requests(_net);
-//                 }
-
-//                 std::vector<InferRequestWrapper> requests;
-//                 std::queue<size_t> idle_handles;
-//                 std::vector<py::object> user_ids(jobs);
-
-//                 for (size_t handle = 0; handle < jobs; handle++) {
-//                     auto request = InferRequestWrapper(net.CreateInferRequest());
-//                     // Get Inputs and Outputs info from executable network
-//                     request._inputsInfo = net.GetInputsInfo();
-//                     request._outputsInfo = net.GetOutputsInfo();
-
-//                     requests.push_back(request);
-//                     idle_handles.push(handle);
-//                 }
-
-//                 return new InferQueue(requests, idle_handles, user_ids);
-//             }),
-//             py::arg("network"),
-//             py::arg("jobs") = 0);
-
-//     cls.def(
-//         "_async_infer",
-//         [](InferQueue& self, const py::dict inputs, py::object userdata) {
-//             // getIdleRequestId function has an intention to block InferQueue
-//             // until there is at least one idle (free to use) InferRequest
-//             auto handle = self.getIdleRequestId();
-//             // Set new inputs label/id from user
-//             self._user_ids[handle] = userdata;
-//             // Update inputs of picked InferRequest
-//             if (!inputs.empty()) {
-//                 Common::set_request_blobs(self._requests[handle]._request, inputs);
-//             }
-//             // Now GIL can be released - we are NOT working with Python objects in this block
-//             {
-//                 py::gil_scoped_release release;
-//                 self._requests[handle]._start_time = Time::now();
-//                 // Start InferRequest in asynchronus mode
-//                 self._requests[handle]._request.start_async();
-//             }
-//         },
-//         py::arg("inputs"),
-//         py::arg("userdata"));
-
-//     cls.def("is_ready", [](InferQueue& self) {
-//         return self._is_ready();
-//     });
-
-//     cls.def("wait_all", [](InferQueue& self) {
-//         return self.waitAll();
-//     });
-
-//     cls.def("get_idle_request_info", [](InferQueue& self) {
-//         return self._getIdleRequestInfo();
-//     });
-
-//     cls.def("set_infer_callback", [](InferQueue& self, py::function f_callback) {
-//         self.setCustomCallbacks(f_callback);
-//     });
-
-//     cls.def("__len__", [](InferQueue& self) {
-//         return self._requests.size();
-//     });
-
-//     cls.def(
-//         "__iter__",
-//         [](InferQueue& self) {
-//             return py::make_iterator(self._requests.begin(), self._requests.end());
-//         },
-//         py::keep_alive<0, 1>()); /* Keep set alive while iterator is used */
-
-//     cls.def("__getitem__", [](InferQueue& self, size_t i) {
-//         return self._requests[i];
-//     });
-
-//     cls.def_property_readonly("userdata", [](InferQueue& self) {
-//         return self._user_ids;
-//     });
-// }
diff --git a/runtime/bindings/python/src/pyopenvino/core/infer_request.hpp b/runtime/bindings/python/src/pyopenvino/core/infer_request.hpp
index 3ea9859db1fcc8..143df8f200cb73 100644
--- a/runtime/bindings/python/src/pyopenvino/core/infer_request.hpp
+++ b/runtime/bindings/python/src/pyopenvino/core/infer_request.hpp
@@ -20,11 +20,15 @@ class InferRequestWrapper {
 
     InferRequestWrapper(ov::runtime::InferRequest request)
         : _request(request)
     {
+        // AsyncInferQueue uses this constructor - setting callback for computing a latency will be done there
    }
 
     InferRequestWrapper(ov::runtime::InferRequest request, const std::vector<ov::Output<const ov::Node>>& inputs, const std::vector<ov::Output<const ov::Node>>& outputs)
         : _request(request), _inputs(inputs), _outputs(outputs)
     {
+        _request.set_callback([this](std::exception_ptr exception_ptr) {
+            _end_time = Time::now();
+        });
     }
 
     // ~InferRequestWrapper() = default;
diff --git a/runtime/bindings/python/src/pyopenvino/pyopenvino.cpp b/runtime/bindings/python/src/pyopenvino/pyopenvino.cpp
index d0088018454005..c1529428789db9 100644
--- a/runtime/bindings/python/src/pyopenvino/pyopenvino.cpp
+++ b/runtime/bindings/python/src/pyopenvino/pyopenvino.cpp
@@ -19,12 +19,12 @@
 #if defined(NGRAPH_ONNX_FRONTEND_ENABLE)
 #    include "pyopenvino/graph/onnx_import/onnx_import.hpp"
 #endif
+#include "pyopenvino/core/async_infer_queue.hpp"
 #include "pyopenvino/core/containers.hpp"
 #include "pyopenvino/core/core.hpp"
 #include "pyopenvino/core/executable_network.hpp"
 #include "pyopenvino/core/ie_blob.hpp"
 #include "pyopenvino/core/ie_data.hpp"
-#include "pyopenvino/core/ie_infer_queue.hpp"
 #include "pyopenvino/core/ie_input_info.hpp"
 #include "pyopenvino/core/ie_network.hpp"
 #include "pyopenvino/core/ie_parameter.hpp"
@@ -127,7 +127,7 @@ PYBIND11_MODULE(pyopenvino, m) {
     regclass_Version(m);
     regclass_Parameter(m);
     regclass_InputInfo(m);
-    // regclass_InferQueue(m);
+    regclass_AsyncInferQueue(m);
     regclass_ProfilingInfo(m);
     regclass_PreProcessInfo(m);
 
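Once registered, the queue also behaves like a small container of requests: `__len__`, `__iter__`, and `__getitem__` expose the pool, and the `userdata` property returns whatever was stored per slot on the last submission. A hypothetical post-run inspection, assuming a queue of four jobs as in the earlier sketch:

```python
assert len(infer_queue) == 4          # one entry per InferRequest in the pool
for idx, request in enumerate(infer_queue):
    print(idx, request.latency)       # per-slot latency of the last inference
print(infer_queue.userdata)           # last userdata stored for each slot
```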
arguments!" in str(e.value) + + +def test_infer_queue(device): + jobs = 8 + num_request = 4 + core = Core() + func = core.read_model(test_net_xml, test_net_bin) + exec_net = core.compile_model(func, device) + infer_queue = AsyncInferQueue(exec_net, num_request) + jobs_done = [{"finished": False, "latency": 0} for _ in range(jobs)] + + def callback(request, job_id): + jobs_done[job_id]["finished"] = True + jobs_done[job_id]["latency"] = request.latency + + img = read_image() + infer_queue.set_callback(callback) + assert infer_queue.is_ready + for i in range(jobs): + infer_queue.start_async({"data": img}, i) + infer_queue.wait_all() + assert all(job["finished"] for job in jobs_done) + assert all(job["latency"] > 0 for job in jobs_done)