Skip to content
This repository has been archived by the owner on Dec 30, 2022. It is now read-only.

Commit

Permalink
2.0.1
Browse files Browse the repository at this point in the history
  • Loading branch information
niinivaa committed Sep 7, 2021
1 parent 427f872 commit 52da55c
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 24 deletions.
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@ bus.doDelivery()
bus.stop()
```


## Release notes for 2.0.0 (2021-08-09)
- Alternative *newSuber* procs removed, use the one remaining
- alternative *newSuber* procs removed, use the one remaining
- Push, Pull and Deliver callbacks must be registered via corresponding *setXXXCallback* procs
- new *doSynced* proc for running code in sync with message sending
- *stop* does not return a thread, joins the thread automatically instead
- other bug fixes and minor improvements
- other bug fixes and minor improvements

## Release notes for 2.0.1 (2021-09-07)
- bug fix: resurrected filling of expired topics on pull
73 changes: 55 additions & 18 deletions src/suber.nim
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
when not compileOption("threads"): {.fatal: "Suber requires threads:on compiler option".}
when not defined(gcDestructors): {.fatal: "Suber requires gc:arc or orc compiler option".}

import intsets, std/monotimes, stashtable
import intsets, std/monotimes, tables, stashtable
export intsets, monotimes

type
Expand Down Expand Up @@ -162,6 +162,7 @@ type

when not defined(nimdoc):
type Suber*[TData; SuberMaxTopics: static int] = ref object
channelqueuesize: int
state: SuberState
CacheMaxCapacity: int
CacheMaxLength: int
Expand All @@ -174,6 +175,7 @@ when not defined(nimdoc):
pushCallback: PushCallback[TData]
pullCallback: PullCallback[TData]
subscribers: StashTable[Topic, ref IntSet, SuberMaxTopics]
topicexpirations: Table[Topic, MonoTime]
channel: Channel[SuberMessage[TData]]
thread: Thread[Suber[TData, SuberMaxTopics]]
peakchannelqueuelength: int
Expand Down Expand Up @@ -226,6 +228,7 @@ proc initSuber[TData; SuberMaxTopics](
suber.CacheMaxLength = cachelength
suber.MaxDelivery = maxdelivery
suber.subscribers = newStashTable[Topic, ref IntSet, SuberMaxTopics]()
suber.channelqueuesize = channelsize
suber.channel.open(channelsize)
suber.pushCallback = onPush
suber.deliverCallback = onDeliver
Expand Down Expand Up @@ -265,13 +268,15 @@ proc stopImmediately*[TData; SuberMaxTopics](suber: Suber[TData, SuberMaxTopics]
suber.state = InstantStop
if suber.thread.running: suber.channel.send(SuberMessage[TData](kind: smNil))

proc getChannelQueueLengths*(suber: Suber): (int, int, int) =
proc getChannelQueueLengths*(suber: Suber): (int, int, int) {.inline.} =
## Reports amounts of buffered messages in channel queue for monitoring and backpressure purposes:
## | first field: Current number of messages in the channel buffer
## | second field: Peak number of queued messages since queue was empty
## | third field: Maximum number of queued messages ever
(suber.channel.peek(), suber.peakchannelqueuelength, suber.maxchannelqueuelength)


proc getChannelQueueSize*(suber: Suber): int = suber.channelqueuesize

# topics ----------------------------------------------------

proc addTopic*(suber: Suber, topic: Topic | int): bool {.discardable.} =
Expand All @@ -286,6 +291,7 @@ proc addTopic*(suber: Suber, topic: Topic | int): bool {.discardable.} =

proc removeTopic*(suber: Suber, topic: Topic | int) =
suber.subscribers.del(topic)
suber.topicexpirations.del(topic)

proc hasTopic*(suber: Suber, topic: Topic | int): bool =
not (findIndex(suber.subscribers, topic) == NotInStash)
Expand Down Expand Up @@ -350,7 +356,20 @@ proc getSubscribers*(suber: Suber, message: ptr SuberMessage, toset: var IntSet,
suber.subscribers.withValue(Topic(topic)): toset.incl(value[][])

proc isSubscriber*(suber: Suber, subscriber: Subscriber, topic: Topic): bool =
suber.subscribers.withValue(topic): return value[][].contains(subscriber)
suber.subscribers.withValue(topic): return value[][].contains(int(subscriber))

template testSubscriber() =
suber.subscribers.withValue(topic):
if not value[][].contains(int(subscriber)): return false
do: return false

proc isSubscriber*(suber: Suber, subscriber: Subscriber, topics: openArray[Topic]): bool =
for topic in topics: testSubscriber()
true

proc isSubscriber*(suber: Suber, subscriber: Subscriber, topics: IntSet): bool =
for topic in topics.items(): testSubscriber()
true

# deliver ------------------------------------------------

Expand Down Expand Up @@ -395,8 +414,10 @@ template evictCache() =
var current = suber.head + 1
if(unlikely) current == suber.cache.len: current = 0
var evictedsize = 0
while evictedsize < message.size: # TODO: we could evict more, now that we are at it
while evictedsize < message.size:
if suber.cache[current].kind == smMessage:
for topic in suber.cache[current].topics:
suber.topicexpirations[topic] = suber.cache[current].timestamp
evictedsize += suber.cache[current].size
`=destroy`(suber.cache[current])
suber.cache[current] = SuberMessage[TData](kind: smNil)
Expand All @@ -422,6 +443,8 @@ template handlePush() =
if (unlikely) suber.head == suber.CacheMaxLength: suber.head = 0
if suber.cache[suber.head].kind == smMessage:
suber.cachesize -= suber.cache[suber.head].size
for topic in suber.cache[suber.head].topics:
suber.topicexpirations[topic] = suber.cache[suber.head].timestamp
suber.cache[suber.head] = move message
else: suber.cache.add(message)

Expand All @@ -435,40 +458,54 @@ template handlePush() =

# pull ----------------------------------------------------

proc pull*[TData; SuberMaxTopics](suber: Suber[TData, SuberMaxTopics], subscriber: Subscriber | int, topics: sink IntSet, aftertimestamp: sink MonoTime) =
{.push hints:off.}
proc pull*[TData; SuberMaxTopics](suber: Suber[TData, SuberMaxTopics], subscriber: Subscriber | int, topics: sink IntSet, aftertimestamp: sink MonoTime): bool {.discardable.} =
## Requests messages after given timestamp and belonging to certain topics.
## | `suber`: service
## | `subscriber`: will be passed to callback
## | `topics`: set of topics that are of interest
## | `aftertimestamp`: only messages published after this timestamp will be pulled
if (unlikely) suber.state != Running: return
if(unlikely) suber.head == -1 or topics.len == 0: return
##
## Returns false, if there was nothing to pull.
assert(suber.pullCallback != nil, "call setPullCallback before pull")
if (unlikely) suber.state != Running: return false
if(unlikely) suber.head == -1 or topics.len == 0: return false
for topic in topics:
suber.subscribers.withValue(Topic(topic)):
if not value[][].contains(int(subscriber)): return
if not value[][].contains(int(subscriber)): return false
do:
return
suber.channel.send(SuberMessage[TData](kind: smPull, subscriber: Subscriber(subscriber),
pulltopics: move topics, aftertimestamp: move aftertimestamp))

proc pull*[TData; SuberMaxTopics](suber: Suber[TData, SuberMaxTopics], subscriber: Subscriber | int, topic: Topic | int, aftertimestamp: sink MonoTime) =
return false
when subscriber is int:
suber.channel.send(SuberMessage[TData](kind: smPull, subscriber: Subscriber(subscriber),
pulltopics: move topics, aftertimestamp: move aftertimestamp))
else:
suber.channel.send(SuberMessage[TData](kind: smPull, subscriber: subscriber,
pulltopics: move topics, aftertimestamp: move aftertimestamp))
return true
{.pop.}

proc pull*[TData; SuberMaxTopics](suber: Suber[TData, SuberMaxTopics], subscriber: Subscriber | int, topic: Topic | int, aftertimestamp: sink MonoTime): bool {.discardable.} =
var topicset = initIntSet()
topicset.incl(int(topic))
pull(suber, subscriber, topicset, aftertimestamp)
return pull(suber, subscriber, topicset, aftertimestamp)

proc pullAll*[TData; SuberMaxTopics](suber: Suber[TData, SuberMaxTopics], subscriber: Subscriber | int, topic: Topic | int, aftertimestamp: sink MonoTime) =
proc pullAll*[TData; SuberMaxTopics](suber: Suber[TData, SuberMaxTopics], subscriber: Subscriber | int, aftertimestamp: sink MonoTime): bool {.discardable.} =
var topicset = initIntSet()
for (topic , index) in suber.subscribers.keys():
suber.subscribers.withFound(topic, index):
if value[][].contains(int(subscriber)): topicset.incl(int(topic))
pull(suber, subscriber, topicset, aftertimestamp)
return pull(suber, subscriber, topicset, aftertimestamp)

template handlePull() =
var expiredtopics: seq[Topic]
var messages: seq[ptr SuberMessage[TData]]

var remainingtopics = initIntSet()
for topic in message.pulltopics.items(): remainingtopics.incl(topic)
for topic in message.pulltopics.items():
let expiration = suber.topicexpirations.getOrDefault(topic)
if expiration > message.aftertimestamp: expiredtopics.add(topic)
else: remainingtopics.incl(topic)

var current = suber.head + 1
var wrapped = false
while true:
Expand Down
2 changes: 1 addition & 1 deletion suber.nimble
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Package

version = "2.0.0"
version = "2.0.1"
author = "Olli"
description = "Pub/Sub engine"
license = "MIT"
Expand Down
2 changes: 1 addition & 1 deletion tests/test_delivery.nim
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ echo "b Max channel queue length: ", b.getChannelQueueLengths[2]
sleep(1000)

#--------------------------
# gc:orc may SIGSEGV here, it is not a Suber bug
# gc:orc SIGSEGVs here, it is not a Suber bug (use gc:arc or don't clear your IntSets...)
pushedmessages.clear()
aDeliveredmessages.clear()
bDeliveredmessages.clear()
Expand Down
2 changes: 1 addition & 1 deletion tests/test_find.nim
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ bus.addTopic(1.Topic)
for x in 0 .. ids.high: bus.push(1.Topic, rand(int.high))
while bus.getChannelQueueLengths[0] > 0: sleep(10)
for x in 0 .. ids.high: bus.find(ids[rand(x)], onFind)
joinThread bus.stop()
bus.stop()

0 comments on commit 52da55c

Please sign in to comment.