Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simple channels with --gc:arc #13936

Closed
mratsim opened this issue Apr 9, 2020 · 8 comments
Closed

Simple channels with --gc:arc #13936

mratsim opened this issue Apr 9, 2020 · 8 comments

Comments

@mratsim
Copy link
Collaborator

mratsim commented Apr 9, 2020

The following is a standalone implementation of a Weave channel that can buffer at most one object.
It works with the default GC but crashes with --gc:arc.

import std/atomics

# Byte size of the memory block a channel is expected to fit in: the sanity
# check below allocates exactly this much for its channel, and `initialize`
# checks the item size against it.
const MemBlockSize = 256

type
  ChannelSPSCSingle* = object
    ## Single-producer / single-consumer channel that buffers at most one
    ## item. The item is stored inline in the trailing `buffer` field, so the
    ## object must be allocated with extra room for it and, when embedded in
    ## another structure, must be its last field.
    # `trySend` sets `full` to true after writing the item; `tryRecv` sets it
    # back to false after reading it. The 128-byte alignment presumably keeps
    # the flag away from neighbouring data (false sharing) — NOTE(review):
    # confirm intent.
    full{.align: 128.}: Atomic[bool]
    itemSize*: uint8 ## Byte size of one item, stored by `initialize`.
    # Inline storage for the single buffered item, 8-byte aligned.
    buffer*{.align: 8.}: UncheckedArray[byte]

# Channels are not copyable: any assignment that would copy a
# ChannelSPSCSingle is rejected at compile time through the `error` pragma.
proc `=`(dest: var ChannelSPSCSingle;
         source: ChannelSPSCSingle) {.error: "A channel cannot be copied".}

proc initialize*(chan: var ChannelSPSCSingle, itemsize: SomeInteger) {.inline.} =
  ## Prepare `chan` to carry items of `itemsize` bytes and mark it empty.
  ##
  ## `chan` is assumed to start a memory block of `MemBlockSize` bytes; the
  ## item is stored inline starting at the `buffer` field.
  ## If ChannelSPSCSingle is used intrusively inside another data structure,
  ## it must be the last part of it because it ends with an UncheckedArray.
  ##
  ## The item slot is zeroed so that the first `trySend` never has to
  ## destroy uninitialized bytes (relevant for types with destructors
  ## under --gc:arc).
  assert itemsize.int in 0 .. int high(uint8)
  # The item begins at `buffer`, whose offset includes the padding required
  # by the field alignments (8 for `buffer`), not just the sizes of `full`
  # and `itemSize`; checking only those sizes let oversized items through.
  assert offsetOf(ChannelSPSCSingle, buffer) + itemsize.int <= MemBlockSize

  chan.itemSize = uint8 itemsize
  zeroMem(chan.buffer.addr, chan.itemSize.int)
  chan.full.store(false, moRelaxed)

func isEmpty*(chan: var ChannelSPSCSingle): bool {.inline.} =
  ## Returns true when no item is currently buffered in `chan`.
  ## The flag is read with acquire ordering, like in `trySend`/`tryRecv`.
  let hasItem = chan.full.load(moAcquire)
  result = not hasItem

func tryRecv*[T](chan: var ChannelSPSCSingle, dst: var T): bool {.inline.} =
  ## Try receiving the item buffered in the channel.
  ## Returns true if successful (channel was not empty); otherwise returns
  ## false and leaves `dst` untouched.
  ##
  ## The item is moved out of the channel into `dst`, so the buffer does not
  ## keep a live copy behind: a later `trySend` (on the producer thread) then
  ## has nothing owned to destroy, and nothing is leaked when the channel
  ## memory is freed.
  ##
  ## ⚠ Use only in the consumer thread that reads from the channel.
  assert (sizeof(T) == chan.itemsize.int) or
          # Support dummy object
          (sizeof(T) == 0 and chan.itemsize == 1)

  # Acquire pairs with the producer's release store in `trySend`, making the
  # item written before `full` was set visible here.
  if not chan.full.load(moAcquire):
    return false
  dst = move(cast[ptr T](chan.buffer.addr)[])
  # Release so the producer only reuses the slot after the move above.
  chan.full.store(false, moRelease)
  return true

func trySend*[T](chan: var ChannelSPSCSingle, src: sink T): bool {.inline.} =
  ## Try sending an item into the channel.
  ## Returns true if successful (channel was empty), false otherwise.
  ##
  ## `src` is a `sink` parameter: on success it is stored into the
  ## channel's inline buffer.
  ##
  ## ⚠ Use only in the producer thread that writes to the channel.
  assert (sizeof(T) == chan.itemsize.int) or
          # Support dummy object
          (sizeof(T) == 0 and chan.itemsize == 1)

  # Acquire pairs with the consumer's release store in `tryRecv`, so the slot
  # is only overwritten after the previous item was fully read.
  let full = chan.full.load(moAcquire)
  if full:
    return false
  cast[ptr T](chan.buffer.addr)[] = src
  # Release publishes the item written above before the flag flips.
  chan.full.store(true, moRelease)
  return true

# Sanity checks
# ------------------------------------------------------------------------------
when isMainModule:

  when not compileOption("threads"):
    {.error: "This requires --threads:on compilation flag".}

  # Spin until `data` is accepted by the channel, running `body` after every
  # failed attempt.
  template sendLoop[T](chan: var ChannelSPSCSingle,
                       data: sink T,
                       body: untyped): untyped =
    while not chan.trySend(data):
      body

  # Spin until an item is received into `data`, running `body` after every
  # failed attempt.
  template recvLoop[T](chan: var ChannelSPSCSingle,
                       data: var T,
                       body: untyped): untyped =
    while not chan.tryRecv(data):
      body

  type
    ThreadArgs = object
      ID: WorkerKind
      chan: ptr ChannelSPSCSingle

    WorkerKind = enum
      Sender
      Receiver

  # Number of values exchanged; shared so both workers always agree.
  const NumMessages = 10

  # Runs `body` only on the worker whose role is `id`; it reads `args` from
  # the calling proc, hence {.dirty.}.
  template Worker(id: WorkerKind, body: untyped): untyped {.dirty.} =
    if args.ID == id:
      body

  proc thread_func(args: ThreadArgs) =

    # Worker RECEIVER:
    # ---------
    # <- chan   (NumMessages times)
    #
    # Worker SENDER:
    # ---------
    # chan <- 42
    # chan <- 53
    # chan <- 64
    # ...       (NumMessages values: 42 + j*11)
    Worker(Receiver):
      var val: int
      for j in 0 ..< NumMessages:
        args.chan[].recvLoop(val):
          # Busy loop, in prod we might want to yield the core/thread timeslice
          discard
        echo "                  Receiver got: ", val
        doAssert val == 42 + j*11

    Worker(Sender):
      doAssert not args.chan.full.load(moRelaxed)
      for j in 0 ..< NumMessages:
        let val = 42 + j*11
        args.chan[].sendLoop(val):
          # Busy loop, in prod we might want to yield the core/thread timeslice
          discard
        echo "Sender sent: ", val

  proc main() =
    echo "Testing if 2 threads can send data"
    echo "-----------------------------------"
    var threads: array[2, Thread[ThreadArgs]]

    # Zeroed allocation: the item slot starts in a defined (empty) state.
    let chan = cast[ptr ChannelSPSCSingle](allocShared0(MemBlockSize))
    chan[].initialize(itemSize = sizeof(int))

    createThread(threads[0], thread_func, ThreadArgs(ID: Receiver, chan: chan))
    createThread(threads[1], thread_func, ThreadArgs(ID: Sender, chan: chan))

    joinThread(threads[0])
    joinThread(threads[1])

    # Freed only after both workers have been joined.
    freeShared(chan)

    echo "-----------------------------------"
    echo "Success"

  main()
@mratsim
Copy link
Collaborator Author

mratsim commented Apr 22, 2020

On Nim from 2020-04-20 the issue changed: the previous crash was a segfault on createThread,
likely similar to #13881.

Now it is similar to #13935 (comment), but:

  • neither nim cpp --gc:arc ...
  • nor nim --gc:arc --exceptions:setjmp ...
    works.

image

@ghost
Copy link

ghost commented May 29, 2020

Still hangs on devel (and SIGSEGVs with -d:danger)

@Araq
Copy link
Member

Araq commented Jun 16, 2020

Works for me on Windows; here is the output:

Sender sent: 42
                  Receiver got: 42
                  Receiver got: 53
Sender sent: 53
                  Receiver got: 64
Sender sent: 64
Sender sent: 75
Sender sent: 86
                  Receiver got: 75
                  Receiver got: 86
                  Receiver got: 97
Sender sent: 97
Sender sent: 108
Sender sent: 119
                  Receiver got: 108
                  Receiver got: 119
                  Receiver got: 130
Sender sent: 130
Sender sent: 141
                  Receiver got: 141

@ghost
Copy link

ghost commented Jun 18, 2020

Works for me on Linux with #14722 !

Testing if 2 threads can send data
-----------------------------------
                  Receiver got: 42
Sender sent: 42
                  Receiver got: 53
Sender sent: 53
Sender sent: 64
Sender sent: 75
                  Receiver got: 64
                  Receiver got: 75
                  Receiver got: 86
Sender sent: 86
Sender sent: 97
Sender sent: 108
                  Receiver got: 97
                  Receiver got: 108
                  Receiver got: 119
Sender sent: 119
                  Receiver got: 130
Sender sent: 130
                  Receiver got: 141
Sender sent: 141
-----------------------------------
Success

@mratsim
Copy link
Collaborator Author

mratsim commented Jun 19, 2020

@Araq You don't have "Success" in your output. On the current devel my code doesn't segfault, but it gets stuck at the same point.
@Yardanico good news!

@ghost
Copy link

ghost commented Jun 22, 2020

@mratsim can you retest after 3ba0c30 ?

@ghost
Copy link

ghost commented Jun 22, 2020

Just out of curiosity, here is valgrind (with -d:useMalloc, of course):

~/P/stuff ❯❯❯ valgrind --leak-check=full ./a
==20443== Memcheck, a memory error detector
==20443== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==20443== Using Valgrind-3.16.0.GIT and LibVEX; rerun with -h for copyright info
==20443== Command: ./a
==20443== 
Testing if 2 threads can send data
-----------------------------------
Sender sent: 42
                  Receiver got: 42
                  Receiver got: 53
Sender sent: 53
Sender sent: 64
Sender sent: 75
                  Receiver got: 64
                  Receiver got: 75
                  Receiver got: 86
Sender sent: 86
Sender sent: 97
                  Receiver got: 97
                  Receiver got: 108
Sender sent: 108
Sender sent: 119
Sender sent: 130
                  Receiver got: 119
                  Receiver got: 130
Sender sent: 141
                  Receiver got: 141
-----------------------------------
Success
==20443== 
==20443== HEAP SUMMARY:
==20443==     in use at exit: 0 bytes in 0 blocks
==20443==   total heap usage: 26 allocs, 26 frees, 2,678 bytes allocated
==20443== 
==20443== All heap blocks were freed -- no leaks are possible
==20443== 
==20443== For lists of detected and suppressed errors, rerun with: -s
==20443== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

@mratsim
Copy link
Collaborator Author

mratsim commented Jun 22, 2020

Yes, it's working now. I think this should be added as a test: we can check that "Success" is printed last (with a timeout, as multithreading bugs lead to unresponsive apps from time to time).

Araq added a commit that referenced this issue Jun 23, 2020
Clyybber pushed a commit to Clyybber/Nim that referenced this issue Jun 24, 2020
@Araq Araq closed this as completed in 1854d29 Jul 4, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants