diff --git a/core/blockchain.go b/core/blockchain.go index 7ab7e54e89..38e4c3b13e 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1920,18 +1920,18 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. // WriteBlockAndSetHead writes the given block and all associated state to the database, // and applies the block as the new chain head. -func (bc *BlockChain) WriteBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) { +func (bc *BlockChain) WriteBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, sealedBlockSender *event.TypeMux) (status WriteStatus, err error) { if !bc.chainmu.TryLock() { return NonStatTy, errChainStopped } defer bc.chainmu.Unlock() - return bc.writeBlockAndSetHead(block, receipts, logs, state, emitHeadEvent) + return bc.writeBlockAndSetHead(block, receipts, logs, state, sealedBlockSender) } // writeBlockAndSetHead is the internal implementation of WriteBlockAndSetHead. // This function expects the chain mutex to be held. -func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) { +func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, sealedBlockSender *event.TypeMux) (status WriteStatus, err error) { currentBlock := bc.CurrentBlock() reorg, err := bc.forker.ReorgNeededWithFastFinality(currentBlock, block.Header()) if err != nil { @@ -1940,6 +1940,14 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types if reorg { bc.highestVerifiedBlock.Store(types.CopyHeader(block.Header())) bc.highestVerifiedBlockFeed.Send(HighestVerifiedBlockEvent{Header: block.Header()}) + if sealedBlockSender != nil { + // If the local DB is corrupted, writeBlockWithState may fail. + // It's fine — other nodes will persist the block. + // + // If the block is invalid, writeBlockWithState will also fail. + // It's fine — other nodes will reject the block. + sealedBlockSender.Post(NewSealedBlockEvent{Block: block}) + } } if err := bc.writeBlockWithState(block, receipts, state); err != nil { @@ -1978,7 +1986,7 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types bc.SetFinalized(finalizedHeader) } } - if emitHeadEvent { + if sealedBlockSender != nil { bc.chainHeadFeed.Send(ChainHeadEvent{Header: block.Header()}) if finalizedHeader != nil { bc.finalizedHeaderFeed.Send(FinalizedHeaderEvent{finalizedHeader}) @@ -2485,7 +2493,7 @@ func (bc *BlockChain) processBlock(block *types.Block, statedb *state.StateDB, s // Don't set the head, only insert the block err = bc.writeBlockWithState(block, res.Receipts, statedb) } else { - status, err = bc.writeBlockAndSetHead(block, res.Receipts, res.Logs, statedb, false) + status, err = bc.writeBlockAndSetHead(block, res.Receipts, res.Logs, statedb, nil) } if err != nil { return nil, err diff --git a/core/events.go b/core/events.go index 67b5bedfd6..3fc714822c 100644 --- a/core/events.go +++ b/core/events.go @@ -26,7 +26,10 @@ type NewTxsEvent struct{ Txs []*types.Transaction } // ReannoTxsEvent is posted when a batch of local pending transactions exceed a specified duration. type ReannoTxsEvent struct{ Txs []*types.Transaction } -// NewMinedBlockEvent is posted when a block has been imported. +// NewSealedBlockEvent is posted when a block has been sealed. +type NewSealedBlockEvent struct{ Block *types.Block } + +// NewMinedBlockEvent is posted when a block has been mined. type NewMinedBlockEvent struct{ Block *types.Block } // RemovedLogsEvent is posted when a reorg happens diff --git a/eth/handler.go b/eth/handler.go index 2800918ea0..a2bf8f7399 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -749,7 +749,7 @@ func (h *handler) Start(maxPeers int, maxPeersPerIP int) { // broadcast mined blocks h.wg.Add(1) - h.minedBlockSub = h.eventMux.Subscribe(core.NewMinedBlockEvent{}) + h.minedBlockSub = h.eventMux.Subscribe(core.NewMinedBlockEvent{}, core.NewSealedBlockEvent{}) go h.minedBroadcastLoop() // start sync handlers @@ -1043,8 +1043,9 @@ func (h *handler) minedBroadcastLoop() { if obj == nil { continue } - if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok { - h.BroadcastBlock(ev.Block, true) // First propagate block to peers + if ev, ok := obj.Data.(core.NewSealedBlockEvent); ok { + h.BroadcastBlock(ev.Block, true) // Propagate block to peers + } else if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok { h.BroadcastBlock(ev.Block, false) // Only then announce to the rest } case <-h.stopCh: diff --git a/miner/worker.go b/miner/worker.go index 8684909879..602f2836b0 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -660,7 +660,7 @@ func (w *worker) resultLoop() { // Commit block and state to database. task.state.SetExpectedStateRoot(block.Root()) start := time.Now() - status, err := w.chain.WriteBlockAndSetHead(block, receipts, logs, task.state, true) + status, err := w.chain.WriteBlockAndSetHead(block, receipts, logs, task.state, w.mux) if status != core.CanonStatTy { if err != nil { log.Error("Failed writing block to chain", "err", err, "status", status) @@ -673,7 +673,7 @@ func (w *worker) resultLoop() { stats := w.chain.GetBlockStats(block.Hash()) stats.SendBlockTime.Store(time.Now().UnixMilli()) stats.StartMiningTime.Store(task.miningStartAt.UnixMilli()) - log.Info("Successfully sealed new block", "number", block.Number(), "sealhash", sealhash, "hash", hash, + log.Info("Successfully seal and write new block", "number", block.Number(), "sealhash", sealhash, "hash", hash, "elapsed", common.PrettyDuration(time.Since(task.createdAt))) w.mux.Post(core.NewMinedBlockEvent{Block: block})