Skip to content
This repository has been archived by the owner on Dec 30, 2022. It is now read-only.

Commit

Permalink
2.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
niinivaa committed Aug 9, 2021
1 parent a2cb9db commit 459010f
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 804 deletions.
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,13 @@ bus.push(topic "Dogs", "dog-related message")
bus.push(toIntSet([topic "Cats", topic "Dogs"]), "multitopical message")
bus.doDelivery()
jointhreads bus.stop()
bus.stop()
```


## Release notes for 2.0.0 (2021-08-09)
- Alternative *newSuber* procs removed, use the one remaining
- Push, Pull and Deliver callbacks must be registered via corresponding *setXXXCallback* procs
- new *doSynced* proc for running code in sync with message sending
- *stop* does not return a thread, joins the thread automatically instead
- other bug fixes and minor improvements
700 changes: 0 additions & 700 deletions docs/index.html

This file was deleted.

199 changes: 111 additions & 88 deletions src/suber.nim

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion suber.nimble
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Package

version = "1.0.0"
version = "2.0.0"
author = "Olli"
description = "Pub/Sub engine"
license = "MIT"
Expand Down
19 changes: 12 additions & 7 deletions tests/test_delivery.nim
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,16 @@ proc onBDeliver(messages: openArray[ptr SuberMessage[int]]) =
{.gcsafe.}:
for m in messages: bDeliveredmessages.incl(m.data)

let a = newSuber[int, TopicCount](onADeliver)
let b = newSuber[int, TopicCount](onBDeliver, 10000, 20000, 10000, 50)
let a = newSuber[int, TopicCount]()
let b = newSuber[int, TopicCount](10000, 20000, 10000, 50)

var lock: Lock
lock.initLock

a.setDeliverCallback(onADeliver)
b.setDeliverCallback(onBDeliver)
a.subscribe(1.Subscriber, 1.Topic, true)
b.subscribe(1.Subscriber, 1.Topic, true)

proc run(t: int) =
{.gcsafe.}:
Expand All @@ -36,16 +42,15 @@ proc run(t: int) =
b.push(1.Topic, data, 1)
withLock(lock): pushedmessages.incl(data)


var threads: array[ThreadCount, Thread[int]]
a.subscribe(1.Subscriber, 1.Topic, true)
b.subscribe(1.Subscriber, 1.Topic, true)
lock.initLock
echo "Multi-threaded delivery testing with ", ThreadCount * DeliveryCount, " messages"
for i in 0 ..< ThreadCount: createThread(threads[i], run, i)
joinThreads(threads)
a.doDelivery()
b.doDelivery()
joinThreads([a.stop(), b.stop()])
a.stop()
b.stop()
doAssert(pushedmessages.len == ThreadCount * DeliveryCount)
doAssert(pushedmessages.len == aDeliveredmessages.len)
doAssert(aDeliveredmessages.len == bDeliveredmessages.len)
Expand All @@ -55,7 +60,7 @@ echo "b Max channel queue length: ", b.getChannelQueueLengths[2]
sleep(1000)

#--------------------------
# gc:orc may SIGSEGV here, it is not a Suber bug!
# gc:orc may SIGSEGV here, it is not a Suber bug
pushedmessages.clear()
aDeliveredmessages.clear()
bDeliveredmessages.clear()
Expand Down
15 changes: 10 additions & 5 deletions tests/test_generic.nim
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,17 @@ proc onADeliver(messages: openArray[ptr Message]) = aDeliveredmessages += messag

proc onBDeliver(messages: openArray[ptr Message]) {.gcsafe, raises:[].} = bDeliveredmessages += messages.len

let a: Suber[MessageData, MaxTopics] = newSuber[MessageData, MaxTopics](onAPush, onADeliver)
let a: Suber[MessageData, MaxTopics] = newSuber[MessageData, MaxTopics]()

let b: Suber[MessageData, MaxTopics] = newSuber[MessageData, MaxTopics](onBPush, onBDeliver, 1000, 20, 5, 10)
let b: Suber[MessageData, MaxTopics] = newSuber[MessageData, MaxTopics](1000, 20, 5, 10)

var rounds = 0

a.setPushCallback(onAPush)
b.setPushCallback(onBpush)
a.setDeliverCallback(onADeliver)
b.setDeliverCallback(onBDeliver)

proc addTopic() =
if a.getTopiccount() == MaxTopics: return
addtopics.inc
Expand All @@ -68,7 +73,6 @@ proc removeTopic() =
if a.getSubscriptions(subscribers[s]).len == 0: nomores.add(subscribers[s])
for n in nomores: subscribers.del(subscribers.find(n))

from os import sleep
proc addSubscriber() =
addsubscribers.inc
if topics.len == 0: addTopic()
Expand Down Expand Up @@ -121,8 +125,9 @@ proc run() =
b.doDelivery()

run()
joinThread a.stop()
joinThread b.stop()
a.removeTopic(255)
a.stop()
b.stop()
doAssert(publishedmessages > 1000)
doAssert(publishedmessages == aPushedmessages)
doAssert(aPushedmessages == bPushedmessages)
Expand Down
5 changes: 3 additions & 2 deletions tests/test_speed.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ var messagecount: int

proc onDeliver(messages: openArray[ptr SuberMessage[int]]) = {.gcsafe.}: discard messagecount.atomicInc(messages.len)

let bus = newSuber[int, 1](onDeliver, 1000000, 100000, 100)
let bus = newSuber[int, 1](1000000, 100000, 100)
bus.setDeliverCallback(onDeliver)
bus.subscribe(1.Subscriber, 1.Topic, true)

var stop: bool
Expand All @@ -26,5 +27,5 @@ for i in 0 ..< ThreadCount: createThread(threads[i], run)
sleep TestDuration * 1000
stop = true
joinThreads(threads)
joinThread bus.stop()
bus.stop()
echo messagecount div TestDuration, " msg/s"

0 comments on commit 459010f

Please sign in to comment.