-
Notifications
You must be signed in to change notification settings - Fork 110
network: Kademlia Load Balancing #1774
Changes from 23 commits
0f31b5a
c1c9ac5
fc65d5a
7dc9568
ca11c52
97ab47e
f0fc99d
0941717
e9263d7
ed1f9a9
904e204
29303f1
e53ae25
f82db7f
0e346e6
5665ea9
4c81a24
5b44ab3
52dacb0
3301920
49e7d09
ad5eac5
e78e2b3
21a0d2f
c615cdd
340ce15
96a8245
697b049
5cb6d46
abc51ab
9f951d1
d8f8059
c34062c
e7eefaf
3019822
2879509
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,126 @@ | ||
| package gopubsub | ||
|
kortatu marked this conversation as resolved.
Outdated
kortatu marked this conversation as resolved.
Outdated
|
||
|
|
||
| import ( | ||
| "fmt" | ||
| "strconv" | ||
| "sync" | ||
| ) | ||
|
|
||
| //PubSubChannel represents a pubsub system where subscriber can .Subscribe() and publishers can .Publish() or .Close(). | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @kortatu Please have a look at @janos implementation of a similar mechanism with
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great, I will take a look at it.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have read
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I'm not sure this little amount of code justifies being generalized into a component in its own
I'm not really following...
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you stop a channel from the reader side and the writer tries to write, you get an error. A channel should be always closed by the writer.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Which tests are testing this pubsub implementation? I see no tests in gopubsub package. And I think that we have tests for
I am sorry if you feel offended. But, PR reviews because of any code change I think that it is ok to question any code or comment in PR review. Even, my questions are mainly about problems that you mentioned that you have found in other places. I do not think that anybody has any objections on abstracting common functionality, I think that the abstraction implementation is in question here.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I forgot to commit the test file.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see what you are referring to for SubscribeToNeighbourhoodDepthChange, but since the channel is removed in the same function and there is syncOnce protecting the close, there should be no possibility for both send to the closed channel and close of the closed channel. This can be done with additional channel, not to close the returned channel from the client side but with more complexity. I found that this approach solved both problems in a simpler way since there already is Kademlia.lock. In localstore SubscribePush a different approach is made and the returned channel is not closed on client side, but that subscription has different functionality then SubscribeToNeighbourhoodDepthChange.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So it is working because the user of the API (the code subscribing) is doing things "kindly". I think an interface should not rely on that. As a generic publish/subscriber channel, I think that my implementation is safer.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have nothing to add. Thanks. |
||
| type PubSubChannel struct { | ||
| subscriptions []*Subscription | ||
| subsMutex sync.RWMutex | ||
| nextId int | ||
| } | ||
|
|
||
| //Subscription is created in PubSubChannel using pubSub.Subscribe(). Subscribers can receive using .ReceiveChannel(). | ||
|
kortatu marked this conversation as resolved.
Outdated
|
||
| // or .Unsubscribe() | ||
| type Subscription struct { | ||
| closed bool | ||
| removeSub func() | ||
|
janos marked this conversation as resolved.
Outdated
|
||
| signal chan interface{} | ||
| closeOnce sync.Once | ||
| id string | ||
| } | ||
|
|
||
| //New creates a new PubSubChannel. | ||
| func New() *PubSubChannel { | ||
| return &PubSubChannel{ | ||
| subscriptions: make([]*Subscription, 0), | ||
| } | ||
| } | ||
|
|
||
| //Subscribe creates a subscription to a channel, each subscriber should keep its own Subscription instance. | ||
| func (psc *PubSubChannel) Subscribe() *Subscription { | ||
| psc.subsMutex.Lock() | ||
| defer psc.subsMutex.Unlock() | ||
| newSubscription := newSubscription(strconv.Itoa(psc.nextId)) | ||
| psc.nextId++ | ||
| psc.subscriptions = append(psc.subscriptions, &newSubscription) | ||
| newSubscription.removeSub = func() { | ||
| psc.subsMutex.Lock() | ||
| defer psc.subsMutex.Unlock() | ||
|
|
||
| for i, subscription := range psc.subscriptions { | ||
| if subscription.signal == newSubscription.signal { | ||
| fmt.Println("Unsubscribing", "id", subscription.id) | ||
|
kortatu marked this conversation as resolved.
Outdated
|
||
| subscription.closed = true | ||
|
nolash marked this conversation as resolved.
Outdated
|
||
| psc.subscriptions = append(psc.subscriptions[:i], psc.subscriptions[i+1:]...) | ||
|
nolash marked this conversation as resolved.
Outdated
|
||
| } | ||
| } | ||
| } | ||
| return &newSubscription | ||
| } | ||
|
|
||
| //Publish broadcasts a message asynchronously to each subscriber. | ||
| //If some of the subscriptions(channels) has been marked as closeable, it does it now. | ||
| func (psc *PubSubChannel) Publish(msg interface{}) { | ||
| psc.subsMutex.RLock() | ||
| defer psc.subsMutex.RUnlock() | ||
| for i, sub := range psc.subscriptions { | ||
| if sub.closed { | ||
| fmt.Println("Subscription was closed", "id", sub.id) | ||
|
kortatu marked this conversation as resolved.
Outdated
|
||
| sub.closeChannel() | ||
|
kortatu marked this conversation as resolved.
Outdated
|
||
| } else { | ||
| go func(sub *Subscription, index int) { | ||
| sub.signal <- msg | ||
|
kortatu marked this conversation as resolved.
Outdated
|
||
| }(sub, i) | ||
|
|
||
| } | ||
| } | ||
| } | ||
|
|
||
| //NumSubscriptions returns how many subscriptions are currently active. | ||
| func (psc *PubSubChannel) NumSubscriptions() int { | ||
| psc.subsMutex.RLock() | ||
| defer psc.subsMutex.RUnlock() | ||
| return len(psc.subscriptions) | ||
| } | ||
|
|
||
| //Close cancels all subscriptions closing the channels associated with them. | ||
| //Usually the publisher is in charge of calling Close(). | ||
| func (psc *PubSubChannel) Close() { | ||
| psc.subsMutex.Lock() | ||
| defer psc.subsMutex.Unlock() | ||
| for _, sub := range psc.subscriptions { | ||
| sub.closed = true | ||
| sub.closeChannel() | ||
| } | ||
| } | ||
|
|
||
| //Unsubscribe cancels subscription from the subscriber side. Channel is marked as closed but only writer should close it. | ||
| func (sub *Subscription) Unsubscribe() { | ||
| sub.closed = true | ||
| sub.removeSub() | ||
| } | ||
|
|
||
| //ReceiveChannel returns the channel where the subscriber will receive messages. | ||
| func (sub *Subscription) ReceiveChannel() <-chan interface{} { | ||
| return sub.signal | ||
| } | ||
|
|
||
| //IsClosed returns if the subscription is closed via Unsubscribe() or Close() in the pubSub that creates it. | ||
| func (sub *Subscription) IsClosed() bool { | ||
|
kortatu marked this conversation as resolved.
Outdated
|
||
| return sub.closed | ||
| } | ||
|
|
||
| //ID returns a unique id in the PubSubChannel of this subscription. Useful for debugging. | ||
| func (sub *Subscription) ID() string { | ||
|
kortatu marked this conversation as resolved.
Outdated
|
||
| return sub.id | ||
| } | ||
|
|
||
| func (sub *Subscription) closeChannel() { | ||
| sub.closeOnce.Do(func() { | ||
| close(sub.signal) | ||
| }) | ||
| } | ||
|
|
||
| func newSubscription(id string) Subscription { | ||
| return Subscription{ | ||
| closed: false, | ||
| removeSub: nil, | ||
|
janos marked this conversation as resolved.
Outdated
|
||
| signal: make(chan interface{}), | ||
| closeOnce: sync.Once{}, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No need to initialise value, it is implicit.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, but id doesn't hurt to make it explicit, it is clear for the reader. Swarm code is full of false boolean initialisations.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I disagree that it is better, it is obvious from the struct declaration. |
||
| id: id, | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.