diff --git a/paddle/fluid/pybind/tensor.cc b/paddle/fluid/pybind/tensor.cc index 52462bd182803f..b2c83177284486 100644 --- a/paddle/fluid/pybind/tensor.cc +++ b/paddle/fluid/pybind/tensor.cc @@ -903,7 +903,7 @@ void BindTensor(pybind11::module &m) { // NOLINT const auto &device_id = paddle::platform::GetXPUCurrentDeviceId(); auto stream = paddle::platform::get_current_stream(device_id); - xpu_wait(stream); + xpu_wait(stream->raw_stream()); int type_idx = static_cast(self.type()); size_t data_size = self.numel() * framework::SizeOfType( diff --git a/paddle/fluid/pybind/xpu_streams_py.cc b/paddle/fluid/pybind/xpu_streams_py.cc index 98a581e0768138..957746605007ab 100644 --- a/paddle/fluid/pybind/xpu_streams_py.cc +++ b/paddle/fluid/pybind/xpu_streams_py.cc @@ -33,19 +33,27 @@ namespace py = pybind11; namespace paddle { namespace platform { #ifdef PADDLE_WITH_XPU -XPUStream get_current_stream(int device_id) { - if (device_id == -1) { - device_id = phi::backends::xpu::GetXPUCurrentDeviceId(); - } +phi::XPUStreamHandle *get_current_stream(int device_id) { auto place = phi::XPUPlace(device_id); auto *dev_ctx = static_cast( phi::DeviceContextPool::Instance().Get(place)); dev_ctx->Wait(); - return dev_ctx->stream(); + return dev_ctx->get_current_stream_handle(); +} + +phi::XPUStreamHandle *set_current_stream(int idx) { + int device_id = phi::backends::xpu::GetXPUCurrentDeviceId(); + auto original_stream = get_current_stream(device_id); + auto place = phi::XPUPlace(device_id); + auto *dev_ctx = static_cast( + phi::DeviceContextPool::Instance().Get(place)); + dev_ctx->SetCurrentStream(idx); + return original_stream; } #endif } // namespace platform + namespace pybind { void BindXpuStream(py::module *m_ptr) { auto &m = *m_ptr; @@ -69,7 +77,7 @@ void BindXpuStream(py::module *m_ptr) { #endif }); m.def( - "_get_current_stream", + "_xpu_get_current_stream", [](int device_id) { #ifdef PADDLE_WITH_XPU if (device_id == -1) { @@ -79,7 +87,19 @@ void BindXpuStream(py::module *m_ptr) { return platform::get_current_stream(device_id); #else PADDLE_THROW( - common::errors::Unavailable("Paddle is not compiled with CUDA. " + common::errors::Unavailable("Paddle is not compiled with XPU. " + "Cannot visit device synchronize.")); +#endif + }, + py::return_value_policy::reference); + m.def( + "_xpu_set_current_stream", + [](int stream_id) { +#ifdef PADDLE_WITH_XPU + return platform::set_current_stream(stream_id); +#else + PADDLE_THROW( + common::errors::Unavailable("Paddle is not compiled with XPU. " "Cannot visit device synchronize.")); #endif }, @@ -100,12 +120,167 @@ void BindXpuStream(py::module *m_ptr) { #endif }); + py::class_(m, "XPUStream", R"DOC( + The handle of the XPU stream. + + Parameters: + device(paddle.XPUPlace()|int|None, optional): The device which wanted to allocate the stream. + If device is None or negative integer, device will be the current device. + If device is positive integer, it must less than the device count. Default: None. + + Examples: + .. 
code-block:: python + + >>> # doctest: +REQUIRES(env:XPU) + >>> import paddle + >>> s1 = paddle.device.xpu.Stream(paddle.XPUPlace(0)) + >>> s2 = paddle.device.xpu.Stream(0) + >>> s3 = paddle.device.xpu.Stream() + + )DOC") +#ifdef PADDLE_WITH_XPU + .def_property_readonly( + "xpu_stream", + [](phi::XPUStreamHandle &self) { + return reinterpret_cast(self.raw_stream()); + }) + .def("wait_stream", + [](phi::XPUStreamHandle &self, phi::XPUStreamHandle &other) { + auto *dev_ctx = phi::get_xpu_context(); + dev_ctx->StreamWaitStreamInPool(self.id(), other.id()); + }) + .def("wait_event", + [](phi::XPUStreamHandle &self, phi::XPUEventHandle &other) { + self.wait_event(other.get_event()); + }) + .def("query", + [](phi::XPUStreamHandle &self) { + PADDLE_THROW(common::errors::Unavailable( + "Query function for XPUStream is not supported now")); + }) + .def("record_event", + [](phi::XPUStreamHandle &self, phi::XPUEventHandle *event) { + if (event == nullptr) { + event = new phi::XPUEventHandle(); + } + self.record_event(event->get_event()); + return event; + }) + .def( + "synchronize", + [](phi::XPUStreamHandle &self) { self.synchronize(); }, + R"DOC( + Waits for stream tasks to complete. + + Examples: + .. code-block:: python + + >>> # doctest: +REQUIRES(env:XPU) + >>> import paddle + >>> s = paddle.device.xpu.Stream(paddle.XPUPlace(0), 1) + >>> s.synchronize() + + )DOC") + .def_property_readonly( + "place", + [](phi::XPUStreamHandle &self) { + return phi::XPUPlace(platform::GetXPUCurrentDeviceId()); + }) + .def_property_readonly( + "idx", [](phi::XPUStreamHandle &self) { return self.id(); }) +#endif + + .def("__init__", + [](phi::XPUStreamHandle &self) { +#ifdef PADDLE_WITH_XPU + new (&self) phi::XPUStreamHandle(); + self.Init(); +#else + PADDLE_THROW(common::errors::Unavailable( + "Class XPUStream can only be initialized on the XPU " + "platform.")); +#endif + }) + .def( + "__init__", + [](phi::XPUStreamHandle &self, phi::XPUPlace *place) { +#ifdef PADDLE_WITH_XPU + if (place == nullptr) { + int curr_device_id = platform::GetXPUCurrentDeviceId(); + auto place_tmp = phi::XPUPlace(curr_device_id); + new (&self) phi::XPUStreamHandle(place_tmp); + } else { + new (&self) phi::XPUStreamHandle(*place); + } +#else + PADDLE_THROW(common::errors::Unavailable( + "Class XPUStream can only be initialized on the XPU " + "platform.")); +#endif + }, + py::arg("device") = nullptr) + .def( + "__init__", + [](phi::XPUStreamHandle &self, int device) { +#ifdef PADDLE_WITH_XPU + if (device < 0) { + device = platform::GetXPUCurrentDeviceId(); + } + auto place_tmp = phi::XPUPlace(device); + new (&self) phi::XPUStreamHandle(place_tmp); +#else + PADDLE_THROW(common::errors::Unavailable( + "Class XPUStream can only be initialized on the XPU " + "platform.")); +#endif + }, + py::arg("device") = -1); + py::class_(m, "XPUEvent", R"DOC( + The handle of the XPU event. + + Examples: + .. 
code-block:: python + + >>> # doctest: +REQUIRES(env:XPU) + >>> import paddle + >>> event = paddle.device.xpu.Event() + + )DOC") +#ifdef PADDLE_WITH_XPU + .def( + "record", + [](phi::XPUEventHandle &self, phi::XPUStreamHandle *stream) { + if (stream == nullptr) { + auto *dev_ctx = phi::get_xpu_context(); + auto stream_handle = dev_ctx->get_current_stream_handle(); + self.record(stream_handle->raw_stream()); + } else { + self.record(stream->raw_stream()); + } + }, + py::arg("stream") = nullptr) + .def("query", [](phi::XPUEventHandle &self) { return self.query(); }) + .def("elapsed_time", + [](phi::XPUEventHandle &self) { + PADDLE_THROW(common::errors::Unavailable( + "XPUEvent elapsed_time is not supported now")); + }) + .def("synchronize", [](phi::XPUEventHandle &self) { self.synchronize(); }) +#endif + .def("__init__", [](phi::XPUEventHandle &self) { +#ifdef PADDLE_WITH_XPU + new (&self) phi::XPUEventHandle(); +#else + PADDLE_THROW(common::errors::Unavailable( + "Class XPUEvent can only be initialized on the XPU platform.")); +#endif + }); #ifdef PADDLE_WITH_XPU - py::class_(m, "XPUStream", R"DOC( - The handle of the CUDA stream. + py::class_(m, "XPUCUDAStream", R"DOC( + The handle of the XPU stream. Parameters: - device(paddle.CUDAPlace()|int|None, optional): The device which wanted to allocate the stream. + device(paddle.XPUPlace()|int|None, optional): The device which wanted to allocate the stream. If device is None or negative integer, device will be the current device. If device is positive integer, it must less than the device count. Default: None. priority(int|None, optional): The priority of stream. The priority can be 1(high) or 2(normal). @@ -114,16 +289,16 @@ void BindXpuStream(py::module *m_ptr) { Examples: .. code-block:: python - >>> # doctest: +REQUIRES(env:GPU) + >>> # doctest: +REQUIRES(env:XPU) >>> import paddle - >>> s1 = paddle.device.cuda.Stream(paddle.CUDAPlace(0), 1) - >>> s2 = paddle.device.cuda.Stream(0, 1) - >>> s3 = paddle.device.cuda.Stream() + >>> s1 = paddle.device.xpu.Stream(paddle.XPUPlace(0), 1) + >>> s2 = paddle.device.xpu.Stream(0, 1) + >>> s3 = paddle.device.xpu.Stream() )DOC") .def( "synchronize", - [](XPUStream &self) { xpu_wait(self); }, + [](phi::XPUCUDAStream &self) { self.Synchronize(); }, R"DOC( Waits for stream tasks to complete. 
@@ -135,7 +310,25 @@ void BindXpuStream(py::module *m_ptr) { >>> s = paddle.device.cuda.Stream(paddle.CUDAPlace(0), 1) >>> s.synchronize() - )DOC"); + )DOC") + .def("__init__", + [](phi::XPUCUDAStream &self, phi::XPUPlace *place, int priority) { + if (priority != 1 && priority != 2) { + PADDLE_THROW(common::errors::InvalidArgument( + "Priority should be 1(high) or 2(normal) ")); + } + auto stream_flag = + phi::XPUCUDAStream::StreamFlag::kStreamNonBlocking; + if (place == nullptr) { + int curr_device_id = platform::GetXPUCurrentDeviceId(); + auto place_tmp = phi::XPUPlace(curr_device_id); + new (&self) + phi::XPUCUDAStream(place_tmp, priority - 2, stream_flag); + } else { + new (&self) + phi::XPUCUDAStream(*place, priority - 2, stream_flag); + } + }); #endif } } // namespace pybind diff --git a/paddle/fluid/pybind/xpu_streams_py.h b/paddle/fluid/pybind/xpu_streams_py.h index a146cf6ba3419e..a1f56b879d1cd9 100644 --- a/paddle/fluid/pybind/xpu_streams_py.h +++ b/paddle/fluid/pybind/xpu_streams_py.h @@ -18,12 +18,16 @@ #include "pybind11/stl.h" #ifdef PADDLE_WITH_XPU +#include "paddle/phi/backends/xpu/xpu_context.h" #include "paddle/phi/core/xpu_cuda_stream.h" #include "xpu/runtime.h" #include "xpu/runtime_ex.h" + #else namespace phi { class XPUCUDAStream {}; +class XPUStreamHandle {}; +class XPUEventHandle {}; } // namespace phi #endif @@ -32,7 +36,8 @@ namespace py = pybind11; namespace paddle { namespace platform { #ifdef PADDLE_WITH_XPU -XPUStream get_current_stream(int device_id = -1); +phi::XPUStreamHandle* get_current_stream(int device_id = -1); +phi::XPUStreamHandle* set_current_stream(int idx); #endif } // namespace platform namespace pybind { diff --git a/paddle/phi/api/include/tensor.h b/paddle/phi/api/include/tensor.h index b9e919c52b11b2..93bed19b2bc29d 100644 --- a/paddle/phi/api/include/tensor.h +++ b/paddle/phi/api/include/tensor.h @@ -29,6 +29,11 @@ using gpuStream_t = cudaStream_t; using gpuStream_t = hipStream_t; #endif +#ifdef PADDLE_WITH_XPU +#include "xpu/runtime.h" +#include "xpu/runtime_ex.h" +#endif + #ifdef PADDLE_WITH_CUSTOM_DEVICE #include "paddle/phi/backends/stream.h" #endif @@ -434,6 +439,10 @@ class PADDLE_API Tensor final { * @return gpuStream_t */ gpuStream_t stream() const; +#elif defined(PADDLE_WITH_XPU) + + void record_stream(XPUStream stream) const; + #elif defined(PADDLE_WITH_CUSTOM_DEVICE) /** * @brief Get the stream where the tensor is currently located diff --git a/paddle/phi/api/lib/tensor.cc b/paddle/phi/api/lib/tensor.cc index 98c632a511cd74..0e6af802094e2d 100644 --- a/paddle/phi/api/lib/tensor.cc +++ b/paddle/phi/api/lib/tensor.cc @@ -40,6 +40,8 @@ limitations under the License. */ #include "paddle/phi/core/tensor_meta.h" #include "paddle/phi/core/tensor_utils.h" +#include "paddle/phi/core/memory/malloc.h" + namespace paddle { using DeviceContextPool = experimental::DeviceContextPool; @@ -397,6 +399,14 @@ Tensor Tensor::slice(int64_t begin_idx, int64_t end_idx) const { const std::shared_ptr &Tensor::impl() const { return impl_; } +#ifdef PADDLE_WITH_XPU + +void Tensor::record_stream(XPUStream stream) const { + paddle::memory::RecordStream( + std::dynamic_pointer_cast(impl_)->Holder(), stream); +} + +#endif void Tensor::set_impl(const std::shared_ptr &impl) { impl_ = impl; } diff --git a/paddle/phi/backends/xpu/xpu_context.cc b/paddle/phi/backends/xpu/xpu_context.cc index 800aefdc91ffa4..1c5d26d5f548c4 100644 --- a/paddle/phi/backends/xpu/xpu_context.cc +++ b/paddle/phi/backends/xpu/xpu_context.cc @@ -13,6 +13,7 @@ // limitations under the License. 
#include "paddle/phi/backends/xpu/xpu_context.h" +#include "paddle/phi/backends/context_pool.h" #ifdef PADDLE_WITH_XPU #include @@ -100,8 +101,8 @@ struct XPUContext::Impl { } // Set external stream for context - void SetStream(void* stream) { - if (context_->xpu_stream != nullptr && stream_owned_) { + void SetStream(void* stream, bool clear = true) { + if (clear && context_->xpu_stream != nullptr && stream_owned_) { xpu_stream_destroy(context_->xpu_stream); } stream_owned_ = false; @@ -343,7 +344,21 @@ XPUContext::XPUContext() : DeviceContext() { } else { impls_.push_back(std::make_unique()); impls_[0]->Init(get_gm_size(0), get_l3_size(0)); + stream_pool.push_back(impls_[0]->context_->get_stream()); + idle_stream_flags.push_back(false); + current_stream_handle = + XPUStreamHandle(impls_[0]->context_->get_stream(), 0); + if (std::getenv("XPU_DEFAULT_STREAM_NUMBER") != nullptr) { + int default_num_stream = atoi(std::getenv("XPU_DEFAULT_STREAM_NUMBER")); + for (int i = 0; i < default_num_stream; i++) { + XPUStream s; + PADDLE_ENFORCE_XPU_SUCCESS(xpu_stream_create(&s)); + stream_pool.push_back(s); + idle_stream_flags.push_back(true); + } + } } + current_stream_idx = 0; } XPUContext::XPUContext(const XPUPlace& place, bool is_comm_context) @@ -362,10 +377,18 @@ XPUContext::XPUContext(const XPUPlace& place, bool is_comm_context) impls_.push_back(std::make_unique(place)); impls_[i]->Init(get_gm_size(i), get_l3_size(i)); } + stream_pool.push_back(impls_[0]->context_->get_stream()); + idle_stream_flags.push_back(false); } else { impls_.push_back(std::make_unique(place)); impls_[0]->Init(get_gm_size(0), get_l3_size(0)); + stream_pool.push_back(impls_[0]->context_->get_stream()); + idle_stream_flags.push_back(false); + current_stream_handle = + XPUStreamHandle(impls_[0]->context_->get_stream(), 0); } + + current_stream_idx = 0; } XPUContext::~XPUContext() = default; @@ -380,6 +403,9 @@ XPUStream XPUContext::stream(int i) const { void XPUContext::SetStream(void* stream, int i) { CheckValidStreamId(i); impls_[i]->SetStream(stream); + if (i == 0) { + current_stream_handle.set_stream(static_cast(stream)); + } } void XPUContext::CheckValidStreamId(int i) const { @@ -397,6 +423,21 @@ void XPUContext::CheckValidStreamId(int i) const { i)); } +void XPUContext::CheckValidIdxInRange(int i, int i_max) const { + PADDLE_ENFORCE_GE( + i, + 0, + errors::InvalidArgument( + "The stream index must be greater than or equal to 0.")); + PADDLE_ENFORCE_LT( + i, + i_max, + errors::InvalidArgument("The stream index should be less than the number " + "of stream used (%d), but got %d", + i_max, + i)); +} + void XPUContext::SetXpuVersion(int version) { impls_[0]->xpu_version_ = static_cast(version); } @@ -462,26 +503,251 @@ void XPUContext::StreamWaitEvent(XPUEvent event, int s) const { void XPUContext::StreamWaitStream(int wait_stream, int record_stream) const { CheckValidStreamId(wait_stream); CheckValidStreamId(record_stream); - XPUEvent event; - int r = xpu_event_create(&event); - PADDLE_ENFORCE_XRE_SUCCESS(r); + XPUEvent event = XPUEventPool::Instance().CreateEventFromPool(); RecordEvent(event, record_stream); StreamWaitEvent(event, wait_stream); - r = xpu_event_destroy(event); - PADDLE_ENFORCE_XRE_SUCCESS(r); - impls_[record_stream]->ClearStashedMemory(); } int64_t XPUContext::GetStreamNum() const { return impls_.size(); } +int XPUContext::SetCurrentStream(int idx) { + int prev_stream_idx = current_stream_idx; + if (prev_stream_idx != idx) { + impls_[0]->SetStream(stream_pool[idx]); + 
current_stream_handle.set_stream(stream_pool[idx]); + current_stream_idx = idx; + idle_stream_flags[prev_stream_idx] = true; + idle_stream_flags[current_stream_idx] = false; + } + return prev_stream_idx; +} + +void XPUContext::StreamWaitStreamInPool(int wait_stream, + int record_stream) const { + CheckValidIdxInRange(wait_stream, stream_pool.size()); + CheckValidIdxInRange(record_stream, stream_pool.size()); + XPUEvent event = XPUEventPool::Instance().CreateEventFromPool(); + int r = xpu_event_record(event, stream_pool[record_stream]); + PADDLE_ENFORCE_XRE_SUCCESS(r); + r = xpu_stream_wait_event(stream_pool[wait_stream], event); + PADDLE_ENFORCE_XRE_SUCCESS(r); +} + +void XPUContext::StreamWaitEventInPool(int wait_stream, XPUEvent event) const { + CheckValidIdxInRange(wait_stream, stream_pool.size()); + int r = xpu_stream_wait_event(stream_pool[wait_stream], event); + PADDLE_ENFORCE_XRE_SUCCESS(r); +} + +int XPUContext::get_idle_stream() { + bool found_idle_stream = false; + int stream_idx = 0; + int num_streams = idle_stream_flags.size(); + for (; stream_idx < num_streams; stream_idx++) { + if (idle_stream_flags[stream_idx]) { + found_idle_stream = true; + break; + } + } + if (found_idle_stream) { + idle_stream_flags[stream_idx] = false; + return stream_idx; + } else { + add_stream_to_pool(); + return stream_pool.size() - 1; + } +} + +void XPUContext::add_stream_to_pool() { + XPUStream s; + PADDLE_ENFORCE_XPU_SUCCESS(xpu_stream_create(&s)); + stream_pool.push_back(s); + idle_stream_flags.push_back(false); +} + +XPUStream XPUContext::get_stream_from_pool(int idx) const { + PADDLE_ENFORCE_GE( + idx, + 0, + errors::InvalidArgument( + "The stream index must be greater than or equal to 0.")); + PADDLE_ENFORCE_LT( + idx, + stream_pool.size(), + errors::InvalidArgument("The stream index should be less than the number " + "of stream used (%d), but got %d", + stream_pool.size(), + idx)); + return stream_pool[idx]; +} + +int XPUContext::get_current_stream_idx() { return current_stream_idx; } void XPUContext::AddStashedMemory(int stream, const DenseTensor& tensor) { CheckValidStreamId(stream); impls_[stream]->AddStashedMemory(tensor); } +XPUStream XPUContext::get_current_stream() { return impls_[0]->stream(); } + +XPUStreamHandle* XPUContext::get_current_stream_handle() { + if (impls_[0]->context_->get_stream() == nullptr) { + XPUStream s; + PADDLE_ENFORCE_XPU_SUCCESS(xpu_stream_create(&s)); + impls_[0]->SetStream(s); + stream_pool[current_stream_idx] = s; + current_stream_handle.set_stream(s); + } + return ¤t_stream_handle; +} + void XPUContext::Init() { impls_[0]->Init(); } +XPUContext* get_xpu_context(int device_id) { + auto place_tmp = phi::XPUPlace( + device_id > -1 ? 
device_id : phi::backends::xpu::GetXPUCurrentDeviceId()); + phi::XPUContext* dev_ctx = static_cast( + phi::DeviceContextPool::Instance().Get(place_tmp)); + + return dev_ctx; +} + +XPUStreamHandle::XPUStreamHandle() {} + +XPUStreamHandle::XPUStreamHandle(const int idx) { + auto* dev_ctx = phi::get_xpu_context(); + stream_id = idx; + stream = dev_ctx->get_stream_from_pool(stream_id); +} + +XPUStreamHandle::XPUStreamHandle(const phi::XPUPlace& place) { + phi::XPUContext* dev_ctx = static_cast( + phi::DeviceContextPool::Instance().Get(place)); + stream_id = dev_ctx->get_idle_stream(); + stream = dev_ctx->get_stream_from_pool(stream_id); +} + +XPUStreamHandle::XPUStreamHandle(const XPUStream xpu_stream, const int id) { + stream = xpu_stream; + stream_id = id; +} + +void XPUStreamHandle::Init() { + auto* dev_ctx = phi::get_xpu_context(); + stream_id = dev_ctx->get_idle_stream(); + stream = dev_ctx->get_stream_from_pool(stream_id); +} + +void XPUStreamHandle::wait_event(XPUEvent event) const { + int r = xpu_stream_wait_event(stream, event); + PADDLE_ENFORCE_XRE_SUCCESS(r); +} + +void XPUStreamHandle::synchronize() const { + int r = xpu_wait(stream); + PADDLE_ENFORCE_XRE_SUCCESS(r); +} + +void XPUStreamHandle::set_stream(XPUStream stream_) { stream = stream_; } + +void XPUStreamHandle::record_event(XPUEvent event) const { + int r = xpu_event_record(event, stream); + PADDLE_ENFORCE_XRE_SUCCESS(r); +} + +XPUStreamHandle get_current_stream_handle(int device_id) { + auto* dev_ctx = get_xpu_context(device_id); + return *dev_ctx->get_current_stream_handle(); +} + +XPUStreamHandle get_stream_handle(int device_id) { + auto* dev_ctx = get_xpu_context(device_id); + return XPUStreamHandle(dev_ctx->get_idle_stream()); +} + +void set_current_stream(XPUStreamHandle* s) { + auto* dev_ctx = get_xpu_context(); + dev_ctx->SetStream(s->raw_stream(), 0); +} + +XPUEventPool& XPUEventPool::Instance() { + static XPUEventPool pool; + return pool; +} + +XPUEventPool::~XPUEventPool() { + const auto& DestroyEvent = [](XPUEvent event) { + int r = xpu_event_destroy(event); + PADDLE_ENFORCE_XRE_SUCCESS(r); + }; + const auto& CheckComplishAndDestroy = [&](XPUEvent event) -> bool { + if (xpu_event_query(event) == XPU_SUCCESS) { + DestroyEvent(event); + return true; + } else { + return false; + } + }; + std::unique_lock lock(mtx_); + while (!incomplished_events_.empty()) { + XPUEvent event = incomplished_events_.front(); + if (!CheckComplishAndDestroy(event)) { + LOG(ERROR) << "failed on destroying event when destroying event pool."; + } + incomplished_events_.pop(); + } +} + +XPUEvent XPUEventPool::CreateEventFromPool() { + std::unique_lock lock(mtx_); + + const auto& CreateNewEvent = [&]() -> XPUEvent { + XPUEvent new_event; + PADDLE_ENFORCE_XPU_SUCCESS(xpu_event_create(&new_event)); + incomplished_events_.push(new_event); + return new_event; + }; + + const auto& CreateNewOrReuseEvent = [&]() -> XPUEvent { + XPUEvent front_event = incomplished_events_.front(); + incomplished_events_.pop(); + incomplished_events_.push(front_event); + if (xpu_event_query(front_event) == XPU_SUCCESS) { + return front_event; + } + return CreateNewEvent(); + }; + + if (incomplished_events_.empty()) { + return CreateNewEvent(); + } + return CreateNewOrReuseEvent(); +} + +XPUEventHandle::XPUEventHandle() { + event_ = XPUEventPool::Instance().CreateEventFromPool(); +} +XPUEventHandle::XPUEventHandle(XPUStream stream) { + event_ = XPUEventPool::Instance().CreateEventFromPool(); + PADDLE_ENFORCE_XRE_SUCCESS(xpu_event_record(event_, stream)); +} + +void 
XPUEventHandle::record(XPUStream stream) { + PADDLE_ENFORCE_XRE_SUCCESS(xpu_event_query(event_)); + PADDLE_ENFORCE_XRE_SUCCESS(xpu_event_record(event_, stream)); +} + +bool XPUEventHandle::query() { + int result = xpu_event_query(event_); + if (result == XPU_SUCCESS) { + return true; + } + return false; +} + +void XPUEventHandle::synchronize() { + PADDLE_ENFORCE_XRE_SUCCESS(xpu_event_wait(event_)); +} #if defined(PADDLE_WITH_XPU) XPUPinnedContext::XPUPinnedContext() { eigen_device_ = std::make_unique(); diff --git a/paddle/phi/backends/xpu/xpu_context.h b/paddle/phi/backends/xpu/xpu_context.h index 2a9823d6a8de88..daa4cdd05e3d69 100644 --- a/paddle/phi/backends/xpu/xpu_context.h +++ b/paddle/phi/backends/xpu/xpu_context.h @@ -17,14 +17,14 @@ limitations under the License. */ #ifdef PADDLE_WITH_XPU #include +#include +#include #include - #include "paddle/phi/backends/xpu/forwards.h" #include "paddle/phi/backends/xpu/xpu_header.h" #include "paddle/phi/backends/xpu/xpu_info.h" #include "paddle/phi/common/place.h" #include "paddle/phi/core/device_context.h" - #ifdef PADDLE_WITH_XPU #include "paddle/phi/core/xpu_cuda_stream.h" #endif @@ -39,6 +39,26 @@ namespace phi { #ifdef PADDLE_WITH_XPU class XPUCUDAStream; +class XPUStreamHandle { + public: + XPUStreamHandle(); + explicit XPUStreamHandle(const int idx); + explicit XPUStreamHandle(const XPUPlace& place); + explicit XPUStreamHandle(const XPUStream xpu_stream, const int id); + + void Init(); + + int id() const { return stream_id; } + XPUStream raw_stream() const { return stream; } + void wait_event(XPUEvent event) const; + void synchronize() const; + void record_event(XPUEvent event) const; + void set_stream(XPUStream stream); + + private: + XPUStream stream; + int stream_id; +}; #endif class DenseTensor; @@ -110,16 +130,65 @@ class XPUContext : public DeviceContext, Eigen::DefaultDevice* eigen_device() const { return nullptr; } XPUStream stream(int i = 0) const; - + XPUStream get_stream_from_pool(int i = 0) const; + XPUStream get_current_stream(); static const char* name() { return "XPUContext"; } + int SetCurrentStream(int idx); + void StreamWaitStreamInPool(int wait_stream, int record_stream) const; + void StreamWaitEventInPool(int wait_stream, XPUEvent event) const; + int get_idle_stream(); + int get_current_stream_idx(); + XPUStreamHandle* get_current_stream_handle(); private: struct Impl; + XPUStreamHandle current_stream_handle; std::vector> impls_; + std::vector idle_stream_flags; + std::vector stream_pool; + int current_stream_idx; + void add_stream_to_pool(); + int get_stream_pool_size() const { return stream_pool.size(); } void CheckValidStreamId(int i) const; + void CheckValidIdxInRange(int idx, int range) const; +}; + +XPUContext* get_xpu_context(int device_id = -1); + +class XPUEventPool { + public: + XPUEventPool() = default; + XPUEventPool(const XPUEventPool&) = delete; + XPUEventPool(XPUEventPool&&) = delete; + ~XPUEventPool(); + + XPUEvent CreateEventFromPool(); + + static XPUEventPool& Instance(); + + private: + std::queue incomplished_events_; + std::mutex mtx_; }; +class XPUEventHandle { + public: + XPUEventHandle(); + explicit XPUEventHandle(XPUStream stream); + void record(XPUStream stream); + bool query(); + void synchronize(); + XPUEvent get_event() const { return event_; } + + private: + XPUEvent event_; +}; + +XPUStreamHandle get_current_stream_handle(int device_id = -1); +XPUStreamHandle get_stream_handle(int device_id = -1); +void set_current_stream(XPUStreamHandle* s); + // KPS (Kernel PrimitiveS API) needs to 
exist as a kind of backend, // because we want to implement a KPS-based kernel and make it run // on GPU and XPU at the same time, so we need KPSContext when registering diff --git a/paddle/phi/core/memory/allocation/allocator_facade.cc b/paddle/phi/core/memory/allocation/allocator_facade.cc index 65daaa257c2c5f..5123c6b33b6685 100644 --- a/paddle/phi/core/memory/allocation/allocator_facade.cc +++ b/paddle/phi/core/memory/allocation/allocator_facade.cc @@ -1940,6 +1940,14 @@ void AllocatorFacade::SetDefaultStream(const phi::XPUPlace& place, } #endif +#ifdef PADDLE_WITH_XPU + +bool AllocatorFacade::RecordStream(std::shared_ptr allocation, + XPUStream stream) { + return GetPrivate()->RecordStream(allocation, stream); +} +#endif + #ifdef PADDLE_WITH_CUSTOM_DEVICE uint64_t AllocatorFacade::Release(const phi::CustomPlace& place, phi::stream::stream_t stream) { diff --git a/paddle/phi/core/memory/allocation/allocator_facade.h b/paddle/phi/core/memory/allocation/allocator_facade.h index e46a6f9b13ef52..4b24dfcf57af4a 100644 --- a/paddle/phi/core/memory/allocation/allocator_facade.h +++ b/paddle/phi/core/memory/allocation/allocator_facade.h @@ -97,6 +97,7 @@ class AllocatorFacade { #elif defined(PADDLE_WITH_XPU) TEST_API const std::shared_ptr& GetAllocator( const phi::Place& place, XPUStream stream); + bool RecordStream(std::shared_ptr allocation, XPUStream stream); void SetDefaultStream(const phi::XPUPlace& place, XPUStream stream); #endif diff --git a/paddle/phi/core/memory/allocation/stream_safe_xpu_allocator.cc b/paddle/phi/core/memory/allocation/stream_safe_xpu_allocator.cc index fd30d61d47593e..8cd5471eb0e014 100644 --- a/paddle/phi/core/memory/allocation/stream_safe_xpu_allocator.cc +++ b/paddle/phi/core/memory/allocation/stream_safe_xpu_allocator.cc @@ -38,6 +38,10 @@ StreamSafeXPUAllocation::StreamSafeXPUAllocation( bool StreamSafeXPUAllocation::RecordStream(XPUStream stream) { VLOG(8) << "Try record stream " << stream << " for address " << ptr(); if (stream == owning_stream_) { + VLOG(8) << "stream " << stream << " is the same as owning stream " + << owning_stream_; + VLOG(8) << "Skip recording the same stream " << stream << " for address " + << ptr(); return false; } @@ -57,9 +61,13 @@ bool StreamSafeXPUAllocation::CanBeFreed() { it != outstanding_event_map_.end(); ++it) { XPUEvent& event = it->second; - - PADDLE_ENFORCE_XRE_SUCCESS(xpu_event_destroy(event)); - VLOG(8) << "Destroy event " << event; + if (xpu_event_query(event) == XPU_SUCCESS) { + PADDLE_ENFORCE_XRE_SUCCESS(xpu_event_destroy(event)); + VLOG(8) << "Destroy event " << event; + } else { + outstanding_event_map_.erase(outstanding_event_map_.begin(), it); + return false; + } } return true; } diff --git a/paddle/phi/core/memory/malloc.cc b/paddle/phi/core/memory/malloc.cc index 050a3d2855189b..304a835a5b1b71 100644 --- a/paddle/phi/core/memory/malloc.cc +++ b/paddle/phi/core/memory/malloc.cc @@ -76,6 +76,13 @@ gpuStream_t GetStream(const std::shared_ptr& allocation) { #endif +#ifdef PADDLE_WITH_XPU +bool RecordStream(std::shared_ptr allocation, XPUStream stream) { + return allocation::AllocatorFacade::Instance().RecordStream(allocation, + stream); +} +#endif + #ifdef PADDLE_WITH_CUSTOM_DEVICE uint64_t Release(const phi::CustomPlace& place, phi::stream::stream_t stream) { return allocation::AllocatorFacade::Instance().Release(place, stream); diff --git a/paddle/phi/core/memory/malloc.h b/paddle/phi/core/memory/malloc.h index eea770696608a2..0d064e28b8a119 100644 --- a/paddle/phi/core/memory/malloc.h +++ 
b/paddle/phi/core/memory/malloc.h @@ -22,6 +22,11 @@ limitations under the License. */ #include "paddle/phi/core/memory/allocation/allocator.h" #include "paddle/phi/core/stream.h" +#ifdef PADDLE_WITH_XPU +#include "xpu/runtime.h" +#include "xpu/runtime_ex.h" +#endif + namespace paddle { namespace memory { @@ -58,6 +63,11 @@ void EraseStream(std::shared_ptr allocation, gpuStream_t stream); gpuStream_t GetStream(const std::shared_ptr& allocation); #endif + +#ifdef PADDLE_WITH_XPU +bool RecordStream(std::shared_ptr allocation, XPUStream stream); +#endif + #ifdef PADDLE_WITH_CUSTOM_DEVICE extern uint64_t Release(const phi::CustomPlace& place, phi::stream::stream_t stream); diff --git a/paddle/phi/core/platform/device/xpu/xpu_resource_pool.cc b/paddle/phi/core/platform/device/xpu/xpu_resource_pool.cc index 8104cbe80514b1..2956043e9bd18c 100644 --- a/paddle/phi/core/platform/device/xpu/xpu_resource_pool.cc +++ b/paddle/phi/core/platform/device/xpu/xpu_resource_pool.cc @@ -71,7 +71,12 @@ XpuEventResourcePool::XpuEventResourcePool() { auto deleter = [dev_idx](xpuEventHandle event) { phi::backends::xpu::XPUDeviceGuard guard(dev_idx); - xpu_event_destroy(event); + if (xpu_event_query(event) == XPU_SUCCESS) { + xpu_event_destroy(event); + } else { + PADDLE_THROW(phi::errors::InvalidArgument( + "event not finished, can not destroy")); + } }; pool_.emplace_back(ResourcePool::Create(creator, deleter)); diff --git a/python/paddle/base/core.py b/python/paddle/base/core.py index 8ab41418202607..1a5699c3245522 100644 --- a/python/paddle/base/core.py +++ b/python/paddle/base/core.py @@ -325,6 +325,8 @@ def to_list(s): _switch_tracer, _test_enforce_gpu_success, _xpu_device_synchronize, + _xpu_get_current_stream, + _xpu_set_current_stream, ) # isort: off diff --git a/python/paddle/device/__init__.py b/python/paddle/device/__init__.py index 4da42ecc058626..91c684bff0e4a1 100644 --- a/python/paddle/device/__init__.py +++ b/python/paddle/device/__init__.py @@ -43,8 +43,12 @@ from paddle._typing.device_like import PlaceLike from paddle.base.core import Place - _InitStreamBase = Union[core.CUDAStream, core.CustomDeviceStream] - _InitEventBase = Union[core.CUDAEvent, core.CustomDeviceEvent] + _InitStreamBase = Union[ + core.CUDAStream, core.CustomDeviceStream, core.XPUStream + ] + _InitEventBase = Union[ + core.CUDAEvent, core.CustomDeviceEvent, core.XPUEvent + ] from paddle import CUDAPlace, CustomPlace from paddle.base.libpaddle import _customDeviceProperties @@ -975,6 +979,11 @@ def __init__( self.event_base = core.CUDAEvent( enable_timing, blocking, interprocess ) + elif paddle.is_compiled_with_xpu() and isinstance( + self.device, paddle.XPUPlace + ): + self.event_base = core.XPUEvent() + elif isinstance(self.device, paddle.CustomPlace): self.event_base = core.CustomDeviceEvent( self.device.get_device_type(), @@ -1138,13 +1147,14 @@ def __init__( ) -> None: if stream_base is not None: if isinstance( - stream_base, (core.CUDAStream, core.CustomDeviceStream) + stream_base, + (core.CUDAStream, core.CustomDeviceStream, core.XPUStream), ): self.stream_base = stream_base self.device = stream_base.place else: raise TypeError( - "stream_base should be CUDAStream, CustomDeviceStream" + "stream_base should be CUDAStream, XPUStream, CustomDeviceStream" ) return @@ -1161,6 +1171,10 @@ def __init__( self.stream_base = core.CUDAStream( self.device.get_device_id(), priority ) + elif paddle.is_compiled_with_xpu() and isinstance( + self.device, paddle.XPUPlace + ): + self.stream_base = core.XPUStream(self.device.get_device_id()) 
elif isinstance(self.device, paddle.CustomPlace): self.stream_base = core.CustomDeviceStream( self.device.get_device_type(), @@ -1306,6 +1320,8 @@ def synchronize(self) -> None: def _as_parameter_(self): if isinstance(self.stream_base, core.CUDAStream): return ctypes.c_void_p(self.stream_base.cuda_stream) + elif isinstance(self.stream_base, core.XPUStream): + return ctypes.c_void_p(self.stream_base.xpu_stream) else: return ctypes.c_void_p(self.stream_base.raw_stream) @@ -1358,6 +1374,10 @@ def current_stream(device: PlaceLike | None = None) -> Stream: return Stream( stream_base=core._get_current_stream(place.get_device_id()) ) + elif paddle.is_compiled_with_xpu() and isinstance(place, paddle.XPUPlace): + return Stream( + stream_base=core._xpu_get_current_stream(place.get_device_id()) + ) elif isinstance(place, paddle.CustomPlace): return Stream( stream_base=core._get_current_custom_device_stream( @@ -1401,6 +1421,10 @@ def set_stream(stream: Stream) -> Stream: stream.stream_base.place, paddle.CUDAPlace ): core._set_current_stream(stream.stream_base) + elif paddle.is_compiled_with_xpu() and isinstance( + stream.stream_base.place, paddle.XPUPlace + ): + core._xpu_set_current_stream(stream.stream_base.idx) elif isinstance(stream.stream_base.place, paddle.CustomPlace): core._set_current_custom_device_stream( stream.stream_base.place.get_device_type(), diff --git a/python/paddle/device/xpu/__init__.py b/python/paddle/device/xpu/__init__.py index 9a48a70e4a7f23..3840c173953dcd 100644 --- a/python/paddle/device/xpu/__init__.py +++ b/python/paddle/device/xpu/__init__.py @@ -20,6 +20,8 @@ from paddle.base import core from paddle.utils import deprecated +from .streams import Event, Stream + if TYPE_CHECKING: from paddle import XPUPlace @@ -30,6 +32,8 @@ ] __all__ = [ + 'Stream', + 'Event', 'synchronize', 'device_count', 'set_debug_level', @@ -45,6 +49,45 @@ ] +def current_stream(device: _XPUPlaceLike | None = None) -> core.XPUStream: + ''' + Return the current XPU stream by the device. + + Args: + device(paddle.XPUPlace()|int|None, optional): The device or the ID of the device which want to get stream from. + If device is None, the device is the current device. Default: None. + + Returns: + XPUStream: the stream to the device. + + Examples: + .. code-block:: python + + >>> # doctest: +REQUIRES(env:XPU) + >>> import paddle + >>> paddle.device.set_device('xpu') + + >>> s1 = paddle.device.xpu.current_stream() + + >>> s2 = paddle.device.xpu.current_stream(0) + + >>> s3 = paddle.device.xpu.current_stream(paddle.XPUPlace(0)) + + ''' + + device_id = -1 + + if device is not None: + if isinstance(device, int): + device_id = device + elif isinstance(device, core.XPUPlace): + device_id = device.get_device_id() + else: + raise ValueError("device type must be int or paddle.XPUPlace") + + return core._xpu_get_current_stream(device_id) + + def extract_xpu_device_id(device: _XPUPlaceLike, op_name: str) -> int: ''' Return the id of the given xpu device. It is just a utility that will not be exposed to users. diff --git a/python/paddle/device/xpu/streams.py b/python/paddle/device/xpu/streams.py new file mode 100644 index 00000000000000..b396c38890e59f --- /dev/null +++ b/python/paddle/device/xpu/streams.py @@ -0,0 +1,18 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from paddle.base.core import (  # noqa: F401
+    XPUEvent as Event,
+    XPUStream as Stream,
+)
diff --git a/test/xpu/test_xpu_stream_event.py b/test/xpu/test_xpu_stream_event.py
index 9bf1d21c5ee57e..b739bc9f7ad390 100644
--- a/test/xpu/test_xpu_stream_event.py
+++ b/test/xpu/test_xpu_stream_event.py
@@ -12,12 +12,30 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import ctypes
 import unittest
 
+import numpy as np
+
 import paddle
 from paddle.device import xpu
 
 
+class TestCurrentStream(unittest.TestCase):
+    def test_current_stream(self):
+        if paddle.is_compiled_with_xpu():
+            s = xpu.current_stream()
+            self.assertTrue(isinstance(s, xpu.Stream))
+
+            s1 = xpu.current_stream(0)
+            self.assertTrue(isinstance(s1, xpu.Stream))
+
+            s2 = xpu.current_stream(paddle.XPUPlace(0))
+            self.assertTrue(isinstance(s2, xpu.Stream))
+            self.assertEqual(s1, s2)
+            self.assertRaises(ValueError, xpu.current_stream, "xpu:0")
+
+
 class TestSynchronize(unittest.TestCase):
     def test_synchronize(self):
         if paddle.is_compiled_with_xpu():
@@ -28,5 +46,120 @@ def test_synchronize(self):
             self.assertRaises(ValueError, xpu.synchronize, "xpu:0")
 
 
+class TestXPUStream(unittest.TestCase):
+    def test_xpu_stream(self):
+        if paddle.is_compiled_with_xpu():
+            s = paddle.device.xpu.Stream()
+            self.assertIsNotNone(s)
+
+    def test_xpu_stream_synchronize(self):
+        if paddle.is_compiled_with_xpu():
+            s = paddle.device.xpu.Stream()
+            e1 = paddle.device.xpu.Event()
+            e2 = paddle.device.xpu.Event()
+
+            e1.record(s)
+            e1.query()
+            tensor1 = paddle.to_tensor(paddle.rand([1000, 1000]))
+            tensor2 = paddle.matmul(tensor1, tensor1)
+            s.synchronize()
+            e2.record(s)
+            e2.synchronize()
+
+            self.assertTrue(e2.query())
+
+    def test_xpu_stream_wait_event_and_record_event(self):
+        if paddle.is_compiled_with_xpu():
+            s1 = xpu.Stream(0)
+            tensor1 = paddle.to_tensor(paddle.rand([1000, 1000]))
+            tensor2 = paddle.matmul(tensor1, tensor1)
+            e1 = xpu.Event()
+            s1.record_event(e1)
+
+            s2 = xpu.Stream(0)
+            s2.wait_event(e1)
+            s2.synchronize()
+
+            self.assertTrue(e1.query())
+
+
+class TestXPUEvent(unittest.TestCase):
+    def test_xpu_event(self):
+        if paddle.is_compiled_with_xpu():
+            e = paddle.device.xpu.Event()
+            self.assertIsNotNone(e)
+            s = paddle.device.xpu.current_stream()
+
+    def test_xpu_event_methods(self):
+        if paddle.is_compiled_with_xpu():
+            e = paddle.device.xpu.Event()
+            s = paddle.device.xpu.current_stream()
+            event_query_1 = e.query()
+            tensor1 = paddle.to_tensor(paddle.rand([1000, 1000]))
+            tensor2 = paddle.matmul(tensor1, tensor1)
+            s.record_event(e)
+            e.synchronize()
+            event_query_2 = e.query()
+
+            self.assertTrue(event_query_1)
+            self.assertTrue(event_query_2)
+
+
+class TestStreamGuard(unittest.TestCase):
+    '''
+    Note:
+        The asynchronous execution property of XPU Stream can only be tested offline.
+    '''
+
+    def test_stream_guard_normal(self):
+        if paddle.is_compiled_with_xpu():
+            s = paddle.device.Stream()
+            a = paddle.to_tensor(np.array([0, 2, 4], dtype="int32"))
+            b = paddle.to_tensor(np.array([1, 3, 5], dtype="int32"))
+            c = a + b
+            with paddle.device.stream_guard(s):
+                d = a + b
+            s.synchronize()
+
+            np.testing.assert_array_equal(np.array(c), np.array(d))
+
+    def test_stream_guard_default_stream(self):
+        if paddle.is_compiled_with_xpu():
+            s1 = paddle.device.current_stream()
+            with paddle.device.stream_guard(s1):
+                pass
+            s2 = paddle.device.current_stream()
+
+            self.assertTrue(id(s1.stream_base) == id(s2.stream_base))
+
+    def test_set_current_stream_default_stream(self):
+        if paddle.is_compiled_with_xpu():
+            cur_stream = paddle.device.current_stream()
+            new_stream = paddle.device.set_stream(cur_stream)
+
+            self.assertTrue(
+                id(cur_stream.stream_base) == id(new_stream.stream_base)
+            )
+
+    def test_stream_guard_raise_error(self):
+        if paddle.is_compiled_with_xpu():
+
+            def test_not_correct_stream_guard_input():
+                tmp = np.zeros(5)
+                with paddle.device.stream_guard(tmp):
+                    pass
+
+            self.assertRaises(TypeError, test_not_correct_stream_guard_input)
+
+
+class TestRawStream(unittest.TestCase):
+    def test_xpu_stream(self):
+        if paddle.is_compiled_with_xpu():
+            xpu_stream = paddle.device.xpu.current_stream().xpu_stream
+            print(xpu_stream)
+            self.assertTrue(type(xpu_stream) is int)
+            ptr = ctypes.c_void_p(xpu_stream)
+
+
 if __name__ == "__main__":
     unittest.main()
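
Usage sketch (not part of the patch itself): how the Python-level stream and event objects bound above, paddle.device.xpu.Stream and Event exported through the new streams.py, are expected to be driven. Every call below appears in the docstrings or tests of this diff, and a Paddle build with XPU support is assumed.

import paddle

paddle.device.set_device("xpu")

s1 = paddle.device.xpu.Stream()       # no-arg form: takes an idle stream from the context's pool
s2 = paddle.device.xpu.Stream(0)      # explicit device id; paddle.XPUPlace(0) is also accepted

x = paddle.rand([1024, 1024])
y = paddle.matmul(x, x)               # enqueued on the current stream

e = paddle.device.xpu.Event()
s1.record_event(e)                    # mark a point on s1
s2.wait_event(e)                      # s2 will not run past this point until e has completed
s2.synchronize()                      # block the host until s2 has drained

assert e.query()                      # e must be complete once s2 has drained past the wait

Stream.wait_stream(other) is also bound for ordering one stream behind another without an explicit event; Stream.query() and Event.elapsed_time() are bound but currently raise Unavailable.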
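
The device-agnostic paddle.device layer now routes to these bindings when the active place is an XPUPlace. A minimal sketch of the current_stream / set_stream / stream_guard flow, mirroring TestStreamGuard and again assuming an XPU build:

import numpy as np

import paddle

paddle.device.set_device("xpu")

side = paddle.device.Stream()             # wraps core.XPUStream when the place is an XPUPlace

a = paddle.to_tensor(np.array([0, 2, 4], dtype="int32"))
b = paddle.to_tensor(np.array([1, 3, 5], dtype="int32"))
c = a + b                                 # runs on the default (current) stream

with paddle.device.stream_guard(side):    # temporarily makes `side` the current stream
    d = a + b
side.synchronize()

np.testing.assert_array_equal(np.array(c), np.array(d))

cur = paddle.device.current_stream()      # backed by core._xpu_get_current_stream
prev = paddle.device.set_stream(cur)      # switches via core._xpu_set_current_stream(idx)
assert id(prev.stream_base) == id(cur.stream_base)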
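
For interop with native libraries, the xpu_stream property exposes the underlying driver stream as a plain Python integer, as exercised by TestRawStream. A short sketch under the same XPU-build assumption:

import ctypes

import paddle

paddle.device.set_device("xpu")

raw = paddle.device.xpu.current_stream().xpu_stream   # address of the underlying XPUStream
assert isinstance(raw, int)

ptr = ctypes.c_void_p(raw)   # pass this wherever a native stream handle is expected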
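
Finally, the XPUContext constructor reads XPU_DEFAULT_STREAM_NUMBER to pre-populate its stream pool, and Stream() draws from, or grows, that pool via get_idle_stream. The sketch below is more speculative than the ones above: the environment variable name comes from the patch, but the requirement to set it before the first XPU context is constructed is an inference, not something the patch states.

import os

# Assumption: the variable must be visible before the first XPUContext is built,
# so set it before paddle touches the device.
os.environ["XPU_DEFAULT_STREAM_NUMBER"] = "4"

import paddle

paddle.device.set_device("xpu")

# Each Stream() takes an idle stream from the pre-created pool; once the pool is
# exhausted, new streams are appended to it.
streams = [paddle.device.xpu.Stream() for _ in range(8)]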