-
Notifications
You must be signed in to change notification settings - Fork 146
fix(dot/peerset): remove race conditions from peerset package
#2267
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 11 commits
6a7d933
b0eb77f
98cb4bc
a6e97fd
d8f6700
a93fafe
858a72b
55dca9c
f0c0e10
1f39a56
5366804
3695853
36ab22e
fc0cf71
147575c
8e693b5
a340adc
a1d5391
39e1974
3773559
71c5ea4
78a5468
50ef6cb
4f285ee
237cfe9
d02fafc
a4adf71
1b49a64
9d4c81a
af4380e
c184ccf
5fe401c
ca7f676
60791f1
4856eb2
ad060fe
2c870d2
249eecf
9dcb348
c726949
9946ead
aab7e3a
7d2f7c7
266d7a2
8a52a8a
0dbad30
2207aad
04221fa
0dffe8a
77784e2
d563386
df15a5d
a3b6ea8
3480177
233d2eb
9315906
cb763ab
53987c1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,17 +5,17 @@ package peerset | |
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "strings" | ||
|
|
||
| "github.com/libp2p/go-libp2p-core/peer" | ||
| ) | ||
|
|
||
| const logStringPattern = "call=%s, set-id=%d, reputation change %v, peers=[%s]" | ||
|
|
||
| // Handler manages peerSet. | ||
| type Handler struct { | ||
| actionQueue chan<- action | ||
| peerSet *PeerSet | ||
| closeCh chan struct{} | ||
|
|
||
| cancelCtx context.CancelFunc | ||
| peerSet *PeerSet | ||
| } | ||
|
|
||
| // NewPeerSetHandler creates a new *peerset.Handler. | ||
|
|
@@ -30,80 +30,82 @@ func NewPeerSetHandler(cfg *ConfigSet) (*Handler, error) { | |
| }, nil | ||
| } | ||
|
|
||
| // SetReservedOnlyPeer not yet implemented | ||
| func (h *Handler) SetReservedOnlyPeer(setID int, peers ...peer.ID) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I feel like these functions should return an error now, and the caller should log it. |
||
| // TODO: not yet implemented (#1888) | ||
| logger.Errorf("failed to do action %s on peerSet: not implemented yet", setReservedOnly) | ||
| } | ||
|
|
||
| // AddReservedPeer adds reserved peer into peerSet. | ||
| func (h *Handler) AddReservedPeer(setID int, peers ...peer.ID) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we not want any of these functions returning errors?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes I'm curious here as well 🤔 Same wondering question for the below methods logging errors.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All these methods were intended to be non-blocking, so the caller executes a handler method that will dispatch a message through a channel, and if an error occurs during the message it just logs. Now I have removed the channel and as all those methods was not expected to return any error I keep the log
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it should return an error and the caller should log the error. |
||
| h.actionQueue <- action{ | ||
| actionCall: addReservedPeer, | ||
| setID: setID, | ||
| peers: peers, | ||
| err := h.peerSet.addReservedPeers(setID, peers...) | ||
| if err != nil { | ||
| msg := fmt.Sprintf(logStringPattern, addReservedPeer, setID, nil, stringfyPeers(peers)) | ||
|
EclesioMeloJunior marked this conversation as resolved.
Outdated
|
||
| logger.Errorf("failed to do action %s on peerSet: %s", msg, err) | ||
| } | ||
| } | ||
|
|
||
| // RemoveReservedPeer remove reserved peer from peerSet. | ||
|
EclesioMeloJunior marked this conversation as resolved.
|
||
| func (h *Handler) RemoveReservedPeer(setID int, peers ...peer.ID) { | ||
| h.actionQueue <- action{ | ||
| actionCall: removeReservedPeer, | ||
| setID: setID, | ||
| peers: peers, | ||
| err := h.peerSet.removeReservedPeers(setID, peers...) | ||
| if err != nil { | ||
| msg := fmt.Sprintf(logStringPattern, removeReservedPeer, setID, nil, stringfyPeers(peers)) | ||
| logger.Errorf("failed to do action %s on peerSet: %s", msg, err) | ||
|
qdm12 marked this conversation as resolved.
Outdated
|
||
| } | ||
| } | ||
|
|
||
| // SetReservedPeer set the reserve peer into peerSet | ||
|
EclesioMeloJunior marked this conversation as resolved.
|
||
| func (h *Handler) SetReservedPeer(setID int, peers ...peer.ID) { | ||
| h.actionQueue <- action{ | ||
| actionCall: setReservedPeers, | ||
| setID: setID, | ||
| peers: peers, | ||
| // TODO: this is not used yet, might required to implement RPC Call for this. | ||
|
EclesioMeloJunior marked this conversation as resolved.
Outdated
|
||
| err := h.peerSet.setReservedPeer(setID, peers...) | ||
| if err != nil { | ||
| msg := fmt.Sprintf(logStringPattern, setReservedPeers, setID, nil, stringfyPeers(peers)) | ||
| logger.Errorf("failed to do action %s on peerSet: %s", msg, err) | ||
| } | ||
| } | ||
|
|
||
| // AddPeer adds peer to peerSet. | ||
| func (h *Handler) AddPeer(setID int, peers ...peer.ID) { | ||
| h.actionQueue <- action{ | ||
| actionCall: addToPeerSet, | ||
| setID: setID, | ||
| peers: peers, | ||
| err := h.peerSet.addPeer(setID, peers) | ||
| if err != nil { | ||
| msg := fmt.Sprintf(logStringPattern, addToPeerSet, setID, nil, stringfyPeers(peers)) | ||
| logger.Errorf("failed to do action %s on peerSet: %s", msg, err) | ||
|
qdm12 marked this conversation as resolved.
Outdated
|
||
| } | ||
| } | ||
|
|
||
| // RemovePeer removes peer from peerSet. | ||
| func (h *Handler) RemovePeer(setID int, peers ...peer.ID) { | ||
| h.actionQueue <- action{ | ||
| actionCall: removeFromPeerSet, | ||
| setID: setID, | ||
| peers: peers, | ||
| err := h.peerSet.removePeer(setID, peers...) | ||
| if err != nil { | ||
| msg := fmt.Sprintf(logStringPattern, removeFromPeerSet, setID, nil, stringfyPeers(peers)) | ||
| logger.Errorf("failed to do action %s on peerSet: %s", msg, err) | ||
| } | ||
| } | ||
|
|
||
| // ReportPeer reports ReputationChange according to the peer behaviour. | ||
| func (h *Handler) ReportPeer(rep ReputationChange, peers ...peer.ID) { | ||
| h.actionQueue <- action{ | ||
| actionCall: reportPeer, | ||
| reputation: rep, | ||
| peers: peers, | ||
| err := h.peerSet.reportPeer(rep, peers...) | ||
| if err != nil { | ||
| msg := fmt.Sprintf(logStringPattern, reportPeer, 0, rep, stringfyPeers(peers)) | ||
| logger.Errorf("failed to do action %s on peerSet: %s", msg, err) | ||
| } | ||
| } | ||
|
|
||
| // Incoming calls when we have an incoming connection from peer. | ||
| func (h *Handler) Incoming(setID int, peers ...peer.ID) { | ||
| h.actionQueue <- action{ | ||
| actionCall: incoming, | ||
| peers: peers, | ||
| setID: setID, | ||
| err := h.peerSet.incoming(setID, peers...) | ||
| if err != nil { | ||
| msg := fmt.Sprintf(logStringPattern, incoming, setID, nil, stringfyPeers(peers)) | ||
| logger.Errorf("failed to do action %s on peerSet: %s", msg, err) | ||
| } | ||
| } | ||
|
|
||
| // Messages return result message chan. | ||
| func (h *Handler) Messages() chan Message { | ||
| return h.peerSet.resultMsgCh | ||
| } | ||
|
|
||
| // DisconnectPeer calls for disconnecting a connection from peer. | ||
| func (h *Handler) DisconnectPeer(setID int, peers ...peer.ID) { | ||
| h.actionQueue <- action{ | ||
| actionCall: disconnect, | ||
| setID: setID, | ||
| peers: peers, | ||
| err := h.peerSet.disconnect(setID, UnknownDrop, peers...) | ||
| if err != nil { | ||
| msg := fmt.Sprintf(logStringPattern, disconnect, setID, nil, stringfyPeers(peers)) | ||
| logger.Errorf("failed to do action %s on peerSet: %s", msg, err) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -113,40 +115,24 @@ func (h *Handler) PeerReputation(peerID peer.ID) (Reputation, error) { | |
| if err != nil { | ||
| return 0, err | ||
| } | ||
| return n.getReputation(), nil | ||
| return n.rep, nil | ||
| } | ||
|
|
||
| // Start starts peerSet processing | ||
| func (h *Handler) Start(ctx context.Context) { | ||
| ctx, cancel := context.WithCancel(ctx) | ||
| h.cancelCtx = cancel | ||
|
|
||
| actionCh := make(chan action, msgChanSize) | ||
| h.closeCh = make(chan struct{}) | ||
| h.actionQueue = actionCh | ||
|
|
||
| h.peerSet.start(ctx, actionCh) | ||
| func (h *Handler) Start(ctx context.Context, processMessageFn func(Message)) { | ||
| h.peerSet.start(ctx, processMessageFn) | ||
| } | ||
|
|
||
| // SortedPeers return chan for sorted connected peer in the peerSet. | ||
|
EclesioMeloJunior marked this conversation as resolved.
|
||
| func (h *Handler) SortedPeers(setIdx int) chan peer.IDSlice { | ||
| resultPeersCh := make(chan peer.IDSlice) | ||
| h.actionQueue <- action{ | ||
| actionCall: sortedPeers, | ||
| resultPeersCh: resultPeersCh, | ||
| setID: setIdx, | ||
| } | ||
|
|
||
| return resultPeersCh | ||
| func (h *Handler) SortedPeers(setIdx int) peer.IDSlice { | ||
| return h.peerSet.peerState.sortedPeers(setIdx) | ||
| } | ||
|
|
||
| // Stop closes the actionQueue and result message chan. | ||
| func (h *Handler) Stop() { | ||
| select { | ||
| case <-h.closeCh: | ||
| default: | ||
| h.cancelCtx() | ||
| close(h.closeCh) | ||
| close(h.actionQueue) | ||
| func stringfyPeers(peers peer.IDSlice) string { | ||
|
EclesioMeloJunior marked this conversation as resolved.
Outdated
|
||
| peersStrings := make([]string, len(peers)) | ||
| for i := range peers { | ||
| peersStrings[i] = peers[i].String() | ||
| } | ||
|
|
||
| return strings.Join(peersStrings, ", ") | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.