Skip to content

Commit

Permalink
add support for archival vs avail sessons
Browse files Browse the repository at this point in the history
  • Loading branch information
Wondertan committed Jul 23, 2024
1 parent 99f8523 commit 512a68a
Showing 1 changed file with 42 additions and 11 deletions.
53 changes: 42 additions & 11 deletions share/shwap/p2p/bitswap/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,45 @@ import (
"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share"
)

// Getter implements share.Getter.
type Getter struct {
exchange exchange.SessionExchange
bstore blockstore.Blockstore
session exchange.Fetcher
cancel context.CancelFunc
exchange exchange.SessionExchange
bstore blockstore.Blockstore
availWndw pruner.AvailabilityWindow

availableSession exchange.Fetcher
archivalSession exchange.Fetcher

cancel context.CancelFunc
}

// NewGetter constructs a new Getter.
func NewGetter(exchange exchange.SessionExchange, bstore blockstore.Blockstore) *Getter {
return &Getter{exchange: exchange, bstore: bstore}
func NewGetter(
exchange exchange.SessionExchange,
bstore blockstore.Blockstore,
availWndw pruner.AvailabilityWindow,
) *Getter {
return &Getter{exchange: exchange, bstore: bstore, availWndw: availWndw}
}

// Start kicks off internal fetching session.
// Start kicks off internal fetching sessions.
//
// We keep Bitswap sessions for the whole Getter lifespan:
// - Sessions retain useful heuristics about peers, like TTFB
// - Sessions prefer peers that previously served us related content.
//
// So reusing session is expected to improve fetching performance.
//
// There are two sessions for archival and available data, so archival node peers aren't mixed
// with regular full node peers.
func (g *Getter) Start() {
ctx, cancel := context.WithCancel(context.Background())
g.session = g.exchange.NewSession(ctx)
g.availableSession = g.exchange.NewSession(ctx)
g.archivalSession = g.exchange.NewSession(ctx)
g.cancel = cancel
}

Expand Down Expand Up @@ -64,7 +83,8 @@ func (g *Getter) GetShares(
blks[i] = sid
}

err := Fetch(ctx, g.exchange, hdr.DAH, blks, WithStore(g.bstore))
ses := g.session(hdr)
err := Fetch(ctx, g.exchange, hdr.DAH, blks, WithStore(g.bstore), WithFetcher(ses))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -111,7 +131,8 @@ func (g *Getter) GetEDS(
blks[i] = blk
}

err := Fetch(ctx, g.exchange, hdr.DAH, blks, WithFetcher(g.session))
ses := g.session(hdr)
err := Fetch(ctx, g.exchange, hdr.DAH, blks, WithFetcher(ses))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -158,7 +179,8 @@ func (g *Getter) GetSharesByNamespace(
blks[i] = rndblk
}

err := Fetch(ctx, g.exchange, hdr.DAH, blks, WithFetcher(g.session))
ses := g.session(hdr)
err := Fetch(ctx, g.exchange, hdr.DAH, blks, WithFetcher(ses))
if err != nil {
return nil, err
}
Expand All @@ -175,3 +197,12 @@ func (g *Getter) GetSharesByNamespace(

return nsShrs, nil
}

// session decides which fetching session to use for the given header.
func (g *Getter) session(hdr *header.ExtendedHeader) exchange.Fetcher {
if pruner.IsWithinAvailabilityWindow(hdr.Time(), g.availWndw) {
return g.availableSession
}

return g.archivalSession
}

0 comments on commit 512a68a

Please sign in to comment.