diff --git a/config/default-config.yml b/config/default-config.yml index 04e681ebe1c..620b3c8361c 100644 --- a/config/default-config.yml +++ b/config/default-config.yml @@ -443,6 +443,13 @@ network-config: # peers for their contribution to the network and prioritize them in neighbor selection. staked-identity-reward: 100 scoring-registry: + # Defines the duration of time, after the node startup, + # during which the scoring registry remains inactive before penalizing nodes. + # Throughout this startup silence period, the application-specific penalty + # returned for all nodes will be 0, and any invalid control message notifications + # will be ignored. This configuration allows nodes to stabilize and initialize before + # applying penalties or processing invalid control message notifications. + startup-silence-duration: 1h app-specific-score: # number of workers that asynchronously update the app specific score requests when they are expired. score-update-worker-num: 5 diff --git a/network/netconf/flags.go b/network/netconf/flags.go index b0506f6d99f..7c8f8ca5d90 100644 --- a/network/netconf/flags.go +++ b/network/netconf/flags.go @@ -136,6 +136,7 @@ func AllFlagNames() []string { BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.PeerScoringKey, p2pconfig.ProtocolKey, p2pconfig.AppSpecificKey, p2pconfig.MaxAppSpecificKey, p2pconfig.RewardKey), BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.PeerScoringKey, p2pconfig.ProtocolKey, p2pconfig.AppSpecificKey, p2pconfig.StakedIdentityKey, p2pconfig.RewardKey), + BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.ScoringRegistryKey, p2pconfig.StartupSilenceDurationKey), BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.ScoringRegistryKey, p2pconfig.AppSpecificScoreRegistryKey, p2pconfig.ScoreUpdateWorkerNumKey), BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.ScoringRegistryKey, p2pconfig.AppSpecificScoreRegistryKey, p2pconfig.ScoreUpdateRequestQueueSizeKey), BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.ScoringRegistryKey, p2pconfig.AppSpecificScoreRegistryKey, p2pconfig.ScoreTTLKey), @@ -391,6 +392,9 @@ func InitializeNetworkFlags(flags *pflag.FlagSet, config *Config) { config.GossipSub.ScoringParameters.PeerScoring.Protocol.AppSpecificScore.StakedIdentityReward, "the reward for staking peers") + flags.Duration(BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.ScoringRegistryKey, p2pconfig.StartupSilenceDurationKey), + config.GossipSub.ScoringParameters.ScoringRegistryParameters.StartupSilenceDuration, + "the duration of time, after the node startup, during which the scoring registry remains inactive before penalizing nodes.") flags.Int(BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.ScoringRegistryKey, p2pconfig.AppSpecificScoreRegistryKey, p2pconfig.ScoreUpdateWorkerNumKey), config.GossipSub.ScoringParameters.ScoringRegistryParameters.AppSpecificScore.ScoreUpdateWorkerNum, "number of workers for the app specific score update worker pool") diff --git a/network/p2p/config/score_registry.go b/network/p2p/config/score_registry.go index 771edb685e8..3788451325a 100644 --- a/network/p2p/config/score_registry.go +++ b/network/p2p/config/score_registry.go @@ -6,12 +6,22 @@ const ( SpamRecordCacheKey = "spam-record-cache" ScoringRegistryKey = "scoring-registry" AppSpecificScoreRegistryKey = "app-specific-score" + StartupSilenceDurationKey = "startup-silence-duration" ) type ScoringRegistryParameters struct { - AppSpecificScore AppSpecificScoreParameters `validate:"required" mapstructure:"app-specific-score"` - SpamRecordCache SpamRecordCacheParameters `validate:"required" mapstructure:"spam-record-cache"` - MisbehaviourPenalties MisbehaviourPenalties `validate:"required" mapstructure:"misbehaviour-penalties"` + // StartupSilenceDuration defines the duration of time, after the node startup, + // during which the scoring registry remains inactive before penalizing nodes. + // Throughout this startup silence period, the application-specific penalty + // for all nodes will be set to 0, and any invalid control message notifications + // will be ignored. + // + // This configuration allows nodes to stabilize and initialize before + // applying penalties or responding processing invalid control message notifications. + StartupSilenceDuration time.Duration `validate:"gt=10m" mapstructure:"startup-silence-duration"` + AppSpecificScore AppSpecificScoreParameters `validate:"required" mapstructure:"app-specific-score"` + SpamRecordCache SpamRecordCacheParameters `validate:"required" mapstructure:"spam-record-cache"` + MisbehaviourPenalties MisbehaviourPenalties `validate:"required" mapstructure:"misbehaviour-penalties"` } const ( diff --git a/network/p2p/scoring/registry.go b/network/p2p/scoring/registry.go index 653cf2fe48c..4b56de3754e 100644 --- a/network/p2p/scoring/registry.go +++ b/network/p2p/scoring/registry.go @@ -8,6 +8,7 @@ import ( "github.com/go-playground/validator/v10" "github.com/libp2p/go-libp2p/core/peer" "github.com/rs/zerolog" + "go.uber.org/atomic" "github.com/onflow/flow-go/engine/common/worker" "github.com/onflow/flow-go/model/flow" @@ -25,6 +26,11 @@ import ( "github.com/onflow/flow-go/utils/logging" ) +const ( + // NotificationSilencedMsg log messages for silenced notifications + NotificationSilencedMsg = "ignoring invalid control message notification for peer during silence period" +) + type SpamRecordInitFunc func() p2p.GossipSubSpamRecord // GossipSubAppSpecificScoreRegistry is the registry for the application specific score of peers in the GossipSub protocol. @@ -58,6 +64,13 @@ type GossipSubAppSpecificScoreRegistry struct { // appScoreUpdateWorkerPool is the worker pool for handling the application specific score update of peers in a non-blocking way. appScoreUpdateWorkerPool *worker.Pool[peer.ID] + // silencePeriodDuration duration that the startup silence period will last, during which nodes will not be penalized + silencePeriodDuration time.Duration + // silencePeriodStartTime time that the silence period begins, this is the time that the registry is started by the node. + silencePeriodStartTime time.Time + // silencePeriodElapsed atomic bool that stores a bool flag which indicates if the silence period is over or not. + silencePeriodElapsed *atomic.Bool + unknownIdentityPenalty float64 minAppSpecificPenalty float64 stakedIdentityReward float64 @@ -94,6 +107,10 @@ type GossipSubAppSpecificScoreRegistryConfig struct { NetworkingType network.NetworkingType `validate:"required"` + // ScoringRegistryStartupSilenceDuration defines the duration of time, after the node startup, + // during which the scoring registry remains inactive before penalizing nodes. + ScoringRegistryStartupSilenceDuration time.Duration + AppSpecificScoreParams p2pconfig.ApplicationSpecificScoreParameters `validate:"required"` } @@ -125,6 +142,8 @@ func NewGossipSubAppSpecificScoreRegistry(config *GossipSubAppSpecificScoreRegis validator: config.Validator, idProvider: config.IdProvider, scoreTTL: config.Parameters.ScoreTTL, + silencePeriodDuration: config.ScoringRegistryStartupSilenceDuration, + silencePeriodElapsed: atomic.NewBool(false), unknownIdentityPenalty: config.AppSpecificScoreParams.UnknownIdentityPenalty, minAppSpecificPenalty: config.AppSpecificScoreParams.MinAppSpecificPenalty, stakedIdentityReward: config.AppSpecificScoreParams.StakedIdentityReward, @@ -147,11 +166,16 @@ func NewGossipSubAppSpecificScoreRegistry(config *GossipSubAppSpecificScoreRegis ready() reg.logger.Info().Msg("subscription validator is ready") } - <-ctx.Done() reg.logger.Info().Msg("stopping subscription validator") <-reg.validator.Done() reg.logger.Info().Msg("subscription validator stopped") + }).AddWorker(func(parent irrecoverable.SignalerContext, ready component.ReadyFunc) { + if !reg.silencePeriodStartTime.IsZero() { + parent.Throw(fmt.Errorf("gossipsub scoring registry started more than once")) + } + reg.silencePeriodStartTime = time.Now() + ready() }) for i := 0; i < config.Parameters.ScoreUpdateWorkerNum; i++ { @@ -176,6 +200,13 @@ var _ p2p.GossipSubInvCtrlMsgNotifConsumer = (*GossipSubAppSpecificScoreRegistry func (r *GossipSubAppSpecificScoreRegistry) AppSpecificScoreFunc() func(peer.ID) float64 { return func(pid peer.ID) float64 { lg := r.logger.With().Str("remote_peer_id", p2plogging.PeerId(pid)).Logger() + + // during startup silence period avoid penalizing nodes + if !r.afterSilencePeriod() { + lg.Trace().Msg("returning 0 app specific score penalty for node during silence period") + return 0 + } + appSpecificScore, lastUpdated, ok := r.appScoreCache.Get(pid) switch { case !ok: @@ -184,7 +215,6 @@ func (r *GossipSubAppSpecificScoreRegistry) AppSpecificScoreFunc() func(peer.ID) lg.Trace(). Bool("worker_submitted", submitted). Msg("application specific score not found in cache, submitting worker to update it") - return 0 // in the mean time, return 0, which is a neutral score. case time.Since(lastUpdated) > r.scoreTTL: // record found in the cache, but expired; submit a worker to update it. @@ -194,14 +224,12 @@ func (r *GossipSubAppSpecificScoreRegistry) AppSpecificScoreFunc() func(peer.ID) Float64("app_specific_score", appSpecificScore). Dur("score_ttl", r.scoreTTL). Msg("application specific score expired, submitting worker to update it") - return appSpecificScore // in the mean time, return the expired score. default: // record found in the cache. r.logger.Trace(). Float64("app_specific_score", appSpecificScore). Msg("application specific score found in cache") - return appSpecificScore } } @@ -346,6 +374,12 @@ func (r *GossipSubAppSpecificScoreRegistry) OnInvalidControlMessageNotification( Str("peer_id", p2plogging.PeerId(notification.PeerID)). Str("misbehavior_type", notification.MsgType.String()).Logger() + // during startup silence period avoid penalizing nodes, ignore all notifications + if !r.afterSilencePeriod() { + lg.Trace().Msg("ignoring invalid control message notification for peer during silence period") + return + } + record, err := r.spamScoreCache.Adjust(notification.PeerID, func(record p2p.GossipSubSpamRecord) p2p.GossipSubSpamRecord { penalty := 0.0 switch notification.MsgType { @@ -383,6 +417,18 @@ func (r *GossipSubAppSpecificScoreRegistry) OnInvalidControlMessageNotification( Msg("applied misbehaviour penalty and updated application specific penalty") } +// afterSilencePeriod returns true if registry silence period is over, false otherwise. +func (r *GossipSubAppSpecificScoreRegistry) afterSilencePeriod() bool { + if !r.silencePeriodElapsed.Load() { + if time.Since(r.silencePeriodStartTime) > r.silencePeriodDuration { + r.silencePeriodElapsed.Store(true) + return true + } + return false + } + return true +} + // DefaultDecayFunction is the default decay function that is used to decay the application specific penalty of a peer. // It is used if no decay function is provided in the configuration. // It decays the application specific penalty of a peer if it is negative. diff --git a/network/p2p/scoring/registry_test.go b/network/p2p/scoring/registry_test.go index 18650658eb2..ce5a522c17c 100644 --- a/network/p2p/scoring/registry_test.go +++ b/network/p2p/scoring/registry_test.go @@ -4,14 +4,17 @@ import ( "context" "fmt" "math" + "os" "sync" "testing" "time" "github.com/libp2p/go-libp2p/core/peer" + "github.com/rs/zerolog" "github.com/stretchr/testify/assert" testifymock "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "github.com/onflow/flow-go/config" "github.com/onflow/flow-go/model/flow" @@ -46,12 +49,12 @@ func TestScoreRegistry_FreshStart(t *testing.T) { scoring.InitAppScoreRecordStateFunc(cfg.NetworkConfig.GossipSub.ScoringParameters.ScoringRegistryParameters.SpamRecordCache.Decay.MaximumSpamPenaltyDecayFactor), withStakedIdentities(peerID), withValidSubscriptions(peerID)) - - // starts the registry. ctx, cancel := context.WithCancel(context.Background()) signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) reg.Start(signalerCtx) - unittest.RequireCloseBefore(t, reg.Ready(), 1*time.Second, "failed to start GossipSubAppSpecificScoreRegistry") + unittest.RequireCloseBefore(t, reg.Ready(), 1*time.Second, "registry did not start in time") + + defer stopRegistry(t, cancel, reg) // initially, the spamRecords should not have the peer id, and there should be no app-specific score in the cache. require.False(t, spamRecords.Has(peerID)) @@ -133,12 +136,12 @@ func testScoreRegistryPeerWithSpamRecord(t *testing.T, messageType p2pmsg.Contro scoring.InitAppScoreRecordStateFunc(cfg.NetworkConfig.GossipSub.ScoringParameters.ScoringRegistryParameters.SpamRecordCache.Decay.MaximumSpamPenaltyDecayFactor), withStakedIdentities(peerID), withValidSubscriptions(peerID)) - - // starts the registry. ctx, cancel := context.WithCancel(context.Background()) signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) reg.Start(signalerCtx) - unittest.RequireCloseBefore(t, reg.Ready(), 1*time.Second, "failed to start GossipSubAppSpecificScoreRegistry") + unittest.RequireCloseBefore(t, reg.Ready(), 1*time.Second, "registry did not start in time") + + defer stopRegistry(t, cancel, reg) // initially, the spamRecords should not have the peer id; also the app specific score record should not be in the cache. require.False(t, spamRecords.Has(peerID)) @@ -237,12 +240,12 @@ func testScoreRegistrySpamRecordWithUnknownIdentity(t *testing.T, messageType p2 scoring.InitAppScoreRecordStateFunc(cfg.NetworkConfig.GossipSub.ScoringParameters.ScoringRegistryParameters.SpamRecordCache.Decay.MaximumSpamPenaltyDecayFactor), withUnknownIdentity(peerID), withValidSubscriptions(peerID)) - - // starts the registry. ctx, cancel := context.WithCancel(context.Background()) signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) reg.Start(signalerCtx) - unittest.RequireCloseBefore(t, reg.Ready(), 1*time.Second, "failed to start GossipSubAppSpecificScoreRegistry") + unittest.RequireCloseBefore(t, reg.Ready(), 1*time.Second, "registry did not start in time") + + defer stopRegistry(t, cancel, reg) // initially, the spamRecords should not have the peer id; also the app specific score record should not be in the cache. require.False(t, spamRecords.Has(peerID)) @@ -344,12 +347,12 @@ func testScoreRegistrySpamRecordWithSubscriptionPenalty(t *testing.T, messageTyp scoring.InitAppScoreRecordStateFunc(cfg.NetworkConfig.GossipSub.ScoringParameters.ScoringRegistryParameters.SpamRecordCache.Decay.MaximumSpamPenaltyDecayFactor), withStakedIdentities(peerID), withInvalidSubscriptions(peerID)) - - // starts the registry. ctx, cancel := context.WithCancel(context.Background()) signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) reg.Start(signalerCtx) - unittest.RequireCloseBefore(t, reg.Ready(), 1*time.Second, "failed to start GossipSubAppSpecificScoreRegistry") + unittest.RequireCloseBefore(t, reg.Ready(), 1*time.Second, "registry did not start in time") + + defer stopRegistry(t, cancel, reg) // initially, the spamRecords should not have the peer id; also the app specific score record should not be in the cache. require.False(t, spamRecords.Has(peerID)) @@ -417,12 +420,12 @@ func TestScoreRegistry_SpamPenaltyDecaysInCache(t *testing.T) { scoring.InitAppScoreRecordStateFunc(cfg.NetworkConfig.GossipSub.ScoringParameters.ScoringRegistryParameters.SpamRecordCache.Decay.MaximumSpamPenaltyDecayFactor), withStakedIdentities(peerID), withValidSubscriptions(peerID)) - - // starts the registry. ctx, cancel := context.WithCancel(context.Background()) signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) reg.Start(signalerCtx) - unittest.RequireCloseBefore(t, reg.Ready(), 1*time.Second, "failed to start GossipSubAppSpecificScoreRegistry") + unittest.RequireCloseBefore(t, reg.Ready(), 1*time.Second, "registry did not start in time") + + defer stopRegistry(t, cancel, reg) // report a misbehavior for the peer id. reg.OnInvalidControlMessageNotification(&p2p.InvCtrlMsgNotif{ @@ -510,7 +513,9 @@ func TestScoreRegistry_SpamPenaltyDecayToZero(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) reg.Start(signalerCtx) - unittest.RequireCloseBefore(t, reg.Ready(), 1*time.Second, "failed to start GossipSubAppSpecificScoreRegistry") + unittest.RequireCloseBefore(t, reg.Ready(), 1*time.Second, "registry did not start in time") + + defer stopRegistry(t, cancel, reg) scoreOptParameters := cfg.NetworkConfig.GossipSub.ScoringParameters.PeerScoring.Protocol.AppSpecificScore @@ -576,7 +581,9 @@ func TestScoreRegistry_PersistingUnknownIdentityPenalty(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) reg.Start(signalerCtx) - unittest.RequireCloseBefore(t, reg.Ready(), 1*time.Second, "failed to start GossipSubAppSpecificScoreRegistry") + unittest.RequireCloseBefore(t, reg.Ready(), 1*time.Second, "registry did not start in time") + + defer stopRegistry(t, cancel, reg) scoreOptParameters := cfg.NetworkConfig.GossipSub.ScoringParameters.PeerScoring.Protocol.AppSpecificScore @@ -888,6 +895,100 @@ func TestPeerSpamPenaltyClusterPrefixed(t *testing.T) { unittest.RequireCloseBefore(t, reg.Done(), 1*time.Second, "failed to stop GossipSubAppSpecificScoreRegistry") } +// TestScoringRegistrySilencePeriod ensures that the scoring registry does not penalize nodes during the silence period, and +// starts to penalize nodes only after the silence period is over. +func TestScoringRegistrySilencePeriod(t *testing.T) { + peerID := unittest.PeerIdFixture(t) + silenceDuration := 5 * time.Second + silencedNotificationLogs := atomic.NewInt32(0) + hook := zerolog.HookFunc(func(e *zerolog.Event, level zerolog.Level, message string) { + if level == zerolog.TraceLevel { + if message == scoring.NotificationSilencedMsg { + silencedNotificationLogs.Inc() + } + } + }) + logger := zerolog.New(os.Stdout).Level(zerolog.TraceLevel).Hook(hook) + + cfg, err := config.DefaultConfig() + require.NoError(t, err) + // refresh cached app-specific score every 100 milliseconds to speed up the test. + cfg.NetworkConfig.GossipSub.ScoringParameters.ScoringRegistryParameters.AppSpecificScore.ScoreTTL = 100 * time.Millisecond + maximumSpamPenaltyDecayFactor := cfg.NetworkConfig.GossipSub.ScoringParameters.ScoringRegistryParameters.SpamRecordCache.Decay.MaximumSpamPenaltyDecayFactor + reg, spamRecords, _ := newGossipSubAppSpecificScoreRegistry(t, + cfg.NetworkConfig.GossipSub.ScoringParameters, + scoring.InitAppScoreRecordStateFunc(maximumSpamPenaltyDecayFactor), + withUnknownIdentity(peerID), + withInvalidSubscriptions(peerID), + func(cfg *scoring.GossipSubAppSpecificScoreRegistryConfig) { + // we set the scoring registry silence duration 10 seconds + // the peer is not expected to be penalized for the first 5 seconds of the test + // after that an invalid control message notification is processed and the peer + // should be penalized + cfg.ScoringRegistryStartupSilenceDuration = silenceDuration + // hooked logger will capture the number of logs related to ignored notifications + cfg.Logger = logger + }) + + ctx, cancel := context.WithCancel(context.Background()) + signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) + defer stopRegistry(t, cancel, reg) + // capture approximate registry start time + reg.Start(signalerCtx) + unittest.RequireCloseBefore(t, reg.Ready(), 1*time.Second, "registry did not start in time") + + registryStartTime := time.Now() + expectedNumOfSilencedNotif := 0 + // while we are in the silence period all notifications should be ignored and the + // invalid subscription penalty should not be applied to the app specific score + // we ensure we stay within the silence duration by iterating only up until 1 second + // before silence period is over + for time.Since(registryStartTime) < (silenceDuration - time.Second) { + // report a misbehavior for the peer id. + reg.OnInvalidControlMessageNotification(&p2p.InvCtrlMsgNotif{ + PeerID: peerID, + MsgType: p2pmsg.CtrlMsgGraft, + }) + expectedNumOfSilencedNotif++ + // spam records should not be created during the silence period + _, err, ok := spamRecords.Get(peerID) + assert.False(t, ok) + assert.NoError(t, err) + // initially, the app specific score should be the default invalid subscription penalty. + require.Equal(t, float64(0), reg.AppSpecificScoreFunc()(peerID)) + } + + invalidSubscriptionPenalty := cfg.NetworkConfig.GossipSub.ScoringParameters.PeerScoring.Protocol.AppSpecificScore.InvalidSubscriptionPenalty + + require.Eventually(t, func() bool { + // we expect to have logged a debug message for all notifications ignored. + require.Equal(t, int32(expectedNumOfSilencedNotif), silencedNotificationLogs.Load()) + // after silence period the invalid subscription penalty should be applied to the app specific score + return invalidSubscriptionPenalty == reg.AppSpecificScoreFunc()(peerID) + }, 2*time.Second, 200*time.Millisecond) + + // after silence period the peer has spam record as well as an unknown identity. Hence, the app specific score should be the spam penalty + // and the staking penalty. + reg.OnInvalidControlMessageNotification(&p2p.InvCtrlMsgNotif{ + PeerID: peerID, + MsgType: p2pmsg.CtrlMsgGraft, + }) + // the penalty should now be applied and spam records created. + record, err, ok := spamRecords.Get(peerID) + assert.True(t, ok) + assert.NoError(t, err) + expectedPenalty := penaltyValueFixtures().GraftMisbehaviour + assert.Less(t, math.Abs(expectedPenalty-record.Penalty), 10e-3) + assert.Equal(t, scoring.InitAppScoreRecordStateFunc(maximumSpamPenaltyDecayFactor)().Decay, record.Decay) // decay should be initialized to the initial state. + + require.Eventually(t, func() bool { + // we expect to have logged a debug message for all notifications ignored. + require.Equal(t, int32(expectedNumOfSilencedNotif), silencedNotificationLogs.Load()) + // after silence period the invalid subscription penalty should be applied to the app specific score + return invalidSubscriptionPenalty+expectedPenalty-reg.AppSpecificScoreFunc()(peerID) < 0.1 + }, 2*time.Second, 200*time.Millisecond) +} + // withStakedIdentities returns a function that sets the identity provider to return staked identities for the given peer ids. // It is used for testing purposes, and causes the given peer id to benefit from the staked identity reward in GossipSub. func withStakedIdentities(peerIds ...peer.ID) func(cfg *scoring.GossipSubAppSpecificScoreRegistryConfig) { @@ -1002,10 +1103,11 @@ func newGossipSubAppSpecificScoreRegistry(t *testing.T, SpamRecordCacheFactory: func() p2p.GossipSubSpamRecordCache { return cache }, - Parameters: params.ScoringRegistryParameters.AppSpecificScore, - HeroCacheMetricsFactory: metrics.NewNoopHeroCacheMetricsFactory(), - NetworkingType: network.PrivateNetwork, - AppSpecificScoreParams: params.PeerScoring.Protocol.AppSpecificScore, + Parameters: params.ScoringRegistryParameters.AppSpecificScore, + HeroCacheMetricsFactory: metrics.NewNoopHeroCacheMetricsFactory(), + NetworkingType: network.PrivateNetwork, + AppSpecificScoreParams: params.PeerScoring.Protocol.AppSpecificScore, + ScoringRegistryStartupSilenceDuration: 0, // turn off silence period by default } for _, opt := range opts { opt(cfg) @@ -1049,3 +1151,8 @@ func penaltyValueFixture(msgType p2pmsg.ControlMessageType) float64 { return penaltyValues.ClusterPrefixedReductionFactor } } + +func stopRegistry(t *testing.T, cancel context.CancelFunc, registry *scoring.GossipSubAppSpecificScoreRegistry) { + cancel() + unittest.RequireCloseBefore(t, registry.Done(), 5*time.Second, "registry did not stop") +} diff --git a/network/p2p/test/fixtures.go b/network/p2p/test/fixtures.go index 8b8f33067d4..50fb4cb80ca 100644 --- a/network/p2p/test/fixtures.go +++ b/network/p2p/test/fixtures.go @@ -825,6 +825,22 @@ func MockInspectorNotificationDistributorReadyDoneAware(d *mockp2p.GossipSubInsp }()).Maybe() } +// MockScoringRegistrySubscriptionValidatorReadyDoneAware mocks the Ready and Done methods of the subscription validator to return a channel that is already closed, +// so that the distributor is considered ready and done when the test needs. +func MockScoringRegistrySubscriptionValidatorReadyDoneAware(s *mockp2p.SubscriptionValidator) { + s.On("Start", mockery.Anything).Return().Maybe() + s.On("Ready").Return(func() <-chan struct{} { + ch := make(chan struct{}) + close(ch) + return ch + }()).Maybe() + s.On("Done").Return(func() <-chan struct{} { + ch := make(chan struct{}) + close(ch) + return ch + }()).Maybe() +} + // GossipSubRpcFixtures returns a slice of random message IDs for testing. // Args: // - t: *testing.T instance