Skip to content

Commit

Permalink
Fixing async processing of OpenCL queues
Browse files Browse the repository at this point in the history
  • Loading branch information
khuck committed Oct 25, 2024
1 parent 5cd5113 commit 5f9aecd
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 16 deletions.
6 changes: 6 additions & 0 deletions src/apex/apex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ DEFINE_DESTRUCTOR(apex_finalize_static_void)
#define FUNCTION_ENTER
#define FUNCTION_EXIT
#endif
#ifdef APEX_WITH_OPENCL
#include "apex_opencl.hpp"
#endif

APEX_NATIVE_TLS bool _registered = false;
APEX_NATIVE_TLS bool _exited = false;
Expand Down Expand Up @@ -1788,6 +1791,9 @@ void finalize(void)
dynamic::level0::flush();
dynamic::level0::stop();
}
#ifdef APEX_WITH_OPENCL
opencl::shutdown();
#endif

// stop processing new timers/counters/messages/tasks/etc.
apex_options::suspend(true);
Expand Down
60 changes: 44 additions & 16 deletions src/apex/apex_opencl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,14 @@ std::deque<asyncEvent*>& getMap(cl_command_queue queue) {
asyncEvent* new_gpu_event(scoped_timer& timer,
cl_command_queue queue, std::string name, apex_async_activity_t type) {
asyncEvent* tmp = new asyncEvent(timer.get_task_wrapper(), queue, name, type);
std::cout << "new event " << name << std::endl;
//std::cout << "new event " << name << std::endl;
return tmp;
}

void enqueue_event(asyncEvent* event) {
auto& map = getMap(event->_queue);
map.push_back(event);
std::cout << "queued event " << event->_tt_ptr->task_id->get_name() << std::endl;
//std::cout << "queued event " << event->_tt_ptr->task_id->get_name() << std::endl;
}

/* forward declaration, defined at end because it uses OpenCL API calls */
Expand Down Expand Up @@ -205,6 +205,11 @@ auto& queueContextDeviceMap() {
return theMap;
}

auto& working() {
static std::atomic<bool> w{true};
return w;
}

double sync_clocks(queueData& qData);

} // namespace opencl
Expand Down Expand Up @@ -470,8 +475,22 @@ clRetainCommandQueue(cl_command_queue command_queue) CL_API_SUFFIX__VERSION_1_0
return function_ptr(command_queue);
}

extern CL_API_ENTRY cl_int CL_API_CALL
clFlush_noinst(cl_command_queue command_queue) CL_API_SUFFIX__VERSION_1_0;
extern CL_API_ENTRY cl_int CL_API_CALL
clReleaseCommandQueue(cl_command_queue command_queue) CL_API_SUFFIX__VERSION_1_0 {
// explicitly flush the queue
clFlush_noinst(command_queue);
// signal the worker to process the async events
apex::opencl::register_sync_event(command_queue);
{
std::unique_lock l(apex::opencl::signalMutex());
auto& event_queue = apex::opencl::getMap(command_queue);
std::cout << "events remaining: " << event_queue.size() << std::endl;
//APEX_ASSERT(event_queue.empty());
apex::opencl::activeQueues().erase(command_queue);
}
// implicitly flush the queue and delete it
GET_SYMBOL_TIMER(clReleaseCommandQueue);
return function_ptr(command_queue);
}
Expand Down Expand Up @@ -1101,7 +1120,9 @@ clFlush_noinst(cl_command_queue command_queue) CL_API_SUFFIX__VERSION_1_0 {
extern CL_API_ENTRY cl_int CL_API_CALL
clFlush(cl_command_queue command_queue) CL_API_SUFFIX__VERSION_1_0 {
GET_SYMBOL_TIMER(clFlush);
return function_ptr(command_queue);
auto rc = function_ptr(command_queue);
apex::opencl::register_sync_event(command_queue);
return rc;
}

extern CL_API_ENTRY cl_int CL_API_CALL
Expand All @@ -1113,7 +1134,9 @@ clFinish_noinst(cl_command_queue command_queue) CL_API_SUFFIX__VERSION_1_0 {
extern CL_API_ENTRY cl_int CL_API_CALL
clFinish(cl_command_queue command_queue) CL_API_SUFFIX__VERSION_1_0 {
GET_SYMBOL_TIMER(clFinish);
return function_ptr(command_queue);
auto rc = function_ptr(command_queue);
apex::opencl::register_sync_event(command_queue);
return rc;
}

/////////
Expand Down Expand Up @@ -1860,30 +1883,31 @@ void process_queue(void) {
std::chrono::seconds timeoutPeriod{1};
auto timePoint = std::chrono::system_clock::now() + timeoutPeriod;
std::unique_lock<std::mutex> uLock(signalMutex());
while(activeQueues().empty())
{
while(working()) {
if (signalVar().wait_until(uLock, timePoint) //<##
== std::cv_status::timeout) {
continue;
}

for (auto queue : activeQueues()) {
auto& event_queue = getMap(queue);
while(!event_queue.empty())
{
while(!event_queue.empty()) {
cl_int err = CL_SUCCESS;
cl_ulong startTime, endTime, queuedTime, submitTime;
asyncEvent* kernel_data = event_queue.front();
const auto checkError = [=](const char * msg) {
if (err != CL_SUCCESS) {
printf("%s", msg);
abort();
}
};

asyncEvent* kernel_data = event_queue.front();
cl_int status;
err = clGetEventInfo_noinst(kernel_data->_event, CL_EVENT_COMMAND_EXECUTION_STATUS, sizeof(cl_int), &status, NULL);
checkError("Fatal error: calling clGetEventInfo, exiting.\n");
if (status != CL_COMPLETE) continue;
if (status != CL_COMPLETE) {
break;
}

err = clGetEventProfilingInfo_noinst(kernel_data->_event, CL_PROFILING_COMMAND_QUEUED,
sizeof(cl_ulong), &queuedTime, NULL);
Expand All @@ -1904,22 +1928,23 @@ void process_queue(void) {
opencl_thread_node node(data.ids[0], data.ids[1], data.ids[2], kernel_data->_type);
double start_d = ((double)startTime);
double end_d = ((double)endTime);
printf("SYNC: start= %f offset= %f, corrected = %f.\n", start_d, data.offset,
start_d + data.offset);
printf("SYNC: end= %f offset= %f, corrected = %f.\n", end_d, data.offset,
end_d + data.offset);
//printf("SYNC: start= %f offset= %f, corrected = %f.\n", start_d, data.offset, start_d + data.offset);
//printf("SYNC: end= %f offset= %f, corrected = %f.\n", end_d, data.offset, end_d + data.offset);
store_profiler_data(kernel_data, start_d + data.offset, end_d + data.offset, node);
event_queue.pop_front();
clReleaseEvent_noinst(kernel_data->_event);
}
//std::cout << " * * * * * Queue empty * * * * *" << std::endl;
}
}
std::cout << "********* Exiting worker *********" << std::endl;
}

void register_sync_event(cl_command_queue queue) {
const auto getThread = [](void) {
std::shared_ptr<std::thread> t = std::make_shared<std::thread>(process_queue);
t->detach();
std::cout << "new thread" << std::endl;
return t;
};
static auto worker = getThread();
Expand Down Expand Up @@ -1972,10 +1997,13 @@ double sync_clocks(queueData& qData) {
err = clGetEventProfilingInfo_noinst(sync_event, CL_PROFILING_COMMAND_END, sizeof(cl_ulong), &gpu_timestamp, NULL);
checkError("Cannot get end time for Sync event.\n");
qData.offset = cpu_timestamp - (((double)gpu_timestamp));
printf("SYNC: CPU= %f GPU= %f, diff = %f.\n", cpu_timestamp, ((double)gpu_timestamp),
qData.offset);
//printf("SYNC: CPU= %f GPU= %f, diff = %f.\n", cpu_timestamp, ((double)gpu_timestamp), qData.offset);
return qData.offset;
}

void shutdown(void) {
working() = false;
}

} // namespace opencl;
} // namespace apex;
6 changes: 6 additions & 0 deletions src/apex/apex_opencl.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@

namespace apex {
namespace opencl {
void shutdown(void);
}
}

0 comments on commit 5f9aecd

Please sign in to comment.