From 3c93a655b4575afe4650a2ec2c23fbe668781910 Mon Sep 17 00:00:00 2001 From: buddh0 Date: Tue, 3 Jun 2025 11:13:40 +0800 Subject: [PATCH 1/4] miner/worker: broadcast mined block immediately before wroten into db --- core/blockchain.go | 18 +++++++++++++----- miner/worker.go | 6 ++---- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 7ab7e54e89..b34b57ebe0 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1920,18 +1920,18 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. // WriteBlockAndSetHead writes the given block and all associated state to the database, // and applies the block as the new chain head. -func (bc *BlockChain) WriteBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) { +func (bc *BlockChain) WriteBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, minedBlockSender *event.TypeMux) (status WriteStatus, err error) { if !bc.chainmu.TryLock() { return NonStatTy, errChainStopped } defer bc.chainmu.Unlock() - return bc.writeBlockAndSetHead(block, receipts, logs, state, emitHeadEvent) + return bc.writeBlockAndSetHead(block, receipts, logs, state, minedBlockSender) } // writeBlockAndSetHead is the internal implementation of WriteBlockAndSetHead. // This function expects the chain mutex to be held. -func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) { +func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, minedBlockSender *event.TypeMux) (status WriteStatus, err error) { currentBlock := bc.CurrentBlock() reorg, err := bc.forker.ReorgNeededWithFastFinality(currentBlock, block.Header()) if err != nil { @@ -1940,6 +1940,14 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types if reorg { bc.highestVerifiedBlock.Store(types.CopyHeader(block.Header())) bc.highestVerifiedBlockFeed.Send(HighestVerifiedBlockEvent{Header: block.Header()}) + if minedBlockSender != nil { + // If the local DB is corrupted, writeBlockWithState may fail. + // It's fine — other nodes will persist the block. + // + // If the block is invalid, writeBlockWithState will also fail. + // It's fine — other nodes will reject the block. + minedBlockSender.Post(NewMinedBlockEvent{Block: block}) + } } if err := bc.writeBlockWithState(block, receipts, state); err != nil { @@ -1978,7 +1986,7 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types bc.SetFinalized(finalizedHeader) } } - if emitHeadEvent { + if minedBlockSender != nil { bc.chainHeadFeed.Send(ChainHeadEvent{Header: block.Header()}) if finalizedHeader != nil { bc.finalizedHeaderFeed.Send(FinalizedHeaderEvent{finalizedHeader}) @@ -2485,7 +2493,7 @@ func (bc *BlockChain) processBlock(block *types.Block, statedb *state.StateDB, s // Don't set the head, only insert the block err = bc.writeBlockWithState(block, res.Receipts, statedb) } else { - status, err = bc.writeBlockAndSetHead(block, res.Receipts, res.Logs, statedb, false) + status, err = bc.writeBlockAndSetHead(block, res.Receipts, res.Logs, statedb, nil) } if err != nil { return nil, err diff --git a/miner/worker.go b/miner/worker.go index 8684909879..3361b1360b 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -660,7 +660,7 @@ func (w *worker) resultLoop() { // Commit block and state to database. task.state.SetExpectedStateRoot(block.Root()) start := time.Now() - status, err := w.chain.WriteBlockAndSetHead(block, receipts, logs, task.state, true) + status, err := w.chain.WriteBlockAndSetHead(block, receipts, logs, task.state, w.mux) if status != core.CanonStatTy { if err != nil { log.Error("Failed writing block to chain", "err", err, "status", status) @@ -673,10 +673,8 @@ func (w *worker) resultLoop() { stats := w.chain.GetBlockStats(block.Hash()) stats.SendBlockTime.Store(time.Now().UnixMilli()) stats.StartMiningTime.Store(task.miningStartAt.UnixMilli()) - log.Info("Successfully sealed new block", "number", block.Number(), "sealhash", sealhash, "hash", hash, + log.Info("Successfully seal and write new block", "number", block.Number(), "sealhash", sealhash, "hash", hash, "elapsed", common.PrettyDuration(time.Since(task.createdAt))) - w.mux.Post(core.NewMinedBlockEvent{Block: block}) - case <-w.exitCh: return } From fe2b50054b684ff899c3ea28dc151816035262d7 Mon Sep 17 00:00:00 2001 From: buddh0 Date: Thu, 5 Jun 2025 17:19:01 +0800 Subject: [PATCH 2/4] eth: define NewSealedBlockEvent --- core/blockchain.go | 12 ++++++------ core/events.go | 5 ++++- eth/handler.go | 7 ++++--- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index b34b57ebe0..38e4c3b13e 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1920,18 +1920,18 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. // WriteBlockAndSetHead writes the given block and all associated state to the database, // and applies the block as the new chain head. -func (bc *BlockChain) WriteBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, minedBlockSender *event.TypeMux) (status WriteStatus, err error) { +func (bc *BlockChain) WriteBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, sealedBlockSender *event.TypeMux) (status WriteStatus, err error) { if !bc.chainmu.TryLock() { return NonStatTy, errChainStopped } defer bc.chainmu.Unlock() - return bc.writeBlockAndSetHead(block, receipts, logs, state, minedBlockSender) + return bc.writeBlockAndSetHead(block, receipts, logs, state, sealedBlockSender) } // writeBlockAndSetHead is the internal implementation of WriteBlockAndSetHead. // This function expects the chain mutex to be held. -func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, minedBlockSender *event.TypeMux) (status WriteStatus, err error) { +func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, sealedBlockSender *event.TypeMux) (status WriteStatus, err error) { currentBlock := bc.CurrentBlock() reorg, err := bc.forker.ReorgNeededWithFastFinality(currentBlock, block.Header()) if err != nil { @@ -1940,13 +1940,13 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types if reorg { bc.highestVerifiedBlock.Store(types.CopyHeader(block.Header())) bc.highestVerifiedBlockFeed.Send(HighestVerifiedBlockEvent{Header: block.Header()}) - if minedBlockSender != nil { + if sealedBlockSender != nil { // If the local DB is corrupted, writeBlockWithState may fail. // It's fine — other nodes will persist the block. // // If the block is invalid, writeBlockWithState will also fail. // It's fine — other nodes will reject the block. - minedBlockSender.Post(NewMinedBlockEvent{Block: block}) + sealedBlockSender.Post(NewSealedBlockEvent{Block: block}) } } @@ -1986,7 +1986,7 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types bc.SetFinalized(finalizedHeader) } } - if minedBlockSender != nil { + if sealedBlockSender != nil { bc.chainHeadFeed.Send(ChainHeadEvent{Header: block.Header()}) if finalizedHeader != nil { bc.finalizedHeaderFeed.Send(FinalizedHeaderEvent{finalizedHeader}) diff --git a/core/events.go b/core/events.go index 67b5bedfd6..3fc714822c 100644 --- a/core/events.go +++ b/core/events.go @@ -26,7 +26,10 @@ type NewTxsEvent struct{ Txs []*types.Transaction } // ReannoTxsEvent is posted when a batch of local pending transactions exceed a specified duration. type ReannoTxsEvent struct{ Txs []*types.Transaction } -// NewMinedBlockEvent is posted when a block has been imported. +// NewSealedBlockEvent is posted when a block has been sealed. +type NewSealedBlockEvent struct{ Block *types.Block } + +// NewMinedBlockEvent is posted when a block has been mined. type NewMinedBlockEvent struct{ Block *types.Block } // RemovedLogsEvent is posted when a reorg happens diff --git a/eth/handler.go b/eth/handler.go index 2800918ea0..a2bf8f7399 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -749,7 +749,7 @@ func (h *handler) Start(maxPeers int, maxPeersPerIP int) { // broadcast mined blocks h.wg.Add(1) - h.minedBlockSub = h.eventMux.Subscribe(core.NewMinedBlockEvent{}) + h.minedBlockSub = h.eventMux.Subscribe(core.NewMinedBlockEvent{}, core.NewSealedBlockEvent{}) go h.minedBroadcastLoop() // start sync handlers @@ -1043,8 +1043,9 @@ func (h *handler) minedBroadcastLoop() { if obj == nil { continue } - if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok { - h.BroadcastBlock(ev.Block, true) // First propagate block to peers + if ev, ok := obj.Data.(core.NewSealedBlockEvent); ok { + h.BroadcastBlock(ev.Block, true) // Propagate block to peers + } else if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok { h.BroadcastBlock(ev.Block, false) // Only then announce to the rest } case <-h.stopCh: From daeaebc646d7fe519a717668c433106dd1363097 Mon Sep 17 00:00:00 2001 From: buddh0 Date: Thu, 5 Jun 2025 17:36:43 +0800 Subject: [PATCH 3/4] miner: recover posting NewMinedBlockEvent --- miner/worker.go | 1 + 1 file changed, 1 insertion(+) diff --git a/miner/worker.go b/miner/worker.go index 3361b1360b..17fdc695bc 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -675,6 +675,7 @@ func (w *worker) resultLoop() { stats.StartMiningTime.Store(task.miningStartAt.UnixMilli()) log.Info("Successfully seal and write new block", "number", block.Number(), "sealhash", sealhash, "hash", hash, "elapsed", common.PrettyDuration(time.Since(task.createdAt))) + w.mux.Post(core.NewMinedBlockEvent{Block: block}) case <-w.exitCh: return } From 354e913a51e570c75607b6d3abd9da92d1dfc6d3 Mon Sep 17 00:00:00 2001 From: buddh0 Date: Thu, 5 Jun 2025 17:38:26 +0800 Subject: [PATCH 4/4] miner: recover empty line --- miner/worker.go | 1 + 1 file changed, 1 insertion(+) diff --git a/miner/worker.go b/miner/worker.go index 17fdc695bc..602f2836b0 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -676,6 +676,7 @@ func (w *worker) resultLoop() { log.Info("Successfully seal and write new block", "number", block.Number(), "sealhash", sealhash, "hash", hash, "elapsed", common.PrettyDuration(time.Since(task.createdAt))) w.mux.Post(core.NewMinedBlockEvent{Block: block}) + case <-w.exitCh: return }