From b796d1a0efb15cad68e2b06ddaa305b08cf5f722 Mon Sep 17 00:00:00 2001
From: Mamy Ratsimbazafy
Date: Sun, 24 Nov 2019 10:54:11 +0100
Subject: [PATCH] Change steal requests to lockless unbounded mpsc (#21)

* Stashing: Sadly, we might never know the perf of this channel, back to the drawing board - https://github.com/nim-lang/Nim/issues/12695

* Fix queue, workaround https://github.com/nim-lang/Nim/issues/12695

* Fix fence issue: no strange memory reuse anymore with both Nim and system alloc

* Fix #19 #20: Snmalloc / Pony-lang MPSC queue issues:
  - they hold on to the last item of a queue (breaking for steal requests)
  - they require memory management of the dummy node (snmalloc deletes it and its memory doesn't seem to be reclaimed)
  - they never touch the "back" pointer of the queue when dequeuing, meaning that if an item was last, dequeuing will still point to it. Pony has an emptiness check via tagged pointer and snmalloc does ???
---
 weave/channels/channels_mpsc_unbounded.nim | 149 ++++++++++++------
 weave/contexts.nim                         |   4 +-
 weave/datatypes/context_global.nim         |   3 +-
 weave/datatypes/sparsesets.nim             |   3 -
 weave/datatypes/sync_types.nim             |   8 +-
 .../compiler_optimization_hints.nim        |   4 +-
 weave/runtime.nim                          |   4 +-
 weave/scheduler.nim                        |   9 +-
 weave/thieves.nim                          |   8 +-
 weave/victims.nim                          |   7 +-
 10 files changed, 133 insertions(+), 66 deletions(-)

diff --git a/weave/channels/channels_mpsc_unbounded.nim b/weave/channels/channels_mpsc_unbounded.nim
index 83a6c50..f7add05 100644
--- a/weave/channels/channels_mpsc_unbounded.nim
+++ b/weave/channels/channels_mpsc_unbounded.nim
@@ -1,12 +1,14 @@
 import
   std/atomics,
-  ../config, # TODO: for CacheLineSize
-  ../primitives/compiler_optimization_hints # for prefetch
+  ../config,
+  ../primitives/compiler_optimization_hints, # for prefetch
+  ../instrumentation/[contracts, loggers]
 
 type
   Enqueueable = concept x, type T
     x is ptr
-    x.next is Atomic[T]
+    x.next is Atomic[pointer]
+    # Workaround generic atomics bug: https://github.com/nim-lang/Nim/issues/12695
 
   ChannelMpscUnbounded*[T: Enqueueable] = object
     ## Lockless multi-producer single-consumer channel
@@ -16,72 +18,129 @@ type
     ## - Lock-free (?): Progress guarantees to determine
     ## - Unbounded
     ## - Intrusive List based
+    ## - Keeps an approximate count of enqueued items
 
     # TODO: pass this through Relacy and Valgrind/Helgrind
     #       to make sure there are no bugs
     #       on arch with relaxed memory models
+    count: Atomic[int]
+    dummy: typeof(default(T)[]) # Deref the pointer type
+    pad0: array[WV_CacheLineSize - sizeof(pointer), byte]
     front: T
-    # TODO: align
-    back: Atomic[T]
-
-proc initialize*[T](chan: var ChannelMpscUnbounded[T], dummy: T) =
-  ## This queue is designed for use within a thread-safe allocator
-  ## It requires an allocated dummy node for initialization
-  ## but cannot rely on an allocator.
-  assert not dummy.isNil
-  dummy.next.store(nil, moRelaxed)
-  chan.front = dummy
-  chan.back.store(dummy, moRelaxed)
-
-  assert not(chan.front.isNil)
-  assert not(chan.back.load(moRelaxed).isNil)
-
-proc removeDummy*[T](chan: var ChannelMpscUnbounded[T]): T =
-  ## Remove dummy for its deallocation
-  ## The queue should be testroyed afterwards
-  assert not(chan.front.isNil)
-  assert not(chan.back.load(moRelaxed).isNil)
-  # Only the dummy should be left
-  assert chan.front == chan.back.load(moRelease)
-  assert chan.front.next.load(moRelease).isNil
-
-  result = chan.front
-  chan.front = nil
-  chan.back.store(nil, moRelaxed)
-
-proc trySend*[T](chan: var ChannelMpscUnbounded[T], src: sink T): bool =
+  pad1: array[WV_CacheLineSize - sizeof(int), byte]
+  back: Atomic[pointer] # Workaround generic atomics bug: https://github.com/nim-lang/Nim/issues/12695
+
+template checkInvariants(): untyped =
+  ascertain: not(chan.front.isNil)
+  ascertain: not(chan.back.load(moRelaxed).isNil)
+
+proc initialize*[T](chan: var ChannelMpscUnbounded[T]) =
+  # We keep a dummy node within the queue itself:
+  # it doesn't need any dynamic allocation, which simplifies
+  # its use in an allocator
+  chan.dummy.reset()
+  chan.front = chan.dummy.addr
+  chan.back.store(chan.dummy.addr, moRelaxed)
+
+proc trySendImpl[T](chan: var ChannelMpscUnbounded[T], src: sink T, count: static bool): bool {.inline.}=
   ## Send an item to the back of the channel
   ## As the channel has unbounded capacity, this should never fail
-  assert not(chan.front.isNil)
-  assert not(chan.back.load(moRelaxed).isNil)
+  checkInvariants()
 
   src.next.store(nil, moRelaxed)
   fence(moRelease)
   let oldBack = chan.back.exchange(src, moRelaxed)
-  oldBack.next.store(src, moRelaxed)
+  cast[T](oldBack).next.store(src, moRelaxed) # Workaround generic atomics bug: https://github.com/nim-lang/Nim/issues/12695
+  when count:
+    discard chan.count.fetchAdd(1, moRelaxed)
 
   return true
 
+proc trySend*[T](chan: var ChannelMpscUnbounded[T], src: sink T): bool =
+  # log("Channel 0x%.08x trySend - front: 0x%.08x (%d), second: 0x%.08x, back: 0x%.08x\n", chan.addr, chan.front, chan.front.val, chan.front.next, chan.back)
+  chan.trySendImpl(src, count = true)
+
+proc reenqueueDummy[T](chan: var ChannelMpscUnbounded[T]) =
+  # log("Channel 0x%.08x reenqueing dummy\n")
+  discard chan.trySendImpl(chan.dummy.addr, count = false)
+
 proc tryRecv*[T](chan: var ChannelMpscUnbounded[T], dst: var T): bool =
   ## Try receiving the next item buffered in the channel
   ## Returns true if successful (channel was not empty)
+  ## This can fail spuriously on the last element if a producer
+  ## enqueues a new element while the consumer is dequeuing it
   assert not(chan.front.isNil)
   assert not(chan.back.load(moRelaxed).isNil)
 
-  let first = chan.front # dummy
-  let next = first.next.load(moRelaxed)
+  var first = chan.front
+  # Workaround generic atomics bug: https://github.com/nim-lang/Nim/issues/12695
+  var next = cast[T](first.next.load(moRelaxed))
 
-  if not next.isNil:
+  # log("Channel 0x%.08x tryRecv - first: 0x%.08x (%d), next: 0x%.08x (%d), last: 0x%.08x\n",
+  #   chan.addr, first, first.val, next, if not next.isNil: next.val else: 0, chan.back)
+
+  if first == chan.dummy.addr:
+    # First node is the dummy
+    if next.isNil:
+      # Dummy has no next node
+      return false
+    # Overwrite the dummy with the real first element
     chan.front = next
+    first = next
+    next = cast[T](next.next.load(moRelaxed))
+
+  # Fast-path
+  if not next.isNil:
+    # A second element exists, set up the queue; only the consumer touches the front
+    chan.front = next # switch the front
     prefetch(first.next.load(moRelaxed))
+    # Publish the changes
     fence(moAcquire)
-    dst = next
+    dst = first
+    discard chan.count.fetchSub(1, moRelaxed)
     return true
+  # End fast-path
+
+  # No second element, but we really need something to take
+  # the place of the first, have a look at the producer side
+  fence(moAcquire)
+  let last = chan.back.load(moRelaxed)
+  if first != last:
+    # A producer got ahead of us, spurious failure
+    return false
+
+  # Reenqueue the dummy, it is now in the second slot or later
+  chan.reenqueueDummy()
+  # Reload the second item
+  next = cast[T](first.next.load(moRelaxed))
 
-  dst = nil
+  if not next.isNil:
+    # A second element exists, set up the queue; only the consumer touches the front
+    chan.front = next # switch the front
+    prefetch(first.next.load(moRelaxed))
+    # Publish the changes
+    fence(moAcquire)
+    dst = first
+    discard chan.count.fetchSub(1, moRelaxed)
+    return true
+
+  # Still no element?! There was a race in enqueueing
+  # and the new "next" isn't published yet,
+  # spurious failure
   return false
 
+func peek*(chan: var ChannelMpscUnbounded): int32 {.inline.} =
+  ## Estimates the number of items pending in the channel
+  ## - If called by the consumer the true number might be more
+  ##   due to producers adding items concurrently.
+  ## - If called by a producer the true number is undefined
+  ##   as other producers also add items concurrently and
+  ##   the consumer removes them concurrently.
+  ##
+  ## This is a non-locking operation.
+  result = int32 chan.count.load(moRelaxed)
+
 # Sanity checks
 # ------------------------------------------------------------------------------
 when isMainModule:
@@ -110,7 +169,7 @@ when isMainModule:
       while not chan.tryRecv(data):
         body
 
-  const NumVals = 100000
+  const NumVals = 1000000
   const Padding = 10 * NumVals # Pad with a 0 so that iteration 10 of thread 3 is 3010 with 99 max iters
 
   type
@@ -134,7 +193,7 @@
     Val = ptr ValObj
     ValObj = object
-      next: Atomic[Val]
+      next: Atomic[pointer]
       val: int
 
     ThreadArgs = object
@@ -185,7 +244,7 @@
       args.chan[].recvLoop(val):
         # Busy loop, in prod we might want to yield the core/thread timeslice
        discard
-      # echo "Receiver got: ", val.val, " at address 0x", toLowerASCII toHex cast[ByteAddress](val)
+      # log("Receiver got: %d at address 0x%.08x\n", val.val, val)
       let sender = WorkerKind(val.val div Padding)
       doAssert val.val == counts[sender] + ord(sender) * Padding, "Incorrect value: " & $val.val
       inc counts[sender]
@@ -209,8 +268,7 @@
   echo "------------------------------------------------------------------------"
   var threads: array[WorkerKind, Thread[ThreadArgs]]
   let chan = createSharedU(ChannelMpscUnbounded[Val]) # CreateU is not zero-init
-  let dummy = valAlloc()
-  chan[].initialize(dummy)
+  chan[].initialize()
 
   createThread(threads[Receiver], thread_func, ThreadArgs(ID: Receiver, chan: chan))
   for sender in Sender1..Sender15:
@@ -219,7 +277,6 @@
   for worker in WorkerKind:
     joinThread(threads[worker])
 
-  chan[].removeDummy.valFree()
   deallocShared(chan)
   echo "------------------------------------------------------------------------"
   echo "Success"
diff --git a/weave/contexts.nim b/weave/contexts.nim
index 9987779..c5a1b48 100644
--- a/weave/contexts.nim
+++ b/weave/contexts.nim
@@ -7,7 +7,7 @@ import
   ./datatypes/[context_global, context_thread_local, sync_types],
-  ./channels/[channels_spsc_single_ptr, channels_mpsc_bounded_lock],
+  ./channels/[channels_spsc_single_ptr, channels_mpsc_bounded_lock, channels_mpsc_unbounded],
   ./memory/[persistacks, intrusive_stacks],
   ./config,
   system/ansi_c,
@@ -65,7 +65,7 @@ proc initialize*[T](c: var ChannelLegacy[T], size: int32) =
 proc delete*[T](c: var ChannelLegacy[T]) =
   channel_free(c)
 
-template myThieves*: ChannelLegacy[StealRequest] =
+template myThieves*: ChannelMpscUnbounded[StealRequest] =
   globalCtx.com.thefts[localCtx.worker.ID]
 
 template workforce*: int32 =
diff --git a/weave/datatypes/context_global.nim b/weave/datatypes/context_global.nim
index 6d2bef5..3b50ff7 100644
--- a/weave/datatypes/context_global.nim
+++ b/weave/datatypes/context_global.nim
@@ -7,6 +7,7 @@ import
   ../channels/channels_mpsc_bounded_lock,
+  ../channels/channels_mpsc_unbounded,
   ../channels/channels_spsc_single_ptr,
   ../memory/persistacks,
   ../config,
@@ -35,7 +36,7 @@
       # would work but then it requires a pointer indirection
       # per channel
      # and a known max number of workers
-    thefts*: ptr UncheckedArray[ChannelLegacy[StealRequest]]
+    thefts*: ptr UncheckedArray[ChannelMpscUnbounded[StealRequest]]
     tasks*: ptr UncheckedArray[Persistack[WV_MaxConcurrentStealPerWorker, ChannelSpscSinglePtr[Task]]]
 
   GlobalContext* = object
diff --git a/weave/datatypes/sparsesets.nim b/weave/datatypes/sparsesets.nim
index be299a8..3cf4412 100644
--- a/weave/datatypes/sparsesets.nim
+++ b/weave/datatypes/sparsesets.nim
@@ -49,9 +49,6 @@ type
 
 func allocate*(s: var SparseSet, capacity: SomeInteger) =
   preCondition: capacity <= WV_MaxWorkers
-  preCondition: s.indices.isNil
-  preCondition: s.values.isNil
-  preCondition: s.rawBuffer.isNil
 
   s.capacity = Setuint capacity
   s.rawBuffer = wv_alloc(Setuint, 2*capacity)
diff --git a/weave/datatypes/sync_types.nim b/weave/datatypes/sync_types.nim
index 71f58c8..307a3fb 100644
--- a/weave/datatypes/sync_types.nim
+++ b/weave/datatypes/sync_types.nim
@@ -10,7 +10,8 @@ import
   ../config,
   ../channels/channels_spsc_single_ptr,
   ../instrumentation/contracts,
-  ../memory/allocs
+  ../memory/allocs,
+  std/atomics
 
 # Inter-thread synchronization types
 # ----------------------------------------------------------------------------------
@@ -72,7 +73,10 @@ type
     # Padding shouldn't be needed as steal requests are used as value types
     # and deep-copied between threads
   StealRequest* = ptr object
-    thiefAddr*: ptr ChannelSpscSinglePtr[Task] # Channel for sending tasks back to the thief
+    # TODO: padding to cache line
+    # TODO: Remove workaround generic atomics bug: https://github.com/nim-lang/Nim/issues/12695
+    next*: Atomic[pointer]                     # For intrusive lists and queues
+    thiefAddr*: ptr ChannelSpscSinglePtr[Task] # Channel for sending tasks back to the thief
     thiefID*: WorkerID
     retry*: int32                              # 0 <= retry <= num_workers
     victims*: SparseSet                        # set of potential victims
diff --git a/weave/primitives/compiler_optimization_hints.nim b/weave/primitives/compiler_optimization_hints.nim
index 1dd3066..175acbb 100644
--- a/weave/primitives/compiler_optimization_hints.nim
+++ b/weave/primitives/compiler_optimization_hints.nim
@@ -18,8 +18,8 @@ const withBuiltins = defined(gcc) or defined(clang) or defined(icc)
 when withBuiltins:
   proc builtin_prefetch(data: pointer, rw: PrefetchRW, locality: PrefetchLocality) {.importc: "__builtin_prefetch", noDecl.}
 
-template prefetch*[T](
-    data: ptr (T or UncheckedArray[T]),
+template prefetch*(
+    data: pointer,
     rw: static PrefetchRW = Read,
     locality: static PrefetchLocality = HighTemporalLocality) =
   ## Prefetch examples:
diff --git a/weave/runtime.nim b/weave/runtime.nim
index 2921bb7..8cc4339 100644
--- a/weave/runtime.nim
+++ b/weave/runtime.nim
@@ -12,7 +12,7 @@ import
   ./instrumentation/[contracts, profilers, loggers],
   ./contexts, ./config,
   ./datatypes/[sync_types, prell_deques],
-  ./channels/[channels_mpsc_bounded_lock, channels_spsc_single_ptr],
+  ./channels/[channels_mpsc_bounded_lock, channels_spsc_single_ptr, channels_mpsc_unbounded],
   ./memory/[persistacks, intrusive_stacks, allocs],
   ./scheduler, ./signals, ./workers, ./thieves, ./victims,
   # Low-level primitives
@@ -40,7 +40,7 @@ proc init*(_: type Runtime) =
   ## Allocation of the global context.
   globalCtx.threadpool = wv_alloc(Thread[WorkerID], workforce())
-  globalCtx.com.thefts = wv_alloc(ChannelLegacy[StealRequest], workforce())
+  globalCtx.com.thefts = wv_alloc(ChannelMpscUnbounded[StealRequest], workforce())
   globalCtx.com.tasks = wv_alloc(Persistack[WV_MaxConcurrentStealPerWorker, ChannelSpscSinglePtr[Task]], workforce())
   discard pthread_barrier_init(globalCtx.barrier, nil, workforce())
diff --git a/weave/scheduler.nim b/weave/scheduler.nim
index f9778f5..ed3168d 100644
--- a/weave/scheduler.nim
+++ b/weave/scheduler.nim
@@ -9,8 +9,8 @@ import
   ./instrumentation/[contracts, profilers, loggers],
   ./primitives/barriers,
   ./datatypes/[sync_types, prell_deques, context_thread_local, flowvars, sparsesets],
-  ./channels/[channels_mpsc_bounded_lock, channels_spsc_single_ptr, channels_spsc_single_object],
-  ./memory/[persistacks, intrusive_stacks],
+  ./channels/[channels_mpsc_bounded_lock, channels_spsc_single_ptr, channels_spsc_single_object, channels_mpsc_unbounded],
+  ./memory/[persistacks, intrusive_stacks, allocs],
   ./contexts, ./config,
   ./victims, ./loop_splitting,
   ./thieves, ./workers,
@@ -28,9 +28,9 @@ proc init*(ctx: var TLContext) =
   ## Initialize the thread-local context of a worker (including the lead worker)
 
   myWorker().deque = newPrellDeque(Task)
-  myThieves().initialize(WV_MaxConcurrentStealPerWorker * workforce())
   myTodoBoxes().initialize()
   myWorker().initialize(maxID = workforce() - 1)
+  myThieves().initialize()
 
   localCtx.stealCache.initialize()
   for i in 0 ..< localCtx.stealCache.len:
@@ -38,7 +38,7 @@ proc init*(ctx: var TLContext) =
 
   ascertain: myTodoBoxes().len == WV_MaxConcurrentStealPerWorker
 
-  # Workers see their RNG with their myID()
+  # Workers seed their RNG with their myID()
   myThefts().rng.seed(myID())
 
   # Thread-Local Profiling
@@ -148,7 +148,6 @@ proc schedulingLoop() =
 
 proc threadLocalCleanup*() =
   myWorker().deque.delete()
-  myThieves().delete()
 
   for i in 0 ..< WV_MaxConcurrentStealPerWorker:
     # No tasks left
diff --git a/weave/thieves.nim b/weave/thieves.nim
index c01ad55..ba9e2c2 100644
--- a/weave/thieves.nim
+++ b/weave/thieves.nim
@@ -9,9 +9,10 @@ import
   ./datatypes/[sparsesets, sync_types, context_thread_local],
   ./contexts, ./targets,
   ./instrumentation/[contracts, profilers, loggers],
-  ./channels/channels_mpsc_bounded_lock,
+  ./channels/channels_mpsc_unbounded,
   ./memory/persistacks,
-  ./config, ./signals
+  ./config, ./signals,
+  std/atomics
 
 # Thief
 # ----------------------------------------------------------------------------------
@@ -22,6 +23,7 @@ proc newStealRequest(): StealRequest {.inline.} =
   result = localCtx.stealCache.borrow()
   ascertain: result.victims.capacity.int32 == workforce()
 
+  result.next.store(nil, moRelaxed)
   result.thiefAddr = myTodoBoxes.borrow()
   result.thiefID = myID()
   result.retry = 0
@@ -36,6 +38,8 @@ proc newStealRequest(): StealRequest {.inline.} =
 proc rawSend(victimID: WorkerID, req: sink StealRequest) {.inline.}=
   ## Send a steal or work sharing request
   # TODO: check for race condition on runtime exit
+  # log("Worker %d: sending request 0x%.08x to %d (Channel: 0x%.08x)\n",
+  #     myID(), cast[ByteAddress](req), victimID, globalCtx.com.thefts[victimID].addr)
   let stealRequestSent = globalCtx.com
                                   .thefts[victimID]
                                   .trySend(req)
diff --git a/weave/victims.nim b/weave/victims.nim
index f200c4a..001b2e9 100644
--- a/weave/victims.nim
+++ b/weave/victims.nim
@@ -10,7 +10,7 @@ import
                sparsesets, prell_deques, flowvars],
   ./contexts, ./config,
   ./instrumentation/[contracts, profilers, loggers],
-  ./channels/[channels_mpsc_bounded_lock, channels_spsc_single_ptr],
+  ./channels/[channels_mpsc_bounded_lock, channels_spsc_single_ptr, channels_mpsc_unbounded],
   ./thieves, ./loop_splitting
 
 # Victims - Adaptive task splitting
@@ -38,6 +38,11 @@ proc recv*(req: var StealRequest): bool {.inline.} =
   profile(send_recv_req):
     result = myThieves().tryRecv(req)
 
+  debug:
+    if result:
+      log("Worker %d: receives request 0x%.08x from %d with %d potential victims. (Channel: 0x%.08x)\n",
+        myID(), cast[ByteAddress](req), req.thiefID, req.victims.len, myThieves().addr)
+
   # We treat specially the case where children fail to steal
   # and defer to the current worker (their parent)
   while result and req.state == Waiting:
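
For reference, here is the dequeue scheme distilled out of the diff: the dummy node lives inside the queue object itself (no allocator needed), producers race on `back` with an atomic exchange, and the consumer re-enqueues the dummy whenever it would otherwise leave `front` dangling on the last element. The sketch below is a minimal, self-contained illustration of that scheme, not Weave's API: the names `MpscQueue`, `NodeObj` and `send` are made up for the example, and the cache-line padding, the approximate `count`, the `Enqueueable` concept and the prefetch hints of the real implementation are omitted.

```nim
import std/atomics

type
  NodeObj = object
    next: Atomic[pointer] # untyped pointer sidesteps nim-lang/Nim#12695
    val: int
  Node = ptr NodeObj

  MpscQueue = object
    dummy: NodeObj        # dummy node embedded in the queue: no allocation needed
    front: Node           # touched only by the single consumer
    back: Atomic[pointer] # producers race on this with exchange

proc init(q: var MpscQueue) =
  q.dummy.next.store(nil, moRelaxed)
  q.front = q.dummy.addr
  q.back.store(q.dummy.addr, moRelaxed)

proc send(q: var MpscQueue, n: Node) =
  # Multi-producer enqueue: swing `back` to the new node, then link it
  n.next.store(nil, moRelaxed)
  fence(moRelease)
  let oldBack = q.back.exchange(n, moRelaxed)
  cast[Node](oldBack).next.store(n, moRelaxed)

proc tryRecv(q: var MpscQueue, dst: var Node): bool =
  # Single-consumer dequeue; may fail spuriously while a producer
  # is mid-enqueue on the last element.
  var first = q.front
  var next = cast[Node](first.next.load(moRelaxed))

  if first == q.dummy.addr:  # front is the dummy: skip it
    if next.isNil:
      return false           # queue is empty
    q.front = next
    first = next
    next = cast[Node](next.next.load(moRelaxed))

  if not next.isNil:          # fast path: a second element exists
    q.front = next
    fence(moAcquire)
    dst = first
    return true

  fence(moAcquire)
  if first != cast[Node](q.back.load(moRelaxed)):
    return false              # a producer got ahead of us: spurious failure

  q.send(q.dummy.addr)        # re-enqueue the dummy so front never dangles
  next = cast[Node](first.next.load(moRelaxed))
  if next.isNil:
    return false              # enqueue race: the link isn't published yet
  q.front = next
  fence(moAcquire)
  dst = first
  return true

when isMainModule:
  # Single-threaded sanity check of the sketch
  var q: MpscQueue
  q.init()
  var nodes: array[3, NodeObj]
  for i in 0 ..< nodes.len:
    nodes[i].val = i
    q.send(nodes[i].addr)
  var n: Node
  while q.tryRecv(n):
    echo n.val # prints 0, 1, 2
```

The property the patch relies on is visible in the slow path: the consumer never leaves the last real item parked behind a stale `front`, because the dummy takes that place instead. That is exactly the guarantee the snmalloc and Pony-lang queues lack (issues #19 and #20 in the commit message above), which made them unsuitable for steal requests.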