diff --git a/tests/async_demo.nim b/tests/async_demo.nim new file mode 100644 index 0000000..c74d52a --- /dev/null +++ b/tests/async_demo.nim @@ -0,0 +1,86 @@ +import ../zmq +import std/[asyncdispatch] + +proc asyncpoll() = + # test "asyncZPoller": + block: + const zaddr = "tcp://127.0.0.1:15571" + const zaddr2 = "tcp://127.0.0.1:15572" + var pusher = listen(zaddr, PUSH) + var puller = connect(zaddr, PULL) + + var pusher2 = listen(zaddr2, PUSH) + var puller2 = connect(zaddr2, PULL) + var poller: AsyncZPoller + + var i = 0 + # Register the callback + # assert message received are correct (should be even integer in string format) + var msglist = @["0", "2", "4", "6", "8"] + var msgCount = 0 + poller.register( + puller2, + ZMQ_POLLIN, + proc(x: ZSocket) = + let msg = x.receive() + inc(msgCount) + if msglist.contains(msg): + msglist.delete(0) + assert true + else: + assert false + ) + # assert message received are correct (should be even integer in string format) + var msglist2 = @["0", "2", "4", "6", "8"] + var msgCount2 = 0 + poller.register( + puller, + ZMQ_POLLIN, + proc(x: ZSocket) = + let msg = x.receive() + inc(msgCount2) + if msglist2.contains(msg): + msglist2.delete(0) + assert true + else: + assert false + ) + + let + N = 10 + N_MAX_TIMEOUT = 5 + + var sndCount = 0 + # A client send some message + for i in 0..= 0.18.0" +requires "nim >= 1.4.0" task buildexamples, "Compile all examples": withDir "examples": @@ -17,5 +17,5 @@ task buildexamples, "Compile all examples": selfExec("cpp --mm:orc -d:release " & fstr) task gendoc, "Generate documentation": - exec("nimble doc --project zmq.nim --out:docs/") + exec("nim doc --mm:orc --project --out:docs/ zmq.nim") diff --git a/zmq/asynczmq.nim b/zmq/asynczmq.nim index b2ddd66..40b99b2 100644 --- a/zmq/asynczmq.nim +++ b/zmq/asynczmq.nim @@ -1,4 +1,4 @@ -import std/[asyncdispatch] +import std/[asyncdispatch, selectors] import ./connections import ./bindings import ./poller @@ -14,9 +14,31 @@ type proc len*(poller: AsyncZPoller): int = result = poller.zpoll.len() -proc `=destroy`*(obj: var AsyncZPoller) = - if hasPendingOperations(): - drain(500) +proc waitAll(obj: AsyncZPoller) {.raises: [].} = + # Is there a more elegant to do this ? + # We want a helper function that will not raises to avoid excpetion in =destroy hooks + + try: + + while hasPendingOperations(): + drain(500) + + except ValueError: + discard + + except OSError: + discard + + except ref IOSelectorsException: + discard + + except Exception: + discard + +proc `=destroy`*(obj: AsyncZPoller) = + obj.waitAll() + `=destroy`(obj.cb) + `=destroy`(obj.zpoll) proc register*(poller: var AsyncZPoller, sock: ZSocket, event: int, cb: AsyncZPollCB) = ## Register ZSocket function diff --git a/zmq/bindings.nim b/zmq/bindings.nim index 2c7eaad..1916461 100644 --- a/zmq/bindings.nim +++ b/zmq/bindings.nim @@ -1,4 +1,3 @@ -{.deadCodeElim: on.} when defined(windows): const zmqdll* = "(lib|)zmq.dll" diff --git a/zmq/connections.nim b/zmq/connections.nim index 17c2c03..d986241 100644 --- a/zmq/connections.nim +++ b/zmq/connections.nim @@ -13,11 +13,11 @@ type ZConnectionImpl* {.pure, final.} = object ## A Zmq connection. Since ``ZContext`` and ``ZSocket`` are pointers, it is highly recommended to **not** copy ``ZConnection``. - context*: ZContext ## Zmq context. Can be 'owned' by another connection (useful for inproc protocol). - socket*: ZSocket ## Embedded socket. - ownctx: bool ## Boolean indicating if the connection owns the Zmq context - alive: bool ## Boolean indicating if the connections has been closed or not - sockaddr: string ## Address of the embedded socket + context*: ZContext ## Zmq context from C-bindings. + socket*: ZSocket ## Zmq socket from C-bindings. + ownctx: bool # Boolean indicating if the connection owns the Zmq context + alive: bool # Boolean indicating if the connection has been closed + sockaddr: string # Address of the embedded socket ZConnection * = ref ZConnectionImpl @@ -73,6 +73,7 @@ proc setsockopt_impl[T: SomeOrdinal](s: ZSocket, option: ZSockOptions, optval: T var val: T = optval if setsockopt(s, option, addr(val), sizeof(val)) != 0: zmqError() + # Some option take cstring proc setsockopt_impl(s: ZSocket, option: ZSockOptions, optval: string) = var val: string = optval @@ -139,10 +140,46 @@ proc getsockopt*[T: SomeOrdinal|string](c: ZConnection, option: ZSockOptions): T Destructor ]# when defined(gcDestructors): - proc close*(c: var ZConnectionImpl, linger: int = 500) - proc `=destroy`(x: var ZConnectionImpl) = - if x.alive: - x.close() + proc `=destroy`(x: ZConnectionImpl) = + # Handle exception in =destroy hook or use private close without possible exception ? + if x.alive and not isNil(x.socket): + var linger = 500.cint + # Use low level primitive to avoid throwing + if setsockopt(x.socket, LINGER, addr(linger), sizeof(linger)) != 0: + # Handle error in closure ? + echo("Error in closing ZMQ-socket") + + if close(x.socket) != 0: + # Handle error in closure ? + echo("Error in closing ZMQ-socket") + + if x.ownctx and not isNil(x.context): + if ctx_term(x.context) != 0: + echo("Error in closing ZMQ-context") + + proc `=wasMoved`(x: var ZConnectionImpl) = + x.alive = false + x.socket = nil + x.context = nil + + proc `=sink`*(dest: var ZConnectionImpl, source: ZConnectionImpl) = + `=destroy`(dest) + wasMoved(dest) + dest.context = source.context + dest.socket = source.socket + dest.ownctx = source.ownctx + dest.sockaddr = source.sockaddr + + proc `=copy`*(dest: var ZConnectionImpl, source: ZConnectionImpl) = + if dest.socket != source.socket: + dest.socket = source.socket + + if dest.sockaddr != source.sockaddr: + dest.sockaddr = source.sockaddr + + if dest.context != source.context: + dest.context = source.context + dest.ownctx = false #[ Connect / Listen / Close @@ -176,7 +213,7 @@ proc bindAddr*(conn: var ZConnection, address: string) = conn.sockaddr = address proc connect*(address: string, mode: ZSocketType, context: ZContext): ZConnection = - ## Open a new connection on an external ``ZContext`` and connect the socket + ## Open a new connection on an external ``ZContext`` and connect the socket. External context are useful for inproc connections. result = new(ZConnection) result.context = context result.ownctx = false @@ -211,7 +248,7 @@ proc connect*(address: string, mode: ZSocketType): ZConnection = result.ownctx = true proc listen*(address: string, mode: ZSocketType, context: ZContext): ZConnection = - ## Open a new connection on an external ``ZContext`` and binds on the socket + ## Open a new connection on an external ``ZContext`` and binds on the socket. External context are useful for inproc connections. runnableExamples: import zmq var monoserver = listen("tcp://127.0.0.1:34444", PAIR) @@ -302,12 +339,7 @@ proc sendAll*(c: ZConnection, msg: varargs[string]) = sendAll(c.socket, msg) # receive with ZSocket type -proc tryReceive*(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] = - ## Receives a message from a socket. - ## - ## Indicate whether a message was received or EAGAIN occured by ``msgAvailable`` - ## - ## Indicate if more parts are needed to be received by ``moreAvailable`` +proc receiveImpl(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] = result.moreAvailable = false result.msgAvailable = false @@ -321,6 +353,8 @@ proc tryReceive*(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvaila result.msg = newString(msg_size(m)) if result.msg.len > 0: copyMem(addr(result.msg[0]), msg_data(m), result.msg.len) + + # Check if more part follows result.moreAvailable = msg_more(m).bool else: # Either an error or EAGAIN @@ -330,11 +364,53 @@ proc tryReceive*(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvaila if msg_close(m) != 0: zmqError() +proc waitForReceive*(s: ZSocket, timeout: int = -2, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] = + ## Set RCVTIMEO for the socket and wait until a message is available. + ## This function is blocking. + ## + ## timeout: + ## -1 means infinite wait + ## positive value is in milliseconds + ## negative value strictly below -1 are ignored and the wait time will default to RCVTIMEO set for the socket (which by default is -1). + ## + ## Indicate whether a message was received or EAGAIN occured by ``msgAvailable`` + ## Indicate if more parts are needed to be received by ``moreAvailable`` + result.moreAvailable = false + result.msgAvailable = false + + let curtimeout : cint = getsockopt[cint](s, RCVTIMEO) + + # If rcvtimeout is set and not timeout argument is passed (or -1), use the existing timeout + # Otherwise update the rcvtimeout + let shouldUpdateTimeout = (timeout >= -1) and ((curtimeout > 0 and timeout > 0) or (curtimeout < 0)) + + if shouldUpdateTimeout: + s.setsockopt(RCVTIMEO, timeout.cint) + + result = receiveImpl(s, flags) + + if shouldUpdateTimeout: + s.setsockopt(RCVTIMEO, curtimeout.cint) + +proc tryReceive*(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] = + ## Receives a message from a socket in a non-blocking way. + ## + ## Indicate whether a message was received or EAGAIN occured by ``msgAvailable`` + ## + ## Indicate if more parts are needed to be received by ``moreAvailable`` + result.moreAvailable = false + result.msgAvailable = false + + let status = getsockopt[cint](s, ZSockOptions.EVENTS).int() + # Check if socket has an incoming message + if (status and ZMQ_POLLIN) != 0: + result = receiveImpl(s, flags) + proc receive*(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): string = ## Receive a message on socket. - ## + # ## Return an empty string on EAGAIN - tryReceive(s, flags).msg + receiveImpl(s, flags).msg proc receiveAll*(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): seq[string] = ## Receive all parts of a message @@ -342,15 +418,28 @@ proc receiveAll*(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): seq[string] = ## If EAGAIN occurs without any data being received, it will be an empty seq var expectMessage = true while expectMessage: - let (msgAvailable, moreAvailable, msg) = tryReceive(s, flags) + let (msgAvailable, moreAvailable, msg) = receiveImpl(s, flags) if msgAvailable: result.add msg expectMessage = moreAvailable else: expectMessage = false +proc waitForReceive*(c: ZConnection, timeout: int = -1, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] = + ## Set RCVTIMEO for the socket and wait until a message is available. + ## This function is blocking. + ## + ## timeout: + ## -1 means infinite wait + ## positive value is in milliseconds + ## negative value strictly below -1 are ignored and the wait time will default to RCVTIMEO set for the socket (which by default is -1). + ## + ## Indicate whether a message was received or EAGAIN occured by ``msgAvailable`` + ## Indicate if more parts are needed to be received by ``moreAvailable`` + waitForReceive(c.socket, timeout, flags) + proc tryReceive*(c: ZConnection, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvailable: bool, moreAvailable: bool, msg: string] = - ## Receives a message from a connection. + ## Receives a message from a socket in a non-blocking way. ## ## Indicate whether a message was received or EAGAIN occured by ``msgAvailable`` ## diff --git a/zmq/poller.nim b/zmq/poller.nim index 1984cc0..ef9ec06 100644 --- a/zmq/poller.nim +++ b/zmq/poller.nim @@ -12,6 +12,9 @@ type ## It is mandatory to manage the lifetimes of the polled sockets independently of the ``ZPoller`` - either manually or by using a ``ZConnection``. items*: seq[ZPollItem] +proc `=destroy`*(poll: Zpoller) = + `=destroy`(poll.items) + proc `[]`*(poller: ZPoller, idx: int): lent ZPollItem = ## Access registered element by index poller.items[idx]