Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ The following emojis are used to highlight certain changes:

### Removed

- `bitswap/server` do not allow override of peer ledger with `WithPeerLedger` [#938](https://github.com/ipfs/boxo/pull/938)

### Fixed

- `gateway`: Fixed suffix range-requests and updated tests to [gateway-conformance v0.8](https://github.com/ipfs/gateway-conformance/releases/tag/v0.8.0) [#922](https://github.com/ipfs/boxo/pull/922)
Expand Down
5 changes: 0 additions & 5 deletions bitswap/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@ func WithScoreLedger(scoreLedger server.ScoreLedger) Option {
return Option{server.WithScoreLedger(scoreLedger)}
}

// Deprecated: This is no longer needed and will be removed.
func WithPeerLedger(peerLedger server.PeerLedger) Option {
return Option{server.WithPeerLedger(peerLedger)}
}

func WithTargetMessageSize(tms int) Option {
return Option{server.WithTargetMessageSize(tms)}
}
Expand Down
2 changes: 0 additions & 2 deletions bitswap/server/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,4 @@ type (
TaskInfo = decision.TaskInfo
ScoreLedger = decision.ScoreLedger
ScorePeerFunc = decision.ScorePeerFunc
PeerLedger = decision.PeerLedger
PeerEntry = decision.PeerEntry
)
57 changes: 2 additions & 55 deletions bitswap/server/internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,47 +120,6 @@ type ScoreLedger interface {
Stop()
}

type PeerEntry struct {
Peer peer.ID
Priority int32
WantType pb.Message_Wantlist_WantType
}

// PeerLedger is an external ledger dealing with peers and their want lists.
type PeerLedger interface {
// Wants informs the ledger that [peer.ID] wants [wl.Entry].
// If peer ledger exceed internal limit, then the entry is not added
// and false is returned.
Wants(p peer.ID, e wl.Entry) bool

// CancelWant returns true if the [cid.Cid] was removed from the wantlist of [peer.ID].
CancelWant(p peer.ID, k cid.Cid) bool

// CancelWantWithType will not cancel WantBlock if we sent a HAVE message.
CancelWantWithType(p peer.ID, k cid.Cid, typ pb.Message_Wantlist_WantType)

// Peers returns all peers that want [cid.Cid].
Peers(k cid.Cid) []PeerEntry

// CollectPeerIDs returns all peers that the ledger has an active session with.
CollectPeerIDs() []peer.ID

// WantlistSizeForPeer returns the size of the wantlist for [peer.ID].
WantlistSizeForPeer(p peer.ID) int

// WantlistForPeer returns the wantlist for [peer.ID].
WantlistForPeer(p peer.ID) []wl.Entry

// ClearPeerWantlist clears the wantlist for [peer.ID].
ClearPeerWantlist(p peer.ID)

// PeerDisconnected informs the ledger that [peer.ID] is no longer connected.
PeerDisconnected(p peer.ID)

// HasPeer checks if the ledger has an active session with the given peer.
HasPeer(p peer.ID) bool
}

// Engine manages sending requested blocks to peers.
type Engine struct {
// peerRequestQueue is a priority queue of requests received from peers.
Expand Down Expand Up @@ -188,7 +147,7 @@ type Engine struct {
lock sync.RWMutex // protects the fields immediately below

// peerLedger saves which peers are waiting for a Cid
peerLedger PeerLedger
peerLedger *peerLedger

// an external ledger dealing with peer scores
scoreLedger ScoreLedger
Expand Down Expand Up @@ -280,15 +239,6 @@ func WithScoreLedger(scoreledger ScoreLedger) Option {
}
}

// WithPeerLedger sets a custom [PeerLedger] to be used with this [Engine].
//
// Deprecated: This is no longer needed and will be removed.
func WithPeerLedger(peerLedger PeerLedger) Option {
return func(e *Engine) {
e.peerLedger = peerLedger
}
}

// WithBlockstoreWorkerCount sets the number of worker threads used for
// blockstore operations in the decision engine
func WithBlockstoreWorkerCount(count int) Option {
Expand Down Expand Up @@ -417,10 +367,7 @@ func NewEngine(
opt(e)
}

// If peerLedger was not set by option, then create a default instance.
if e.peerLedger == nil {
e.peerLedger = NewDefaultPeerLedger(e.maxQueuedWantlistEntriesPerPeer)
}
e.peerLedger = newPeerLedger(e.maxQueuedWantlistEntriesPerPeer)

e.bsm = newBlockstoreManager(bs, e.bstoreWorkerCount, bmetrics.PendingBlocksGauge(ctx), bmetrics.ActiveBlocksGauge(ctx))

Expand Down
52 changes: 28 additions & 24 deletions bitswap/server/internal/decision/peer_ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,26 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
)

type DefaultPeerLedger struct {
type peerLedger struct {
// these two maps are inversions of each other
peers map[peer.ID]map[cid.Cid]entry
cids map[cid.Cid]map[peer.ID]entry
// value 0 mean no limit
maxEntriesPerPeer int
}

func NewDefaultPeerLedger(maxEntriesPerPeer uint) *DefaultPeerLedger {
return &DefaultPeerLedger{
type peerEntry struct {
Peer peer.ID
entry
}

type entry struct {
Priority int32
WantType pb.Message_Wantlist_WantType
}

func newPeerLedger(maxEntriesPerPeer uint) *peerLedger {
return &peerLedger{
peers: make(map[peer.ID]map[cid.Cid]entry),
cids: make(map[cid.Cid]map[peer.ID]entry),

Expand All @@ -27,7 +37,7 @@ func NewDefaultPeerLedger(maxEntriesPerPeer uint) *DefaultPeerLedger {
// Wants adds an entry to the peer ledger. If adding the entry would make the
// peer ledger exceed the maxEntriesPerPeer limit, then the entry is not added
// and false is returned.
func (l *DefaultPeerLedger) Wants(p peer.ID, e wl.Entry) bool {
func (l *peerLedger) Wants(p peer.ID, e wl.Entry) bool {
cids, ok := l.peers[p]
if !ok {
cids = make(map[cid.Cid]entry)
Expand All @@ -49,7 +59,7 @@ func (l *DefaultPeerLedger) Wants(p peer.ID, e wl.Entry) bool {
return true
}

func (l *DefaultPeerLedger) CancelWant(p peer.ID, k cid.Cid) bool {
func (l *peerLedger) CancelWant(p peer.ID, k cid.Cid) bool {
wants, ok := l.peers[p]
if !ok {
return false
Expand All @@ -64,7 +74,7 @@ func (l *DefaultPeerLedger) CancelWant(p peer.ID, k cid.Cid) bool {
return had
}

func (l *DefaultPeerLedger) CancelWantWithType(p peer.ID, k cid.Cid, typ pb.Message_Wantlist_WantType) {
func (l *peerLedger) CancelWantWithType(p peer.ID, k cid.Cid, typ pb.Message_Wantlist_WantType) {
wants, ok := l.peers[p]
if !ok {
return
Expand All @@ -85,7 +95,7 @@ func (l *DefaultPeerLedger) CancelWantWithType(p peer.ID, k cid.Cid, typ pb.Mess
l.removePeerFromCid(p, k)
}

func (l *DefaultPeerLedger) removePeerFromCid(p peer.ID, k cid.Cid) {
func (l *peerLedger) removePeerFromCid(p peer.ID, k cid.Cid) {
m, ok := l.cids[k]
if !ok {
return
Expand All @@ -96,40 +106,34 @@ func (l *DefaultPeerLedger) removePeerFromCid(p peer.ID, k cid.Cid) {
}
}

type entry struct {
Priority int32
WantType pb.Message_Wantlist_WantType
}

func (l *DefaultPeerLedger) Peers(k cid.Cid) []PeerEntry {
func (l *peerLedger) Peers(k cid.Cid) []peerEntry {
m, ok := l.cids[k]
if !ok {
return nil
}
peers := make([]PeerEntry, 0, len(m))
peers := make([]peerEntry, 0, len(m))
for p, e := range m {
peers = append(peers, PeerEntry{
Peer: p,
Priority: e.Priority,
WantType: e.WantType,
peers = append(peers, peerEntry{
Peer: p,
entry: e,
})
}
return peers
}

func (l *DefaultPeerLedger) CollectPeerIDs() []peer.ID {
func (l *peerLedger) CollectPeerIDs() []peer.ID {
peers := make([]peer.ID, 0, len(l.peers))
for p := range l.peers {
peers = append(peers, p)
}
return peers
}

func (l *DefaultPeerLedger) WantlistSizeForPeer(p peer.ID) int {
func (l *peerLedger) WantlistSizeForPeer(p peer.ID) int {
return len(l.peers[p])
}

func (l *DefaultPeerLedger) WantlistForPeer(p peer.ID) []wl.Entry {
func (l *peerLedger) WantlistForPeer(p peer.ID) []wl.Entry {
cids, ok := l.peers[p]
if !ok {
return nil
Expand All @@ -149,7 +153,7 @@ func (l *DefaultPeerLedger) WantlistForPeer(p peer.ID) []wl.Entry {
// ClearPeerWantlist does not take an effort to fully erase it from memory.
// This is intended when the peer is still connected and the map capacity could
// be reused. If the memory should be freed use PeerDisconnected instead.
func (l *DefaultPeerLedger) ClearPeerWantlist(p peer.ID) {
func (l *peerLedger) ClearPeerWantlist(p peer.ID) {
cids, ok := l.peers[p]
if !ok {
return
Expand All @@ -160,12 +164,12 @@ func (l *DefaultPeerLedger) ClearPeerWantlist(p peer.ID) {
}
}

func (l *DefaultPeerLedger) PeerDisconnected(p peer.ID) {
func (l *peerLedger) PeerDisconnected(p peer.ID) {
l.ClearPeerWantlist(p)
delete(l.peers, p)
}

func (l *DefaultPeerLedger) HasPeer(p peer.ID) bool {
func (l *peerLedger) HasPeer(p peer.ID) bool {
_, ok := l.peers[p]
return ok
}
10 changes: 0 additions & 10 deletions bitswap/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,6 @@ func WithScoreLedger(scoreLedger decision.ScoreLedger) Option {
}
}

// WithPeerLedger configures the engine with a custom [decision.PeerLedger].
//
// Deprecated: This is no longer needed and will be removed.
func WithPeerLedger(peerLedger decision.PeerLedger) Option {
o := decision.WithPeerLedger(peerLedger)
return func(bs *Server) {
bs.engineOptions = append(bs.engineOptions, o)
}
}

// LedgerForPeer returns aggregated data about blocks swapped and communication
// with a given peer.
func (bs *Server) LedgerForPeer(p peer.ID) *decision.Receipt {
Expand Down