From c9fc3655bdb0af130bc34378b99b7f7f91b01df8 Mon Sep 17 00:00:00 2001 From: Jared Wasinger Date: Tue, 27 Jul 2021 20:46:34 +0200 Subject: [PATCH 01/13] introduce block collator abstraction in miner. implementation of default block collator which has current miner block collation behavior. Co-authored-by: Martin Holst Swende Co-authored-by: Jared Wasinger --- miner/collator.go | 245 ++++++++++++++++++++++++++++++++++++++++++++++ miner/worker.go | 167 ++++++++----------------------- 2 files changed, 285 insertions(+), 127 deletions(-) create mode 100644 miner/collator.go diff --git a/miner/collator.go b/miner/collator.go new file mode 100644 index 000000000000..54b9aa15bc9d --- /dev/null +++ b/miner/collator.go @@ -0,0 +1,245 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package miner + +import ( + "errors" + "math/big" + "sync/atomic" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/params" +) + +// BlockState represents a block-to-be-mined, which is being assembled. A collator +// can add transactions, and finish by calling Commit. +type BlockState interface { + // AddTransactions adds the sequence of transactions to the blockstate. Either all + // transactions are added, or none of them. In the latter case, the error + // describes the reason why the txs could not be included. + AddTransactions(sequence types.Transactions) error + // Commit signals that the collation is finished, and the block is ready + // to be sealed. After calling Commit, no more transactions may be added. + Commit() + + Gas() (remaining uint64) + Coinbase() common.Address + BaseFee() *big.Int + Signer() types.Signer +} + +// Collator is something that can assemble a block. +type Collator interface { + CollateBlock(bs BlockState, txs types.Transactions, interrupt *int32, isSealing bool) error +} + +// Pool is an interface to the transaction pool +type Pool interface { + Pending(bool) (map[common.Address]types.Transactions, error) + Locals() []common.Address +} + +// blockState is an implementation of BlockState +type blockState struct { + state *state.StateDB + logs []*types.Log + worker *worker + coinbase common.Address + baseFee *big.Int + signer types.Signer +} + +// Coinbase returns the miner-address of the block being mined +func (bs *blockState) Coinbase() common.Address { + return bs.coinbase +} + +// Basefee returns the basefee for the current block +func (bs *blockState) BaseFee() *big.Int { + return bs.baseFee +} + +// Signer returns the block-specific signer +func (bs *blockState) Signer() types.Signer { + return bs.signer +} + +func (bs *blockState) Commit() { + w := bs.worker + // We don't push the pendingLogsEvent while we are mining. The reason is that + // when we are mining, the worker will regenerate a mining block every 3 seconds. + // In order to avoid pushing the repeated pendingLog, we disable the pending log pushing. + if !w.isRunning() && len(bs.logs) > 0 { + + // make a copy, the state caches the logs and these logs get "upgraded" from pending to mined + // logs by filling in the block hash when the block was mined by the local miner. This can + // cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed. + cpy := make([]*types.Log, len(bs.logs)) + for i, l := range bs.logs { + cpy[i] = new(types.Log) + *cpy[i] = *l + } + w.pendingLogsFeed.Send(cpy) + } +} + +func (bs *blockState) AddTransactions(sequence types.Transactions) error { + var ( + w = bs.worker + snap = w.current.state.Snapshot() + err error + logs []*types.Log + tcount = w.current.tcount + ) + for _, tx := range sequence { + if w.current.gasPool.Gas() < params.TxGas { + log.Trace("Not enough gas for further transactions", "have", w.current.gasPool, "want", params.TxGas) + return core.ErrGasLimitReached + } + from, _ := types.Sender(w.current.signer, tx) + + // Check whether the tx is replay protected. If we're not in the EIP155 hf + // phase, start ignoring the sender until we do. + if tx.Protected() && !w.chainConfig.IsEIP155(w.current.header.Number) { + log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", w.chainConfig.EIP155Block) + continue + } + + // Start executing the transaction + bs.state.Prepare(tx.Hash(), w.current.tcount) + var txLogs []*types.Log + txLogs, err = w.commitTransaction(tx, bs.Coinbase()) + if err == nil { + logs = append(logs, txLogs...) + tcount++ + } else { + log.Trace("Tx block inclusion failed", "sender", from, "nonce", tx.Nonce(), + "type", tx.Type(), "hash", tx.Hash(), "err", err) + break + } + } + if err != nil { + bs.state.RevertToSnapshot(snap) + } else { + bs.logs = append(bs.logs, logs...) + w.current.tcount = tcount + } + return err +} + +func (bs *blockState) Gas() (remaining uint64) { + return bs.worker.current.gasPool.Gas() +} + +// The DefaultCollator is the 'normal' block collator. It assembles a block +// based on transaction price ordering. +type DefaultCollator struct { + pool Pool +} + +var ( + ErrResubmitIntervalElapsed = errors.New("recommit interval elapsed") + ErrNewHead = errors.New("new chain head received") + ErrNoCurrentEnv = errors.New("missing env for mining") +) + +func (w *DefaultCollator) submit(bs BlockState, txs *types.TransactionsByPriceAndNonce, interrupt *int32) error { + for { + if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone { + if atomic.LoadInt32(interrupt) == commitInterruptResubmit { + return ErrResubmitIntervalElapsed + } else { + return ErrNewHead + } + } + + // If we don't have enough gas for any further transactions then we're done + available := bs.Gas() + if available < params.TxGas { + break + } + + // Retrieve the next transaction and abort if all done + tx := txs.Peek() + if tx == nil { + break + } + + // Enough space for this tx? + if available < tx.Gas() { + txs.Pop() + continue + } + + // Error may be ignored here. The error has already been checked + // during transaction acceptance is the transaction pool. + err := bs.AddTransactions(types.Transactions{tx}) + switch { + case errors.Is(err, core.ErrGasLimitReached): + fallthrough + case errors.Is(err, core.ErrTxTypeNotSupported): + fallthrough + case errors.Is(err, core.ErrNonceTooHigh): + txs.Pop() + case errors.Is(err, core.ErrNonceTooLow): + fallthrough + case errors.Is(err, nil): + fallthrough + default: + txs.Shift() + } + } + + return nil +} + +// CollateBlock fills a block based on the highest paying transactions from the +// transaction pool, giving precedence over 'local' transactions. +func (w *DefaultCollator) CollateBlock(bs BlockState, txs map[common.Address]types.Transactions, interrupt *int32, isSealing bool) error { + if isSealing { + // Split the pending transactions into locals and remotes + localTxs, remoteTxs := make(map[common.Address]types.Transactions), txs + for _, account := range w.pool.Locals() { + if accountTxs := remoteTxs[account]; len(accountTxs) > 0 { + delete(remoteTxs, account) + localTxs[account] = accountTxs + } + } + + if len(localTxs) > 0 { + if err := w.submit(bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), localTxs, bs.BaseFee()), interrupt); err != nil { + return err + } + } + + if len(remoteTxs) > 0 { + if err := w.submit(bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), remoteTxs, bs.BaseFee()), interrupt); err != nil { + return err + } + } + } else { + // ignore resubmit interval elapse here (only used when sealing) + w.submit(bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), txs, bs.BaseFee()), nil) + } + + bs.Commit() + return nil +} diff --git a/miner/worker.go b/miner/worker.go index accf3dac9096..85d7e4ef91f2 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -514,9 +514,11 @@ func (w *worker) mainLoop() { acc, _ := types.Sender(w.current.signer, tx) txs[acc] = append(txs[acc], tx) } - txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee) tcount := w.current.tcount - w.commitTransactions(txset, coinbase, nil) + + // can ignore returned error (interrupted by new chain head) + w.collateBlock(txs, coinbase, nil, false) + // Only update the snapshot if any new transactons were added // to the pending block if tcount != w.current.tcount { @@ -763,123 +765,29 @@ func (w *worker) commitTransaction(tx *types.Transaction, coinbase common.Addres return receipt.Logs, nil } -func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coinbase common.Address, interrupt *int32) bool { +func (w *worker) collateBlock(txs map[common.Address]types.Transactions, coinbase common.Address, interrupt *int32, isSealing bool) error { // Short circuit if current is nil if w.current == nil { - return true + return ErrNoCurrentEnv } - gasLimit := w.current.header.GasLimit if w.current.gasPool == nil { w.current.gasPool = new(core.GasPool).AddGas(gasLimit) } - - var coalescedLogs []*types.Log - - for { - // In the following three cases, we will interrupt the execution of the transaction. - // (1) new head block event arrival, the interrupt signal is 1 - // (2) worker start or restart, the interrupt signal is 1 - // (3) worker recreate the mining block with any newly arrived transactions, the interrupt signal is 2. - // For the first two cases, the semi-finished work will be discarded. - // For the third case, the semi-finished work will be submitted to the consensus engine. - if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone { - // Notify resubmit loop to increase resubmitting interval due to too frequent commits. - if atomic.LoadInt32(interrupt) == commitInterruptResubmit { - ratio := float64(gasLimit-w.current.gasPool.Gas()) / float64(gasLimit) - if ratio < 0.1 { - ratio = 0.1 - } - w.resubmitAdjustCh <- &intervalAdjust{ - ratio: ratio, - inc: true, - } - } - return atomic.LoadInt32(interrupt) == commitInterruptNewHead - } - // If we don't have enough gas for any further transactions then we're done - if w.current.gasPool.Gas() < params.TxGas { - log.Trace("Not enough gas for further transactions", "have", w.current.gasPool, "want", params.TxGas) - break - } - // Retrieve the next transaction and abort if all done - tx := txs.Peek() - if tx == nil { - break - } - // Error may be ignored here. The error has already been checked - // during transaction acceptance is the transaction pool. - // - // We use the eip155 signer regardless of the current hf. - from, _ := types.Sender(w.current.signer, tx) - // Check whether the tx is replay protected. If we're not in the EIP155 hf - // phase, start ignoring the sender until we do. - if tx.Protected() && !w.chainConfig.IsEIP155(w.current.header.Number) { - log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", w.chainConfig.EIP155Block) - - txs.Pop() - continue - } - // Start executing the transaction - w.current.state.Prepare(tx.Hash(), w.current.tcount) - - logs, err := w.commitTransaction(tx, coinbase) - switch { - case errors.Is(err, core.ErrGasLimitReached): - // Pop the current out-of-gas transaction without shifting in the next from the account - log.Trace("Gas limit exceeded for current block", "sender", from) - txs.Pop() - - case errors.Is(err, core.ErrNonceTooLow): - // New head notification data race between the transaction pool and miner, shift - log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce()) - txs.Shift() - - case errors.Is(err, core.ErrNonceTooHigh): - // Reorg notification data race between the transaction pool and miner, skip account = - log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce()) - txs.Pop() - - case errors.Is(err, nil): - // Everything ok, collect the logs and shift in the next transaction from the same account - coalescedLogs = append(coalescedLogs, logs...) - w.current.tcount++ - txs.Shift() - - case errors.Is(err, core.ErrTxTypeNotSupported): - // Pop the unsupported transaction without shifting in the next from the account - log.Trace("Skipping unsupported transaction type", "sender", from, "type", tx.Type()) - txs.Pop() - - default: - // Strange error, discard the transaction and get the next in line (note, the - // nonce-too-high clause will prevent us from executing in vain). - log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err) - txs.Shift() - } + var bs BlockState + bs = &blockState{ + state: w.current.state, + logs: nil, + worker: w, + coinbase: w.current.header.Coinbase, + baseFee: w.current.header.BaseFee, + signer: w.current.signer, } - - if !w.isRunning() && len(coalescedLogs) > 0 { - // We don't push the pendingLogsEvent while we are mining. The reason is that - // when we are mining, the worker will regenerate a mining block every 3 seconds. - // In order to avoid pushing the repeated pendingLog, we disable the pending log pushing. - - // make a copy, the state caches the logs and these logs get "upgraded" from pending to mined - // logs by filling in the block hash when the block was mined by the local miner. This can - // cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed. - cpy := make([]*types.Log, len(coalescedLogs)) - for i, l := range coalescedLogs { - cpy[i] = new(types.Log) - *cpy[i] = *l - } - w.pendingLogsFeed.Send(cpy) + var collator = &DefaultCollator{ + pool: w.eth.TxPool(), } - // Notify resubmit loop to decrease resubmitting interval if current interval is larger - // than the user-specified one. - if interrupt != nil { - w.resubmitAdjustCh <- &intervalAdjust{inc: false} - } - return false + + return collator.CollateBlock(bs, txs, interrupt, isSealing) } // commitNewWork generates several new sealing tasks based on the parent block. @@ -991,26 +899,31 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) w.updateSnapshot() return } - // Split the pending transactions into locals and remotes - localTxs, remoteTxs := make(map[common.Address]types.Transactions), pending - for _, account := range w.eth.TxPool().Locals() { - if txs := remoteTxs[account]; len(txs) > 0 { - delete(remoteTxs, account) - localTxs[account] = txs - } - } - if len(localTxs) > 0 { - txs := types.NewTransactionsByPriceAndNonce(w.current.signer, localTxs, header.BaseFee) - if w.commitTransactions(txs, w.coinbase, interrupt) { - return + + err = w.collateBlock(pending, w.coinbase, interrupt, true) + if err != nil { + if err == ErrResubmitIntervalElapsed { + // Notify resubmit loop to increase resubmitting interval due to too frequent commits. + gasLimit := w.current.header.GasLimit + ratio := float64(gasLimit-w.current.gasPool.Gas()) / float64(gasLimit) + if ratio < 0.1 { + ratio = 0.1 + } + w.resubmitAdjustCh <- &intervalAdjust{ + ratio: ratio, + inc: true, + } } + + return } - if len(remoteTxs) > 0 { - txs := types.NewTransactionsByPriceAndNonce(w.current.signer, remoteTxs, header.BaseFee) - if w.commitTransactions(txs, w.coinbase, interrupt) { - return - } + + if interrupt != nil { + // Notify resubmit loop to decrease resubmitting interval if current interval is larger + // than the user-specified one. + w.resubmitAdjustCh <- &intervalAdjust{inc: false} } + w.commit(uncles, w.fullTaskHook, true, tstart) } From 85a107642c42ff77c66e44abdc46fd40a644c49a Mon Sep 17 00:00:00 2001 From: Jared Wasinger Date: Wed, 28 Jul 2021 11:33:25 +0200 Subject: [PATCH 02/13] fix error handling in AddTransactions --- miner/collator.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/miner/collator.go b/miner/collator.go index 54b9aa15bc9d..544865355b8d 100644 --- a/miner/collator.go +++ b/miner/collator.go @@ -112,7 +112,8 @@ func (bs *blockState) AddTransactions(sequence types.Transactions) error { for _, tx := range sequence { if w.current.gasPool.Gas() < params.TxGas { log.Trace("Not enough gas for further transactions", "have", w.current.gasPool, "want", params.TxGas) - return core.ErrGasLimitReached + err = core.ErrGasLimitReached + break } from, _ := types.Sender(w.current.signer, tx) @@ -120,7 +121,8 @@ func (bs *blockState) AddTransactions(sequence types.Transactions) error { // phase, start ignoring the sender until we do. if tx.Protected() && !w.chainConfig.IsEIP155(w.current.header.Number) { log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", w.chainConfig.EIP155Block) - continue + err = ErrTxProtectionDisabled + break } // Start executing the transaction @@ -159,6 +161,7 @@ var ( ErrResubmitIntervalElapsed = errors.New("recommit interval elapsed") ErrNewHead = errors.New("new chain head received") ErrNoCurrentEnv = errors.New("missing env for mining") + ErrTxProtectionDisabled = errors.New("eip155-compatible tx provided when chain config does not support it") ) func (w *DefaultCollator) submit(bs BlockState, txs *types.TransactionsByPriceAndNonce, interrupt *int32) error { From af4ec1e7ac0eb5cbedd0b3d3124a7ef228addd23 Mon Sep 17 00:00:00 2001 From: Jared Wasinger Date: Wed, 28 Jul 2021 11:56:45 +0200 Subject: [PATCH 03/13] remove unecessary empty lines --- miner/collator.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/miner/collator.go b/miner/collator.go index 544865355b8d..eac63c713f29 100644 --- a/miner/collator.go +++ b/miner/collator.go @@ -88,7 +88,6 @@ func (bs *blockState) Commit() { // when we are mining, the worker will regenerate a mining block every 3 seconds. // In order to avoid pushing the repeated pendingLog, we disable the pending log pushing. if !w.isRunning() && len(bs.logs) > 0 { - // make a copy, the state caches the logs and these logs get "upgraded" from pending to mined // logs by filling in the block hash when the block was mined by the local miner. This can // cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed. @@ -124,7 +123,6 @@ func (bs *blockState) AddTransactions(sequence types.Transactions) error { err = ErrTxProtectionDisabled break } - // Start executing the transaction bs.state.Prepare(tx.Hash(), w.current.tcount) var txLogs []*types.Log @@ -173,25 +171,21 @@ func (w *DefaultCollator) submit(bs BlockState, txs *types.TransactionsByPriceAn return ErrNewHead } } - // If we don't have enough gas for any further transactions then we're done available := bs.Gas() if available < params.TxGas { break } - // Retrieve the next transaction and abort if all done tx := txs.Peek() if tx == nil { break } - // Enough space for this tx? if available < tx.Gas() { txs.Pop() continue } - // Error may be ignored here. The error has already been checked // during transaction acceptance is the transaction pool. err := bs.AddTransactions(types.Transactions{tx}) @@ -210,7 +204,6 @@ func (w *DefaultCollator) submit(bs BlockState, txs *types.TransactionsByPriceAn txs.Shift() } } - return nil } @@ -226,13 +219,11 @@ func (w *DefaultCollator) CollateBlock(bs BlockState, txs map[common.Address]typ localTxs[account] = accountTxs } } - if len(localTxs) > 0 { if err := w.submit(bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), localTxs, bs.BaseFee()), interrupt); err != nil { return err } } - if len(remoteTxs) > 0 { if err := w.submit(bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), remoteTxs, bs.BaseFee()), interrupt); err != nil { return err @@ -242,7 +233,6 @@ func (w *DefaultCollator) CollateBlock(bs BlockState, txs map[common.Address]typ // ignore resubmit interval elapse here (only used when sealing) w.submit(bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), txs, bs.BaseFee()), nil) } - bs.Commit() return nil } From 242f7f6d88bfe7cf5ff1d56b48d35ebfffe29f83 Mon Sep 17 00:00:00 2001 From: Jared Wasinger Date: Wed, 28 Jul 2021 12:32:52 +0200 Subject: [PATCH 04/13] remove unecessary spaces. make pool parameter of CollateBlock. remove txs as parameter from CollateBlock --- miner/collator.go | 17 +++++++++++------ miner/worker.go | 15 +++++++-------- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/miner/collator.go b/miner/collator.go index eac63c713f29..219cfd51d8a0 100644 --- a/miner/collator.go +++ b/miner/collator.go @@ -48,7 +48,7 @@ type BlockState interface { // Collator is something that can assemble a block. type Collator interface { - CollateBlock(bs BlockState, txs types.Transactions, interrupt *int32, isSealing bool) error + CollateBlock(bs BlockState, interrupt *int32, isSealing bool) error } // Pool is an interface to the transaction pool @@ -151,9 +151,7 @@ func (bs *blockState) Gas() (remaining uint64) { // The DefaultCollator is the 'normal' block collator. It assembles a block // based on transaction price ordering. -type DefaultCollator struct { - pool Pool -} +type DefaultCollator struct{} var ( ErrResubmitIntervalElapsed = errors.New("recommit interval elapsed") @@ -209,11 +207,18 @@ func (w *DefaultCollator) submit(bs BlockState, txs *types.TransactionsByPriceAn // CollateBlock fills a block based on the highest paying transactions from the // transaction pool, giving precedence over 'local' transactions. -func (w *DefaultCollator) CollateBlock(bs BlockState, txs map[common.Address]types.Transactions, interrupt *int32, isSealing bool) error { +func (w *DefaultCollator) CollateBlock(bs BlockState, pool Pool, interrupt *int32, isSealing bool) error { + txs, err := pool.Pending(true) + if err != nil { + return err + } + if len(txs) == 0 { + return nil + } if isSealing { // Split the pending transactions into locals and remotes localTxs, remoteTxs := make(map[common.Address]types.Transactions), txs - for _, account := range w.pool.Locals() { + for _, account := range pool.Locals() { if accountTxs := remoteTxs[account]; len(accountTxs) > 0 { delete(remoteTxs, account) localTxs[account] = accountTxs diff --git a/miner/worker.go b/miner/worker.go index 85d7e4ef91f2..5f7b05ac3c8e 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -517,7 +517,7 @@ func (w *worker) mainLoop() { tcount := w.current.tcount // can ignore returned error (interrupted by new chain head) - w.collateBlock(txs, coinbase, nil, false) + w.collateBlock(coinbase, nil, false) // Only update the snapshot if any new transactons were added // to the pending block @@ -765,7 +765,7 @@ func (w *worker) commitTransaction(tx *types.Transaction, coinbase common.Addres return receipt.Logs, nil } -func (w *worker) collateBlock(txs map[common.Address]types.Transactions, coinbase common.Address, interrupt *int32, isSealing bool) error { +func (w *worker) collateBlock(coinbase common.Address, interrupt *int32, isSealing bool) error { // Short circuit if current is nil if w.current == nil { return ErrNoCurrentEnv @@ -783,11 +783,9 @@ func (w *worker) collateBlock(txs map[common.Address]types.Transactions, coinbas baseFee: w.current.header.BaseFee, signer: w.current.signer, } - var collator = &DefaultCollator{ - pool: w.eth.TxPool(), - } + var collator = &DefaultCollator{} - return collator.CollateBlock(bs, txs, interrupt, isSealing) + return collator.CollateBlock(bs, w.eth.TxPool(), interrupt, isSealing) } // commitNewWork generates several new sealing tasks based on the parent block. @@ -900,7 +898,7 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) return } - err = w.collateBlock(pending, w.coinbase, interrupt, true) + err = w.collateBlock(w.coinbase, interrupt, true) if err != nil { if err == ErrResubmitIntervalElapsed { // Notify resubmit loop to increase resubmitting interval due to too frequent commits. @@ -913,8 +911,9 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) ratio: ratio, inc: true, } + } else if err != ErrNewHead { + log.Error("CollateBlock failed", "err", err) } - return } From f4718004ef87a1440ed8f526cfb7eaf1542a5afd Mon Sep 17 00:00:00 2001 From: Jared Wasinger Date: Wed, 28 Jul 2021 12:48:21 +0200 Subject: [PATCH 05/13] propagate tx change to collator interface --- miner/collator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/miner/collator.go b/miner/collator.go index 219cfd51d8a0..899b96c7d14d 100644 --- a/miner/collator.go +++ b/miner/collator.go @@ -48,7 +48,7 @@ type BlockState interface { // Collator is something that can assemble a block. type Collator interface { - CollateBlock(bs BlockState, interrupt *int32, isSealing bool) error + CollateBlock(bs BlockState, pool Pool, interrupt *int32, isSealing bool) error } // Pool is an interface to the transaction pool From 249be343cbec3d84e0b4f9f7b7f9b7174a01bcbc Mon Sep 17 00:00:00 2001 From: Jared Wasinger Date: Wed, 28 Jul 2021 20:41:55 +0200 Subject: [PATCH 06/13] remove isSealing flag from CollateBlock. CollateBlock only called during sealing now --- miner/collator.go | 43 +++++++++++++++++++++---------------------- miner/worker.go | 43 +++++++++++++++++++++++++++++++++++-------- 2 files changed, 56 insertions(+), 30 deletions(-) diff --git a/miner/collator.go b/miner/collator.go index 899b96c7d14d..d4d8f89f0f1e 100644 --- a/miner/collator.go +++ b/miner/collator.go @@ -48,7 +48,7 @@ type BlockState interface { // Collator is something that can assemble a block. type Collator interface { - CollateBlock(bs BlockState, pool Pool, interrupt *int32, isSealing bool) error + CollateBlock(bs BlockState, pool Pool, interrupt *int32) error } // Pool is an interface to the transaction pool @@ -160,7 +160,7 @@ var ( ErrTxProtectionDisabled = errors.New("eip155-compatible tx provided when chain config does not support it") ) -func (w *DefaultCollator) submit(bs BlockState, txs *types.TransactionsByPriceAndNonce, interrupt *int32) error { +func SubmitTransactions(bs BlockState, txs *types.TransactionsByPriceAndNonce, interrupt *int32) error { for { if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone { if atomic.LoadInt32(interrupt) == commitInterruptResubmit { @@ -202,12 +202,14 @@ func (w *DefaultCollator) submit(bs BlockState, txs *types.TransactionsByPriceAn txs.Shift() } } + + bs.Commit() return nil } // CollateBlock fills a block based on the highest paying transactions from the // transaction pool, giving precedence over 'local' transactions. -func (w *DefaultCollator) CollateBlock(bs BlockState, pool Pool, interrupt *int32, isSealing bool) error { +func (w *DefaultCollator) CollateBlock(bs BlockState, pool Pool, interrupt *int32) error { txs, err := pool.Pending(true) if err != nil { return err @@ -215,29 +217,26 @@ func (w *DefaultCollator) CollateBlock(bs BlockState, pool Pool, interrupt *int3 if len(txs) == 0 { return nil } - if isSealing { - // Split the pending transactions into locals and remotes - localTxs, remoteTxs := make(map[common.Address]types.Transactions), txs - for _, account := range pool.Locals() { - if accountTxs := remoteTxs[account]; len(accountTxs) > 0 { - delete(remoteTxs, account) - localTxs[account] = accountTxs - } + + // Split the pending transactions into locals and remotes + localTxs, remoteTxs := make(map[common.Address]types.Transactions), txs + for _, account := range pool.Locals() { + if accountTxs := remoteTxs[account]; len(accountTxs) > 0 { + delete(remoteTxs, account) + localTxs[account] = accountTxs } - if len(localTxs) > 0 { - if err := w.submit(bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), localTxs, bs.BaseFee()), interrupt); err != nil { - return err - } + } + if len(localTxs) > 0 { + if err := SubmitTransactions(bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), localTxs, bs.BaseFee()), interrupt); err != nil { + return err } - if len(remoteTxs) > 0 { - if err := w.submit(bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), remoteTxs, bs.BaseFee()), interrupt); err != nil { - return err - } + } + if len(remoteTxs) > 0 { + if err := SubmitTransactions(bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), remoteTxs, bs.BaseFee()), interrupt); err != nil { + return err } - } else { - // ignore resubmit interval elapse here (only used when sealing) - w.submit(bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), txs, bs.BaseFee()), nil) } + bs.Commit() return nil } diff --git a/miner/worker.go b/miner/worker.go index 5f7b05ac3c8e..df96df6ab00f 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -505,9 +505,6 @@ func (w *worker) mainLoop() { if gp := w.current.gasPool; gp != nil && gp.Gas() < params.TxGas { continue } - w.mu.RLock() - coinbase := w.coinbase - w.mu.RUnlock() txs := make(map[common.Address]types.Transactions) for _, tx := range ev.Txs { @@ -516,8 +513,7 @@ func (w *worker) mainLoop() { } tcount := w.current.tcount - // can ignore returned error (interrupted by new chain head) - w.collateBlock(coinbase, nil, false) + w.commitTransactionsToPending(txs) // Only update the snapshot if any new transactons were added // to the pending block @@ -765,7 +761,7 @@ func (w *worker) commitTransaction(tx *types.Transaction, coinbase common.Addres return receipt.Logs, nil } -func (w *worker) collateBlock(coinbase common.Address, interrupt *int32, isSealing bool) error { +func (w *worker) collateBlock(coinbase common.Address, interrupt *int32) error { // Short circuit if current is nil if w.current == nil { return ErrNoCurrentEnv @@ -785,7 +781,7 @@ func (w *worker) collateBlock(coinbase common.Address, interrupt *int32, isSeali } var collator = &DefaultCollator{} - return collator.CollateBlock(bs, w.eth.TxPool(), interrupt, isSealing) + return collator.CollateBlock(bs, w.eth.TxPool(), interrupt) } // commitNewWork generates several new sealing tasks based on the parent block. @@ -898,7 +894,7 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) return } - err = w.collateBlock(w.coinbase, interrupt, true) + err = w.collateBlock(w.coinbase, interrupt) if err != nil { if err == ErrResubmitIntervalElapsed { // Notify resubmit loop to increase resubmitting interval due to too frequent commits. @@ -976,6 +972,37 @@ func (w *worker) postSideBlock(event core.ChainSideEvent) { } } +func (w *worker) commitTransactionsToPending(txs map[common.Address]types.Transactions) { + // Short circuit if current is nil + if w.current == nil { + return + } + gasLimit := w.current.header.GasLimit + if w.current.gasPool == nil { + w.current.gasPool = new(core.GasPool).AddGas(gasLimit) + } + var bs BlockState + bs = &blockState{ + state: w.current.state, + logs: nil, + worker: w, + coinbase: w.current.header.Coinbase, + baseFee: w.current.header.BaseFee, + signer: w.current.signer, + } + + txs, err := w.eth.TxPool().Pending(true) + if err != nil { + log.Trace("error getting pending txs from the pool", "err", err) + return + } + if len(txs) == 0 { + return + } + + SubmitTransactions(bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), txs, bs.BaseFee()), nil) +} + // totalFees computes total consumed miner fees in ETH. Block transactions and receipts have to have the same order. func totalFees(block *types.Block, receipts []*types.Receipt) *big.Float { feesWei := new(big.Int) From 2b28a20bbdf59c37ff51087045fd452cb44c6105 Mon Sep 17 00:00:00 2001 From: Jared Wasinger Date: Thu, 29 Jul 2021 21:17:08 +0200 Subject: [PATCH 07/13] move handling of sealing recommit/newhead interrupt into blockstate --- miner/collator.go | 220 +++++++++++++++++++++++++--------------------- miner/worker.go | 73 +++++---------- 2 files changed, 142 insertions(+), 151 deletions(-) diff --git a/miner/collator.go b/miner/collator.go index d4d8f89f0f1e..968e534819ec 100644 --- a/miner/collator.go +++ b/miner/collator.go @@ -32,10 +32,16 @@ import ( // BlockState represents a block-to-be-mined, which is being assembled. A collator // can add transactions, and finish by calling Commit. type BlockState interface { - // AddTransactions adds the sequence of transactions to the blockstate. Either all - // transactions are added, or none of them. In the latter case, the error - // describes the reason why the txs could not be included. - AddTransactions(sequence types.Transactions) error + // AddTransactions attempts to add transactions to the blockstate. + // The following errors are returned + // 1) ErrRecommit- the recommit interval elapses + // 2) ErrNewHead- a new chainHead is received + // For both cases, no more transactions will be added to the block on subsequent + // calls to AddTransactions + // If AddTransactions returns an ErrNewHead during sealing, the collator should abort + // immediately and propogate this error to the caller + // TODO perhaps this should also return a list of (reason, tx_hash) for failing txs + AddTransactions(txs *types.TransactionsByPriceAndNonce) error // Commit signals that the collation is finished, and the block is ready // to be sealed. After calling Commit, no more transactions may be added. Commit() @@ -46,6 +52,10 @@ type BlockState interface { Signer() types.Signer } +var ( + ErrNewHead = errors.New("new chain head received") +) + // Collator is something that can assemble a block. type Collator interface { CollateBlock(bs BlockState, pool Pool, interrupt *int32) error @@ -59,12 +69,13 @@ type Pool interface { // blockState is an implementation of BlockState type blockState struct { - state *state.StateDB - logs []*types.Log - worker *worker - coinbase common.Address - baseFee *big.Int - signer types.Signer + state *state.StateDB + logs []*types.Log + worker *worker + coinbase common.Address + baseFee *big.Int + signer types.Signer + interrupt *int32 } // Coinbase returns the miner-address of the block being mined @@ -84,10 +95,12 @@ func (bs *blockState) Signer() types.Signer { func (bs *blockState) Commit() { w := bs.worker - // We don't push the pendingLogsEvent while we are mining. The reason is that - // when we are mining, the worker will regenerate a mining block every 3 seconds. - // In order to avoid pushing the repeated pendingLog, we disable the pending log pushing. + if !w.isRunning() && len(bs.logs) > 0 { + // We don't push the pendingLogsEvent while we are mining. The reason is that + // when we are mining, the worker will regenerate a mining block every 3 seconds. + // In order to avoid pushing the repeated pendingLog, we disable the pending log pushing. + // make a copy, the state caches the logs and these logs get "upgraded" from pending to mined // logs by filling in the block hash when the block was mined by the local miner. This can // cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed. @@ -98,126 +111,122 @@ func (bs *blockState) Commit() { } w.pendingLogsFeed.Send(cpy) } + // Notify resubmit loop to decrease resubmitting interval if current interval is larger + // than the user-specified one. + if bs.interrupt != nil { + w.resubmitAdjustCh <- &intervalAdjust{inc: false} + } } -func (bs *blockState) AddTransactions(sequence types.Transactions) error { +func (bs *blockState) AddTransactions(txs *types.TransactionsByPriceAndNonce) error { var ( - w = bs.worker - snap = w.current.state.Snapshot() - err error - logs []*types.Log - tcount = w.current.tcount + w = bs.worker + returnErr error + coalescedLogs []*types.Log ) - for _, tx := range sequence { + for { + if bs.interrupt != nil && atomic.LoadInt32(bs.interrupt) != commitInterruptNone { + // Notify resubmit loop to increase resubmitting interval due to too frequent commits. + if atomic.LoadInt32(bs.interrupt) == commitInterruptResubmit { + ratio := float64(w.current.header.GasLimit-w.current.gasPool.Gas()) / float64(w.current.header.GasLimit) + if ratio < 0.1 { + ratio = 0.1 + } + w.resubmitAdjustCh <- &intervalAdjust{ + ratio: ratio, + inc: true, + } + } + if atomic.LoadInt32(bs.interrupt) == commitInterruptNewHead { + returnErr = ErrNewHead + } + break + } + if w.current.gasPool.Gas() < params.TxGas { log.Trace("Not enough gas for further transactions", "have", w.current.gasPool, "want", params.TxGas) - err = core.ErrGasLimitReached break } + // Retrieve the next transaction and abort if all done + tx := txs.Peek() + if tx == nil { + break + } + // Error may be ignored here. The error has already been checked + // during transaction acceptance is the transaction pool. + // + // We use the eip155 signer regardless of the current hf. from, _ := types.Sender(w.current.signer, tx) // Check whether the tx is replay protected. If we're not in the EIP155 hf // phase, start ignoring the sender until we do. if tx.Protected() && !w.chainConfig.IsEIP155(w.current.header.Number) { log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", w.chainConfig.EIP155Block) - err = ErrTxProtectionDisabled - break + continue } + // Start executing the transaction bs.state.Prepare(tx.Hash(), w.current.tcount) - var txLogs []*types.Log - txLogs, err = w.commitTransaction(tx, bs.Coinbase()) - if err == nil { - logs = append(logs, txLogs...) - tcount++ - } else { - log.Trace("Tx block inclusion failed", "sender", from, "nonce", tx.Nonce(), - "type", tx.Type(), "hash", tx.Hash(), "err", err) - break - } - } - if err != nil { - bs.state.RevertToSnapshot(snap) - } else { - bs.logs = append(bs.logs, logs...) - w.current.tcount = tcount - } - return err -} + logs, err := w.commitTransaction(tx, bs.Coinbase()) + switch { + case errors.Is(err, core.ErrGasLimitReached): + // Pop the current out-of-gas transaction without shifting in the next from the account + log.Trace("Gas limit exceeded for current block", "sender", from) + txs.Pop() -func (bs *blockState) Gas() (remaining uint64) { - return bs.worker.current.gasPool.Gas() -} + case errors.Is(err, core.ErrNonceTooLow): + // New head notification data race between the transaction pool and miner, shift + log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce()) + txs.Shift() -// The DefaultCollator is the 'normal' block collator. It assembles a block -// based on transaction price ordering. -type DefaultCollator struct{} + case errors.Is(err, core.ErrNonceTooHigh): + // Reorg notification data race between the transaction pool and miner, skip account = + log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce()) + txs.Pop() -var ( - ErrResubmitIntervalElapsed = errors.New("recommit interval elapsed") - ErrNewHead = errors.New("new chain head received") - ErrNoCurrentEnv = errors.New("missing env for mining") - ErrTxProtectionDisabled = errors.New("eip155-compatible tx provided when chain config does not support it") -) + case errors.Is(err, nil): + // Everything ok, collect the logs and shift in the next transaction from the same account + coalescedLogs = append(coalescedLogs, logs...) + w.current.tcount++ + txs.Shift() -func SubmitTransactions(bs BlockState, txs *types.TransactionsByPriceAndNonce, interrupt *int32) error { - for { - if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone { - if atomic.LoadInt32(interrupt) == commitInterruptResubmit { - return ErrResubmitIntervalElapsed - } else { - return ErrNewHead - } - } - // If we don't have enough gas for any further transactions then we're done - available := bs.Gas() - if available < params.TxGas { - break - } - // Retrieve the next transaction and abort if all done - tx := txs.Peek() - if tx == nil { - break - } - // Enough space for this tx? - if available < tx.Gas() { - txs.Pop() - continue - } - // Error may be ignored here. The error has already been checked - // during transaction acceptance is the transaction pool. - err := bs.AddTransactions(types.Transactions{tx}) - switch { - case errors.Is(err, core.ErrGasLimitReached): - fallthrough case errors.Is(err, core.ErrTxTypeNotSupported): - fallthrough - case errors.Is(err, core.ErrNonceTooHigh): + // Pop the unsupported transaction without shifting in the next from the account + log.Trace("Skipping unsupported transaction type", "sender", from, "type", tx.Type()) txs.Pop() - case errors.Is(err, core.ErrNonceTooLow): - fallthrough - case errors.Is(err, nil): - fallthrough + default: + // Strange error, discard the transaction and get the next in line (note, the + // nonce-too-high clause will prevent us from executing in vain). + log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err) txs.Shift() } } + bs.logs = coalescedLogs - bs.Commit() - return nil + return returnErr } +func (bs *blockState) Gas() (remaining uint64) { + return bs.worker.current.gasPool.Gas() +} + +// The DefaultCollator is the 'normal' block collator. It assembles a block +// based on transaction price ordering. +type DefaultCollator struct{} + // CollateBlock fills a block based on the highest paying transactions from the -// transaction pool, giving precedence over 'local' transactions. -func (w *DefaultCollator) CollateBlock(bs BlockState, pool Pool, interrupt *int32) error { +// transaction pool, giving precedence over local transactions. +func (w *DefaultCollator) CollateBlock(bs BlockState, pool Pool) error { + recommitFired := false txs, err := pool.Pending(true) if err != nil { + log.Error("could not get pending transactions from the pool", "err", err) return err } if len(txs) == 0 { return nil } - // Split the pending transactions into locals and remotes localTxs, remoteTxs := make(map[common.Address]types.Transactions), txs for _, account := range pool.Locals() { @@ -227,16 +236,25 @@ func (w *DefaultCollator) CollateBlock(bs BlockState, pool Pool, interrupt *int3 } } if len(localTxs) > 0 { - if err := SubmitTransactions(bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), localTxs, bs.BaseFee()), interrupt); err != nil { - return err + if err := bs.AddTransactions(types.NewTransactionsByPriceAndNonce(bs.Signer(), localTxs, bs.BaseFee())); err != nil { + if err == ErrNewHead { + return err + } else { + recommitFired = true + } } } if len(remoteTxs) > 0 { - if err := SubmitTransactions(bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), remoteTxs, bs.BaseFee()), interrupt); err != nil { - return err + if err := bs.AddTransactions(types.NewTransactionsByPriceAndNonce(bs.Signer(), remoteTxs, bs.BaseFee())); err != nil { + if err == ErrNewHead { + return err + } else { + recommitFired = true + } } } - - bs.Commit() + if !recommitFired { + bs.Commit() + } return nil } diff --git a/miner/worker.go b/miner/worker.go index df96df6ab00f..3777e16575c1 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -505,16 +505,13 @@ func (w *worker) mainLoop() { if gp := w.current.gasPool; gp != nil && gp.Gas() < params.TxGas { continue } - txs := make(map[common.Address]types.Transactions) for _, tx := range ev.Txs { acc, _ := types.Sender(w.current.signer, tx) txs[acc] = append(txs[acc], tx) } tcount := w.current.tcount - w.commitTransactionsToPending(txs) - // Only update the snapshot if any new transactons were added // to the pending block if tcount != w.current.tcount { @@ -761,10 +758,10 @@ func (w *worker) commitTransaction(tx *types.Transaction, coinbase common.Addres return receipt.Logs, nil } -func (w *worker) collateBlock(coinbase common.Address, interrupt *int32) error { +func (w *worker) collateBlock(coinbase common.Address, interrupt *int32) bool { // Short circuit if current is nil if w.current == nil { - return ErrNoCurrentEnv + return false } gasLimit := w.current.header.GasLimit if w.current.gasPool == nil { @@ -772,16 +769,20 @@ func (w *worker) collateBlock(coinbase common.Address, interrupt *int32) error { } var bs BlockState bs = &blockState{ - state: w.current.state, - logs: nil, - worker: w, - coinbase: w.current.header.Coinbase, - baseFee: w.current.header.BaseFee, - signer: w.current.signer, + state: w.current.state, + logs: nil, + worker: w, + coinbase: w.current.header.Coinbase, + baseFee: w.current.header.BaseFee, + signer: w.current.signer, + interrupt: interrupt, } var collator = &DefaultCollator{} - return collator.CollateBlock(bs, w.eth.TxPool(), interrupt) + if err := collator.CollateBlock(bs, w.eth.TxPool()); err == ErrNewHead { + return false + } + return true } // commitNewWork generates several new sealing tasks based on the parent block. @@ -894,31 +895,10 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) return } - err = w.collateBlock(w.coinbase, interrupt) - if err != nil { - if err == ErrResubmitIntervalElapsed { - // Notify resubmit loop to increase resubmitting interval due to too frequent commits. - gasLimit := w.current.header.GasLimit - ratio := float64(gasLimit-w.current.gasPool.Gas()) / float64(gasLimit) - if ratio < 0.1 { - ratio = 0.1 - } - w.resubmitAdjustCh <- &intervalAdjust{ - ratio: ratio, - inc: true, - } - } else if err != ErrNewHead { - log.Error("CollateBlock failed", "err", err) - } + if !w.collateBlock(w.coinbase, interrupt) { return } - if interrupt != nil { - // Notify resubmit loop to decrease resubmitting interval if current interval is larger - // than the user-specified one. - w.resubmitAdjustCh <- &intervalAdjust{inc: false} - } - w.commit(uncles, w.fullTaskHook, true, tstart) } @@ -983,24 +963,17 @@ func (w *worker) commitTransactionsToPending(txs map[common.Address]types.Transa } var bs BlockState bs = &blockState{ - state: w.current.state, - logs: nil, - worker: w, - coinbase: w.current.header.Coinbase, - baseFee: w.current.header.BaseFee, - signer: w.current.signer, - } - - txs, err := w.eth.TxPool().Pending(true) - if err != nil { - log.Trace("error getting pending txs from the pool", "err", err) - return - } - if len(txs) == 0 { - return + state: w.current.state, + logs: nil, + worker: w, + coinbase: w.current.header.Coinbase, + baseFee: w.current.header.BaseFee, + signer: w.current.signer, + interrupt: nil, } - SubmitTransactions(bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), txs, bs.BaseFee()), nil) + bs.AddTransactions(types.NewTransactionsByPriceAndNonce(bs.Signer(), txs, bs.BaseFee())) + bs.Commit() } // totalFees computes total consumed miner fees in ETH. Block transactions and receipts have to have the same order. From e9d1b4de6d9ed099c7477a86e5dbc863ed668c46 Mon Sep 17 00:00:00 2001 From: Jared Wasinger Date: Fri, 30 Jul 2021 13:23:13 +0200 Subject: [PATCH 08/13] don't use verbatim coinbase from header (fixes clique sealer test) --- miner/worker.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/miner/worker.go b/miner/worker.go index 3777e16575c1..1ac94ab93549 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -772,7 +772,7 @@ func (w *worker) collateBlock(coinbase common.Address, interrupt *int32) bool { state: w.current.state, logs: nil, worker: w, - coinbase: w.current.header.Coinbase, + coinbase: w.coinbase, baseFee: w.current.header.BaseFee, signer: w.current.signer, interrupt: interrupt, @@ -966,7 +966,7 @@ func (w *worker) commitTransactionsToPending(txs map[common.Address]types.Transa state: w.current.state, logs: nil, worker: w, - coinbase: w.current.header.Coinbase, + coinbase: w.coinbase, baseFee: w.current.header.BaseFee, signer: w.current.signer, interrupt: nil, From 90c3a8e074125588e8acf271447571de10a476d7 Mon Sep 17 00:00:00 2001 From: Jared Wasinger Date: Sat, 31 Jul 2021 13:43:37 +0200 Subject: [PATCH 09/13] fix recommit interval --- miner/collator.go | 43 +++++++++++++------------------------------ miner/worker.go | 16 +++++++++++----- 2 files changed, 24 insertions(+), 35 deletions(-) diff --git a/miner/collator.go b/miner/collator.go index 968e534819ec..59ca44fc6712 100644 --- a/miner/collator.go +++ b/miner/collator.go @@ -32,19 +32,13 @@ import ( // BlockState represents a block-to-be-mined, which is being assembled. A collator // can add transactions, and finish by calling Commit. type BlockState interface { - // AddTransactions attempts to add transactions to the blockstate. - // The following errors are returned - // 1) ErrRecommit- the recommit interval elapses - // 2) ErrNewHead- a new chainHead is received - // For both cases, no more transactions will be added to the block on subsequent - // calls to AddTransactions - // If AddTransactions returns an ErrNewHead during sealing, the collator should abort - // immediately and propogate this error to the caller + // AddTransactions attempts to add transactions to the blockstate. An error + // (ErrNewHead) is returned signalling that the calling Collator should abort + // immediately and not make subsequent calls to AddTransactions. + // If a call to AddTransactions returns an ErrNewHead during sealing, + // the collator should abort immediately and propogate this error to the caller // TODO perhaps this should also return a list of (reason, tx_hash) for failing txs AddTransactions(txs *types.TransactionsByPriceAndNonce) error - // Commit signals that the collation is finished, and the block is ready - // to be sealed. After calling Commit, no more transactions may be added. - Commit() Gas() (remaining uint64) Coinbase() common.Address @@ -76,6 +70,7 @@ type blockState struct { baseFee *big.Int signer types.Signer interrupt *int32 + resubmitAdjustHandled bool } // Coinbase returns the miner-address of the block being mined @@ -113,7 +108,7 @@ func (bs *blockState) Commit() { } // Notify resubmit loop to decrease resubmitting interval if current interval is larger // than the user-specified one. - if bs.interrupt != nil { + if !bs.resubmitAdjustHandled && bs.interrupt != nil { w.resubmitAdjustCh <- &intervalAdjust{inc: false} } } @@ -124,6 +119,9 @@ func (bs *blockState) AddTransactions(txs *types.TransactionsByPriceAndNonce) er returnErr error coalescedLogs []*types.Log ) + if bs.resubmitAdjustHandled { + return nil + } for { if bs.interrupt != nil && atomic.LoadInt32(bs.interrupt) != commitInterruptNone { // Notify resubmit loop to increase resubmitting interval due to too frequent commits. @@ -140,9 +138,9 @@ func (bs *blockState) AddTransactions(txs *types.TransactionsByPriceAndNonce) er if atomic.LoadInt32(bs.interrupt) == commitInterruptNewHead { returnErr = ErrNewHead } + bs.resubmitAdjustHandled = true break } - if w.current.gasPool.Gas() < params.TxGas { log.Trace("Not enough gas for further transactions", "have", w.current.gasPool, "want", params.TxGas) break @@ -157,14 +155,12 @@ func (bs *blockState) AddTransactions(txs *types.TransactionsByPriceAndNonce) er // // We use the eip155 signer regardless of the current hf. from, _ := types.Sender(w.current.signer, tx) - // Check whether the tx is replay protected. If we're not in the EIP155 hf // phase, start ignoring the sender until we do. if tx.Protected() && !w.chainConfig.IsEIP155(w.current.header.Number) { log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", w.chainConfig.EIP155Block) continue } - // Start executing the transaction bs.state.Prepare(tx.Hash(), w.current.tcount) logs, err := w.commitTransaction(tx, bs.Coinbase()) @@ -203,7 +199,6 @@ func (bs *blockState) AddTransactions(txs *types.TransactionsByPriceAndNonce) er } } bs.logs = coalescedLogs - return returnErr } @@ -218,7 +213,6 @@ type DefaultCollator struct{} // CollateBlock fills a block based on the highest paying transactions from the // transaction pool, giving precedence over local transactions. func (w *DefaultCollator) CollateBlock(bs BlockState, pool Pool) error { - recommitFired := false txs, err := pool.Pending(true) if err != nil { log.Error("could not get pending transactions from the pool", "err", err) @@ -237,24 +231,13 @@ func (w *DefaultCollator) CollateBlock(bs BlockState, pool Pool) error { } if len(localTxs) > 0 { if err := bs.AddTransactions(types.NewTransactionsByPriceAndNonce(bs.Signer(), localTxs, bs.BaseFee())); err != nil { - if err == ErrNewHead { - return err - } else { - recommitFired = true - } + return err } } if len(remoteTxs) > 0 { if err := bs.AddTransactions(types.NewTransactionsByPriceAndNonce(bs.Signer(), remoteTxs, bs.BaseFee())); err != nil { - if err == ErrNewHead { - return err - } else { - recommitFired = true - } + return err } } - if !recommitFired { - bs.Commit() - } return nil } diff --git a/miner/worker.go b/miner/worker.go index 1ac94ab93549..e67474d01228 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -767,8 +767,8 @@ func (w *worker) collateBlock(coinbase common.Address, interrupt *int32) bool { if w.current.gasPool == nil { w.current.gasPool = new(core.GasPool).AddGas(gasLimit) } - var bs BlockState - bs = &blockState{ + var bs blockState + bs = blockState{ state: w.current.state, logs: nil, worker: w, @@ -776,12 +776,15 @@ func (w *worker) collateBlock(coinbase common.Address, interrupt *int32) bool { baseFee: w.current.header.BaseFee, signer: w.current.signer, interrupt: interrupt, + resubmitAdjustHandled: false, } var collator = &DefaultCollator{} - if err := collator.CollateBlock(bs, w.eth.TxPool()); err == ErrNewHead { + if err := collator.CollateBlock(&bs, w.eth.TxPool()); err != nil { return false } + + bs.Commit() return true } @@ -961,8 +964,8 @@ func (w *worker) commitTransactionsToPending(txs map[common.Address]types.Transa if w.current.gasPool == nil { w.current.gasPool = new(core.GasPool).AddGas(gasLimit) } - var bs BlockState - bs = &blockState{ + var bs blockState + bs = blockState{ state: w.current.state, logs: nil, worker: w, @@ -970,8 +973,11 @@ func (w *worker) commitTransactionsToPending(txs map[common.Address]types.Transa baseFee: w.current.header.BaseFee, signer: w.current.signer, interrupt: nil, + resubmitAdjustHandled: false, } + // try to commit all transactions to the pending state. won't return an error + // because the recommit interrupt only applies when sealing bs.AddTransactions(types.NewTransactionsByPriceAndNonce(bs.Signer(), txs, bs.BaseFee())) bs.Commit() } From e99ef78adb33c0da8191cd066da18e3b1fd3e39b Mon Sep 17 00:00:00 2001 From: Jared Wasinger Date: Sun, 1 Aug 2021 23:10:01 +0200 Subject: [PATCH 10/13] make AddTransactions behavior "all or nothing" again --- miner/collator.go | 195 ++++++++++++++++++++++++++-------------------- miner/worker.go | 66 ++++++++-------- 2 files changed, 145 insertions(+), 116 deletions(-) diff --git a/miner/collator.go b/miner/collator.go index 59ca44fc6712..bae6cec122f7 100644 --- a/miner/collator.go +++ b/miner/collator.go @@ -29,17 +29,18 @@ import ( "github.com/ethereum/go-ethereum/params" ) -// BlockState represents a block-to-be-mined, which is being assembled. A collator -// can add transactions, and finish by calling Commit. +// BlockState represents a block-to-be-mined, which is being assembled. +// A collator can add transactions by calling AddTransactions type BlockState interface { - // AddTransactions attempts to add transactions to the blockstate. An error - // (ErrNewHead) is returned signalling that the calling Collator should abort - // immediately and not make subsequent calls to AddTransactions. - // If a call to AddTransactions returns an ErrNewHead during sealing, - // the collator should abort immediately and propogate this error to the caller - // TODO perhaps this should also return a list of (reason, tx_hash) for failing txs - AddTransactions(txs *types.TransactionsByPriceAndNonce) error - + // AddTransactions adds the sequence of transactions to the blockstate. Either all + // transactions are added, or none of them. In the latter case, the error + // describes the reason why the txs could not be included. + // if ErrRecommit, the collator should not attempt to add more transactions to the + // block and submit the block for sealing. + // If ErrAbort is returned, the collator should immediately abort and return a + // value (true) from CollateBlock which indicates to the miner to discard the + // block + AddTransactions(sequence types.Transactions) error Gas() (remaining uint64) Coinbase() common.Address BaseFee() *big.Int @@ -47,12 +48,16 @@ type BlockState interface { } var ( - ErrNewHead = errors.New("new chain head received") + ErrAbort = errors.New("miner signalled to abort sealing the current block") + ErrRecommit = errors.New("err sealing recommit timer elapsed") + ErrUnsupportedEIP155Tx = errors.New("encountered eip155 tx when chain doesn't support it") ) // Collator is something that can assemble a block. type Collator interface { - CollateBlock(bs BlockState, pool Pool, interrupt *int32) error + // should add transactions to the pending BlockState. + // should return true if sealing of the block should be aborted + CollateBlock(bs BlockState, pool Pool) bool } // Pool is an interface to the transaction pool @@ -63,14 +68,14 @@ type Pool interface { // blockState is an implementation of BlockState type blockState struct { - state *state.StateDB - logs []*types.Log - worker *worker - coinbase common.Address - baseFee *big.Int - signer types.Signer - interrupt *int32 - resubmitAdjustHandled bool + state *state.StateDB + logs []*types.Log + worker *worker + coinbase common.Address + baseFee *big.Int + signer types.Signer + interrupt *int32 + resubmitAdjustHandled bool } // Coinbase returns the miner-address of the block being mined @@ -113,16 +118,18 @@ func (bs *blockState) Commit() { } } -func (bs *blockState) AddTransactions(txs *types.TransactionsByPriceAndNonce) error { +func (bs *blockState) AddTransactions(sequence types.Transactions) error { var ( - w = bs.worker - returnErr error - coalescedLogs []*types.Log + w = bs.worker + snap = w.current.state.Snapshot() + err error + logs []*types.Log + tcount = w.current.tcount ) - if bs.resubmitAdjustHandled { - return nil - } - for { + if bs.resubmitAdjustHandled { + return ErrRecommit + } + for _, tx := range sequence { if bs.interrupt != nil && atomic.LoadInt32(bs.interrupt) != commitInterruptNone { // Notify resubmit loop to increase resubmitting interval due to too frequent commits. if atomic.LoadInt32(bs.interrupt) == commitInterruptResubmit { @@ -136,70 +143,47 @@ func (bs *blockState) AddTransactions(txs *types.TransactionsByPriceAndNonce) er } } if atomic.LoadInt32(bs.interrupt) == commitInterruptNewHead { - returnErr = ErrNewHead + err = ErrAbort + } else { + err = ErrRecommit } - bs.resubmitAdjustHandled = true + bs.resubmitAdjustHandled = true break } if w.current.gasPool.Gas() < params.TxGas { log.Trace("Not enough gas for further transactions", "have", w.current.gasPool, "want", params.TxGas) + err = core.ErrGasLimitReached break } - // Retrieve the next transaction and abort if all done - tx := txs.Peek() - if tx == nil { - break - } - // Error may be ignored here. The error has already been checked - // during transaction acceptance is the transaction pool. - // - // We use the eip155 signer regardless of the current hf. from, _ := types.Sender(w.current.signer, tx) // Check whether the tx is replay protected. If we're not in the EIP155 hf // phase, start ignoring the sender until we do. if tx.Protected() && !w.chainConfig.IsEIP155(w.current.header.Number) { - log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", w.chainConfig.EIP155Block) - continue + log.Trace("encountered replay-protected transaction when chain doesn't support replay protection", "hash", tx.Hash(), "eip155", w.chainConfig.EIP155Block) + err = ErrUnsupportedEIP155Tx + break } // Start executing the transaction bs.state.Prepare(tx.Hash(), w.current.tcount) - logs, err := w.commitTransaction(tx, bs.Coinbase()) - switch { - case errors.Is(err, core.ErrGasLimitReached): - // Pop the current out-of-gas transaction without shifting in the next from the account - log.Trace("Gas limit exceeded for current block", "sender", from) - txs.Pop() - case errors.Is(err, core.ErrNonceTooLow): - // New head notification data race between the transaction pool and miner, shift - log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce()) - txs.Shift() - - case errors.Is(err, core.ErrNonceTooHigh): - // Reorg notification data race between the transaction pool and miner, skip account = - log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce()) - txs.Pop() - - case errors.Is(err, nil): - // Everything ok, collect the logs and shift in the next transaction from the same account - coalescedLogs = append(coalescedLogs, logs...) - w.current.tcount++ - txs.Shift() - - case errors.Is(err, core.ErrTxTypeNotSupported): - // Pop the unsupported transaction without shifting in the next from the account - log.Trace("Skipping unsupported transaction type", "sender", from, "type", tx.Type()) - txs.Pop() - - default: - // Strange error, discard the transaction and get the next in line (note, the - // nonce-too-high clause will prevent us from executing in vain). - log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err) - txs.Shift() + var txLogs []*types.Log + txLogs, err = w.commitTransaction(tx, bs.Coinbase()) + if err == nil { + logs = append(logs, txLogs...) + tcount++ + } else { + log.Trace("Tx block inclusion failed", "sender", from, "nonce", tx.Nonce(), + "type", tx.Type(), "hash", tx.Hash(), "err", err) + break } } - bs.logs = coalescedLogs - return returnErr + if err != nil { + bs.state.RevertToSnapshot(snap) + } else { + bs.logs = append(bs.logs, logs...) + w.current.tcount = tcount + } + return err } func (bs *blockState) Gas() (remaining uint64) { @@ -210,16 +194,60 @@ func (bs *blockState) Gas() (remaining uint64) { // based on transaction price ordering. type DefaultCollator struct{} +func submitTransactions(bs BlockState, txs *types.TransactionsByPriceAndNonce) bool { + returnVal := false + for { + // If we don't have enough gas for any further transactions then we're done + available := bs.Gas() + if available < params.TxGas { + break + } + // Retrieve the next transaction and abort if all done + tx := txs.Peek() + if tx == nil { + break + } + // Enough space for this tx? + if available < tx.Gas() { + txs.Pop() + continue + } + // Error already logged in AddTransactions + err := bs.AddTransactions(types.Transactions{tx}) + switch { + case errors.Is(err, core.ErrGasLimitReached): + fallthrough + case errors.Is(err, core.ErrTxTypeNotSupported): + fallthrough + case errors.Is(err, core.ErrNonceTooHigh): + txs.Pop() + case errors.Is(err, ErrAbort): + returnVal = true + break + case errors.Is(err, ErrRecommit): + break + case errors.Is(err, core.ErrNonceTooLow): + fallthrough + case errors.Is(err, nil): + fallthrough + default: + txs.Shift() + } + } + + return returnVal +} + // CollateBlock fills a block based on the highest paying transactions from the // transaction pool, giving precedence over local transactions. -func (w *DefaultCollator) CollateBlock(bs BlockState, pool Pool) error { +func (w *DefaultCollator) CollateBlock(bs BlockState, pool Pool) bool { txs, err := pool.Pending(true) if err != nil { log.Error("could not get pending transactions from the pool", "err", err) - return err + return true } if len(txs) == 0 { - return nil + return true } // Split the pending transactions into locals and remotes localTxs, remoteTxs := make(map[common.Address]types.Transactions), txs @@ -230,14 +258,15 @@ func (w *DefaultCollator) CollateBlock(bs BlockState, pool Pool) error { } } if len(localTxs) > 0 { - if err := bs.AddTransactions(types.NewTransactionsByPriceAndNonce(bs.Signer(), localTxs, bs.BaseFee())); err != nil { - return err + if submitTransactions(bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), localTxs, bs.BaseFee())) { + return true } } if len(remoteTxs) > 0 { - if err := bs.AddTransactions(types.NewTransactionsByPriceAndNonce(bs.Signer(), remoteTxs, bs.BaseFee())); err != nil { - return err + if submitTransactions(bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), remoteTxs, bs.BaseFee())) { + return true } } - return nil + + return false } diff --git a/miner/worker.go b/miner/worker.go index e67474d01228..c9754956fc5d 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -510,13 +510,7 @@ func (w *worker) mainLoop() { acc, _ := types.Sender(w.current.signer, tx) txs[acc] = append(txs[acc], tx) } - tcount := w.current.tcount w.commitTransactionsToPending(txs) - // Only update the snapshot if any new transactons were added - // to the pending block - if tcount != w.current.tcount { - w.updateSnapshot() - } } else { // Special case, if the consensus engine is 0 period clique(dev mode), // submit mining work here since all empty submission will be rejected @@ -761,7 +755,7 @@ func (w *worker) commitTransaction(tx *types.Transaction, coinbase common.Addres func (w *worker) collateBlock(coinbase common.Address, interrupt *int32) bool { // Short circuit if current is nil if w.current == nil { - return false + return true } gasLimit := w.current.header.GasLimit if w.current.gasPool == nil { @@ -769,23 +763,23 @@ func (w *worker) collateBlock(coinbase common.Address, interrupt *int32) bool { } var bs blockState bs = blockState{ - state: w.current.state, - logs: nil, - worker: w, - coinbase: w.coinbase, - baseFee: w.current.header.BaseFee, - signer: w.current.signer, - interrupt: interrupt, - resubmitAdjustHandled: false, + state: w.current.state, + logs: nil, + worker: w, + coinbase: w.coinbase, + baseFee: w.current.header.BaseFee, + signer: w.current.signer, + interrupt: interrupt, + resubmitAdjustHandled: false, } var collator = &DefaultCollator{} - if err := collator.CollateBlock(&bs, w.eth.TxPool()); err != nil { - return false + if collator.CollateBlock(&bs, w.eth.TxPool()) { + return true } - bs.Commit() - return true + bs.Commit() + return false } // commitNewWork generates several new sealing tasks based on the parent block. @@ -898,7 +892,7 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) return } - if !w.collateBlock(w.coinbase, interrupt) { + if w.collateBlock(w.coinbase, interrupt) { return } @@ -966,19 +960,25 @@ func (w *worker) commitTransactionsToPending(txs map[common.Address]types.Transa } var bs blockState bs = blockState{ - state: w.current.state, - logs: nil, - worker: w, - coinbase: w.coinbase, - baseFee: w.current.header.BaseFee, - signer: w.current.signer, - interrupt: nil, - resubmitAdjustHandled: false, - } - - // try to commit all transactions to the pending state. won't return an error - // because the recommit interrupt only applies when sealing - bs.AddTransactions(types.NewTransactionsByPriceAndNonce(bs.Signer(), txs, bs.BaseFee())) + state: w.current.state, + logs: nil, + worker: w, + coinbase: w.coinbase, + baseFee: w.current.header.BaseFee, + signer: w.current.signer, + interrupt: nil, + resubmitAdjustHandled: false, + } + + // try to commit all transactions to the pending state. won't return an error + // because the recommit interrupt only applies when sealing + tcount := w.current.tcount + submitTransactions(&bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), txs, bs.BaseFee())) + // Only update the snapshot if any new transactons were added + // to the pending block + if tcount != w.current.tcount { + w.updateSnapshot() + } bs.Commit() } From 931bbf28138ca6af0714983ab2451b9ba8052891 Mon Sep 17 00:00:00 2001 From: Jared Wasinger Date: Tue, 3 Aug 2021 14:06:11 +0200 Subject: [PATCH 11/13] remove txs and receipts in revert when AddTransactions fails --- miner/collator.go | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/miner/collator.go b/miner/collator.go index bae6cec122f7..e3869e0c3214 100644 --- a/miner/collator.go +++ b/miner/collator.go @@ -120,11 +120,12 @@ func (bs *blockState) Commit() { func (bs *blockState) AddTransactions(sequence types.Transactions) error { var ( - w = bs.worker - snap = w.current.state.Snapshot() - err error - logs []*types.Log - tcount = w.current.tcount + w = bs.worker + snap = w.current.state.Snapshot() + err error + logs []*types.Log + tcount = w.current.tcount + startTCount = w.current.tcount ) if bs.resubmitAdjustHandled { return ErrRecommit @@ -179,6 +180,14 @@ func (bs *blockState) AddTransactions(sequence types.Transactions) error { } if err != nil { bs.state.RevertToSnapshot(snap) + + // remove the txs and receipts that were added + for i := startTCount; i < tcount; i++ { + w.current.txs[i] = nil + w.current.receipts[i] = nil + } + w.current.txs = w.current.txs[:startTCount+1] + w.current.receipts = w.current.receipts[:startTCount+1] } else { bs.logs = append(bs.logs, logs...) w.current.tcount = tcount From d0ae6aa95e567543b0ad13d38310f42594284769 Mon Sep 17 00:00:00 2001 From: Jared Wasinger Date: Tue, 3 Aug 2021 18:15:28 +0200 Subject: [PATCH 12/13] fix --- miner/collator.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/miner/collator.go b/miner/collator.go index e3869e0c3214..72902360ff0a 100644 --- a/miner/collator.go +++ b/miner/collator.go @@ -186,8 +186,8 @@ func (bs *blockState) AddTransactions(sequence types.Transactions) error { w.current.txs[i] = nil w.current.receipts[i] = nil } - w.current.txs = w.current.txs[:startTCount+1] - w.current.receipts = w.current.receipts[:startTCount+1] + w.current.txs = w.current.txs[:startTCount] + w.current.receipts = w.current.receipts[:startTCount] } else { bs.logs = append(bs.logs, logs...) w.current.tcount = tcount From 851885207db651e0134d34a66f2c24ce6f2247db Mon Sep 17 00:00:00 2001 From: Jared Wasinger Date: Tue, 3 Aug 2021 20:10:04 +0200 Subject: [PATCH 13/13] make collator's AddTransactions accept callback that determines whether to revert --- miner/collator.go | 66 ++++++++++++++++++++++++++++++----------------- 1 file changed, 42 insertions(+), 24 deletions(-) diff --git a/miner/collator.go b/miner/collator.go index 72902360ff0a..5d8b24ee8907 100644 --- a/miner/collator.go +++ b/miner/collator.go @@ -29,6 +29,8 @@ import ( "github.com/ethereum/go-ethereum/params" ) +type AddTransactionsResultFunc func(error, []*types.Receipt) bool + // BlockState represents a block-to-be-mined, which is being assembled. // A collator can add transactions by calling AddTransactions type BlockState interface { @@ -40,7 +42,7 @@ type BlockState interface { // If ErrAbort is returned, the collator should immediately abort and return a // value (true) from CollateBlock which indicates to the miner to discard the // block - AddTransactions(sequence types.Transactions) error + AddTransactions(sequence types.Transactions, cb AddTransactionsResultFunc) Gas() (remaining uint64) Coinbase() common.Address BaseFee() *big.Int @@ -118,7 +120,7 @@ func (bs *blockState) Commit() { } } -func (bs *blockState) AddTransactions(sequence types.Transactions) error { +func (bs *blockState) AddTransactions(sequence types.Transactions, cb AddTransactionsResultFunc) { var ( w = bs.worker snap = w.current.state.Snapshot() @@ -128,8 +130,10 @@ func (bs *blockState) AddTransactions(sequence types.Transactions) error { startTCount = w.current.tcount ) if bs.resubmitAdjustHandled { - return ErrRecommit + cb(ErrRecommit, nil) + return } + for _, tx := range sequence { if bs.interrupt != nil && atomic.LoadInt32(bs.interrupt) != commitInterruptNone { // Notify resubmit loop to increase resubmitting interval due to too frequent commits. @@ -178,7 +182,14 @@ func (bs *blockState) AddTransactions(sequence types.Transactions) error { break } } - if err != nil { + var txReceipts []*types.Receipt = nil + if err == nil { + txReceipts = w.current.receipts[startTCount:tcount] + } + // TODO: deep copy the tx receipts here or add a disclaimer to implementors not to modify them? + shouldRevert := cb(err, txReceipts) + + if err != nil || shouldRevert { bs.state.RevertToSnapshot(snap) // remove the txs and receipts that were added @@ -192,7 +203,6 @@ func (bs *blockState) AddTransactions(sequence types.Transactions) error { bs.logs = append(bs.logs, logs...) w.current.tcount = tcount } - return err } func (bs *blockState) Gas() (remaining uint64) { @@ -205,6 +215,31 @@ type DefaultCollator struct{} func submitTransactions(bs BlockState, txs *types.TransactionsByPriceAndNonce) bool { returnVal := false + shouldReturn := false + cb := func(err error, receipts []*types.Receipt) bool { + switch { + case errors.Is(err, core.ErrGasLimitReached): + fallthrough + case errors.Is(err, core.ErrTxTypeNotSupported): + fallthrough + case errors.Is(err, core.ErrNonceTooHigh): + txs.Pop() + case errors.Is(err, ErrAbort): + returnVal = true + shouldReturn = true + case errors.Is(err, ErrRecommit): + returnVal = false + shouldReturn = true + case errors.Is(err, core.ErrNonceTooLow): + fallthrough + case errors.Is(err, nil): + fallthrough + default: + txs.Shift() + } + return false + } + for { // If we don't have enough gas for any further transactions then we're done available := bs.Gas() @@ -221,26 +256,9 @@ func submitTransactions(bs BlockState, txs *types.TransactionsByPriceAndNonce) b txs.Pop() continue } - // Error already logged in AddTransactions - err := bs.AddTransactions(types.Transactions{tx}) - switch { - case errors.Is(err, core.ErrGasLimitReached): - fallthrough - case errors.Is(err, core.ErrTxTypeNotSupported): - fallthrough - case errors.Is(err, core.ErrNonceTooHigh): - txs.Pop() - case errors.Is(err, ErrAbort): - returnVal = true + bs.AddTransactions(types.Transactions{tx}, cb) + if shouldReturn { break - case errors.Is(err, ErrRecommit): - break - case errors.Is(err, core.ErrNonceTooLow): - fallthrough - case errors.Is(err, nil): - fallthrough - default: - txs.Shift() } }