Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1920,18 +1920,18 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.

// WriteBlockAndSetHead writes the given block and all associated state to the database,
// and applies the block as the new chain head.
func (bc *BlockChain) WriteBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) {
func (bc *BlockChain) WriteBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, sealedBlockSender *event.TypeMux) (status WriteStatus, err error) {
if !bc.chainmu.TryLock() {
return NonStatTy, errChainStopped
}
defer bc.chainmu.Unlock()

return bc.writeBlockAndSetHead(block, receipts, logs, state, emitHeadEvent)
return bc.writeBlockAndSetHead(block, receipts, logs, state, sealedBlockSender)
}

// writeBlockAndSetHead is the internal implementation of WriteBlockAndSetHead.
// This function expects the chain mutex to be held.
func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) {
func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, sealedBlockSender *event.TypeMux) (status WriteStatus, err error) {
currentBlock := bc.CurrentBlock()
reorg, err := bc.forker.ReorgNeededWithFastFinality(currentBlock, block.Header())
if err != nil {
Expand All @@ -1940,6 +1940,14 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
if reorg {
bc.highestVerifiedBlock.Store(types.CopyHeader(block.Header()))
bc.highestVerifiedBlockFeed.Send(HighestVerifiedBlockEvent{Header: block.Header()})
if sealedBlockSender != nil {
// If the local DB is corrupted, writeBlockWithState may fail.
// It's fine — other nodes will persist the block.
//
// If the block is invalid, writeBlockWithState will also fail.
// It's fine — other nodes will reject the block.
sealedBlockSender.Post(NewSealedBlockEvent{Block: block})
}
}

if err := bc.writeBlockWithState(block, receipts, state); err != nil {
Expand Down Expand Up @@ -1978,7 +1986,7 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
bc.SetFinalized(finalizedHeader)
}
}
if emitHeadEvent {
if sealedBlockSender != nil {
bc.chainHeadFeed.Send(ChainHeadEvent{Header: block.Header()})
if finalizedHeader != nil {
bc.finalizedHeaderFeed.Send(FinalizedHeaderEvent{finalizedHeader})
Expand Down Expand Up @@ -2485,7 +2493,7 @@ func (bc *BlockChain) processBlock(block *types.Block, statedb *state.StateDB, s
// Don't set the head, only insert the block
err = bc.writeBlockWithState(block, res.Receipts, statedb)
} else {
status, err = bc.writeBlockAndSetHead(block, res.Receipts, res.Logs, statedb, false)
status, err = bc.writeBlockAndSetHead(block, res.Receipts, res.Logs, statedb, nil)
}
if err != nil {
return nil, err
Expand Down
5 changes: 4 additions & 1 deletion core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ type NewTxsEvent struct{ Txs []*types.Transaction }
// ReannoTxsEvent is posted when a batch of local pending transactions exceed a specified duration.
type ReannoTxsEvent struct{ Txs []*types.Transaction }

// NewMinedBlockEvent is posted when a block has been imported.
// NewSealedBlockEvent is posted when a block has been sealed.
type NewSealedBlockEvent struct{ Block *types.Block }

// NewMinedBlockEvent is posted when a block has been mined.
type NewMinedBlockEvent struct{ Block *types.Block }

// RemovedLogsEvent is posted when a reorg happens
Expand Down
7 changes: 4 additions & 3 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ func (h *handler) Start(maxPeers int, maxPeersPerIP int) {

// broadcast mined blocks
h.wg.Add(1)
h.minedBlockSub = h.eventMux.Subscribe(core.NewMinedBlockEvent{})
h.minedBlockSub = h.eventMux.Subscribe(core.NewMinedBlockEvent{}, core.NewSealedBlockEvent{})
go h.minedBroadcastLoop()

// start sync handlers
Expand Down Expand Up @@ -1043,8 +1043,9 @@ func (h *handler) minedBroadcastLoop() {
if obj == nil {
continue
}
if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok {
h.BroadcastBlock(ev.Block, true) // First propagate block to peers
if ev, ok := obj.Data.(core.NewSealedBlockEvent); ok {
h.BroadcastBlock(ev.Block, true) // Propagate block to peers
} else if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok {
h.BroadcastBlock(ev.Block, false) // Only then announce to the rest
}
case <-h.stopCh:
Expand Down
4 changes: 2 additions & 2 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ func (w *worker) resultLoop() {
// Commit block and state to database.
task.state.SetExpectedStateRoot(block.Root())
start := time.Now()
status, err := w.chain.WriteBlockAndSetHead(block, receipts, logs, task.state, true)
status, err := w.chain.WriteBlockAndSetHead(block, receipts, logs, task.state, w.mux)
if status != core.CanonStatTy {
if err != nil {
log.Error("Failed writing block to chain", "err", err, "status", status)
Expand All @@ -673,7 +673,7 @@ func (w *worker) resultLoop() {
stats := w.chain.GetBlockStats(block.Hash())
stats.SendBlockTime.Store(time.Now().UnixMilli())
stats.StartMiningTime.Store(task.miningStartAt.UnixMilli())
log.Info("Successfully sealed new block", "number", block.Number(), "sealhash", sealhash, "hash", hash,
log.Info("Successfully seal and write new block", "number", block.Number(), "sealhash", sealhash, "hash", hash,
"elapsed", common.PrettyDuration(time.Since(task.createdAt)))
w.mux.Post(core.NewMinedBlockEvent{Block: block})

Expand Down