diff --git a/core/txpool/locals/journal.go b/core/txpool/locals/journal.go index 9eea07c03d7d..ed71d1e3dcff 100644 --- a/core/txpool/locals/journal.go +++ b/core/txpool/locals/journal.go @@ -115,6 +115,25 @@ func (journal *journal) load(add func([]*types.Transaction) []error) error { return failure } +func (journal *journal) setupWriter() error { + if journal.writer != nil { + if err := journal.writer.Close(); err != nil { + return err + } + journal.writer = nil + } + + // Re-open the journal file for appending + // Use O_APPEND to ensure we always write to the end of the file + sink, err := os.OpenFile(journal.path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + if err != nil { + return err + } + journal.writer = sink + + return nil +} + // insert adds the specified transaction to the local disk journal. func (journal *journal) insert(tx *types.Transaction) error { if journal.writer == nil { @@ -137,7 +156,7 @@ func (journal *journal) rotate(all map[common.Address]types.Transactions) error journal.writer = nil } // Generate a new journal with the contents of the current pool - replacement, err := os.OpenFile(journal.path+".new", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0755) + replacement, err := os.OpenFile(journal.path+".new", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) if err != nil { return err } @@ -157,7 +176,7 @@ func (journal *journal) rotate(all map[common.Address]types.Transactions) error if err = os.Rename(journal.path+".new", journal.path); err != nil { return err } - sink, err := os.OpenFile(journal.path, os.O_WRONLY|os.O_APPEND, 0755) + sink, err := os.OpenFile(journal.path, os.O_WRONLY|os.O_APPEND, 0644) if err != nil { return err } @@ -175,7 +194,6 @@ func (journal *journal) rotate(all map[common.Address]types.Transactions) error // close flushes the transaction journal contents to disk and closes the file. func (journal *journal) close() error { var err error - if journal.writer != nil { err = journal.writer.Close() journal.writer = nil diff --git a/core/txpool/locals/tx_tracker.go b/core/txpool/locals/tx_tracker.go index 7a24ff2137ad..9013e04109a1 100644 --- a/core/txpool/locals/tx_tracker.go +++ b/core/txpool/locals/tx_tracker.go @@ -115,13 +115,14 @@ func (tracker *TxTracker) TrackAll(txs []*types.Transaction) { } // recheck checks and returns any transactions that needs to be resubmitted. -func (tracker *TxTracker) recheck(journalCheck bool) (resubmits []*types.Transaction, rejournal map[common.Address]types.Transactions) { +func (tracker *TxTracker) recheck(journalCheck bool) []*types.Transaction { tracker.mu.Lock() defer tracker.mu.Unlock() var ( numStales = 0 numOk = 0 + resubmits []*types.Transaction ) for sender, txs := range tracker.byAddr { // Wipe the stales @@ -142,7 +143,7 @@ func (tracker *TxTracker) recheck(journalCheck bool) (resubmits []*types.Transac } if journalCheck { // rejournal - rejournal = make(map[common.Address]types.Transactions) + rejournal := make(map[common.Address]types.Transactions) for _, tx := range tracker.all { addr, _ := types.Sender(tracker.signer, tx) rejournal[addr] = append(rejournal[addr], tx) @@ -154,18 +155,39 @@ func (tracker *TxTracker) recheck(journalCheck bool) (resubmits []*types.Transac return cmp.Compare(a.Nonce(), b.Nonce()) }) } + // Rejournal the tracker while holding the lock. No new transactions will + // be added to the old journal during this period, preventing any potential + // transaction loss. + if tracker.journal != nil { + if err := tracker.journal.rotate(rejournal); err != nil { + log.Warn("Transaction journal rotation failed", "err", err) + } + } } localGauge.Update(int64(len(tracker.all))) log.Debug("Tx tracker status", "need-resubmit", len(resubmits), "stale", numStales, "ok", numOk) - return resubmits, rejournal + return resubmits } // Start implements node.Lifecycle interface // Start is called after all services have been constructed and the networking // layer was also initialized to spawn any goroutines required by the service. func (tracker *TxTracker) Start() error { - tracker.wg.Add(1) - go tracker.loop() + if tracker.journal != nil { + if err := tracker.journal.load(func(transactions []*types.Transaction) []error { + tracker.TrackAll(transactions) + return nil + }); err != nil { + log.Warn("Failed to load transaction journal", "err", err) + } + + // Ensure the writer is ready before Start returns so Track/TrackAll can + // persist transactions immediately. + if err := tracker.journal.setupWriter(); err != nil { + return err + } + } + tracker.wg.Go(tracker.loop) return nil } @@ -179,13 +201,7 @@ func (tracker *TxTracker) Stop() error { } func (tracker *TxTracker) loop() { - defer tracker.wg.Done() - if tracker.journal != nil { - tracker.journal.load(func(transactions []*types.Transaction) []error { - tracker.TrackAll(transactions) - return nil - }) defer tracker.journal.close() } var ( @@ -197,20 +213,15 @@ func (tracker *TxTracker) loop() { case <-tracker.shutdownCh: return case <-timer.C: - checkJournal := tracker.journal != nil && time.Since(lastJournal) > tracker.rejournal - resubmits, rejournal := tracker.recheck(checkJournal) + var rejournal bool + if tracker.journal != nil && time.Since(lastJournal) > tracker.rejournal { + rejournal, lastJournal = true, time.Now() + log.Debug("Rejournal the transaction tracker") + } + resubmits := tracker.recheck(rejournal) if len(resubmits) > 0 { tracker.pool.Add(resubmits, false) } - if checkJournal { - // Lock to prevent journal.rotate <-> journal.insert (via TrackAll) conflicts - tracker.mu.Lock() - lastJournal = time.Now() - if err := tracker.journal.rotate(rejournal); err != nil { - log.Warn("Transaction journal rotation failed", "err", err) - } - tracker.mu.Unlock() - } timer.Reset(recheckInterval) } } diff --git a/core/txpool/locals/tx_tracker_test.go b/core/txpool/locals/tx_tracker_test.go index d23118f1029e..c07f76823a76 100644 --- a/core/txpool/locals/tx_tracker_test.go +++ b/core/txpool/locals/tx_tracker_test.go @@ -17,7 +17,12 @@ package locals import ( + "fmt" + "maps" "math/big" + "math/rand" + "os" + "path/filepath" "testing" "time" @@ -65,7 +70,10 @@ func newTestEnv(t *testing.T, n int, gasTip uint64, journal string) *testEnv { }) db := rawdb.NewMemoryDatabase() - chain, _ := core.NewBlockChain(db, nil, gspec, ethash.NewFaker(), vm.Config{}) + chain, err := core.NewBlockChain(db, nil, gspec, ethash.NewFaker(), vm.Config{}) + if err != nil { + t.Fatalf("Failed to create blockchain: %v", err) + } legacyPool := legacypool.New(legacypool.DefaultConfig, chain) pool, err := txpool.New(gasTip, chain, []txpool.SubPool{legacyPool}) @@ -87,24 +95,10 @@ func newTestEnv(t *testing.T, n int, gasTip uint64, journal string) *testEnv { } func (env *testEnv) close() { - env.chain.Stop() -} - -func (env *testEnv) setGasTip(gasTip uint64) { - env.pool.SetGasTip(new(big.Int).SetUint64(gasTip)) -} - -func (env *testEnv) makeTx(nonce uint64, gasPrice *big.Int) *types.Transaction { - if nonce == 0 { - head := env.chain.CurrentHeader() - state, _ := env.chain.StateAt(head.Root) - nonce = state.GetNonce(address) + if err := env.pool.Close(); err != nil { + panic(fmt.Sprintf("failed to close tx pool: %v", err)) } - if gasPrice == nil { - gasPrice = big.NewInt(params.GWei) - } - tx, _ := types.SignTx(types.NewTransaction(nonce, common.Address{0x00}, big.NewInt(1000), params.TxGas, gasPrice, nil), signer, key) - return tx + env.chain.Stop() } func (env *testEnv) makeTxs(n int) []*types.Transaction { @@ -120,22 +114,6 @@ func (env *testEnv) makeTxs(n int) []*types.Transaction { return txs } -func (env *testEnv) commit() { - head := env.chain.CurrentBlock() - block := env.chain.GetBlock(head.Hash(), head.Number.Uint64()) - blocks, _ := core.GenerateChain(env.chain.Config(), block, ethash.NewFaker(), env.genDb, 1, func(i int, gen *core.BlockGen) { - tx, err := types.SignTx(types.NewTransaction(gen.TxNonce(address), common.Address{0x00}, big.NewInt(1000), params.TxGas, big.NewInt(params.GWei), nil), signer, key) - if err != nil { - panic(err) - } - gen.AddTx(tx) - }) - env.chain.InsertChain(blocks) - if err := env.pool.Sync(); err != nil { - panic(err) - } -} - func TestResubmit(t *testing.T) { env := newTestEnv(t, 10, 0, "") defer env.close() @@ -144,20 +122,98 @@ func TestResubmit(t *testing.T) { txsA := txs[:len(txs)/2] txsB := txs[len(txs)/2:] env.pool.Add(txsA, true) + pending, queued := env.pool.ContentFrom(address) if len(pending) != len(txsA) || len(queued) != 0 { t.Fatalf("Unexpected txpool content: %d, %d", len(pending), len(queued)) } env.tracker.TrackAll(txs) - resubmit, all := env.tracker.recheck(true) + resubmit := env.tracker.recheck(false) if len(resubmit) != len(txsB) { t.Fatalf("Unexpected transactions to resubmit, got: %d, want: %d", len(resubmit), len(txsB)) } - if len(all) == 0 || len(all[address]) == 0 { - t.Fatalf("Unexpected transactions being tracked, got: %d, want: %d", 0, len(txs)) + env.tracker.mu.Lock() + allCopy := maps.Clone(env.tracker.all) + env.tracker.mu.Unlock() + + if len(allCopy) != len(txs) { + t.Fatalf("Unexpected transactions being tracked, got: %d, want: %d", len(allCopy), len(txs)) + } +} + +func TestJournal(t *testing.T) { + journalPath := filepath.Join(t.TempDir(), fmt.Sprintf("%d", rand.Int63())) + env := newTestEnv(t, 10, 0, journalPath) + defer env.close() + + if err := env.tracker.Start(); err != nil { + t.Fatalf("Failed to start tracker: %v", err) + } + + txs := env.makeTxs(10) + txsA := txs[:len(txs)/2] + txsB := txs[len(txs)/2:] + env.pool.Add(txsA, true) + + pending, queued := env.pool.ContentFrom(address) + if len(pending) != len(txsA) || len(queued) != 0 { + t.Fatalf("Unexpected txpool content: %d, %d", len(pending), len(queued)) + } + env.tracker.TrackAll(txsA) + env.tracker.TrackAll(txsB) + env.tracker.Stop() + + // Make sure all the transactions are properly journalled + trackerB := New(journalPath, time.Minute, gspec.Config, env.pool) + if err := trackerB.journal.load(func(transactions []*types.Transaction) []error { + trackerB.TrackAll(transactions) + return nil + }); err != nil { + t.Fatalf("Failed to load journal: %v", err) } - if len(all[address]) != len(txs) { - t.Fatalf("Unexpected transactions being tracked, got: %d, want: %d", len(all[address]), len(txs)) + + trackerB.mu.Lock() + allCopy := maps.Clone(trackerB.all) + trackerB.mu.Unlock() + + if len(allCopy) != len(txs) { + t.Fatalf("Unexpected transactions being tracked, got: %d, want: %d", len(allCopy), len(txs)) + } +} + +func TestStartInitializesJournalWriter(t *testing.T) { + journalPath := filepath.Join(t.TempDir(), fmt.Sprintf("%d", rand.Int63())) + env := newTestEnv(t, 10, 0, journalPath) + defer env.close() + + if err := env.tracker.Start(); err != nil { + t.Fatalf("Failed to start tracker: %v", err) + } + defer env.tracker.Stop() + + if env.tracker.journal == nil { + t.Fatal("Journal should be configured") + } + if env.tracker.journal.writer == nil { + t.Fatal("Journal writer should be initialized before Start returns") + } +} + +func TestStartContinuesOnCorruptedJournal(t *testing.T) { + journalPath := filepath.Join(t.TempDir(), fmt.Sprintf("%d", rand.Int63())) + if err := os.WriteFile(journalPath, []byte{0xff, 0x00, 0x01}, 0o644); err != nil { + t.Fatalf("Failed to create corrupted journal: %v", err) + } + env := newTestEnv(t, 10, 0, journalPath) + defer env.close() + + if err := env.tracker.Start(); err != nil { + t.Fatalf("Start should continue when journal load fails, got: %v", err) + } + defer env.tracker.Stop() + + if env.tracker.journal.writer == nil { + t.Fatal("Journal writer should be initialized even if journal load fails") } }