Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions core/txpool/locals/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,25 @@ func (journal *journal) load(add func([]*types.Transaction) []error) error {
return failure
}

func (journal *journal) setupWriter() error {
if journal.writer != nil {
if err := journal.writer.Close(); err != nil {
return err
}
journal.writer = nil
}

// Re-open the journal file for appending
// Use O_APPEND to ensure we always write to the end of the file
sink, err := os.OpenFile(journal.path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
return err
}
journal.writer = sink

Comment thread
gzliudan marked this conversation as resolved.
Comment thread
gzliudan marked this conversation as resolved.
return nil
}

// insert adds the specified transaction to the local disk journal.
func (journal *journal) insert(tx *types.Transaction) error {
if journal.writer == nil {
Expand All @@ -137,7 +156,7 @@ func (journal *journal) rotate(all map[common.Address]types.Transactions) error
journal.writer = nil
}
// Generate a new journal with the contents of the current pool
replacement, err := os.OpenFile(journal.path+".new", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0755)
replacement, err := os.OpenFile(journal.path+".new", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return err
}
Expand All @@ -157,7 +176,7 @@ func (journal *journal) rotate(all map[common.Address]types.Transactions) error
if err = os.Rename(journal.path+".new", journal.path); err != nil {
return err
}
sink, err := os.OpenFile(journal.path, os.O_WRONLY|os.O_APPEND, 0755)
sink, err := os.OpenFile(journal.path, os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return err
}
Expand All @@ -175,7 +194,6 @@ func (journal *journal) rotate(all map[common.Address]types.Transactions) error
// close flushes the transaction journal contents to disk and closes the file.
func (journal *journal) close() error {
var err error

if journal.writer != nil {
err = journal.writer.Close()
journal.writer = nil
Expand Down
55 changes: 33 additions & 22 deletions core/txpool/locals/tx_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,14 @@ func (tracker *TxTracker) TrackAll(txs []*types.Transaction) {
}

// recheck checks and returns any transactions that needs to be resubmitted.
func (tracker *TxTracker) recheck(journalCheck bool) (resubmits []*types.Transaction, rejournal map[common.Address]types.Transactions) {
func (tracker *TxTracker) recheck(journalCheck bool) []*types.Transaction {
tracker.mu.Lock()
defer tracker.mu.Unlock()

var (
numStales = 0
numOk = 0
resubmits []*types.Transaction
)
for sender, txs := range tracker.byAddr {
// Wipe the stales
Expand All @@ -142,7 +143,7 @@ func (tracker *TxTracker) recheck(journalCheck bool) (resubmits []*types.Transac
}

if journalCheck { // rejournal
rejournal = make(map[common.Address]types.Transactions)
rejournal := make(map[common.Address]types.Transactions)
for _, tx := range tracker.all {
addr, _ := types.Sender(tracker.signer, tx)
rejournal[addr] = append(rejournal[addr], tx)
Expand All @@ -154,18 +155,39 @@ func (tracker *TxTracker) recheck(journalCheck bool) (resubmits []*types.Transac
return cmp.Compare(a.Nonce(), b.Nonce())
})
}
// Rejournal the tracker while holding the lock. No new transactions will
// be added to the old journal during this period, preventing any potential
// transaction loss.
if tracker.journal != nil {
if err := tracker.journal.rotate(rejournal); err != nil {
log.Warn("Transaction journal rotation failed", "err", err)
}
}
}
localGauge.Update(int64(len(tracker.all)))
log.Debug("Tx tracker status", "need-resubmit", len(resubmits), "stale", numStales, "ok", numOk)
return resubmits, rejournal
return resubmits
}

// Start implements node.Lifecycle interface
// Start is called after all services have been constructed and the networking
// layer was also initialized to spawn any goroutines required by the service.
func (tracker *TxTracker) Start() error {
tracker.wg.Add(1)
go tracker.loop()
if tracker.journal != nil {
if err := tracker.journal.load(func(transactions []*types.Transaction) []error {
tracker.TrackAll(transactions)
return nil
}); err != nil {
log.Warn("Failed to load transaction journal", "err", err)
}

// Ensure the writer is ready before Start returns so Track/TrackAll can
// persist transactions immediately.
if err := tracker.journal.setupWriter(); err != nil {
return err
}
}
tracker.wg.Go(tracker.loop)
return nil
}

Expand All @@ -179,13 +201,7 @@ func (tracker *TxTracker) Stop() error {
}

func (tracker *TxTracker) loop() {
defer tracker.wg.Done()

if tracker.journal != nil {
tracker.journal.load(func(transactions []*types.Transaction) []error {
tracker.TrackAll(transactions)
return nil
})
defer tracker.journal.close()
}
var (
Expand All @@ -197,20 +213,15 @@ func (tracker *TxTracker) loop() {
case <-tracker.shutdownCh:
return
case <-timer.C:
checkJournal := tracker.journal != nil && time.Since(lastJournal) > tracker.rejournal
resubmits, rejournal := tracker.recheck(checkJournal)
var rejournal bool
if tracker.journal != nil && time.Since(lastJournal) > tracker.rejournal {
rejournal, lastJournal = true, time.Now()
log.Debug("Rejournal the transaction tracker")
}
resubmits := tracker.recheck(rejournal)
if len(resubmits) > 0 {
tracker.pool.Add(resubmits, false)
}
if checkJournal {
// Lock to prevent journal.rotate <-> journal.insert (via TrackAll) conflicts
tracker.mu.Lock()
lastJournal = time.Now()
if err := tracker.journal.rotate(rejournal); err != nil {
log.Warn("Transaction journal rotation failed", "err", err)
}
tracker.mu.Unlock()
}
timer.Reset(recheckInterval)
}
}
Expand Down
134 changes: 95 additions & 39 deletions core/txpool/locals/tx_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@
package locals

import (
"fmt"
"maps"
"math/big"
"math/rand"
"os"
"path/filepath"
"testing"
"time"

Expand Down Expand Up @@ -65,7 +70,10 @@ func newTestEnv(t *testing.T, n int, gasTip uint64, journal string) *testEnv {
})

db := rawdb.NewMemoryDatabase()
chain, _ := core.NewBlockChain(db, nil, gspec, ethash.NewFaker(), vm.Config{})
chain, err := core.NewBlockChain(db, nil, gspec, ethash.NewFaker(), vm.Config{})
if err != nil {
t.Fatalf("Failed to create blockchain: %v", err)
}

legacyPool := legacypool.New(legacypool.DefaultConfig, chain)
pool, err := txpool.New(gasTip, chain, []txpool.SubPool{legacyPool})
Expand All @@ -87,24 +95,10 @@ func newTestEnv(t *testing.T, n int, gasTip uint64, journal string) *testEnv {
}

func (env *testEnv) close() {
env.chain.Stop()
}

func (env *testEnv) setGasTip(gasTip uint64) {
env.pool.SetGasTip(new(big.Int).SetUint64(gasTip))
}

func (env *testEnv) makeTx(nonce uint64, gasPrice *big.Int) *types.Transaction {
if nonce == 0 {
head := env.chain.CurrentHeader()
state, _ := env.chain.StateAt(head.Root)
nonce = state.GetNonce(address)
if err := env.pool.Close(); err != nil {
panic(fmt.Sprintf("failed to close tx pool: %v", err))
}
if gasPrice == nil {
gasPrice = big.NewInt(params.GWei)
}
tx, _ := types.SignTx(types.NewTransaction(nonce, common.Address{0x00}, big.NewInt(1000), params.TxGas, gasPrice, nil), signer, key)
return tx
env.chain.Stop()
}

func (env *testEnv) makeTxs(n int) []*types.Transaction {
Expand All @@ -120,22 +114,6 @@ func (env *testEnv) makeTxs(n int) []*types.Transaction {
return txs
}

func (env *testEnv) commit() {
head := env.chain.CurrentBlock()
block := env.chain.GetBlock(head.Hash(), head.Number.Uint64())
blocks, _ := core.GenerateChain(env.chain.Config(), block, ethash.NewFaker(), env.genDb, 1, func(i int, gen *core.BlockGen) {
tx, err := types.SignTx(types.NewTransaction(gen.TxNonce(address), common.Address{0x00}, big.NewInt(1000), params.TxGas, big.NewInt(params.GWei), nil), signer, key)
if err != nil {
panic(err)
}
gen.AddTx(tx)
})
env.chain.InsertChain(blocks)
if err := env.pool.Sync(); err != nil {
panic(err)
}
}

func TestResubmit(t *testing.T) {
env := newTestEnv(t, 10, 0, "")
defer env.close()
Expand All @@ -144,20 +122,98 @@ func TestResubmit(t *testing.T) {
txsA := txs[:len(txs)/2]
txsB := txs[len(txs)/2:]
env.pool.Add(txsA, true)

pending, queued := env.pool.ContentFrom(address)
if len(pending) != len(txsA) || len(queued) != 0 {
t.Fatalf("Unexpected txpool content: %d, %d", len(pending), len(queued))
}
env.tracker.TrackAll(txs)

resubmit, all := env.tracker.recheck(true)
resubmit := env.tracker.recheck(false)
if len(resubmit) != len(txsB) {
t.Fatalf("Unexpected transactions to resubmit, got: %d, want: %d", len(resubmit), len(txsB))
}
if len(all) == 0 || len(all[address]) == 0 {
t.Fatalf("Unexpected transactions being tracked, got: %d, want: %d", 0, len(txs))
env.tracker.mu.Lock()
allCopy := maps.Clone(env.tracker.all)
env.tracker.mu.Unlock()

if len(allCopy) != len(txs) {
t.Fatalf("Unexpected transactions being tracked, got: %d, want: %d", len(allCopy), len(txs))
}
}

func TestJournal(t *testing.T) {
journalPath := filepath.Join(t.TempDir(), fmt.Sprintf("%d", rand.Int63()))
env := newTestEnv(t, 10, 0, journalPath)
defer env.close()

if err := env.tracker.Start(); err != nil {
t.Fatalf("Failed to start tracker: %v", err)
}

txs := env.makeTxs(10)
txsA := txs[:len(txs)/2]
txsB := txs[len(txs)/2:]
env.pool.Add(txsA, true)

pending, queued := env.pool.ContentFrom(address)
if len(pending) != len(txsA) || len(queued) != 0 {
t.Fatalf("Unexpected txpool content: %d, %d", len(pending), len(queued))
}
env.tracker.TrackAll(txsA)
env.tracker.TrackAll(txsB)
env.tracker.Stop()

// Make sure all the transactions are properly journalled
trackerB := New(journalPath, time.Minute, gspec.Config, env.pool)
Comment thread
gzliudan marked this conversation as resolved.
if err := trackerB.journal.load(func(transactions []*types.Transaction) []error {
trackerB.TrackAll(transactions)
return nil
}); err != nil {
t.Fatalf("Failed to load journal: %v", err)
}
if len(all[address]) != len(txs) {
t.Fatalf("Unexpected transactions being tracked, got: %d, want: %d", len(all[address]), len(txs))

trackerB.mu.Lock()
allCopy := maps.Clone(trackerB.all)
trackerB.mu.Unlock()

if len(allCopy) != len(txs) {
t.Fatalf("Unexpected transactions being tracked, got: %d, want: %d", len(allCopy), len(txs))
}
}

func TestStartInitializesJournalWriter(t *testing.T) {
journalPath := filepath.Join(t.TempDir(), fmt.Sprintf("%d", rand.Int63()))
env := newTestEnv(t, 10, 0, journalPath)
defer env.close()

if err := env.tracker.Start(); err != nil {
t.Fatalf("Failed to start tracker: %v", err)
}
defer env.tracker.Stop()

if env.tracker.journal == nil {
t.Fatal("Journal should be configured")
}
if env.tracker.journal.writer == nil {
t.Fatal("Journal writer should be initialized before Start returns")
}
}

func TestStartContinuesOnCorruptedJournal(t *testing.T) {
journalPath := filepath.Join(t.TempDir(), fmt.Sprintf("%d", rand.Int63()))
if err := os.WriteFile(journalPath, []byte{0xff, 0x00, 0x01}, 0o644); err != nil {
t.Fatalf("Failed to create corrupted journal: %v", err)
}
env := newTestEnv(t, 10, 0, journalPath)
defer env.close()

if err := env.tracker.Start(); err != nil {
t.Fatalf("Start should continue when journal load fails, got: %v", err)
}
defer env.tracker.Stop()

if env.tracker.journal.writer == nil {
t.Fatal("Journal writer should be initialized even if journal load fails")
}
}
Loading