Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(shwap/bitswap): Shwap optimized Bitswap constructors #3536

Merged
merged 1 commit into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ require (
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-datastore v0.6.0
github.com/ipfs/go-ds-badger4 v0.1.5
github.com/ipfs/go-ipfs-delay v0.0.1
github.com/ipfs/go-ipld-cbor v0.1.0
github.com/ipfs/go-ipld-format v0.6.0
github.com/ipfs/go-log/v2 v2.5.1
Expand Down Expand Up @@ -202,7 +203,6 @@ require (
github.com/ipfs/bbloom v0.0.4 // indirect
github.com/ipfs/go-blockservice v0.5.2 // indirect
github.com/ipfs/go-ipfs-blockstore v1.3.1 // indirect
github.com/ipfs/go-ipfs-delay v0.0.1 // indirect
github.com/ipfs/go-ipfs-ds-help v1.1.1 // indirect
github.com/ipfs/go-ipfs-exchange-interface v0.2.1 // indirect
github.com/ipfs/go-ipfs-pq v0.0.3 // indirect
Expand Down
136 changes: 136 additions & 0 deletions share/shwap/p2p/bitswap/bitswap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package bitswap

import (
"context"
"fmt"
"time"

"github.com/ipfs/boxo/bitswap/client"
"github.com/ipfs/boxo/bitswap/network"
"github.com/ipfs/boxo/bitswap/server"
"github.com/ipfs/boxo/blockstore"
delay "github.com/ipfs/go-ipfs-delay"
routinghelpers "github.com/libp2p/go-libp2p-routing-helpers"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/protocol"
)

// Client constants
const (
// simulateDontHaves emulates DONT_HAVE message from a peer after 5 second timeout.
// This protects us from unresponsive/slow peers.
// TODO(@Wondertan): PR to bitswap to make this timeout configurable
// Higher timeout increases the probability of successful reconstruction
Comment on lines +22 to +23
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this todo should not be part of code base. Lets park it into issue instead

simulateDontHaves = true
// providerSearchDelay defines the initial delay before Bitswap client starts aggressive
// broadcasting of WANTs to all the peers. We offset this for longer than the default to minimize
// unnecessary broadcasting as in most cases we already have peers connected with needed data on
// a new request.
providerSearchDelay = time.Second * 10
// rebroadcastDelay is similar to the providerSearchDelay, but it targets DHT/ContentRouting
// peer discovery and a gentle broadcast of a single random live WANT to all connected peers.
// Considering no DHT usage and broadcasting configured by providerSearchDelay, we set
// rebroadcastDelay to max value, effectively disabling it
rebroadcastDelay = 1<<63 - 1
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
)

// Server constants
const (
// providesEnabled dictates Bitswap Server not to provide content to DHT/ContentRouting as we don't use it
providesEnabled = false
// sendDontHaves prevents Bitswap Server from sending DONT_HAVEs while keeping peers on hold instead:
// * Clients simulate DONT_HAVEs after timeout anyway
// * Servers may not have data immediately and this gives an opportunity to subscribe
// * This is necessary for reconstruction. See https://github.com/celestiaorg/celestia-node/issues/732
sendDontHaves = false
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
// maxServerWantListsPerPeer defines the limit for maximum possible cached wants/requests per peer
// in the Bitswap. Exceeding this limit will cause Bitswap server to drop requested wants leaving
// client stuck for sometime.
// Thus, we make the limit a bit generous, so we minimize the chances of this happening.
// This is relevant until https://github.com/ipfs/boxo/pull/629#discussion_r1653362485 is fixed.
maxServerWantListsPerPeer = 8096
Comment on lines +46 to +51
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think It will affect the max size of blocks we can reconstruct? WIf yes, would be great to specify max eds size we can support with this value in use.

Copy link
Member Author

@Wondertan Wondertan Jul 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the limit is reached, reconstruction will slow down for N*30 secs, but it should still succeed. The N represents how many times all the samples didn't fit into the want list.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where 30sec comes from? If the peer sends 16k Iwants to the server, looks like the server will just drop first 8k of requests. which means it might never reply to those iwant requests. For example, 8096 will not fit 128*128 samples, which might be a potential limitation point for that setting value

Copy link
Member Author

@Wondertan Wondertan Jul 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

30 seconds comes from WANT resend timeout in Bitswap MessageeQueue. Client side resends the WANTs every 30 secs basically.

// targetMessageSize defines how much data Bitswap will aim to pack within a single message, before
// splitting it up in multiple. Bitswap first looks up the size of the requested data across
// multiple requests and only after reads up the data in portions one-by-one targeting the
// targetMessageSize.
//
// Bigger number will speed transfers up if reading data from disk is fast. In our case, the
// Bitswap's size lookup via [Blockstore] will already cause underlying cache to keep the data,
// so reading up data is fast, and we can aim to pack as much as we can.
targetMessageSize = 1 << 20 // 1MB
// outstandingBytesPerPeer limits number of bytes queued for work for a peer across multiple requests.
// We set it to be equal to targetMessageSize * N, so there can max N messages being prepared for
// a peer at once.
outstandingBytesPerPeer = targetMessageSize * 4
)

// NewNetwork constructs Bitswap network for Shwap protocol composition.
func NewNetwork(host host.Host, prefix protocol.ID) network.BitSwapNetwork {
prefix = shwapProtocolID(prefix)
net := network.NewFromIpfsHost(
host,
routinghelpers.Null{},
network.Prefix(prefix),
network.SupportedProtocols([]protocol.ID{protocolID}),
)
return net
}

// NewClient constructs a Bitswap client with parameters optimized for Shwap protocol composition.
// Meant to be used by Full and Light nodes.
func NewClient(
ctx context.Context,
net network.BitSwapNetwork,
bstore blockstore.Blockstore,
) *client.Client {
return client.New(
ctx,
net,
bstore,
client.SetSimulateDontHavesOnTimeout(simulateDontHaves),
client.ProviderSearchDelay(providerSearchDelay),
client.RebroadcastDelay(delay.Fixed(rebroadcastDelay)),
// Prevents Has calls to Blockstore for metric that counts duplicates
// Unnecessary for our use case, so we can save some disk lookups.
client.WithoutDuplicatedBlockStats(),
)
}

// NewServer construct a Bitswap server with parameters optimized for Shwap protocol composition.
// Meant to be used by Full nodes.
func NewServer(
ctx context.Context,
net network.BitSwapNetwork,
bstore blockstore.Blockstore,
opts ...server.Option,
) *server.Server {
opts = append(
opts,
server.ProvideEnabled(providesEnabled),
server.SetSendDontHaves(sendDontHaves),
server.MaxQueuedWantlistEntriesPerPeer(maxServerWantListsPerPeer),
server.WithTargetMessageSize(targetMessageSize),
server.MaxOutstandingBytesPerPeer(outstandingBytesPerPeer),
)

return server.New(
ctx,
net,
bstore,
opts...,
)
}

// TODO(@Wondertan): We have to use the protocol defined by Bitswap here
//
// due to a little bug. Bitswap allows setting custom protocols, but
// they have to be either one of the switch.
// https://github.com/ipfs/boxo/blob/dfd4a53ba828a368cec8d61c3fe12969ac6aa94c/bitswap/network/ipfs_impl.go#L250-L266
var protocolID = network.ProtocolBitswap

func shwapProtocolID(network protocol.ID) protocol.ID {
if network == "" {
return ""
}
return protocol.ID(fmt.Sprintf("/%s/shwap", network))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should add versioning in case protocol breaking changes?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Breaking changes on the ID/Container level would be reflected in CID's codec bump, so we don't have to worry about it here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another question I have about protocol ID is the idea we discussed to separate bitswap networks into samples and reconstruction related. Any plans to make constructors aware of network type in this PR? it seems like a PR those changes would fit well

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought of including that in this PR, but decided against that as those constructors should land next to reconstruction retrieval.

}
9 changes: 1 addition & 8 deletions share/shwap/p2p/bitswap/block_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func WithStore(store blockstore.Blockstore) FetchOption {
func Fetch(ctx context.Context, exchg exchange.Interface, root *share.Root, blks []Block, opts ...FetchOption) error {
var from, to int
for to < len(blks) {
from, to = to, to+maxPerFetch
from, to = to, to+maxServerWantListsPerPeer
if to >= len(blks) {
to = len(blks)
}
Expand All @@ -51,13 +51,6 @@ func Fetch(ctx context.Context, exchg exchange.Interface, root *share.Root, blks
return ctx.Err()
}

// maxPerFetch sets the limit for maximum items in a single fetch.
// This limit comes from server side default limit size on max possible simultaneous CID WANTs from
// a peer.
//
//https:github.com/ipfs/boxo/blob/dfd4a53ba828a368cec8d61c3fe12969ac6aa94c/bitswap/internal/defaults/defaults.go#L29-L30
const maxPerFetch = 1024

// fetch fetches given Blocks.
// See [Fetch] for detailed description.
func fetch(ctx context.Context, exchg exchange.Interface, root *share.Root, blks []Block, opts ...FetchOption) error {
Expand Down
16 changes: 4 additions & 12 deletions share/shwap/p2p/bitswap/block_fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,12 @@ import (
"time"

"github.com/ipfs/boxo/bitswap/client"
"github.com/ipfs/boxo/bitswap/network"
"github.com/ipfs/boxo/bitswap/server"
"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/exchange"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
routinghelpers "github.com/libp2p/go-libp2p-routing-helpers"
"github.com/libp2p/go-libp2p/core/host"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -138,26 +136,20 @@ func newExchange(ctx context.Context, t *testing.T, bstore blockstore.Blockstore
}

func newServer(ctx context.Context, host host.Host, store blockstore.Blockstore) {
net := network.NewFromIpfsHost(host, routinghelpers.Null{})
server := server.New(
net := NewNetwork(host, "test")
server := NewServer(
ctx,
net,
store,
server.TaskWorkerCount(2),
server.EngineTaskWorkerCount(2),
server.ProvideEnabled(false),
server.SetSendDontHaves(false),
)
net.Start(server)
}

func newClient(ctx context.Context, host host.Host, store blockstore.Blockstore) *client.Client {
net := network.NewFromIpfsHost(host, routinghelpers.Null{})
client := client.New(
ctx,
net,
store,
)
net := NewNetwork(host, "test")
client := NewClient(ctx, net, store)
net.Start(client)
return client
}
Expand Down
Loading