From 5f9aecd5bb7d6af32ef03dc871e7b580c1e5f86d Mon Sep 17 00:00:00 2001 From: Kevin Huck Date: Fri, 25 Oct 2024 14:38:22 -0700 Subject: [PATCH] Fixing async processing of OpenCL queues --- src/apex/apex.cpp | 6 ++++ src/apex/apex_opencl.cpp | 60 +++++++++++++++++++++++++++++----------- src/apex/apex_opencl.hpp | 6 ++++ 3 files changed, 56 insertions(+), 16 deletions(-) create mode 100644 src/apex/apex_opencl.hpp diff --git a/src/apex/apex.cpp b/src/apex/apex.cpp index 5e7e739c..67c0118c 100644 --- a/src/apex/apex.cpp +++ b/src/apex/apex.cpp @@ -102,6 +102,9 @@ DEFINE_DESTRUCTOR(apex_finalize_static_void) #define FUNCTION_ENTER #define FUNCTION_EXIT #endif +#ifdef APEX_WITH_OPENCL +#include "apex_opencl.hpp" +#endif APEX_NATIVE_TLS bool _registered = false; APEX_NATIVE_TLS bool _exited = false; @@ -1788,6 +1791,9 @@ void finalize(void) dynamic::level0::flush(); dynamic::level0::stop(); } +#ifdef APEX_WITH_OPENCL + opencl::shutdown(); +#endif // stop processing new timers/counters/messages/tasks/etc. apex_options::suspend(true); diff --git a/src/apex/apex_opencl.cpp b/src/apex/apex_opencl.cpp index 836f34c2..78f46d49 100644 --- a/src/apex/apex_opencl.cpp +++ b/src/apex/apex_opencl.cpp @@ -148,14 +148,14 @@ std::deque& getMap(cl_command_queue queue) { asyncEvent* new_gpu_event(scoped_timer& timer, cl_command_queue queue, std::string name, apex_async_activity_t type) { asyncEvent* tmp = new asyncEvent(timer.get_task_wrapper(), queue, name, type); - std::cout << "new event " << name << std::endl; + //std::cout << "new event " << name << std::endl; return tmp; } void enqueue_event(asyncEvent* event) { auto& map = getMap(event->_queue); map.push_back(event); - std::cout << "queued event " << event->_tt_ptr->task_id->get_name() << std::endl; + //std::cout << "queued event " << event->_tt_ptr->task_id->get_name() << std::endl; } /* forward declaration, defined at end because it uses OpenCL API calls */ @@ -205,6 +205,11 @@ auto& queueContextDeviceMap() { return theMap; } +auto& working() { + static std::atomic w{true}; + return w; +} + double sync_clocks(queueData& qData); } // namespace opencl @@ -470,8 +475,22 @@ clRetainCommandQueue(cl_command_queue command_queue) CL_API_SUFFIX__VERSION_1_0 return function_ptr(command_queue); } +extern CL_API_ENTRY cl_int CL_API_CALL +clFlush_noinst(cl_command_queue command_queue) CL_API_SUFFIX__VERSION_1_0; extern CL_API_ENTRY cl_int CL_API_CALL clReleaseCommandQueue(cl_command_queue command_queue) CL_API_SUFFIX__VERSION_1_0 { + // explicitly flush the queue + clFlush_noinst(command_queue); + // signal the worker to process the async events + apex::opencl::register_sync_event(command_queue); + { + std::unique_lock l(apex::opencl::signalMutex()); + auto& event_queue = apex::opencl::getMap(command_queue); + std::cout << "events remaining: " << event_queue.size() << std::endl; + //APEX_ASSERT(event_queue.empty()); + apex::opencl::activeQueues().erase(command_queue); + } + // implicitly flush the queue and delete it GET_SYMBOL_TIMER(clReleaseCommandQueue); return function_ptr(command_queue); } @@ -1101,7 +1120,9 @@ clFlush_noinst(cl_command_queue command_queue) CL_API_SUFFIX__VERSION_1_0 { extern CL_API_ENTRY cl_int CL_API_CALL clFlush(cl_command_queue command_queue) CL_API_SUFFIX__VERSION_1_0 { GET_SYMBOL_TIMER(clFlush); - return function_ptr(command_queue); + auto rc = function_ptr(command_queue); + apex::opencl::register_sync_event(command_queue); + return rc; } extern CL_API_ENTRY cl_int CL_API_CALL @@ -1113,7 +1134,9 @@ clFinish_noinst(cl_command_queue command_queue) CL_API_SUFFIX__VERSION_1_0 { extern CL_API_ENTRY cl_int CL_API_CALL clFinish(cl_command_queue command_queue) CL_API_SUFFIX__VERSION_1_0 { GET_SYMBOL_TIMER(clFinish); - return function_ptr(command_queue); + auto rc = function_ptr(command_queue); + apex::opencl::register_sync_event(command_queue); + return rc; } ///////// @@ -1860,19 +1883,17 @@ void process_queue(void) { std::chrono::seconds timeoutPeriod{1}; auto timePoint = std::chrono::system_clock::now() + timeoutPeriod; std::unique_lock uLock(signalMutex()); - while(activeQueues().empty()) - { + while(working()) { if (signalVar().wait_until(uLock, timePoint) //<## == std::cv_status::timeout) { continue; } + for (auto queue : activeQueues()) { auto& event_queue = getMap(queue); - while(!event_queue.empty()) - { + while(!event_queue.empty()) { cl_int err = CL_SUCCESS; cl_ulong startTime, endTime, queuedTime, submitTime; - asyncEvent* kernel_data = event_queue.front(); const auto checkError = [=](const char * msg) { if (err != CL_SUCCESS) { printf("%s", msg); @@ -1880,10 +1901,13 @@ void process_queue(void) { } }; + asyncEvent* kernel_data = event_queue.front(); cl_int status; err = clGetEventInfo_noinst(kernel_data->_event, CL_EVENT_COMMAND_EXECUTION_STATUS, sizeof(cl_int), &status, NULL); checkError("Fatal error: calling clGetEventInfo, exiting.\n"); - if (status != CL_COMPLETE) continue; + if (status != CL_COMPLETE) { + break; + } err = clGetEventProfilingInfo_noinst(kernel_data->_event, CL_PROFILING_COMMAND_QUEUED, sizeof(cl_ulong), &queuedTime, NULL); @@ -1904,22 +1928,23 @@ void process_queue(void) { opencl_thread_node node(data.ids[0], data.ids[1], data.ids[2], kernel_data->_type); double start_d = ((double)startTime); double end_d = ((double)endTime); - printf("SYNC: start= %f offset= %f, corrected = %f.\n", start_d, data.offset, - start_d + data.offset); - printf("SYNC: end= %f offset= %f, corrected = %f.\n", end_d, data.offset, - end_d + data.offset); + //printf("SYNC: start= %f offset= %f, corrected = %f.\n", start_d, data.offset, start_d + data.offset); + //printf("SYNC: end= %f offset= %f, corrected = %f.\n", end_d, data.offset, end_d + data.offset); store_profiler_data(kernel_data, start_d + data.offset, end_d + data.offset, node); event_queue.pop_front(); clReleaseEvent_noinst(kernel_data->_event); } + //std::cout << " * * * * * Queue empty * * * * *" << std::endl; } } + std::cout << "********* Exiting worker *********" << std::endl; } void register_sync_event(cl_command_queue queue) { const auto getThread = [](void) { std::shared_ptr t = std::make_shared(process_queue); t->detach(); + std::cout << "new thread" << std::endl; return t; }; static auto worker = getThread(); @@ -1972,10 +1997,13 @@ double sync_clocks(queueData& qData) { err = clGetEventProfilingInfo_noinst(sync_event, CL_PROFILING_COMMAND_END, sizeof(cl_ulong), &gpu_timestamp, NULL); checkError("Cannot get end time for Sync event.\n"); qData.offset = cpu_timestamp - (((double)gpu_timestamp)); - printf("SYNC: CPU= %f GPU= %f, diff = %f.\n", cpu_timestamp, ((double)gpu_timestamp), - qData.offset); + //printf("SYNC: CPU= %f GPU= %f, diff = %f.\n", cpu_timestamp, ((double)gpu_timestamp), qData.offset); return qData.offset; } +void shutdown(void) { + working() = false; +} + } // namespace opencl; } // namespace apex; diff --git a/src/apex/apex_opencl.hpp b/src/apex/apex_opencl.hpp new file mode 100644 index 00000000..c0e28fd0 --- /dev/null +++ b/src/apex/apex_opencl.hpp @@ -0,0 +1,6 @@ + +namespace apex { +namespace opencl { +void shutdown(void); +} +} \ No newline at end of file