diff --git a/src/apex/apex_opencl.cpp b/src/apex/apex_opencl.cpp
index d50e0de7..36bce4c5 100644
--- a/src/apex/apex_opencl.cpp
+++ b/src/apex/apex_opencl.cpp
@@ -62,6 +62,11 @@
 #include "async_thread_node.hpp"
 #include
 #include
+#include <chrono>
+#include <condition_variable>
+#include <mutex>
+#include <set>
+#include <thread>
 
 namespace apex {
 namespace opencl {
@@ -159,6 +164,24 @@ void enqueue_event(asyncEvent* event) {
 /* forward declaration, defined at end because it uses OpenCL API calls */
 void register_sync_event(cl_command_queue queue);
 
+/* Mutex guarding activeQueues(); it is also the mutex paired with signalVar(). */
+auto& signalMutex() {
+    static std::mutex mtx;
+    return mtx;
+}
+
+/* Condition variable used to wake the worker thread running process_queue(). */
+auto& signalVar() {
+    static std::condition_variable var;
+    return var;
+}
+
+/* Set of command queues registered through register_sync_event(). */
+auto& activeQueues() {
+    static std::set<cl_command_queue> queues;
+    return queues;
+}
+
 auto& deviceMap() {
     static std::map theMap;
     return theMap;
@@ -1838,52 +1861,108 @@ void store_profiler_data(asyncEvent* event, cl_ulong start, cl_ulong end, opencl
     instance->complete_task(tt);
 }
 
+/**
+ * Worker-thread body: harvests profiling data for completed OpenCL events.
+ *
+ * Runs forever on the detached thread started by register_sync_event().
+ * Each pass walks every queue in activeQueues(), consumes the completed
+ * events at the front of that queue's event list, then sleeps until
+ * register_sync_event() notifies or the poll timeout expires.
+ * signalMutex() is held during a pass and released only while waiting.
+ *
+ * NOTE(review): the event lists returned by getMap() are now consumed on
+ * this thread; confirm that the code appending to them is synchronized
+ * with it.
+ */
+void process_queue(void) {
+    // Poll interval: events still in flight are re-checked after this long.
+    const std::chrono::seconds timeoutPeriod{1};
+    std::unique_lock<std::mutex> uLock(signalMutex());
+    while (true)
+    {
+        for (auto queue : activeQueues()) {
+            auto& event_queue = getMap(queue);
+            while(!event_queue.empty())
+            {
+                cl_int err = CL_SUCCESS;
+                cl_ulong startTime, endTime, queuedTime, submitTime;
+                asyncEvent* kernel_data = event_queue.front();
+                // Capture err by reference: a by-value capture would freeze it
+                // at CL_SUCCESS and the check below could never fire.
+                const auto checkError = [&err](const char * msg) {
+                    if (err != CL_SUCCESS) {
+                        printf("%s", msg);
+                        abort();
+                    }
+                };
+
+                cl_int status;
+                err = clGetEventInfo_noinst(kernel_data->_event, CL_EVENT_COMMAND_EXECUTION_STATUS, sizeof(cl_int), &status, NULL);
+                checkError("Fatal error: calling clGetEventInfo, exiting.\n");
+                // Head event not finished yet: leave it (and everything queued
+                // behind it) for the next pass rather than spinning here with
+                // the mutex held.
+                if (status != CL_COMPLETE) break;
+
+                err = clGetEventProfilingInfo_noinst(kernel_data->_event, CL_PROFILING_COMMAND_QUEUED,
+                    sizeof(cl_ulong), &queuedTime, NULL);
+                checkError("Cannot get queued time for Kernel event.\n");
+                err = clGetEventProfilingInfo_noinst(kernel_data->_event, CL_PROFILING_COMMAND_SUBMIT,
+                    sizeof(cl_ulong), &submitTime, NULL);
+                checkError("Cannot get submit time for Kernel event.\n");
+                err = clGetEventProfilingInfo_noinst(kernel_data->_event, CL_PROFILING_COMMAND_START,
+                    sizeof(cl_ulong), &startTime, NULL);
+                checkError("Cannot get start time for Kernel event.\n");
+                err = clGetEventProfilingInfo_noinst(kernel_data->_event, CL_PROFILING_COMMAND_END,
+                    sizeof(cl_ulong), &endTime, NULL);
+                checkError("Cannot get end time for Kernel event.\n");
+
+                sample_value("Time in Queue (us)", (startTime - queuedTime)/1e3);
+                sample_value("Time in Submitted (us)", (startTime - submitTime)/1e3);
+                // Guard the lookup: dereferencing end() is undefined behavior.
+                auto&& contextMap = queueContextDeviceMap();
+                const auto found = contextMap.find(kernel_data->_queue);
+                if (found == contextMap.end()) {
+                    printf("Fatal error: no context/device data for OpenCL queue, exiting.\n");
+                    abort();
+                }
+                queueData data = found->second;
+                opencl_thread_node node(data.ids[0], data.ids[1], data.ids[2], kernel_data->_type);
+                double start_d = ((double)startTime);
+                double end_d = ((double)endTime);
+                printf("SYNC: start= %f offset= %f, corrected = %f.\n", start_d, data.offset,
+                    start_d + data.offset);
+                printf("SYNC: end= %f offset= %f, corrected = %f.\n", end_d, data.offset,
+                    end_d + data.offset);
+                store_profiler_data(kernel_data, start_d + data.offset, end_d + data.offset, node);
+                event_queue.pop_front();
+                clReleaseEvent_noinst(kernel_data->_event);
+            }
+        }
+        // Sleep until register_sync_event() notifies or the poll interval
+        // elapses. A relative wait gives every iteration a fresh deadline;
+        // a spurious wakeup only costs one extra pass.
+        signalVar().wait_for(uLock, timeoutPeriod);
+    }
+}
+
+/**
+ * Marks `queue` as having events to harvest and wakes the worker thread.
+ * The worker (process_queue) is started on the first call.
+ */
 void register_sync_event(cl_command_queue queue) {
-    auto& event_queue = getMap(queue);
-    while(!event_queue.empty())
+    // Function-local static: the initializer runs once, so exactly one
+    // worker is spawned. It is detached because process_queue() never returns.
+    static const bool workerStarted = [](void) {
+        std::thread(process_queue).detach();
+        return true;
+    }();
+    (void)workerStarted;
     {
-        cl_int err = CL_SUCCESS;
-        cl_ulong startTime, endTime, queuedTime, submitTime;
-        asyncEvent* kernel_data = event_queue.front();
-        const auto checkError = [=](const char * msg) {
-            if (err != CL_SUCCESS) {
-                printf("%s", msg);
-                abort();
-            }
-        };
-
-        cl_int status;
-        err = clGetEventInfo_noinst(kernel_data->_event, CL_EVENT_COMMAND_EXECUTION_STATUS, sizeof(cl_int), &status, NULL);
-        checkError("Fatal error: calling clGetEventInfo, exiting.\n");
-        if (status != CL_COMPLETE) continue;
-
-        err = clGetEventProfilingInfo_noinst(kernel_data->_event, CL_PROFILING_COMMAND_QUEUED,
-            sizeof(cl_ulong), &queuedTime, NULL);
-        checkError("Cannot get queued time for Kernel event.\n");
-        err = clGetEventProfilingInfo_noinst(kernel_data->_event, CL_PROFILING_COMMAND_SUBMIT,
-            sizeof(cl_ulong), &submitTime, NULL);
-        checkError("Cannot get submit time for Kernel event.\n");
-        err = clGetEventProfilingInfo_noinst(kernel_data->_event, CL_PROFILING_COMMAND_START,
-            sizeof(cl_ulong), &startTime, NULL);
-        checkError("Cannot get start time for Kernel event.\n");
-        err = clGetEventProfilingInfo_noinst(kernel_data->_event, CL_PROFILING_COMMAND_END,
-            sizeof(cl_ulong), &endTime, NULL);
-        checkError("Cannot get end time for Kernel event.\n");
-
-        sample_value("Time in Queue (us)", (startTime - queuedTime)/1e3);
-        sample_value("Time in Submitted (us)", (startTime - submitTime)/1e3);
-        queueData data = queueContextDeviceMap().find(kernel_data->_queue)->second;
-        opencl_thread_node node(data.ids[0], data.ids[1], data.ids[2], kernel_data->_type);
-        double start_d = ((double)startTime);
-        double end_d = ((double)endTime);
-        printf("SYNC: start= %f offset= %f, corrected = %f.\n", start_d, data.offset,
-            start_d + data.offset);
-        printf("SYNC: end= %f offset= %f, corrected = %f.\n", end_d, data.offset,
-            end_d + data.offset);
-        store_profiler_data(kernel_data, start_d + data.offset, end_d + data.offset, node);
-        event_queue.pop_front();
-        clReleaseEvent_noinst(kernel_data->_event);
+        std::unique_lock<std::mutex> l(signalMutex());
+        activeQueues().insert(queue);
     }
+    signalVar().notify_one();
 }
 
 double sync_clocks(queueData& qData) {