diff --git a/share/shwap/p2p/bitswap/block_registry.go b/share/shwap/p2p/bitswap/block_registry.go index 94a53bc7b9..f7221df120 100644 --- a/share/shwap/p2p/bitswap/block_registry.go +++ b/share/shwap/p2p/bitswap/block_registry.go @@ -10,9 +10,9 @@ import ( // EmptyBlock constructs an empty Block with type in the given CID. func EmptyBlock(cid cid.Cid) (Block, error) { - spec, ok := specRegistry[cid.Prefix().MhType] - if !ok { - return nil, fmt.Errorf("unsupported Block type: %v", cid.Prefix().MhType) + spec, err := getSpec(cid) + if err != nil { + return nil, err } blk, err := spec.builder(cid) @@ -23,27 +23,49 @@ func EmptyBlock(cid cid.Cid) (Block, error) { return blk, nil } +// maxBlockSize returns the maximum size of the Block type in the given CID. +func maxBlockSize(cid cid.Cid) (int, error) { + spec, err := getSpec(cid) + if err != nil { + return 0, err + } + + return spec.maxSize, nil +} + // registerBlock registers the new Block type and multihash for it. -func registerBlock(mhcode, codec uint64, idSize int, bldrFn func(cid.Cid) (Block, error)) { +func registerBlock(mhcode, codec uint64, maxSize, idSize int, bldrFn func(cid.Cid) (Block, error)) { mh.Register(mhcode, func() hash.Hash { return &hasher{IDSize: idSize} }) - specRegistry[mhcode] = blockSpec{ + specRegistry[codec] = blockSpec{ idSize: idSize, - codec: codec, + maxSize: maxSize, + mhCode: mhcode, builder: bldrFn, } } +// getSpec returns the blockSpec for the given CID. +func getSpec(cid cid.Cid) (blockSpec, error) { + spec, ok := specRegistry[cid.Type()] + if !ok { + return blockSpec{}, fmt.Errorf("unsupported codec %d", cid.Type()) + } + + return spec, nil +} + // blockSpec holds constant metadata about particular Block types. type blockSpec struct { idSize int - codec uint64 + maxSize int + mhCode uint64 builder func(cid.Cid) (Block, error) } func (spec *blockSpec) String() string { - return fmt.Sprintf("BlockSpec{IDSize: %d, Codec: %d}", spec.idSize, spec.codec) + return fmt.Sprintf("BlockSpec{IDSize: %d, MHCode: %d}", spec.idSize, spec.mhCode) } var specRegistry = make(map[uint64]blockSpec) diff --git a/share/shwap/p2p/bitswap/block_store.go b/share/shwap/p2p/bitswap/block_store.go index 910a95c204..182731d7d2 100644 --- a/share/shwap/p2p/bitswap/block_store.go +++ b/share/shwap/p2p/bitswap/block_store.go @@ -28,7 +28,7 @@ type Blockstore struct { Getter AccessorGetter } -func (b *Blockstore) getBlock(ctx context.Context, cid cid.Cid) (blocks.Block, error) { +func (b *Blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) { blk, err := EmptyBlock(cid) if err != nil { return nil, err @@ -42,6 +42,7 @@ func (b *Blockstore) getBlock(ctx context.Context, cid cid.Cid) (blocks.Block, e if err != nil { return nil, fmt.Errorf("getting EDS Accessor for height %v: %w", blk.Height(), err) } + defer func() { if err := acc.Close(); err != nil { log.Warnf("failed to close EDS accessor for height %v: %s", blk.Height(), err) @@ -55,24 +56,19 @@ func (b *Blockstore) getBlock(ctx context.Context, cid cid.Cid) (blocks.Block, e return convertBitswap(blk) } -func (b *Blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) { - blk, err := b.getBlock(ctx, cid) - if err != nil { - return nil, err - } - - return blk, nil -} +func (b *Blockstore) GetSize(_ context.Context, cid cid.Cid) (int, error) { + // NOTE: Size is used as a weight for the incoming Bitswap requests. Bitswap uses fair scheduling for the requests + // and prioritizes peers with less *active* work. Active work of a peer is a cumulative weight of all the in-progress + // requests. -func (b *Blockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) { - // TODO(@Wondertan): Bitswap checks the size of the data(GetSize) before serving it via Get. This means - // GetSize may do an unnecessary read from disk which we can avoid by either caching on Blockstore level - // or returning constant size(we know at that point that we have requested data) - blk, err := b.Get(ctx, cid) + // Constant max block size is used instead of factual size. This avoids disk IO but equalizes the weights of the + // requests of the same type. E.g. row of 2MB EDS and row of 8MB EDS will have the same weight. + size, err := maxBlockSize(cid) if err != nil { return 0, err } - return len(blk.RawData()), nil + + return size, nil } func (b *Blockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) { diff --git a/share/shwap/p2p/bitswap/block_test.go b/share/shwap/p2p/bitswap/block_test.go index 39bb55bc14..a7b7f590a8 100644 --- a/share/shwap/p2p/bitswap/block_test.go +++ b/share/shwap/p2p/bitswap/block_test.go @@ -20,6 +20,7 @@ import ( const ( testCodec = 0x9999 testMultihashCode = 0x9999 + testBlockSize = 256 testIDSize = 2 ) @@ -27,6 +28,7 @@ func init() { registerBlock( testMultihashCode, testCodec, + testBlockSize, testIDSize, func(cid cid.Cid) (Block, error) { return newEmptyTestBlock(cid) @@ -67,7 +69,7 @@ type testBlock struct { } func newTestBlock(id int) *testBlock { - bytes := make([]byte, 256) + bytes := make([]byte, testBlockSize) _, _ = crand.Read(bytes) return &testBlock{id: testID(id), data: bytes} } diff --git a/share/shwap/p2p/bitswap/cid.go b/share/shwap/p2p/bitswap/cid.go index 3c5531a7fd..af7f714de7 100644 --- a/share/shwap/p2p/bitswap/cid.go +++ b/share/shwap/p2p/bitswap/cid.go @@ -35,16 +35,18 @@ func encodeToCID(bm encoding.BinaryMarshaler, mhcode, codec uint64) cid.Cid { // validateCID checks correctness of the CID. func validateCID(cid cid.Cid) error { - prefix := cid.Prefix() - spec, ok := specRegistry[prefix.MhType] - if !ok { - return fmt.Errorf("unsupported multihash type %d", prefix.MhType) + spec, err := getSpec(cid) + if err != nil { + return err } - if prefix.Codec != spec.codec { - return fmt.Errorf("invalid CID codec %d", prefix.Codec) + prefix := cid.Prefix() + if prefix.Version != 1 { + return fmt.Errorf("invalid cid version %d", prefix.Version) + } + if prefix.MhType != spec.mhCode { + return fmt.Errorf("invalid multihash type %d", prefix.MhType) } - if prefix.MhLength != spec.idSize { return fmt.Errorf("invalid multihash length %d", prefix.MhLength) } diff --git a/share/shwap/p2p/bitswap/row_block.go b/share/shwap/p2p/bitswap/row_block.go index 4763f63954..4bf1966d8c 100644 --- a/share/shwap/p2p/bitswap/row_block.go +++ b/share/shwap/p2p/bitswap/row_block.go @@ -6,6 +6,7 @@ import ( "github.com/ipfs/go-cid" + libshare "github.com/celestiaorg/go-square/v2/share" "github.com/celestiaorg/rsmt2d" "github.com/celestiaorg/celestia-node/share" @@ -22,10 +23,15 @@ const ( rowMultihashCode = 0x7801 ) +// maxRowSize is the maximum size of the RowBlock. +// It is calculated as half of the square size multiplied by the share size. +var maxRowSize = share.MaxSquareSize / 2 * libshare.ShareSize + func init() { registerBlock( rowMultihashCode, rowCodec, + maxRowSize, shwap.RowIDSize, func(cid cid.Cid) (Block, error) { return EmptyRowBlockFromCID(cid) diff --git a/share/shwap/p2p/bitswap/row_namespace_data_block.go b/share/shwap/p2p/bitswap/row_namespace_data_block.go index 52ddbd9520..1f9d26907c 100644 --- a/share/shwap/p2p/bitswap/row_namespace_data_block.go +++ b/share/shwap/p2p/bitswap/row_namespace_data_block.go @@ -22,10 +22,14 @@ const ( rowNamespaceDataMultihashCode = 0x7821 ) +// maxRNDSize is the maximum size of the RowNamespaceDataBlock. +var maxRNDSize = maxRowSize + func init() { registerBlock( rowNamespaceDataMultihashCode, rowNamespaceDataCodec, + maxRNDSize, shwap.RowNamespaceDataIDSize, func(cid cid.Cid) (Block, error) { return EmptyRowNamespaceDataBlockFromCID(cid) diff --git a/share/shwap/p2p/bitswap/sample_block.go b/share/shwap/p2p/bitswap/sample_block.go index 062369c0be..0f25ccb454 100644 --- a/share/shwap/p2p/bitswap/sample_block.go +++ b/share/shwap/p2p/bitswap/sample_block.go @@ -3,9 +3,12 @@ package bitswap import ( "context" "fmt" + "math" "github.com/ipfs/go-cid" + libshare "github.com/celestiaorg/go-square/v2/share" + "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/eds" "github.com/celestiaorg/celestia-node/share/shwap" @@ -21,10 +24,15 @@ const ( sampleMultihashCode = 0x7811 ) +// maxSampleSize is the maximum size of the SampleBlock. +// It is calculated as the size of the share plus the size of the proof. +var maxSampleSize = libshare.ShareSize + share.AxisRootSize*int(math.Log2(float64(share.MaxSquareSize))) + func init() { registerBlock( sampleMultihashCode, sampleCodec, + maxSampleSize, shwap.SampleIDSize, func(cid cid.Cid) (Block, error) { return EmptySampleBlockFromCID(cid)