Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 48 additions & 35 deletions les/serverpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func newServerPool(db ethdb.KeyValueStore, dbKey []byte, vt *lpc.ValueTracker, d
if oldState.Equals(sfWaitDialTimeout) && newState.IsEmpty() {
// dial timeout, no connection
s.setRedialWait(n, dialCost, dialWaitStep)
s.ns.SetState(n, nodestate.Flags{}, sfDialing, 0)
s.ns.SetStateSub(n, nodestate.Flags{}, sfDialing, 0)
}
})

Expand All @@ -193,10 +193,10 @@ func (s *serverPool) addPreNegFilter(input enode.Iterator, query queryFunc) enod
if rand.Intn(maxQueryFails*2) < int(fails) {
// skip pre-negotiation with increasing chance, max 50%
// this ensures that the client can operate even if UDP is not working at all
s.ns.SetState(n, sfCanDial, nodestate.Flags{}, time.Second*10)
s.ns.SetStateSub(n, sfCanDial, nodestate.Flags{}, time.Second*10)
// set canDial before resetting queried so that FillSet will not read more
// candidates unnecessarily
s.ns.SetState(n, nodestate.Flags{}, sfQueried, 0)
s.ns.SetStateSub(n, nodestate.Flags{}, sfQueried, 0)
return
}
go func() {
Expand All @@ -206,12 +206,15 @@ func (s *serverPool) addPreNegFilter(input enode.Iterator, query queryFunc) enod
} else {
atomic.StoreUint32(&s.queryFails, 0)
}
if q == 1 {
s.ns.SetState(n, sfCanDial, nodestate.Flags{}, time.Second*10)
} else {
s.setRedialWait(n, queryCost, queryWaitStep)
}
s.ns.SetState(n, nodestate.Flags{}, sfQueried, 0)
s.ns.Operation(func() {
// we are no longer running in the operation that the callback belongs to, start a new one because of setRedialWait
if q == 1 {
s.ns.SetStateSub(n, sfCanDial, nodestate.Flags{}, time.Second*10)
} else {
s.setRedialWait(n, queryCost, queryWaitStep)
}
s.ns.SetStateSub(n, nodestate.Flags{}, sfQueried, 0)
})
}()
}
})
Expand Down Expand Up @@ -240,18 +243,20 @@ func (s *serverPool) start() {
}
}
unixTime := s.unixTime()
s.ns.ForEach(sfHasValue, nodestate.Flags{}, func(node *enode.Node, state nodestate.Flags) {
s.calculateWeight(node)
if n, ok := s.ns.GetField(node, sfiNodeHistory).(nodeHistory); ok && n.redialWaitEnd > unixTime {
wait := n.redialWaitEnd - unixTime
lastWait := n.redialWaitEnd - n.redialWaitStart
if wait > lastWait {
// if the time until expiration is larger than the last suggested
// waiting time then the system clock was probably adjusted
wait = lastWait
s.ns.Operation(func() {
s.ns.ForEach(sfHasValue, nodestate.Flags{}, func(node *enode.Node, state nodestate.Flags) {
s.calculateWeight(node)
if n, ok := s.ns.GetField(node, sfiNodeHistory).(nodeHistory); ok && n.redialWaitEnd > unixTime {
wait := n.redialWaitEnd - unixTime
lastWait := n.redialWaitEnd - n.redialWaitStart
if wait > lastWait {
// if the time until expiration is larger than the last suggested
// waiting time then the system clock was probably adjusted
wait = lastWait
}
s.ns.SetStateSub(node, sfRedialWait, nodestate.Flags{}, time.Duration(wait)*time.Second)
}
s.ns.SetState(node, sfRedialWait, nodestate.Flags{}, time.Duration(wait)*time.Second)
}
})
})
}

Expand All @@ -261,9 +266,11 @@ func (s *serverPool) stop() {
if s.fillSet != nil {
s.fillSet.Close()
}
s.ns.ForEach(sfConnected, nodestate.Flags{}, func(n *enode.Node, state nodestate.Flags) {
// recalculate weight of connected nodes in order to update hasValue flag if necessary
s.calculateWeight(n)
s.ns.Operation(func() {
s.ns.ForEach(sfConnected, nodestate.Flags{}, func(n *enode.Node, state nodestate.Flags) {
// recalculate weight of connected nodes in order to update hasValue flag if necessary
s.calculateWeight(n)
})
})
s.ns.Stop()
}
Expand All @@ -279,9 +286,11 @@ func (s *serverPool) registerPeer(p *serverPeer) {

// unregisterPeer implements serverPeerSubscriber
func (s *serverPool) unregisterPeer(p *serverPeer) {
s.setRedialWait(p.Node(), dialCost, dialWaitStep)
s.ns.SetState(p.Node(), nodestate.Flags{}, sfConnected, 0)
s.ns.SetField(p.Node(), sfiConnectedStats, nil)
s.ns.Operation(func() {
s.setRedialWait(p.Node(), dialCost, dialWaitStep)
s.ns.SetStateSub(p.Node(), nodestate.Flags{}, sfConnected, 0)
s.ns.SetFieldSub(p.Node(), sfiConnectedStats, nil)
})
s.vt.Unregister(p.ID())
p.setValueTracker(nil, nil)
}
Expand Down Expand Up @@ -380,14 +389,16 @@ func (s *serverPool) serviceValue(node *enode.Node) (sessionValue, totalValue fl

// updateWeight calculates the node weight and updates the nodeWeight field and the
// hasValue flag. It also saves the node state if necessary.
// Note: this function should run inside a NodeStateMachine operation
func (s *serverPool) updateWeight(node *enode.Node, totalValue float64, totalDialCost uint64) {
weight := uint64(totalValue * nodeWeightMul / float64(totalDialCost))
if weight >= nodeWeightThreshold {
s.ns.SetState(node, sfHasValue, nodestate.Flags{}, 0)
s.ns.SetField(node, sfiNodeWeight, weight)
s.ns.SetStateSub(node, sfHasValue, nodestate.Flags{}, 0)
s.ns.SetFieldSub(node, sfiNodeWeight, weight)
} else {
s.ns.SetState(node, nodestate.Flags{}, sfHasValue, 0)
s.ns.SetField(node, sfiNodeWeight, nil)
s.ns.SetStateSub(node, nodestate.Flags{}, sfHasValue, 0)
s.ns.SetFieldSub(node, sfiNodeWeight, nil)
s.ns.SetFieldSub(node, sfiNodeHistory, nil)
}
s.ns.Persist(node) // saved if node history or hasValue changed
}
Expand All @@ -400,6 +411,7 @@ func (s *serverPool) updateWeight(node *enode.Node, totalValue float64, totalDia
// a significant amount of service value again its waiting time is quickly reduced or reset
// to the minimum.
// Note: node weight is also recalculated and updated by this function.
// Note 2: this function should run inside a NodeStateMachine operation
func (s *serverPool) setRedialWait(node *enode.Node, addDialCost int64, waitStep float64) {
n, _ := s.ns.GetField(node, sfiNodeHistory).(nodeHistory)
sessionValue, totalValue := s.serviceValue(node)
Expand Down Expand Up @@ -450,21 +462,22 @@ func (s *serverPool) setRedialWait(node *enode.Node, addDialCost int64, waitStep
if wait < waitThreshold {
n.redialWaitStart = unixTime
n.redialWaitEnd = unixTime + int64(nextTimeout)
s.ns.SetField(node, sfiNodeHistory, n)
s.ns.SetState(node, sfRedialWait, nodestate.Flags{}, wait)
s.ns.SetFieldSub(node, sfiNodeHistory, n)
s.ns.SetStateSub(node, sfRedialWait, nodestate.Flags{}, wait)
s.updateWeight(node, totalValue, totalDialCost)
} else {
// discard known node statistics if waiting time is very long because the node
// hasn't been responsive for a very long time
s.ns.SetField(node, sfiNodeHistory, nil)
s.ns.SetField(node, sfiNodeWeight, nil)
s.ns.SetState(node, nodestate.Flags{}, sfHasValue, 0)
s.ns.SetFieldSub(node, sfiNodeHistory, nil)
s.ns.SetFieldSub(node, sfiNodeWeight, nil)
s.ns.SetStateSub(node, nodestate.Flags{}, sfHasValue, 0)
}
}

// calculateWeight calculates and sets the node weight without altering the node history.
// This function should be called during startup and shutdown only, otherwise setRedialWait
// will keep the weights updated as the underlying statistics are adjusted.
// Note: this function should run inside a NodeStateMachine operation
func (s *serverPool) calculateWeight(node *enode.Node) {
n, _ := s.ns.GetField(node, sfiNodeHistory).(nodeHistory)
_, totalValue := s.serviceValue(node)
Expand Down
Loading