Skip to content

Commit

Permalink
use manual reference counting to deal with nim-lang/Nim#13024 and fix…
Browse files Browse the repository at this point in the history
… parent index bug
  • Loading branch information
mratsim committed Jan 3, 2020
1 parent 8f38b4c commit bf6a594
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 50 deletions.
4 changes: 2 additions & 2 deletions weave/channels/loop_promises_notifiers.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ proc ready*(pr: ProducersRangePromises, index: int32) =
while idx != 0:
pr.fulfilled[idx] += 1
idx = idx shr 1
idx = (idx-1) shr 1
pr.fulfilled[0] += 1
Expand All @@ -165,7 +165,7 @@ proc dispatch*(cr: var ConsumerRangeDelayedTasks, internalIndex: int32) =
while idx != 0:
cr.dispatched[idx] += 1
idx = idx shr 1
idx = (idx-1) shr 1
cr.dispatched[0] += 1
Expand Down
78 changes: 50 additions & 28 deletions weave/channels/loop_promises_notifiers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,49 @@ type
fulfilled*: ptr UncheckedArray[Atomic[int32]]
numBuckets*: int32

proc `=destroy`*(prom: var ProducersLoopPromises) {.inline.} =
# For now promises must be manually managed due to
# - https://github.com/nim-lang/Nim/issues/13024
# Additionally https://github.com/nim-lang/Nim/issues/13025
# requires workaround
#
# proc `=destroy`*(prom: var ProducersLoopPromises) {.inline.} =
# let oldCount = fetchSub(prom.lp.refCount, 1, moRelease)
# ascertain: oldCount > 0
# ascertain: not prom.lp.fulfilled.isNil
# if oldCount == 1:
# fence(moAcquire)
# # Return memory
# wv_free(prom.lp.fulfilled)
#
# proc `=`*(dst: var ProducersLoopPromises, src: ProducersLoopPromises) {.inline.}
# # Workaround: https://github.com/nim-lang/Nim/issues/13025
#
# # Pending https://github.com/nim-lang/Nim/issues/13024
# proc `=sink`*(dst: var ProducersLoopPromises, src: ProducersLoopPromises) {.inline.} =
# # Don't pay for atomic refcounting when compiler can prove there is no ref change.
# system.`=sink`(dst, src)
#
# proc `=`*(dst: var ProducersLoopPromises, src: ProducersLoopPromises) {.inline.} =
# let oldCount = fetchAdd(src.lp.refCount, 1, moRelaxed)
# ascertain: oldCount > 0
# system.`=`(dst, src)

func incRef*(prom: var ProducersLoopPromises) {.inline.} =
## Manual atomic refcounting - workaround https://github.com/nim-lang/Nim/issues/13024
let oldCount = fetchAdd(prom.lp.refCount, 1, moRelaxed)
ascertain: oldCount > 0

func decRef*(prom: var ProducersLoopPromises) {.inline.} =
## Manual atomic refcounting - workaround https://github.com/nim-lang/Nim/issues/13024
let oldCount = fetchSub(prom.lp.refCount, 1, moRelease)
ascertain: oldCount > 0
ascertain: not prom.lp.fulfilled.isNil
if oldCount == 1:
fence(moAcquire)
# Return memory to the memory pool
# Return memory
wv_free(prom.lp.fulfilled)
recycle(prom.lp)

proc `=`*(dst: var ProducersLoopPromises, src: ProducersLoopPromises) {.inline.}
# Workaround: https://github.com/nim-lang/Nim/issues/13025

proc `=sink`*(dst: var ProducersLoopPromises, src: ProducersLoopPromises) {.inline.} =
# Don't pay for atomic refcounting when compiler can prove there is no ref change.
system.`=sink`(dst, src)

proc `=`*(dst: var ProducersLoopPromises, src: ProducersLoopPromises) {.inline.} =
let oldCount = fetchAdd(src.lp.refCount, 1, moRelaxed)
ascertain: oldCount > 0
system.`=`(dst, src)

proc initialize*(plp: var ProducersLoopPromises, pool: var TLPoolAllocator, start, stop, stride: int32) =
## Allocate loop promises (producer side)
## Multiple consumers can depend on the delivery of those promises
Expand Down Expand Up @@ -90,11 +112,10 @@ proc ready*(pr: ProducersLoopPromises, index: int32) =
# see if a new promise was delivered upon in O(1) time.
preCondition: index in pr.lp.start ..< pr.lp.stop
var idx = pr.getBucket(index)
ascertain: pr.lp.fulfilled[idx].load(moRelaxed) == 0

while idx != 0:
discard pr.lp.fulfilled[idx].fetchAdd(1, moRelaxed)
idx = idx shr 1
idx = (idx-1) shr 1

discard pr.lp.fulfilled[0].fetchAdd(1, moRelaxed)

Expand All @@ -104,19 +125,20 @@ proc ready*(pr: ProducersLoopPromises, index: int32) =
# TODO: multithreaded test case

when isMainModule:
echo "Testing Loop Promises (Producer)"

proc main() =
# Promises can't be globals, Nim bug: https://github.com/nim-lang/Nim/issues/13024
echo "Testing Loop Promises (Producer)"
var pool: TLPoolAllocator
pool.initialize()

var pool: TLPoolAllocator
pool.initialize()
block:
var plp: ProducersLoopPromises
plp.initialize(pool, 0, 10, 1)
doAssert plp.getBucket(0) == 0

block: # Fulfilling all promises
var prodLoopPromises: ProducersLoopPromises
prodLoopPromises.initialize(pool, 0, 10000, 1)
for i in 0'i32 ..< 10000:
prodLoopPromises.ready(i)
doAssert prodLoopPromises.lp.fulfilled[0].load(moRelaxed) == 10000

main()
block: # Fulfilling all promises
var prodLoopPromises: ProducersLoopPromises
prodLoopPromises.initialize(pool, 0, 10000, 1)
for i in 0'i32 ..< 10000:
prodLoopPromises.ready(i)
doAssert prodLoopPromises.lp.fulfilled[0].load(moRelaxed) == 10000
59 changes: 39 additions & 20 deletions weave/datatypes/promises.nim
Original file line number Diff line number Diff line change
Expand Up @@ -56,25 +56,45 @@ const dummy = cast[DummyPtr](0xFACADE)
# Internal
# ----------------------------------------------------

proc `=destroy`*(prom: var Promise) {.inline.} =
# For now promises must be manually managed due to
# - https://github.com/nim-lang/Nim/issues/13024
# Additionally https://github.com/nim-lang/Nim/issues/13025
# requires workaround
#
# proc `=destroy`*(prom: var Promise) {.inline.} =
# let oldCount = fetchSub(prom.p.refCount, 1, moRelease)
# if oldCount == 1:
# fence(moAcquire)
# # Return memory to the memory pool
# recycle(prom.p)
#
# proc `=`*(dst: var Promise, src: Promise) {.inline.}
# # Workaround: https://github.com/nim-lang/Nim/issues/13025
#
# # Pending https://github.com/nim-lang/Nim/issues/13024
# proc `=sink`*(dst: var Promise, src: Promise) {.inline.} =
# # Don't pay for atomic refcounting when compiler can prove there is no ref change
# system.`=sink`(dst, src)
#
# proc `=`*(dst: var Promise, src: Promise) {.inline.} =
# let oldCount = fetchAdd(src.p.refCount, 1, moRelaxed)
# ascertain: oldCount > 0
# system.`=`(dst, src)

func incRef*(prom: var Promise) {.inline.} =
## Manual atomic refcounting - workaround https://github.com/nim-lang/Nim/issues/13024
let oldCount = fetchAdd(prom.p.refCount, 1, moRelaxed)
ascertain: oldCount > 0

func decRef*(prom: var Promise) {.inline.} =
## Manual atomic refcounting - workaround https://github.com/nim-lang/Nim/issues/13024
let oldCount = fetchSub(prom.p.refCount, 1, moRelease)
ascertain: oldCount > 0
if oldCount == 1:
fence(moAcquire)
# Return memory to the memory pool
recycle(prom.p)

proc `=`*(dst: var Promise, src: Promise) {.inline.}
# Workaround: https://github.com/nim-lang/Nim/issues/13025

proc `=sink`*(dst: var Promise, src: Promise) {.inline.} =
# Don't pay for atomic refcounting when compiler can prove there is no ref change
system.`=sink`(dst, src)

proc `=`*(dst: var Promise, src: Promise) {.inline.} =
let oldCount = fetchAdd(src.p.refCount, 1, moRelaxed)
ascertain: oldCount > 0
system.`=`(dst, src)

proc isFulfilled*(prom: Promise): bool {.inline.} =
## Check if a promise is fulfilled.
# Library-only
Expand Down Expand Up @@ -125,7 +145,7 @@ proc dispatch(clp: ConsumerLoopPromises, bucket: int32) =
var idx = bucket
while idx != 0:
clp.dispatched[idx] += 1
idx = idx shr 1
idx = (idx-1) shr 1

clp.dispatched[0] += 1

Expand Down Expand Up @@ -168,7 +188,6 @@ proc anyFulfilled*(clp: ConsumerLoopPromises): tuple[foundNew: bool, bucket: int
# since SPMC broadcast channels can be tested with a single thread-local consumer.

when isMainModule:

proc main() =
# Promises can't be globals, Nim bug: https://github.com/nim-lang/Nim/issues/13024
echo "Testing Loop promises (producer + Consumer)"
Expand All @@ -188,7 +207,7 @@ when isMainModule:

block:
let promise = clp.anyFulfilled()
doAssert promise.foundNew and promise.bucket == 7
doAssert promise.foundNew and promise.bucket == 7, "promise: " & $promise

block:
let promise = clp.anyFulfilled()
Expand All @@ -199,22 +218,22 @@ when isMainModule:

block:
let promise = clp.anyFulfilled()
doAssert promise.foundNew and promise.bucket == 2
doAssert promise.foundNew and promise.bucket == 2, "promise: " & $promise

plp.ready(3)
plp.ready(4)

block:
let promise = clp.anyFulfilled()
doAssert promise.foundNew and promise.bucket == 3
doAssert promise.foundNew and promise.bucket == 3, "promise: " & $promise

block:
let promise = clp.anyFulfilled()
doAssert promise.foundNew and promise.bucket == 4
doAssert promise.foundNew and promise.bucket == 4, "promise: " & $promise

block:
let promise = clp.anyFulfilled()
doAssert promise.foundNew and promise.bucket == 0
doAssert promise.foundNew and promise.bucket == 0, "promise: " & $promise

block:
let promise = clp.anyFulfilled()
Expand Down
2 changes: 2 additions & 0 deletions weave/memory/memory_pools.nim
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,8 @@ proc recycle*[T](p: ptr T) {.gcsafe.} =
# Find the owning arena
let arena = p.getArena()

# TODO: optional double-frees detection

if getMemThreadID() == arena.meta.threadID.load(moRelaxed):
# thread-local free
if arena.meta.localFree.isNil:
Expand Down

0 comments on commit bf6a594

Please sign in to comment.