diff --git a/miner/collator.go b/miner/collator.go new file mode 100644 index 000000000000..5d8b24ee8907 --- /dev/null +++ b/miner/collator.go @@ -0,0 +1,299 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package miner + +import ( + "errors" + "math/big" + "sync/atomic" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/params" +) + +type AddTransactionsResultFunc func(error, []*types.Receipt) bool + +// BlockState represents a block-to-be-mined, which is being assembled. +// A collator can add transactions by calling AddTransactions +type BlockState interface { + // AddTransactions adds the sequence of transactions to the blockstate. Either all + // transactions are added, or none of them. In the latter case, the error + // describes the reason why the txs could not be included. + // if ErrRecommit, the collator should not attempt to add more transactions to the + // block and submit the block for sealing. + // If ErrAbort is returned, the collator should immediately abort and return a + // value (true) from CollateBlock which indicates to the miner to discard the + // block + AddTransactions(sequence types.Transactions, cb AddTransactionsResultFunc) + Gas() (remaining uint64) + Coinbase() common.Address + BaseFee() *big.Int + Signer() types.Signer +} + +var ( + ErrAbort = errors.New("miner signalled to abort sealing the current block") + ErrRecommit = errors.New("err sealing recommit timer elapsed") + ErrUnsupportedEIP155Tx = errors.New("encountered eip155 tx when chain doesn't support it") +) + +// Collator is something that can assemble a block. +type Collator interface { + // should add transactions to the pending BlockState. + // should return true if sealing of the block should be aborted + CollateBlock(bs BlockState, pool Pool) bool +} + +// Pool is an interface to the transaction pool +type Pool interface { + Pending(bool) (map[common.Address]types.Transactions, error) + Locals() []common.Address +} + +// blockState is an implementation of BlockState +type blockState struct { + state *state.StateDB + logs []*types.Log + worker *worker + coinbase common.Address + baseFee *big.Int + signer types.Signer + interrupt *int32 + resubmitAdjustHandled bool +} + +// Coinbase returns the miner-address of the block being mined +func (bs *blockState) Coinbase() common.Address { + return bs.coinbase +} + +// Basefee returns the basefee for the current block +func (bs *blockState) BaseFee() *big.Int { + return bs.baseFee +} + +// Signer returns the block-specific signer +func (bs *blockState) Signer() types.Signer { + return bs.signer +} + +func (bs *blockState) Commit() { + w := bs.worker + + if !w.isRunning() && len(bs.logs) > 0 { + // We don't push the pendingLogsEvent while we are mining. The reason is that + // when we are mining, the worker will regenerate a mining block every 3 seconds. + // In order to avoid pushing the repeated pendingLog, we disable the pending log pushing. + + // make a copy, the state caches the logs and these logs get "upgraded" from pending to mined + // logs by filling in the block hash when the block was mined by the local miner. This can + // cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed. + cpy := make([]*types.Log, len(bs.logs)) + for i, l := range bs.logs { + cpy[i] = new(types.Log) + *cpy[i] = *l + } + w.pendingLogsFeed.Send(cpy) + } + // Notify resubmit loop to decrease resubmitting interval if current interval is larger + // than the user-specified one. + if !bs.resubmitAdjustHandled && bs.interrupt != nil { + w.resubmitAdjustCh <- &intervalAdjust{inc: false} + } +} + +func (bs *blockState) AddTransactions(sequence types.Transactions, cb AddTransactionsResultFunc) { + var ( + w = bs.worker + snap = w.current.state.Snapshot() + err error + logs []*types.Log + tcount = w.current.tcount + startTCount = w.current.tcount + ) + if bs.resubmitAdjustHandled { + cb(ErrRecommit, nil) + return + } + + for _, tx := range sequence { + if bs.interrupt != nil && atomic.LoadInt32(bs.interrupt) != commitInterruptNone { + // Notify resubmit loop to increase resubmitting interval due to too frequent commits. + if atomic.LoadInt32(bs.interrupt) == commitInterruptResubmit { + ratio := float64(w.current.header.GasLimit-w.current.gasPool.Gas()) / float64(w.current.header.GasLimit) + if ratio < 0.1 { + ratio = 0.1 + } + w.resubmitAdjustCh <- &intervalAdjust{ + ratio: ratio, + inc: true, + } + } + if atomic.LoadInt32(bs.interrupt) == commitInterruptNewHead { + err = ErrAbort + } else { + err = ErrRecommit + } + bs.resubmitAdjustHandled = true + break + } + if w.current.gasPool.Gas() < params.TxGas { + log.Trace("Not enough gas for further transactions", "have", w.current.gasPool, "want", params.TxGas) + err = core.ErrGasLimitReached + break + } + from, _ := types.Sender(w.current.signer, tx) + // Check whether the tx is replay protected. If we're not in the EIP155 hf + // phase, start ignoring the sender until we do. + if tx.Protected() && !w.chainConfig.IsEIP155(w.current.header.Number) { + log.Trace("encountered replay-protected transaction when chain doesn't support replay protection", "hash", tx.Hash(), "eip155", w.chainConfig.EIP155Block) + err = ErrUnsupportedEIP155Tx + break + } + // Start executing the transaction + bs.state.Prepare(tx.Hash(), w.current.tcount) + + var txLogs []*types.Log + txLogs, err = w.commitTransaction(tx, bs.Coinbase()) + if err == nil { + logs = append(logs, txLogs...) + tcount++ + } else { + log.Trace("Tx block inclusion failed", "sender", from, "nonce", tx.Nonce(), + "type", tx.Type(), "hash", tx.Hash(), "err", err) + break + } + } + var txReceipts []*types.Receipt = nil + if err == nil { + txReceipts = w.current.receipts[startTCount:tcount] + } + // TODO: deep copy the tx receipts here or add a disclaimer to implementors not to modify them? + shouldRevert := cb(err, txReceipts) + + if err != nil || shouldRevert { + bs.state.RevertToSnapshot(snap) + + // remove the txs and receipts that were added + for i := startTCount; i < tcount; i++ { + w.current.txs[i] = nil + w.current.receipts[i] = nil + } + w.current.txs = w.current.txs[:startTCount] + w.current.receipts = w.current.receipts[:startTCount] + } else { + bs.logs = append(bs.logs, logs...) + w.current.tcount = tcount + } +} + +func (bs *blockState) Gas() (remaining uint64) { + return bs.worker.current.gasPool.Gas() +} + +// The DefaultCollator is the 'normal' block collator. It assembles a block +// based on transaction price ordering. +type DefaultCollator struct{} + +func submitTransactions(bs BlockState, txs *types.TransactionsByPriceAndNonce) bool { + returnVal := false + shouldReturn := false + cb := func(err error, receipts []*types.Receipt) bool { + switch { + case errors.Is(err, core.ErrGasLimitReached): + fallthrough + case errors.Is(err, core.ErrTxTypeNotSupported): + fallthrough + case errors.Is(err, core.ErrNonceTooHigh): + txs.Pop() + case errors.Is(err, ErrAbort): + returnVal = true + shouldReturn = true + case errors.Is(err, ErrRecommit): + returnVal = false + shouldReturn = true + case errors.Is(err, core.ErrNonceTooLow): + fallthrough + case errors.Is(err, nil): + fallthrough + default: + txs.Shift() + } + return false + } + + for { + // If we don't have enough gas for any further transactions then we're done + available := bs.Gas() + if available < params.TxGas { + break + } + // Retrieve the next transaction and abort if all done + tx := txs.Peek() + if tx == nil { + break + } + // Enough space for this tx? + if available < tx.Gas() { + txs.Pop() + continue + } + bs.AddTransactions(types.Transactions{tx}, cb) + if shouldReturn { + break + } + } + + return returnVal +} + +// CollateBlock fills a block based on the highest paying transactions from the +// transaction pool, giving precedence over local transactions. +func (w *DefaultCollator) CollateBlock(bs BlockState, pool Pool) bool { + txs, err := pool.Pending(true) + if err != nil { + log.Error("could not get pending transactions from the pool", "err", err) + return true + } + if len(txs) == 0 { + return true + } + // Split the pending transactions into locals and remotes + localTxs, remoteTxs := make(map[common.Address]types.Transactions), txs + for _, account := range pool.Locals() { + if accountTxs := remoteTxs[account]; len(accountTxs) > 0 { + delete(remoteTxs, account) + localTxs[account] = accountTxs + } + } + if len(localTxs) > 0 { + if submitTransactions(bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), localTxs, bs.BaseFee())) { + return true + } + } + if len(remoteTxs) > 0 { + if submitTransactions(bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), remoteTxs, bs.BaseFee())) { + return true + } + } + + return false +} diff --git a/miner/worker.go b/miner/worker.go index accf3dac9096..c9754956fc5d 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -505,23 +505,12 @@ func (w *worker) mainLoop() { if gp := w.current.gasPool; gp != nil && gp.Gas() < params.TxGas { continue } - w.mu.RLock() - coinbase := w.coinbase - w.mu.RUnlock() - txs := make(map[common.Address]types.Transactions) for _, tx := range ev.Txs { acc, _ := types.Sender(w.current.signer, tx) txs[acc] = append(txs[acc], tx) } - txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee) - tcount := w.current.tcount - w.commitTransactions(txset, coinbase, nil) - // Only update the snapshot if any new transactons were added - // to the pending block - if tcount != w.current.tcount { - w.updateSnapshot() - } + w.commitTransactionsToPending(txs) } else { // Special case, if the consensus engine is 0 period clique(dev mode), // submit mining work here since all empty submission will be rejected @@ -763,122 +752,33 @@ func (w *worker) commitTransaction(tx *types.Transaction, coinbase common.Addres return receipt.Logs, nil } -func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coinbase common.Address, interrupt *int32) bool { +func (w *worker) collateBlock(coinbase common.Address, interrupt *int32) bool { // Short circuit if current is nil if w.current == nil { return true } - gasLimit := w.current.header.GasLimit if w.current.gasPool == nil { w.current.gasPool = new(core.GasPool).AddGas(gasLimit) } - - var coalescedLogs []*types.Log - - for { - // In the following three cases, we will interrupt the execution of the transaction. - // (1) new head block event arrival, the interrupt signal is 1 - // (2) worker start or restart, the interrupt signal is 1 - // (3) worker recreate the mining block with any newly arrived transactions, the interrupt signal is 2. - // For the first two cases, the semi-finished work will be discarded. - // For the third case, the semi-finished work will be submitted to the consensus engine. - if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone { - // Notify resubmit loop to increase resubmitting interval due to too frequent commits. - if atomic.LoadInt32(interrupt) == commitInterruptResubmit { - ratio := float64(gasLimit-w.current.gasPool.Gas()) / float64(gasLimit) - if ratio < 0.1 { - ratio = 0.1 - } - w.resubmitAdjustCh <- &intervalAdjust{ - ratio: ratio, - inc: true, - } - } - return atomic.LoadInt32(interrupt) == commitInterruptNewHead - } - // If we don't have enough gas for any further transactions then we're done - if w.current.gasPool.Gas() < params.TxGas { - log.Trace("Not enough gas for further transactions", "have", w.current.gasPool, "want", params.TxGas) - break - } - // Retrieve the next transaction and abort if all done - tx := txs.Peek() - if tx == nil { - break - } - // Error may be ignored here. The error has already been checked - // during transaction acceptance is the transaction pool. - // - // We use the eip155 signer regardless of the current hf. - from, _ := types.Sender(w.current.signer, tx) - // Check whether the tx is replay protected. If we're not in the EIP155 hf - // phase, start ignoring the sender until we do. - if tx.Protected() && !w.chainConfig.IsEIP155(w.current.header.Number) { - log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", w.chainConfig.EIP155Block) - - txs.Pop() - continue - } - // Start executing the transaction - w.current.state.Prepare(tx.Hash(), w.current.tcount) - - logs, err := w.commitTransaction(tx, coinbase) - switch { - case errors.Is(err, core.ErrGasLimitReached): - // Pop the current out-of-gas transaction without shifting in the next from the account - log.Trace("Gas limit exceeded for current block", "sender", from) - txs.Pop() - - case errors.Is(err, core.ErrNonceTooLow): - // New head notification data race between the transaction pool and miner, shift - log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce()) - txs.Shift() - - case errors.Is(err, core.ErrNonceTooHigh): - // Reorg notification data race between the transaction pool and miner, skip account = - log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce()) - txs.Pop() - - case errors.Is(err, nil): - // Everything ok, collect the logs and shift in the next transaction from the same account - coalescedLogs = append(coalescedLogs, logs...) - w.current.tcount++ - txs.Shift() - - case errors.Is(err, core.ErrTxTypeNotSupported): - // Pop the unsupported transaction without shifting in the next from the account - log.Trace("Skipping unsupported transaction type", "sender", from, "type", tx.Type()) - txs.Pop() - - default: - // Strange error, discard the transaction and get the next in line (note, the - // nonce-too-high clause will prevent us from executing in vain). - log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err) - txs.Shift() - } + var bs blockState + bs = blockState{ + state: w.current.state, + logs: nil, + worker: w, + coinbase: w.coinbase, + baseFee: w.current.header.BaseFee, + signer: w.current.signer, + interrupt: interrupt, + resubmitAdjustHandled: false, } + var collator = &DefaultCollator{} - if !w.isRunning() && len(coalescedLogs) > 0 { - // We don't push the pendingLogsEvent while we are mining. The reason is that - // when we are mining, the worker will regenerate a mining block every 3 seconds. - // In order to avoid pushing the repeated pendingLog, we disable the pending log pushing. - - // make a copy, the state caches the logs and these logs get "upgraded" from pending to mined - // logs by filling in the block hash when the block was mined by the local miner. This can - // cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed. - cpy := make([]*types.Log, len(coalescedLogs)) - for i, l := range coalescedLogs { - cpy[i] = new(types.Log) - *cpy[i] = *l - } - w.pendingLogsFeed.Send(cpy) - } - // Notify resubmit loop to decrease resubmitting interval if current interval is larger - // than the user-specified one. - if interrupt != nil { - w.resubmitAdjustCh <- &intervalAdjust{inc: false} + if collator.CollateBlock(&bs, w.eth.TxPool()) { + return true } + + bs.Commit() return false } @@ -991,26 +891,11 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) w.updateSnapshot() return } - // Split the pending transactions into locals and remotes - localTxs, remoteTxs := make(map[common.Address]types.Transactions), pending - for _, account := range w.eth.TxPool().Locals() { - if txs := remoteTxs[account]; len(txs) > 0 { - delete(remoteTxs, account) - localTxs[account] = txs - } - } - if len(localTxs) > 0 { - txs := types.NewTransactionsByPriceAndNonce(w.current.signer, localTxs, header.BaseFee) - if w.commitTransactions(txs, w.coinbase, interrupt) { - return - } - } - if len(remoteTxs) > 0 { - txs := types.NewTransactionsByPriceAndNonce(w.current.signer, remoteTxs, header.BaseFee) - if w.commitTransactions(txs, w.coinbase, interrupt) { - return - } + + if w.collateBlock(w.coinbase, interrupt) { + return } + w.commit(uncles, w.fullTaskHook, true, tstart) } @@ -1064,6 +949,39 @@ func (w *worker) postSideBlock(event core.ChainSideEvent) { } } +func (w *worker) commitTransactionsToPending(txs map[common.Address]types.Transactions) { + // Short circuit if current is nil + if w.current == nil { + return + } + gasLimit := w.current.header.GasLimit + if w.current.gasPool == nil { + w.current.gasPool = new(core.GasPool).AddGas(gasLimit) + } + var bs blockState + bs = blockState{ + state: w.current.state, + logs: nil, + worker: w, + coinbase: w.coinbase, + baseFee: w.current.header.BaseFee, + signer: w.current.signer, + interrupt: nil, + resubmitAdjustHandled: false, + } + + // try to commit all transactions to the pending state. won't return an error + // because the recommit interrupt only applies when sealing + tcount := w.current.tcount + submitTransactions(&bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), txs, bs.BaseFee())) + // Only update the snapshot if any new transactons were added + // to the pending block + if tcount != w.current.tcount { + w.updateSnapshot() + } + bs.Commit() +} + // totalFees computes total consumed miner fees in ETH. Block transactions and receipts have to have the same order. func totalFees(block *types.Block, receipts []*types.Receipt) *big.Float { feesWei := new(big.Int)