Skip to content

Commit

Permalink
Added semaphore support for Linux platform
Browse files Browse the repository at this point in the history
  • Loading branch information
CristianDragu committed May 2, 2019
1 parent 49844c2 commit 718b61b
Show file tree
Hide file tree
Showing 7 changed files with 242 additions and 225 deletions.
49 changes: 25 additions & 24 deletions SparrowImplicitLib/par/config.spr
Original file line number Diff line number Diff line change
@@ -1,32 +1,33 @@
module par.config;
module par.config

import std.ptr;
import std.compilerInfo
import std.ptr

package Impl
{
using _SC_NPROCESSORS_ONLN = 58;
[ct] if platformName == "Linux"
datatype NativeThreadHandle = ULong
else
datatype NativeThreadHandle = Byte Ptr // Opaque type

[native("sysconf")] fun sysconf(name: Int): Long;
}

datatype NativeThreadHandle = Byte Ptr; // Opaque type


fun >>(h: NativeThreadHandle, os: @OutStream)
{
if ( h.impl.isSet )
os << mkStreamRefWrapper(h.impl.get);
else
os << "null";
}

using InvalidThreadHandle = NativeThreadHandle();
fun >>(h: NativeThreadHandle, os: @OutStream)
if ( h.impl.isSet )
os << mkStreamRefWrapper(h.impl.get)
else
os << "null"

using InvalidThreadHandle = NativeThreadHandle()

//! Get the number of available logical CPU cores for our process
//! This dictates how much parallelism we have to be exploit
fun getAvailableCoresNum(): UInt {
var maxProcs: Long = Impl.sysconf(Impl._SC_NPROCESSORS_ONLN);
return ife(maxProcs<1, UInt(1), UInt(maxProcs));
fun getAvailableCoresNum(): UInt
var maxProcs: Long = _Impl.sysconf(_Impl._SC_NPROCESSORS_ONLN)
return ife(maxProcs < 1, UInt(1), UInt(maxProcs))
// TODO: also consider process affinity
}

package _Impl

[ct] if platformName == "Darwin"
using _SC_NPROCESSORS_ONLN = 58
else
using _SC_NPROCESSORS_ONLN = 84

[native("sysconf")] fun sysconf(name: Int): Long
91 changes: 58 additions & 33 deletions SparrowImplicitLib/par/semaphore.spr
Original file line number Diff line number Diff line change
@@ -1,48 +1,73 @@
module par.semaphore;
module par.semaphore

import config;
import std.compilerInfo
import config

datatype TimeSpecT
tv_sec: Int
tv_nsec: Long

//! Allows a limited numbers of threads to simultaneously "acquire" a resource
//! before releasing it.
//! Compared to the mutex, we can release the semaphore without acquiring it
datatype Semaphore
_handle: Impl.SemaphoreT;
_handle: _Impl.SemaphoreT

fun ctor(this: @Semaphore, startValue: UInt = 0)
_handle ctor;
var res = Impl.semaphore_create(Impl.mach_task_self(), _handle, Impl.SYNC_POLICY_FIFO, Int(startValue));
_handle ctor
[ct] if platformName == "Darwin"
var res = _Impl.semaphore_create(_Impl.mach_task_self(), _handle, _Impl.SYNC_POLICY_FIFO, Int(startValue))
else
var res = _Impl.sem_init(_handle, Int(0), UInt(startValue))

fun dtor(this: @Semaphore)
//verify(0 == Impl.semaphore_destroy(Impl.mach_task_self(), _handle));
Impl.semaphore_destroy(Impl.mach_task_self(), _handle);
[ct] if platformName == "Darwin"
_Impl.semaphore_destroy(_Impl.mach_task_self(), _handle)
else
_Impl.sem_destroy(_handle)

//! Increments the counter of the semaphore
fun release(s: @Semaphore) {
Impl.semaphore_signal(s._handle);
}
fun release(s: @Semaphore)
[ct] if platformName == "Darwin"
_Impl.semaphore_signal(s._handle)
else
_Impl.sem_post(s._handle)

//! Decrements the semaphore counter
//! If the counter reaches zero, the call blocks until somebody calls 'release'
fun acquire(s: @Semaphore) {
while 0 != Impl.semaphore_wait(s._handle)
/*keep trying*/;
}

package Impl {
using TaskT = Int;
using SemaphoreT = Int;
using SYNC_POLICY_FIFO = 0;

[native("mach_task_self")] fun mach_task_self(): TaskT;
[native("semaphore_create")] fun semaphore_create(task: TaskT, s: @SemaphoreT, policy, value: Int): Int;
[native("semaphore_destroy")] fun semaphore_destroy(task: TaskT, s: SemaphoreT): Int;
[native("semaphore_signal")] fun semaphore_signal(s: SemaphoreT): Int;
[native("semaphore_wait")] fun semaphore_wait(s: SemaphoreT): Int;

//using SemType = Int;

//[native("sem_init")] fun sem_init(s: @SemType, pshared: Int, value: UInt): Int;
//[native("sem_destroy")] fun sem_destroy(s: @SemType): Int;
//[native("sem_wait")] fun sem_wait(s: @SemType): Int;
//[native("sem_post")] fun sem_post(s: @SemType): Int;
}
fun acquire(s: @Semaphore)
[ct] if platformName == "Darwin"
while 0 != _Impl.semaphore_wait(s._handle)
/* keep trying */;
else
while 0 != _Impl.sem_wait(s._handle)
/* keep trying */;

package _Impl
[ct] if platformName == "Darwin"
using TaskT = Int
using SYNC_POLICY_FIFO = 0

using SemaphoreT = Int

[ct] if platformName == "Linux"
using ModeT = Int

[ct] if platformName == "Darwin"
[native("mach_task_self")] fun mach_task_self(): TaskT;
[native("semaphore_create")] fun semaphore_create(task: TaskT, s: @SemaphoreT, policy, value: Int): Int;
[native("semaphore_destroy")] fun semaphore_destroy(task: TaskT, s: SemaphoreT): Int;
[native("semaphore_signal")] fun semaphore_signal(s: SemaphoreT): Int;
[native("semaphore_wait")] fun semaphore_wait(s: SemaphoreT): Int;
else
[native("sem_init")] fun sem_init(sem: @SemaphoreT, pshared: Int, value: UInt): Int
[native("sem_destroy")] fun sem_destroy(sem: @SemaphoreT): Int
[native("sem_wait")] fun sem_wait(sem: @SemaphoreT): Int
[native("sem_trywait")] fun sem_trywait(sem: @SemaphoreT): Int
[native("sem_timedwait")] fun sem_timedwait(sem: @SemaphoreT, timeout: @TimeSpecT): Int
[native("sem_post")] fun sem_post(sem: @SemaphoreT): Int
[native("sem_getvalue")] fun sem_getvalue(sem: @SemaphoreT, sval: @Int): Int
[native("sem_close")] fun sem_close(sem: @SemaphoreT): Int
[native("sem_unlink")] fun sem_unlink(name: @Char): Int
[native("sem_open")] fun sem_open(name: @Char, oflag: Int): @SemaphoreT
[native("sem_open")] fun sem_open(name: @Char, oflag: Int, mode: ModeT, value: UInt): @SemaphoreT
172 changes: 89 additions & 83 deletions SparrowImplicitLib/par/thread.spr
Original file line number Diff line number Diff line change
@@ -1,128 +1,134 @@
module par.thread;
module par.thread

import config;
import std.newDelete;
//import assert;
import std.compilerInfo
import config
import std.newDelete
//import assert

datatype NativeThreadHandle = ULong

//! Datatype describing a thread ID
datatype ThreadId
id: ULong;
id: NativeThreadHandle

fun ctor(this: @ThreadId, id: ULong)
this.id ctor id;
fun ctor(this: @ThreadId, h: NativeThreadHandle)
this.id ctor;
Impl.pthread_threadid_np(h, id);
fun >>(tid: @ThreadId, os: @OutStream)
os << tid.id;
this.id ctor id

[ct] if platformName == "Darwin"
fun ctor(this: @ThreadId, h: NativeThreadHandle)
this.id ctor
_Impl.pthread_threadid_np(h, id)

fun >>(tid: @ThreadId, os: @OutStream)
os << tid.id

datatype Thread
_handle: NativeThreadHandle;
_handle: NativeThreadHandle

//! Construct the Thread with a functor
//! The functor can have state, not just executing code
fun ctor(this: @Thread, f: AnyType) if isValid(f())
_handle = Impl.startThread(f);
_handle = _Impl.startThread(f)

//! Cannot copy construct or assign a Thread object
[private] fun ctor(this, other: @Thread);
[private] fun =(this, other: @Thread);

//! Destructor. Detach the thread if joinable
fun dtor(this: @Thread)
if ( this isJoinable ) this detach;
if this isJoinable
this detach

fun >>(t: @Thread, os: @OutStream)
{
if ( _handle != NativeThreadHandle() )
os << "Thread(" << _handle << ")";
if _handle != NativeThreadHandle()
os << "Thread(" << _handle << ")"
else
os << "Thread(null)";
}
os << "Thread(null)"

//! Swap the content of two Thread objects
//fun swap(lhs, rhs: @Thread) {
// var tmp = lhs._handle swap rhs._handle;
//}
//fun swap(lhs, rhs: @Thread)
// var tmp = lhs._handle swap rhs._handle

//! Test if the thread is joinable (e.g., still active);
fun isJoinable(t: Thread) = t._handle != NativeThreadHandle();
//! Test if the thread is joinable (e.g., still active)
fun isJoinable(t: Thread) = t._handle != NativeThreadHandle()

//! Wait for the thread to finish
//! This shouldn't be called from the same thread
fun join(t: @Thread) {
//assert(t isJoinable);
//assert((t getId) !== curThreadId);
var res = Impl.pthread_join(t._handle, null);
//assert(res == 0);
t._handle = NativeThreadHandle();
}
fun join(t: @Thread)
//assert(t isJoinable)
//assert((t getId) !== curThreadId)
var res = _Impl.pthread_join(t._handle, null)
//assert(res == 0)
t._handle = NativeThreadHandle()

//! Detaches the given thread object from the actual thread of execution
//! The actual thread may continue to execute after this call
fun detach(t: @Thread) {
//assert(t isJoinable);
var res = Impl.pthread_detach(t._handle);
//assert(res == 0);
t._handle = NativeThreadHandle();
}
fun detach(t: @Thread)
//assert(t isJoinable)
var res = _Impl.pthread_detach(t._handle)
//assert(res == 0)
t._handle = NativeThreadHandle()

//! Get the ID of the given thread
fun getId(t: Thread): ThreadId = ThreadId(t._handle);
//fun getId(t: Thread): ThreadId = ThreadId(t._handle)

//! Get the native handle of the given thread
fun getNativeHandle(t: Thread): NativeThreadHandle = t._handle;
//fun getNativeHandle(t: Thread): NativeThreadHandle = t._handle


//! Get the current thread id
fun curThreadId: ThreadId = ThreadId(NativeThreadHandle());
//fun curThreadId: ThreadId = ThreadId(NativeThreadHandle())

//! Tries to suspend the current thread to let other threads execute
fun yield = Impl.sched_yield();
fun yield = _Impl.sched_yield()

//! Sleep the given amount of milliseconds
fun sleep(durMs: UInt) {
var req = Impl.TimeSpec(durMs/1000, (durMs%1000)*1000000);
var rem = Impl.TimeSpec(0, 0);
Impl.nanosleep(req, rem);
}

package Impl {
using ThreadFun = FunctionPtr(@Byte rt, @Byte rt);

[initCtor] Closure(FType: Type) {
f: FType;

[static] fun threadFun(arg: @Byte): @Byte {
var self: @Closure = reinterpretCast(@Closure, arg);
self.f();
self delete;
return null;
}

[static] fun getFunPtr: ThreadFun = \threadFun;
}
fun startThread(f: AnyType): NativeThreadHandle {
using ClosureType = Closure(typeOf(f));
var closurePtr: @ClosureType = new(ClosureType, f);
var handle: NativeThreadHandle;
var arg: @Byte = reinterpretCast(@Byte, closurePtr);
var status = pthread_create(handle, null, ClosureType.getFunPtr, arg);
return handle;
}

[initCtor] datatype TimeSpec { var tv_sec, tv_nsec: Long; }

[native("pthread_create")] fun pthread_create(handle: @NativeThreadHandle, attr: @Byte, f: ThreadFun, arg: @Byte): Int;
[native("pthread_join")] fun pthread_join(handle: NativeThreadHandle, valPtr: @ @Byte): Int;
[native("pthread_detach")] fun pthread_detach(handle: NativeThreadHandle): Int;
[native("pthread_self")] fun pthread_self: NativeThreadHandle;

[native("pthread_threadid_np")] fun pthread_threadid_np(t: NativeThreadHandle, res: @ULong): Int;

[native("sched_yield")] fun sched_yield;
[native("nanosleep")] fun nanosleep(req, rem: @TimeSpec): Int;
}
fun sleep(durMs: UInt)
var req = _Impl.TimeSpec(durMs/1000, (durMs%1000)*1000000)
var rem = _Impl.TimeSpec(0, 0)
_Impl.nanosleep(req, rem)

package _Impl
using ThreadFun = FunctionPtr(@Byte rt, @Byte rt)

[initCtor]
datatype Closure(FType: Type)
f: FType

fun threadFun(closureType: Type, arg: @Byte): @Byte
var self: @closureType = reinterpretCast(@closureType, arg)
self.f()
self delete
return null

fun getFunPtr(closureType: Type): ThreadFun = \threadFun(closureType, null)

fun startThread(f: AnyType): NativeThreadHandle
using ClosureType = Closure(typeOf(f))
var closurePtr: @ClosureType = new(ClosureType, f)
var handle: NativeThreadHandle
var arg: @Byte = reinterpretCast(@Byte, closurePtr)
var status = pthread_create(handle, null, getFunPtr(ClosureType), arg)
return handle

[initCtor] datatype TimeSpec
tv_sec, tv_nsec: Long

[ct] if platformName == "Darwin"
[native("pthread_create")] fun pthread_create(handle: @NativeThreadHandle, attr: @Byte, f: ThreadFun, arg: @Byte): Int;
[native("pthread_join")] fun pthread_join(handle: NativeThreadHandle, valPtr: @ @Byte): Int;
[native("pthread_detach")] fun pthread_detach(handle: NativeThreadHandle): Int;
[native("pthread_self")] fun pthread_self: NativeThreadHandle;

[native("pthread_threadid_np")] fun pthread_threadid_np(t: NativeThreadHandle, res: @ULong): Int;

[native("sched_yield")] fun sched_yield;
[native("nanosleep")] fun nanosleep(req, rem: @TimeSpec): Int;
else
[native("pthread_create")] fun pthread_create(handle: @NativeThreadHandle, attr: @Byte, f: ThreadFun, arg: @Byte): Int;
[native("pthread_join")] fun pthread_join(handle: NativeThreadHandle, valPtr: @ @Byte): Int;
[native("pthread_detach")] fun pthread_detach(handle: NativeThreadHandle): Int;
[native("pthread_self")] fun pthread_self: NativeThreadHandle;

[native("sched_yield")] fun sched_yield;
[native("nanosleep")] fun nanosleep(req, rem: @TimeSpec): Int;
1 change: 1 addition & 0 deletions src/LLVMBackend/Generator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ void generateNativeObjGCC(
args.insert(args.end(), s.linkerArgs_.begin(), s.linkerArgs_.end());
args.insert(args.end(), {inputFilename, "-o", outputFilename});
args.emplace_back("-lm"); // link against the math library
args.emplace_back("-pthread");
runCmd(args);
}
} // namespace
Expand Down
Loading

0 comments on commit 718b61b

Please sign in to comment.