Change steal requests to lockless unbounded mpsc (#21)
* Stashing: Sadly, we might never know the perf of this channel, back to the drawing board - nim-lang/Nim#12695

* Fix queue, workaround nim-lang/Nim#12695

* Fix fence issue: no strange memory reuse anymore with both Nim and system alloc

* Fix #19, #20: snmalloc / Pony-lang MPSC queue issues:
- they hold on to the last item of the queue (breaking for steal requests)
- they require memory management of the dummy node (snmalloc deletes it and its memory doesn't seem to be reclaimed)
- they never touch the "back" pointer of the queue when dequeuing, meaning that if an item was last, the queue still points to it after dequeuing. Pony guards this with an emptiness check via tagged pointers; what snmalloc does is unclear.
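
For context: the breakage described above stems from the two-step enqueue used by Vyukov-style intrusive MPSC queues. A minimal sketch of that enqueue (an illustration of the race window, with simplified assumed types, not code from Pony or snmalloc):

import std/atomics

type
  Node = ptr object
    next: Atomic[pointer]
  MpscSketch = object
    back: Atomic[pointer]  # assumed initialized to point at a dummy node

proc enqueue(q: var MpscSketch, n: Node) =
  n.next.store(nil, moRelaxed)
  fence(moRelease)
  # Step 1: swing `back` to the new node; `n` is now the queue's last node.
  let oldBack = cast[Node](q.back.exchange(cast[pointer](n), moRelaxed))
  # Step 2: link the old tail to `n`; only now can the consumer walk to it.
  # Between the two steps, a node that was dequeued while last is still
  # referenced through `back`; recycling it immediately (as Weave does with
  # steal requests) lets a racing producer write into reused memory.
  oldBack.next.store(cast[pointer](n), moRelaxed)

The design below sidesteps this: the dummy node lives inside the channel object and the consumer re-enqueues it before handing out the last real item, so `back` never keeps pointing at a node the caller may recycle.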
mratsim authored Nov 24, 2019
1 parent 9feb8d0 commit b796d1a
Showing 10 changed files with 133 additions and 66 deletions.
149 changes: 103 additions & 46 deletions weave/channels/channels_mpsc_unbounded.nim
@@ -1,12 +1,14 @@
import
std/atomics,
../config, # TODO: for CacheLineSize
../primitives/compiler_optimization_hints # for prefetch
../config,
../primitives/compiler_optimization_hints, # for prefetch
../instrumentation/[contracts, loggers]

type
Enqueueable = concept x, type T
x is ptr
x.next is Atomic[T]
x.next is Atomic[pointer]
# Workaround generic atomics bug: https://github.com/nim-lang/Nim/issues/12695

ChannelMpscUnbounded*[T: Enqueueable] = object
## Lockless multi-producer single-consumer channel
@@ -16,72 +18,129 @@ type
## - Lock-free (?): progress guarantees still to be determined
## - Unbounded
## - Intrusive List based
## - Keeps an approximate count of enqueued items

# TODO: pass this through Relacy and Valgrind/Helgrind
# to make sure there are no bugs
# on arch with relaxed memory models

count: Atomic[int]
dummy: typeof(default(T)[]) # Deref the pointer type
pad0: array[WV_CacheLineSize - sizeof(pointer), byte]
front: T
# TODO: align
back: Atomic[T]

proc initialize*[T](chan: var ChannelMpscUnbounded[T], dummy: T) =
## This queue is designed for use within a thread-safe allocator
## It requires an allocated dummy node for initialization
## but cannot rely on an allocator.
assert not dummy.isNil
dummy.next.store(nil, moRelaxed)
chan.front = dummy
chan.back.store(dummy, moRelaxed)

assert not(chan.front.isNil)
assert not(chan.back.load(moRelaxed).isNil)

proc removeDummy*[T](chan: var ChannelMpscUnbounded[T]): T =
## Remove dummy for its deallocation
## The queue should be destroyed afterwards
assert not(chan.front.isNil)
assert not(chan.back.load(moRelaxed).isNil)
# Only the dummy should be left
assert chan.front == chan.back.load(moRelease)
assert chan.front.next.load(moRelease).isNil

result = chan.front
chan.front = nil
chan.back.store(nil, moRelaxed)

proc trySend*[T](chan: var ChannelMpscUnbounded[T], src: sink T): bool =
pad1: array[WV_CacheLineSize - sizeof(int), byte]
back: Atomic[pointer] # Workaround generic atomics bug: https://github.com/nim-lang/Nim/issues/12695

template checkInvariants(): untyped =
ascertain: not(chan.front.isNil)
ascertain: not(chan.back.load(moRelaxed).isNil)

proc initialize*[T](chan: var ChannelMpscUnbounded[T]) =
# We keep a dummy node within the queue itself;
# it doesn't need any dynamic allocation, which simplifies
# its use inside an allocator
chan.dummy.reset()
chan.front = chan.dummy.addr
chan.back.store(chan.dummy.addr, moRelaxed)

proc trySendImpl[T](chan: var ChannelMpscUnbounded[T], src: sink T, count: static bool): bool {.inline.}=
## Send an item to the back of the channel
## As the channel has unbounded capacity, this should never fail
assert not(chan.front.isNil)
assert not(chan.back.load(moRelaxed).isNil)
checkInvariants()

src.next.store(nil, moRelaxed)
fence(moRelease)
let oldBack = chan.back.exchange(src, moRelaxed)
oldBack.next.store(src, moRelaxed)
cast[T](oldBack).next.store(src, moRelaxed) # Workaround generic atomics bug: https://github.com/nim-lang/Nim/issues/12695
when count:
discard chan.count.fetchAdd(1, moRelaxed)

return true

proc trySend*[T](chan: var ChannelMpscUnbounded[T], src: sink T): bool =
# log("Channel 0x%.08x trySend - front: 0x%.08x (%d), second: 0x%.08x, back: 0x%.08x\n", chan.addr, chan.front, chan.front.val, chan.front.next, chan.back)
chan.trySendImpl(src, count = true)

proc reenqueueDummy[T](chan: var ChannelMpscUnbounded[T]) =
# log("Channel 0x%.08x re-enqueueing dummy\n")
discard chan.trySendImpl(chan.dummy.addr, count = false)

proc tryRecv*[T](chan: var ChannelMpscUnbounded[T], dst: var T): bool =
## Try receiving the next item buffered in the channel
## Returns true if successful (channel was not empty)
## This can fail spuriously on the last element if a producer
## enqueues a new element while the consumer is dequeuing it
assert not(chan.front.isNil)
assert not(chan.back.load(moRelaxed).isNil)

let first = chan.front # dummy
let next = first.next.load(moRelaxed)
var first = chan.front
# Workaround generic atomics bug: https://github.com/nim-lang/Nim/issues/12695
var next = cast[T](first.next.load(moRelaxed))

if not next.isNil:
# log("Channel 0x%.08x tryRecv - first: 0x%.08x (%d), next: 0x%.08x (%d), last: 0x%.08x\n",
# chan.addr, first, first.val, next, if not next.isNil: next.val else: 0, chan.back)

if first == chan.dummy.addr:
# First node is the dummy
if next.isNil:
# Dummy has no next node
return false
# Overwrite the dummy with the real first element
chan.front = next
first = next
next = cast[T](next.next.load(moRelaxed))

# Fast-path
if not next.isNil:
# a second element exists; set up the queue. Only the consumer touches the front
chan.front = next # switch the front
prefetch(first.next.load(moRelaxed))
# Synchronize with the producer's release fence
fence(moAcquire)
dst = next
dst = first
discard chan.count.fetchSub(1, moRelaxed)
return true
# End fast-path

# No second element, but we really need something to take
# the place of the first; have a look at the producer side
fence(moAcquire)
let last = chan.back.load(moRelaxed)
if first != last:
# A producer got ahead of us, spurious failure
return false

# Re-enqueue the dummy; it will land in the second slot or later
chan.reenqueueDummy()
# Reload the second item
next = cast[T](first.next.load(moRelaxed))

dst = nil
if not next.isNil:
# a second element exists; set up the queue. Only the consumer touches the front
chan.front = next # switch the front
prefetch(first.next.load(moRelaxed))
# Synchronize with the producer's release fence
fence(moAcquire)
dst = first
discard chan.count.fetchSub(1, moRelaxed)
return true

# Still no next element?! We lost a race with an enqueueing
# producer whose new "next" link isn't published yet
# (it sits between the back-exchange and the next-store): spurious failure
return false

func peek*(chan: var ChannelMpscUnbounded): int32 {.inline.} =
## Estimates the number of items pending in the channel
## - If called by the consumer the true number might be more
## due to producers adding items concurrently.
## - If called by a producer the true number is undefined
## as other producers also add items concurrently and
## the consumer removes them concurrently.
##
## This is a non-locking operation.
result = int32 chan.count.load(moRelaxed)
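
For orientation before the sanity checks below, a minimal single-threaded usage sketch of the new API (the ValObj node type and the retry loop are illustrative, echoing the test harness, not additional API from this commit):

import std/atomics

type
  ValObj = object
    next: Atomic[pointer]  # intrusive link required by Enqueueable
    val: int
  Val = ptr ValObj

var chan: ChannelMpscUnbounded[Val]
chan.initialize()                 # the dummy node lives inside `chan`

let v = createShared(ValObj)
v.val = 42
doAssert chan.trySend(v)          # unbounded: never fails for capacity

var got: Val
while not chan.tryRecv(got):      # may fail spuriously; callers retry
  cpuRelax()
doAssert got.val == 42 and chan.peek() == 0
freeShared(got)                   # node lifetime is the caller's job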

# Sanity checks
# ------------------------------------------------------------------------------
when isMainModule:
@@ -110,7 +169,7 @@ when isMainModule:
while not chan.tryRecv(data):
body

const NumVals = 100000
const NumVals = 1000000
const Padding = 10 * NumVals # Pad with a 0 so that iteration 10 of thread 3 is 3010 with 99 max iters

type
Expand All @@ -134,7 +193,7 @@ when isMainModule:

Val = ptr ValObj
ValObj = object
next: Atomic[Val]
next: Atomic[pointer]
val: int

ThreadArgs = object
@@ -185,7 +244,7 @@ when isMainModule:
args.chan[].recvLoop(val):
# Busy loop, in prod we might want to yield the core/thread timeslice
discard
# echo "Receiver got: ", val.val, " at address 0x", toLowerASCII toHex cast[ByteAddress](val)
# log("Receiver got: %d at address 0x%.08x\n", val.val, val)
let sender = WorkerKind(val.val div Padding)
doAssert val.val == counts[sender] + ord(sender) * Padding, "Incorrect value: " & $val.val
inc counts[sender]
@@ -209,8 +268,7 @@
echo "------------------------------------------------------------------------"
var threads: array[WorkerKind, Thread[ThreadArgs]]
let chan = createSharedU(ChannelMpscUnbounded[Val]) # CreateU is not zero-init
let dummy = valAlloc()
chan[].initialize(dummy)
chan[].initialize()

createThread(threads[Receiver], thread_func, ThreadArgs(ID: Receiver, chan: chan))
for sender in Sender1..Sender15:
@@ -219,7 +277,6 @@
for worker in WorkerKind:
joinThread(threads[worker])

chan[].removeDummy.valFree()
deallocShared(chan)
echo "------------------------------------------------------------------------"
echo "Success"
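
The sanity checks can be exercised standalone; an example invocation (flags assumed from typical threaded Nim builds of that era, not stated in the commit):

nim c -r --threads:on -d:release weave/channels/channels_mpsc_unbounded.nim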
4 changes: 2 additions & 2 deletions weave/contexts.nim
@@ -7,7 +7,7 @@

import
./datatypes/[context_global, context_thread_local, sync_types],
./channels/[channels_spsc_single_ptr, channels_mpsc_bounded_lock],
./channels/[channels_spsc_single_ptr, channels_mpsc_bounded_lock, channels_mpsc_unbounded],
./memory/[persistacks, intrusive_stacks],
./config,
system/ansi_c,
@@ -65,7 +65,7 @@ proc initialize*[T](c: var ChannelLegacy[T], size: int32) =
proc delete*[T](c: var ChannelLegacy[T]) =
channel_free(c)

template myThieves*: ChannelLegacy[StealRequest] =
template myThieves*: ChannelMpscUnbounded[StealRequest] =
globalCtx.com.thefts[localCtx.worker.ID]

template workforce*: int32 =
3 changes: 2 additions & 1 deletion weave/datatypes/context_global.nim
@@ -7,6 +7,7 @@

import
../channels/channels_mpsc_bounded_lock,
../channels/channels_mpsc_unbounded,
../channels/channels_spsc_single_ptr,
../memory/persistacks,
../config,
@@ -35,7 +36,7 @@ type
# would work but then it requires a pointer indirection
# per channel
# and a known max number of workers
thefts*: ptr UncheckedArray[ChannelLegacy[StealRequest]]
thefts*: ptr UncheckedArray[ChannelMpscUnbounded[StealRequest]]
tasks*: ptr UncheckedArray[Persistack[WV_MaxConcurrentStealPerWorker, ChannelSpscSinglePtr[Task]]]

GlobalContext* = object
3 changes: 0 additions & 3 deletions weave/datatypes/sparsesets.nim
@@ -49,9 +49,6 @@ type

func allocate*(s: var SparseSet, capacity: SomeInteger) =
preCondition: capacity <= WV_MaxWorkers
preCondition: s.indices.isNil
preCondition: s.values.isNil
preCondition: s.rawBuffer.isNil

s.capacity = Setuint capacity
s.rawBuffer = wv_alloc(Setuint, 2*capacity)
8 changes: 6 additions & 2 deletions weave/datatypes/sync_types.nim
@@ -10,7 +10,8 @@ import
../config,
../channels/channels_spsc_single_ptr,
../instrumentation/contracts,
../memory/allocs
../memory/allocs,
std/atomics

# Inter-thread synchronization types
# ----------------------------------------------------------------------------------
@@ -72,7 +73,10 @@ type
# Padding shouldn't be needed as steal requests are used as value types
# and deep-copied between threads
StealRequest* = ptr object
thiefAddr*: ptr ChannelSpscSinglePtr[Task] # Channel for sending tasks back to the thief
# TODO: padding to cache line
# TODO: Remove workaround generic atomics bug: https://github.com/nim-lang/Nim/issues/12695
next*: Atomic[pointer] # For intrusive lists and queues
thiefAddr*: ptr ChannelSpscSinglePtr[Task] # Channel for sending tasks back to the thief
thiefID*: WorkerID
retry*: int32 # 0 <= retry <= num_workers
victims*: SparseSet # set of potential victims
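
With the intrusive `next` field in place, a StealRequest satisfies the channel's Enqueueable concept without any per-message wrapper allocation. A minimal standalone check (hypothetical stand-in type, not part of the commit):

import std/atomics

type
  Enqueueable = concept x, type T  # mirrors channels_mpsc_unbounded.nim
    x is ptr
    x.next is Atomic[pointer]

  DummyReq = ptr object            # stand-in for StealRequest
    next: Atomic[pointer]          # intrusive link, declared first
    thiefID: int32

doAssert DummyReq is Enqueueable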
4 changes: 2 additions & 2 deletions weave/primitives/compiler_optimization_hints.nim
@@ -18,8 +18,8 @@ const withBuiltins = defined(gcc) or defined(clang) or defined(icc)
when withBuiltins:
proc builtin_prefetch(data: pointer, rw: PrefetchRW, locality: PrefetchLocality) {.importc: "__builtin_prefetch", noDecl.}

template prefetch*[T](
data: ptr (T or UncheckedArray[T]),
template prefetch*(
data: pointer,
rw: static PrefetchRW = Read,
locality: static PrefetchLocality = HighTemporalLocality) =
## Prefetch examples:
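
The looser `pointer` signature matches the new call site in tryRecv, which prefetches a raw pointer loaded from an `Atomic[pointer]` link (itself the #12695 workaround). A minimal sketch of the call that must now compile:

import std/atomics

var link: Atomic[pointer]
prefetch(link.load(moRelaxed))  # a raw pointer; the old typed signature rejected this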
4 changes: 2 additions & 2 deletions weave/runtime.nim
@@ -12,7 +12,7 @@ import
./instrumentation/[contracts, profilers, loggers],
./contexts, ./config,
./datatypes/[sync_types, prell_deques],
./channels/[channels_mpsc_bounded_lock, channels_spsc_single_ptr],
./channels/[channels_mpsc_bounded_lock, channels_spsc_single_ptr, channels_mpsc_unbounded],
./memory/[persistacks, intrusive_stacks, allocs],
./scheduler, ./signals, ./workers, ./thieves, ./victims,
# Low-level primitives
@@ -40,7 +40,7 @@ proc init*(_: type Runtime) =

## Allocation of the global context.
globalCtx.threadpool = wv_alloc(Thread[WorkerID], workforce())
globalCtx.com.thefts = wv_alloc(ChannelLegacy[StealRequest], workforce())
globalCtx.com.thefts = wv_alloc(ChannelMpscUnbounded[StealRequest], workforce())
globalCtx.com.tasks = wv_alloc(Persistack[WV_MaxConcurrentStealPerWorker, ChannelSpscSinglePtr[Task]], workforce())
discard pthread_barrier_init(globalCtx.barrier, nil, workforce())

9 changes: 4 additions & 5 deletions weave/scheduler.nim
@@ -9,8 +9,8 @@ import
./instrumentation/[contracts, profilers, loggers],
./primitives/barriers,
./datatypes/[sync_types, prell_deques, context_thread_local, flowvars, sparsesets],
./channels/[channels_mpsc_bounded_lock, channels_spsc_single_ptr, channels_spsc_single_object],
./memory/[persistacks, intrusive_stacks],
./channels/[channels_mpsc_bounded_lock, channels_spsc_single_ptr, channels_spsc_single_object, channels_mpsc_unbounded],
./memory/[persistacks, intrusive_stacks, allocs],
./contexts, ./config,
./victims, ./loop_splitting,
./thieves, ./workers,
@@ -28,17 +28,17 @@ proc init*(ctx: var TLContext) =
## Initialize the thread-local context of a worker (including the lead worker)

myWorker().deque = newPrellDeque(Task)
myThieves().initialize(WV_MaxConcurrentStealPerWorker * workforce())
myTodoBoxes().initialize()
myWorker().initialize(maxID = workforce() - 1)
myThieves().initialize()

localCtx.stealCache.initialize()
for i in 0 ..< localCtx.stealCache.len:
localCtx.stealCache.access(i).victims.allocate(capacity = workforce())

ascertain: myTodoBoxes().len == WV_MaxConcurrentStealPerWorker

# Workers see their RNG with their myID()
# Workers seed their RNG with their myID()
myThefts().rng.seed(myID())

# Thread-Local Profiling
@@ -148,7 +148,6 @@ proc schedulingLoop() =

proc threadLocalCleanup*() =
myWorker().deque.delete()
myThieves().delete()

for i in 0 ..< WV_MaxConcurrentStealPerWorker:
# No tasks left
8 changes: 6 additions & 2 deletions weave/thieves.nim
@@ -9,9 +9,10 @@ import
./datatypes/[sparsesets, sync_types, context_thread_local],
./contexts, ./targets,
./instrumentation/[contracts, profilers, loggers],
./channels/channels_mpsc_bounded_lock,
./channels/channels_mpsc_unbounded,
./memory/persistacks,
./config, ./signals
./config, ./signals,
std/atomics

# Thief
# ----------------------------------------------------------------------------------
@@ -22,6 +23,7 @@ proc newStealRequest(): StealRequest {.inline.} =
result = localCtx.stealCache.borrow()
ascertain: result.victims.capacity.int32 == workforce()

result.next.store(nil, moRelaxed)
result.thiefAddr = myTodoBoxes.borrow()
result.thiefID = myID()
result.retry = 0
@@ -36,6 +38,8 @@ proc newStealRequest(): StealRequest {.inline.} =
proc rawSend(victimID: WorkerID, req: sink StealRequest) {.inline.}=
## Send a steal or work sharing request
# TODO: check for race condition on runtime exit
# log("Worker %d: sending request 0x%.08x to %d (Channel: 0x%.08x)\n",
# myID(), cast[ByteAddress](req), victimID, globalCtx.com.thefts[victimID].addr)
let stealRequestSent = globalCtx.com
.thefts[victimID]
.trySend(req)
(diff for the tenth changed file did not load)
