Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/vote: vote before committing state and writing block #2589

Merged
merged 2 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ type ChainHeaderReader interface {
// GetHighestVerifiedHeader retrieves the highest header verified.
GetHighestVerifiedHeader() *types.Header

// GetVerifiedBlockByHash retrieves the highest verified block.
GetVerifiedBlockByHash(hash common.Hash) *types.Header

// ChasingHead return the best chain head of peers.
ChasingHead() *types.Header
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/parlia/parlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -1362,7 +1362,7 @@ func (p *Parlia) IsActiveValidatorAt(chain consensus.ChainHeaderReader, header *
func (p *Parlia) VerifyVote(chain consensus.ChainHeaderReader, vote *types.VoteEnvelope) error {
targetNumber := vote.Data.TargetNumber
targetHash := vote.Data.TargetHash
header := chain.GetHeaderByHash(targetHash)
header := chain.GetVerifiedBlockByHash(targetHash)
if header == nil {
log.Warn("BlockHeader at current voteBlockNumber is nil", "targetNumber", targetNumber, "targetHash", targetHash)
return errors.New("BlockHeader at current voteBlockNumber is nil")
Expand Down
33 changes: 20 additions & 13 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,23 +259,25 @@ type BlockChain struct {
triesInMemory uint64
txIndexer *txIndexer // Transaction indexer, might be nil if not enabled

hc *HeaderChain
rmLogsFeed event.Feed
chainFeed event.Feed
chainSideFeed event.Feed
chainHeadFeed event.Feed
chainBlockFeed event.Feed
logsFeed event.Feed
blockProcFeed event.Feed
finalizedHeaderFeed event.Feed
scope event.SubscriptionScope
genesisBlock *types.Block
hc *HeaderChain
rmLogsFeed event.Feed
chainFeed event.Feed
chainSideFeed event.Feed
chainHeadFeed event.Feed
chainBlockFeed event.Feed
logsFeed event.Feed
blockProcFeed event.Feed
finalizedHeaderFeed event.Feed
highestVerifiedBlockFeed event.Feed
scope event.SubscriptionScope
genesisBlock *types.Block

// This mutex synchronizes chain write operations.
// Readers don't need to take it, they can just read the database.
chainmu *syncx.ClosableMutex

highestVerifiedHeader atomic.Pointer[types.Header]
highestVerifiedBlock atomic.Pointer[types.Header]
currentBlock atomic.Pointer[types.Header] // Current head of the chain
currentSnapBlock atomic.Pointer[types.Header] // Current head of snap-sync
currentFinalBlock atomic.Pointer[types.Header] // Latest (consensus) finalized block
Expand Down Expand Up @@ -400,6 +402,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
}

bc.highestVerifiedHeader.Store(nil)
bc.highestVerifiedBlock.Store(nil)
bc.currentBlock.Store(nil)
bc.currentSnapBlock.Store(nil)
bc.chasingHead.Store(nil)
Expand Down Expand Up @@ -1925,8 +1928,12 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
if err != nil {
return NonStatTy, err
}
if reorg && mux != nil {
mux.Post(NewSealedBlockEvent{Block: block})
if reorg {
bc.highestVerifiedBlock.Store(types.CopyHeader(block.Header()))
bc.highestVerifiedBlockFeed.Send(HighestVerifiedBlockEvent{Header: block.Header()})
zzzckck marked this conversation as resolved.
Show resolved Hide resolved
if mux != nil {
mux.Post(NewSealedBlockEvent{Block: block})
}
}

if err := bc.writeBlockWithState(block, receipts, state); err != nil {
Expand Down
14 changes: 14 additions & 0 deletions core/blockchain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,15 @@ func (bc *BlockChain) GetHeaderByHash(hash common.Hash) *types.Header {
return bc.hc.GetHeaderByHash(hash)
}

// GetVerifiedBlockByHash retrieves the header of a verified block, it may be only in memory.
func (bc *BlockChain) GetVerifiedBlockByHash(hash common.Hash) *types.Header {
highestVerifiedBlock := bc.highestVerifiedBlock.Load()
if highestVerifiedBlock != nil && highestVerifiedBlock.Hash() == hash {
return highestVerifiedBlock
}
return bc.hc.GetHeaderByHash(hash)
}

// GetHeaderByNumber retrieves a block header from the database by number,
// caching it (associated with its hash) if found.
func (bc *BlockChain) GetHeaderByNumber(number uint64) *types.Header {
Expand Down Expand Up @@ -486,6 +495,11 @@ func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Su
return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch))
}

// SubscribeHighestVerifiedBlockEvent registers a subscription of HighestVerifiedBlockEvent.
func (bc *BlockChain) SubscribeHighestVerifiedHeaderEvent(ch chan<- HighestVerifiedBlockEvent) event.Subscription {
return bc.scope.Track(bc.highestVerifiedBlockFeed.Subscribe(ch))
}

// SubscribeChainBlockEvent registers a subscription of ChainBlockEvent.
func (bc *BlockChain) SubscribeChainBlockEvent(ch chan<- ChainHeadEvent) event.Subscription {
return bc.scope.Track(bc.chainBlockFeed.Subscribe(ch))
Expand Down
4 changes: 4 additions & 0 deletions core/chain_makers.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,10 @@ func (cm *chainMaker) GetHighestVerifiedHeader() *types.Header {
panic("not supported")
}

func (cm *chainMaker) GetVerifiedBlockByHash(hash common.Hash) *types.Header {
return cm.GetHeaderByHash(hash)
}

func (cm *chainMaker) ChasingHead() *types.Header {
panic("not supported")
}
4 changes: 4 additions & 0 deletions core/data_availability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,10 @@ func (r *mockDAHeaderReader) GetHighestVerifiedHeader() *types.Header {
panic("not supported")
}

func (r *mockDAHeaderReader) GetVerifiedBlockByHash(hash common.Hash) *types.Header {
panic("not supported")
}

func createMockDATx(config *params.ChainConfig, sidecar *types.BlobTxSidecar) *types.Transaction {
if sidecar == nil {
tx := &types.DynamicFeeTx{
Expand Down
2 changes: 2 additions & 0 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,5 @@ type ChainSideEvent struct {
}

type ChainHeadEvent struct{ Block *types.Block }

type HighestVerifiedBlockEvent struct{ Header *types.Header }
4 changes: 4 additions & 0 deletions core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,10 @@ func (hc *HeaderChain) GetHighestVerifiedHeader() *types.Header {
return nil
}

func (hc *HeaderChain) GetVerifiedBlockByHash(hash common.Hash) *types.Header {
return hc.GetHeaderByHash(hash)
}

func (hc *HeaderChain) ChasingHead() *types.Header {
return nil
}
Expand Down
30 changes: 15 additions & 15 deletions core/vote/vote_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ type VoteManager struct {

chain *core.BlockChain

chainHeadCh chan core.ChainHeadEvent
chainHeadSub event.Subscription
highestVerifiedBlockCh chan core.HighestVerifiedBlockEvent
highestVerifiedBlockSub event.Subscription

// used for backup validators to sync votes from corresponding mining validator
syncVoteCh chan core.NewVoteEvent
Expand All @@ -49,12 +49,12 @@ type VoteManager struct {

func NewVoteManager(eth Backend, chain *core.BlockChain, pool *VotePool, journalPath, blsPasswordPath, blsWalletPath string, engine consensus.PoSA) (*VoteManager, error) {
voteManager := &VoteManager{
eth: eth,
chain: chain,
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
syncVoteCh: make(chan core.NewVoteEvent, voteBufferForPut),
pool: pool,
engine: engine,
eth: eth,
chain: chain,
highestVerifiedBlockCh: make(chan core.HighestVerifiedBlockEvent, highestVerifiedBlockChanSize),
syncVoteCh: make(chan core.NewVoteEvent, voteBufferForPut),
pool: pool,
engine: engine,
}

// Create voteSigner.
Expand All @@ -74,7 +74,7 @@ func NewVoteManager(eth Backend, chain *core.BlockChain, pool *VotePool, journal
voteManager.journal = voteJournal

// Subscribe to chain head event.
voteManager.chainHeadSub = voteManager.chain.SubscribeChainHeadEvent(voteManager.chainHeadCh)
voteManager.highestVerifiedBlockSub = voteManager.chain.SubscribeHighestVerifiedHeaderEvent(voteManager.highestVerifiedBlockCh)
voteManager.syncVoteSub = voteManager.pool.SubscribeNewVoteEvent(voteManager.syncVoteCh)

go voteManager.loop()
Expand All @@ -84,7 +84,7 @@ func NewVoteManager(eth Backend, chain *core.BlockChain, pool *VotePool, journal

func (voteManager *VoteManager) loop() {
log.Debug("vote manager routine loop started")
defer voteManager.chainHeadSub.Unsubscribe()
defer voteManager.highestVerifiedBlockSub.Unsubscribe()
defer voteManager.syncVoteSub.Unsubscribe()

events := voteManager.eth.EventMux().Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
Expand Down Expand Up @@ -119,7 +119,7 @@ func (voteManager *VoteManager) loop() {
log.Debug("downloader is in DoneEvent mode, set the startVote flag to true")
startVote = true
}
case cHead := <-voteManager.chainHeadCh:
case cHead := <-voteManager.highestVerifiedBlockCh:
if !startVote {
log.Debug("startVote flag is false, continue")
continue
Expand All @@ -135,12 +135,12 @@ func (voteManager *VoteManager) loop() {
continue
}

if cHead.Block == nil {
log.Debug("cHead.Block is nil, continue")
if cHead.Header == nil {
log.Debug("cHead.Header is nil, continue")
continue
}

curHead := cHead.Block.Header()
curHead := cHead.Header
if p, ok := voteManager.engine.(*parlia.Parlia); ok {
nextBlockMinedTime := time.Unix(int64((curHead.Time + p.Period())), 0)
timeForBroadcast := 50 * time.Millisecond // enough to broadcast a vote
Expand Down Expand Up @@ -217,7 +217,7 @@ func (voteManager *VoteManager) loop() {
case <-voteManager.syncVoteSub.Err():
log.Debug("voteManager subscribed votes failed")
return
case <-voteManager.chainHeadSub.Err():
case <-voteManager.highestVerifiedBlockSub.Err():
log.Debug("voteManager subscribed chainHead failed")
return
}
Expand Down
42 changes: 21 additions & 21 deletions core/vote/vote_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const (
lowerLimitOfVoteBlockNumber = 256
upperLimitOfVoteBlockNumber = 11 // refer to fetcher.maxUncleDist

chainHeadChanSize = 10 // chainHeadChanSize is the size of channel listening to ChainHeadEvent.
highestVerifiedBlockChanSize = 10 // highestVerifiedBlockChanSize is the size of channel listening to HighestVerifiedBlockEvent.
)

var (
Expand Down Expand Up @@ -57,8 +57,8 @@ type VotePool struct {
curVotesPq *votesPriorityQueue
futureVotesPq *votesPriorityQueue

chainHeadCh chan core.ChainHeadEvent
chainHeadSub event.Subscription
highestVerifiedBlockCh chan core.HighestVerifiedBlockEvent
highestVerifiedBlockSub event.Subscription

votesCh chan *types.VoteEnvelope

Expand All @@ -69,38 +69,38 @@ type votesPriorityQueue []*types.VoteData

func NewVotePool(chain *core.BlockChain, engine consensus.PoSA) *VotePool {
votePool := &VotePool{
chain: chain,
receivedVotes: mapset.NewSet[common.Hash](),
curVotes: make(map[common.Hash]*VoteBox),
futureVotes: make(map[common.Hash]*VoteBox),
curVotesPq: &votesPriorityQueue{},
futureVotesPq: &votesPriorityQueue{},
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
votesCh: make(chan *types.VoteEnvelope, voteBufferForPut),
engine: engine,
chain: chain,
receivedVotes: mapset.NewSet[common.Hash](),
curVotes: make(map[common.Hash]*VoteBox),
futureVotes: make(map[common.Hash]*VoteBox),
curVotesPq: &votesPriorityQueue{},
futureVotesPq: &votesPriorityQueue{},
highestVerifiedBlockCh: make(chan core.HighestVerifiedBlockEvent, highestVerifiedBlockChanSize),
votesCh: make(chan *types.VoteEnvelope, voteBufferForPut),
engine: engine,
}

// Subscribe events from blockchain and start the main event loop.
votePool.chainHeadSub = votePool.chain.SubscribeChainHeadEvent(votePool.chainHeadCh)
votePool.highestVerifiedBlockSub = votePool.chain.SubscribeHighestVerifiedHeaderEvent(votePool.highestVerifiedBlockCh)

go votePool.loop()
return votePool
}

// loop is the vote pool's main even loop, waiting for and reacting to outside blockchain events and votes channel event.
func (pool *VotePool) loop() {
defer pool.chainHeadSub.Unsubscribe()
defer pool.highestVerifiedBlockSub.Unsubscribe()

for {
select {
// Handle ChainHeadEvent.
case ev := <-pool.chainHeadCh:
if ev.Block != nil {
latestBlockNumber := ev.Block.NumberU64()
case ev := <-pool.highestVerifiedBlockCh:
if ev.Header != nil {
latestBlockNumber := ev.Header.Number.Uint64()
pool.prune(latestBlockNumber)
pool.transferVotesFromFutureToCur(ev.Block.Header())
pool.transferVotesFromFutureToCur(ev.Header)
}
case <-pool.chainHeadSub.Err():
case <-pool.highestVerifiedBlockSub.Err():
return

// Handle votes channel and put the vote into vote pool.
Expand Down Expand Up @@ -135,7 +135,7 @@ func (pool *VotePool) putIntoVotePool(vote *types.VoteEnvelope) bool {
var votesPq *votesPriorityQueue
isFutureVote := false

voteBlock := pool.chain.GetHeaderByHash(targetHash)
voteBlock := pool.chain.GetVerifiedBlockByHash(targetHash)
if voteBlock == nil {
votes = pool.futureVotes
votesPq = pool.futureVotesPq
Expand Down Expand Up @@ -226,7 +226,7 @@ func (pool *VotePool) transferVotesFromFutureToCur(latestBlockHeader *types.Head
futurePqBuffer := make([]*types.VoteData, 0)
for futurePq.Len() > 0 && futurePq.Peek().TargetNumber <= latestBlockNumber {
blockHash := futurePq.Peek().TargetHash
header := pool.chain.GetHeaderByHash(blockHash)
header := pool.chain.GetVerifiedBlockByHash(blockHash)
if header == nil {
// Put into pq buffer used for later put again into futurePq
futurePqBuffer = append(futurePqBuffer, heap.Pop(futurePq).(*types.VoteData))
Expand Down
Loading