diff --git a/.travis/install.sh b/.travis/install.sh index 288a2026..2bb288b1 100755 --- a/.travis/install.sh +++ b/.travis/install.sh @@ -11,6 +11,6 @@ echo "LC_ALL=$LC_ALL" if [ $TRAVIS_OS_NAME == osx ]; then # brew update - brew upgrade boost - brew install llvm + # brew upgrade boost + brew install llvm@7 fi diff --git a/.travis/run.sh b/.travis/run.sh index 839c104b..a89b23d2 100755 --- a/.travis/run.sh +++ b/.travis/run.sh @@ -6,7 +6,7 @@ set -x CFG_PARAMS= if [ $TRAVIS_OS_NAME == osx ]; then - CFG_PARAMS="-DLLVM_DIR=/usr/local/opt/llvm/lib/cmake/llvm" + CFG_PARAMS="-DLLVM_DIR=/usr/local/opt/llvm@7/lib/cmake/llvm" fi if [ $TRAVIS_OS_NAME == linux ]; then diff --git a/SparrowImplicitLib/par/atomic.llvm b/SparrowImplicitLib/par/atomic.llvm index 1c3c9f7f..e185e93c 100644 --- a/SparrowImplicitLib/par/atomic.llvm +++ b/SparrowImplicitLib/par/atomic.llvm @@ -1,5 +1,5 @@ define i32 @_Atomic_load32(i32* %atomic) { - %1 = load atomic i32* %atomic seq_cst, align 4 + %1 = load atomic i32, i32* %atomic seq_cst, align 4 ret i32 %1 } @@ -24,7 +24,7 @@ define i32 @_Atomic_fetchAndAdd32(i32* %atomic, i32 %val) { define i64 @_Atomic_load64(i64* %atomic) { - %1 = load atomic i64* %atomic seq_cst, align 8 + %1 = load atomic i64, i64* %atomic seq_cst, align 8 ret i64 %1 } diff --git a/SparrowImplicitLib/par/atomic.spr b/SparrowImplicitLib/par/atomic.spr index 1692c671..709a98e7 100644 --- a/SparrowImplicitLib/par/atomic.spr +++ b/SparrowImplicitLib/par/atomic.spr @@ -1,143 +1,102 @@ -module par.atomic; +module par.atomic + +import config +import "atomic.llvm" + +using AtomicInt = Atomic(Int) +using AtomicLong = Atomic(Long) + +concept AtomicType(x) if x._IsAtomicType +concept AtomicInteger(x) if x._IsAtomicType && Integer(#$x.ValueType) +concept NonStdAtomicType(x) if x._IsAtomicType && x.ValueType != x._UnderlyingType + +datatype Atomic(T: Type) if sizeOf(T) <= 8 + using _IsAtomicType = true + using ValueType = T + using _UnderlyingType = _Impl.AtomicTypeTraits(T)._UnderlyingType + + _value: _UnderlyingType + +[protected] + fun ctor(this: @Atomic, v: this.ValueType) + _value ctor _toUnderlying(this, v) + + fun =(lhs: @AtomicType, rhs: typeOf(lhs)) + lhs store (rhs load) + //! Stores a value inside the given atomic, using the assignment operator + fun =(this: @AtomicType, val: this.ValueType) = this store val + + //! Loads the value from an atomic; returns a non-atomic value + [native("_Atomic_load32")] fun load(this: @AtomicInt): Int; + [native("_Atomic_load64")] fun load(this: @AtomicLong): Long; + fun load(this: @NonStdAtomicType): ValueType \ + = this._fromUnderlying(this._baseAtomic load) + + //! Stores a value inside the given atomic + [native("_Atomic_store32")] fun store(this: @AtomicInt, newVal: Int); + [native("_Atomic_store64")] fun store(this: @AtomicLong, newVal: Long); + fun store(this: @NonStdAtomicType, newVal: AnyType) + this._baseAtomic store this._toUnderlying(newVal) + + //! Fetches the current value of an atomic, and stores a new value in the atomic + [native("_Atomic_fetchAndStore32")] fun fetchAndStore(x: @AtomicInt, newVal: Int): Int; + [native("_Atomic_fetchAndStore64")] fun fetchAndStore(x: @AtomicLong, newVal: Long): Long; + fun fetchAndStore(x: @NonStdAtomicType, newVal: AnyType): x.ValueType \ + = x._fromUnderlying(x._baseAtomic fetchAndStore x._toUnderlying(newVal)) + + //! Fetch the value of the atomic, and then add the given value to it + [native("_Atomic_fetchAndAdd32")] fun fetchAndAdd(x: @AtomicInt, val: Int): Int; + [native("_Atomic_fetchAndAdd64")] fun fetchAndAdd(x: @AtomicLong, val: Long): Long; + fun fetchAndAdd(x: @NonStdAtomicType, val: AnyType): x.ValueType \ + = x._fromUnderlying(x._baseAtomic fetchAndAdd x._toUnderlying(val)) \ + if Integer(#$x.ValueType) + + //! Fetch the value of the atomic and the increment it + fun fetchAndIncrement(x: @AtomicInteger) = x fetchAndAdd 1 + //! Fetch the value of the atomic and the decrement it + fun fetchAndDecrement(x: @AtomicInteger) = x fetchAndAdd -1 + + //! Adds the given value to the atomic + fun += (x: @AtomicInteger, val: AnyType) { x fetchAndAdd val; } + //! Subtracts the given value to the atomic + fun -= (x: @AtomicInteger, val: AnyType) { x fetchAndAdd (-val); } + + //! Increment the atomic value; returns the new value + fun pre_++(x: @AtomicInteger): x.ValueType = (x fetchAndIncrement)+1 + //! Increment the atomic value; returns the old value + fun post_++(x: @AtomicInteger): x.ValueType = (x fetchAndIncrement) + + //! Decrement the atomic value; returns the new value + fun pre_--(x: @AtomicInteger): x.ValueType = (x fetchAndDecrement)-1 + //! Decrement the atomic value; returns the old value + fun post_--(x: @AtomicInteger): x.ValueType = (x fetchAndDecrement) + + fun _toUnderlying(this: @Atomic, val: this.ValueType): this._UnderlyingType + var res: _UnderlyingType + reinterpretCast(@ValueType, res) = val + return res + fun _fromUnderlying(this: @Atomic, val: this._UnderlyingType): this.ValueType + var res: ValueType + res = reinterpretCast(@ValueType, val) + return res + fun _baseAtomic(this: @Atomic): @Atomic(_UnderlyingType) = reinterpretCast(@Atomic(_UnderlyingType), this) -import config; -import "atomic.llvm"; -[initCtor] datatype Atomic(T: Type) if T == Int { - using _IsAtomicType = true; - using ValueType = Int; - - //! The value for this atomic - _value: ValueType; -} - -[initCtor] datatype Atomic(T: Type) if T == Long { - using _IsAtomicType = true; - using ValueType = Long; - - //! The value for this atomic - _value: ValueType; -} - -datatype Atomic(T: Type) if T != Int && T != Long && sizeOf(T) <= 8 { - using _IsAtomicType = true; - - using ValueType = T; - - using _UnderlyingType = Impl.AtomicTypeTraits(T)._UnderlyingType; - using _BaseAtomicType = Impl.AtomicTypeTraits(T)._BaseAtomicType; - - //! The value for this atomic - _value: _UnderlyingType; - -} - -fun ctor(this: @Atomic, v: this.ValueType) - _value construct _toUnderlying(v); - -fun _toUnderlying(this: @Atomic, val: this.ValueType): this._UnderlyingType { - var res: _UnderlyingType; - reinterpretCast(@T, res) = val; - return res; -} -fun _fromUnderlying(this: @Atomic, val: this._UnderlyingType): T { - var res: ValueType; - res = reinterpretCast(@ValueType, val); - return res; -} -fun _baseAtomic(this: @Atomic): @_BaseAtomicType = reinterpretCast(@_BaseAtomicType, this); - -using AtomicInt = Atomic(Int); -using AtomicLong = Atomic(Long); - -concept AtomicType(x) if x._IsAtomicType; -concept AtomicInteger(x) if x._IsAtomicType && Integer(#$x.ValueType); -concept NonStdAtomicType(x) if x._IsAtomicType && x.ValueType != x._UnderlyingType; - -fun =(lhs: @AtomicType, rhs: AtomicType) { lhs store (rhs load); } +//! Compare the atomic value with the given comparand; if equal store 'newVal' and return true; if not equal returns false +[native("_Atomic_compareAndSwap32")] fun compareAndSwap(x: @AtomicInt, newVal, comparand: Int): Bool; +[native("_Atomic_compareAndSwap64")] fun compareAndSwap(x: @AtomicLong, newVal, comparand: Long): Bool; +fun compareAndSwap(x: @NonStdAtomicType, newVal, comparand: AnyType): Bool \ + = compareAndSwap(x._baseAtomic, x._toUnderlying(newVal), x._toUnderlying(comparand)) //! Converts a regular value to an atomic value //! Returns the reference to the same memory address //! Used to make it possible to use atomic operations -fun asAtomic(val: @AnyType): @Atomic(typeOf(val)) \ - = reinterpretCast(@Atomic(typeOf(val)), val) \ - if sizeOf(val) == sizeOf(Atomic(typeOf(val))); - -//! Loads the value from an atomic; returns a non-atomic value -[native("_Atomic_load32")] fun load(x: @AtomicInt): Int; -[native("_Atomic_load64")] fun load(x: @AtomicLong): Long; -fun load(x: @NonStdAtomicType): x.ValueType \ - = x._fromUnderlying(x._baseAtomic load); - -//! Stores a value inside the given atomic -[native("_Atomic_store32")] fun store(x: @AtomicInt, newVal: Int); -[native("_Atomic_store64")] fun store(x: @AtomicLong, newVal: Long); -fun store(x: @NonStdAtomicType, newVal: AnyType) - { x._baseAtomic store x._toUnderlying(newVal); } - -//! Stores a value inside the given atomic, using the assignment operator -fun =(x: @AtomicType, val: AnyType) = store(x, val); - -//! Fetches the current value of an atomic, and stores a new value in the atomic -[native("_Atomic_fetchAndStore32")] fun fetchAndStore(x: @AtomicInt, newVal: Int): Int; -[native("_Atomic_fetchAndStore64")] fun fetchAndStore(x: @AtomicLong, newVal: Long): Long; -fun fetchAndStore(x: @NonStdAtomicType, newVal: AnyType): x.ValueType \ - = x._fromUnderlying(x._baseAtomic fetchAndStore x._toUnderlying(newVal)); +fun asAtomic(val: @AnyType): @Atomic(-@typeOf(val)) if sizeOf(val) == sizeOf(Atomic(-@typeOf(val))) + return reinterpretCast(@Atomic(-@typeOf(val)), val) -//! Compare the atomic value with the given comparand; if equal store 'newVal' and return true; if not equal returns false -[native("_Atomic_compareAndSwap32")] fun compareAndSwap(x: @AtomicInt, newVal, comparand: Int): Bool; -[native("_Atomic_compareAndSwap64")] fun compareAndSwap(x: @AtomicLong, newVal, comparand: Long): Bool; -fun compareAndSwap(x: @NonStdAtomicType, newVal, comparand: AnyType): Bool \ - = compareAndSwap(x._baseAtomic, x._toUnderlying(newVal), x._toUnderlying(comparand)); - -//! Fetch the value of the atomic, and then add the given value to it -[native("_Atomic_fetchAndAdd32")] fun fetchAndAdd(x: @AtomicInt, val: Int): Int; -[native("_Atomic_fetchAndAdd64")] fun fetchAndAdd(x: @AtomicLong, val: Long): Long; -fun fetchAndAdd(x: @NonStdAtomicType, val: AnyType): x.ValueType \ - = x._fromUnderlying(x._baseAtomic fetchAndAdd x._toUnderlying(val)) \ - if Integer(#$x.ValueType); - -//! Fetch the value of the atomic and the increment it -fun fetchAndIncrement(x: @AtomicInteger) = x fetchAndAdd 1; -//! Fetch the value of the atomic and the decrement it -fun fetchAndDecrement(x: @AtomicInteger) = x fetchAndAdd -1; - -//! Adds the given value to the atomic -fun += (x: @AtomicInteger, val: AnyType) { x fetchAndAdd val; } -//! Subtracts the given value to the atomic -fun -= (x: @AtomicInteger, val: AnyType) { x fetchAndAdd (-val); } - -//! Increment the atomic value; returns the new value -fun pre_++(x: @AtomicInteger): x.ValueType = (x fetchAndIncrement)+1; -//! Increment the atomic value; returns the old value -fun post_++(x: @AtomicInteger): x.ValueType = (x fetchAndIncrement); - -//! Decrement the atomic value; returns the new value -fun pre_--(x: @AtomicInteger): x.ValueType = (x fetchAndDecrement)-1; -//! Decrement the atomic value; returns the old value -fun post_--(x: @AtomicInteger): x.ValueType = (x fetchAndDecrement); - -package Impl { - datatype AtomicTypeTraits(T: Type) if sizeOf(T) <= 4 { - using _UnderlyingType = Int; - using _BaseAtomicType = Atomic(Int); - } - datatype AtomicTypeTraits(T: Type) if sizeOf(T) == 8 { - using _UnderlyingType = Long; - using _BaseAtomicType = Atomic(Long); - } - - fun packAtomic(x: @NonStdAtomicType): @AtomicTypeTraits(x.ValueType) \ - = reinterpretCast(@AtomicTypeTraits(x.ValueType), x); - - fun pack(x: @NonStdAtomicType, val: AnyType): AtomicTypeTraits(x.ValueType)._UnderlyingType { - var res: AtomicTypeTraits(x.ValueType)._UnderlyingType; - reinterpretCast(@AtomicTypeTraits(x.ValueType)._UnderlyingType, res) = val; - return res; - } - fun unpack(x: @NonStdAtomicType, val: AnyType): x.ValueType { - var res: x.ValueType; - res = reinterpretCast(@AtomicTypeTraits(x.ValueType)._UnderlyingType, val); - return res; - } -} \ No newline at end of file + +package _Impl + datatype AtomicTypeTraits(T: Type) if sizeOf(T) <= 4 + using _UnderlyingType = Int + datatype AtomicTypeTraits(T: Type) if sizeOf(T) == 8 + using _UnderlyingType = Long diff --git a/SparrowImplicitLib/par/config.spr b/SparrowImplicitLib/par/config.spr index addf9d57..5fe4e528 100644 --- a/SparrowImplicitLib/par/config.spr +++ b/SparrowImplicitLib/par/config.spr @@ -1,32 +1,27 @@ -module par.config; +module par.config -import std.ptr; - -package Impl -{ - using _SC_NPROCESSORS_ONLN = 58; - - [native("sysconf")] fun sysconf(name: Int): Long; -} - -datatype NativeThreadHandle = Byte Ptr; // Opaque type +import std.ptr +datatype NativeThreadHandle = Byte Ptr // Opaque type fun >>(h: NativeThreadHandle, os: @OutStream) -{ if ( h.impl.isSet ) - os << mkStreamRefWrapper(h.impl.get); + os << mkStreamRefWrapper(h.impl.get) else - os << "null"; -} + os << "null" -using InvalidThreadHandle = NativeThreadHandle(); +using InvalidThreadHandle = NativeThreadHandle() //! Get the number of available logical CPU cores for our process //! This dictates how much parallelism we have to be exploit -fun getAvailableCoresNum(): UInt { - var maxProcs: Long = Impl.sysconf(Impl._SC_NPROCESSORS_ONLN); - return ife(maxProcs<1, UInt(1), UInt(maxProcs)); +fun getAvailableCoresNum(): UInt + var maxProcs: Long = _Impl.sysconf(_Impl._SC_NPROCESSORS_ONLN) + return ife(maxProcs<1, UInt(1), UInt(maxProcs)) // TODO: also consider process affinity -} \ No newline at end of file + +package _Impl + using _SC_NPROCESSORS_ONLN = 58 + + [native("sysconf")] fun sysconf(name: Int): Long + diff --git a/SparrowImplicitLib/par/locks.spr b/SparrowImplicitLib/par/locks.spr index 8042580d..0f6d5448 100644 --- a/SparrowImplicitLib/par/locks.spr +++ b/SparrowImplicitLib/par/locks.spr @@ -1,75 +1,69 @@ -module par.locks; +module par.locks -import config; -import std.staticArray; +import config +import std.staticArray //! Concept the is satisfied for the "mutexes" that can be locked and unlocked -concept Lockable(x) - if isValid(x lock) - && isValid(x unlock) - ; +concept Lockable(x) if isValid(x lock) && isValid(x unlock) //! Special Lockable, that also supports the "trylock" operation -concept TryLockable(x) - if Lockable(x) - && isValid(x tryLock) - ; +concept TryLockable(x) if Lockable(x) && isValid(x tryLock) //! Class used to take the lock for the duration of a scope //! Can be used with any lockable object datatype ScopedLock(T: Type) if Lockable(#$T) - using LockType = T; + using LockType = T - _theLock: @LockType; + _theLock: @LockType fun ctor(this: @ScopedLock, theLock: @this.LockType) - this._theLock := theLock; - _theLock lock; + this._theLock := theLock + _theLock lock fun dtor(this: @ScopedLock) - _theLock unlock; + _theLock unlock //! Class used to try to take the lock for the duration of a scope //! To test if the lock was taken, use the 'isLocked' function datatype ScopedTryLock(T: Type) if TryLockable(#$T) - using LockType = T; + using LockType = T - _theLock: @LockType; - _lockSucceeded: Bool; + _theLock: @LockType + _lockSucceeded: Bool - fun isLocked = _lockSucceeded; fun ctor(this: @ScopedTryLock, theLock: @this.LockType) - this._theLock := theLock; - this._lockSucceeded = (_theLock tryLock); + this._theLock := theLock + this._lockSucceeded = (_theLock tryLock) fun dtor(this: @ScopedTryLock) - if ( _lockSucceeded ) - _theLock unlock; + if _lockSucceeded + _theLock unlock + +fun isLocked(this: @ScopedTryLock) = _lockSucceeded //! A simple and fast mutual-exclusion lockable datatype Mutex - handle: Impl.PThreadMutex; + handle: Impl.PThreadMutex fun ctor(this: @Mutex) - handle.sig = Impl.PTHREAD_MUTEX_SIG_INIT; + handle.sig = Impl.PTHREAD_MUTEX_SIG_INIT fun dtor(this: @Mutex) - Impl.pthread_mutex_destroy(handle); + Impl.pthread_mutex_destroy(handle) -fun >> (m: @Mutex, os: @OutStream) { - cout << "Mutex(" << mkStreamRefWrapper(m) << ", " << m.handle.sig << ")"; -} +fun >> (m: @Mutex, os: @OutStream) + cout << "Mutex(" << mkStreamRefWrapper(m) << ", " << m.handle.sig << ")" //! A mutual-exclusion lockable that is able to support recursive locking datatype RecMutex - handle: Impl.PThreadMutex; + handle: Impl.PThreadMutex fun ctor(this: @RecMutex) - handle.sig = Impl.PTHREAD_RECURSIVE_MUTEX_SIG_INIT; + handle.sig = Impl.PTHREAD_RECURSIVE_MUTEX_SIG_INIT fun dtor(this: @RecMutex) - Impl.pthread_mutex_destroy(handle); + Impl.pthread_mutex_destroy(handle) //! Locks a lockable; if the lockable is already locked this call blocks until it's unblocked by the other party fun lock(mutex: @Mutex) { var res = Impl.pthread_mutex_lock(mutex.handle); /*assert(res == 0);*/ } @@ -83,19 +77,17 @@ fun tryLock(mutex: @RecMutex): Bool { var res = Impl.pthread_mutex_trylock(mutex fun unlock(mutex: @Mutex) { var res = Impl.pthread_mutex_unlock(mutex.handle); /*assert(res == 0);*/ } fun unlock(mutex: @RecMutex) { var res = Impl.pthread_mutex_unlock(mutex.handle); /*assert(res == 0);*/ } -package Impl { - using PTHREAD_MUTEX_SIZE = 56; - using PTHREAD_MUTEX_SIG_INIT = 0x32AAABA7; - using PTHREAD_RECURSIVE_MUTEX_SIG_INIT = 0x32AAABA2; +package Impl + using PTHREAD_MUTEX_SIZE = 56 + using PTHREAD_MUTEX_SIG_INIT = 0x32AAABA7 + using PTHREAD_RECURSIVE_MUTEX_SIG_INIT = 0x32AAABA2 - datatype PThreadMutex { - sig: Long; - opaque: StaticArray(Byte, PTHREAD_MUTEX_SIZE); - } + datatype PThreadMutex + sig: Long + opaque: StaticArray(Byte, PTHREAD_MUTEX_SIZE) [native("pthread_mutex_init")] fun pthread_mutex_init(mutex: @PThreadMutex, attr: @Byte): Int; [native("pthread_mutex_lock")] fun pthread_mutex_lock(mutex: @PThreadMutex): Int; [native("pthread_mutex_trylock")] fun pthread_mutex_trylock(mutex: @PThreadMutex): Int; [native("pthread_mutex_unlock")] fun pthread_mutex_unlock(mutex: @PThreadMutex): Int; [native("pthread_mutex_destroy")] fun pthread_mutex_destroy(mutex: @PThreadMutex): Int; -} diff --git a/SparrowImplicitLib/par/parFor.spr b/SparrowImplicitLib/par/parFor.spr index 48cd893b..211ba844 100644 --- a/SparrowImplicitLib/par/parFor.spr +++ b/SparrowImplicitLib/par/parFor.spr @@ -1,118 +1,112 @@ -module par.parFor; +module par.parFor -import config; -import tasks = par.tasks; -import std.ranges, std.tuple, std.newDelete; -import bitOper; +import config +import tasks = par.tasks +import std.ranges, std.tuple, std.newDelete +import bitOper -concept Partitioner(x) - if isValid(x.RangeType) - && isValid(x.partStart()) +concept Partitioner(x) \ + if isValid(x.RangeType) \ + && isValid(x.partStart()) \ && isValid(x.partition(valueOfType(x.RangeType), 0)) - ; + fun parFor(r: Range, f: AnyType) if isValid(f(r.front())) - { parFor(r, f, DefaultPartitioner(typeOf(r))()); } - -fun parFor(r: Range, f: AnyType, part: @Partitioner) - if isValid(f(r.front())) -{ - // Tell the partitioner that we are starting work - part partStart; - - // Create a root task and wait until it's executed - var range: part.RangeType = r; - tasks.spawnRootAndWait(Impl.ParForTask(range, f, part, 0)); -} - -datatype DefaultPartitioner(rangeType: Type) { - using RangeType = BoundedRange(rangeType); - - _maxDepth: Int; - - fun partStart { - var p: Int = getAvailableCoresNum; - var depth = 1; - while 2 !<>, description); -import Impl = tasksImpl.tasksMain(initTaskBasic, initAsRoot, initAsChildOf, doSpawn, doSpawnAndWait); -import tp = tasksImpl.taskPrefix; -import tasksImpl.debug; -import schedulerIf = tasksImpl.schedulerIf; +[public] import tasksImpl.taskType(TaskType, TaskPrefix, getPrefix) +[public] import tasksImpl.emptyTask(EmptyTask, execute, >>, description) +import Impl = tasksImpl.tasksMain(initTaskBasic, initAsRoot, initAsChildOf, doSpawn, doSpawnAndWait) +import tp = tasksImpl.taskPrefix +//import tasksImpl.debug +import tasksImpl.workerImpl +import schedulerIf = tasksImpl.schedulerIf -fun spawnRootAndWait(task: @TaskType) { - [ct] if ( isValidAndTrue(tasksImpl.traceCalls) && isValid(cout << task) ) - cout << "spawnRootAndWait(" << task << ")" << endl; +fun spawnRootAndWait(task: @TaskType) + [ct] if isValidAndTrue(tasksImpl.traceCalls) && isValid(cout << task) + cout << "spawnRootAndWait(" << task << ")" << endl - Impl.initAsRoot(task); + Impl.initAsRoot(task) - schedulerIf.spawnRootAndWait(getPrefix(task)); -} + schedulerIf.spawnRootAndWait(getPrefix(task)) -fun spawn(curTask, childTask: @TaskType) { - getPrefix(curTask) addRefCount 1; - Impl.doSpawn(curTask, childTask); -} -fun spawn(curTask, t1, t2: @TaskType) { - getPrefix(curTask) addRefCount 2; - Impl.doSpawn(curTask, t2); - Impl.doSpawn(curTask, t1); -} -fun spawn(curTask, t1, t2, t3: @TaskType) { - getPrefix(curTask) addRefCount 3; - Impl.doSpawn(curTask, t3); - Impl.doSpawn(curTask, t2); - Impl.doSpawn(curTask, t1); -} -fun spawn(curTask, t1, t2, t3, t4: @TaskType) { - getPrefix(curTask) addRefCount 4; - Impl.doSpawn(curTask, t4); - Impl.doSpawn(curTask, t3); - Impl.doSpawn(curTask, t2); - Impl.doSpawn(curTask, t1); -} -fun spawn(curTask, t1, t2, t3, t4, t5: @TaskType) { - getPrefix(curTask) addRefCount 5; - Impl.doSpawn(curTask, t5); - Impl.doSpawn(curTask, t4); - Impl.doSpawn(curTask, t3); - Impl.doSpawn(curTask, t2); - Impl.doSpawn(curTask, t1); -} -fun spawn(curTask, t1, t2, t3, t4, t5, t6: @TaskType) { - getPrefix(curTask) addRefCount 6; - Impl.doSpawn(curTask, t6); - Impl.doSpawn(curTask, t5); - Impl.doSpawn(curTask, t4); - Impl.doSpawn(curTask, t3); - Impl.doSpawn(curTask, t2); - Impl.doSpawn(curTask, t1); -} -fun spawn(curTask, t1, t2, t3, t4, t5, t6, t7: @TaskType) { - getPrefix(curTask) addRefCount 7; - Impl.doSpawn(curTask, t7); - Impl.doSpawn(curTask, t6); - Impl.doSpawn(curTask, t5); - Impl.doSpawn(curTask, t4); - Impl.doSpawn(curTask, t3); - Impl.doSpawn(curTask, t2); - Impl.doSpawn(curTask, t1); -} +fun spawn(curTask, childTask: @TaskType) + getPrefix(curTask) addRefCount 1 + Impl.doSpawn(curTask, childTask) +fun spawn(curTask, t1, t2: @TaskType) + getPrefix(curTask) addRefCount 2 + Impl.doSpawn(curTask, t2) + Impl.doSpawn(curTask, t1) +fun spawn(curTask, t1, t2, t3: @TaskType) + getPrefix(curTask) addRefCount 3 + Impl.doSpawn(curTask, t3) + Impl.doSpawn(curTask, t2) + Impl.doSpawn(curTask, t1) +fun spawn(curTask, t1, t2, t3, t4: @TaskType) + getPrefix(curTask) addRefCount 4 + Impl.doSpawn(curTask, t4) + Impl.doSpawn(curTask, t3) + Impl.doSpawn(curTask, t2) + Impl.doSpawn(curTask, t1) +fun spawn(curTask, t1, t2, t3, t4, t5: @TaskType) + getPrefix(curTask) addRefCount 5 + Impl.doSpawn(curTask, t5) + Impl.doSpawn(curTask, t4) + Impl.doSpawn(curTask, t3) + Impl.doSpawn(curTask, t2) + Impl.doSpawn(curTask, t1) +fun spawn(curTask, t1, t2, t3, t4, t5, t6: @TaskType) + getPrefix(curTask) addRefCount 6 + Impl.doSpawn(curTask, t6) + Impl.doSpawn(curTask, t5) + Impl.doSpawn(curTask, t4) + Impl.doSpawn(curTask, t3) + Impl.doSpawn(curTask, t2) + Impl.doSpawn(curTask, t1) +fun spawn(curTask, t1, t2, t3, t4, t5, t6, t7: @TaskType) + getPrefix(curTask) addRefCount 7 + Impl.doSpawn(curTask, t7) + Impl.doSpawn(curTask, t6) + Impl.doSpawn(curTask, t5) + Impl.doSpawn(curTask, t4) + Impl.doSpawn(curTask, t3) + Impl.doSpawn(curTask, t2) + Impl.doSpawn(curTask, t1) -fun wait(curTask: @TaskType) { - getPrefix(curTask) addRefCount 1; - schedulerIf.waitForAll(getPrefix(curTask)); -} +fun wait(curTask: @TaskType) + getPrefix(curTask) addRefCount 1 + schedulerIf.waitForAll(getPrefix(curTask)) -fun spawnAndWait(curTask, t1: @TaskType) { - getPrefix(curTask) addRefCount 2; - Impl.doSpawnAndWait(curTask, t1); -} -fun spawnAndWait(curTask, t1, t2: @TaskType) { - getPrefix(curTask) addRefCount 3; - Impl.doSpawn(curTask, t2); - Impl.doSpawnAndWait(curTask, t1); -} -fun spawnAndWait(curTask, t1, t2, t3: @TaskType) { - getPrefix(curTask) addRefCount 4; - Impl.doSpawn(curTask, t3); - Impl.doSpawn(curTask, t2); - Impl.doSpawnAndWait(curTask, t1); -} -fun spawnAndWait(curTask, t1, t2, t3, t4: @TaskType) { - getPrefix(curTask) addRefCount 5; - Impl.doSpawn(curTask, t4); - Impl.doSpawn(curTask, t3); - Impl.doSpawn(curTask, t2); - Impl.doSpawnAndWait(curTask, t1); -} -fun spawnAndWait(curTask, t1, t2, t3, t4, t5: @TaskType) { - getPrefix(curTask) addRefCount 6; - Impl.doSpawn(curTask, t5); - Impl.doSpawn(curTask, t4); - Impl.doSpawn(curTask, t3); - Impl.doSpawn(curTask, t2); - Impl.doSpawnAndWait(curTask, t1); -} -fun spawnAndWait(curTask, t1, t2, t3, t4, t5, t6: @TaskType) { - getPrefix(curTask) addRefCount 7; - Impl.doSpawn(curTask, t6); - Impl.doSpawn(curTask, t5); - Impl.doSpawn(curTask, t4); - Impl.doSpawn(curTask, t3); - Impl.doSpawn(curTask, t2); - Impl.doSpawnAndWait(curTask, t1); -} -fun spawnAndWait(curTask, t1, t2, t3, t4, t5, t6, t7: @TaskType) { - getPrefix(curTask) addRefCount 8; - Impl.doSpawn(curTask, t7); - Impl.doSpawn(curTask, t6); - Impl.doSpawn(curTask, t5); - Impl.doSpawn(curTask, t4); - Impl.doSpawn(curTask, t3); - Impl.doSpawn(curTask, t2); - Impl.doSpawnAndWait(curTask, t1); -} +fun spawnAndWait(curTask, t1: @TaskType) + getPrefix(curTask) addRefCount 2 + Impl.doSpawnAndWait(curTask, t1) +fun spawnAndWait(curTask, t1, t2: @TaskType) + getPrefix(curTask) addRefCount 3 + Impl.doSpawn(curTask, t2) + Impl.doSpawnAndWait(curTask, t1) +fun spawnAndWait(curTask, t1, t2, t3: @TaskType) + getPrefix(curTask) addRefCount 4 + Impl.doSpawn(curTask, t3) + Impl.doSpawn(curTask, t2) + Impl.doSpawnAndWait(curTask, t1) +fun spawnAndWait(curTask, t1, t2, t3, t4: @TaskType) + getPrefix(curTask) addRefCount 5 + Impl.doSpawn(curTask, t4) + Impl.doSpawn(curTask, t3) + Impl.doSpawn(curTask, t2) + Impl.doSpawnAndWait(curTask, t1) +fun spawnAndWait(curTask, t1, t2, t3, t4, t5: @TaskType) + getPrefix(curTask) addRefCount 6 + Impl.doSpawn(curTask, t5) + Impl.doSpawn(curTask, t4) + Impl.doSpawn(curTask, t3) + Impl.doSpawn(curTask, t2) + Impl.doSpawnAndWait(curTask, t1) +fun spawnAndWait(curTask, t1, t2, t3, t4, t5, t6: @TaskType) + getPrefix(curTask) addRefCount 7 + Impl.doSpawn(curTask, t6) + Impl.doSpawn(curTask, t5) + Impl.doSpawn(curTask, t4) + Impl.doSpawn(curTask, t3) + Impl.doSpawn(curTask, t2) + Impl.doSpawnAndWait(curTask, t1) +fun spawnAndWait(curTask, t1, t2, t3, t4, t5, t6, t7: @TaskType) + getPrefix(curTask) addRefCount 8 + Impl.doSpawn(curTask, t7) + Impl.doSpawn(curTask, t6) + Impl.doSpawn(curTask, t5) + Impl.doSpawn(curTask, t4) + Impl.doSpawn(curTask, t3) + Impl.doSpawn(curTask, t2) + Impl.doSpawnAndWait(curTask, t1) -fun setContinuation(curTask, cont: @TaskType) { - Impl.initTaskBasic(cont); +fun setContinuation(curTask, cont: @TaskType) + Impl.initTaskBasic(cont) // The continuation has the same parent, depth & worker as the current task - getPrefix(cont).parent = getPrefix(curTask).parent; - getPrefix(cont).depth = getPrefix(curTask).depth; - getPrefix(cont).worker = getPrefix(curTask).worker; + getPrefix(cont).parent = getPrefix(curTask).parent + getPrefix(cont).depth = getPrefix(curTask).depth + getPrefix(cont).worker = getPrefix(curTask).worker // The current task doesn't have a valid parent anymore - getPrefix(curTask).parent reset; + getPrefix(curTask).parent reset // Chain the continuation to the current task - //assert(getPrefix(curTask).cont isNull); - getPrefix(curTask).cont = getPrefix(cont); -} + //assert(getPrefix(curTask).cont isNull) + getPrefix(curTask).cont = getPrefix(cont) -fun enqueue(task: @TaskType) { - Impl.initAsRoot(task); - schedulerIf.enqueue(getPrefix(task)); -} +fun enqueue(task: @TaskType) + Impl.initAsRoot(task) + schedulerIf.enqueue(getPrefix(task)) diff --git a/SparrowImplicitLib/par/tasksImpl/debug.spr b/SparrowImplicitLib/par/tasksImpl/debug.spr index 73ce5dbb..8c6ad88a 100644 --- a/SparrowImplicitLib/par/tasksImpl/debug.spr +++ b/SparrowImplicitLib/par/tasksImpl/debug.spr @@ -1,38 +1,41 @@ -module par.tasksImpl.debug; +module par.tasksImpl.debug -import worker(Worker); -import par.locks; -import std.string; +import par.locks +import std.string -//using traceCalls = true; +using traceCalls = false -using TraceLockType = Mutex; +using TraceLockType = Mutex -var traceMutex: TraceLockType; +var traceMutex: TraceLockType //! Helper class that does the trace while holding a mutex //! Prevents the output to be messed up datatype Tracer - ; [protected] fun ctor(this: @Tracer) - traceMutex lock; + [ct] if traceCalls + traceMutex lock - fun ctor(this: @Tracer, worker: @Worker) - traceMutex lock; - cout << worker.workerId << ": " << Depth(worker.curDepth); + fun ctor(this: @Tracer, workerId: Int, depth: Int) + [ct] if traceCalls + traceMutex lock + cout << workerId << ": " << Depth(depth) fun dtor(this: @Tracer) - traceMutex unlock; + [ct] if traceCalls + traceMutex unlock - fun <<<(this: @Tracer, x: @AnyType) { cout << x; } + fun <<<(this: @Tracer, x: @AnyType) + [ct] if traceCalls + cout << x [initCtor] datatype Depth - depth: Int; + depth: Int + +fun >> (d: Depth, os: @OutStream) + if d.depth > 0 + os << String(d.depth*2, ' '.char) -fun >> (d: Depth, os: @OutStream) { - if ( d.depth > 0 ) - os << String(d.depth*2, ' '.char); -} diff --git a/SparrowImplicitLib/par/tasksImpl/emptyTask.spr b/SparrowImplicitLib/par/tasksImpl/emptyTask.spr index 416400c8..2dde769d 100644 --- a/SparrowImplicitLib/par/tasksImpl/emptyTask.spr +++ b/SparrowImplicitLib/par/tasksImpl/emptyTask.spr @@ -1,7 +1,7 @@ -module par.tasksImpl.emptyTask; +module par.tasksImpl.emptyTask -import taskPrefix(TaskPrefix); -import std.string; +import taskPrefix(TaskPrefix) +import std.string //! An empty task; can be used for synchronization datatype EmptyTask @@ -9,8 +9,7 @@ datatype EmptyTask fun execute(task: @EmptyTask) {} -fun >>(t: EmptyTask, os: @OutStream) { - os << "EmptyTask"; -} +fun >>(t: EmptyTask, os: @OutStream) + os << "EmptyTask" -fun description(task: @EmptyTask): String = "EmptyTask"; +fun description(task: @EmptyTask): String = "EmptyTask" diff --git a/SparrowImplicitLib/par/tasksImpl/idleCounter.spr b/SparrowImplicitLib/par/tasksImpl/idleCounter.spr new file mode 100644 index 00000000..500eb3c2 --- /dev/null +++ b/SparrowImplicitLib/par/tasksImpl/idleCounter.spr @@ -0,0 +1,19 @@ +module par.tasksImpl.idleCounter + +import par.atomic + +//! Class used to keep count of the number of threads that go idle +datatype IdleCounter + cnt: Int Atomic + +//! Called by a worker thread that just decided to go IDLE +fun goIdle(this: @IdleCounter) + this.cnt++ +//! Called by a worker thread who just got up from IDLE state +fun wakeUp(this: @IdleCounter) + this.cnt-- + +//! Check if we have some workers that are in the IDLE state +fun hasIdle(this: @IdleCounter) = (this.cnt load) > 0 +//! Returns the number of workers that are in the IDLE state +fun numIdleWorkers(this: @IdleCounter) = this.cnt load diff --git a/SparrowImplicitLib/par/tasksImpl/internalWorkerThread.spr b/SparrowImplicitLib/par/tasksImpl/internalWorkerThread.spr deleted file mode 100644 index bf3a18d5..00000000 --- a/SparrowImplicitLib/par/tasksImpl/internalWorkerThread.spr +++ /dev/null @@ -1,17 +0,0 @@ -module par.tasksImpl.internalWorkerThread; - -import worker(Worker, setLocalWorker); -import taskPrefix(TaskPtr); -import scheduler(schedDoWait); -import std.ptr; - -//! Class that represents an internal worker thread -//! This tries to execute as much work as possible -//! If no work is available, this goes to sleep -[initCtor] datatype InternalWorkerThread - //! The worker associated with this thread - _worker: Worker Ptr; - -fun ()(this: @InternalWorkerThread) - setLocalWorker(_worker); - schedDoWait(_worker.get(), TaskPtr(), TaskPtr()); diff --git a/SparrowImplicitLib/par/tasksImpl/scheduler.spr b/SparrowImplicitLib/par/tasksImpl/scheduler.spr index 314b4f95..8d5ff810 100644 --- a/SparrowImplicitLib/par/tasksImpl/scheduler.spr +++ b/SparrowImplicitLib/par/tasksImpl/scheduler.spr @@ -1,102 +1,99 @@ -module par.tasksImpl.scheduler; - -import tasksMain(initAsRoot); -import taskType(TaskType); -import taskPrefix(TaskPtr, TaskPrefix); -import emptyTask(EmptyTask, execute); -import worker(Worker); -import taskQueue(TaskQueue); -//import debug; -import par.tls; -import std.ranges(..); +module par.tasksImpl.scheduler + +import tasksMain(initAsRoot) +import taskType(TaskType) +import taskPrefix(TaskPtr, TaskPrefix) +import emptyTask(EmptyTask, execute) +import workerImpl(Worker, worker, setWaitingWorker, waitingWorker/*, tracer*/) +//import debug(Tracer) +import taskQueue(TaskQueue, TasksPrinter) +import par.tls +import std.ranges(..) /////////////////////////////////////////////////////////////////////////////// // Main scheduler interface -fun schedSpawn(task: TaskPtr) { - var worker: @Worker = task.get() worker; +fun schedSpawn(task: TaskPtr) + var taskWorker: @Worker = worker(task.get()) - [ct] if ( isValidAndTrue(traceCalls) ) - Tracer(worker) << "schedSpawn(" << task.get() << ")" << endl; - task.get() assertValid; + [ct] if isValidAndTrue(traceCalls) + tracer(taskWorker) << "schedSpawn(" << task.get() << ")" << endl + task.get() assertValid // Announce new work in the task system - worker.taskSystem onTaskAvailable; + taskWorker.taskSystem onTaskAvailable // Push the task at the front of the local work queue - worker.work pushFront task.get(); -} + taskWorker.work pushFront task -fun schedWaitForAll(parent: TaskPtr, child: TaskPtr) { - var worker: @Worker = parent.get() worker; +fun schedWaitForAll(parent: TaskPtr, child: TaskPtr) + var taskWroker: @Worker = parent.get() worker - [ct] if ( isValidAndTrue(traceCalls) ) - Tracer(worker) << "schedWaitForAll(" << parent.get() << ", " << child.get() << ")" << endl; - if ( child isSet ) - child.get() assertValid; - parent.get() assertValid; + [ct] if isValidAndTrue(traceCalls) + tracer(taskWroker) << "schedWaitForAll(" << parent.get() << ", " << child.get() << ")" << endl + if child isSet + child.get() assertValid + parent.get() assertValid // Ensure the right depth is set in the worker - var oldDepth = worker.curDepth; - worker.curDepth = parent.get().depth; + var oldDepth = taskWroker.curDepth + taskWroker.curDepth = parent.get().depth - parent.get() setWaitingWorker worker; + parent.get() setWaitingWorker taskWroker - schedDoWait(worker, child, parent); + schedDoWait(taskWroker, child, parent) // Restore the old depth - worker.curDepth = oldDepth; -} + taskWroker.curDepth = oldDepth -fun schedSpawnRootAndWait(task: TaskPtr) { - var worker: @Worker = task.get() worker; +fun schedSpawnRootAndWait(task: TaskPtr) + var taskWroker: @Worker = task.get() worker - [ct] if ( isValidAndTrue(traceCalls) ) - Tracer(worker) << "schedSpawnRootAndWait(" << task.get() << ")" << endl; - task.get() assertValid; + [ct] if isValidAndTrue(traceCalls) + tracer(taskWroker) << "schedSpawnRootAndWait(" << task.get() << ")" << endl + task.get() assertValid // Ensure the right depth is set in the worker - var oldDepth = worker.curDepth; - worker.curDepth = 0; + var oldDepth = taskWroker.curDepth + taskWroker.curDepth = 0 // Create a dummy task to wait for - var waitingTask: EmptyTask; - initAsRoot(waitingTask); - waitingTask.prefix setRefCount 2; - task.get().parent = TaskPtr(waitingTask.prefix); + var waitingTask: EmptyTask + initAsRoot(waitingTask) + waitingTask.prefix setRefCount 2 + task.get().parent = TaskPtr(waitingTask.prefix) // This is the task we are waiting for - waitingTask.prefix setWaitingWorker worker; + waitingTask.prefix setWaitingWorker taskWroker // Execute the root task and wait for it - schedDoWait(worker, task, TaskPtr(waitingTask.prefix)); + schedDoWait(taskWroker, task, TaskPtr(waitingTask.prefix)) // Restore the old depth - worker.curDepth = oldDepth; -} + taskWroker.curDepth = oldDepth -fun schedEnqueue(task: TaskPtr) { - var worker: @Worker = task.get() worker; +fun schedEnqueue(task: TaskPtr) + var taskWroker: @Worker = task.get() worker - [ct] if ( isValidAndTrue(traceCalls) ) - Tracer(worker) << "schedEnqueue(" << task.get() << ")" << endl; - task.get() assertValid; + [ct] if isValidAndTrue(traceCalls) + tracer(taskWroker) << "schedEnqueue(" << task.get() << ")" << endl + task.get() assertValid // Announce new work in the task system - worker.taskSystem onTaskAvailable; + taskWroker.taskSystem onTaskAvailable // Enqueue the task - worker.taskSystem.globalQueue pushFront task.get(); -} + taskWroker.taskSystem.globalQueue pushFront task /////////////////////////////////////////////////////////////////////////////// // Task execution logic -fun schedDoWait(worker: @Worker, toExecute, waitingTask: TaskPtr) { +[native('par.tasksImpl.scheduler.schedDoWait')] +fun schedDoWait(worker: @Worker, toExecute, waitingTask: TaskPtr) - using maxRetriesBeforeGoingIdle = 1; - var numRetries = maxRetriesBeforeGoingIdle; + using maxRetriesBeforeGoingIdle = 1 + var numRetries = maxRetriesBeforeGoingIdle /* execution policy: - if given a task, execute it @@ -108,136 +105,118 @@ fun schedDoWait(worker: @Worker, toExecute, waitingTask: TaskPtr) { - stop when our waitingTask refCount reaches 1. */ - var nextToExecute: TaskPtr = toExecute; - while true { - [ct] if ( isValidAndTrue(traceCalls) ) - Tracer(worker)<< "work: " << TasksPrinter(toExecute, worker.work) << endl; + var nextToExecute: TaskPtr = toExecute + while true + [ct] if isValidAndTrue(traceCalls) + tracer(worker)<< "work: " << TasksPrinter(toExecute, worker.work) << endl // If we have a continuation, execute it - toExecute = nextToExecute; - nextToExecute reset; + toExecute = nextToExecute + nextToExecute reset // If we don't have something to execute, // extract something from our local queue - if ( toExecute isNull ) { + if toExecute isNull // Get item from the front: most recent - toExecute = (worker.work popFront); - } + toExecute = (worker.work popFront) // Try stealing something from global queue - if ( toExecute isNull ) { + if toExecute isNull // Get item from the back: FIFO style - toExecute = (worker.taskSystem.globalQueue popBack); - [ct] if ( isValidAndTrue(traceCalls) ) - if ( toExecute isSet ) - Tracer(worker)<< "Found task in global queue\n"; - } + toExecute = (worker.taskSystem.globalQueue popBack) + [ct] if isValidAndTrue(traceCalls) + if toExecute isSet + tracer(worker)<< "Found task in global queue\n" // Steal work from other queues - if ( toExecute isNull ) { + if toExecute isNull // Try different queues until we find one we can steal from - var numStealAttempts = 2*(worker.taskSystem numAvailableWorkers); - for i = 1..numStealAttempts { + var numStealAttempts = 2*(worker.taskSystem numAvailableWorkers) + for i = 1..numStealAttempts // Get a queue from another worker; chose it randomly - var otherQueue: @TaskQueue = (worker.taskSystem getRandomTaskQueue); + var otherQueue: @TaskQueue = (worker.taskSystem getRandomTaskQueue) // Get item from the back: try to take as much work as possible - toExecute = (otherQueue popBack); - if ( toExecute isSet ) { + toExecute = (otherQueue popBack) + if toExecute isSet // Change the worker for the stolen task - toExecute.get().worker = reinterpretCast(@Byte, worker); + toExecute.get().worker = reinterpretCast(@Byte, worker) - [ct] if ( isValidAndTrue(traceCalls) ) - Tracer(worker)<< "************************* Task stolen: " << toExecute.get() << "\n\n\n"; + [ct] if isValidAndTrue(traceCalls) + tracer(worker)<< "************************* Task stolen: " << toExecute.get() << "\n\n\n" // Stop trying to steal - break; - } - } - } + break // Try to execute what we've selected - if ( toExecute isSet ) { - nextToExecute = (toExecute.get() executeTask); - numRetries = maxRetriesBeforeGoingIdle; - } + if toExecute isSet + nextToExecute = (toExecute.get() executeTask) + numRetries = maxRetriesBeforeGoingIdle // Check if we executed all the children for our waiting task // This is our terminating condition - if ( (waitingTask isSet) ) { + if waitingTask isSet // If the task has a continuation, make sure we wait on it - while waitingTask.get().cont isSet { - waitingTask = waitingTask.get().cont; + while waitingTask.get().cont isSet + waitingTask = waitingTask.get().cont // Account for the wait of the continuation - waitingTask.get() incrementRefCount; - } + waitingTask.get() incrementRefCount // Check if we executed all its children - if ( (waitingTask.get() refCount) == 1 ) { - waitingTask.get() setRefCount 0; - break; - } - } + if (waitingTask.get() refCount) == 1 + waitingTask.get() setRefCount 0 + break - if ( toExecute isNull ) { + if toExecute isNull // Nothing to execute - if ( numRetries == 0 ) { - worker goIdle; - numRetries = maxRetriesBeforeGoingIdle; - } - else { + if numRetries == 0 + worker goIdle + numRetries = maxRetriesBeforeGoingIdle + else // Try several times to steal tasks // Going to sleep is more expensive - sleep(0); - } - } else { - --numRetries; - } - } + sleep(0) + else + --numRetries // Trace remaining work - [ct] if ( isValidAndTrue(traceCalls) ) - Tracer(worker) << "task " << waitingTask.get() << " can continue; remaining work: " << TasksPrinter(TaskPtr(), worker.work) << endl; -} + [ct] if isValidAndTrue(traceCalls) + tracer(worker) << "task " << waitingTask.get() << " can continue; remaining work: " << TasksPrinter(TaskPtr(), worker.work) << endl -fun executeTask(task: @TaskPrefix): TaskPtr { - var worker: @Worker = task worker; +fun executeTask(task: @TaskPrefix): TaskPtr + var taskWroker: @Worker = task worker // TODO: check cancellation - if ( false ) { - return; - } + if false + return TaskPtr() // Execute the task - [ct] if ( isValidAndTrue(traceCalls) ) - Tracer(worker) << "executing task " << task << "; parent=" << task.parent.get() << endl; - task.executeFn(task); + [ct] if isValidAndTrue(traceCalls) + tracer(taskWroker) << "executing task " << task << "; parent=" << task.parent.get() << endl + task.executeFn(task) // Now check if we can execute the parent // If the continuation is set, we shouldn't have a parent - if ( task.cont isSet ) - //assert(task.parent isNull); + //if task.cont isSet + //assert(task.parent isNull) // If no parent set, just return - if ( task.parent isNull ) - return TaskPtr(); + if task.parent isNull + return TaskPtr() // Decrement the ref count in the parent, as a result of the cur execution - var cnt = task.parent.get() decrementRefCount; + var cnt = task.parent.get() decrementRefCount // Did we finished executing every child of the parent? - if ( cnt == 0 ) { + if cnt == 0 // If we are waiting on the parent task, make sure it's waiting worker is awake - var ww: @Worker = task.parent.get() waitingWorker; - if ( ww !== worker && ww !== null ) { - ww tryWakeUp; - } + var ww: @Worker = task.parent.get() waitingWorker + if ww !== taskWroker && ww !== null + ww tryWakeUp // Enqueue the parent task for execution - [ct] if ( isValidAndTrue(traceCalls) ) - Tracer(worker) << "Can continue with the parent task: " << task.parent.get() << endl; - return task.parent; - } + [ct] if isValidAndTrue(traceCalls) + tracer(taskWroker) << "Can continue with the parent task: " << task.parent.get() << endl + return task.parent - return TaskPtr(); -} + return TaskPtr() diff --git a/SparrowImplicitLib/par/tasksImpl/schedulerIf.spr b/SparrowImplicitLib/par/tasksImpl/schedulerIf.spr index 93a643ae..21f70013 100644 --- a/SparrowImplicitLib/par/tasksImpl/schedulerIf.spr +++ b/SparrowImplicitLib/par/tasksImpl/schedulerIf.spr @@ -1,24 +1,19 @@ -module par.tasksImpl.schedulerIf.schedulerif; +module par.tasksImpl.schedulerIf.schedulerif -import taskPrefix(TaskPrefix); -import sched = scheduler(schedSpawn, schedWaitForAll, schedSpawnRootAndWait, schedWaitForAll, schedEnqueue); -import worker = worker(localWorker); -import std.ptr; +import taskPrefix(TaskPrefix, TaskPtr) +import sched = scheduler(schedSpawn, schedWaitForAll, schedSpawnRootAndWait, schedWaitForAll, schedEnqueue) +import worker = workerImpl(localWorker) +import std.ptr -fun spawn(task: @TaskPrefix) { - sched.schedSpawn(task); -} -fun spawnAndWaitForAll(curTask, child: @TaskPrefix) { - sched.schedWaitForAll(curTask, child); -} -fun spawnRootAndWait(root: @TaskPrefix) { - sched.schedSpawnRootAndWait(root); -} -fun waitForAll(curTask: @TaskPrefix) { - sched.schedWaitForAll(curTask, null); -} -fun enqueue(task: @TaskPrefix) { - sched.schedEnqueue(task); -} +fun spawn(task: @TaskPrefix) + sched.schedSpawn(TaskPtr(task)) +fun spawnAndWaitForAll(curTask, child: @TaskPrefix) + sched.schedWaitForAll(TaskPtr(curTask), TaskPtr(child)) +fun spawnRootAndWait(root: @TaskPrefix) + sched.schedSpawnRootAndWait(TaskPtr(root)) +fun waitForAll(curTask: @TaskPrefix) + sched.schedWaitForAll(TaskPtr(curTask), TaskPtr()) +fun enqueue(task: @TaskPrefix) + sched.schedEnqueue(TaskPtr(task)) -fun localWorker(): Byte Ptr = reinterpretPtr(Byte, worker.localWorker()); +fun localWorker(): Byte Ptr = reinterpretPtr(Byte, worker.localWorker()) diff --git a/SparrowImplicitLib/par/tasksImpl/taskPrefix.spr b/SparrowImplicitLib/par/tasksImpl/taskPrefix.spr index 97a83064..d40f618b 100644 --- a/SparrowImplicitLib/par/tasksImpl/taskPrefix.spr +++ b/SparrowImplicitLib/par/tasksImpl/taskPrefix.spr @@ -1,68 +1,59 @@ -module par.tasksImpl.taskPrefix; +module par.tasksImpl.taskPrefix -import worker(Worker); -import par.atomic; -import std.ptr, std.string; +import std.ptr +import std.ptr, std.string +import par.atomic -using TaskPtr = TaskPrefix Ptr; +using TaskPtr = TaskPrefix Ptr -datatype TaskPrefix { +datatype TaskPrefix //! The function to be called to execute the task (along with its 'this' offset) - /*private*/ executeFn: FunctionPtr(Null rt, @TaskPrefix); - /*private*/ taskOffset: DiffType; + /*private*/ executeFn: FunctionPtr(Null rt, @TaskPrefix) + /*private*/ taskOffset: DiffType - //! The parent task of this task; null for root tasks - /*private*/ parent: TaskPrefix Ptr; + //! The parent task of this task null for root tasks + /*private*/ parent: TaskPrefix Ptr //! The number of children + waits for this task - /*private*/ refCount: Int Atomic; + /*private*/ refCount: Int Atomic //! The continuation of this task - /*private*/ cont: TaskPrefix Ptr; + /*private*/ cont: TaskPrefix Ptr //! The worker that this task belongs to //! Use casts to avoid direct dependency - /*private*/ worker: Byte Ptr; + /*private*/ worker: Byte Ptr + // TODO: should this be atomic? //! The worker that waits for this task to finish //! Use casts to avoid direct dependency - /*private*/ waitingWorker: Byte Ptr; + /*private*/ waitingWorker: Byte Ptr //! Used for chaining the tasks in the task queue - /*private*/ prev: TaskPrefix Ptr; - /*private*/ next: TaskPrefix Ptr; + /*private*/ prev: TaskPrefix Ptr + /*private*/ next: TaskPrefix Ptr //! Used for debugging - /*private*/ desc: String; - /*private*/ depth: Int; -} + /*private*/ desc: String + /*private*/ depth: Int -fun >>(t: @TaskPrefix, os: @OutStream) { - if ( t === null ) - os << ""; - else { - if ( !(t.desc isEmpty) ) { - os << '"' << t.desc << '"'; - } - os << '<' << mkStreamRefWrapper(t) << ", " << (t.depth) << "," << (t.refCount load) << '>'; - } -} +fun >>(t: @TaskPrefix, os: @OutStream) + if t === null + os << "" + else + if !(t.desc isEmpty) + os << '"' << t.desc << '"' + os << '<' << mkStreamRefWrapper(t) << ", " << (t.depth) << "," << (t.refCount load) << '>' -fun refCount(task: @TaskPrefix): Int = task.refCount load; +fun refCount(task: @TaskPrefix): Int = task.refCount load fun setRefCount(task: @TaskPrefix, count: Int) { task.refCount = count; } fun addRefCount(task: @TaskPrefix, count: Int) { task.refCount += count; } -fun incrementRefCount(task: @TaskPrefix) = ++task.refCount; -fun decrementRefCount(task: @TaskPrefix) = --task.refCount; +fun incrementRefCount(task: @TaskPrefix) = ++task.refCount +fun decrementRefCount(task: @TaskPrefix) = --task.refCount -fun worker(task: @TaskPrefix): @Worker = reinterpretCast(@Worker, task.worker.get()); -fun waitingWorker(task: @TaskPrefix): @Worker = reinterpretCast(@Worker, task.waitingWorker.get()); -fun setWaitingWorker(task: @TaskPrefix, w: @Worker) { task.waitingWorker = reinterpretCast(@Byte, w); } +fun assertValid(task: @TaskPrefix) + [ct] if isValidAndTrue(DEBUG) + using ExecuteFnType = FunctionPtr(Null rt, @TaskPrefix) -fun assertValid(task: @TaskPrefix) { - [ct] if ( isValidAndTrue(DEBUG) ) { - using ExecuteFnType = FunctionPtr(Null rt, @TaskPrefix); - - assert(task !== null); - assert(task.executeFn != ExecuteFnType()); - assert(task.worker isSet); - } -} + assert(task !== null) + assert(task.executeFn != ExecuteFnType()) + assert(task.worker isSet) diff --git a/SparrowImplicitLib/par/tasksImpl/taskQueue.spr b/SparrowImplicitLib/par/tasksImpl/taskQueue.spr index 7512d90d..6a12a6e7 100644 --- a/SparrowImplicitLib/par/tasksImpl/taskQueue.spr +++ b/SparrowImplicitLib/par/tasksImpl/taskQueue.spr @@ -1,7 +1,8 @@ -module par.tasksImpl.taskQueue; +module par.tasksImpl.taskQueue -import taskPrefix(TaskPrefix, TaskPtr); -import par.locks; +import taskPrefix(TaskPrefix, TaskPtr) +import par.locks +import std.ptr //! Holds a queue of tasks //! @@ -11,99 +12,90 @@ import par.locks; //! //! We use an intrinsic double-linked list to store the queue. //! The methods are synchronized to protect 2 threads popping elements. -datatype TaskQueue { +datatype TaskQueue //! Points to the first element in the queue - _first: TaskPrefix Ptr; + _first: TaskPrefix Ptr //! Points to the last valid element in the queue - _last: TaskPrefix Ptr; + _last: TaskPrefix Ptr //! Mutex used to protect the extraction of tasks from both ends - _mutex: Mutex; -} - -fun empty(q: @TaskQueue): Bool { - var lock: ScopedLock(Mutex) = q._mutex; - return q._first isNull; -} - -fun size(q: @TaskQueue): SizeType { - var lock: ScopedLock(Mutex) = q._mutex; - return q uncheckedSize; -} - -fun uncheckedSize(q: @TaskQueue): SizeType { - var count: SizeType = 0; - var p: TaskPtr = q._first; + _mutex: Mutex + // TODO: use a spin-mutex here + +fun empty(q: @TaskQueue): Bool + var lock: ScopedLock(Mutex) = q._mutex + return q._first isNull + +fun size(q: @TaskQueue): SizeType + var lock: ScopedLock(Mutex) = q._mutex + return q uncheckedSize + +fun uncheckedSize(q: @TaskQueue): SizeType + var count: SizeType = 0 + var p: TaskPtr = q._first while p isSet - ++count; - p = p.get().next; - return count; -} + ++count + p = p.get().next + return count -fun pushFront(q: @TaskQueue, task: TaskPtr) { - var lock: ScopedLock(Mutex) = q._mutex; +fun pushFront(q: @TaskQueue, task: TaskPtr) + var lock: ScopedLock(Mutex) = q._mutex // don't care what prev is - task.get().next = q._first; - if ( q._first isSet ) { - q._first.get().prev = task; - } else { - q._last = task; - } - q._first = task; - - //assert(q._first isSet); - //assert(q._last isSet); - //assert(q._last.get().next isNull); -} -fun popFront(q: @TaskQueue): TaskPtr { - var lock: ScopedLock(Mutex) = q._mutex; - - if ( q._first isNull ) - return TaskPtr(); - - var res = q._first; - q._first = res.get().next; - if ( q._first isNull ) - q._last.reset; - - if ( q._last isSet ) - //assert(q._last.get().next isNull); - - return res; -} -fun popBack(q: @TaskQueue): TaskPtr { - var lock: ScopedLock(Mutex) = q._mutex; - - if ( q._last isNull ) - return TaskPtr(); - - var res = q._last; - if ( q._first == q._last ) { - q._first.reset; - q._last.reset; - } else { - q._last = res.get().prev; - q._last.get().next.reset; - } - - if ( q._last isSet ) - //assert(q._last.get().next isNull); - - return res; -} - - -[initCtor] datatype TasksPrinter { - _first: TaskPtr; - list: @TaskQueue; -} - -fun >> (tasks: @TasksPrinter, os: @OutStream) { - if ( tasks._first isSet ) - os << tasks._first.get() << " + "; - var p: TaskPtr = tasks.list._first; + task.get().next = q._first + if q._first isSet + q._first.get().prev = task + else + q._last = task + q._first = task + + //assert(q._first isSet) + //assert(q._last isSet) + //assert(q._last.get().next isNull) +fun popFront(q: @TaskQueue): TaskPtr + var lock: ScopedLock(Mutex) = q._mutex + + if q._first isNull + return TaskPtr() + + var res = q._first + q._first = res.get().next + if q._first isNull + q._last.reset + + //if q._last isSet + //assert(q._last.get().next isNull) + + return res +fun popBack(q: @TaskQueue): TaskPtr + var lock: ScopedLock(Mutex) = q._mutex + + if q._last isNull + return TaskPtr() + + var res = q._last + if q._first == q._last + q._first.reset + q._last.reset + else + q._last = res.get().prev + q._last.get().next.reset + + //if q._last isSet + //assert(q._last.get().next isNull) + + return res + + +[initCtor] datatype TasksPrinter + _first: TaskPtr + list: @TaskQueue + +fun >> (tasks: @TasksPrinter, os: @OutStream) + if tasks._first isSet + os << 'task=' + os << tasks._first.get() << " + " + var p: TaskPtr = tasks.list._first while p isSet - os << p.get() << ' '; - p = p.get().next; -} + os << p.get() << ' ' + p = p.get().next diff --git a/SparrowImplicitLib/par/tasksImpl/taskSystem.spr b/SparrowImplicitLib/par/tasksImpl/taskSystem.spr index 2a1c9697..bbf016cf 100644 --- a/SparrowImplicitLib/par/tasksImpl/taskSystem.spr +++ b/SparrowImplicitLib/par/tasksImpl/taskSystem.spr @@ -1,69 +1,65 @@ -module par.tasksImpl.taskSystem; +module par.tasksImpl.taskSystem -import internalWorkerThread(InternalWorkerThread); -import worker(Worker, WorkerPtr, IdleCounter); -import taskQueue(TaskQueue); -import par.thread; -import std.ptr, std.vector; -//import assert; +import internalWorkerThread(InternalWorkerThread) +import worker(Worker, IdleCounter) +import taskQueue(TaskQueue) +import par.thread +import std.ptr, std.vector +//import assert datatype TaskSystem //! The global queue of tasks - globalQueue: TaskQueue; + globalQueue: TaskQueue //! All the workers in the system - workers: WorkerPtr Vector; + workers: (Worker Ptr) Vector //! The internal threads that are doing all the work - _threads: (Thread Ptr) Vector; + _threads: (Thread Ptr) Vector //! Object that counts the number of threads that are idle - idleCounter: IdleCounter; + idleCounter: IdleCounter [protected] fun ctor(this: @TaskSystem) {} fun dtor(this: @TaskSystem) {} -var globalTaskSystem: TaskSystem Ptr; +var globalTaskSystem: TaskSystem Ptr -fun initTaskSystem(numWorkers: UInt = 0) { +fun initTaskSystem(numWorkers: UInt = 0) // Create the task system object - //assert(globalTaskSystem isNull); - globalTaskSystem = new(TaskSystem); + //assert(globalTaskSystem isNull) + globalTaskSystem = new(TaskSystem) // Create the required threads - if ( numWorkers == 0 ) - numWorkers = getAvailableCoresNum(); - //numWorkers = 1; - var ts: @TaskSystem = globalTaskSystem.get(); + if numWorkers == 0 + numWorkers = getAvailableCoresNum() + //numWorkers = 1 + var ts: @TaskSystem = globalTaskSystem.get() for i=0..numWorkers - var worker = ts createWorker; - var p: Thread Ptr = new(Thread, InternalWorkerThread(worker)); - ts._threads.pushBack(p); -} + var worker = ts createWorker + var p: Thread Ptr = new(Thread, InternalWorkerThread(worker)) + ts._threads.pushBack(p) -fun getInitTaskSystem: TaskSystem Ptr { +fun getInitTaskSystem: TaskSystem Ptr // TODO: Make sure this is initialized only once - if ( globalTaskSystem isNull ) - initTaskSystem; - return globalTaskSystem; -} + if globalTaskSystem isNull + initTaskSystem + return globalTaskSystem -fun createWorker(ts: TaskSystem Ptr): Worker Ptr { - var id: Int = ts.get().workers size; - var p: Worker Ptr = new(Worker, ts.get(), id, ts.get().idleCounter); - ts.get().workers.pushBack(p); - return p; -} +fun createWorker(ts: TaskSystem Ptr): Worker Ptr + var id: Int = ts.get().workers size + var p: Worker Ptr = new(Worker, ts.get(), id, ts.get().idleCounter) + ts.get().workers.pushBack(p) + return p // TODO: make it random -var cnt: Int = 0; +var cnt: Int = 0 -fun getRandomTaskQueue(ts: @TaskSystem): @TaskQueue { +fun getRandomTaskQueue(ts: @TaskSystem): @TaskQueue // TODO: make the selection truly random - var worker: Worker Ptr = ts.workers(++cnt % (ts.workers size)); - return worker.get().work; -} + var worker: Worker Ptr = ts.workers(++cnt % (ts.workers size)) + return worker.get().work fun onTaskAvailable(ts: @TaskSystem) // Do something only if we have at least one idle worker @@ -71,7 +67,7 @@ fun onTaskAvailable(ts: @TaskSystem) for w: Worker Ptr = ts.workers.all // If we can wake up one worker, exit if w.get() tryWakeUp - return; + return //! Get the number of currently available workers -fun numAvailableWorkers(ts: @TaskSystem): SizeType = (ts.workers size) - (ts.idleCounter numIdleWorkers); +fun numAvailableWorkers(ts: @TaskSystem): SizeType = (ts.workers size) - (ts.idleCounter numIdleWorkers) diff --git a/SparrowImplicitLib/par/tasksImpl/taskType.spr b/SparrowImplicitLib/par/tasksImpl/taskType.spr index 8d756df6..2fbd1385 100644 --- a/SparrowImplicitLib/par/tasksImpl/taskType.spr +++ b/SparrowImplicitLib/par/tasksImpl/taskType.spr @@ -1,6 +1,6 @@ -module par.tasksImpl.taskType; +module par.tasksImpl.taskType -[public] import taskPrefix(TaskPrefix); +[public] import taskPrefix(TaskPrefix) //! A concept describing a task that can be spawned //! @@ -9,13 +9,13 @@ module par.tasksImpl.taskType; //! - a way of obtaining the task prefix from it //! - either as an 'prefix' method //! - or as a 'prefix' field <-- this is the easiest -concept TaskType(x) - if isValid(x execute) - && ( isValidAndTrue(typeOf(x prefix) == @TaskPrefix) +concept TaskType(x) \ + if isValid(x execute) \ + && ( isValidAndTrue(typeOf(x prefix) == @TaskPrefix) \ || isValidAndTrue(typeOf(x.prefix) == TaskPrefix) ) - ; -fun getPrefix(t: @TaskType): @TaskPrefix = t prefix if isValid(t prefix); -fun getPrefix(t: @TaskType): @TaskPrefix = t.prefix if isValid(t.prefix) && !isValid(t prefix); + +fun getPrefix(t: @TaskType): @TaskPrefix = t prefix if isValid(t prefix) +fun getPrefix(t: @TaskType): @TaskPrefix = t.prefix if isValid(t.prefix) && !isValid(t prefix) diff --git a/SparrowImplicitLib/par/tasksImpl/tasksMain.spr b/SparrowImplicitLib/par/tasksImpl/tasksMain.spr index 960dbd51..59f37243 100644 --- a/SparrowImplicitLib/par/tasksImpl/tasksMain.spr +++ b/SparrowImplicitLib/par/tasksImpl/tasksMain.spr @@ -1,55 +1,48 @@ -module par.tasksImpl.tasksMain; +module par.tasksImpl.tasksMain -import taskType(TaskType, getPrefix); -import schedulerIf = schedulerIf(localWorker, spawn, spawnAndWaitForAll); -import taskPrefix(TaskPrefix); +import taskType(TaskType, getPrefix) +import schedulerIf = schedulerIf(localWorker, spawn, spawnAndWaitForAll) +import taskPrefix(TaskPrefix) -datatype ExecuteHelper(T: Type) { - [static] fun doExecute(prefix: @TaskPrefix): Null { - var objP: @Byte = ptrAdd(reinterpretCast(@Byte, prefix), prefix.taskOffset); - var obj: @T = reinterpretCast(@T, objP); - obj execute; - return Null(); - } +datatype ExecuteHelper(T: Type) - [static] fun getExecuteFn: FunctionPtr(Null rt, @TaskPrefix) = \doExecute; -} +fun doExecute(T: Type, prefix: @TaskPrefix): Null + var objP: @Byte = ptrAdd(reinterpretCast(@Byte, prefix), prefix.taskOffset) + var obj: @T = reinterpretCast(@T, objP) + obj execute + return Null() + +fun getExecuteFn(T: Type): FunctionPtr(Null rt, @TaskPrefix) = \doExecute(T, TaskPrefix()) //! Initializes the fields required for a task -fun initTaskBasic(obj: @TaskType) { +fun initTaskBasic(obj: @TaskType) // Init the execute function - var prefix: @TaskPrefix = getPrefix(obj); - prefix.taskOffset = ptrDiff(reinterpretCast(@Byte, obj), reinterpretCast(@Byte, prefix)); - prefix.executeFn = ExecuteHelper(typeOf(obj)).getExecuteFn(); + var prefix: @TaskPrefix = getPrefix(obj) + prefix.taskOffset = ptrDiff(reinterpretCast(@Byte, obj), reinterpretCast(@Byte, prefix)) + prefix.executeFn = getExecuteFn(typeOf(obj)) // Get its description (if it has one) - [ct] if ( isValidAndTrue(typeOf(obj description) == String) ) { - prefix.desc = (obj description); - } -} + [ct] if isValidAndTrue(typeOf(obj description) == String) + prefix.desc = (obj description) //! Initializes a root task -fun initAsRoot(task: @TaskType) { - initTaskBasic(task); - getPrefix(task).worker = schedulerIf.localWorker(); -} +fun initAsRoot(task: @TaskType) + initTaskBasic(task) + getPrefix(task).worker = schedulerIf.localWorker() //! Initializes a child task -fun initAsChildOf(task, parent: @TaskType) { - initTaskBasic(task); - getPrefix(task).parent = getPrefix(parent); - getPrefix(task).depth = 1+getPrefix(parent).depth; - getPrefix(task).worker = getPrefix(parent).worker; -} +fun initAsChildOf(task, parent: @TaskType) + initTaskBasic(task) + getPrefix(task).parent = getPrefix(parent) + getPrefix(task).depth = 1+getPrefix(parent).depth + getPrefix(task).worker = getPrefix(parent).worker //! Initialize a child task and spawn it -fun doSpawn(curTask, childTask: @TaskType) { - initAsChildOf(childTask, curTask); - schedulerIf.spawn(getPrefix(childTask)); -} +fun doSpawn(curTask, childTask: @TaskType) + initAsChildOf(childTask, curTask) + schedulerIf.spawn(getPrefix(childTask)) //! Initialize a child task, then spawn it and wait -fun doSpawnAndWait(curTask, childTask: @TaskType) { - initAsChildOf(childTask, curTask); - schedulerIf.spawnAndWaitForAll(getPrefix(curTask), getPrefix(childTask)); -} +fun doSpawnAndWait(curTask, childTask: @TaskType) + initAsChildOf(childTask, curTask) + schedulerIf.spawnAndWaitForAll(getPrefix(curTask), getPrefix(childTask)) diff --git a/SparrowImplicitLib/par/tasksImpl/worker.spr b/SparrowImplicitLib/par/tasksImpl/worker.spr deleted file mode 100644 index d810a0f2..00000000 --- a/SparrowImplicitLib/par/tasksImpl/worker.spr +++ /dev/null @@ -1,93 +0,0 @@ -module par.tasksImpl.worker; - -//import debug(Tracer); -import taskSystem(TaskSystem, getInitTaskSystem, createWorker); -import taskQueue(TaskQueue); -import par.tls, par.semaphore, par.atomic; -import par.semaphore; -import par.atomic; - -//! Class used to keep count of the number of threads that go idle -datatype IdleCounter - cnt: Int Atomic; - -fun goIdle(c: @IdleCounter) { c.cnt++; } -fun wakeUp(c: @IdleCounter) { c.cnt--; } -fun hasIdle(c: @IdleCounter) = (c.cnt load) > 0; -fun numIdleWorkers(c: @IdleCounter) = c.cnt load; - - -//! A worker class describes a thread that is doing some work on our task system -datatype Worker - taskSystem: @TaskSystem; - work: TaskQueue; - workerId: Int; - curDepth: Int; - - //! The global counter that keeps track on how many internal threads are idle - _idleCounter: @IdleCounter; - //! Non-zero if this thread has work to do - _isBusy: Int Atomic; - //! Semaphore used when sleeping - _waitSem: Semaphore; - -fun ctor(this: @Worker, taskSystem: @TaskSystem, workerId: Int, idleCounter: @IdleCounter) - this.taskSystem := taskSystem; - this.workerId = workerId; - this.curDepth = 0; - this._idleCounter := idleCounter; - this._isBusy store 0; - this._waitSem ctor 0; - -fun dtor(this: @Worker) {} - -using WorkerPtr = Worker Ptr; - -var _tlsWorker: Tls(@Worker); - -fun localWorker: Worker Ptr { - var worker: Worker Ptr = _tlsWorker get; - if ( worker isNull ) { - worker = (getInitTaskSystem createWorker); - _tlsWorker = worker.get(); - } - return worker; -} - -fun setLocalWorker(worker: Worker Ptr) { - _tlsWorker = worker.get(); -} - -fun goIdle(w: @Worker) { - if ( compareAndSwap(w._isBusy, 0, 1) ) { - // Change the global counter - w._idleCounter goIdle; - - [ct] if ( isValidAndTrue(traceCalls) ) - Tracer(w) << "going idle; we have " << (w._idleCounter.cnt load) << " idle workers" << endl; - - // Actually wake up the thread - w._waitSem acquire; - - [ct] if ( isValidAndTrue(traceCalls) ) - Tracer(w) << "woke up; we have " << (w._idleCounter.cnt load) << " idle workers" << endl; - } -} - -fun tryWakeUp(w: @Worker): Bool { - //[ct] if ( isValidAndTrue(traceCalls) ) - // Tracer(w) << "trying to wake up; _isBusy= " << (w._isBusy load) << endl; - // If the thread was now busy, and we can set it to busy, - // release the semaphore to wake up the thread - if ( compareAndSwap(w._isBusy, 1, 0) ) { - [ct] if ( isValidAndTrue(traceCalls) ) - Tracer(w) << "waking up; we have " << (w._idleCounter.cnt load) << " idle workers" << endl; - - w._waitSem release; - - // Make sure the counter will not count us anymore - w._idleCounter wakeUp; - return true; - } - return false; -} diff --git a/SparrowImplicitLib/par/tasksImpl/workerImpl.spr b/SparrowImplicitLib/par/tasksImpl/workerImpl.spr new file mode 100644 index 00000000..284914a7 --- /dev/null +++ b/SparrowImplicitLib/par/tasksImpl/workerImpl.spr @@ -0,0 +1,181 @@ +module par.tasksImpl.workerImpl + +import taskPrefix(TaskPrefix, TaskPtr) +import debug(Tracer) +import taskQueue(TaskQueue) +import idleCounter(IdleCounter) +//import scheduler(schedDoWait) + +import par.tls, par.semaphore, par.atomic, par.locks, par.thread +import std.string, std.vector + +using traceCalls = true + +//////////////////////////////////////////////////////////////////////////////// +// Worker datatype & functions + +//! A worker class describes a thread that is doing some work on our task system +datatype Worker + taskSystem: @TaskSystem + work: TaskQueue + workerId: Int + curDepth: Int + + //! The global counter that keeps track on how many internal threads are idle + _idleCounter: @IdleCounter + //! Non-zero if this thread has work to do + _isBusy: Int Atomic + //! Semaphore used when sleeping + _waitSem: Semaphore + +fun ctor(this: @Worker, taskSystem: @TaskSystem, workerId: Int, idleCounter: @IdleCounter) + this.taskSystem := taskSystem + this.workerId = workerId + this.curDepth = 0 + this._idleCounter := idleCounter + this._isBusy store 0 + this._waitSem ctor 0 + +fun dtor(this: @Worker) {} + +using WorkerPtr = Worker Ptr + +var _tlsWorker: Tls(@Worker) + +fun localWorker: Worker Ptr + var worker: Worker Ptr = _tlsWorker get + if worker isNull + worker = (getInitTaskSystem createWorker) + _tlsWorker = worker.get() + return worker + +fun setLocalWorker(worker: Worker Ptr) + _tlsWorker = worker.get() + +fun goIdle(w: @Worker) + if compareAndSwap(w._isBusy, 0, 1) + // Change the global counter + w._idleCounter goIdle + + [ct] if isValidAndTrue(traceCalls) + tracer(w) << "going idle; we have " << (w._idleCounter.cnt load) << " idle workers" << endl + + // Actually wake up the thread + w._waitSem acquire + + [ct] if isValidAndTrue(traceCalls) + tracer(w) << "woke up; we have " << (w._idleCounter.cnt load) << " idle workers" << endl + +fun tryWakeUp(w: @Worker): Bool + //[ct] if isValidAndTrue(traceCalls) + // tracer(w) << "trying to wake up; _isBusy= " << (w._isBusy load) << endl + // If the thread was now busy, and we can set it to busy, + // release the semaphore to wake up the thread + if compareAndSwap(w._isBusy, 1, 0) + [ct] if isValidAndTrue(traceCalls) + tracer(w) << "waking up; we have " << (w._idleCounter.cnt load) << " idle workers" << endl + + w._waitSem release + + // Make sure the counter will not count us anymore + w._idleCounter wakeUp + return true + return false + +fun tracer(this: @Worker) = Tracer(workerId, curDepth) + +//////////////////////////////////////////////////////////////////////////////// +// TaskPrexix functions + +fun worker(task: @TaskPrefix): @Worker = reinterpretCast(@Worker, task.worker.get()) +fun waitingWorker(task: @TaskPrefix): @Worker = reinterpretCast(@Worker, task.waitingWorker.get()) +fun setWaitingWorker(task: @TaskPrefix, w: @Worker) { task.waitingWorker = reinterpretCast(@Byte, w); } + + +//////////////////////////////////////////////////////////////////////////////// +// TaskSystem datatype + +datatype TaskSystem + //! The global queue of tasks + globalQueue: TaskQueue + + //! All the workers in the system + workers: (Worker Ptr) Vector + + //! The internal threads that are doing all the work + _threads: (Thread Ptr) Vector + //! Object that counts the number of threads that are idle + idleCounter: IdleCounter + +[protected] + fun ctor(this: @TaskSystem) {} + + fun dtor(this: @TaskSystem) {} + +var globalTaskSystem: TaskSystem Ptr + +fun initTaskSystem(numWorkers: UInt = 0) + // Create the task system object + //assert(globalTaskSystem isNull) + globalTaskSystem = new(TaskSystem) + + // Create the required threads + if numWorkers == 0 + numWorkers = getAvailableCoresNum() + //numWorkers = 1 + var ts = globalTaskSystem + for i=0..numWorkers + var worker = ts createWorker + var p: Thread Ptr = new(Thread, InternalWorkerThread(worker)) + (ts.get)._threads.pushBack(p) + +fun getInitTaskSystem: TaskSystem Ptr + // TODO: Make sure this is initialized only once + if globalTaskSystem isNull + initTaskSystem + return globalTaskSystem + +fun createWorker(ts: TaskSystem Ptr): Worker Ptr + var id: Int = ts.get().workers size + var p: Worker Ptr = new(Worker, ts.get(), id, ts.get().idleCounter) + ts.get().workers.pushBack(p) + return p + +// TODO: make it random +var cnt: Int = 0 + +fun getRandomTaskQueue(ts: @TaskSystem): @TaskQueue + // TODO: make the selection truly random + var worker: Worker Ptr = ts.workers(++cnt % (ts.workers size)) + return worker.get().work + +fun onTaskAvailable(ts: @TaskSystem) + // Do something only if we have at least one idle worker + if ts.idleCounter hasIdle + for w: Worker Ptr = ts.workers.all + // If we can wake up one worker, exit + if w.get() tryWakeUp + return + +//! Get the number of currently available workers +fun numAvailableWorkers(ts: @TaskSystem): SizeType = (ts.workers size) - (ts.idleCounter numIdleWorkers) + + +//////////////////////////////////////////////////////////////////////////////// +// InternalWorkerThread datatype + +//! Class that represents an internal worker thread +//! This tries to execute as much work as possible +//! If no work is available, this goes to sleep +[initCtor] datatype InternalWorkerThread + //! The worker associated with this thread + _worker: Worker Ptr + +fun ()(this: @InternalWorkerThread) + setLocalWorker(_worker) + schedDoWait(_worker.get(), TaskPtr(), TaskPtr()) + +// Defined in scheduler.spr +[native('par.tasksImpl.scheduler.schedDoWait')] +fun schedDoWait(worker: @Worker, toExecute, waitingTask: TaskPtr) + diff --git a/SparrowImplicitLib/par/thread.spr b/SparrowImplicitLib/par/thread.spr index 53e19ce2..f3ccd036 100644 --- a/SparrowImplicitLib/par/thread.spr +++ b/SparrowImplicitLib/par/thread.spr @@ -1,30 +1,30 @@ -module par.thread; +module par.thread -import config; -import std.newDelete; -//import assert; +import config +import std.newDelete +//import assert //! Datatype describing a thread ID datatype ThreadId - id: ULong; + id: ULong fun ctor(this: @ThreadId, id: ULong) - this.id ctor id; + this.id ctor id fun ctor(this: @ThreadId, h: NativeThreadHandle) - this.id ctor; - Impl.pthread_threadid_np(h, id); + this.id ctor + _Impl.pthread_threadid_np(h, id) fun >>(tid: @ThreadId, os: @OutStream) - os << tid.id; + os << tid.id datatype Thread - _handle: NativeThreadHandle; + _handle: NativeThreadHandle //! Construct the Thread with a functor //! The functor can have state, not just executing code fun ctor(this: @Thread, f: AnyType) if isValid(f()) - _handle = Impl.startThread(f); + _handle = _Impl.startThread(f) //! Cannot copy construct or assign a Thread object [private] fun ctor(this, other: @Thread); @@ -32,88 +32,83 @@ fun ctor(this: @Thread, f: AnyType) if isValid(f()) //! Destructor. Detach the thread if joinable fun dtor(this: @Thread) - if ( this isJoinable ) this detach; + if this isJoinable + this detach fun >>(t: @Thread, os: @OutStream) -{ - if ( _handle != NativeThreadHandle() ) - os << "Thread(" << _handle << ")"; + if _handle != NativeThreadHandle() + os << "Thread(" << _handle << ")" else - os << "Thread(null)"; -} + os << "Thread(null)" //! Swap the content of two Thread objects -//fun swap(lhs, rhs: @Thread) { -// var tmp = lhs._handle swap rhs._handle; -//} +//fun swap(lhs, rhs: @Thread) +// var tmp = lhs._handle swap rhs._handle -//! Test if the thread is joinable (e.g., still active); -fun isJoinable(t: Thread) = t._handle != NativeThreadHandle(); +//! Test if the thread is joinable (e.g., still active) +fun isJoinable(t: Thread) = t._handle != NativeThreadHandle() //! Wait for the thread to finish //! This shouldn't be called from the same thread -fun join(t: @Thread) { - //assert(t isJoinable); - //assert((t getId) !== curThreadId); - var res = Impl.pthread_join(t._handle, null); - //assert(res == 0); - t._handle = NativeThreadHandle(); -} +fun join(t: @Thread) + //assert(t isJoinable) + //assert((t getId) !== curThreadId) + var res = _Impl.pthread_join(t._handle, null) + //assert(res == 0) + t._handle = NativeThreadHandle() //! Detaches the given thread object from the actual thread of execution //! The actual thread may continue to execute after this call -fun detach(t: @Thread) { - //assert(t isJoinable); - var res = Impl.pthread_detach(t._handle); - //assert(res == 0); - t._handle = NativeThreadHandle(); -} +fun detach(t: @Thread) + //assert(t isJoinable) + var res = _Impl.pthread_detach(t._handle) + //assert(res == 0) + t._handle = NativeThreadHandle() //! Get the ID of the given thread -fun getId(t: Thread): ThreadId = ThreadId(t._handle); +fun getId(t: Thread): ThreadId = ThreadId(t._handle) //! Get the native handle of the given thread -fun getNativeHandle(t: Thread): NativeThreadHandle = t._handle; +fun getNativeHandle(t: Thread): NativeThreadHandle = t._handle //! Get the current thread id -fun curThreadId: ThreadId = ThreadId(NativeThreadHandle()); +fun curThreadId: ThreadId = ThreadId(NativeThreadHandle()) //! Tries to suspend the current thread to let other threads execute -fun yield = Impl.sched_yield(); +fun yield = _Impl.sched_yield() //! Sleep the given amount of milliseconds -fun sleep(durMs: UInt) { - var req = Impl.TimeSpec(durMs/1000, (durMs%1000)*1000000); - var rem = Impl.TimeSpec(0, 0); - Impl.nanosleep(req, rem); -} - -package Impl { - using ThreadFun = FunctionPtr(@Byte rt, @Byte rt); - - [initCtor] Closure(FType: Type) { - f: FType; - - [static] fun threadFun(arg: @Byte): @Byte { - var self: @Closure = reinterpretCast(@Closure, arg); - self.f(); - self delete; - return null; - } - - [static] fun getFunPtr: ThreadFun = \threadFun; - } - fun startThread(f: AnyType): NativeThreadHandle { - using ClosureType = Closure(typeOf(f)); - var closurePtr: @ClosureType = new(ClosureType, f); - var handle: NativeThreadHandle; - var arg: @Byte = reinterpretCast(@Byte, closurePtr); - var status = pthread_create(handle, null, ClosureType.getFunPtr, arg); - return handle; - } - - [initCtor] datatype TimeSpec { var tv_sec, tv_nsec: Long; } +fun sleep(durMs: UInt) + var req = _Impl.TimeSpec(durMs/1000, (durMs%1000)*1000000) + var rem = _Impl.TimeSpec(0, 0) + _Impl.nanosleep(req, rem) + +package _Impl + using ThreadFun = FunctionPtr(@Byte rt, @Byte rt) + + [initCtor] + datatype Closure(FType: Type) + f: FType + + fun threadFun(closureType: Type, arg: @Byte): @Byte + var self: @closureType = reinterpretCast(@closureType, arg) + self.f() + self delete + return null + + fun getFunPtr(closureType: Type): ThreadFun = \threadFun(closureType, null) + + fun startThread(f: AnyType): NativeThreadHandle + using ClosureType = Closure(typeOf(f)) + var closurePtr: @ClosureType = new(ClosureType, f) + var handle: NativeThreadHandle + var arg: @Byte = reinterpretCast(@Byte, closurePtr) + var status = pthread_create(handle, null, getFunPtr(ClosureType), arg) + return handle + + [initCtor] datatype TimeSpec + tv_sec, tv_nsec: Long [native("pthread_create")] fun pthread_create(handle: @NativeThreadHandle, attr: @Byte, f: ThreadFun, arg: @Byte): Int; [native("pthread_join")] fun pthread_join(handle: NativeThreadHandle, valPtr: @ @Byte): Int; @@ -124,5 +119,4 @@ package Impl { [native("sched_yield")] fun sched_yield; [native("nanosleep")] fun nanosleep(req, rem: @TimeSpec): Int; -} diff --git a/SparrowImplicitLib/par/tls.spr b/SparrowImplicitLib/par/tls.spr index b2b50257..0fe3ff30 100644 --- a/SparrowImplicitLib/par/tls.spr +++ b/SparrowImplicitLib/par/tls.spr @@ -1,74 +1,57 @@ -module par.tls; +module par.tls -import config; -import assert; -import std.newDelete; +import config +import std.newDelete -//! A class that can hold some data into a thread-local storage -//! The size of the data must be less or equal to the size of a reference -//! For references, we would delete the pointer when the thread exits -datatype Tls(T: Type) if TypeOp.isRef(T) - _tls: Impl.TlsCommon; +datatype Tls(T: Type) if sizeOf(T) <= sizeOf(@Byte) + _tls: _Impl.TlsCommon - using ValueType = T; + using ValueType = T fun ctor(this: @Tls) - _tls.ctor(\_destructPtr(ValueType, null)); + [ct] if TypeOp.isRef(ValueType) + _tls ctor \_destructPtr(ValueType, null) + else + _tls ctor - -fun =(this, other: @Tls) - _tls setValue (other._tls getValue); +fun =(this: @Tls, other: typeOf(this)) + _tls setValue (other._tls getValue) fun =(this: @Tls, val: this.ValueType) - _tls setValue reinterpretCast(@Byte, val); - - -fun get(this: @Tls): this.ValueType - return reinterpretCast(T, _tls getValue); - -fun _destructPtr(t: Type, p: @Byte): Null { - delete(reinterpretCast(t, p)); - return Null(); -} - -datatype Tls(T: Type) if !TypeOp.isRef(T) && sizeOf(T) <= sizeOf(@Byte) { - _tls: Impl.TlsCommon; - - using ValueType = T; - - fun =(other: @Tls) { - _tls setValue (other._tls getValue); - } - fun =(val: T) { - _tls setValue reinterpretCast(@ @Byte, val); - } + [ct] if TypeOp.isRef(ValueType) + _tls setValue reinterpretCast(@ @Byte, val) + else + _tls setValue reinterpretCast(@Byte, val) - fun get: T { - return reinterpretCast(@ @T, _tls getValue); - } -} +fun get(this: @Tls): ValueType + [ct] if TypeOp.isRef(ValueType) + return reinterpretCast(ValueType, _tls getValue) + else + return reinterpretCast(@ValueType, _tls getValue) +fun _destructPtr(t: Type, p: @Byte): Null + delete(reinterpretCast(t, p)) + return Null() -package Impl { - using DtorFun = FunctionPtr(Null rt, @Byte rt); +package _Impl + using DtorFun = FunctionPtr(Null rt, @Byte rt) datatype TlsCommon - key: KeyT; + key: KeyT fun ctor(this: @TlsCommon) - pthread_key_create(key, DtorFun()); + pthread_key_create(key, DtorFun()) fun ctor(this: @TlsCommon, dtorFun: DtorFun) - pthread_key_create(key, dtorFun); + pthread_key_create(key, dtorFun) fun dtor(this: @TlsCommon) - pthread_key_delete(key); + pthread_key_delete(key) - fun setValue(tls: @TlsCommon, val: @Byte) { pthread_setspecific(tls.key, val); } - fun getValue(tls: @TlsCommon): @Byte = pthread_getspecific(tls.key); + fun setValue(tls: @TlsCommon, val: @Byte) = pthread_setspecific(tls.key, val) + fun getValue(tls: @TlsCommon): @Byte = pthread_getspecific(tls.key) - using KeyT = ULong; + using KeyT = ULong - [native("pthread_key_create")] fun pthread_key_create(k: @KeyT, dtorFun: DtorFun): Int; - [native("pthread_key_delete")] fun pthread_key_delete(k: KeyT): Int; - [native("pthread_setspecific")] fun pthread_setspecific(k: KeyT, val: @Byte): Int; - [native("pthread_getspecific")] fun pthread_getspecific(k: KeyT): @Byte; -} \ No newline at end of file + [native("pthread_key_create")] fun pthread_key_create(k: @KeyT, dtorFun: DtorFun): Int + [native("pthread_key_delete")] fun pthread_key_delete(k: KeyT): Int + [native("pthread_setspecific")] fun pthread_setspecific(k: KeyT, val: @Byte): Int + [native("pthread_getspecific")] fun pthread_getspecific(k: KeyT): @Byte diff --git a/SparrowImplicitLib/sprCore/basicDecls.spr b/SparrowImplicitLib/sprCore/basicDecls.spr index 481013cc..6c0dd94b 100644 --- a/SparrowImplicitLib/sprCore/basicDecls.spr +++ b/SparrowImplicitLib/sprCore/basicDecls.spr @@ -671,8 +671,8 @@ fun = (this: @StringRef, src: StringRef): @StringRef return this -[protected] -fun == (this: @StringRef, other: StringRef): Bool +[protected, autoCt] +fun == (this, other: StringRef): Bool var s = this.size if s != other.size return false diff --git a/SparrowImplicitLib/test.spr b/SparrowImplicitLib/test.spr index d2941d37..e1b0aad4 100644 --- a/SparrowImplicitLib/test.spr +++ b/SparrowImplicitLib/test.spr @@ -14,8 +14,9 @@ fun assertEq(val, expectedVal: AnyType, message: StringRef = "") \ if isValid(val == expectedVal) if !(val == expectedVal) cout << "Assertion failure: values don't match" << endl - cout << " given value: " << val << endl - cout << " expected value: " << expectedVal << endl + [ct] if isValid(cout << val) + cout << " given value: " << val << endl + cout << " expected value: " << expectedVal << endl if !message.isEmpty cout << " message: " << message << endl cout << flush @@ -25,8 +26,9 @@ fun assertNe(val1, val2: AnyType, message: StringRef = "") \ if isValid(val1 == val2) if val1 == val2 cout << "Assertion failure: values should be different" << endl - cout << " value 1: " << val1 << endl - cout << " value 2: " << val2 << endl + [ct] if isValid(cout << val1) + cout << " value 1: " << val1 << endl + cout << " value 2: " << val2 << endl if !message.isEmpty cout << " message: " << message << endl cout << flush @@ -36,8 +38,9 @@ fun assertLt(val1, val2: AnyType, message: StringRef = "") \ if isValid(val1 < val2) if !(val1 < val2) cout << "Assertion failure: first value is not less than second" << endl - cout << " value 1: " << val1 << endl - cout << " value 2: " << val2 << endl + [ct] if isValid(cout << val1) + cout << " value 1: " << val1 << endl + cout << " value 2: " << val2 << endl if !message.isEmpty cout << " message: " << message << endl cout << flush @@ -47,8 +50,9 @@ fun assertLe(val1, val2: AnyType, message: StringRef = "") \ if isValid(val1 <= val2) if !(val1 <= val2) cout << "Assertion failure: first value is not less or equal than second" << endl - cout << " value 1: " << val1 << endl - cout << " value 2: " << val2 << endl + [ct] if isValid(cout << val1) + cout << " value 1: " << val1 << endl + cout << " value 2: " << val2 << endl if !message.isEmpty cout << " message: " << message << endl cout << flush @@ -58,8 +62,9 @@ fun assertGt(val1, val2: AnyType, message: StringRef = "") \ if isValid(val1 > val2) if !(val1 > val2) cout << "Assertion failure: first value is not greater than second" << endl - cout << " value 1: " << val1 << endl - cout << " value 2: " << val2 << endl + [ct] if isValid(cout << val1) + cout << " value 1: " << val1 << endl + cout << " value 2: " << val2 << endl if !message.isEmpty cout << " message: " << message << endl cout << flush @@ -69,8 +74,9 @@ fun assertGe(val1, val2: AnyType, message: StringRef = "") \ if isValid(val1 >= val2) if !(val1 >= val2) cout << "Assertion failure: first value is not greater or equal than second" << endl - cout << " value 1: " << val1 << endl - cout << " value 2: " << val2 << endl + [ct] if isValid(cout << val1) + cout << " value 1: " << val1 << endl + cout << " value 2: " << val2 << endl if !message.isEmpty cout << " message: " << message << endl cout << flush diff --git a/src/Feather/src/Utils/cppif/FeatherNodes.cpp b/src/Feather/src/Utils/cppif/FeatherNodes.cpp index ef07bc97..f9007c58 100644 --- a/src/Feather/src/Utils/cppif/FeatherNodes.cpp +++ b/src/Feather/src/Utils/cppif/FeatherNodes.cpp @@ -539,12 +539,6 @@ void StructDecl::setContextForChildrenImpl(StructDecl node) { Type StructDecl::computeTypeImpl(StructDecl node) { if (node.name().empty()) REP_ERROR_RET(nullptr, node.location(), "No name given to struct"); - - // Compute the type for all the fields - for (auto field : node.children()) { - // Ignore errors from children - field.computeType(); - } return DataType::get(node, 0, Feather_effectiveEvalMode(node)); } NodeHandle StructDecl::semanticCheckImpl(StructDecl node) { @@ -1100,8 +1094,18 @@ NodeHandle IfStmt::condition() const { return children()[0]; } NodeHandle IfStmt::thenClause() const { return children()[1]; } NodeHandle IfStmt::elseClause() const { return children()[2]; } void IfStmt::setContextForChildrenImpl(IfStmt node) { + NodeHandle condition = node.condition(); + NodeHandle thenClause = node.thenClause(); + NodeHandle elseClause = node.elseClause(); + node.setChildrenContext(Nest_mkChildContextWithSymTab(node.context(), node, modeUnspecified)); - NodeHandle::setContextForChildrenImpl(node); + condition.setContext(node.childrenContext()); + if (Feather_nodeEvalMode(node) != modeCt) { + if (thenClause) + thenClause.setContext(node.childrenContext()); + if (elseClause) + elseClause.setContext(node.childrenContext()); + } } NodeHandle IfStmt::semanticCheckImpl(IfStmt node) { NodeHandle condition = node.condition(); @@ -1141,6 +1145,14 @@ NodeHandle IfStmt::semanticCheckImpl(IfStmt node) { NodeHandle c = Nest_ctEval(condition); NodeHandle selectedBranch = _getBoolCtValue(c) ? thenClause : elseClause; + // For ct ifs do a compute type beforehand + // This is needed for top-level ct ifs + if (selectedBranch) + { + selectedBranch.setContext(node.context()); + selectedBranch.computeType(); + } + // Expand only the selected branch if (selectedBranch) return selectedBranch; diff --git a/src/Nest/Utils/Profiling.h b/src/Nest/Utils/Profiling.h index 3a3ae9dc..84d89444 100644 --- a/src/Nest/Utils/Profiling.h +++ b/src/Nest/Utils/Profiling.h @@ -53,11 +53,13 @@ inline void _Nest_Profiling_message(Nest_StringRef text) { typedef TracyCZoneCtx Nest_Profiling_ZoneCtx; -#define PROFILING_C_ZONE_BEGIN(ctx) \ - static const struct Nest_Profiling_LocType TracyConcat(__tracy_source_location, __LINE__) = { \ - NULL, __FUNCTION__, __FILE__, (uint32_t)__LINE__, 0}; \ +#define PROFILING_C_ZONE_BEGIN_NAME(ctx, staticName, active) \ + static const Nest_Profiling_LocType TracyConcat(__tracy_source_location, __LINE__) = { \ + staticName, __FUNCTION__, __FILE__, (uint32_t)__LINE__, 0}; \ Nest_Profiling_ZoneCtx ctx = \ - ___tracy_emit_zone_begin(&TracyConcat(__tracy_source_location, __LINE__), 1); + ___tracy_emit_zone_begin(&TracyConcat(__tracy_source_location, __LINE__), active); + +#define PROFILING_C_ZONE_BEGIN(ctx) PROFILING_C_ZONE_BEGIN_NAME(ctx, NULL, 1) #define PROFILING_C_ZONE_BEGIN_LOC(ctx, locPtr) \ Nest_Profiling_ZoneCtx ctx = ___tracy_emit_zone_begin(locPtr, 1) @@ -96,9 +98,10 @@ const Nest_Profiling_LocType* Nest_Profiling_createLoc( #define PROFILING_MESSAGE_STATIC(staticText) /*nothing*/ #define PROFILING_MESSAGE(text) /*nothing*/ -#define PROFILING_C_ZONE_BEGIN(ctx) /*nothing*/ -#define PROFILING_C_ZONE_BEGIN_LOC(ctx, locPtr) /*nothing*/ -#define PROFILING_C_ZONE_END(ctx) /*nothing*/ -#define PROFILING_C_ZONE_SETTEEXT(ctx, text) /*nothing*/ +#define PROFILING_C_ZONE_BEGIN_NAME(ctx, staticName, active) /*nothing*/ +#define PROFILING_C_ZONE_BEGIN(ctx) /*nothing*/ +#define PROFILING_C_ZONE_BEGIN_LOC(ctx, locPtr) /*nothing*/ +#define PROFILING_C_ZONE_END(ctx) /*nothing*/ +#define PROFILING_C_ZONE_SETTEEXT(ctx, text) /*nothing*/ #endif \ No newline at end of file diff --git a/src/Nest/src/Api/Node.c b/src/Nest/src/Api/Node.c index 902ab552..83ac4c8b 100644 --- a/src/Nest/src/Api/Node.c +++ b/src/Nest/src/Api/Node.c @@ -69,6 +69,31 @@ int _setExplanation(Nest_Node* node, Nest_Node* explanation) { return res; } +#if SPARROW_PROFILING +void _setProfilingText(Nest_Profiling_ZoneCtx ctx, Nest_Node* node) { + if (!ctx.active) + return; + const char* nodeDesc = Nest_toStringEx(node); + if (nodeDesc) { + char zoneDesc[256]; + int len = 0; + if (node->location.sourceCode) + len = snprintf(zoneDesc, 256, "%s:%d:%d\n%s", node->location.sourceCode->url, + (int)node->location.start.line, (int)node->location.start.col, nodeDesc); + else + len = snprintf(zoneDesc, 256, "?:%d:%d\n%s", (int)node->location.start.line, + (int)node->location.start.col, nodeDesc); + if (len > 0) { + PROFILING_C_ZONE_SETTEEXT(ctx, zoneDesc); + } + } +} +#define PROFILING_C_ZONE_SETNODETEXT(ctx, node) _setProfilingText(ctx, node) +#else +#define PROFILING_C_ZONE_SETNODETEXT(ctx, node) /*nothing*/ +#endif + + Nest_Node* Nest_createNode(int nodeKind) { ASSERT(nodeKind >= 0); @@ -110,8 +135,14 @@ void Nest_setContext(Nest_Node* node, Nest_CompilationContext* context) { if (context == node->context) return; + int changingContext = node->context && node->context != context; + (void) changingContext; + PROFILING_C_ZONE_BEGIN_LOC(ctx, Nest_Profiling_getSetContextLoc(node->nodeKind)); + PROFILING_C_ZONE_BEGIN_NAME(ctx2, "changing context", changingContext); + PROFILING_C_ZONE_SETNODETEXT(ctx2, node); + ASSERT(context); node->context = context; @@ -123,6 +154,8 @@ void Nest_setContext(Nest_Node* node, Nest_CompilationContext* context) { Nest_getSetContextForChildrenFun(node->nodeKind)(node); _applyModifiers(node, modTypeAfterSetContext); + + PROFILING_C_ZONE_END(ctx2); PROFILING_C_ZONE_END(ctx); } @@ -137,6 +170,7 @@ Nest_TypeRef Nest_computeType(Nest_Node* node) { Nest_toString(node)); PROFILING_C_ZONE_BEGIN_LOC(ctx, Nest_Profiling_getComputeTypeLoc(node->nodeKind)); + PROFILING_C_ZONE_SETNODETEXT(ctx, node); node->computeTypeStarted = 1; @@ -198,22 +232,7 @@ Nest_Node* Nest_semanticCheck(Nest_Node* node) { _applyModifiers(node, modTypeAfterSemanticCheck); -#ifdef SPARROW_PROFILING - const char* nodeDesc = Nest_toStringEx(node); - if (nodeDesc) { - char zoneDesc[256]; - int len = 0; - if (node->location.sourceCode) - len = snprintf(zoneDesc, 256, "%s:%d:%d\n%s", node->location.sourceCode->url, - (int)node->location.start.line, (int)node->location.start.col, nodeDesc); - else - len = snprintf(zoneDesc, 256, "?:%d:%d\n%s", (int)node->location.start.line, - (int)node->location.start.col, nodeDesc); - if (len > 0) { - PROFILING_C_ZONE_SETTEEXT(ctx, zoneDesc); - } - } -#endif + PROFILING_C_ZONE_SETNODETEXT(ctx, node); PROFILING_C_ZONE_END(ctx); return node->explanation; diff --git a/src/Nest/src/Utils/Diagnostic.cpp b/src/Nest/src/Utils/Diagnostic.cpp index bc154b6d..a359ed4c 100644 --- a/src/Nest/src/Utils/Diagnostic.cpp +++ b/src/Nest/src/Utils/Diagnostic.cpp @@ -4,6 +4,7 @@ #include "Nest/Api/StringRef.h" #include "Nest/Utils/cppif/NodeHandle.hpp" #include "Nest/Utils/cppif/StringRef.hpp" +#include "Nest/Utils/Profiling.h" #include "Nest/Api/SourceCode.h" #include "Nest/Api/Type.h" @@ -19,21 +20,38 @@ namespace { void doReport(const Location& loc, Nest_DiagnosticSeverity severity, const string& message) { cerr << endl; +#if SPARROW_PROFILING + ostringstream msgStream; + const char* diagType = ""; +#endif + // Write location: 'filename(line:col) : ' if (!Nest_isLocEmpty(&loc)) { cerr << loc << " : "; +#if SPARROW_PROFILING + msgStream << loc << " : "; +#endif } // Write the severity switch (severity) { case diagInternalError: cerr << ConsoleColors::fgHiMagenta << "INTERNAL ERROR : "; +#if SPARROW_PROFILING + diagType = "INTERNAL ERROR"; +#endif break; case diagError: cerr << ConsoleColors::fgLoRed << "ERROR : "; +#if SPARROW_PROFILING + diagType = "ERROR"; +#endif break; case diagWarning: cerr << ConsoleColors::fgHiYellow << "WARNING : "; +#if SPARROW_PROFILING + diagType = "WARNING"; +#endif break; case diagInfo: default: @@ -43,6 +61,11 @@ void doReport(const Location& loc, Nest_DiagnosticSeverity severity, const strin // Write the diagnostic text cerr << ConsoleColors::stClear << ConsoleColors::stBold << message << ConsoleColors::stClear << endl; +#if SPARROW_PROFILING + msgStream << "\n" << diagType << " : " << message; + PROFILING_ZONE_NAMED_TEXT(diagType, msgStream.str().c_str()); + PROFILING_MESSAGE(msgStream.str().c_str()); +#endif // Try to write the source line no in which the diagnostic occurred if (!Nest_isLocEmpty(&loc)) { diff --git a/src/Nest/src/Utils/cppif/NodeHandle.cpp b/src/Nest/src/Utils/cppif/NodeHandle.cpp index 04806f4d..72b30c69 100644 --- a/src/Nest/src/Utils/cppif/NodeHandle.cpp +++ b/src/Nest/src/Utils/cppif/NodeHandle.cpp @@ -272,6 +272,10 @@ bool NodeHandle::isSemanticallyChecked() const { return handle->nodeSemantically void NodeHandle::addModifier(Nest_Modifier* mod) { // NOLINTNEXTLINE(cppcoreguidelines-pro-type-cstyle-cast) NestUtils_appendObjectToPtrArray((NestUtils_PtrArray*)&handle->modifiers, mod); + + // If we are setting a modifier for a node that already started to compile, call the function + if (handle->semanticCheckStarted && mod->modifierType == modTypeBeforeSemanticCheck) + mod->modifierFun(mod, handle); } CompilationContext* NodeHandle::childrenContext() const { diff --git a/tests/Par/AtomicTest.spr b/tests/Par/AtomicTest.spr index e45a9f44..f5af1b65 100644 --- a/tests/Par/AtomicTest.spr +++ b/tests/Par/AtomicTest.spr @@ -1,181 +1,159 @@ //! ! -dump-assembly -import par.thread; -import par.atomic; -import std.ranges; -import std.vector; -import os; -import test; - -[native("rand")] fun rand(): Int; -[native("srand")] fun srand(seed: UInt); -[native("time")] fun time(timer: @Int): Int; - -datatype UserDefined - lo, hi: Short; - -fun ctor(this: @UserDefined, val: Int) { - lo = Short(val % 0xffff); - hi = Short(val / 0xffff); -} -fun >> (os: OutStream) { - os << "(" << hi << "," << lo << ")"; -} -fun testOneLoad(t: Type, val: AnyType) { - var atomic: t Atomic = val; - assertEq(atomic load, val); -} -fun testLoadNumeric(t: Type, count: Int) { - for i=0..count { - var val: t = rand(); - testOneLoad(t, val); - } -} -fun testLoadPtr(t: Type, count: Int) { - for i=0..count { - var val: t = rand(); - var p: t Ptr = new(t, val); - testOneLoad(t Ptr, p); - p release; - } -} - -fun testOneStore(atomic: AnyType, val: AnyType) { - atomic store val; - assertEq(atomic load, val); -} -fun testStoreNumeric(t: Type, count: Int) { - var atomic: t Atomic; - for i=0..count { - var val: t = rand(); - testOneStore(atomic, val); - } -} -fun testStorePtr(t: Type, count: Int) { - var atomic: (t Ptr) Atomic; - for i=0..count { - var val: t = rand(); - var p: t Ptr = new(t, val); - testOneStore(atomic, p); - p release; - } -} - -fun testOneFetchAndStore(atomic: AnyType, val1, val2: AnyType) { - atomic store val1; - assertEq(atomic load, val1); - var x = atomic fetchAndStore val2; - assertEq(atomic load, val2); - assertEq(x, val1); -} -fun testFetchAndStoreNumeric(t: Type, count: Int) { - var atomic: t Atomic; - for i=0..count { - var val1: t = rand(); - var val2: t = rand(); - testOneFetchAndStore(atomic, val1, val2); - } -} -fun testFetchAndStorePtr(t: Type, count: Int) { - var atomic: (t Ptr) Atomic; - for i=0..count { - var val1: t = rand(); - var val2: t = rand(); - var p1: t Ptr = new(t, val1); - var p2: t Ptr = new(t, val2); - testOneFetchAndStore(atomic, p1, p2); - p2 release; - p1 release; - } -} - -fun testOneFetchAndAdd(atomic: AnyType, val, add: AnyType) { - atomic store val; - assertEq(atomic load, val); - var x = atomic fetchAndAdd add; - assertEq(atomic load, val+add); - assertEq(x, val); -} -fun testFetchAndAddNumeric(t: Type, count: Int) { - var atomic: t Atomic; - for i=0..count { - var val: t = rand(); - var add: t = rand(); - testOneFetchAndAdd(atomic, val, add); - } -} - -fun testOneCompareAndSwap(atomic: AnyType, val1, val2: AnyType) { - if ( val1 == val2 ) - return; - atomic store val1; - assertEq(atomic load, val1); - var x1 = compareAndSwap(atomic, val2, val2); - assertEq(atomic load, val1, "CAS shouldn't change the value"); - assertTrue(!x1, "CAS should return false if not changing the value"); - var x2 = compareAndSwap(atomic, val2, val1); - assertEq(atomic load, val2, "CAS should change the value"); - assertTrue(x2, "CAS should return true when value is changed"); -} -fun testCompareAndSwapNumeric(t: Type, count: Int) { - var atomic: t Atomic; - for i=0..count { - var val1: t = rand(); - var val2: t = rand(); - testOneCompareAndSwap(atomic, val1, val2); - } -} -fun testCompareAndSwapPtr(t: Type, count: Int) { - var atomic: (t Ptr) Atomic; - for i=0..count { - var val1: t = rand(); - var val2: t = rand(); - var p1: t Ptr = new(t, val1); - var p2: t Ptr = new(t, val2); - testOneCompareAndSwap(atomic, p1, p2); - p2 release; - p1 release; - } -} - -fun testAll(t: Type, count: Int) { - testLoadNumeric(t, count); - testLoadPtr(t, count); - - testStoreNumeric(t, count); - testStorePtr(t, count); - - testFetchAndStoreNumeric(t, count); - testFetchAndStorePtr(t, count); - - testCompareAndSwapNumeric(t, count); - testCompareAndSwapPtr(t, count); - - [ct] if ( Integer(#$t) ) - testFetchAndAddNumeric(t, count); -} - -fun testAll(count: Int) { - testAll(Int, count); - testAll(UInt, count); - testAll(Byte, count); - testAll(UShort, count); - testAll(SizeType, count); - testAll(Float, count); - testAll(UserDefined, count); -} - -fun sprMain -{ - if ( programArgs.size() < 2 ) - return; - var n = programArgs(1) asInt; - - // Initialize random number generator - srand(time(null)); - - // Run all the tests - testAll(n); -} + +import std.compilerInfo + +[ct] if platformName == 'Darwin' + import par.thread + import par.atomic + import std.ranges + import std.vector + import os + import test + + [native("rand")] fun rand(): Int; + [native("srand")] fun srand(seed: UInt); + [native("time")] fun time(timer: @Int): Int; + + datatype UserDefined + lo, hi: Short + + fun ctor(this: @UserDefined, val: Int) + lo = Short(val % 0xffff) + hi = Short(val / 0xffff) + fun >> (os: OutStream) + os << "(" << hi << "," << lo << ")" + fun testOneLoad(t: Type, val: AnyType) + var atomic: t Atomic = val + assertEq(atomic load, val) + fun testLoadNumeric(t: Type, count: Int) + for i=0..count + var val: t = rand() + testOneLoad(t, val) + fun testLoadPtr(t: Type, count: Int) + for i=0..count + var val: t = rand() + var p: t Ptr = new(t, val) + testOneLoad(t Ptr, p) + p release + + fun testOneStore(atomic: AnyType, val: AnyType) + atomic store val + assertEq(atomic load, val) + fun testStoreNumeric(t: Type, count: Int) + var atomic: t Atomic + for i=0..count + var val: t = rand() + testOneStore(atomic, val) + fun testStorePtr(t: Type, count: Int) + var atomic: (t Ptr) Atomic + for i=0..count + var val: t = rand() + var p: t Ptr = new(t, val) + testOneStore(atomic, p) + p release + + fun testOneFetchAndStore(atomic: AnyType, val1, val2: AnyType) + atomic store val1 + assertEq(atomic load, val1) + var x = atomic fetchAndStore val2 + assertEq(atomic load, val2) + assertEq(x, val1) + fun testFetchAndStoreNumeric(t: Type, count: Int) + var atomic: t Atomic + for i=0..count + var val1: t = rand() + var val2: t = rand() + testOneFetchAndStore(atomic, val1, val2) + fun testFetchAndStorePtr(t: Type, count: Int) + var atomic: (t Ptr) Atomic + for i=0..count + var val1: t = rand() + var val2: t = rand() + var p1: t Ptr = new(t, val1) + var p2: t Ptr = new(t, val2) + testOneFetchAndStore(atomic, p1, p2) + p2 release + p1 release + + fun testOneFetchAndAdd(atomic: AnyType, val, add: AnyType) + atomic store val + assertEq(atomic load, val) + var x = atomic fetchAndAdd add + assertEq(atomic load, val+add) + assertEq(x, val) + fun testFetchAndAddNumeric(t: Type, count: Int) + var atomic: t Atomic + for i=0..count + var val: t = rand() + var add: t = rand() + testOneFetchAndAdd(atomic, val, add) + + fun testOneCompareAndSwap(atomic: AnyType, val1, val2: AnyType) + if val1 == val2 + return + atomic store val1 + assertEq(atomic load, val1) + var x1 = compareAndSwap(atomic, val2, val2) + assertEq(atomic load, val1, "CAS shouldn't change the value") + assertTrue(!x1, "CAS should return false if not changing the value") + var x2 = compareAndSwap(atomic, val2, val1) + assertEq(atomic load, val2, "CAS should change the value") + assertTrue(x2, "CAS should return true when value is changed") + fun testCompareAndSwapNumeric(t: Type, count: Int) + var atomic: t Atomic + for i=0..count + var val1: t = rand() + var val2: t = rand() + testOneCompareAndSwap(atomic, val1, val2) + fun testCompareAndSwapPtr(t: Type, count: Int) + var atomic: (t Ptr) Atomic + for i=0..count + var val1: t = rand() + var val2: t = rand() + var p1: t Ptr = new(t, val1) + var p2: t Ptr = new(t, val2) + testOneCompareAndSwap(atomic, p1, p2) + p2 release + p1 release + + fun testAll(t: Type, count: Int) + testLoadNumeric(t, count) + testLoadPtr(t, count) + + testStoreNumeric(t, count) + testStorePtr(t, count) + + testFetchAndStoreNumeric(t, count) + testFetchAndStorePtr(t, count) + + testCompareAndSwapNumeric(t, count) + testCompareAndSwapPtr(t, count) + + [ct] if Integer(#$t) + testFetchAndAddNumeric(t, count) + + fun testAll(count: Int) + testAll(Int, count) + testAll(UInt, count) + testAll(Byte, count) + testAll(UShort, count) + testAll(SizeType, count) + testAll(Float, count) + testAll(UserDefined, count) + + fun sprMain + if programArgs.size() < 2 + return + var n = programArgs(1) asInt + + // Initialize random number generator + srand(time(null)) + + // Run all the tests + testAll(n) +else + fun sprMain + ; /*<<>>*/ diff --git a/tests/Par/LocksTest.spr b/tests/Par/LocksTest.spr index 8956334b..b3013d2b 100644 --- a/tests/Par/LocksTest.spr +++ b/tests/Par/LocksTest.spr @@ -1,87 +1,79 @@ //! ! -dump-assembly -import par.thread; -import par.atomic; -import par.locks; -import std.vector; -import test = test; - -var numUsers: Int Atomic = 0; -var numLocks: Int Atomic = 0; - -var normalMutex: Mutex; - -fun doCoreJob(waitTime: Int) { - ++numUsers; - test.assertEq(numUsers load, 1, "numUsers should be 1"); - ++numLocks; - sleep(waitTime); - --numUsers; -} - -fun doProtectedJob(mutex: @Lockable, useTryLock: Bool = false) { - if ( !useTryLock ) { - var lock: ScopedLock(typeOf(mutex)) = mutex; - doCoreJob(1); - } else { - var lock: ScopedTryLock(typeOf(mutex)) = mutex; - if ( lock.isLocked ) { - doCoreJob(3); - } - } -} - -[initCtor] datatype Worker { - amountOfWork: Int; - useTryLock: Bool; - - fun ()() { - for i = 0..amountOfWork { - doProtectedJob(normalMutex, useTryLock); - } - if ( !useTryLock ) - test.assertGe(numLocks load, amountOfWork, "numLocks should be greater or equal to the amount of work"); - } -} - -datatype WorkManager { - amountOfWork, numWorkers, numTryWorkers: Int; - threads: (Thread Ptr) Vector; - -} - -fun ctor(this: @WorkManager, amountOfWork, numWorkers, numTryWorkers: Int) { - this.amountOfWork ctor amountOfWork; - this.numWorkers ctor numWorkers; - this.numTryWorkers ctor numTryWorkers; - for i=0..numWorkers { - var p: Thread Ptr = new(Thread, Worker(amountOfWork, false)); - threads.pushBack(p); - } - for i=0..numTryWorkers { - var p: Thread Ptr = new(Thread, Worker(amountOfWork, true)); - threads.pushBack(p); - } -} - -fun dtor(this: @WorkManager) { - for t = threads.all { - t.get join; - t.release; - } - test.assertGe(numLocks load, amountOfWork*numWorkers, "numLocks should be greater equal to what normals workers did"); - test.assertLe(numLocks load, amountOfWork*(numWorkers+numTryWorkers), "numLocks should be less or equal to the total work"); -} - -fun sprMain -{ - if ( programArgs.size() < 4 ) - return; - var amountOfWork = programArgs(1) asInt; - var numWorkers = programArgs(2) asInt; - var numTryWorkers = programArgs(3) asInt; - - var work = WorkManager(amountOfWork, numWorkers, numTryWorkers); -} + +import std.compilerInfo + +[ct] if platformName == 'Darwin' + import par.thread + import par.atomic + import par.locks + import std.vector + import test = test + + var numUsers: Int Atomic = 0 + var numLocks: Int Atomic = 0 + + var normalMutex: Mutex + + fun doCoreJob(waitTime: Int) + ++numUsers + test.assertEq(numUsers load, 1, "numUsers should be 1") + ++numLocks + sleep(waitTime) + --numUsers + + fun doProtectedJob(mutex: @Lockable, useTryLock: Bool = false) + if !useTryLock + var lock: ScopedLock(typeOf(mutex)) = mutex + doCoreJob(1) + else + var lock: ScopedTryLock(typeOf(mutex)) = mutex + if lock.isLocked + doCoreJob(3) + + [initCtor] datatype Worker + amountOfWork: Int + useTryLock: Bool + + fun ()(this: @Worker) + for i = 0..amountOfWork + doProtectedJob(normalMutex, useTryLock) + //if !useTryLock + // test.assertGe(numLocks load, amountOfWork, "numLocks should be greater or equal to the amount of work") + + datatype WorkManager + amountOfWork, numWorkers, numTryWorkers: Int + threads: (Thread Ptr) Vector + + fun ctor(this: @WorkManager, amountOfWork, numWorkers, numTryWorkers: Int) + this.amountOfWork ctor amountOfWork + this.numWorkers ctor numWorkers + this.numTryWorkers ctor numTryWorkers + for i=0..numWorkers + var p: Thread Ptr = new(Thread, Worker(amountOfWork, false)) + threads.pushBack(p) + for i=0..numTryWorkers + var p: Thread Ptr = new(Thread, Worker(amountOfWork, true)) + threads.pushBack(p) + + fun dtor(this: @WorkManager) + for t = threads.all + t.get join + t.release + sleep(100) + test.assertGe(numLocks load, amountOfWork*numWorkers, "numLocks should be greater equal to what normals workers did") + test.assertLe(numLocks load, amountOfWork*(numWorkers+numTryWorkers), "numLocks should be less or equal to the total work") + + fun sprMain + if programArgs.size() < 4 + return + var amountOfWork = programArgs(1) asInt + var numWorkers = programArgs(2) asInt + var numTryWorkers = programArgs(3) asInt + + var work = WorkManager(amountOfWork, numWorkers, numTryWorkers) +else + fun sprMain + ; /*<<>>*/ diff --git a/tests/Par/ParForTest.spr b/tests/Par/ParForTest.spr index 3914f160..d00696c0 100644 --- a/tests/Par/ParForTest.spr +++ b/tests/Par/ParForTest.spr @@ -1,33 +1,37 @@ //! ! -O3 // -g -O1 -dump-assembly -import par.tasks; -import par.parFor; -import par.atomic; -import test; - - -var counter: Int Atomic; -var outOfOrderCount: Int Atomic; - -fun foo(x: Int) { - var curIdx = counter++; - if ( curIdx != x ) - outOfOrderCount++; - sleep(0); -} - -fun sprMain -{ - if ( programArgs.size() < 2 ) - return; - var n = programArgs(1) asInt; - - 0..n parFor \foo; - - //cout << "Num out of order: " << (outOfOrderCount load) << endl; - assertGt(outOfOrderCount load, 0); - assertLe(outOfOrderCount load, n); -} + +import std.compilerInfo + +[ct] if platformName == 'Darwin' + import par.tasks + import par.parFor + import par.atomic + import test + + + var counter: Int Atomic + var outOfOrderCount: Int Atomic + + fun foo(x: Int) + var curIdx = counter++ + if ( curIdx != x ) + outOfOrderCount++ + sleep(0) + + fun sprMain + if ( programArgs.size() < 2 ) + return + var n = programArgs(1) asInt + + 0..n parFor \foo + + //cout << "Num out of order: " << (outOfOrderCount load) << endl + assertGt(outOfOrderCount load, 0) + assertLe(outOfOrderCount load, n) +else + fun sprMain + ; /*<<>>*/ diff --git a/tests/Par/SemaphoreTest.spr b/tests/Par/SemaphoreTest.spr index f803a8f2..0ffa9753 100644 --- a/tests/Par/SemaphoreTest.spr +++ b/tests/Par/SemaphoreTest.spr @@ -1,76 +1,69 @@ //! ! -dump-assembly -import par.thread; -import par.atomic; -import par.semaphore; -import test; +import std.compilerInfo -using par.*; -using test.*; +[ct] if platformName == 'Darwin' + import par.thread + import par.atomic + import par.semaphore + import std.vector + import std.ranges + import test -var numResources: Int; -var numUsers: Int Atomic = 0; -var numExecutions: Int Atomic = 0; + var numResources: Int + var numUsers: Int Atomic = 0 + var numExecutions: Int Atomic = 0 -fun doJob(sem: @par.Semaphore, waitTime: Int) { - sem acquire; - ++numUsers; - //cout << (numUsers load) << ' '; - test.assertLe(numUsers load, numResources, "numUsers should be less or equal to the number of resources"); - ++numExecutions; - par.sleep(waitTime); - --numUsers; - sem release; -} + fun doJob(sem: @Semaphore, waitTime: Int) + sem acquire + ++numUsers + //cout << (numUsers load) << ' ' + assertLe(numUsers load, numResources, "numUsers should be less or equal to the number of resources") + ++numExecutions + sleep(waitTime) + --numUsers + sem release -[initCtor] datatype Worker { - sem: @par.Semaphore; - amountOfWork: Int; + [initCtor] datatype Worker + sem: @Semaphore + amountOfWork: Int - fun ()() { - for i = 0..amountOfWork { - doJob(sem, i % 5 + 1); - } - test.assertGe(numExecutions load, amountOfWork, "all our job should be executed"); - } -} + fun ()(this: @Worker) + for i = 0..amountOfWork + doJob(sem, i % 5 + 1) + assertGe(numExecutions load, amountOfWork, "all our job should be executed") -datatype WorkManager { - sem: @par.Semaphore; - amountOfWork, numWorkers: Int; - threads: (par.Thread Ptr) Vector; + datatype WorkManager + sem: @Semaphore + amountOfWork, numWorkers: Int + threads: (Thread Ptr) Vector -} + fun ctor(this: @WorkManager, sem: @Semaphore, amountOfWork, numWorkers: Int) + this.amountOfWork ctor amountOfWork + this.numWorkers ctor numWorkers + for i=0..numWorkers + var p: Thread Ptr = new(Thread, Worker(sem, amountOfWork)) + threads.pushBack(p) -fun ctor(this: @WorkManager, sem: @par.Semaphore, amountOfWork, numWorkers: Int) { - this.amountOfWork ctor amountOfWork; - this.numWorkers ctor numWorkers; - for i=0..numWorkers { - var p: par.Thread Ptr = new(par.Thread, Worker(sem, amountOfWork)); - threads.pushBack(p); - } -} + fun dtor(this: @WorkManager) + for t = threads.all + t.get join + t.release + assertEq(numExecutions load, amountOfWork*numWorkers, "numExecutions should be equal to the total amount of work") + fun sprMain + if programArgs.size() < 4 + return + numResources = (programArgs(1) asInt) + var amountOfWork = programArgs(2) asInt + var numWorkers = programArgs(3) asInt -fun dtor(this: @WorkManager) { - for t = threads.all { - t.get join; - t.release; - } - test.assertEq(numExecutions load, amountOfWork*numWorkers, "numExecutions should be equal to the total amount of work"); -} -fun sprMain -{ - if ( programArgs.size() < 4 ) - return; - numResources = (programArgs(1) asInt); - var amountOfWork = programArgs(2) asInt; - var numWorkers = programArgs(3) asInt; + // Initialize the semaphore + var sem: Semaphore = numResources - // Initialize the semaphore - var sem: par.Semaphore = numResources; - - // Do the work - var work = WorkManager(sem, amountOfWork, numWorkers); -} + // Do the work + var work = WorkManager(sem, amountOfWork, numWorkers) +else + fun sprMain + ; /*<<>>*/ diff --git a/tests/Par/TaskTest.spr b/tests/Par/TaskTest.spr index 51118741..b451b117 100644 --- a/tests/Par/TaskTest.spr +++ b/tests/Par/TaskTest.spr @@ -1,187 +1,168 @@ //!! -O3 // -g -O1 -dump-assembly -import par.tasks; -import test; -import std.newDelete; - -//using useDescriptions = true; - -using cutoff = 16; -//using cutoff = 30; - -fun serialFib(n: Long): Long { - return ife(n<2, n, serialFib(n-1)+serialFib(n-2)); -} - -package StackSpawnStyle { - datatype FibTask { - prefix: TaskPrefix; - n: Long; - result: @Long; - - } - fun ctor(this: @StackSpawnStyle, n: Long, result: @Long) { - this.n ctor n; - this.result := result; - } - - [ct] if ( isValidAndTrue(useDescriptions) ) - fun description(task: @FibTask): String { - var res: String = "Fib "; - res += intToString(Int(task.n)); - return res; - } - - fun execute(task: @FibTask) { - if ( task.n < cutoff ) - task.result = serialFib(task.n); - else { - // Create the children tasks - var x, y: Long; - var t1 = FibTask(task.n-1, x); - var t2 = FibTask(task.n-2, y); - - spawnAndWait(task, t1, t2); - - // Do the sum - task.result = x + y; - } - } - - fun parFib(n: Long): Long { - var res: Long; - spawnRootAndWait(FibTask(n, res)); - return res; - } -} - -package DynSpawnStyle { - datatype FibTask { - prefix: TaskPrefix; - n: Long; - result: @Long; - - } - fun ctor(this: @DynSpawnStyle, n: Long, result: @Long) { - this.n ctor n; - this.result := result; - } - - [ct] if ( isValidAndTrue(useDescriptions) ) - fun description(task: @FibTask): String { - var res: String = "Fib "; - res += intToString(Int(task.n)); - return res; - } - - fun execute(task: @FibTask) { - if ( task.n < cutoff ) - task.result = serialFib(task.n); - else { - // Create the children tasks - var x, y: Long; - var t1: @FibTask = new(FibTask, task.n-1, x); - var t2: @FibTask = new(FibTask, task.n-2, y); - - spawnAndWait(task, t1, t2); - - delete(t2); - delete(t1); - - // Do the sum - task.result = x + y; - } - } - - fun parFib(n: Long): Long { - var res: Long; - spawnRootAndWait(FibTask(n, res)); - return res; - } -} - -package ContStyle { - datatype FibTask { - prefix: TaskPrefix; - n: Long; - result: @Long; - - } - fun ctor(this: @ContStyle, n: Long, result: @Long) { - this.n ctor n; - this.result := result; - } - - [ct] if ( isValidAndTrue(useDescriptions) ) - fun description(task: @FibTask): String { - var res: String = "Fib "; - res += intToString(Int(task.n)); - return res; - } - - fun execute(task: @FibTask) { - if ( task.n < cutoff ) - task.result = serialFib(task.n); - else { - var cont: @FibContTask = new(FibContTask, task.n, task.result); - setContinuation(task, cont); - - var t1: @FibTask = new(FibTask, task.n-1, cont.s1); - var t2: @FibTask = new(FibTask, task.n-2, cont.s2); - spawn(cont, t1, t2); - } - } - - datatype FibContTask { - prefix: TaskPrefix; - n: Long; - s1, s2: Long; - result: @Long; - - } - fun ctor(this: @FibContTask, n, result: @Long) { - this.n ctor n; - this.result := result; - } - - [ct] if ( isValidAndTrue(useDescriptions) ) - fun description(task: @FibContTask): String { - var res: String = "Cont "; - res += intToString(Int(task.n)); - return res; - } - - fun execute(task: @FibContTask) { - task.result = task.s1 + task.s2; - } - - fun parFib(n: Long): Long { - var res: Long; - spawnRootAndWait(FibTask(n, res)); - return res; - } -} - -fun sprMain -{ - if ( programArgs.size() < 2 ) - return; - var style = programArgs(1) asInt; - var n = programArgs(2) asInt; - - var res: Long; - - if ( style == 0 ) - res = serialFib(n); - else if ( style == 1 ) - res = StackSpawnStyle.parFib(n); - else if ( style == 2 ) - res = DynSpawnStyle.parFib(n); - else if ( style == 3 ) - res = ContStyle.parFib(n); - - cout << "Final result: fib(" << n << ")=" << res << endl; -} + +import std.compilerInfo + +[ct] if platformName == 'Darwin' + import par.tasks + import test + import std.newDelete + + using useDescriptions = true + + using cutoff = 16 + //using cutoff = 30 + + fun serialFib(n: Long): Long + return ife(n<2, n, serialFib(n-1)+serialFib(n-2)) + + package StackSpawnStyle + datatype FibTask + prefix: TaskPrefix + n: Long + result: @Long + + fun ctor(this: @FibTask, n: Long, result: @Long) + this.n ctor n + this.result := result + [ct] if isValidAndTrue(useDescriptions) + this.prefix.desc = this.description + + [ct] if isValidAndTrue(useDescriptions) + fun description(task: @FibTask): String + var res: String = "Fib " + res += intToString(Int(task.n)) + return res + + fun execute(task: @FibTask) + if task.n < cutoff + task.result = serialFib(task.n) + else + // Create the children tasks + var x, y: Long + var t1 = FibTask(task.n-1, x) + var t2 = FibTask(task.n-2, y) + + spawnAndWait(task, t1, t2) + + // Do the sum + task.result = x + y + + fun parFib(n: Long): Long + var res: Long + spawnRootAndWait(FibTask(n, res)) + return res + + package DynSpawnStyle + datatype FibTask + prefix: TaskPrefix + n: Long + result: @Long + + fun ctor(this: @FibTask, n: Long, result: @Long) + this.n ctor n + this.result := result + + [ct] if isValidAndTrue(useDescriptions) + fun description(task: @FibTask): String + var res: String = "Fib " + res += intToString(Int(task.n)) + return res + + fun execute(task: @FibTask) + if task.n < cutoff + task.result = serialFib(task.n) + else + // Create the children tasks + var x, y: Long + var t1: @FibTask = new(FibTask, task.n-1, x) + var t2: @FibTask = new(FibTask, task.n-2, y) + + spawnAndWait(task, t1, t2) + + delete(t2) + delete(t1) + + // Do the sum + task.result = x + y + + fun parFib(n: Long): Long + var res: Long + spawnRootAndWait(FibTask(n, res)) + return res + + package ContStyle + datatype FibTask + prefix: TaskPrefix + n: Long + result: @Long + + fun ctor(this: @FibTask, n: Long, result: @Long) + this.n ctor n + this.result := result + + [ct] if isValidAndTrue(useDescriptions) + fun description(task: @FibTask): String + var res: String = "Fib " + res += intToString(Int(task.n)) + return res + + fun execute(task: @FibTask) + if task.n < cutoff + task.result = serialFib(task.n) + else + var cont: @FibContTask = new(FibContTask, task.n, task.result) + setContinuation(task, cont) + + var t1: @FibTask = new(FibTask, task.n-1, cont.s1) + var t2: @FibTask = new(FibTask, task.n-2, cont.s2) + spawn(cont, t1, t2) + + datatype FibContTask + prefix: TaskPrefix + n: Long + s1, s2: Long + result: @Long + + fun ctor(this: @FibContTask, n, result: @Long) + this.n ctor n + this.result := result + + [ct] if isValidAndTrue(useDescriptions) + fun description(task: @FibContTask): String + var res: String = "Cont " + res += intToString(Int(task.n)) + return res + + fun execute(task: @FibContTask) + task.result = task.s1 + task.s2 + + fun parFib(n: Long): Long + var res: Long + spawnRootAndWait(FibTask(n, res)) + return res + + fun sprMain + if programArgs.size() < 2 + return + var style = programArgs(1) asInt + var n = programArgs(2) asInt + + var res: Long + + if style == 0 + res = serialFib(n) + else if style == 1 + res = StackSpawnStyle.parFib(n) + else if style == 2 + res = DynSpawnStyle.parFib(n) + else if style == 3 + res = ContStyle.parFib(n) + + cout << "Final result: fib(" << n << ")=" << res << endl +else + fun sprMain + cout << 'Final result: fib(30)=832040\n' /*<<>>*/ - -fun test2 -{ - if ( getAvailableCoresNum() >= 2 ) - cout << "ok" << endl; -} /*<<>>*/ - -fun test3 -{ - cout << "Before creating threads" << endl; - var count = 10; - var cnt1, cnt2: Int; - var t1 = Thread(TwiddleThumbs("worker 1", count, cnt1, true)); - var t2 = Thread(TwiddleThumbs("worker 2", count, cnt2, true)); - sleep(50); - cout << "After creating the threads" << endl; - t1 join; - t2 join; - cout << "Done" << endl; -} -/*<<>>*/ - -fun test4 -{ - var count = 10; - var cnt1, cnt2: Int; - var t1 = Thread(TwiddleThumbs("worker 1", count, cnt1)); - var t2 = Thread(TwiddleThumbs("worker 2", count, cnt2)); - sleep(50); - t1 join; - t2 join; - - // Make sure that the threads interleave execution - // If we interleave, a counter will go beyond 'count' - if ( cnt1 <= count ) cout << "fail" << endl; - if ( cnt2 <= count ) cout << "fail" << endl; - - cout << "ok" << endl; - - //p1 store cnt1; - var pp: Int Ptr; - p2 store pp; -} +//<<>>*/ diff --git a/tests/Par/TlsTest.spr b/tests/Par/TlsTest.spr index 2f472d39..0f3f4584 100644 --- a/tests/Par/TlsTest.spr +++ b/tests/Par/TlsTest.spr @@ -1,75 +1,73 @@ //! ! -dump-assembly -import par.thread; -import par.tls; -import std.vector; -import test; - -var intTls: Tls(Int); -var ptrTls: Tls(@Int); - -[initCtor] datatype IntWorker { - val1, val2: Int; - - fun ()() { - assertEq(intTls get, 0); - intTls = val1; - assertEq(intTls get, val1); - intTls = val2; - assertEq(intTls get, val2); - } -} - -[initCtor] datatype PtrWorker { - val1, val2: @Int; - - fun ()() { - assertTrue((ptrTls get) === null); - ptrTls = val1; - assertTrue((ptrTls get) === val1); - ptrTls = val2; - assertTrue((ptrTls get) === val2); - } -} - -datatype WorkManager - numThreads: Int - threads: (Thread Ptr) Vector - -fun ctor(this: @WorkManager, numThreads: Int) { - this.numThreads ctor numThreads; - - var i1 = 10; - var i2 = 20; - var pi1: @Int = i1; - var pi2: @Int = i2; - - for i=0..numThreads { - var p: Thread Ptr = new(Thread, IntWorker(i1, i2)); - threads.pushBack(p); - } - - for i=0..numThreads { - var p: Thread Ptr = new(Thread, PtrWorker(pi1, pi2)); - threads.pushBack(p); - } -} - -fun dtor(this: @WorkManager) { - for t = threads.all { - t.get join; - t.release; - } -} - - -fun sprMain -{ - if ( programArgs.size() < 4 ) - return; - var numThreads = programArgs(1) asInt; - - var work = WorkManager(numThreads); -} + +import std.compilerInfo + +[ct] if platformName == 'Darwin' + + import par.thread + import par.tls + import std.vector + import std.ranges + import test + + var intTls: Tls(Int) + var ptrTls: Tls(@Int) + + [initCtor] datatype IntWorker + val1, val2: Int + + fun ()(this: IntWorker) + assertEq(intTls get, 0) + intTls = val1 + assertEq(intTls get, val1) + intTls = val2 + assertEq(intTls get, val2) + + [initCtor] datatype PtrWorker + val1, val2: @Int + + fun ()(this: PtrWorker) + assertTrue((ptrTls get) === null) + ptrTls = val1 + assertTrue((ptrTls get) === val1) + ptrTls = val2 + assertTrue((ptrTls get) === val2) + + datatype WorkManager + numThreads: Int + threads: (Thread Ptr) Vector + + fun ctor(this: @WorkManager, numThreads: Int) + this.numThreads ctor numThreads + + var i1 = 10 + var i2 = 20 + var pi1: @Int = i1 + var pi2: @Int = i2 + + for i=0..numThreads + var p: Thread Ptr = new(Thread, IntWorker(i1, i2)) + threads.pushBack(p) + + for i=0..numThreads + var p: Thread Ptr = new(Thread, PtrWorker(pi1, pi2)) + threads.pushBack(p) + + fun dtor(this: @WorkManager) + for t = threads.all + t.get join + t.release + + + fun sprMain + if programArgs.size() < 4 + return + var numThreads = programArgs(1) asInt + + var work = WorkManager(numThreads) +else + fun sprMain + ; /*<<>>*/ diff --git a/tests/test.py b/tests/test.py index dac95175..467007b0 100755 --- a/tests/test.py +++ b/tests/test.py @@ -301,7 +301,7 @@ def afterTestRun(self, testName, actualOutput, expectedOutput, runOk): print() print('>>> OK') else: - if expectedOutput: + if expectedOutput != None: print('ERROR: output does not match!') else: print('ERROR: output contains errors!') @@ -409,7 +409,7 @@ def _removeFileIfExists(filename): p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE) actualOutput = p.communicate()[0] - if expectedOutput: + if expectedOutput != None: runOk = actualOutput == expectedOutput else: runOk = not('FAILURE' in actualOutput or 'ERROR' in actualOutput) diff --git a/tests/tests.in b/tests/tests.in index c0a9896b..19265de9 100644 --- a/tests/tests.in +++ b/tests/tests.in @@ -26,13 +26,13 @@ BenchmarkGame/meteor.spr: Benchmark game - meteor BenchmarkGame/fasta.spr: Benchmark game - fasta BenchmarkGame/fastaredux.spr: Benchmark game - fastaredux -# Par/ThreadTest.spr: Parallel - thread test -# Par/TlsTest.spr: Parallel - TLS test -# Par/AtomicTest.spr: Parallel - atomics test -# Par/LocksTest.spr: Parallel - locks test -# Par/SemaphoreTest.spr: Parallel - semaphore test -# Par/TaskTest.spr: Parallel - Fibonacci tasks test -# Par/ParForTest.spr: Parallel - Basic parallel for test +Par/ThreadTest.spr: Parallel - thread test +Par/TlsTest.spr: Parallel - TLS test +Par/AtomicTest.spr: Parallel - atomics test +Par/LocksTest.spr: Parallel - locks test +Par/SemaphoreTest.spr: Parallel - semaphore test +Par/TaskTest.spr: Parallel - Fibonacci tasks test +Par/ParForTest.spr: Parallel - Basic parallel for test PerfTests/Hash/TestHashPerf.spr: Performance tests - hash PerfTests/ForPerf/spr_for.spr: Performance tests - for