From 4e3746b88990ffdade59b152f5287b2a0e589f29 Mon Sep 17 00:00:00 2001 From: Dennis Trautwein Date: Fri, 10 Jan 2025 21:32:11 +0100 Subject: [PATCH 1/2] feat: add request callback config option --- dht.go | 3 +++ dht_net.go | 2 ++ dht_options.go | 12 ++++++++++++ internal/config/config.go | 5 +++++ 4 files changed, 22 insertions(+) diff --git a/dht.go b/dht.go index 0a6f2ecb1..3aa962b9f 100644 --- a/dht.go +++ b/dht.go @@ -163,6 +163,8 @@ type IpfsDHT struct { // addrFilter is used to filter the addresses we put into the peer store. // Mostly used to filter out localhost and local addresses. addrFilter func([]ma.Multiaddr) []ma.Multiaddr + + onRequestHook func(ctx context.Context, s network.Stream, req pb.Message) } // Assert that IPFS assumptions about interfaces aren't broken. These aren't a @@ -306,6 +308,7 @@ func makeDHT(h host.Host, cfg dhtcfg.Config) (*IpfsDHT, error) { routingTablePeerFilter: cfg.RoutingTable.PeerFilter, rtPeerDiversityFilter: cfg.RoutingTable.DiversityFilter, addrFilter: cfg.AddressFilter, + onRequestHook: cfg.OnRequestHook, fixLowPeersChan: make(chan struct{}, 1), diff --git a/dht_net.go b/dht_net.go index 3e135df11..c6c7d98e0 100644 --- a/dht_net.go +++ b/dht_net.go @@ -100,6 +100,8 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool { metrics.ReceivedBytes.M(int64(msgLen)), ) + dht.onRequestHook(ctx, s, req) + handler := dht.handlerForMsgType(req.GetType()) if handler == nil { stats.Record(ctx, metrics.ReceivedMessageErrors.M(1)) diff --git a/dht_options.go b/dht_options.go index 1c0b3d13b..aa32e0e66 100644 --- a/dht_options.go +++ b/dht_options.go @@ -1,6 +1,7 @@ package dht import ( + "context" "fmt" "testing" "time" @@ -12,6 +13,7 @@ import ( "github.com/libp2p/go-libp2p-kbucket/peerdiversity" record "github.com/libp2p/go-libp2p-record" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" @@ -368,3 +370,13 @@ func WithCustomMessageSender(messageSenderBuilder func(h host.Host, protos []pro return nil } } + +// OnRequestHook registers a callback function that will be invoked +// for every incoming DHT protocol message. Note: Ensure that the +// callback executes efficiently, as it can block the entire message handler. +func OnRequestHook(f func(ctx context.Context, s network.Stream, req pb.Message)) Option { + return func(c *dhtcfg.Config) error { + c.OnRequestHook = f + return nil + } +} diff --git a/internal/config/config.go b/internal/config/config.go index 3248b8171..0900b47e3 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1,6 +1,7 @@ package config import ( + "context" "fmt" "time" @@ -14,6 +15,7 @@ import ( "github.com/libp2p/go-libp2p-kbucket/peerdiversity" record "github.com/libp2p/go-libp2p-record" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" ma "github.com/multiformats/go-multiaddr" @@ -63,6 +65,7 @@ type Config struct { BootstrapPeers func() []peer.AddrInfo AddressFilter func([]ma.Multiaddr) []ma.Multiaddr + OnRequestHook func(ctx context.Context, s network.Stream, req pb.Message) // test specific Config options DisableFixLowPeers bool @@ -134,6 +137,8 @@ var Defaults = func(o *Config) error { // MAGIC: It makes sense to set it to a multiple of OptProvReturnRatio * BucketSize. We chose a multiple of 4. o.OptimisticProvideJobsPoolSize = 60 + o.OnRequestHook = func(ctx context.Context, s network.Stream, req pb.Message) {} + return nil } From 70dbacb780d2e3445134499a10f7240f174eb89e Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Wed, 15 Jan 2025 16:11:23 +0100 Subject: [PATCH 2/2] optimisation --- dht_net.go | 4 +++- dht_options.go | 7 ++++--- internal/config/config.go | 2 -- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/dht_net.go b/dht_net.go index c6c7d98e0..3fd113fcf 100644 --- a/dht_net.go +++ b/dht_net.go @@ -100,7 +100,9 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool { metrics.ReceivedBytes.M(int64(msgLen)), ) - dht.onRequestHook(ctx, s, req) + if dht.onRequestHook != nil { + dht.onRequestHook(ctx, s, req) + } handler := dht.handlerForMsgType(req.GetType()) if handler == nil { diff --git a/dht_options.go b/dht_options.go index aa32e0e66..5939d2653 100644 --- a/dht_options.go +++ b/dht_options.go @@ -371,9 +371,10 @@ func WithCustomMessageSender(messageSenderBuilder func(h host.Host, protos []pro } } -// OnRequestHook registers a callback function that will be invoked -// for every incoming DHT protocol message. Note: Ensure that the -// callback executes efficiently, as it can block the entire message handler. +// OnRequestHook registers a callback function that will be invoked for every +// incoming DHT protocol message. +// Note: Ensure that the callback executes efficiently, as it will block the +// entire message handler. func OnRequestHook(f func(ctx context.Context, s network.Stream, req pb.Message)) Option { return func(c *dhtcfg.Config) error { c.OnRequestHook = f diff --git a/internal/config/config.go b/internal/config/config.go index 0900b47e3..919c2d0dd 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -137,8 +137,6 @@ var Defaults = func(o *Config) error { // MAGIC: It makes sense to set it to a multiple of OptProvReturnRatio * BucketSize. We chose a multiple of 4. o.OptimisticProvideJobsPoolSize = 60 - o.OnRequestHook = func(ctx context.Context, s network.Stream, req pb.Message) {} - return nil }