Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ var (
utils.TxPoolLocalsFlag,
utils.TxPoolNoLocalsFlag,
utils.TxPoolJournalFlag,
utils.TxPoolJournalRemotesFlag,
utils.TxPoolRejournalFlag,
utils.TxPoolPriceLimitFlag,
utils.TxPoolPriceBumpFlag,
Expand Down
8 changes: 8 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,11 @@ var (
Value: txpool.DefaultConfig.Journal,
Category: flags.TxPoolCategory,
}
TxPoolJournalRemotesFlag = &cli.BoolFlag{
Name: "txpool.journalremotes",
Usage: "Includes remote transactions in the journal",
Category: flags.TxPoolCategory,
}
TxPoolRejournalFlag = &cli.DurationFlag{
Name: "txpool.rejournal",
Usage: "Time interval to regenerate the local transaction journal",
Expand Down Expand Up @@ -1625,6 +1630,9 @@ func setTxPool(ctx *cli.Context, cfg *txpool.Config) {
if ctx.IsSet(TxPoolJournalFlag.Name) {
cfg.Journal = ctx.String(TxPoolJournalFlag.Name)
}
if ctx.IsSet(TxPoolJournalRemotesFlag.Name) {
cfg.JournalRemote = ctx.Bool(TxPoolJournalRemotesFlag.Name)
}
if ctx.IsSet(TxPoolRejournalFlag.Name) {
cfg.Rejournal = ctx.Duration(TxPoolRejournalFlag.Name)
}
Expand Down
37 changes: 31 additions & 6 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ type Config struct {
Journal string // Journal of local transactions to survive node restarts
Rejournal time.Duration // Time interval to regenerate the local transaction journal

// JournalRemote controls whether journaling includes remote transactions or not.
// When true, all transactions loaded from the journal are treated as remote.
JournalRemote bool

PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool
PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce)

Expand Down Expand Up @@ -330,14 +334,18 @@ func NewTxPool(config Config, chainconfig *params.ChainConfig, chain blockChain)
pool.wg.Add(1)
go pool.scheduleReorgLoop()

// If local transactions and journaling is enabled, load from disk
if !config.NoLocals && config.Journal != "" {
// If journaling is enabled and has transactions to journal, load from disk
if (!config.NoLocals || config.JournalRemote) && config.Journal != "" {
pool.journal = newTxJournal(config.Journal)

if err := pool.journal.load(pool.AddLocals); err != nil {
add := pool.AddLocals
if config.JournalRemote {
add = pool.AddRemotesSync // Use sync version to match pool.AddLocals
}
if err := pool.journal.load(add); err != nil {
log.Warn("Failed to load transaction journal", "err", err)
}
if err := pool.journal.rotate(pool.local()); err != nil {
if err := pool.journal.rotate(pool.toJournal()); err != nil {
log.Warn("Failed to rotate transaction journal", "err", err)
}
}
Expand Down Expand Up @@ -420,7 +428,7 @@ func (pool *TxPool) loop() {
case <-journal.C:
if pool.journal != nil {
pool.mu.Lock()
if err := pool.journal.rotate(pool.local()); err != nil {
if err := pool.journal.rotate(pool.toJournal()); err != nil {
log.Warn("Failed to rotate local tx journal", "err", err)
}
pool.mu.Unlock()
Expand Down Expand Up @@ -600,6 +608,23 @@ func (pool *TxPool) local() map[common.Address]types.Transactions {
return txs
}

// toJournal retrieves all transactions that should be included in the journal,
// grouped by origin account and sorted by nonce.
// The returned transaction set is a copy and can be freely modified by calling code.
func (pool *TxPool) toJournal() map[common.Address]types.Transactions {
if !pool.config.JournalRemote {
return pool.local()
}
txs := make(map[common.Address]types.Transactions)
for addr, pending := range pool.pending {
txs[addr] = append(txs[addr], pending.Flatten()...)
}
for addr, queued := range pool.queue {
txs[addr] = append(txs[addr], queued.Flatten()...)
}
return txs
}

// validateTxBasics checks whether a transaction is valid according to the consensus
// rules, but does not check state-dependent validation such as sufficient balance.
// This check is meant as an early check which only needs to be performed once,
Expand Down Expand Up @@ -904,7 +929,7 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction, local boo
// deemed to have been sent from a local account.
func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) {
// Only journal if it's enabled and the transaction is local
if pool.journal == nil || !pool.locals.contains(from) {
if pool.journal == nil || (!pool.config.JournalRemote && !pool.locals.contains(from)) {
return
}
if err := pool.journal.insert(tx); err != nil {
Expand Down
28 changes: 21 additions & 7 deletions core/txpool/txpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2255,10 +2255,13 @@ func TestReplacementDynamicFee(t *testing.T) {

// Tests that local transactions are journaled to disk, but remote transactions
// get discarded between restarts.
func TestJournaling(t *testing.T) { testJournaling(t, false) }
func TestJournalingNoLocals(t *testing.T) { testJournaling(t, true) }
func TestJournaling(t *testing.T) { testJournaling(t, false, false) }
func TestJournalingNoLocals(t *testing.T) { testJournaling(t, true, false) }

func testJournaling(t *testing.T, nolocals bool) {
func TestJournalingRemotes(t *testing.T) { testJournaling(t, false, true) }
func TestJournalingRemotesNoLocals(t *testing.T) { testJournaling(t, true, true) }

func testJournaling(t *testing.T, nolocals bool, journalRemotes bool) {
t.Parallel()

// Create a temporary file for the journal
Expand All @@ -2279,6 +2282,7 @@ func testJournaling(t *testing.T, nolocals bool) {

config := testTxPoolConfig
config.NoLocals = nolocals
config.JournalRemote = journalRemotes
config.Journal = journal
config.Rejournal = time.Second

Expand Down Expand Up @@ -2325,10 +2329,14 @@ func testJournaling(t *testing.T, nolocals bool) {
if queued != 0 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
}
if nolocals {
if nolocals && !journalRemotes {
if pending != 0 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
}
} else if journalRemotes {
if pending != 3 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3)
}
} else {
if pending != 2 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
Expand All @@ -2348,10 +2356,16 @@ func testJournaling(t *testing.T, nolocals bool) {
pool = NewTxPool(config, params.TestChainConfig, blockchain)

pending, queued = pool.Stats()
if pending != 0 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
if journalRemotes {
if pending != 1 { // Remove the 2 replaced local transactions, but preserve the remote
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1)
}
} else {
if pending != 0 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
}
}
if nolocals {
if nolocals && !journalRemotes {
if queued != 0 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
}
Expand Down