Skip to content

Commit

Permalink
Merge branch 'develop' into feat/flattenproof
Browse files Browse the repository at this point in the history
  • Loading branch information
lispc authored Sep 5, 2024
2 parents 984bfc5 + 8f8ef30 commit 5a590c2
Show file tree
Hide file tree
Showing 25 changed files with 1,751 additions and 417 deletions.
1 change: 1 addition & 0 deletions accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,7 @@ func (m callMsg) Value() *big.Int { return m.CallMsg.Value }
func (m callMsg) Data() []byte { return m.CallMsg.Data }
func (m callMsg) AccessList() types.AccessList { return m.CallMsg.AccessList }
func (m callMsg) IsL1MessageTx() bool { return false }
func (m callMsg) TxSize() common.StorageSize { return 0 }

// filterBackend implements filters.Backend to support filtering for logs without
// taking bloom-bits acceleration structures into account.
Expand Down
5 changes: 5 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1551,6 +1551,11 @@ func setMiner(ctx *cli.Context, cfg *miner.Config) {
if ctx.GlobalIsSet(LegacyMinerGasTargetFlag.Name) {
log.Warn("The generic --miner.gastarget flag is deprecated and will be removed in the future!")
}

cfg.CCCMaxWorkers = runtime.GOMAXPROCS(0)
if ctx.GlobalIsSet(CircuitCapacityCheckWorkersFlag.Name) {
cfg.CCCMaxWorkers = int(ctx.GlobalUint(CircuitCapacityCheckWorkersFlag.Name))
}
}

func setWhitelist(ctx *cli.Context, cfg *ethconfig.Config) {
Expand Down
8 changes: 5 additions & 3 deletions core/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,11 @@ func NewEVMBlockContext(header *types.Header, chain ChainContext, chainConfig *p
// NewEVMTxContext creates a new transaction context for a single transaction.
func NewEVMTxContext(msg Message) vm.TxContext {
return vm.TxContext{
Origin: msg.From(),
To: msg.To(),
GasPrice: new(big.Int).Set(msg.GasPrice()),
Origin: msg.From(),
To: msg.To(),
GasPrice: new(big.Int).Set(msg.GasPrice()),
IsL1MessageTx: msg.IsL1MessageTx(),
TxSize: msg.TxSize(),
}
}

Expand Down
5 changes: 5 additions & 0 deletions core/rawdb/accessors_row_consumption.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,8 @@ func ReadBlockRowConsumptionRLP(db ethdb.Reader, l2BlockHash common.Hash) rlp.Ra
}
return data
}

// DeleteBlockRowConsumption deletes a RowConsumption of the block from the database
func DeleteBlockRowConsumption(db ethdb.KeyValueWriter, l2BlockHash common.Hash) error {
return db.Delete(rowConsumptionKey(l2BlockHash))
}
6 changes: 6 additions & 0 deletions core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package core

import (
"errors"
"fmt"
"math/big"
"time"
Expand Down Expand Up @@ -132,6 +133,11 @@ func applyTransaction(msg types.Message, config *params.ChainConfig, bc ChainCon
// Apply the transaction to the current state (included in the env).
applyMessageStartTime := time.Now()
result, err := ApplyMessage(evm, msg, gp, l1DataFee)
if evm.Config.Debug {
if erroringTracer, ok := evm.Config.Tracer.(interface{ Error() error }); ok {
err = errors.Join(err, erroringTracer.Error())
}
}
applyMessageTimer.Update(time.Since(applyMessageStartTime))
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions core/state_transition.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type Message interface {
Data() []byte
AccessList() types.AccessList
IsL1MessageTx() bool
TxSize() common.StorageSize
}

// ExecutionResult includes all output after executing given evm
Expand Down
9 changes: 9 additions & 0 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ var (
var (
evictionInterval = time.Minute // Time interval to check for evictable transactions
statsReportInterval = 8 * time.Second // Time interval to report transaction pool stats

dumpReorgTxHashThreshold = 100 // Number of transaction hashse to dump when runReorg
)

var (
Expand Down Expand Up @@ -1362,6 +1364,13 @@ func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirt
txs = append(txs, set.Flatten()...)
}
pool.txFeed.Send(NewTxsEvent{txs})

log.Debug("runReorg", "len(txs)", len(txs))
if len(txs) > dumpReorgTxHashThreshold {
for _, txs := range txs {
log.Debug("dumping runReorg tx hashes", "txHash", txs.Hash().Hex())
}
}
}
}

Expand Down
27 changes: 15 additions & 12 deletions core/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,7 @@ type Message struct {
accessList AccessList
isFake bool
isL1MessageTx bool
txSize common.StorageSize
}

func NewMessage(from common.Address, to *common.Address, nonce uint64, amount *big.Int, gasLimit uint64, gasPrice, gasFeeCap, gasTipCap *big.Int, data []byte, accessList AccessList, isFake bool) Message {
Expand Down Expand Up @@ -785,6 +786,7 @@ func (tx *Transaction) AsMessage(s Signer, baseFee *big.Int) (Message, error) {
accessList: tx.AccessList(),
isFake: false,
isL1MessageTx: tx.IsL1MessageTx(),
txSize: tx.Size(),
}
// If baseFee provided, set gasPrice to effectiveGasPrice.
if baseFee != nil {
Expand All @@ -795,18 +797,19 @@ func (tx *Transaction) AsMessage(s Signer, baseFee *big.Int) (Message, error) {
return msg, err
}

func (m Message) From() common.Address { return m.from }
func (m Message) To() *common.Address { return m.to }
func (m Message) GasPrice() *big.Int { return m.gasPrice }
func (m Message) GasFeeCap() *big.Int { return m.gasFeeCap }
func (m Message) GasTipCap() *big.Int { return m.gasTipCap }
func (m Message) Value() *big.Int { return m.amount }
func (m Message) Gas() uint64 { return m.gasLimit }
func (m Message) Nonce() uint64 { return m.nonce }
func (m Message) Data() []byte { return m.data }
func (m Message) AccessList() AccessList { return m.accessList }
func (m Message) IsFake() bool { return m.isFake }
func (m Message) IsL1MessageTx() bool { return m.isL1MessageTx }
func (m Message) From() common.Address { return m.from }
func (m Message) To() *common.Address { return m.to }
func (m Message) GasPrice() *big.Int { return m.gasPrice }
func (m Message) GasFeeCap() *big.Int { return m.gasFeeCap }
func (m Message) GasTipCap() *big.Int { return m.gasTipCap }
func (m Message) Value() *big.Int { return m.amount }
func (m Message) Gas() uint64 { return m.gasLimit }
func (m Message) Nonce() uint64 { return m.nonce }
func (m Message) Data() []byte { return m.data }
func (m Message) AccessList() AccessList { return m.accessList }
func (m Message) IsFake() bool { return m.isFake }
func (m Message) IsL1MessageTx() bool { return m.isL1MessageTx }
func (m Message) TxSize() common.StorageSize { return m.txSize }

// copyAddressPtr copies an address.
func copyAddressPtr(a *common.Address) *common.Address {
Expand Down
8 changes: 5 additions & 3 deletions core/vm/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,11 @@ type BlockContext struct {
// All fields can change between transactions.
type TxContext struct {
// Message information
Origin common.Address // Provides information for ORIGIN
To *common.Address // Provides information for trace
GasPrice *big.Int // Provides information for GASPRICE
Origin common.Address // Provides information for ORIGIN
To *common.Address // Provides information for trace
IsL1MessageTx bool // Provides information for trace
TxSize common.StorageSize // Provides information for trace
GasPrice *big.Int // Provides information for GASPRICE
}

// EVM is the Ethereum Virtual Machine base object and provides
Expand Down
2 changes: 1 addition & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func New(stack *node.Node, config *ethconfig.Config, l1Client sync_service.EthCl
return nil, err
}
if config.CheckCircuitCapacity {
eth.asyncChecker = ccc.NewAsyncChecker(eth.blockchain, config.CCCMaxWorkers, true)
eth.asyncChecker = ccc.NewAsyncChecker(eth.blockchain, config.CCCMaxWorkers, false)
eth.asyncChecker.WithOnFailingBlock(func(b *types.Block, err error) {
log.Warn("block failed CCC check, it will be reorged by the sequencer", "hash", b.Hash(), "err", err)
})
Expand Down
8 changes: 8 additions & 0 deletions eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ var (
txFetcherQueueingHashes = metrics.NewRegisteredGauge("eth/fetcher/transaction/queueing/hashes", nil)
txFetcherFetchingPeers = metrics.NewRegisteredGauge("eth/fetcher/transaction/fetching/peers", nil)
txFetcherFetchingHashes = metrics.NewRegisteredGauge("eth/fetcher/transaction/fetching/hashes", nil)

peerAnnounceTxsLenGauge = metrics.NewRegisteredGauge("eth/fetcher/peer/announce/txs", nil)
peerRetrievalTxsLenGauge = metrics.NewRegisteredGauge("eth/fetcher/peer/retrieval/txs", nil)
)

// txAnnounce is the notification of the availability of a batch
Expand Down Expand Up @@ -790,6 +793,11 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{},
}
return true // continue in the for-each
})

log.Debug("Scheduling transaction retrieval", "peer", peer, "len(f.announces[peer])", len(f.announces[peer]), "len(hashes)", len(hashes))
peerAnnounceTxsLenGauge.Update(int64(len(f.announces[peer])))
peerRetrievalTxsLenGauge.Update(int64(len(hashes)))

// If any hashes were allocated, request them from the peer
if len(hashes) > 0 {
f.requests[peer] = &txRequest{hashes: hashes, time: f.clock.Now()}
Expand Down
15 changes: 15 additions & 0 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,17 @@ import (
"github.com/scroll-tech/go-ethereum/ethdb"
"github.com/scroll-tech/go-ethereum/event"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/metrics"
"github.com/scroll-tech/go-ethereum/p2p"
"github.com/scroll-tech/go-ethereum/params"
"github.com/scroll-tech/go-ethereum/trie"
)

var (
annoTxsLenGauge = metrics.NewRegisteredGauge("eth/handler/broadast/announce/txs", nil)
directTxsLenGauge = metrics.NewRegisteredGauge("eth/handler/broadast/direct/txs", nil)
)

const (
// txChanSize is the size of channel listening to NewTxsEvent.
// The number is referenced from the size of tx pool.
Expand Down Expand Up @@ -512,15 +518,24 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
directPeers++
directCount += len(hashes)
peer.AsyncSendTransactions(hashes)
log.Debug("Transactions being broadcasted to", "peer", peer.String(), "len", len(hashes))
}
for peer, hashes := range annos {
annoPeers++
annoCount += len(hashes)
peer.AsyncSendPooledTransactionHashes(hashes)
log.Debug("Transactions being announced to", "peer", peer.String(), "len", len(hashes))
}
log.Debug("Transaction broadcast", "txs", len(txs),
"announce packs", annoPeers, "announced hashes", annoCount,
"tx packs", directPeers, "broadcast txs", directCount)

if directPeers > 0 {
directTxsLenGauge.Update(int64(directCount / directPeers))
}
if annoPeers > 0 {
annoTxsLenGauge.Update(int64(annoCount / annoPeers))
}
}

// minedBroadcastLoop sends mined blocks to connected peers.
Expand Down
29 changes: 29 additions & 0 deletions eth/protocols/eth/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,19 @@ import (

"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/metrics"
)

var (
broadcastSendTxsLenGauge = metrics.NewRegisteredGauge("eth/protocols/eth/broadcast/direct/txs", nil)
broadcastSendTxsFailMeter = metrics.NewRegisteredMeter("eth/protocols/eth/broadcast/direct/fail", nil)
broadcastSendHashesLenGauge = metrics.NewRegisteredGauge("eth/protocols/eth/broadcast/direct/hashes", nil)
broadcastSendQueueLenGauge = metrics.NewRegisteredGauge("eth/protocols/eth/broadcast/direct/queue", nil)
broadcastAnnoTxsLenGauge = metrics.NewRegisteredGauge("eth/protocols/eth/broadcast/anno/txs", nil)
broadcastAnnoTxsFailMeter = metrics.NewRegisteredMeter("eth/protocols/eth/broadcast/anno/fail", nil)
broadcastAnnoHashesLenGauge = metrics.NewRegisteredGauge("eth/protocols/eth/broadcast/anno/hashes", nil)
broadcastAnnoQueueLenGauge = metrics.NewRegisteredGauge("eth/protocols/eth/broadcast/anno/queue", nil)
)

const (
Expand Down Expand Up @@ -92,10 +105,15 @@ func (p *Peer) broadcastTransactions() {
if len(txs) > 0 {
done = make(chan struct{})
go func() {
log.Debug("Sending transactions", "count", len(txs))
broadcastSendTxsLenGauge.Update(int64(len(txs)))
if err := p.SendTransactions(txs); err != nil {
log.Debug("Sending transactions", "count", len(txs), "err", err)
broadcastSendTxsFailMeter.Mark(1)
fail <- err
return
}
log.Debug("Sent transactions", "count", len(txs))
close(done)
p.Log().Trace("Sent transactions", "count", len(txs))
}()
Expand All @@ -110,6 +128,9 @@ func (p *Peer) broadcastTransactions() {
}
// New batch of transactions to be broadcast, queue them (with cap)
queue = append(queue, hashes...)
log.Debug("Queue size in broadcastTransactions", "len(hashes)", len(hashes), "len(queue)", len(queue), "maxQueuedTxs", maxQueuedTxs)
broadcastSendHashesLenGauge.Update(int64(len(hashes)))
broadcastSendQueueLenGauge.Update(int64(len(queue)))
if len(queue) > maxQueuedTxs {
// Fancy copy and resize to ensure buffer doesn't grow indefinitely
queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxs:])]
Expand Down Expand Up @@ -159,10 +180,15 @@ func (p *Peer) announceTransactions() {
if len(pending) > 0 {
done = make(chan struct{})
go func() {
log.Debug("Sending transaction announcements", "count", len(pending))
broadcastAnnoTxsLenGauge.Update(int64(len(pending)))
if err := p.sendPooledTransactionHashes(pending); err != nil {
log.Debug("Sending transaction announcements", "count", len(pending), "err", err)
broadcastAnnoTxsFailMeter.Mark(1)
fail <- err
return
}
log.Debug("Sent transaction announcements", "count", len(pending))
close(done)
p.Log().Trace("Sent transaction announcements", "count", len(pending))
}()
Expand All @@ -177,6 +203,9 @@ func (p *Peer) announceTransactions() {
}
// New batch of transactions to be broadcast, queue them (with cap)
queue = append(queue, hashes...)
log.Debug("Queue size in announceTransactions", "len(hashes)", len(hashes), "len(queue)", len(queue), "maxQueuedTxAnns", maxQueuedTxAnns)
broadcastAnnoHashesLenGauge.Update(int64(len(hashes)))
broadcastAnnoQueueLenGauge.Update(int64(len(queue)))
if len(queue) > maxQueuedTxAnns {
// Fancy copy and resize to ensure buffer doesn't grow indefinitely
queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxAnns:])]
Expand Down
36 changes: 36 additions & 0 deletions eth/protocols/eth/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,25 @@ import (
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/metrics"
"github.com/scroll-tech/go-ethereum/rlp"
"github.com/scroll-tech/go-ethereum/trie"
)

var (
newPooledTxHashesFailMeter = metrics.NewRegisteredMeter("eth/protocols/eth/handlers/newpooledtxhashes/fail", nil)
newPooledTxHashesLenGauge = metrics.NewRegisteredGauge("eth/protocols/eth/handlers/newpooledtxhashes/len", nil)
getPooledTxsFailMeter = metrics.NewRegisteredMeter("eth/protocols/eth/handlers/getpooledtxs/fail", nil)
getPooledTxsQueryLenGauge = metrics.NewRegisteredGauge("eth/protocols/eth/handlers/getpooledtxs/query", nil)
getPooledTxsRetrievedLenGauge = metrics.NewRegisteredGauge("eth/protocols/eth/handlers/getpooledtxs/retrieved", nil)
handleTxsFailMeter = metrics.NewRegisteredMeter("eth/protocols/eth/handlers/handletxs/fail", nil)
handleTxsLenGauge = metrics.NewRegisteredGauge("eth/protocols/eth/handlers/handletxs/len", nil)
handleTxsNilMeter = metrics.NewRegisteredMeter("eth/protocols/eth/handlers/handletxs/nil", nil)
pooledTxs66FailMeter = metrics.NewRegisteredMeter("eth/protocols/eth/handlers/pooledtxs66/fail", nil)
pooledTxs66LenGauge = metrics.NewRegisteredGauge("eth/protocols/eth/handlers/pooledtxs66/len", nil)
pooledTxs66NilMeter = metrics.NewRegisteredMeter("eth/protocols/eth/handlers/pooledtxs66/nil", nil)
)

// handleGetBlockHeaders66 is the eth/66 version of handleGetBlockHeaders
func handleGetBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error {
// Decode the complex header query
Expand Down Expand Up @@ -323,9 +338,13 @@ func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer)
}
ann := new(NewPooledTransactionHashesPacket)
if err := msg.Decode(ann); err != nil {
log.Debug("Failed to decode `NewPooledTransactionHashesPacket`", "peer", peer.String(), "err", err)
newPooledTxHashesFailMeter.Mark(1)
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
// Schedule all the unknown hashes for retrieval
log.Debug("handleNewPooledTransactionHashes", "peer", peer.String(), "len(ann)", len(*ann))
newPooledTxHashesLenGauge.Update(int64(len(*ann)))
for _, hash := range *ann {
peer.markTransaction(hash)
}
Expand All @@ -336,9 +355,14 @@ func handleGetPooledTransactions66(backend Backend, msg Decoder, peer *Peer) err
// Decode the pooled transactions retrieval message
var query GetPooledTransactionsPacket66
if err := msg.Decode(&query); err != nil {
log.Debug("Failed to decode `GetPooledTransactionsPacket66`", "peer", peer.String(), "err", err)
getPooledTxsFailMeter.Mark(1)
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
hashes, txs := answerGetPooledTransactions(backend, query.GetPooledTransactionsPacket, peer)
log.Debug("handleGetPooledTransactions", "peer", peer.String(), "RequestId", query.RequestId, "len(query)", len(query.GetPooledTransactionsPacket), "retrieved", len(hashes))
getPooledTxsQueryLenGauge.Update(int64(len(query.GetPooledTransactionsPacket)))
getPooledTxsRetrievedLenGauge.Update(int64(len(hashes)))
return peer.ReplyPooledTransactionsRLP(query.RequestId, hashes, txs)
}

Expand Down Expand Up @@ -378,11 +402,17 @@ func handleTransactions(backend Backend, msg Decoder, peer *Peer) error {
// Transactions can be processed, parse all of them and deliver to the pool
var txs TransactionsPacket
if err := msg.Decode(&txs); err != nil {
handleTxsFailMeter.Mark(1)
log.Debug("Failed to decode `TransactionsPacket`", "peer", peer.String(), "err", err)
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
log.Debug("handleTransactions", "peer", peer.String(), "len(txs)", len(txs))
handleTxsLenGauge.Update(int64(len(txs)))
for i, tx := range txs {
// Validate and mark the remote transaction
if tx == nil {
handleTxsNilMeter.Mark(1)
log.Debug("handleTransactions: transaction is nil", "peer", peer.String(), "i", i)
return fmt.Errorf("%w: transaction %d is nil", errDecode, i)
}
peer.markTransaction(tx.Hash())
Expand All @@ -398,11 +428,17 @@ func handlePooledTransactions66(backend Backend, msg Decoder, peer *Peer) error
// Transactions can be processed, parse all of them and deliver to the pool
var txs PooledTransactionsPacket66
if err := msg.Decode(&txs); err != nil {
pooledTxs66FailMeter.Mark(1)
log.Debug("Failed to decode `PooledTransactionsPacket66`", "peer", peer.String(), "err", err)
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
log.Debug("handlePooledTransactions66", "peer", peer.String(), "len(txs)", len(txs.PooledTransactionsPacket))
pooledTxs66LenGauge.Update(int64(len(txs.PooledTransactionsPacket)))
for i, tx := range txs.PooledTransactionsPacket {
// Validate and mark the remote transaction
if tx == nil {
pooledTxs66NilMeter.Mark(1)
log.Debug("handlePooledTransactions: transaction is nil", "peer", peer.String(), "i", i)
return fmt.Errorf("%w: transaction %d is nil", errDecode, i)
}
peer.markTransaction(tx.Hash())
Expand Down
Loading

0 comments on commit 5a590c2

Please sign in to comment.