From 49150ac50f799907ce8ff9316e4c5ec8fc877b8a Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Wed, 29 Nov 2023 13:29:29 -0500 Subject: [PATCH 01/16] add gossipsub-scoring-registry-startup-silence-period config --- config/default-config.yml | 7 +++++++ network/netconf/flags.go | 7 +++++++ network/p2p/p2pconf/gossipsub.go | 10 ++++++++++ 3 files changed, 24 insertions(+) diff --git a/config/default-config.yml b/config/default-config.yml index d4f244286fa..31dd2046a36 100644 --- a/config/default-config.yml +++ b/config/default-config.yml @@ -191,6 +191,13 @@ network-config: gossipsub-rpc-metrics-inspector-workers: 1 # The size of the queue used by worker pool for the control message metrics inspector gossipsub-rpc-metrics-inspector-cache-size: 100 + # Defines the duration of time, after the node startup, + # during which the scoring registry remains inactive before penalizing nodes. + # Throughout this startup silence period, the application-specific penalty + # returned for all nodes will be 0, and any invalid control message notifications + # will be ignored. This configuration allows nodes to stabilize and initialize before + # applying penalties or processing invalid control message notifications. + gossipsub-scoring-registry-startup-silence-period: 20m # Application layer spam prevention alsp-spam-record-cache-size: 1000 alsp-spam-report-queue-size: 10_000 diff --git a/network/netconf/flags.go b/network/netconf/flags.go index f40d755fbfc..11ff10f076a 100644 --- a/network/netconf/flags.go +++ b/network/netconf/flags.go @@ -93,6 +93,8 @@ const ( alspSyncEngineBatchRequestBaseProb = "alsp-sync-engine-batch-request-base-prob" alspSyncEngineRangeRequestBaseProb = "alsp-sync-engine-range-request-base-prob" alspSyncEngineSyncRequestProb = "alsp-sync-engine-sync-request-prob" + + scoringRegistryStartupSilencePeriod = "gossipsub-scoring-registry-startup-silence-period" ) func AllFlagNames() []string { @@ -150,6 +152,7 @@ func AllFlagNames() []string { iwantCacheMissCheckSize, rpcMessageMaxSampleSize, rpcMessageErrorThreshold, + scoringRegistryStartupSilencePeriod, } for _, scope := range []string{systemScope, transientScope, protocolScope, peerScope, peerProtocolScope} { @@ -308,6 +311,10 @@ func InitializeNetworkFlags(flags *pflag.FlagSet, config *Config) { gossipSubSubscriptionProviderCacheSize, config.GossipSubConfig.SubscriptionProviderConfig.CacheSize, "size of the cache that keeps the list of topics each peer has subscribed to, recommended size is 10x the number of authorized nodes") + flags.Duration( + scoringRegistryStartupSilencePeriod, + config.GossipSubConfig.ScoringRegistryStartupSilencePeriod, + "the duration of time, after the node startup, during which the scoring registry remains inactive before penalizing nodes.") } // LoadLibP2PResourceManagerFlags loads all CLI flags for the libp2p resource manager configuration on the provided pflag set. diff --git a/network/p2p/p2pconf/gossipsub.go b/network/p2p/p2pconf/gossipsub.go index b8392c3268c..83e96035a91 100644 --- a/network/p2p/p2pconf/gossipsub.go +++ b/network/p2p/p2pconf/gossipsub.go @@ -66,6 +66,16 @@ type GossipSubConfig struct { PeerScoring bool `mapstructure:"gossipsub-peer-scoring-enabled"` SubscriptionProviderConfig SubscriptionProviderParameters `mapstructure:",squash"` + + // ScoringRegistryStartupSilencePeriod defines the duration of time, after the node startup, + // during which the scoring registry remains inactive before penalizing nodes. + // Throughout this startup silence period, the application-specific penalty + // for all nodes will be set to 0, and any invalid control message notifications + // will be ignored. + // + // This configuration allows nodes to stabilize and initialize before + // applying penalties or responding processing invalid control message notifications. + ScoringRegistryStartupSilencePeriod time.Duration `validate:"gt=10m" mapstructure:"gossipsub-scoring-registry-startup-silence-period"` } type SubscriptionProviderParameters struct { From 7550f65d91a769067e7eceb471c577dc4c158531 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Wed, 29 Nov 2023 13:54:18 -0500 Subject: [PATCH 02/16] update scoring registry to not penalize nodes during silence period --- config/default-config.yml | 2 +- network/netconf/flags.go | 8 ++-- network/p2p/p2pconf/gossipsub.go | 4 +- network/p2p/scoring/registry.go | 65 ++++++++++++++++++++++++-------- 4 files changed, 57 insertions(+), 22 deletions(-) diff --git a/config/default-config.yml b/config/default-config.yml index 31dd2046a36..d1cfd932b94 100644 --- a/config/default-config.yml +++ b/config/default-config.yml @@ -197,7 +197,7 @@ network-config: # returned for all nodes will be 0, and any invalid control message notifications # will be ignored. This configuration allows nodes to stabilize and initialize before # applying penalties or processing invalid control message notifications. - gossipsub-scoring-registry-startup-silence-period: 20m + gossipsub-scoring-registry-startup-silence-duration: 20m # Application layer spam prevention alsp-spam-record-cache-size: 1000 alsp-spam-report-queue-size: 10_000 diff --git a/network/netconf/flags.go b/network/netconf/flags.go index 11ff10f076a..af816368ba0 100644 --- a/network/netconf/flags.go +++ b/network/netconf/flags.go @@ -94,7 +94,7 @@ const ( alspSyncEngineRangeRequestBaseProb = "alsp-sync-engine-range-request-base-prob" alspSyncEngineSyncRequestProb = "alsp-sync-engine-sync-request-prob" - scoringRegistryStartupSilencePeriod = "gossipsub-scoring-registry-startup-silence-period" + scoringRegistryStartupSilenceDuration = "gossipsub-scoring-registry-startup-silence-duration" ) func AllFlagNames() []string { @@ -152,7 +152,7 @@ func AllFlagNames() []string { iwantCacheMissCheckSize, rpcMessageMaxSampleSize, rpcMessageErrorThreshold, - scoringRegistryStartupSilencePeriod, + scoringRegistryStartupSilenceDuration, } for _, scope := range []string{systemScope, transientScope, protocolScope, peerScope, peerProtocolScope} { @@ -312,8 +312,8 @@ func InitializeNetworkFlags(flags *pflag.FlagSet, config *Config) { config.GossipSubConfig.SubscriptionProviderConfig.CacheSize, "size of the cache that keeps the list of topics each peer has subscribed to, recommended size is 10x the number of authorized nodes") flags.Duration( - scoringRegistryStartupSilencePeriod, - config.GossipSubConfig.ScoringRegistryStartupSilencePeriod, + scoringRegistryStartupSilenceDuration, + config.GossipSubConfig.ScoringRegistryStartupSilenceDuration, "the duration of time, after the node startup, during which the scoring registry remains inactive before penalizing nodes.") } diff --git a/network/p2p/p2pconf/gossipsub.go b/network/p2p/p2pconf/gossipsub.go index 83e96035a91..257249d2e74 100644 --- a/network/p2p/p2pconf/gossipsub.go +++ b/network/p2p/p2pconf/gossipsub.go @@ -67,7 +67,7 @@ type GossipSubConfig struct { SubscriptionProviderConfig SubscriptionProviderParameters `mapstructure:",squash"` - // ScoringRegistryStartupSilencePeriod defines the duration of time, after the node startup, + // ScoringRegistryStartupSilenceDuration defines the duration of time, after the node startup, // during which the scoring registry remains inactive before penalizing nodes. // Throughout this startup silence period, the application-specific penalty // for all nodes will be set to 0, and any invalid control message notifications @@ -75,7 +75,7 @@ type GossipSubConfig struct { // // This configuration allows nodes to stabilize and initialize before // applying penalties or responding processing invalid control message notifications. - ScoringRegistryStartupSilencePeriod time.Duration `validate:"gt=10m" mapstructure:"gossipsub-scoring-registry-startup-silence-period"` + ScoringRegistryStartupSilenceDuration time.Duration `validate:"gt=10m" mapstructure:"gossipsub-scoring-registry-startup-silence-duration"` } type SubscriptionProviderParameters struct { diff --git a/network/p2p/scoring/registry.go b/network/p2p/scoring/registry.go index 2e3bc4ea374..eb86d3d8e9e 100644 --- a/network/p2p/scoring/registry.go +++ b/network/p2p/scoring/registry.go @@ -98,6 +98,10 @@ type GossipSubAppSpecificScoreRegistry struct { // initial application specific penalty record, used to initialize the penalty cache entry. init func() p2p.GossipSubSpamRecord validator p2p.SubscriptionValidator + // silencePeriodDuration duration that the startup silence period will last, during which nodes will not be penalized + silencePeriodDuration time.Duration + // silencePeriodStartTime time that the silence period begins, this is the time that the registry is started by the node. + silencePeriodStartTime time.Time } // GossipSubAppSpecificScoreRegistryConfig is the configuration for the GossipSubAppSpecificScoreRegistry. @@ -123,6 +127,10 @@ type GossipSubAppSpecificScoreRegistryConfig struct { // CacheFactory is a factory function that returns a new GossipSubSpamRecordCache. It is used to initialize the spamScoreCache. // The cache is used to store the application specific penalty of peers. CacheFactory func() p2p.GossipSubSpamRecordCache + + // ScoringRegistryStartupSilenceDuration defines the duration of time, after the node startup, + // during which the scoring registry remains inactive before penalizing nodes. + ScoringRegistryStartupSilenceDuration time.Duration } // NewGossipSubAppSpecificScoreRegistry returns a new GossipSubAppSpecificScoreRegistry. @@ -135,12 +143,13 @@ type GossipSubAppSpecificScoreRegistryConfig struct { // a new GossipSubAppSpecificScoreRegistry. func NewGossipSubAppSpecificScoreRegistry(config *GossipSubAppSpecificScoreRegistryConfig) *GossipSubAppSpecificScoreRegistry { reg := &GossipSubAppSpecificScoreRegistry{ - logger: config.Logger.With().Str("module", "app_score_registry").Logger(), - spamScoreCache: config.CacheFactory(), - penalty: config.Penalty, - init: config.Init, - validator: config.Validator, - idProvider: config.IdProvider, + logger: config.Logger.With().Str("module", "app_score_registry").Logger(), + spamScoreCache: config.CacheFactory(), + penalty: config.Penalty, + init: config.Init, + validator: config.Validator, + idProvider: config.IdProvider, + silencePeriodDuration: config.ScoringRegistryStartupSilenceDuration, } builder := component.NewComponentManagerBuilder() @@ -168,24 +177,40 @@ func NewGossipSubAppSpecificScoreRegistry(config *GossipSubAppSpecificScoreRegis var _ p2p.GossipSubInvCtrlMsgNotifConsumer = (*GossipSubAppSpecificScoreRegistry)(nil) +// Start sets the silencePeriodStartTime before starting registry components. +func (r *GossipSubAppSpecificScoreRegistry) Start(parent irrecoverable.SignalerContext) { + if !r.silencePeriodStartTime.IsZero() { + parent.Throw(fmt.Errorf("gossipsub scoring registry started more than once")) + } + r.silencePeriodStartTime = time.Now() + r.Component.Start(parent) +} + // AppSpecificScoreFunc returns the application specific penalty function that is called by the GossipSub protocol to determine the application specific penalty of a peer. func (r *GossipSubAppSpecificScoreRegistry) AppSpecificScoreFunc() func(peer.ID) float64 { return func(pid peer.ID) float64 { - appSpecificScore := float64(0) + appSpecificScorePenalty := float64(0) lg := r.logger.With().Str("peer_id", p2plogging.PeerId(pid)).Logger() + + // during startup silence period avoid penalizing nodes + if !r.afterSilencePeriod() { + lg.Debug().Msg("returning 0 app specific score penalty for node during silence period") + return appSpecificScorePenalty + } + // (1) spam penalty: the penalty is applied to the application specific penalty when a peer conducts a spamming misbehaviour. spamRecord, err, spamRecordExists := r.spamScoreCache.Get(pid) if err != nil { // the error is considered fatal as it means the cache is not working properly. // we should not continue with the execution as it may lead to routing attack vulnerability. r.logger.Fatal().Str("peer_id", p2plogging.PeerId(pid)).Err(err).Msg("could not get application specific penalty for peer") - return appSpecificScore // unreachable, but added to avoid proceeding with the execution if log level is changed. + return appSpecificScorePenalty // unreachable, but added to avoid proceeding with the execution if log level is changed. } if spamRecordExists { lg = lg.With().Float64("spam_penalty", spamRecord.Penalty).Logger() - appSpecificScore += spamRecord.Penalty + appSpecificScorePenalty += spamRecord.Penalty } // (2) staking score: for staked peers, a default positive reward is applied only if the peer has no penalty on spamming and subscription. @@ -194,7 +219,7 @@ func (r *GossipSubAppSpecificScoreRegistry) AppSpecificScoreFunc() func(peer.ID) if stakingScore < 0 { lg = lg.With().Float64("staking_penalty", stakingScore).Logger() // staking penalty is applied right away. - appSpecificScore += stakingScore + appSpecificScorePenalty += stakingScore } if stakingScore >= 0 { @@ -205,21 +230,21 @@ func (r *GossipSubAppSpecificScoreRegistry) AppSpecificScoreFunc() func(peer.ID) subscriptionPenalty := r.subscriptionPenalty(pid, flowId, role) lg = lg.With().Float64("subscription_penalty", subscriptionPenalty).Logger() if subscriptionPenalty < 0 { - appSpecificScore += subscriptionPenalty + appSpecificScorePenalty += subscriptionPenalty } } // (4) staking reward: for staked peers, a default positive reward is applied only if the peer has no penalty on spamming and subscription. - if stakingScore > 0 && appSpecificScore == float64(0) { + if stakingScore > 0 && appSpecificScorePenalty == float64(0) { lg = lg.With().Float64("staking_reward", stakingScore).Logger() - appSpecificScore += stakingScore + appSpecificScorePenalty += stakingScore } lg.Trace(). - Float64("total_app_specific_score", appSpecificScore). + Float64("total_app_specific_score", appSpecificScorePenalty). Msg("application specific penalty computed") - return appSpecificScore + return appSpecificScorePenalty } } @@ -280,6 +305,12 @@ func (r *GossipSubAppSpecificScoreRegistry) OnInvalidControlMessageNotification( Str("peer_id", p2plogging.PeerId(notification.PeerID)). Str("misbehavior_type", notification.MsgType.String()).Logger() + // during startup silence period avoid penalizing nodes, ignore all notifications + if !r.afterSilencePeriod() { + lg.Debug().Msg("ignoring invalid control message notification for peer during silence period") + return + } + // try initializing the application specific penalty for the peer if it is not yet initialized. // this is done to avoid the case where the peer is not yet cached and the application specific penalty is not yet initialized. // initialization is successful only if the peer is not yet cached. @@ -325,6 +356,10 @@ func (r *GossipSubAppSpecificScoreRegistry) OnInvalidControlMessageNotification( Msg("applied misbehaviour penalty and updated application specific penalty") } +func (r *GossipSubAppSpecificScoreRegistry) afterSilencePeriod() bool { + return time.Since(r.silencePeriodStartTime) < r.silencePeriodDuration +} + // DefaultDecayFunction is the default decay function that is used to decay the application specific penalty of a peer. // It is used if no decay function is provided in the configuration. // It decays the application specific penalty of a peer if it is negative. From 135f027cc2cfbe157996aab5cc84bc289c738b04 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Wed, 29 Nov 2023 17:55:04 -0500 Subject: [PATCH 03/16] add registry test that ensures silence period behaves as expected --- network/p2p/scoring/registry.go | 3 +- network/p2p/scoring/registry_test.go | 133 ++++++++++++++++++++++++++- network/p2p/test/fixtures.go | 16 ++++ 3 files changed, 148 insertions(+), 4 deletions(-) diff --git a/network/p2p/scoring/registry.go b/network/p2p/scoring/registry.go index eb86d3d8e9e..52a36b745bd 100644 --- a/network/p2p/scoring/registry.go +++ b/network/p2p/scoring/registry.go @@ -356,8 +356,9 @@ func (r *GossipSubAppSpecificScoreRegistry) OnInvalidControlMessageNotification( Msg("applied misbehaviour penalty and updated application specific penalty") } +// afterSilencePeriod returns true if registry silence period is over, false otherwise. func (r *GossipSubAppSpecificScoreRegistry) afterSilencePeriod() bool { - return time.Since(r.silencePeriodStartTime) < r.silencePeriodDuration + return time.Since(r.silencePeriodStartTime) > r.silencePeriodDuration } // DefaultDecayFunction is the default decay function that is used to decay the application specific penalty of a peer. diff --git a/network/p2p/scoring/registry_test.go b/network/p2p/scoring/registry_test.go index e1b3170940b..dfb84e9e86c 100644 --- a/network/p2p/scoring/registry_test.go +++ b/network/p2p/scoring/registry_test.go @@ -1,17 +1,22 @@ package scoring_test import ( + "context" "fmt" "math" + "os" "sync" "testing" "time" "github.com/libp2p/go-libp2p/core/peer" + "github.com/rs/zerolog" "github.com/stretchr/testify/assert" testifymock "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "go.uber.org/atomic" + "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/module/mock" "github.com/onflow/flow-go/network/p2p" @@ -19,6 +24,7 @@ import ( p2pmsg "github.com/onflow/flow-go/network/p2p/message" mockp2p "github.com/onflow/flow-go/network/p2p/mock" "github.com/onflow/flow-go/network/p2p/scoring" + p2ptest "github.com/onflow/flow-go/network/p2p/test" "github.com/onflow/flow-go/utils/unittest" ) @@ -31,6 +37,10 @@ func TestNoPenaltyRecord(t *testing.T) { t, withStakedIdentity(peerID), withValidSubscriptions(peerID)) + ctx, cancel := context.WithCancel(context.Background()) + signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) + reg.Start(signalerCtx) + defer stopRegistry(t, cancel, reg) // initially, the spamRecords should not have the peer id. assert.False(t, spamRecords.Has(peerID)) @@ -72,6 +82,10 @@ func testPeerWithSpamRecord(t *testing.T, messageType p2pmsg.ControlMessageType, t, withStakedIdentity(peerID), withValidSubscriptions(peerID)) + ctx, cancel := context.WithCancel(context.Background()) + signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) + reg.Start(signalerCtx) + defer stopRegistry(t, cancel, reg) // initially, the spamRecords should not have the peer id. assert.False(t, spamRecords.Has(peerID)) @@ -126,6 +140,10 @@ func testSpamRecordWithUnknownIdentity(t *testing.T, messageType p2pmsg.ControlM t, withUnknownIdentity(peerID), withValidSubscriptions(peerID)) + ctx, cancel := context.WithCancel(context.Background()) + signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) + reg.Start(signalerCtx) + defer stopRegistry(t, cancel, reg) // initially, the spamRecords should not have the peer id. assert.False(t, spamRecords.Has(peerID)) @@ -179,6 +197,10 @@ func testSpamRecordWithSubscriptionPenalty(t *testing.T, messageType p2pmsg.Cont t, withStakedIdentity(peerID), withInvalidSubscriptions(peerID)) + ctx, cancel := context.WithCancel(context.Background()) + signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) + reg.Start(signalerCtx) + defer stopRegistry(t, cancel, reg) // initially, the spamRecords should not have the peer id. assert.False(t, spamRecords.Has(peerID)) @@ -212,6 +234,10 @@ func TestSpamPenaltyDecaysInCache(t *testing.T) { reg, _ := newGossipSubAppSpecificScoreRegistry(t, withStakedIdentity(peerID), withValidSubscriptions(peerID)) + ctx, cancel := context.WithCancel(context.Background()) + signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) + reg.Start(signalerCtx) + defer stopRegistry(t, cancel, reg) // report a misbehavior for the peer id. reg.OnInvalidControlMessageNotification(&p2p.InvCtrlMsgNotif{ @@ -282,6 +308,10 @@ func TestSpamPenaltyDecayToZero(t *testing.T) { Penalty: 0, } })) + ctx, cancel := context.WithCancel(context.Background()) + signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) + reg.Start(signalerCtx) + defer stopRegistry(t, cancel, reg) // report a misbehavior for the peer id. reg.OnInvalidControlMessageNotification(&p2p.InvCtrlMsgNotif{ @@ -328,6 +358,10 @@ func TestPersistingUnknownIdentityPenalty(t *testing.T) { Penalty: 0, } })) + ctx, cancel := context.WithCancel(context.Background()) + signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) + reg.Start(signalerCtx) + defer stopRegistry(t, cancel, reg) // initially, the app specific score should be the default unknown identity penalty. require.Equal(t, scoring.DefaultUnknownIdentityPenalty, reg.AppSpecificScoreFunc()(peerID)) @@ -386,8 +420,13 @@ func TestPersistingInvalidSubscriptionPenalty(t *testing.T) { } })) + ctx, cancel := context.WithCancel(context.Background()) + signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) + reg.Start(signalerCtx) + defer stopRegistry(t, cancel, reg) + // initially, the app specific score should be the default invalid subscription penalty. - require.Equal(t, scoring.DefaultUnknownIdentityPenalty, reg.AppSpecificScoreFunc()(peerID)) + require.Equal(t, scoring.DefaultInvalidSubscriptionPenalty, reg.AppSpecificScoreFunc()(peerID)) // report a misbehavior for the peer id. reg.OnInvalidControlMessageNotification(&p2p.InvCtrlMsgNotif{ @@ -413,7 +452,7 @@ func TestPersistingInvalidSubscriptionPenalty(t *testing.T) { require.Eventually(t, func() bool { // when the spam penalty is decayed to zero, the app specific penalty of the node should only contain the default invalid subscription penalty. - return reg.AppSpecificScoreFunc()(peerID) == scoring.DefaultUnknownIdentityPenalty + return reg.AppSpecificScoreFunc()(peerID) == scoring.DefaultInvalidSubscriptionPenalty }, 5*time.Second, 100*time.Millisecond) // the spam penalty should now be zero in spamRecords. @@ -435,6 +474,10 @@ func TestPeerSpamPenaltyClusterPrefixed(t *testing.T) { opts = append(opts, withStakedIdentity(peerID), withValidSubscriptions(peerID)) } reg, spamRecords := newGossipSubAppSpecificScoreRegistry(t, opts...) + ctx, cancel := context.WithCancel(context.Background()) + signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) + reg.Start(signalerCtx) + defer stopRegistry(t, cancel, reg) for _, peerID := range peerIds { // initially, the spamRecords should not have the peer id. @@ -491,6 +534,82 @@ func TestPeerSpamPenaltyClusterPrefixed(t *testing.T) { } } +// TestScoringRegistrySilencePeriod ensures that the scoring registry does not penalize nodes during the silence period, and +// starts to penalize nodes only after the silence period is over. +func TestScoringRegistrySilencePeriod(t *testing.T) { + peerID := unittest.PeerIdFixture(t) + silenceDuration := 5 * time.Second + silencedNotificationLogs := atomic.NewInt32(0) + hook := zerolog.HookFunc(func(e *zerolog.Event, level zerolog.Level, message string) { + if level == zerolog.DebugLevel { + if message == "ignoring invalid control message notification for peer during silence period" { + silencedNotificationLogs.Inc() + } + } + }) + logger := zerolog.New(os.Stdout).Level(zerolog.DebugLevel).Hook(hook) + reg, spamRecords := newGossipSubAppSpecificScoreRegistry( + t, + withStakedIdentity(peerID), + withInvalidSubscriptions(peerID), // the peer id has an invalid subscription. + func(config *scoring.GossipSubAppSpecificScoreRegistryConfig) { + // we set the scoring registry silence duration 10 seconds + // the peer is not expected to be penalized for the first 5 seconds of the test + // after that an invalid control message notification is processed and the peer + // should be penalized + config.ScoringRegistryStartupSilenceDuration = silenceDuration + // hooked logger will capture the number of logs related to ignored notifications + config.Logger = logger + }) + ctx, cancel := context.WithCancel(context.Background()) + signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) + defer stopRegistry(t, cancel, reg) + // capture approximate registry start time + reg.Start(signalerCtx) + registryStartTime := time.Now() + expectedNumOfSilencedNotif := 0 + // while we are in the silence period all notifications should be ignored and the + // invalid subscription penalty should not be applied to the app specific score + // we ensure we stay within the silence duration by iterating only up until 1 second + // before silence period is over + for time.Since(registryStartTime) < (silenceDuration - time.Second) { + // report a misbehavior for the peer id. + reg.OnInvalidControlMessageNotification(&p2p.InvCtrlMsgNotif{ + PeerID: peerID, + MsgType: p2pmsg.CtrlMsgGraft, + }) + expectedNumOfSilencedNotif++ + // spam records should not be created during the silence period + _, err, ok := spamRecords.Get(peerID) + assert.False(t, ok) + assert.NoError(t, err) + // initially, the app specific score should be the default invalid subscription penalty. + require.Equal(t, float64(0), reg.AppSpecificScoreFunc()(peerID)) + } + // sleep for one second to ensure silence period is over + time.Sleep(time.Second) + // we expect to have logged a debug message for all notifications ignored. + require.Equal(t, int32(expectedNumOfSilencedNotif), silencedNotificationLogs.Load()) + // after silence period the invalid subscription penalty should be applied to the app specific score + require.Equal(t, scoring.DefaultInvalidSubscriptionPenalty, reg.AppSpecificScoreFunc()(peerID)) + + // after silence period the peer has spam record as well as an unknown identity. Hence, the app specific score should be the spam penalty + // and the staking penalty. + reg.OnInvalidControlMessageNotification(&p2p.InvCtrlMsgNotif{ + PeerID: peerID, + MsgType: p2pmsg.CtrlMsgGraft, + }) + // the penalty should now be applied and spam records created. + record, err, ok := spamRecords.Get(peerID) + assert.True(t, ok) + assert.NoError(t, err) + expectedPenalty := penaltyValueFixtures().Graft + assert.Less(t, math.Abs(expectedPenalty-record.Penalty), 10e-3) + assert.Equal(t, scoring.InitAppScoreRecordState().Decay, record.Decay) // decay should be initialized to the initial state. + score := reg.AppSpecificScoreFunc()(peerID) + assert.Less(t, math.Abs(expectedPenalty+scoring.DefaultInvalidSubscriptionPenalty-score), 10e-3) +} + // withStakedIdentity returns a function that sets the identity provider to return an staked identity for the given peer id. // It is used for testing purposes, and causes the given peer id to benefit from the staked identity reward in GossipSub. func withStakedIdentity(peerId peer.ID) func(cfg *scoring.GossipSubAppSpecificScoreRegistryConfig) { @@ -533,16 +652,19 @@ func withInitFunction(initFunction func() p2p.GossipSubSpamRecord) func(cfg *sco // for the testing purposes. func newGossipSubAppSpecificScoreRegistry(t *testing.T, opts ...func(*scoring.GossipSubAppSpecificScoreRegistryConfig)) (*scoring.GossipSubAppSpecificScoreRegistry, *netcache.GossipSubSpamRecordCache) { cache := netcache.NewGossipSubSpamRecordCache(100, unittest.Logger(), metrics.NewNoopCollector(), scoring.DefaultDecayFunction()) + subscriptionValidator := mockp2p.NewSubscriptionValidator(t) cfg := &scoring.GossipSubAppSpecificScoreRegistryConfig{ Logger: unittest.Logger(), Init: scoring.InitAppScoreRecordState, Penalty: penaltyValueFixtures(), IdProvider: mock.NewIdentityProvider(t), - Validator: mockp2p.NewSubscriptionValidator(t), + Validator: subscriptionValidator, CacheFactory: func() p2p.GossipSubSpamRecordCache { return cache }, + ScoringRegistryStartupSilenceDuration: 0, // turn off silence period by default } + p2ptest.MockScoringRegistrySubscriptionValidatorReadyDoneAware(subscriptionValidator) for _, opt := range opts { opt(cfg) } @@ -581,3 +703,8 @@ func penaltyValueFixture(msgType p2pmsg.ControlMessageType) float64 { return penaltyValues.ClusterPrefixedPenaltyReductionFactor } } + +func stopRegistry(t *testing.T, cancel context.CancelFunc, registry *scoring.GossipSubAppSpecificScoreRegistry) { + cancel() + unittest.RequireCloseBefore(t, registry.Done(), 500*time.Millisecond, "registry did not stop") +} diff --git a/network/p2p/test/fixtures.go b/network/p2p/test/fixtures.go index 4a3c5686f1d..dcc713b615b 100644 --- a/network/p2p/test/fixtures.go +++ b/network/p2p/test/fixtures.go @@ -848,6 +848,22 @@ func MockInspectorNotificationDistributorReadyDoneAware(d *mockp2p.GossipSubInsp }()).Maybe() } +// MockScoringRegistrySubscriptionValidatorReadyDoneAware mocks the Ready and Done methods of the subscription validator to return a channel that is already closed, +// so that the distributor is considered ready and done when the test needs. +func MockScoringRegistrySubscriptionValidatorReadyDoneAware(s *mockp2p.SubscriptionValidator) { + s.On("Start", mockery.Anything).Return().Maybe() + s.On("Ready").Return(func() <-chan struct{} { + ch := make(chan struct{}) + close(ch) + return ch + }()).Maybe() + s.On("Done").Return(func() <-chan struct{} { + ch := make(chan struct{}) + close(ch) + return ch + }()).Maybe() +} + // GossipSubRpcFixtures returns a slice of random message IDs for testing. // Args: // - t: *testing.T instance From e214946bf147d95c3bbd248079e38359180904d5 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Wed, 29 Nov 2023 18:08:35 -0500 Subject: [PATCH 04/16] Update registry.go --- network/p2p/scoring/registry.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/network/p2p/scoring/registry.go b/network/p2p/scoring/registry.go index 52a36b745bd..e456b68d9e2 100644 --- a/network/p2p/scoring/registry.go +++ b/network/p2p/scoring/registry.go @@ -189,14 +189,14 @@ func (r *GossipSubAppSpecificScoreRegistry) Start(parent irrecoverable.SignalerC // AppSpecificScoreFunc returns the application specific penalty function that is called by the GossipSub protocol to determine the application specific penalty of a peer. func (r *GossipSubAppSpecificScoreRegistry) AppSpecificScoreFunc() func(peer.ID) float64 { return func(pid peer.ID) float64 { - appSpecificScorePenalty := float64(0) + appSpecificScore := float64(0) lg := r.logger.With().Str("peer_id", p2plogging.PeerId(pid)).Logger() // during startup silence period avoid penalizing nodes if !r.afterSilencePeriod() { lg.Debug().Msg("returning 0 app specific score penalty for node during silence period") - return appSpecificScorePenalty + return appSpecificScore } // (1) spam penalty: the penalty is applied to the application specific penalty when a peer conducts a spamming misbehaviour. @@ -205,12 +205,12 @@ func (r *GossipSubAppSpecificScoreRegistry) AppSpecificScoreFunc() func(peer.ID) // the error is considered fatal as it means the cache is not working properly. // we should not continue with the execution as it may lead to routing attack vulnerability. r.logger.Fatal().Str("peer_id", p2plogging.PeerId(pid)).Err(err).Msg("could not get application specific penalty for peer") - return appSpecificScorePenalty // unreachable, but added to avoid proceeding with the execution if log level is changed. + return appSpecificScore // unreachable, but added to avoid proceeding with the execution if log level is changed. } if spamRecordExists { lg = lg.With().Float64("spam_penalty", spamRecord.Penalty).Logger() - appSpecificScorePenalty += spamRecord.Penalty + appSpecificScore += spamRecord.Penalty } // (2) staking score: for staked peers, a default positive reward is applied only if the peer has no penalty on spamming and subscription. @@ -219,7 +219,7 @@ func (r *GossipSubAppSpecificScoreRegistry) AppSpecificScoreFunc() func(peer.ID) if stakingScore < 0 { lg = lg.With().Float64("staking_penalty", stakingScore).Logger() // staking penalty is applied right away. - appSpecificScorePenalty += stakingScore + appSpecificScore += stakingScore } if stakingScore >= 0 { @@ -230,21 +230,21 @@ func (r *GossipSubAppSpecificScoreRegistry) AppSpecificScoreFunc() func(peer.ID) subscriptionPenalty := r.subscriptionPenalty(pid, flowId, role) lg = lg.With().Float64("subscription_penalty", subscriptionPenalty).Logger() if subscriptionPenalty < 0 { - appSpecificScorePenalty += subscriptionPenalty + appSpecificScore += subscriptionPenalty } } // (4) staking reward: for staked peers, a default positive reward is applied only if the peer has no penalty on spamming and subscription. - if stakingScore > 0 && appSpecificScorePenalty == float64(0) { + if stakingScore > 0 && appSpecificScore == float64(0) { lg = lg.With().Float64("staking_reward", stakingScore).Logger() - appSpecificScorePenalty += stakingScore + appSpecificScore += stakingScore } lg.Trace(). - Float64("total_app_specific_score", appSpecificScorePenalty). + Float64("total_app_specific_score", appSpecificScore). Msg("application specific penalty computed") - return appSpecificScorePenalty + return appSpecificScore } } From ef26bbde6e13db36b721d5ca4f89430d2bcf490b Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Mon, 1 Jan 2024 22:18:23 -0500 Subject: [PATCH 05/16] Update network/p2p/scoring/registry.go Co-authored-by: Yahya Hassanzadeh, Ph.D. --- network/p2p/scoring/registry.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/p2p/scoring/registry.go b/network/p2p/scoring/registry.go index e148dbcce01..88b3299f712 100644 --- a/network/p2p/scoring/registry.go +++ b/network/p2p/scoring/registry.go @@ -204,7 +204,7 @@ func (r *GossipSubAppSpecificScoreRegistry) AppSpecificScoreFunc() func(peer.ID) // during startup silence period avoid penalizing nodes if !r.afterSilencePeriod() { - lg.Debug().Msg("returning 0 app specific score penalty for node during silence period") + lg.Trace().Msg("returning 0 app specific score penalty for node during silence period") return appSpecificScore } From 793b2fc6342f2fc26ef7bb37c3a57a37a0381dc6 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Mon, 1 Jan 2024 22:18:51 -0500 Subject: [PATCH 06/16] Update network/p2p/scoring/registry.go Co-authored-by: Yahya Hassanzadeh, Ph.D. --- network/p2p/scoring/registry.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/p2p/scoring/registry.go b/network/p2p/scoring/registry.go index 88b3299f712..c08985062d8 100644 --- a/network/p2p/scoring/registry.go +++ b/network/p2p/scoring/registry.go @@ -316,7 +316,7 @@ func (r *GossipSubAppSpecificScoreRegistry) OnInvalidControlMessageNotification( // during startup silence period avoid penalizing nodes, ignore all notifications if !r.afterSilencePeriod() { - lg.Debug().Msg("ignoring invalid control message notification for peer during silence period") + lg.Trace().Msg("ignoring invalid control message notification for peer during silence period") return } From ee84e4c39a64c24d050bf2c1db72a82a4df91909 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Mon, 1 Jan 2024 22:19:16 -0500 Subject: [PATCH 07/16] Update network/p2p/scoring/registry_test.go Co-authored-by: Yahya Hassanzadeh, Ph.D. --- network/p2p/scoring/registry_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/p2p/scoring/registry_test.go b/network/p2p/scoring/registry_test.go index 6afb0eabf79..e8c736fc9c7 100644 --- a/network/p2p/scoring/registry_test.go +++ b/network/p2p/scoring/registry_test.go @@ -806,5 +806,5 @@ func penaltyValueFixture(msgType p2pmsg.ControlMessageType) float64 { func stopRegistry(t *testing.T, cancel context.CancelFunc, registry *scoring.GossipSubAppSpecificScoreRegistry) { cancel() - unittest.RequireCloseBefore(t, registry.Done(), 500*time.Millisecond, "registry did not stop") + unittest.RequireCloseBefore(t, registry.Done(), 5*time.Second, "registry did not stop") } From 763754e933ba0f4b38e0faecae3bcbc079f4444c Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Mon, 1 Jan 2024 22:22:08 -0500 Subject: [PATCH 08/16] move silce startup to worker --- network/p2p/scoring/registry.go | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/network/p2p/scoring/registry.go b/network/p2p/scoring/registry.go index e148dbcce01..f5704e5fafc 100644 --- a/network/p2p/scoring/registry.go +++ b/network/p2p/scoring/registry.go @@ -178,6 +178,11 @@ func NewGossipSubAppSpecificScoreRegistry(config *GossipSubAppSpecificScoreRegis reg.logger.Info().Msg("stopping subscription validator") <-reg.validator.Done() reg.logger.Info().Msg("subscription validator stopped") + }).AddWorker(func(parent irrecoverable.SignalerContext, ready component.ReadyFunc) { + if !reg.silencePeriodStartTime.IsZero() { + parent.Throw(fmt.Errorf("gossipsub scoring registry started more than once")) + } + reg.silencePeriodStartTime = time.Now() }) reg.Component = builder.Build() @@ -186,15 +191,6 @@ func NewGossipSubAppSpecificScoreRegistry(config *GossipSubAppSpecificScoreRegis var _ p2p.GossipSubInvCtrlMsgNotifConsumer = (*GossipSubAppSpecificScoreRegistry)(nil) -// Start sets the silencePeriodStartTime before starting registry components. -func (r *GossipSubAppSpecificScoreRegistry) Start(parent irrecoverable.SignalerContext) { - if !r.silencePeriodStartTime.IsZero() { - parent.Throw(fmt.Errorf("gossipsub scoring registry started more than once")) - } - r.silencePeriodStartTime = time.Now() - r.Component.Start(parent) -} - // AppSpecificScoreFunc returns the application specific penalty function that is called by the GossipSub protocol to determine the application specific penalty of a peer. func (r *GossipSubAppSpecificScoreRegistry) AppSpecificScoreFunc() func(peer.ID) float64 { return func(pid peer.ID) float64 { From 080e024105895a27008d9cbab183ae20cbdb943f Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 2 Jan 2024 09:52:06 -0500 Subject: [PATCH 09/16] store silence period time elapsed in atomic bool --- network/p2p/scoring/registry.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/network/p2p/scoring/registry.go b/network/p2p/scoring/registry.go index 1b46a602595..a30f5905641 100644 --- a/network/p2p/scoring/registry.go +++ b/network/p2p/scoring/registry.go @@ -7,6 +7,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/rs/zerolog" + "go.uber.org/atomic" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" @@ -111,6 +112,8 @@ type GossipSubAppSpecificScoreRegistry struct { silencePeriodDuration time.Duration // silencePeriodStartTime time that the silence period begins, this is the time that the registry is started by the node. silencePeriodStartTime time.Time + // silencePeriodElapsed atomic bool that stores a bool flag which indicates if the silence period is over or not. + silencePeriodElapsed *atomic.Bool } // GossipSubAppSpecificScoreRegistryConfig is the configuration for the GossipSubAppSpecificScoreRegistry. @@ -159,6 +162,7 @@ func NewGossipSubAppSpecificScoreRegistry(config *GossipSubAppSpecificScoreRegis validator: config.Validator, idProvider: config.IdProvider, silencePeriodDuration: config.ScoringRegistryStartupSilenceDuration, + silencePeriodElapsed: atomic.NewBool(false), } builder := component.NewComponentManagerBuilder() @@ -364,7 +368,14 @@ func (r *GossipSubAppSpecificScoreRegistry) OnInvalidControlMessageNotification( // afterSilencePeriod returns true if registry silence period is over, false otherwise. func (r *GossipSubAppSpecificScoreRegistry) afterSilencePeriod() bool { - return time.Since(r.silencePeriodStartTime) > r.silencePeriodDuration + if !r.silencePeriodElapsed.Load() { + if time.Since(r.silencePeriodStartTime) > r.silencePeriodDuration { + r.silencePeriodElapsed.Store(true) + return true + } + return false + } + return true } // DefaultDecayFunction is the default decay function that is used to decay the application specific penalty of a peer. From af54b59ab391fdd326d003ec23b237c5e6d97f2c Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 2 Jan 2024 10:24:49 -0500 Subject: [PATCH 10/16] ensure registry is ready --- network/p2p/scoring/registry.go | 2 +- network/p2p/scoring/registry_test.go | 24 ++++++++++++++++++++++-- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/network/p2p/scoring/registry.go b/network/p2p/scoring/registry.go index a30f5905641..97e58cae425 100644 --- a/network/p2p/scoring/registry.go +++ b/network/p2p/scoring/registry.go @@ -177,7 +177,6 @@ func NewGossipSubAppSpecificScoreRegistry(config *GossipSubAppSpecificScoreRegis ready() reg.logger.Info().Msg("subscription validator is ready") } - <-ctx.Done() reg.logger.Info().Msg("stopping subscription validator") <-reg.validator.Done() @@ -187,6 +186,7 @@ func NewGossipSubAppSpecificScoreRegistry(config *GossipSubAppSpecificScoreRegis parent.Throw(fmt.Errorf("gossipsub scoring registry started more than once")) } reg.silencePeriodStartTime = time.Now() + ready() }) reg.Component = builder.Build() diff --git a/network/p2p/scoring/registry_test.go b/network/p2p/scoring/registry_test.go index e8c736fc9c7..bb20e2d204e 100644 --- a/network/p2p/scoring/registry_test.go +++ b/network/p2p/scoring/registry_test.go @@ -42,6 +42,8 @@ func TestNoPenaltyRecord(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) reg.Start(signalerCtx) + unittest.RequireCloseBefore(t, reg.Ready(), 1*time.Second, "registry did not start in time") + defer stopRegistry(t, cancel, reg) // initially, the spamRecords should not have the peer id. @@ -87,6 +89,8 @@ func testPeerWithSpamRecord(t *testing.T, messageType p2pmsg.ControlMessageType, ctx, cancel := context.WithCancel(context.Background()) signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) reg.Start(signalerCtx) + unittest.RequireCloseBefore(t, reg.Ready(), 1*time.Second, "registry did not start in time") + defer stopRegistry(t, cancel, reg) // initially, the spamRecords should not have the peer id. @@ -145,6 +149,8 @@ func testSpamRecordWithUnknownIdentity(t *testing.T, messageType p2pmsg.ControlM ctx, cancel := context.WithCancel(context.Background()) signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) reg.Start(signalerCtx) + unittest.RequireCloseBefore(t, reg.Ready(), 1*time.Second, "registry did not start in time") + defer stopRegistry(t, cancel, reg) // initially, the spamRecords should not have the peer id. @@ -201,6 +207,8 @@ func testSpamRecordWithSubscriptionPenalty(t *testing.T, messageType p2pmsg.Cont ctx, cancel := context.WithCancel(context.Background()) signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) reg.Start(signalerCtx) + unittest.RequireCloseBefore(t, reg.Ready(), 1*time.Second, "registry did not start in time") + defer stopRegistry(t, cancel, reg) // initially, the spamRecords should not have the peer id. @@ -237,6 +245,8 @@ func TestSpamPenaltyDecaysInCache(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) reg.Start(signalerCtx) + unittest.RequireCloseBefore(t, reg.Ready(), 1*time.Second, "registry did not start in time") + defer stopRegistry(t, cancel, reg) // report a misbehavior for the peer id. @@ -312,6 +322,8 @@ func TestSpamPenaltyDecayToZero(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) reg.Start(signalerCtx) + unittest.RequireCloseBefore(t, reg.Ready(), 1*time.Second, "registry did not start in time") + defer stopRegistry(t, cancel, reg) // report a misbehavior for the peer id. @@ -362,6 +374,8 @@ func TestPersistingUnknownIdentityPenalty(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) reg.Start(signalerCtx) + unittest.RequireCloseBefore(t, reg.Ready(), 1*time.Second, "registry did not start in time") + defer stopRegistry(t, cancel, reg) // initially, the app specific score should be the default unknown identity penalty. @@ -424,6 +438,8 @@ func TestPersistingInvalidSubscriptionPenalty(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) reg.Start(signalerCtx) + unittest.RequireCloseBefore(t, reg.Ready(), 1*time.Second, "registry did not start in time") + defer stopRegistry(t, cancel, reg) // initially, the app specific score should be the default invalid subscription penalty. @@ -563,6 +579,8 @@ func TestPeerSpamPenaltyClusterPrefixed(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) reg.Start(signalerCtx) + unittest.RequireCloseBefore(t, reg.Ready(), 1*time.Second, "registry did not start in time") + defer stopRegistry(t, cancel, reg) for _, peerID := range peerIds { @@ -627,13 +645,13 @@ func TestScoringRegistrySilencePeriod(t *testing.T) { silenceDuration := 5 * time.Second silencedNotificationLogs := atomic.NewInt32(0) hook := zerolog.HookFunc(func(e *zerolog.Event, level zerolog.Level, message string) { - if level == zerolog.DebugLevel { + if level == zerolog.TraceLevel { if message == "ignoring invalid control message notification for peer during silence period" { silencedNotificationLogs.Inc() } } }) - logger := zerolog.New(os.Stdout).Level(zerolog.DebugLevel).Hook(hook) + logger := zerolog.New(os.Stdout).Level(zerolog.TraceLevel).Hook(hook) reg, spamRecords := newGossipSubAppSpecificScoreRegistry( t, withStakedIdentity(peerID), @@ -652,6 +670,8 @@ func TestScoringRegistrySilencePeriod(t *testing.T) { defer stopRegistry(t, cancel, reg) // capture approximate registry start time reg.Start(signalerCtx) + unittest.RequireCloseBefore(t, reg.Ready(), 1*time.Second, "registry did not start in time") + registryStartTime := time.Now() expectedNumOfSilencedNotif := 0 // while we are in the silence period all notifications should be ignored and the From 6d9f1d2ad6f61c25bae45946e0bb84b291cadd58 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 2 Jan 2024 10:30:17 -0500 Subject: [PATCH 11/16] use const message for silence log --- network/p2p/scoring/registry.go | 2 ++ network/p2p/scoring/registry_test.go | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/network/p2p/scoring/registry.go b/network/p2p/scoring/registry.go index 97e58cae425..ee5c85d4b19 100644 --- a/network/p2p/scoring/registry.go +++ b/network/p2p/scoring/registry.go @@ -63,6 +63,8 @@ const ( clusterPrefixedPenaltyReductionFactor = .5 // rpcPublishMessageMisbehaviourPenalty is the penalty applied to the application specific penalty when a peer conducts a RpcPublishMessageMisbehaviourPenalty misbehaviour. rpcPublishMessageMisbehaviourPenalty = -10 + // NotificationSilencedMsg log messages for silenced notifications + NotificationSilencedMsg = "ignoring invalid control message notification for peer during silence period" ) type SpamRecordInitFunc func() p2p.GossipSubSpamRecord diff --git a/network/p2p/scoring/registry_test.go b/network/p2p/scoring/registry_test.go index bb20e2d204e..6f8d52ed68e 100644 --- a/network/p2p/scoring/registry_test.go +++ b/network/p2p/scoring/registry_test.go @@ -646,7 +646,7 @@ func TestScoringRegistrySilencePeriod(t *testing.T) { silencedNotificationLogs := atomic.NewInt32(0) hook := zerolog.HookFunc(func(e *zerolog.Event, level zerolog.Level, message string) { if level == zerolog.TraceLevel { - if message == "ignoring invalid control message notification for peer during silence period" { + if message == scoring.NotificationSilencedMsg { silencedNotificationLogs.Inc() } } From b8a2fd234db0be52b9722703b512c77744622bfc Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Wed, 3 Jan 2024 15:23:07 -0500 Subject: [PATCH 12/16] add long silence period --- config/default-config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/default-config.yml b/config/default-config.yml index d580a7c1a4b..9af4b5b54b1 100644 --- a/config/default-config.yml +++ b/config/default-config.yml @@ -197,7 +197,7 @@ network-config: # returned for all nodes will be 0, and any invalid control message notifications # will be ignored. This configuration allows nodes to stabilize and initialize before # applying penalties or processing invalid control message notifications. - gossipsub-scoring-registry-startup-silence-duration: 20m + gossipsub-scoring-registry-startup-silence-duration: 1h # Threshold level for penalty. At each evaluation period, when a node's penalty is below this value, the decay rate slows down, ensuring longer decay periods for malicious nodes and quicker decay for honest ones. gossipsub-app-specific-penalty-decay-slowdown-threshold: -99 # This setting adjusts the decay rate when a node's penalty falls below the threshold. The decay rate, ranging between 0 and 1, dictates how quickly penalties decrease: a higher rate results in slower decay. The decay calculation is multiplicative (newPenalty = decayRate * oldPenalty). The reduction factor increases the decay rate, thus decelerating the penalty reduction. For instance, with a 0.01 reduction factor, the decay rate increases by 0.01 at each evaluation interval when the penalty is below the threshold. Consequently, a decay rate of `x` diminishes the penalty to zero more rapidly than a rate of `x+0.01`. From 3e45ea0a3368e9d3a65ea5b664d02ccd97250d4d Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Wed, 3 Jan 2024 16:46:42 -0500 Subject: [PATCH 13/16] update registry tests --- network/p2p/scoring/registry.go | 3 -- network/p2p/scoring/registry_test.go | 42 +++++++++++++++++----------- 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/network/p2p/scoring/registry.go b/network/p2p/scoring/registry.go index d7fbb274b33..293b07f06d0 100644 --- a/network/p2p/scoring/registry.go +++ b/network/p2p/scoring/registry.go @@ -278,7 +278,6 @@ func (r *GossipSubAppSpecificScoreRegistry) AppSpecificScoreFunc() func(peer.ID) lg.Trace(). Bool("worker_submitted", submitted). Msg("application specific score not found in cache, submitting worker to update it") - return 0 // in the mean time, return 0, which is a neutral score. case time.Since(lastUpdated) > r.scoreTTL: // record found in the cache, but expired; submit a worker to update it. @@ -288,14 +287,12 @@ func (r *GossipSubAppSpecificScoreRegistry) AppSpecificScoreFunc() func(peer.ID) Float64("app_specific_score", appSpecificScore). Dur("score_ttl", r.scoreTTL). Msg("application specific score expired, submitting worker to update it") - return appSpecificScore // in the mean time, return the expired score. default: // record found in the cache. r.logger.Trace(). Float64("app_specific_score", appSpecificScore). Msg("application specific score found in cache") - return appSpecificScore } } diff --git a/network/p2p/scoring/registry_test.go b/network/p2p/scoring/registry_test.go index de130ce5f24..b7b1b815e44 100644 --- a/network/p2p/scoring/registry_test.go +++ b/network/p2p/scoring/registry_test.go @@ -868,18 +868,20 @@ func TestScoringRegistrySilencePeriod(t *testing.T) { cfg, err := config.DefaultConfig() require.NoError(t, err) - // we set the scoring registry silence duration 10 seconds - // the peer is not expected to be penalized for the first 5 seconds of the test - // after that an invalid control message notification is processed and the peer - // should be penalized - cfg.NetworkConfig.GossipSub.ScoringParameters.ScoringRegistryStartupSilenceDuration = silenceDuration - + // refresh cached app-specific score every 100 milliseconds to speed up the test. + cfg.NetworkConfig.GossipSub.ScoringParameters.AppSpecificScore.ScoreTTL = 100 * time.Millisecond reg, spamRecords, _ := newGossipSubAppSpecificScoreRegistry(t, cfg.NetworkConfig.GossipSub.ScoringParameters, - withStakedIdentities(peerID), - withValidSubscriptions(peerID), func(config *scoring.GossipSubAppSpecificScoreRegistryConfig) { + withUnknownIdentity(peerID), + withInvalidSubscriptions(peerID), + func(cfg *scoring.GossipSubAppSpecificScoreRegistryConfig) { + // we set the scoring registry silence duration 10 seconds + // the peer is not expected to be penalized for the first 5 seconds of the test + // after that an invalid control message notification is processed and the peer + // should be penalized + cfg.ScoringRegistryStartupSilenceDuration = silenceDuration // hooked logger will capture the number of logs related to ignored notifications - config.Logger = logger + cfg.Logger = logger }) ctx, cancel := context.WithCancel(context.Background()) @@ -909,12 +911,13 @@ func TestScoringRegistrySilencePeriod(t *testing.T) { // initially, the app specific score should be the default invalid subscription penalty. require.Equal(t, float64(0), reg.AppSpecificScoreFunc()(peerID)) } - // sleep for one second to ensure silence period is over - time.Sleep(time.Second) - // we expect to have logged a debug message for all notifications ignored. - require.Equal(t, int32(expectedNumOfSilencedNotif), silencedNotificationLogs.Load()) - // after silence period the invalid subscription penalty should be applied to the app specific score - require.Equal(t, scoring.DefaultInvalidSubscriptionPenalty, reg.AppSpecificScoreFunc()(peerID)) + + require.Eventually(t, func() bool { + // we expect to have logged a debug message for all notifications ignored. + require.Equal(t, int32(expectedNumOfSilencedNotif), silencedNotificationLogs.Load()) + // after silence period the invalid subscription penalty should be applied to the app specific score + return scoring.DefaultInvalidSubscriptionPenalty == reg.AppSpecificScoreFunc()(peerID) + }, 2*time.Second, 200*time.Millisecond) // after silence period the peer has spam record as well as an unknown identity. Hence, the app specific score should be the spam penalty // and the staking penalty. @@ -929,8 +932,13 @@ func TestScoringRegistrySilencePeriod(t *testing.T) { expectedPenalty := penaltyValueFixtures().Graft assert.Less(t, math.Abs(expectedPenalty-record.Penalty), 10e-3) assert.Equal(t, scoring.InitAppScoreRecordState().Decay, record.Decay) // decay should be initialized to the initial state. - score := reg.AppSpecificScoreFunc()(peerID) - assert.Less(t, math.Abs(expectedPenalty+scoring.DefaultInvalidSubscriptionPenalty-score), 10e-3) + + require.Eventually(t, func() bool { + // we expect to have logged a debug message for all notifications ignored. + require.Equal(t, int32(expectedNumOfSilencedNotif), silencedNotificationLogs.Load()) + // after silence period the invalid subscription penalty should be applied to the app specific score + return scoring.DefaultInvalidSubscriptionPenalty+expectedPenalty-reg.AppSpecificScoreFunc()(peerID) < 0.1 + }, 2*time.Second, 200*time.Millisecond) } // withStakedIdentities returns a function that sets the identity provider to return staked identities for the given peer ids. From 042274f451c5949d1599fa1f71cc02a869ae0a05 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Wed, 17 Jan 2024 14:53:33 -0500 Subject: [PATCH 14/16] update flags --- config/default-config.yml | 14 +++++++------- network/netconf/flags.go | 6 +++--- network/p2p/config/gossipsub.go | 16 +++------------- network/p2p/config/score_registry.go | 16 +++++++++++++--- 4 files changed, 26 insertions(+), 26 deletions(-) diff --git a/config/default-config.yml b/config/default-config.yml index 9a4a135d658..620b3c8361c 100644 --- a/config/default-config.yml +++ b/config/default-config.yml @@ -199,13 +199,6 @@ network-config: # Peer scoring is the default value for enabling peer scoring peer-scoring-enabled: true scoring-parameters: - # Defines the duration of time, after the node startup, - # during which the scoring registry remains inactive before penalizing nodes. - # Throughout this startup silence period, the application-specific penalty - # returned for all nodes will be 0, and any invalid control message notifications - # will be ignored. This configuration allows nodes to stabilize and initialize before - # applying penalties or processing invalid control message notifications. - scoring-registry-startup-silence-duration: 1h peer-scoring: internal: # The weight for app-specific scores. @@ -450,6 +443,13 @@ network-config: # peers for their contribution to the network and prioritize them in neighbor selection. staked-identity-reward: 100 scoring-registry: + # Defines the duration of time, after the node startup, + # during which the scoring registry remains inactive before penalizing nodes. + # Throughout this startup silence period, the application-specific penalty + # returned for all nodes will be 0, and any invalid control message notifications + # will be ignored. This configuration allows nodes to stabilize and initialize before + # applying penalties or processing invalid control message notifications. + startup-silence-duration: 1h app-specific-score: # number of workers that asynchronously update the app specific score requests when they are expired. score-update-worker-num: 5 diff --git a/network/netconf/flags.go b/network/netconf/flags.go index ebed17cb1ff..7785866901d 100644 --- a/network/netconf/flags.go +++ b/network/netconf/flags.go @@ -104,7 +104,6 @@ func AllFlagNames() []string { BuildFlagName(gossipsubKey, p2pconfig.SubscriptionProviderKey, p2pconfig.UpdateIntervalKey), BuildFlagName(gossipsubKey, p2pconfig.SubscriptionProviderKey, p2pconfig.CacheSizeKey), - BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.ScoringRegistryStartupSilenceDurationKey), BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.PeerScoringKey, p2pconfig.InternalKey, p2pconfig.AppSpecificScoreWeightKey), BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.PeerScoringKey, p2pconfig.InternalKey, p2pconfig.DecayIntervalKey), BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.PeerScoringKey, p2pconfig.InternalKey, p2pconfig.DecayToZeroKey), @@ -139,6 +138,7 @@ func AllFlagNames() []string { BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.PeerScoringKey, p2pconfig.ProtocolKey, p2pconfig.AppSpecificKey, p2pconfig.MaxAppSpecificKey, p2pconfig.RewardKey), BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.PeerScoringKey, p2pconfig.ProtocolKey, p2pconfig.AppSpecificKey, p2pconfig.StakedIdentityKey, p2pconfig.RewardKey), + BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.ScoringRegistryKey, p2pconfig.StartupSilenceDurationKey), BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.ScoringRegistryKey, p2pconfig.AppSpecificScoreRegistryKey, p2pconfig.ScoreUpdateWorkerNumKey), BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.ScoringRegistryKey, p2pconfig.AppSpecificScoreRegistryKey, p2pconfig.ScoreUpdateRequestQueueSizeKey), BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.ScoringRegistryKey, p2pconfig.AppSpecificScoreRegistryKey, p2pconfig.ScoreTTLKey), @@ -312,8 +312,8 @@ func InitializeNetworkFlags(flags *pflag.FlagSet, config *Config) { config.GossipSub.ScoringParameters.PeerScoring.Internal.DecayToZero, "the maximum value below which a peer scoring counter is reset to zero") - flags.Duration(BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.ScoringRegistryStartupSilenceDurationKey), - config.GossipSub.ScoringParameters.ScoringRegistryStartupSilenceDuration, + flags.Duration(BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.StartupSilenceDurationKey), + config.GossipSub.ScoringParameters.ScoringRegistryParameters.StartupSilenceDuration, "the duration of time, after the node startup, during which the scoring registry remains inactive before penalizing nodes.") flags.Bool(BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.PeerScoringKey, p2pconfig.InternalKey, p2pconfig.TopicKey, p2pconfig.SkipAtomicValidationKey), diff --git a/network/p2p/config/gossipsub.go b/network/p2p/config/gossipsub.go index 97d91ad962f..31b69dd221b 100644 --- a/network/p2p/config/gossipsub.go +++ b/network/p2p/config/gossipsub.go @@ -79,24 +79,14 @@ type GossipSubParameters struct { } const ( - DecayIntervalKey = "decay-interval" - ScoringRegistryStartupSilenceDurationKey = "scoring-registry-startup-silence-duration" + DecayIntervalKey = "decay-interval" ) // ScoringParameters are the parameters for the score option. // Parameters are "numerical values" that are used to compute or build components that compute the score of a peer in GossipSub system. type ScoringParameters struct { - // ScoringRegistryStartupSilenceDuration defines the duration of time, after the node startup, - // during which the scoring registry remains inactive before penalizing nodes. - // Throughout this startup silence period, the application-specific penalty - // for all nodes will be set to 0, and any invalid control message notifications - // will be ignored. - // - // This configuration allows nodes to stabilize and initialize before - // applying penalties or responding processing invalid control message notifications. - ScoringRegistryStartupSilenceDuration time.Duration `validate:"gt=10m" mapstructure:"scoring-registry-startup-silence-duration"` - PeerScoring PeerScoringParameters `validate:"required" mapstructure:"peer-scoring"` - ScoringRegistryParameters ScoringRegistryParameters `validate:"required" mapstructure:"scoring-registry"` + PeerScoring PeerScoringParameters `validate:"required" mapstructure:"peer-scoring"` + ScoringRegistryParameters ScoringRegistryParameters `validate:"required" mapstructure:"scoring-registry"` } // SubscriptionProviderParameters keys. diff --git a/network/p2p/config/score_registry.go b/network/p2p/config/score_registry.go index 771edb685e8..3788451325a 100644 --- a/network/p2p/config/score_registry.go +++ b/network/p2p/config/score_registry.go @@ -6,12 +6,22 @@ const ( SpamRecordCacheKey = "spam-record-cache" ScoringRegistryKey = "scoring-registry" AppSpecificScoreRegistryKey = "app-specific-score" + StartupSilenceDurationKey = "startup-silence-duration" ) type ScoringRegistryParameters struct { - AppSpecificScore AppSpecificScoreParameters `validate:"required" mapstructure:"app-specific-score"` - SpamRecordCache SpamRecordCacheParameters `validate:"required" mapstructure:"spam-record-cache"` - MisbehaviourPenalties MisbehaviourPenalties `validate:"required" mapstructure:"misbehaviour-penalties"` + // StartupSilenceDuration defines the duration of time, after the node startup, + // during which the scoring registry remains inactive before penalizing nodes. + // Throughout this startup silence period, the application-specific penalty + // for all nodes will be set to 0, and any invalid control message notifications + // will be ignored. + // + // This configuration allows nodes to stabilize and initialize before + // applying penalties or responding processing invalid control message notifications. + StartupSilenceDuration time.Duration `validate:"gt=10m" mapstructure:"startup-silence-duration"` + AppSpecificScore AppSpecificScoreParameters `validate:"required" mapstructure:"app-specific-score"` + SpamRecordCache SpamRecordCacheParameters `validate:"required" mapstructure:"spam-record-cache"` + MisbehaviourPenalties MisbehaviourPenalties `validate:"required" mapstructure:"misbehaviour-penalties"` } const ( From 52db024c2800563e2dde9a446146747497f502db Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Wed, 17 Jan 2024 16:53:34 -0500 Subject: [PATCH 15/16] fix lint --- network/netconf/flags.go | 2 -- network/p2p/scoring/registry_test.go | 3 +-- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/network/netconf/flags.go b/network/netconf/flags.go index 7785866901d..a726a43d0d6 100644 --- a/network/netconf/flags.go +++ b/network/netconf/flags.go @@ -43,8 +43,6 @@ const ( alspSyncEngineBatchRequestBaseProb = "alsp-sync-engine-batch-request-base-prob" alspSyncEngineRangeRequestBaseProb = "alsp-sync-engine-range-request-base-prob" alspSyncEngineSyncRequestProb = "alsp-sync-engine-sync-request-prob" - - scoringRegistryStartupSilenceDuration = "gossipsub-scoring-registry-startup-silence-duration" ) func AllFlagNames() []string { diff --git a/network/p2p/scoring/registry_test.go b/network/p2p/scoring/registry_test.go index d52c775e394..ce5a522c17c 100644 --- a/network/p2p/scoring/registry_test.go +++ b/network/p2p/scoring/registry_test.go @@ -3,7 +3,6 @@ package scoring_test import ( "context" "fmt" - "github.com/onflow/flow-go/network" "math" "os" "sync" @@ -18,11 +17,11 @@ import ( "go.uber.org/atomic" "github.com/onflow/flow-go/config" - "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/module/mock" + "github.com/onflow/flow-go/network" "github.com/onflow/flow-go/network/p2p" netcache "github.com/onflow/flow-go/network/p2p/cache" p2pconfig "github.com/onflow/flow-go/network/p2p/config" From bb1d6a3ea4550e70d0109013b7f764832679577a Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Wed, 17 Jan 2024 17:00:31 -0500 Subject: [PATCH 16/16] Update flags.go --- network/netconf/flags.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/network/netconf/flags.go b/network/netconf/flags.go index a726a43d0d6..7c8f8ca5d90 100644 --- a/network/netconf/flags.go +++ b/network/netconf/flags.go @@ -310,10 +310,6 @@ func InitializeNetworkFlags(flags *pflag.FlagSet, config *Config) { config.GossipSub.ScoringParameters.PeerScoring.Internal.DecayToZero, "the maximum value below which a peer scoring counter is reset to zero") - flags.Duration(BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.StartupSilenceDurationKey), - config.GossipSub.ScoringParameters.ScoringRegistryParameters.StartupSilenceDuration, - "the duration of time, after the node startup, during which the scoring registry remains inactive before penalizing nodes.") - flags.Bool(BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.PeerScoringKey, p2pconfig.InternalKey, p2pconfig.TopicKey, p2pconfig.SkipAtomicValidationKey), config.GossipSub.ScoringParameters.PeerScoring.Internal.TopicParameters.SkipAtomicValidation, "the default value for the skip atomic validation flag for topics") @@ -396,6 +392,9 @@ func InitializeNetworkFlags(flags *pflag.FlagSet, config *Config) { config.GossipSub.ScoringParameters.PeerScoring.Protocol.AppSpecificScore.StakedIdentityReward, "the reward for staking peers") + flags.Duration(BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.ScoringRegistryKey, p2pconfig.StartupSilenceDurationKey), + config.GossipSub.ScoringParameters.ScoringRegistryParameters.StartupSilenceDuration, + "the duration of time, after the node startup, during which the scoring registry remains inactive before penalizing nodes.") flags.Int(BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.ScoringRegistryKey, p2pconfig.AppSpecificScoreRegistryKey, p2pconfig.ScoreUpdateWorkerNumKey), config.GossipSub.ScoringParameters.ScoringRegistryParameters.AppSpecificScore.ScoreUpdateWorkerNum, "number of workers for the app specific score update worker pool")