Skip to content

Commit

Permalink
Merge pull request onflow#4574 from onflow/khalil/6472-iwant-flooding…
Browse files Browse the repository at this point in the history
…-detection

[Networking] GossipSub iWant Flooding Mitigation
  • Loading branch information
gomisha authored Aug 29, 2023
2 parents a8f28b9 + a4fcdb5 commit 5f9c8a0
Show file tree
Hide file tree
Showing 29 changed files with 802 additions and 178 deletions.
3 changes: 2 additions & 1 deletion cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1361,7 +1361,8 @@ func (builder *FlowAccessNodeBuilder) initPublicLibp2pNode(networkKey crypto.Pri
&p2p.DisallowListCacheConfig{
MaxSize: builder.FlowConfig.NetworkConfig.DisallowListNotificationCacheSize,
Metrics: metrics.DisallowListCacheMetricsFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
}).
},
meshTracer).
SetBasicResolver(builder.Resolver).
SetSubscriptionFilter(
subscription.NewRoleBasedFilter(
Expand Down
3 changes: 2 additions & 1 deletion cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,8 @@ func (builder *ObserverServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr
&p2p.DisallowListCacheConfig{
MaxSize: builder.FlowConfig.NetworkConfig.DisallowListNotificationCacheSize,
Metrics: metrics.DisallowListCacheMetricsFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
}).
},
meshTracer).
SetSubscriptionFilter(
subscription.NewRoleBasedFilter(
subscription.UnstakedRole, builder.IdentityProvider,
Expand Down
7 changes: 7 additions & 0 deletions config/default-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ network-config:
ihave-async-inspection-sample-size-percentage: .10
# Max number of ihave messages in a sample to be inspected
ihave-max-sample-size: 100

# Max number of iwant messages in a sample to be inspected
gossipsub-rpc-iwant-max-sample-size: 1_000_000
# The allowed threshold of iWant messages received without a corresponding tracked iHave message that was sent
gossipsub-rpc-iwant-cache-miss-threshold: .5
# The max allowed duplicate message IDs in a single iWant control message
gossipsub-rpc-iwant-duplicate-message-id-threshold: .15
# RPC metrics observer inspector configs
# The number of metrics inspector pool workers
gossipsub-rpc-metrics-inspector-workers: 1
Expand Down
3 changes: 2 additions & 1 deletion follower/follower_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,8 @@ func (builder *FollowerServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr
&p2p.DisallowListCacheConfig{
MaxSize: builder.FlowConfig.NetworkConfig.DisallowListNotificationCacheSize,
Metrics: metrics.DisallowListCacheMetricsFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
}).
},
meshTracer).
SetSubscriptionFilter(
subscription.NewRoleBasedFilter(
subscription.UnstakedRole, builder.IdentityProvider,
Expand Down
17 changes: 13 additions & 4 deletions insecure/corruptlibp2p/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,21 @@ func WithIHave(msgCount, msgSize int, topicId string) GossipSubCtrlOption {
}

// WithIWant adds iWant control messages of the given size and number to the control message.
func WithIWant(msgCount, msgSize int) GossipSubCtrlOption {
// The message IDs are generated randomly.
// Args:
//
// msgCount: number of iWant messages to add.
// msgIdsPerIWant: number of message IDs to add to each iWant message.
//
// Returns:
// A GossipSubCtrlOption that adds iWant messages to the control message.
// Example: WithIWant(2, 3) will add 2 iWant messages, each with 3 message IDs.
func WithIWant(iWantCount int, msgIdsPerIWant int) GossipSubCtrlOption {
return func(msg *pubsubpb.ControlMessage) {
iWants := make([]*pubsubpb.ControlIWant, msgCount)
for i := 0; i < msgCount; i++ {
iWants := make([]*pubsubpb.ControlIWant, iWantCount)
for i := 0; i < iWantCount; i++ {
iWants[i] = &pubsubpb.ControlIWant{
MessageIDs: GossipSubMessageIdsFixture(msgSize),
MessageIDs: GossipSubMessageIdsFixture(msgIdsPerIWant),
}
}
msg.Iwant = iWants
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestMetricsInspector_ObserveRPC(t *testing.T) {
nodes := []p2p.LibP2PNode{victimNode, spammer.SpammerNode}
startNodesAndEnsureConnected(t, signalerCtx, nodes, sporkID)
spammer.Start(t)
defer stopNodesAndInspector(t, cancel, nodes, metricsInspector)
defer stopTestComponents(t, cancel, nodes, metricsInspector)
// prepare to spam - generate control messages
ctlMsgs := spammer.GenerateCtlMessages(controlMessageCount,
corruptlibp2p.WithGraft(messageCount, channels.PushBlocks.String()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,24 @@ package rpc_inspector

import (
"context"
"fmt"
"math/rand"
"testing"
"time"

mockery "github.com/stretchr/testify/mock"

"github.com/onflow/flow-go/config"
"github.com/onflow/flow-go/insecure/corruptlibp2p"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/network/channels"
"github.com/onflow/flow-go/network/p2p"
mockp2p "github.com/onflow/flow-go/network/p2p/mock"
p2ptest "github.com/onflow/flow-go/network/p2p/test"
"github.com/onflow/flow-go/network/p2p/tracer"
"github.com/onflow/flow-go/utils/unittest"
)

Expand All @@ -26,7 +36,54 @@ func startNodesAndEnsureConnected(t *testing.T, ctx irrecoverable.SignalerContex
})
}

func stopNodesAndInspector(t *testing.T, cancel context.CancelFunc, nodes []p2p.LibP2PNode, inspector p2p.GossipSubRPCInspector) {
func stopTestComponents(t *testing.T, cancel context.CancelFunc, nodes []p2p.LibP2PNode, components ...module.ReadyDoneAware) {
p2ptest.StopNodes(t, nodes, cancel)
unittest.RequireComponentsDoneBefore(t, time.Second, inspector)
unittest.RequireComponentsDoneBefore(t, time.Second, components...)
}

func randomClusterPrefixedTopic() channels.Topic {
return channels.Topic(channels.SyncCluster(flow.ChainID(fmt.Sprintf("%d", rand.Uint64()))))
}

type onNotificationDissemination func(spammer *corruptlibp2p.GossipSubRouterSpammer) func(args mockery.Arguments)
type mockDistributorOption func(*mockp2p.GossipSubInspectorNotificationDistributor, *corruptlibp2p.GossipSubRouterSpammer)

func withExpectedNotificationDissemination(expectedNumOfTotalNotif int, f onNotificationDissemination) mockDistributorOption {
return func(distributor *mockp2p.GossipSubInspectorNotificationDistributor, spammer *corruptlibp2p.GossipSubRouterSpammer) {
distributor.
On("Distribute", mockery.Anything).
Times(expectedNumOfTotalNotif).
Run(f(spammer)).
Return(nil)
}
}

// mockDistributorReadyDoneAware mocks the Ready and Done methods of the distributor to return a channel that is already closed,
// so that the distributor is considered ready and done when the test needs.
func mockDistributorReadyDoneAware(d *mockp2p.GossipSubInspectorNotificationDistributor) {
d.On("Start", mockery.Anything).Return().Maybe()
d.On("Ready").Return(func() <-chan struct{} {
ch := make(chan struct{})
close(ch)
return ch
}()).Maybe()
d.On("Done").Return(func() <-chan struct{} {
ch := make(chan struct{})
close(ch)
return ch
}()).Maybe()
}

func meshTracerFixture(flowConfig *config.FlowConfig, idProvider module.IdentityProvider) *tracer.GossipSubMeshTracer {
meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
Logger: unittest.Logger(),
Metrics: metrics.NewNoopCollector(),
IDProvider: idProvider,
LoggerInterval: time.Second,
HeroCacheMetricsFactory: metrics.NewNoopHeroCacheMetricsFactory(),
RpcSentTrackerCacheSize: flowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
RpcSentTrackerWorkerQueueCacheSize: flowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize,
RpcSentTrackerNumOfWorkers: flowConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers,
}
return tracer.NewGossipSubMeshTracer(meshTracerCfg)
}
Loading

0 comments on commit 5f9c8a0

Please sign in to comment.