Skip to content

Commit

Permalink
Changed to asynchronous processing for opencl device activity
Browse files Browse the repository at this point in the history
  • Loading branch information
khuck committed Oct 25, 2024
1 parent f2e0d18 commit 2af8a8d
Showing 1 changed file with 87 additions and 43 deletions.
130 changes: 87 additions & 43 deletions src/apex/apex_opencl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
#include "async_thread_node.hpp"
#include <map>
#include <deque>
#include <mutex>
#include <condition_variable>

namespace apex {
namespace opencl {
Expand Down Expand Up @@ -159,6 +161,21 @@ void enqueue_event(asyncEvent* event) {
/* forward declaration, defined at end because it uses OpenCL API calls */
void register_sync_event(cl_command_queue queue);

auto& signalMutex() {
static std::mutex mtx;
return mtx;
}

auto& signalVar() {
static std::condition_variable var;
return var;
}

auto& activeQueues() {
static std::set<cl_command_queue> queues;
return queues;
}

auto& deviceMap() {
static std::map<cl_device_id, uint32_t> theMap;
return theMap;
Expand Down Expand Up @@ -1838,52 +1855,79 @@ void store_profiler_data(asyncEvent* event, cl_ulong start, cl_ulong end, opencl
instance->complete_task(tt);
}

void process_queue(void) {
// waiting for timeout after 1 seconds
std::chrono::seconds timeoutPeriod{1};
auto timePoint = std::chrono::system_clock::now() + timeoutPeriod;
std::unique_lock<std::mutex> uLock(signalMutex());
while(activeQueues().empty())
{
if (signalVar().wait_until(uLock, timePoint) //<##
== std::cv_status::timeout) {
continue;
}
for (auto queue : activeQueues()) {
auto& event_queue = getMap(queue);
while(!event_queue.empty())
{
cl_int err = CL_SUCCESS;
cl_ulong startTime, endTime, queuedTime, submitTime;
asyncEvent* kernel_data = event_queue.front();
const auto checkError = [=](const char * msg) {
if (err != CL_SUCCESS) {
printf("%s", msg);
abort();
}
};

cl_int status;
err = clGetEventInfo_noinst(kernel_data->_event, CL_EVENT_COMMAND_EXECUTION_STATUS, sizeof(cl_int), &status, NULL);
checkError("Fatal error: calling clGetEventInfo, exiting.\n");
if (status != CL_COMPLETE) continue;

err = clGetEventProfilingInfo_noinst(kernel_data->_event, CL_PROFILING_COMMAND_QUEUED,
sizeof(cl_ulong), &queuedTime, NULL);
checkError("Cannot get queued time for Kernel event.\n");
err = clGetEventProfilingInfo_noinst(kernel_data->_event, CL_PROFILING_COMMAND_SUBMIT,
sizeof(cl_ulong), &submitTime, NULL);
checkError("Cannot get submit time for Kernel event.\n");
err = clGetEventProfilingInfo_noinst(kernel_data->_event, CL_PROFILING_COMMAND_START,
sizeof(cl_ulong), &startTime, NULL);
checkError("Cannot get start time for Kernel event.\n");
err = clGetEventProfilingInfo_noinst(kernel_data->_event, CL_PROFILING_COMMAND_END,
sizeof(cl_ulong), &endTime, NULL);
checkError("Cannot get end time for Kernel event.\n");

sample_value("Time in Queue (us)", (startTime - queuedTime)/1e3);
sample_value("Time in Submitted (us)", (startTime - submitTime)/1e3);
queueData data = queueContextDeviceMap().find(kernel_data->_queue)->second;
opencl_thread_node node(data.ids[0], data.ids[1], data.ids[2], kernel_data->_type);
double start_d = ((double)startTime);
double end_d = ((double)endTime);
printf("SYNC: start= %f offset= %f, corrected = %f.\n", start_d, data.offset,
start_d + data.offset);
printf("SYNC: end= %f offset= %f, corrected = %f.\n", end_d, data.offset,
end_d + data.offset);
store_profiler_data(kernel_data, start_d + data.offset, end_d + data.offset, node);
event_queue.pop_front();
clReleaseEvent_noinst(kernel_data->_event);
}
}
}
}

void register_sync_event(cl_command_queue queue) {
auto& event_queue = getMap(queue);
while(!event_queue.empty())
const auto getThread = [](void) {
std::shared_ptr<std::thread> t = std::make_shared<std::thread>(process_queue);
t->detach();
return t;
};
static auto worker = getThread();
{
cl_int err = CL_SUCCESS;
cl_ulong startTime, endTime, queuedTime, submitTime;
asyncEvent* kernel_data = event_queue.front();
const auto checkError = [=](const char * msg) {
if (err != CL_SUCCESS) {
printf("%s", msg);
abort();
}
};

cl_int status;
err = clGetEventInfo_noinst(kernel_data->_event, CL_EVENT_COMMAND_EXECUTION_STATUS, sizeof(cl_int), &status, NULL);
checkError("Fatal error: calling clGetEventInfo, exiting.\n");
if (status != CL_COMPLETE) continue;

err = clGetEventProfilingInfo_noinst(kernel_data->_event, CL_PROFILING_COMMAND_QUEUED,
sizeof(cl_ulong), &queuedTime, NULL);
checkError("Cannot get queued time for Kernel event.\n");
err = clGetEventProfilingInfo_noinst(kernel_data->_event, CL_PROFILING_COMMAND_SUBMIT,
sizeof(cl_ulong), &submitTime, NULL);
checkError("Cannot get submit time for Kernel event.\n");
err = clGetEventProfilingInfo_noinst(kernel_data->_event, CL_PROFILING_COMMAND_START,
sizeof(cl_ulong), &startTime, NULL);
checkError("Cannot get start time for Kernel event.\n");
err = clGetEventProfilingInfo_noinst(kernel_data->_event, CL_PROFILING_COMMAND_END,
sizeof(cl_ulong), &endTime, NULL);
checkError("Cannot get end time for Kernel event.\n");

sample_value("Time in Queue (us)", (startTime - queuedTime)/1e3);
sample_value("Time in Submitted (us)", (startTime - submitTime)/1e3);
queueData data = queueContextDeviceMap().find(kernel_data->_queue)->second;
opencl_thread_node node(data.ids[0], data.ids[1], data.ids[2], kernel_data->_type);
double start_d = ((double)startTime);
double end_d = ((double)endTime);
printf("SYNC: start= %f offset= %f, corrected = %f.\n", start_d, data.offset,
start_d + data.offset);
printf("SYNC: end= %f offset= %f, corrected = %f.\n", end_d, data.offset,
end_d + data.offset);
store_profiler_data(kernel_data, start_d + data.offset, end_d + data.offset, node);
event_queue.pop_front();
clReleaseEvent_noinst(kernel_data->_event);
std::unique_lock l(signalMutex());
activeQueues().insert(queue);
}
signalVar().notify_one();
}

double sync_clocks(queueData& qData) {
Expand Down

0 comments on commit 2af8a8d

Please sign in to comment.