diff --git a/app/submodule/chain/chain_submodule.go b/app/submodule/chain/chain_submodule.go
index d28eb1cc5f..7d454ceb48 100644
--- a/app/submodule/chain/chain_submodule.go
+++ b/app/submodule/chain/chain_submodule.go
@@ -21,7 +21,6 @@ import (
 	"github.com/filecoin-project/venus/pkg/vmsupport"
 	v0api "github.com/filecoin-project/venus/venus-shared/api/chain/v0"
 	v1api "github.com/filecoin-project/venus/venus-shared/api/chain/v1"
-	"github.com/filecoin-project/venus/venus-shared/types"
 )
 
 // ChainSubmodule enhances the `Node` with chain capabilities.
@@ -33,8 +32,7 @@ type ChainSubmodule struct { //nolint
 	SystemCall                  vm.SyscallsImpl
 	CirculatingSupplyCalculator *chain.CirculatingSupplyCalculator
 
-	CheckPoint types.TipSetKey
-	Drand      beacon.Schedule
+	Drand beacon.Schedule
 
 	config chainConfig
 
@@ -92,7 +90,6 @@ func NewChainSubmodule(ctx context.Context,
 		Drand:      drand,
 		config:     config,
 		Waiter:     waiter,
-		CheckPoint: chainStore.GetCheckPoint(),
 	}
 	err = store.ChainReader.Load(context.TODO())
 	if err != nil {
diff --git a/extern/filecoin-ffi b/extern/filecoin-ffi
index 62131ac060..72e100df9d 160000
--- a/extern/filecoin-ffi
+++ b/extern/filecoin-ffi
@@ -1 +1 @@
-Subproject commit 62131ac060d4d0cb8f963c1de5b291fba704079c
+Subproject commit 72e100df9db736aeda690672ab120500ddbd5e0d
diff --git a/pkg/chain/store.go b/pkg/chain/store.go
index 55620b1f4d..4f34553c9e 100644
--- a/pkg/chain/store.go
+++ b/pkg/chain/store.go
@@ -100,7 +100,7 @@ type Store struct {
 	// head is the tipset at the head of the best known chain.
 	head *types.TipSet
 
-	checkPoint types.TipSetKey
+	checkPoint *types.TipSet
 	// Protects head and genesisCid.
 	mu sync.RWMutex
 
@@ -143,7 +143,6 @@ func NewStore(chainDs repo.Datastore,
 		bsstore:    bsstore,
 		headEvents: pubsub.New(64),
 
-		checkPoint:     types.EmptyTSK,
 		genesis:        genesisCid,
 		reorgNotifeeCh: make(chan ReorgNotifee),
 		tsCache:        tsCache,
@@ -156,11 +155,22 @@
 
 	val, err := store.ds.Get(context.TODO(), CheckPoint)
 	if err != nil {
-		store.checkPoint = types.NewTipSetKey(genesisCid)
+		store.checkPoint, err = store.GetTipSet(context.TODO(), types.NewTipSetKey(genesisCid))
+		if err != nil {
+			panic(fmt.Errorf("cannot get genesis tipset: %w", err))
+		}
 	} else {
-		_ = store.checkPoint.UnmarshalCBOR(bytes.NewReader(val)) //nolint:staticcheck
+		var checkPointTSK types.TipSetKey
+		err := checkPointTSK.UnmarshalCBOR(bytes.NewReader(val))
+		if err != nil {
+			panic(fmt.Errorf("cannot unmarshal checkpoint %s: %w", string(val), err))
+		}
+		store.checkPoint, err = store.GetTipSet(context.TODO(), checkPointTSK)
+		if err != nil {
+			panic(fmt.Errorf("cannot get checkpoint tipset: %w", err))
+		}
 	}
-	log.Infof("check point value: %v", store.checkPoint)
+	log.Infof("load check point height: %d, key: %v", store.checkPoint.Height(), store.checkPoint.Key())
 
 	store.reorgCh = store.reorgWorker(context.TODO())
 	return store
@@ -1112,8 +1122,8 @@ func (store *Store) SetCheckpoint(ctx context.Context, ts *types.TipSet) error {
 		return err
 	}
 
-	store.mu.RLock()
-	defer store.mu.RUnlock()
+	store.mu.Lock()
+	defer store.mu.Unlock()
 
 	finality := store.head.Height() - policy.ChainFinality
 	targetChain, currentChain := ts, store.head
@@ -1167,7 +1177,7 @@ func (store *Store) SetCheckpoint(ctx context.Context, ts *types.TipSet) error {
 	if err := store.ds.Put(ctx, CheckPoint, buf.Bytes()); err != nil {
 		return fmt.Errorf("checkpoint failed: failed to record checkpoint in the datastore: %w", err)
 	}
-	store.checkPoint = ts.Key()
+	store.checkPoint = ts
 
 	return nil
 }
@@ -1187,7 +1197,7 @@ func (store *Store) IsAncestorOf(ctx context.Context, a, b *types.TipSet) (bool,
 }
 
 // GetCheckPoint get the check point from store or disk.
-func (store *Store) GetCheckPoint() types.TipSetKey {
+func (store *Store) GetCheckPoint() *types.TipSet {
 	store.mu.RLock()
 	defer store.mu.RUnlock()
 
@@ -1722,7 +1732,7 @@ func (store *Store) exceedsForkLength(ctx context.Context, synced, external *typ
 	}
 
 	// Now check to see if we've walked back to the checkpoint.
-	if synced.Key().Equals(store.checkPoint) {
+	if synced.Key().Equals(store.checkPoint.Key()) {
 		return true, nil
 	}
 
diff --git a/pkg/chain/store_test.go b/pkg/chain/store_test.go
index 15fab6420e..c82e63cf4c 100644
--- a/pkg/chain/store_test.go
+++ b/pkg/chain/store_test.go
@@ -43,6 +43,8 @@ func (cbor *CborBlockStore) PutBlocks(ctx context.Context, blocks []*types.Block
 func newChainStore(r repo.Repo, genTS *types.TipSet) *CborBlockStore {
 	tempBlock := r.Datastore()
 	cborStore := cbor.NewCborStore(tempBlock)
+	blkBytes, _ := genTS.Blocks()[0].ToStorageBlock()
+	_ = tempBlock.Put(context.Background(), blkBytes)
 	return &CborBlockStore{
 		Store:     chain.NewStore(r.ChainDatastore(), tempBlock, genTS.At(0).Cid(), chainselector.Weight),
 		cborStore: cborStore,
diff --git a/pkg/chainsync/syncer/syncer.go b/pkg/chainsync/syncer/syncer.go
index ced8977138..3c1b756561 100644
--- a/pkg/chainsync/syncer/syncer.go
+++ b/pkg/chainsync/syncer/syncer.go
@@ -60,6 +60,7 @@ var (
 	ErrNewChainTooLong = errors.New("input chain forked from best chain past finality limit")
 	// ErrUnexpectedStoreState indicates that the syncer's chain bsstore is violating expected invariants.
 	ErrUnexpectedStoreState = errors.New("the chain bsstore is in an unexpected state")
+	ErrForkCheckpoint       = fmt.Errorf("fork would require us to diverge from checkpointed block")
 
 	logSyncer = logging.Logger("chainsync.syncer")
 )
@@ -137,8 +138,7 @@ type Syncer struct {
 
 	clock clock.Clock
 
-	bsstore    blockstoreutil.Blockstore
-	checkPoint types.TipSetKey
+	bsstore blockstoreutil.Blockstore
 
 	fork fork.IFork
 
@@ -199,45 +199,41 @@ func (syncer *Syncer) syncOne(ctx context.Context, parent, next *types.TipSet) e
 	stopwatch := syncOneTimer.Start()
 	defer stopwatch(ctx)
 
-	var err error
-
-	if !parent.Key().Equals(syncer.checkPoint) {
-		var wg errgroup.Group
-		for i := 0; i < next.Len(); i++ {
-			blk := next.At(i)
-			wg.Go(func() error {
-				// Fetch the URL.
-				err := syncer.blockValidator.ValidateFullBlock(ctx, blk)
-				if err == nil {
-					if err := syncer.chainStore.AddToTipSetTracker(ctx, blk); err != nil {
-						return fmt.Errorf("failed to add validated header to tipset tracker: %w", err)
-					}
+	var wg errgroup.Group
+	for i := 0; i < next.Len(); i++ {
+		blk := next.At(i)
+		wg.Go(func() error {
+			// Fetch the URL.
+			err := syncer.blockValidator.ValidateFullBlock(ctx, blk)
+			if err == nil {
+				if err := syncer.chainStore.AddToTipSetTracker(ctx, blk); err != nil {
+					return fmt.Errorf("failed to add validated header to tipset tracker: %w", err)
 				}
-				return err
-			})
-		}
-		err = wg.Wait()
-		if err != nil {
-			var rootNotMatch bool // nolint
-
-			if merr, isok := err.(*multierror.Error); isok {
-				for _, e := range merr.Errors {
-					if isRootNotMatch(e) {
-						rootNotMatch = true
-						break
-					}
-				}
-			} else {
-				rootNotMatch = isRootNotMatch(err) // nolint
 			}
+			return err
+		})
+	}
+	err := wg.Wait()
+	if err != nil {
+		var rootNotMatch bool // nolint
 
-			if rootNotMatch { // nolint
-				// todo: should here rollback, and re-compute?
-				_ = syncer.stmgr.Rollback(ctx, parent, next)
+		if merr, isok := err.(*multierror.Error); isok {
+			for _, e := range merr.Errors {
+				if isRootNotMatch(e) {
+					rootNotMatch = true
+					break
+				}
 			}
+		} else {
+			rootNotMatch = isRootNotMatch(err) // nolint
+		}
 
-			return fmt.Errorf("validate mining failed %w", err)
+		if rootNotMatch { // nolint
+			// todo: should here rollback, and re-compute?
+			_ = syncer.stmgr.Rollback(ctx, parent, next)
 		}
+
+		return fmt.Errorf("validate mining failed %w", err)
 	}
 
 	syncer.chainStore.PersistTipSetKey(ctx, next.Key())
@@ -297,8 +293,25 @@ func (syncer *Syncer) HandleNewTipSet(ctx context.Context, target *syncTypes.Tar
 		return errors.New("do not sync to a target has synced before")
 	}
 
+	if target.Head.Height() == head.Height() {
+		// check if maybeHead is fully contained in headTipSet
+		// meaning we already synced all the blocks that are a part of maybeHead
+		// if that is the case, there is nothing for us to do
+		// we need to exit out early, otherwise checkpoint-fork logic might wrongly reject it
+		fullyContained := true
+		for _, c := range target.Head.Cids() {
+			if !head.Contains(c) {
+				fullyContained = false
+				break
+			}
+		}
+		if fullyContained {
+			return nil
+		}
+	}
+
 	syncer.exchangeClient.AddPeer(target.Sender)
-	tipsets, err := syncer.fetchChainBlocks(ctx, head, target.Head)
+	tipsets, err := syncer.fetchChainBlocks(ctx, head, target.Head, false)
 	if err != nil {
 		return errors.Wrapf(err, "failure fetching or validating headers")
 	}
@@ -346,7 +359,7 @@ func (syncer *Syncer) syncSegement(ctx context.Context, target *syncTypes.Target
 				errProcessChan <- processErr
 				return
 			}
-			if !parent.Key().Equals(syncer.checkPoint) {
+			if !parent.Key().Equals(syncer.chainStore.GetCheckPoint().Key()) {
 				logSyncer.Debugf("set chain head, height:%d, blocks:%d", parent.Height(), parent.Len())
 				if err := syncer.chainStore.RefreshHeaviestTipSet(ctx, parent.Height()); err != nil {
 					errProcessChan <- err
@@ -374,7 +387,7 @@ func (syncer *Syncer) syncSegement(ctx context.Context, target *syncTypes.Target
 // if local db not exist, get block from network(libp2p),
 // if there is a fork, get the common root tipset of knowntip and targettip, and return the block data from root tipset to targettip
 // local(···->A->B) + incoming(C->D->E)  => ···->A->B->C->D->E
-func (syncer *Syncer) fetchChainBlocks(ctx context.Context, knownTip *types.TipSet, targetTip *types.TipSet) ([]*types.TipSet, error) {
+func (syncer *Syncer) fetchChainBlocks(ctx context.Context, knownTip *types.TipSet, targetTip *types.TipSet, ignoreCheckpoint bool) ([]*types.TipSet, error) {
 	chainTipsets := []*types.TipSet{targetTip}
 	flushDB := func(saveTips []*types.TipSet) error {
 		bs := blockstoreutil.NewTemporary()
@@ -448,6 +461,13 @@ loop:
 		if err != nil {
 			return nil, fmt.Errorf("failed to load next local tipset: %w", err)
 		}
+
+		if !ignoreCheckpoint {
+			if chkpt := syncer.chainStore.GetCheckPoint(); chkpt != nil && base.Height() <= chkpt.Height() {
+				return nil, fmt.Errorf("merge point affecting the checkpoint: %w", ErrForkCheckpoint)
+			}
+		}
+
 		if base.IsChildOf(knownParent) {
 			// common case: receiving a block thats potentially part of the same tipset as our best block
 			chain.Reverse(chainTipsets)
@@ -456,7 +476,7 @@ loop:
 
 		logSyncer.Warnf("(fork detected) synced header chain, base: %v(%d), knownTip: %v(%d)",
 			base.Key(), base.Height(), knownTip.Key(), knownTip.Height())
-		fork, err := syncer.syncFork(ctx, base, knownTip)
+		fork, err := syncer.syncFork(ctx, base, knownTip, ignoreCheckpoint)
 		if err != nil {
 			if errors.Is(err, ErrForkTooLong) {
 				// TODO: we're marking this block bad in the same way that we mark invalid blocks bad. Maybe distinguish?
@@ -486,7 +506,15 @@
 // D->E-F(targetTip)
 // A => D->E>F
 // B-C(knownTip)
-func (syncer *Syncer) syncFork(ctx context.Context, incoming *types.TipSet, known *types.TipSet) ([]*types.TipSet, error) {
+func (syncer *Syncer) syncFork(ctx context.Context, incoming *types.TipSet, known *types.TipSet, ignoreCheckpoint bool) ([]*types.TipSet, error) {
+	var chkpt *types.TipSet
+	if !ignoreCheckpoint {
+		chkpt = syncer.chainStore.GetCheckPoint()
+		if known.Equals(chkpt) {
+			return nil, ErrForkCheckpoint
+		}
+	}
+
 	incomingParentsTsk := incoming.Parents()
 	commonParent := false
 	for _, incomingParent := range incomingParentsTsk.Cids() {
@@ -701,7 +729,7 @@ func (syncer *Syncer) SyncCheckpoint(ctx context.Context, tsk types.TipSetKey) e
 	if anc, err := syncer.chainStore.IsAncestorOf(ctx, ts, head); err != nil {
 		return fmt.Errorf("failed to walk the chain when checkpointing: %w", err)
 	} else if !anc {
-		tipsets, err := syncer.fetchChainBlocks(ctx, head, target.Head)
+		tipsets, err := syncer.fetchChainBlocks(ctx, head, target.Head, true)
 		if err != nil {
 			return errors.Wrapf(err, "failure fetching or validating headers")
 		}
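
For reference, a minimal standalone sketch of the checkpoint guard this diff adds to fetchChainBlocks and syncFork: walking an incoming fork back to or below the checkpointed tipset is rejected with ErrForkCheckpoint, unless the caller opts out the way SyncCheckpoint now does with ignoreCheckpoint=true. The types and functions below (tipSet, store, walkBack) are hypothetical stand-ins, not the venus API.

// checkpoint_guard_sketch.go - simplified illustration only.
package main

import (
	"errors"
	"fmt"
)

var errForkCheckpoint = errors.New("fork would require us to diverge from checkpointed block")

// tipSet models just what the guard needs: a height and a parent link.
type tipSet struct {
	height int
	parent *tipSet
}

// store models the chain store holding the currently checkpointed tipset.
type store struct {
	checkpoint *tipSet
}

// walkBack mimics the guard: while walking the incoming chain back toward the
// local chain, refuse to cross the checkpoint height unless ignoreCheckpoint
// is set (the SyncCheckpoint path).
func walkBack(s *store, incoming *tipSet, stopHeight int, ignoreCheckpoint bool) error {
	for base := incoming; base != nil && base.height > stopHeight; base = base.parent {
		if !ignoreCheckpoint && s.checkpoint != nil && base.height <= s.checkpoint.height {
			return fmt.Errorf("merge point affecting the checkpoint: %w", errForkCheckpoint)
		}
	}
	return nil
}

func main() {
	s := &store{checkpoint: &tipSet{height: 10}}
	// An incoming fork whose common ancestor with the local chain is at height 8,
	// i.e. accepting it would rewind the chain below the checkpoint.
	fork := &tipSet{height: 12, parent: &tipSet{height: 11, parent: &tipSet{height: 10, parent: &tipSet{height: 9}}}}

	err := walkBack(s, fork, 8, false)
	fmt.Println(errors.Is(err, errForkCheckpoint)) // true: normal sync rejects the fork

	fmt.Println(walkBack(s, fork, 8, true)) // <nil>: the explicit checkpointing path bypasses the guard
}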