Revert "Improve Windows threading performance scaling" #4113

Merged · 1 commit · Jun 29, 2023
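For context: the commit being reverted had swapped the Win32 BLAS server's event/critical-section task hand-off for a counting semaphore plus a lock-free compare-and-swap pop of the queue head, and had replaced each task's per-job finish event with an atomically decremented status counter. This revert restores the original design. Below is a minimal sketch of the restored worker-side pattern; names such as `pool.lock`, `pool.filled`, `pool.killed`, and `finish` mirror the diff that follows, but the `worker` function and the trimmed-down structs are illustrative only, not a copy of `blas_server_win32.c`.

```c
/* Minimal sketch of the worker hand-off pattern this revert restores (illustrative only). */
#include <windows.h>

typedef struct blas_queue {
    struct blas_queue *next;
    HANDLE finish;                /* signalled when this task is done */
    /* ... routine, arguments, status, etc. ... */
} blas_queue_t;

static struct {
    CRITICAL_SECTION lock;        /* guards the singly linked task list */
    HANDLE filled;                /* auto-reset event: work is available */
    HANDLE killed;                /* manual-reset event: server shutting down */
    blas_queue_t *queue;
} pool;

static DWORD WINAPI worker(void *arg)
{
    HANDLE handles[2] = { pool.filled, pool.killed };
    (void)arg;

    for (;;) {
        DWORD action = WaitForMultipleObjects(2, handles, FALSE, INFINITE);
        if (action == WAIT_OBJECT_0 + 1) break;      /* killed: exit the thread */

        EnterCriticalSection(&pool.lock);            /* pop one task under the lock */
        blas_queue_t *task = pool.queue;
        if (task) pool.queue = task->next;
        LeaveCriticalSection(&pool.lock);
        if (!task) continue;

        if (pool.queue) SetEvent(pool.filled);       /* more work queued: wake another worker */
        /* ... execute the task's routine here ... */
        SetEvent(task->finish);                      /* unblock the caller waiting on this task */
    }
    return 0;
}
```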
5 changes: 0 additions & 5 deletions CONTRIBUTORS.md
@@ -216,8 +216,3 @@ In chronological order:

* Pablo Romero <https://github.com/pablorcum>
* [2022-08] Fix building from sources for QNX

* Mark Seminatore <https://github.com/mseminatore>
* [2023-06-23] Fix bounds issue in goto_set_num_threads
* [2023-06-23] Improve Windows threading performance scaling

110 changes: 61 additions & 49 deletions driver/others/blas_server_win32.c
@@ -50,19 +50,11 @@

/* This is a thread implementation for Win32 lazy implementation */

#if defined (__GNUC__) && (__GNUC__ < 6)
#define WIN_CAS(dest, exch, comp) __sync_val_compare_and_swap(dest, comp, exch)
#else
#if defined(_WIN64)
#define WIN_CAS(dest, exch, comp) InterlockedCompareExchange64(dest, exch, comp)
#else
#define WIN_CAS(dest, exch, comp) InterlockedCompareExchange(dest, exch, comp)
#endif
#endif

/* Thread server common information */
typedef struct{
HANDLE taskSemaphore;
CRITICAL_SECTION lock;
HANDLE filled;
HANDLE killed;

blas_queue_t *queue; /* Parameter Pointer */
int shutdown; /* server shutdown flag */
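The hunk above removes the compare-and-swap shim the reverted change had introduced: on GCC older than 6, `WIN_CAS` mapped to `__sync_val_compare_and_swap`, otherwise to `InterlockedCompareExchange64` or `InterlockedCompareExchange` depending on pointer width, and it was used to pop the queue head without taking a lock. A hedged sketch of that now-removed pop loop follows; `pop_task` and `queue_head` are stand-in names, and `InterlockedCompareExchangePointer` is substituted for the exact `WIN_CAS` macro for brevity.

```c
/* Sketch of the lock-free pop used by the reverted change (now removed); illustrative only. */
#include <windows.h>

typedef struct blas_queue {
    struct blas_queue *next;
    /* ... */
} blas_queue_t;

static blas_queue_t * volatile queue_head;      /* stand-in for pool.queue */

static blas_queue_t *pop_task(void)
{
    blas_queue_t *head, *next, *prev;
    do {
        head = queue_head;
        if (!head) return NULL;                 /* nothing queued */
        next = head->next;
        /* replace the head with its successor only if nobody raced us */
        prev = InterlockedCompareExchangePointer((PVOID volatile *)&queue_head, next, head);
    } while (prev != head);                     /* lost the race: retry */
    return head;
}
```

The revert drops this in favor of a plain `EnterCriticalSection`/`LeaveCriticalSection` pair around the list update, as the later hunks show.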
@@ -79,6 +71,8 @@ static blas_pool_t pool.
static HANDLE blas_threads [MAX_CPU_NUMBER];
static DWORD blas_threads_id[MAX_CPU_NUMBER];



static void legacy_exec(void *func, int mode, blas_arg_t *args, void *sb){

if (!(mode & BLAS_COMPLEX)){
@@ -204,6 +198,7 @@ static void legacy_exec(void *func, int mode, blas_arg_t *args, void *sb){

/* This is a main routine of threads. Each thread waits until job is */
/* queued. */

static DWORD WINAPI blas_thread_server(void *arg){

/* Thread identifier */
@@ -212,7 +207,9 @@ static DWORD WINAPI blas_thread_server(void *arg){
#endif

void *buffer, *sa, *sb;
volatile blas_queue_t *queue;
blas_queue_t *queue;
DWORD action;
HANDLE handles[] = {pool.filled, pool.killed};

/* Each server needs each buffer */
buffer = blas_memory_alloc(2);
@@ -229,32 +226,28 @@
fprintf(STDERR, "Server[%2ld] Waiting for Queue.\n", cpu);
#endif

// all worker threads wait on the semaphore
WaitForSingleObject(pool.taskSemaphore, INFINITE);
do {
action = WaitForMultipleObjects(2, handles, FALSE, INFINITE);
} while ((action != WAIT_OBJECT_0) && (action != WAIT_OBJECT_0 + 1));

if (action == WAIT_OBJECT_0 + 1) break;

// kill the thread if we are shutting down the server
if (pool.shutdown)
break;

#ifdef SMP_DEBUG
fprintf(STDERR, "Server[%2ld] Got it.\n", cpu);
#endif

// grab a queued task and update the list
volatile blas_queue_t* queue_next;
INT_PTR prev_value;
do {
queue = (volatile blas_queue_t*)pool.queue;
if (!queue)
break;
EnterCriticalSection(&pool.lock);

queue = pool.queue;
if (queue) pool.queue = queue->next;

queue_next = (volatile blas_queue_t*)queue->next;
prev_value = WIN_CAS((INT_PTR*)&pool.queue, (INT_PTR)queue_next, (INT_PTR)queue);
} while (prev_value != queue);
LeaveCriticalSection(&pool.lock);

if (queue) {
int (*routine)(blas_arg_t *, void *, void *, void *, void *, BLASLONG) = queue -> routine;

if (pool.queue) SetEvent(pool.filled);

sa = queue -> sa;
sb = queue -> sb;

@@ -339,8 +332,13 @@
fprintf(STDERR, "Server[%2ld] Finished!\n", cpu);
#endif

// mark our sub-task as complete
InterlockedDecrement(&queue->status);
EnterCriticalSection(&queue->lock);

queue -> status = BLAS_STATUS_FINISHED;

LeaveCriticalSection(&queue->lock);

SetEvent(queue->finish);
}

/* Shutdown procedure */
@@ -355,7 +353,7 @@
}

/* Initializing routine */
int blas_thread_init(void){
int blas_thread_init(void){
BLASLONG i;

if (blas_server_avail || (blas_cpu_number <= 1)) return 0;
@@ -369,7 +367,9 @@

if (!blas_server_avail){

pool.taskSemaphore = CreateSemaphore(NULL, 0, blas_cpu_number - 1, NULL);
InitializeCriticalSection(&pool.lock);
pool.filled = CreateEvent(NULL, FALSE, FALSE, NULL);
pool.killed = CreateEvent(NULL, TRUE, FALSE, NULL);

pool.shutdown = 0;
pool.queue = NULL;
@@ -391,10 +391,11 @@
/*
User can call one of two routines.

exec_blas_async ... immediately returns after jobs are queued.
exec_blas_async ... immediately returns after jobs are queued.

exec_blas ... returns after jobs are finished.
exec_blas ... returns after jobs are finished.
*/

int exec_blas_async(BLASLONG pos, blas_queue_t *queue){

#if defined(SMP_SERVER)
@@ -408,7 +409,8 @@ int exec_blas_async(BLASLONG pos, blas_queue_t *queue){
current = queue;

while (current) {
current->status = 1;
InitializeCriticalSection(&current -> lock);
current -> finish = CreateEvent(NULL, FALSE, FALSE, NULL);
current -> position = pos;

#ifdef CONSISTENT_FPCSR
@@ -420,10 +422,19 @@ int exec_blas_async(BLASLONG pos, blas_queue_t *queue){
pos ++;
}

pool.queue = queue;
EnterCriticalSection(&pool.lock);

if (pool.queue) {
current = pool.queue;
while (current -> next) current = current -> next;
current -> next = queue;
} else {
pool.queue = queue;
}

LeaveCriticalSection(&pool.lock);

// start up worker threads
ReleaseSemaphore(pool.taskSemaphore, pos - 1, NULL);
SetEvent(pool.filled);

return 0;
}
@@ -439,9 +450,10 @@ int exec_blas_async_wait(BLASLONG num, blas_queue_t *queue){
fprintf(STDERR, "Waiting Queue ..\n");
#endif

// spin-wait on each sub-task to finish
while (*((volatile int*)&queue->status))
YIELDING;
WaitForSingleObject(queue->finish, INFINITE);

CloseHandle(queue->finish);
DeleteCriticalSection(&queue -> lock);

queue = queue -> next;
num --;
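On the waiting side, the hunk above swaps the reverted spin-wait (the caller polled each task's `status` word, which workers decremented with `InterlockedDecrement`) back to blocking on a per-task `finish` event. A hedged sketch of the two waiter patterns follows; the `task_t` type, the function names, and the `YieldProcessor` call (standing in for the file's `YIELDING` macro) are illustrative assumptions, not the file's own code.

```c
/* Waiter-side patterns contrasted; illustrative sketch only. */
#include <windows.h>

typedef struct task {
    struct task *next;
    volatile LONG status;    /* reverted design: workers InterlockedDecrement(&status) toward zero */
    HANDLE finish;           /* restored design: workers SetEvent(finish) when done */
} task_t;

/* Reverted design: busy-wait (with a CPU yield) until every sub-task reports done. */
static void wait_spin(task_t *t)
{
    for (; t; t = t->next)
        while (t->status != 0)
            YieldProcessor();
}

/* Restored design: block on each task's finish event, then release its handle. */
static void wait_event(task_t *t)
{
    for (; t; t = t->next) {
        WaitForSingleObject(t->finish, INFINITE);
        CloseHandle(t->finish);
    }
}
```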
@@ -489,21 +501,18 @@ int exec_blas(BLASLONG num, blas_queue_t *queue){

/* Shutdown procedure; the user does not have to call this routine. The */
/* kernel automatically kills the threads. */

int BLASFUNC(blas_thread_shutdown)(void){

int i;

#ifdef SMP_DEBUG
fprintf(STDERR, "blas_thread_shutdown..\n");
#endif

if (!blas_server_avail) return 0;

LOCK_COMMAND(&server_lock);

if (blas_server_avail){

pool.shutdown = 1;
SetEvent(pool.killed);

for(i = 0; i < blas_num_threads - 1; i++){
// Could also just use WaitForMultipleObjects
@@ -519,7 +528,8 @@ int BLASFUNC(blas_thread_shutdown)(void){
CloseHandle(blas_threads[i]);
}

CloseHandle(pool.taskSemaphore);
CloseHandle(pool.filled);
CloseHandle(pool.killed);

blas_server_avail = 0;
}
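One detail worth noting in the shutdown path above: `pool.killed` is created as a manual-reset event (the second argument of `CreateEvent` is TRUE in the init hunks), so a single `SetEvent(pool.killed)` stays signalled and releases every worker blocked in `WaitForMultipleObjects`, not just one. The in-diff comment also notes that the per-thread joins "could also just use WaitForMultipleObjects"; the sketch below does exactly that. All names here (`killed`, `workers`, `NUM_WORKERS`, `shutdown_all`) are assumed for illustration.

```c
/* Broadcast shutdown with a manual-reset event; illustrative sketch with assumed names. */
#include <windows.h>

#define NUM_WORKERS 4

static HANDLE killed;                       /* manual-reset: stays signalled once set */
static HANDLE workers[NUM_WORKERS];

static void shutdown_all(void)
{
    SetEvent(killed);                               /* every waiter sees this, not just one */
    WaitForMultipleObjects(NUM_WORKERS, workers,    /* join all worker threads at once */
                           TRUE, INFINITE);
    for (int i = 0; i < NUM_WORKERS; i++)
        CloseHandle(workers[i]);
    CloseHandle(killed);
}
```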
@@ -549,14 +559,16 @@ void goto_set_num_threads(int num_threads)
//increased_threads = 1;
if (!blas_server_avail){

pool.taskSemaphore = CreateSemaphore(NULL, 0, blas_cpu_number - 1, NULL);
InitializeCriticalSection(&pool.lock);
pool.filled = CreateEvent(NULL, FALSE, FALSE, NULL);
pool.killed = CreateEvent(NULL, TRUE, FALSE, NULL);

pool.shutdown = 0;
pool.queue = NULL;
blas_server_avail = 1;
}

for(i = blas_num_threads; i < num_threads - 1; i++){
for(i = blas_num_threads - 1; i < num_threads - 1; i++){

blas_threads[i] = CreateThread(NULL, 0,
blas_thread_server, (void *)i,
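For completeness, the caller-facing contract quoted in the comment block earlier in the diff ("exec_blas_async ... immediately returns after jobs are queued", "exec_blas ... returns after jobs are finished") is exercised roughly as follows. This is a hedged sketch: the `blas_queue_t` field names are taken from OpenBLAS's common headers and the diff, while `my_kernel`, `run_two_jobs`, and the chosen `mode` flags are hypothetical and purely illustrative.

```c
/* Hedged usage sketch of the queueing API described above; not code from the repository. */
#include <string.h>
#include "common.h"   /* OpenBLAS internal header declaring blas_queue_t and exec_blas_* */

static int my_kernel(blas_arg_t *args, void *range_m, void *range_n,
                     void *sa, void *sb, BLASLONG pos)
{
    /* hypothetical sub-task body: operate on the slice selected by pos */
    return 0;
}

static void run_two_jobs(blas_arg_t *args)
{
    blas_queue_t jobs[2];
    memset(jobs, 0, sizeof(jobs));

    jobs[0].mode    = BLAS_DOUBLE | BLAS_REAL;   /* assumed flags, for illustration */
    jobs[0].routine = my_kernel;
    jobs[0].args    = args;
    jobs[0].next    = &jobs[1];

    jobs[1] = jobs[0];
    jobs[1].next = NULL;

    exec_blas_async(0, &jobs[0]);        /* returns as soon as both jobs are queued */
    exec_blas_async_wait(2, &jobs[0]);   /* blocks until both sub-tasks have finished */
    /* exec_blas(2, &jobs[0]) would queue and wait in a single call. */
}
```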