From 16f5c8d9156497b8ea354c88d6c4711380e668f0 Mon Sep 17 00:00:00 2001 From: darkestpigeon Date: Thu, 17 Oct 2024 12:06:01 +0200 Subject: [PATCH 1/3] add channel flushing on destroy, add tryRecv with Option[T] result --- threading/channels.nim | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/threading/channels.nim b/threading/channels.nim index f9d7529..46b2a1b 100644 --- a/threading/channels.nim +++ b/threading/channels.nim @@ -100,7 +100,7 @@ runnableExamples("--threads:on --gc:orc"): when not (defined(gcArc) or defined(gcOrc) or defined(gcAtomicArc) or defined(nimdoc)): {.error: "This module requires one of --mm:arc / --mm:atomicArc / --mm:orc compilation flags".} -import std/[locks, isolation, atomics] +import std/[locks, isolation, atomics, options] # Channel # ------------------------------------------------------------------------------ @@ -256,10 +256,15 @@ type Chan*[T] = object ## Typed channel d: ChannelRaw +proc tryRecv*[T](c: Chan[T]): Option[T] {.inline, raises: [].} + template frees(c) = if c.d != nil: # this `fetchSub` returns current val then subs # so count == 0 means we're the last + while true: + if c.tryRecv().isNone: + break if c.d.atomicCounter.fetchSub(1, moAcquireRelease) == 0: freeChannel(c.d) @@ -343,6 +348,22 @@ proc tryRecv*[T](c: Chan[T], dst: var T): bool {.inline.} = ## Returns `false` and does not change `dist` if no message was received. channelReceive(c.d, dst.addr, sizeof(T), false) +proc tryRecv*[T](c: Chan[T]): Option[T] {.inline.} = + ## Tries to receive a message from the channel `c`. + ## + ## Doesn't block waiting for messages in the channel to become available. + ## Instead returns after an attempt to receive a message was made. + ## + ## .. warning:: In high-concurrency situations, consider using an exponential + ## backoff strategy to reduce contention and improve the success rate of + ## operations. + var dst: T + let dataAvailable = tryRecv(c, dst) + if dataAvailable: + result = some(dst) + else: + result = none(T) + proc send*[T](c: Chan[T], src: sink Isolated[T]) {.inline.} = ## Sends the message `src` to the channel `c`. ## This blocks the sending thread until `src` was successfully sent. From 14107877219b2725f684ec2575a9828f37e21ac2 Mon Sep 17 00:00:00 2001 From: darkestpigeon Date: Thu, 17 Oct 2024 16:13:51 +0200 Subject: [PATCH 2/3] removed tryRecv with Option[T] --- threading/channels.nim | 25 +++++-------------------- 1 file changed, 5 insertions(+), 20 deletions(-) diff --git a/threading/channels.nim b/threading/channels.nim index 46b2a1b..40de3f7 100644 --- a/threading/channels.nim +++ b/threading/channels.nim @@ -100,7 +100,7 @@ runnableExamples("--threads:on --gc:orc"): when not (defined(gcArc) or defined(gcOrc) or defined(gcAtomicArc) or defined(nimdoc)): {.error: "This module requires one of --mm:arc / --mm:atomicArc / --mm:orc compilation flags".} -import std/[locks, isolation, atomics, options] +import std/[locks, isolation, atomics] # Channel # ------------------------------------------------------------------------------ @@ -256,14 +256,15 @@ type Chan*[T] = object ## Typed channel d: ChannelRaw -proc tryRecv*[T](c: Chan[T]): Option[T] {.inline, raises: [].} +proc tryRecv*[T](c: Chan[T], dst: var T): bool {.inline, raises: [].} -template frees(c) = +template frees[T](c: Chan[T]) = if c.d != nil: # this `fetchSub` returns current val then subs # so count == 0 means we're the last while true: - if c.tryRecv().isNone: + var msg: T + if not c.tryRecv(msg): break if c.d.atomicCounter.fetchSub(1, moAcquireRelease) == 0: freeChannel(c.d) @@ -348,22 +349,6 @@ proc tryRecv*[T](c: Chan[T], dst: var T): bool {.inline.} = ## Returns `false` and does not change `dist` if no message was received. channelReceive(c.d, dst.addr, sizeof(T), false) -proc tryRecv*[T](c: Chan[T]): Option[T] {.inline.} = - ## Tries to receive a message from the channel `c`. - ## - ## Doesn't block waiting for messages in the channel to become available. - ## Instead returns after an attempt to receive a message was made. - ## - ## .. warning:: In high-concurrency situations, consider using an exponential - ## backoff strategy to reduce contention and improve the success rate of - ## operations. - var dst: T - let dataAvailable = tryRecv(c, dst) - if dataAvailable: - result = some(dst) - else: - result = none(T) - proc send*[T](c: Chan[T], src: sink Isolated[T]) {.inline.} = ## Sends the message `src` to the channel `c`. ## This blocks the sending thread until `src` was successfully sent. From 2d1f8607dbda4afb229f9b73d28a20b759f3cf16 Mon Sep 17 00:00:00 2001 From: darkestpigeon Date: Tue, 22 Oct 2024 19:28:29 +0200 Subject: [PATCH 3/3] fixed a bug (flushing was done without checking the refcount) --- threading/channels.nim | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/threading/channels.nim b/threading/channels.nim index 40de3f7..2e0693f 100644 --- a/threading/channels.nim +++ b/threading/channels.nim @@ -262,11 +262,11 @@ template frees[T](c: Chan[T]) = if c.d != nil: # this `fetchSub` returns current val then subs # so count == 0 means we're the last - while true: - var msg: T - if not c.tryRecv(msg): - break if c.d.atomicCounter.fetchSub(1, moAcquireRelease) == 0: + while true: + var msg: T + if not c.tryRecv(msg): + break freeChannel(c.d) when defined(nimAllowNonVarDestructor):