Skip to content

Commit

Permalink
Merge pull request #40 from nim-lang/feat_non_block_and_nim2
Browse files Browse the repository at this point in the history
Feat non block and nim2
  • Loading branch information
Araq authored Dec 21, 2023
2 parents 06380aa + 073831a commit 2c5e7a5
Show file tree
Hide file tree
Showing 8 changed files with 324 additions and 37 deletions.
86 changes: 86 additions & 0 deletions tests/async_demo.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import ../zmq
import std/[asyncdispatch]

proc asyncpoll() =
# test "asyncZPoller":
block:
const zaddr = "tcp://127.0.0.1:15571"
const zaddr2 = "tcp://127.0.0.1:15572"
var pusher = listen(zaddr, PUSH)
var puller = connect(zaddr, PULL)

var pusher2 = listen(zaddr2, PUSH)
var puller2 = connect(zaddr2, PULL)
var poller: AsyncZPoller

var i = 0
# Register the callback
# assert message received are correct (should be even integer in string format)
var msglist = @["0", "2", "4", "6", "8"]
var msgCount = 0
poller.register(
puller2,
ZMQ_POLLIN,
proc(x: ZSocket) =
let msg = x.receive()
inc(msgCount)
if msglist.contains(msg):
msglist.delete(0)
assert true
else:
assert false
)
# assert message received are correct (should be even integer in string format)
var msglist2 = @["0", "2", "4", "6", "8"]
var msgCount2 = 0
poller.register(
puller,
ZMQ_POLLIN,
proc(x: ZSocket) =
let msg = x.receive()
inc(msgCount2)
if msglist2.contains(msg):
msglist2.delete(0)
assert true
else:
assert false
)

let
N = 10
N_MAX_TIMEOUT = 5

var sndCount = 0
# A client send some message
for i in 0..<N:
if (i mod 2) == 0:
# Can periodically send stuff
pusher.send($i)
pusher2.send($i)
inc(sndCount)

# N_MAX_TIMEOUT is the number of time the poller can timeout before exiting the loop
while i < N_MAX_TIMEOUT:

# I don't recommend a high timeout because it's going to poll for the duration if there is no message in queue
var fut = poller.pollAsync(1)
let r = waitFor fut
if r < 0:
break # error case
elif r == 0:
inc(i)

# No longer polling but some callback may not have finished
while hasPendingOperations():
drain()

assert msgCount == msgCount2
assert msgCount == sndCount

pusher.close()
puller.close()
pusher2.close()
puller2.close()

when isMainModule:
asyncpoll()
37 changes: 37 additions & 0 deletions tests/tdestruc.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import ../zmq
import std/unittest

proc sockHandler(req, rep: ZConnection, pong: string) =
req.send(pong)
let r = rep.receive()
check r == pong

proc testDestroy() =
const sockaddr = "tcp://127.0.0.1:55001"

test "Destroy & Copy":
let
ping = "ping"
pong = "pong"

var rep = listen(sockaddr, REP)
var req = connect(sockaddr, REQ)

sockHandler(req, rep, ping)
sockHandler(rep, req, pong)

block:
var req2 = req
req2.send(ping)
let r = rep.receive()
check r == ping

rep.send(pong)
block:
var req2 = req
let r = req2.receive()
check r == pong

when isMainModule:
testDestroy()

67 changes: 59 additions & 8 deletions tests/tzmq.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import ../zmq
import std/[unittest, os]
import std/[unittest, os, times, monotimes]
import std/[asyncdispatch, asyncfutures]

proc reqrep() =
Expand Down Expand Up @@ -88,7 +88,8 @@ proc routerdealer() =
test "routerdealer":
const sockaddr = "tcp://127.0.0.1:55001"
var router = listen(sockaddr, mode = ROUTER)
router.setsockopt(RCVTIMEO, 350.cint)
router.setsockopt(RCVTIMEO, 500.cint)

defer: router.close()
var dealer = connect(sockaddr, mode = DEALER)
defer: dealer.close()
Expand All @@ -106,12 +107,23 @@ proc routerdealer() =
check dealer.receive() == payload
# Let receive timeout
block:
let start = getMonoTime()
# On receive return empty message
let recv = router.receive()
let
recv = router.receive()
stop = getMonoTime()
elapsed = stop - start
check (elapsed - initDuration(milliseconds=500)) < initDuration(milliseconds=1)
check recv == ""

block:
# On try receive, check flag is flase
let recv = router.tryReceive()
let
start = getMonoTime()
recv = router.waitForReceive(350)
stop = getMonoTime()
elapsed = stop - start
check (elapsed - initDuration(milliseconds=350)) < initDuration(milliseconds=1)
check recv.msgAvailable == false

proc inproc_sharectx() =
Expand Down Expand Up @@ -146,31 +158,32 @@ proc pairpair() =

var pairs = @[listen(sockaddr, PAIR), connect(sockaddr, PAIR)]
pairs[1].setsockopt(RCVTIMEO, 500.cint)

block:
pairs[0].send(ping, SNDMORE)
pairs[0].send(ping, SNDMORE)
pairs[0].send(ping)

block:
let content = pairs[1].tryReceive()
let content = pairs[1].waitForReceive()
check content.msgAvailable
check content.moreAvailable
check content.msg == ping

block:
let content = pairs[1].tryReceive()
let content = pairs[1].waitForReceive()
check content.msgAvailable
check content.moreAvailable
check content.msg == ping

block:
let content = pairs[1].tryReceive()
let content = pairs[1].waitForReceive()
check content.msgAvailable
check (not content.moreAvailable)
check content.msg == ping

block:
let content = pairs[1].tryReceive()
let content = pairs[1].waitForReceive()
check (not content.msgAvailable)

block:
Expand Down Expand Up @@ -300,6 +313,43 @@ proc async_pub_sub() =
test "async pub_sub":
check count == N_mSGS

proc non_blocking_recv() =
const sockaddr = "tcp://127.0.0.1:55001"
test "non-blocking receive":
var router = listen(sockaddr, mode = ROUTER)
let res = router.tryReceive()
check res == (false, false, "")

var dealer = connect(sockaddr, mode = DEALER)
let payload = "payload"
block:
# Dealer send a message to router
dealer.send(payload)

block:
# Remove "envelope" of router / dealer
let dealerSocketId = router.receive()
let res = router.waitForReceive(250)
check res.msgAvailable
check not res.moreAvailable
check res.msg == payload

block:
# Remove "envelope" of router / dealer
let
start = getMonoTime()
res = router.waitForReceive(250)
stop = getMonoTime()
elapsed = stop - start
check (elapsed - initDuration(milliseconds=250)) < initDuration(milliseconds=1)

check not res.msgAvailable
check not res.moreAvailable
check res.msg == ""

router.close(250)
dealer.close(250)

when isMainModule:
reqrep()
pubsub()
Expand All @@ -308,3 +358,4 @@ when isMainModule:
pairpair()
async_pub_sub()
asyncpoll()
non_blocking_recv()
6 changes: 3 additions & 3 deletions zmq.nimble
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# Package

version = "1.4.0"
version = "1.5.0"
author = "Andreas Rumpf"
description = "ZeroMQ wrapper"
license = "MIT"

# Dependencies
requires "nim >= 0.18.0"
requires "nim >= 1.4.0"

task buildexamples, "Compile all examples":
withDir "examples":
Expand All @@ -17,5 +17,5 @@ task buildexamples, "Compile all examples":
selfExec("cpp --mm:orc -d:release " & fstr)

task gendoc, "Generate documentation":
exec("nimble doc --project zmq.nim --out:docs/")
exec("nim doc --mm:orc --project --out:docs/ zmq.nim")

30 changes: 26 additions & 4 deletions zmq/asynczmq.nim
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import std/[asyncdispatch]
import std/[asyncdispatch, selectors]
import ./connections
import ./bindings
import ./poller
Expand All @@ -14,9 +14,31 @@ type
proc len*(poller: AsyncZPoller): int =
result = poller.zpoll.len()

proc `=destroy`*(obj: var AsyncZPoller) =
if hasPendingOperations():
drain(500)
proc waitAll(obj: AsyncZPoller) {.raises: [].} =
# Is there a more elegant to do this ?
# We want a helper function that will not raises to avoid excpetion in =destroy hooks

try:

while hasPendingOperations():
drain(500)

except ValueError:
discard

except OSError:
discard

except ref IOSelectorsException:
discard

except Exception:
discard

proc `=destroy`*(obj: AsyncZPoller) =
obj.waitAll()
`=destroy`(obj.cb)
`=destroy`(obj.zpoll)

proc register*(poller: var AsyncZPoller, sock: ZSocket, event: int, cb: AsyncZPollCB) =
## Register ZSocket function
Expand Down
1 change: 0 additions & 1 deletion zmq/bindings.nim
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
{.deadCodeElim: on.}
when defined(windows):
const
zmqdll* = "(lib|)zmq.dll"
Expand Down
Loading

0 comments on commit 2c5e7a5

Please sign in to comment.