Skip to content

Commit

Permalink
Merge pull request onflow#5084 from onflow/khalil/4979-scoring-regist…
Browse files Browse the repository at this point in the history
…ry-startup-silence-period

[Networking] Implements a silence period for GossipSub peer scoring.
  • Loading branch information
kc1116 authored Jan 18, 2024
2 parents f040031 + bb1d6a3 commit aeb29e8
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 28 deletions.
7 changes: 7 additions & 0 deletions config/default-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,13 @@ network-config:
# peers for their contribution to the network and prioritize them in neighbor selection.
staked-identity-reward: 100
scoring-registry:
# Defines the duration of time, after the node startup,
# during which the scoring registry remains inactive before penalizing nodes.
# Throughout this startup silence period, the application-specific penalty
# returned for all nodes will be 0, and any invalid control message notifications
# will be ignored. This configuration allows nodes to stabilize and initialize before
# applying penalties or processing invalid control message notifications.
startup-silence-duration: 1h
app-specific-score:
# number of workers that asynchronously update the app specific score requests when they are expired.
score-update-worker-num: 5
Expand Down
4 changes: 4 additions & 0 deletions network/netconf/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func AllFlagNames() []string {
BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.PeerScoringKey, p2pconfig.ProtocolKey, p2pconfig.AppSpecificKey, p2pconfig.MaxAppSpecificKey, p2pconfig.RewardKey),
BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.PeerScoringKey, p2pconfig.ProtocolKey, p2pconfig.AppSpecificKey, p2pconfig.StakedIdentityKey, p2pconfig.RewardKey),

BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.ScoringRegistryKey, p2pconfig.StartupSilenceDurationKey),
BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.ScoringRegistryKey, p2pconfig.AppSpecificScoreRegistryKey, p2pconfig.ScoreUpdateWorkerNumKey),
BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.ScoringRegistryKey, p2pconfig.AppSpecificScoreRegistryKey, p2pconfig.ScoreUpdateRequestQueueSizeKey),
BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.ScoringRegistryKey, p2pconfig.AppSpecificScoreRegistryKey, p2pconfig.ScoreTTLKey),
Expand Down Expand Up @@ -391,6 +392,9 @@ func InitializeNetworkFlags(flags *pflag.FlagSet, config *Config) {
config.GossipSub.ScoringParameters.PeerScoring.Protocol.AppSpecificScore.StakedIdentityReward,
"the reward for staking peers")

flags.Duration(BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.ScoringRegistryKey, p2pconfig.StartupSilenceDurationKey),
config.GossipSub.ScoringParameters.ScoringRegistryParameters.StartupSilenceDuration,
"the duration of time, after the node startup, during which the scoring registry remains inactive before penalizing nodes.")
flags.Int(BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.ScoringRegistryKey, p2pconfig.AppSpecificScoreRegistryKey, p2pconfig.ScoreUpdateWorkerNumKey),
config.GossipSub.ScoringParameters.ScoringRegistryParameters.AppSpecificScore.ScoreUpdateWorkerNum,
"number of workers for the app specific score update worker pool")
Expand Down
16 changes: 13 additions & 3 deletions network/p2p/config/score_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,22 @@ const (
SpamRecordCacheKey = "spam-record-cache"
ScoringRegistryKey = "scoring-registry"
AppSpecificScoreRegistryKey = "app-specific-score"
StartupSilenceDurationKey = "startup-silence-duration"
)

type ScoringRegistryParameters struct {
AppSpecificScore AppSpecificScoreParameters `validate:"required" mapstructure:"app-specific-score"`
SpamRecordCache SpamRecordCacheParameters `validate:"required" mapstructure:"spam-record-cache"`
MisbehaviourPenalties MisbehaviourPenalties `validate:"required" mapstructure:"misbehaviour-penalties"`
// StartupSilenceDuration defines the duration of time, after the node startup,
// during which the scoring registry remains inactive before penalizing nodes.
// Throughout this startup silence period, the application-specific penalty
// for all nodes will be set to 0, and any invalid control message notifications
// will be ignored.
//
// This configuration allows nodes to stabilize and initialize before
// applying penalties or responding processing invalid control message notifications.
StartupSilenceDuration time.Duration `validate:"gt=10m" mapstructure:"startup-silence-duration"`
AppSpecificScore AppSpecificScoreParameters `validate:"required" mapstructure:"app-specific-score"`
SpamRecordCache SpamRecordCacheParameters `validate:"required" mapstructure:"spam-record-cache"`
MisbehaviourPenalties MisbehaviourPenalties `validate:"required" mapstructure:"misbehaviour-penalties"`
}

const (
Expand Down
54 changes: 50 additions & 4 deletions network/p2p/scoring/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/go-playground/validator/v10"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/rs/zerolog"
"go.uber.org/atomic"

"github.com/onflow/flow-go/engine/common/worker"
"github.com/onflow/flow-go/model/flow"
Expand All @@ -25,6 +26,11 @@ import (
"github.com/onflow/flow-go/utils/logging"
)

const (
// NotificationSilencedMsg log messages for silenced notifications
NotificationSilencedMsg = "ignoring invalid control message notification for peer during silence period"
)

type SpamRecordInitFunc func() p2p.GossipSubSpamRecord

// GossipSubAppSpecificScoreRegistry is the registry for the application specific score of peers in the GossipSub protocol.
Expand Down Expand Up @@ -58,6 +64,13 @@ type GossipSubAppSpecificScoreRegistry struct {
// appScoreUpdateWorkerPool is the worker pool for handling the application specific score update of peers in a non-blocking way.
appScoreUpdateWorkerPool *worker.Pool[peer.ID]

// silencePeriodDuration duration that the startup silence period will last, during which nodes will not be penalized
silencePeriodDuration time.Duration
// silencePeriodStartTime time that the silence period begins, this is the time that the registry is started by the node.
silencePeriodStartTime time.Time
// silencePeriodElapsed atomic bool that stores a bool flag which indicates if the silence period is over or not.
silencePeriodElapsed *atomic.Bool

unknownIdentityPenalty float64
minAppSpecificPenalty float64
stakedIdentityReward float64
Expand Down Expand Up @@ -94,6 +107,10 @@ type GossipSubAppSpecificScoreRegistryConfig struct {

NetworkingType network.NetworkingType `validate:"required"`

// ScoringRegistryStartupSilenceDuration defines the duration of time, after the node startup,
// during which the scoring registry remains inactive before penalizing nodes.
ScoringRegistryStartupSilenceDuration time.Duration

AppSpecificScoreParams p2pconfig.ApplicationSpecificScoreParameters `validate:"required"`
}

Expand Down Expand Up @@ -125,6 +142,8 @@ func NewGossipSubAppSpecificScoreRegistry(config *GossipSubAppSpecificScoreRegis
validator: config.Validator,
idProvider: config.IdProvider,
scoreTTL: config.Parameters.ScoreTTL,
silencePeriodDuration: config.ScoringRegistryStartupSilenceDuration,
silencePeriodElapsed: atomic.NewBool(false),
unknownIdentityPenalty: config.AppSpecificScoreParams.UnknownIdentityPenalty,
minAppSpecificPenalty: config.AppSpecificScoreParams.MinAppSpecificPenalty,
stakedIdentityReward: config.AppSpecificScoreParams.StakedIdentityReward,
Expand All @@ -147,11 +166,16 @@ func NewGossipSubAppSpecificScoreRegistry(config *GossipSubAppSpecificScoreRegis
ready()
reg.logger.Info().Msg("subscription validator is ready")
}

<-ctx.Done()
reg.logger.Info().Msg("stopping subscription validator")
<-reg.validator.Done()
reg.logger.Info().Msg("subscription validator stopped")
}).AddWorker(func(parent irrecoverable.SignalerContext, ready component.ReadyFunc) {
if !reg.silencePeriodStartTime.IsZero() {
parent.Throw(fmt.Errorf("gossipsub scoring registry started more than once"))
}
reg.silencePeriodStartTime = time.Now()
ready()
})

for i := 0; i < config.Parameters.ScoreUpdateWorkerNum; i++ {
Expand All @@ -176,6 +200,13 @@ var _ p2p.GossipSubInvCtrlMsgNotifConsumer = (*GossipSubAppSpecificScoreRegistry
func (r *GossipSubAppSpecificScoreRegistry) AppSpecificScoreFunc() func(peer.ID) float64 {
return func(pid peer.ID) float64 {
lg := r.logger.With().Str("remote_peer_id", p2plogging.PeerId(pid)).Logger()

// during startup silence period avoid penalizing nodes
if !r.afterSilencePeriod() {
lg.Trace().Msg("returning 0 app specific score penalty for node during silence period")
return 0
}

appSpecificScore, lastUpdated, ok := r.appScoreCache.Get(pid)
switch {
case !ok:
Expand All @@ -184,7 +215,6 @@ func (r *GossipSubAppSpecificScoreRegistry) AppSpecificScoreFunc() func(peer.ID)
lg.Trace().
Bool("worker_submitted", submitted).
Msg("application specific score not found in cache, submitting worker to update it")

return 0 // in the mean time, return 0, which is a neutral score.
case time.Since(lastUpdated) > r.scoreTTL:
// record found in the cache, but expired; submit a worker to update it.
Expand All @@ -194,14 +224,12 @@ func (r *GossipSubAppSpecificScoreRegistry) AppSpecificScoreFunc() func(peer.ID)
Float64("app_specific_score", appSpecificScore).
Dur("score_ttl", r.scoreTTL).
Msg("application specific score expired, submitting worker to update it")

return appSpecificScore // in the mean time, return the expired score.
default:
// record found in the cache.
r.logger.Trace().
Float64("app_specific_score", appSpecificScore).
Msg("application specific score found in cache")

return appSpecificScore
}
}
Expand Down Expand Up @@ -346,6 +374,12 @@ func (r *GossipSubAppSpecificScoreRegistry) OnInvalidControlMessageNotification(
Str("peer_id", p2plogging.PeerId(notification.PeerID)).
Str("misbehavior_type", notification.MsgType.String()).Logger()

// during startup silence period avoid penalizing nodes, ignore all notifications
if !r.afterSilencePeriod() {
lg.Trace().Msg("ignoring invalid control message notification for peer during silence period")
return
}

record, err := r.spamScoreCache.Adjust(notification.PeerID, func(record p2p.GossipSubSpamRecord) p2p.GossipSubSpamRecord {
penalty := 0.0
switch notification.MsgType {
Expand Down Expand Up @@ -383,6 +417,18 @@ func (r *GossipSubAppSpecificScoreRegistry) OnInvalidControlMessageNotification(
Msg("applied misbehaviour penalty and updated application specific penalty")
}

// afterSilencePeriod returns true if registry silence period is over, false otherwise.
func (r *GossipSubAppSpecificScoreRegistry) afterSilencePeriod() bool {
if !r.silencePeriodElapsed.Load() {
if time.Since(r.silencePeriodStartTime) > r.silencePeriodDuration {
r.silencePeriodElapsed.Store(true)
return true
}
return false
}
return true
}

// DefaultDecayFunction is the default decay function that is used to decay the application specific penalty of a peer.
// It is used if no decay function is provided in the configuration.
// It decays the application specific penalty of a peer if it is negative.
Expand Down
Loading

0 comments on commit aeb29e8

Please sign in to comment.