Skip to content

Commit

Permalink
refactor: verify and save block messages when receive new bock
Browse files Browse the repository at this point in the history
  • Loading branch information
simlecode committed Sep 4, 2023
1 parent 58f9b9d commit 6fd93cc
Show file tree
Hide file tree
Showing 15 changed files with 465 additions and 244 deletions.
22 changes: 9 additions & 13 deletions app/submodule/syncer/syncer_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ func (sa *syncerAPI) SyncerTracker(ctx context.Context) *types.TargetTracker {
}
convertTarget := func(src *syncTypes.Target) *types.Target {
return &types.Target{
State: convertSyncStateStage(src.State),
Base: src.Base,
Current: src.Current,
Start: src.Start,
End: src.End,
Err: src.Err,
ChainInfo: src.ChainInfo,
State: convertSyncStateStage(src.State),
Base: src.Base,
Current: src.Current,
Start: src.Start,
End: src.End,
Err: src.Err,
Head: src.Head,
Sender: src.Sender,
}
}
for _, target := range tracker.History() {
Expand Down Expand Up @@ -136,16 +137,11 @@ func (sa *syncerAPI) SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) e
return fmt.Errorf("provided messages did not match block: %v", err)
}

ts, err := types.NewTipSet([]*types.BlockHeader{blk.Header})
if err != nil {
return fmt.Errorf("somehow failed to make a tipset out of a single block: %v", err)
}

if _, err := chainModule.ChainReader.PutObject(ctx, blk.Header); err != nil {
return err
}
localPeer := sa.syncer.NetworkModule.Network.GetPeerID()
ci := types.NewChainInfo(localPeer, localPeer, ts)
ci := types.NewChainInfo(localPeer, localPeer, &types.FullTipSet{Blocks: []*types.FullBlock{fb}})
if err := sa.syncer.SyncProvider.HandleNewTipSet(ci); err != nil {
return fmt.Errorf("sync to submitted block failed: %v", err)
}
Expand Down
14 changes: 10 additions & 4 deletions app/submodule/syncer/syncer_submodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,13 @@ func (syncer *SyncerSubmodule) handleIncomingBlocks(ctx context.Context, msg pub

blkSvc := blockservice.New(syncer.BlockstoreModule.Blockstore, syncer.NetworkModule.Bitswap)

if _, err := syncer.NetworkModule.FetchMessagesByCids(ctx, blkSvc, bm.BlsMessages); err != nil {
blsMsgs, err := syncer.NetworkModule.FetchMessagesByCids(ctx, blkSvc, bm.BlsMessages)
if err != nil {
log.Errorf("fetch block bls messages failed:%s", err.Error())
return
}
if _, err := syncer.NetworkModule.FetchSignedMessagesByCids(ctx, blkSvc, bm.SecpkMessages); err != nil {
secpMsgs, err := syncer.NetworkModule.FetchSignedMessagesByCids(ctx, blkSvc, bm.SecpkMessages)
if err != nil {
log.Errorf("fetch block signed messages failed:%s", err.Error())
return
}
Expand All @@ -224,8 +226,12 @@ func (syncer *SyncerSubmodule) handleIncomingBlocks(ctx context.Context, msg pub

syncer.NetworkModule.Host.ConnManager().TagPeer(sender, "new-block", 20)

ts, _ := types.NewTipSet([]*types.BlockHeader{header})
chainInfo := types.NewChainInfo(source, sender, ts)
fullBlock := &types.FullBlock{
Header: header,
BLSMessages: blsMsgs,
SECPMessages: secpMsgs,
}
chainInfo := types.NewChainInfo(source, sender, &types.FullTipSet{Blocks: []*types.FullBlock{fullBlock}})

if err = syncer.ChainSyncManager.BlockProposer().SendGossipBlock(chainInfo); err != nil {
log.Errorf("failed to notify syncer of new block, block: %s", err)
Expand Down
5 changes: 4 additions & 1 deletion pkg/chainsync/chainsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ func NewManager(
}

return Manager{
dispatcher: dispatcher.NewDispatcher(chainSyncer),
dispatcher: dispatcher.NewDispatcher(struct {
*syncer.Syncer
*consensus.BlockValidator
}{Syncer: chainSyncer, BlockValidator: hv}, submodule.ChainReader),
}, nil
}

Expand Down
57 changes: 44 additions & 13 deletions pkg/chainsync/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package dispatcher
import (
"container/list"
"context"
"fmt"
"runtime/debug"
"sync"
atmoic2 "sync/atomic"
"time"

"github.com/filecoin-project/pubsub"
"github.com/filecoin-project/venus/pkg/chain"
"github.com/filecoin-project/venus/pkg/chainsync/types"
types2 "github.com/filecoin-project/venus/venus-shared/types"
"github.com/streadway/handy/atomic"
Expand All @@ -30,15 +32,16 @@ const LocalIncoming = "incoming"
type dispatchSyncer interface {
Head() *types2.TipSet
HandleNewTipSet(context.Context, *types.Target) error
ValidateMsgMeta(ctx context.Context, fblk *types2.FullBlock) error
}

// NewDispatcher creates a new syncing dispatcher with default queue sizes.
func NewDispatcher(catchupSyncer dispatchSyncer) *Dispatcher {
return NewDispatcherWithSizes(catchupSyncer, DefaultWorkQueueSize, DefaultInQueueSize)
func NewDispatcher(catchupSyncer dispatchSyncer, chainStore *chain.Store) *Dispatcher {
return NewDispatcherWithSizes(catchupSyncer, chainStore, DefaultWorkQueueSize, DefaultInQueueSize)
}

// NewDispatcherWithSizes creates a new syncing dispatcher.
func NewDispatcherWithSizes(syncer dispatchSyncer, workQueueSize, inQueueSize int) *Dispatcher {
func NewDispatcherWithSizes(syncer dispatchSyncer, chainStore *chain.Store, workQueueSize, inQueueSize int) *Dispatcher {
return &Dispatcher{
workTracker: types.NewTargetTracker(workQueueSize),
syncer: syncer,
Expand All @@ -48,6 +51,7 @@ func NewDispatcherWithSizes(syncer dispatchSyncer, workQueueSize, inQueueSize in
cancelControler: list.New(),
maxCount: 1,
incomingPubsub: pubsub.New(50),
chainStore: chainStore,
}
}

Expand Down Expand Up @@ -90,35 +94,62 @@ type Dispatcher struct {
maxCount int64

incomingPubsub *pubsub.PubSub
chainStore *chain.Store
}

// SyncTracker returns the target tracker of syncing
// SyncTracker returnss the target tracker of syncing
func (d *Dispatcher) SyncTracker() *types.TargetTracker {
return d.workTracker
}

func (d *Dispatcher) sendHead(ci *types2.ChainInfo) error {
ctx := context.Background()
fts := ci.FullTipSet
if fts == nil {
return fmt.Errorf("got nil tipset")
}

for _, b := range fts.Blocks {
if err := d.syncer.ValidateMsgMeta(ctx, b); err != nil {
log.Warnf("invalid block %s received: %s", b.Cid(), err)
return fmt.Errorf("validate block %s message meta failed: %v", b.Cid(), err)
}
}

for _, b := range fts.Blocks {
_, err := d.chainStore.PutObject(ctx, b.Header)
if err != nil {
return fmt.Errorf("fail to save block to tipset")
}
}

d.incomingPubsub.Pub(fts.TipSet().Blocks(), LocalIncoming)

return d.addTracker(ci)
}

// SendHello handles chain information from bootstrap peers.
func (d *Dispatcher) SendHello(ci *types2.ChainInfo) error {
return d.addTracker(ci)
return d.sendHead(ci)
}

// SendOwnBlock handles chain info from a node's own mining system
func (d *Dispatcher) SendOwnBlock(ci *types2.ChainInfo) error {
return d.addTracker(ci)
return d.sendHead(ci)
}

// SendGossipBlock handles chain info from new blocks sent on pubsub
func (d *Dispatcher) SendGossipBlock(ci *types2.ChainInfo) error {
return d.addTracker(ci)
return d.sendHead(ci)
}

func (d *Dispatcher) addTracker(ci *types2.ChainInfo) error {
d.incomingPubsub.Pub(ci.Head.Blocks(), LocalIncoming)
d.incoming <- &types.Target{
ChainInfo: *ci,
Base: d.syncer.Head(),
Current: d.syncer.Head(),
Start: time.Now(),
Head: ci.FullTipSet.TipSet(),
Base: d.syncer.Head(),
Current: d.syncer.Head(),
Start: time.Now(),
Sender: ci.Sender,
}
return nil
}
Expand Down Expand Up @@ -152,7 +183,7 @@ func (d *Dispatcher) processIncoming(ctx context.Context) {
// Sort new targets by putting on work queue.
if d.workTracker.Add(target) {
log.Infow("received new tipset", "height", target.Head.Height(), "blocks", target.Head.Len(), "from",
target.ChainInfo.Sender, "current work len", d.workTracker.Len(), "incoming channel len", len(d.incoming))
target.Sender, "current work len", d.workTracker.Len(), "incoming channel len", len(d.incoming))
}
}
}
Expand Down
37 changes: 23 additions & 14 deletions pkg/chainsync/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (

"github.com/ipfs/go-cid"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/venus/pkg/chain"
"github.com/filecoin-project/venus/pkg/testhelpers"

fbig "github.com/filecoin-project/go-state-types/big"
Expand Down Expand Up @@ -36,12 +38,18 @@ func (fs *mockSyncer) HandleNewTipSet(_ context.Context, ci *syncTypes.Target) e
return nil
}

func (fs *mockSyncer) ValidateMsgMeta(ctx context.Context, fblk *types.FullBlock) error {
return nil
}

func TestDispatchStartHappy(t *testing.T) {
tf.UnitTest(t)
s := &mockSyncer{
headsCalled: make([]*types.TipSet, 0),
}
testDispatch := dispatcher.NewDispatcher(s)
builder := chain.NewBuilder(t, address.Undef)

testDispatch := dispatcher.NewDispatcher(s, builder.Store())

cis := []*types.ChainInfo{
// We need to put these in priority order to avoid a race.
Expand Down Expand Up @@ -69,7 +77,7 @@ func TestDispatchStartHappy(t *testing.T) {
waitCh := make(chan struct{})
// stm: @CHAINSYNC_DISPATCHER_REGISTER_CALLBACK_001
testDispatch.RegisterCallback(func(target *syncTypes.Target, _ error) {
if target.Head.Key().Equals(cis[4].Head.Key()) {
if target.Head.Key().Equals(cis[4].FullTipSet.TipSet().Key()) {
waitCh <- struct{}{}
}
})
Expand All @@ -93,10 +101,10 @@ func TestQueueHappy(t *testing.T) {
testQ := syncTypes.NewTargetTracker(20)

// Add syncRequests out of order
sR0 := &syncTypes.Target{ChainInfo: *(chainInfoWithHeightAndWeight(t, 0, 1001))}
sR1 := &syncTypes.Target{ChainInfo: *(chainInfoWithHeightAndWeight(t, 1, 1001))}
sR2 := &syncTypes.Target{ChainInfo: *(chainInfoWithHeightAndWeight(t, 2, 1001))}
sR47 := &syncTypes.Target{ChainInfo: *(chainInfoWithHeightAndWeight(t, 47, 1001))}
sR0 := &syncTypes.Target{Head: chainInfoWithHeightAndWeight(t, 0, 1001).FullTipSet.TipSet()}
sR1 := &syncTypes.Target{Head: chainInfoWithHeightAndWeight(t, 1, 1001).FullTipSet.TipSet()}
sR2 := &syncTypes.Target{Head: chainInfoWithHeightAndWeight(t, 2, 1001).FullTipSet.TipSet()}
sR47 := &syncTypes.Target{Head: chainInfoWithHeightAndWeight(t, 47, 1001).FullTipSet.TipSet()}

testQ.Add(sR2)
testQ.Add(sR47)
Expand All @@ -108,7 +116,7 @@ func TestQueueHappy(t *testing.T) {
// Pop in order
out0 := requirePop(t, testQ)

weight := out0.ChainInfo.Head.ParentWeight()
weight := out0.Head.ParentWeight()
assert.Equal(t, int64(1001), weight.Int.Int64())
}

Expand All @@ -117,8 +125,8 @@ func TestQueueDuplicates(t *testing.T) {
testQ := syncTypes.NewTargetTracker(20)

// Add syncRequests with same height
sR0 := &syncTypes.Target{ChainInfo: *(chainInfoWithHeightAndWeight(t, 0, 1001))}
sR0dup := &syncTypes.Target{ChainInfo: *(chainInfoWithHeightAndWeight(t, 0, 1001))}
sR0 := &syncTypes.Target{Head: chainInfoWithHeightAndWeight(t, 0, 1001).FullTipSet.TipSet()}
sR0dup := &syncTypes.Target{Head: chainInfoWithHeightAndWeight(t, 0, 1001).FullTipSet.TipSet()}

testQ.Add(sR0)
testQ.Add(sR0dup)
Expand All @@ -128,15 +136,15 @@ func TestQueueDuplicates(t *testing.T) {

// Pop
first := requirePop(t, testQ)
assert.Equal(t, abi.ChainEpoch(0), first.ChainInfo.Head.Height())
assert.Equal(t, abi.ChainEpoch(0), first.Head.Height())
testQ.Remove(first)
}

func TestQueueEmptyPopErrors(t *testing.T) {
tf.UnitTest(t)
testQ := syncTypes.NewTargetTracker(20)
sR0 := &syncTypes.Target{ChainInfo: *(chainInfoWithHeightAndWeight(t, 0, 1002))}
sR47 := &syncTypes.Target{ChainInfo: *(chainInfoWithHeightAndWeight(t, 47, 1001))}
sR0 := &syncTypes.Target{Head: chainInfoWithHeightAndWeight(t, 0, 1002).FullTipSet.TipSet()}
sR47 := &syncTypes.Target{Head: chainInfoWithHeightAndWeight(t, 47, 1001).FullTipSet.TipSet()}

// Add 2
testQ.Add(sR47)
Expand Down Expand Up @@ -187,8 +195,9 @@ func chainInfoWithHeightAndWeight(t *testing.T, h int, weight int64) *types.Chai
Data: []byte{0x4},
},
}
b, _ := types.NewTipSet([]*types.BlockHeader{blk})
return &types.ChainInfo{
Head: b,
FullTipSet: &types.FullTipSet{
Blocks: []*types.FullBlock{{Header: blk}},
},
}
}
24 changes: 12 additions & 12 deletions pkg/chainsync/syncer/syncer_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,20 @@ func TestLoadFork(t *testing.T) {
right := builder.AppendManyOn(ctx, 3, base)

leftTarget := &types.Target{
Base: nil,
Current: nil,
Start: time.Time{},
End: time.Time{},
Err: nil,
ChainInfo: *types2.NewChainInfo("", "", left),
Base: nil,
Current: nil,
Start: time.Time{},
End: time.Time{},
Err: nil,
Head: left,
}
rightTarget := &types.Target{
Base: nil,
Current: nil,
Start: time.Time{},
End: time.Time{},
Err: nil,
ChainInfo: *types2.NewChainInfo("", "", right),
Base: nil,
Current: nil,
Start: time.Time{},
End: time.Time{},
Err: nil,
Head: right,
}
// Sync the two branches, which stores all blocks in the underlying stores.
// stm: @CHAINSYNC_SYNCER_HANDLE_NEW_TIP_SET_001, @CHAINSYNC_SYNCER_SET_HEAD_001
Expand Down
Loading

0 comments on commit 6fd93cc

Please sign in to comment.