Skip to content
26 changes: 23 additions & 3 deletions eth/downloader/bor_fetchers_concurrent_receipts.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package downloader

import (
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -92,10 +93,29 @@ func (q *receiptQueue) request(peer *peerConnection, req *fetchRequest, resCh ch
// deliver is responsible for taking a generic response packet from the concurrent
// fetcher, unpacking the receipt data and delivering it to the downloader's queue.
func (q *receiptQueue) deliver(peer *peerConnection, packet *eth.Response) (int, error) {
receipts := *packet.Res.(*eth.ReceiptsRLPResponse)
hashes := packet.Meta.([]common.Hash) // {receipt hashes}
// We're expecting a full decoded receipt list instead of encoded receipts for storage
// earlier. Extract receipts based on type. Add/remove support for new types here as needed.
var (
receipts eth.ReceiptsRLPResponse
getReceiptListHashes func(int, *big.Int) common.Hash
)
switch packet.Res.(type) {
case *eth.ReceiptList68:
receiptList := packet.Res.([]*eth.ReceiptList68)
receipts = eth.EncodeReceipts(receiptList)
getReceiptListHashes = eth.PrepareReceiptListHasher(receiptList, q.queue.borConfig)
case *eth.ReceiptList69:
receiptList := packet.Res.([]*eth.ReceiptList69)
receipts = eth.EncodeReceipts(receiptList)
getReceiptListHashes = eth.PrepareReceiptListHasher(receiptList, q.queue.borConfig)
default:
// This shouldn't happen unless there's a bug in identifying type of receipt list
// or there's a new type which isn't handled here. Just log error.
peer.log.Warn("Unknown receipt list type, discarding packet")
return 0, nil
}

accepted, err := q.queue.DeliverReceipts(peer.id, receipts, hashes)
accepted, err := q.queue.DeliverReceipts(peer.id, receipts, getReceiptListHashes)

switch {
case err == nil && len(receipts) == 0:
Expand Down
6 changes: 4 additions & 2 deletions eth/downloader/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package downloader
import (
"errors"
"fmt"
"math/big"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -1034,12 +1035,13 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, txListH
// DeliverReceipts injects a receipt retrieval response into the results queue.
// The method returns the number of transaction receipts accepted from the delivery
// and also wakes any threads waiting for data delivery.
func (q *queue) DeliverReceipts(id string, receiptList []rlp.RawValue, receiptListHashes []common.Hash) (int, error) {
func (q *queue) DeliverReceipts(id string, receiptList []rlp.RawValue, getReceiptListHash func(int, *big.Int) common.Hash) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()

validate := func(index int, header *types.Header) error {
if receiptListHashes[index] != header.ReceiptHash {
receiptListHash := getReceiptListHash(index, header.Number)
if receiptListHash != header.ReceiptHash {
return errInvalidReceipt
}

Expand Down
9 changes: 4 additions & 5 deletions eth/downloader/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,12 +470,11 @@ func XTestDelivery(t *testing.T) {
}

hasher := trie.NewStackTrie(nil)
hashes := make([]common.Hash, len(rcs))

for i, receipt := range rcs {
hashes[i] = types.DeriveSha(receipt, hasher)
getHashes := func(index int, number *big.Int) common.Hash {
return types.DeriveSha(rcs[index], hasher)
}
_, err := q.DeliverReceipts(peer.id, types.EncodeBlockReceiptLists(rcs), hashes)

_, err := q.DeliverReceipts(peer.id, types.EncodeBlockReceiptLists(rcs), getHashes)
if err != nil {
fmt.Printf("delivered %d receipts %v\n", len(rcs), err)
}
Expand Down
67 changes: 38 additions & 29 deletions eth/protocols/eth/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ package eth
import (
"encoding/json"
"fmt"
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/tracker"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
)
Expand Down Expand Up @@ -534,50 +536,57 @@ func handleReceipts[L ReceiptsList](backend Backend, msg Decoder, peer *Peer) er
return err
}

// The response in p2p packet can only be consumed once. As we need a copy of receipt
// to exclude state-sync transaction receipt from receipt root calculation, encode
// and decode the response back.
packet, err := rlp.EncodeToBytes(res)
if err != nil {
return fmt.Errorf("failed to re-encode receipt packet for making copy: %w", err)
}

resWithoutStateSync := new(ReceiptsPacket[L])
if err := rlp.DecodeBytes(packet, resWithoutStateSync); err != nil {
return fmt.Errorf("failed to decode re-encoded receipt packet for making copy: %w", err)
}

// Assign temporary hashing buffer to each list item, the same buffer is shared
// between all receipt list instances.
buffers := new(receiptListBuffers)
for i := range res.List {
res.List[i].setBuffers(buffers)
}

// The `metadata` function below was used earlier to calculate `ReceiptHash` which is further
// used to validate against `header.ReceiptHash`. By default, state-sync receipts (which are
// appended at the end of list for a block) are excluded from the `ReceiptHash` calculation.
// After the state-sync hard fork, they should be included in the calculation. We don't have
// access to block number here so we can't determine whether to exclude or not. Instead, just
// ignore the `metadata` function and pass on the whole receipt list as is. The receipt queue
// handler which has access to block number will take care of the exclusion if needed.
metadata := func() interface{} {
hasher := trie.NewStackTrie(nil)
hashes := make([]common.Hash, len(resWithoutStateSync.List))
for i := range resWithoutStateSync.List {
// The receipt root of a block doesn't include receipts from state-sync
// transactions specific to polygon. Exclude them for calculating the
// hashes of all receipts.
resWithoutStateSync.List[i].ExcludeStateSyncReceipt()
hashes[i] = types.DeriveSha(resWithoutStateSync.List[i], hasher)
}

return hashes
}
var enc ReceiptsRLPResponse
for i := range res.List {
enc = append(enc, res.List[i].EncodeForStorage())
return nil
}

// Assign the decoded receipt list to the result of `Response` packet.
return peer.dispatchResponse(&Response{
id: res.RequestId,
code: ReceiptsMsg,
Res: &enc,
Res: &res.List,
}, metadata)
}

// EncodeReceipts encodes a list of receipts to the storage format (does not include TxType field).
func EncodeReceipts[L ReceiptsList](receipts []L) ReceiptsRLPResponse {
var encodedReceipts ReceiptsRLPResponse = make(ReceiptsRLPResponse, len(receipts))
for i := range receipts {
encodedReceipts[i] = receipts[i].EncodeForStorage()
}
return encodedReceipts
}

// PrepareReceiptListHasher returns a function which calculates `ReceiptHash` of a receipt list
// based on the whether we've crossed the hardfork or not.
func PrepareReceiptListHasher[L ReceiptsList](receipts []L, borCfg *params.BorConfig) func(int, *big.Int) common.Hash {
hasher := trie.NewStackTrie(nil)
calculateReceiptHashes := func(index int, number *big.Int) common.Hash {
// Don't exclude state-sync receipts for post hardfork blocks
if borCfg.IsStateSync(number) {
return types.DeriveSha(receipts[index], hasher)
} else {
receipts[index].ExcludeStateSyncReceipt()
return types.DeriveSha(receipts[index], hasher)
}
}
return calculateReceiptHashes
}

func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) error {
// New transaction announcement arrived, make sure we have
// a valid and fresh chain to handle them
Expand Down
Loading