Merge pull request #6435 from filecoin-project/fix/sync/chck-expansion
fix(sync): do not allow to expand checkpointed tipsets
simlecode authored Dec 11, 2024
2 parents 09eda5b + e542184 commit 32a4727
Showing 5 changed files with 93 additions and 56 deletions.
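
The core of the change, as the diffs below show: the chain store now caches the full checkpoint tipset instead of a bare TipSetKey, syncOne no longer skips validation when the parent is the checkpoint, and fetchChainBlocks/syncFork reject any fork whose base sits at or below the checkpoint unless ignoreCheckpoint is set (which only SyncCheckpoint does). A minimal, self-contained Go sketch of that guard condition, using hypothetical stand-in types rather than the venus ones:

// Hypothetical sketch, not the venus API: a sync path refuses to walk a fork
// whose base is at or below the checkpointed height unless the caller is
// deliberately moving the checkpoint (ignoreCheckpoint).
package main

import (
	"errors"
	"fmt"
)

// errForkCheckpoint mirrors the intent of ErrForkCheckpoint added in syncer.go.
var errForkCheckpoint = errors.New("fork would require us to diverge from checkpointed block")

// tipset is a stand-in carrying only what the sketch needs.
type tipset struct {
	key    string
	height uint64
}

// checkForkAgainstCheckpoint rejects forks that would rewrite checkpointed history.
func checkForkAgainstCheckpoint(base, checkpoint *tipset, ignoreCheckpoint bool) error {
	if ignoreCheckpoint || checkpoint == nil {
		return nil
	}
	if base.height <= checkpoint.height {
		return fmt.Errorf("fork base %s (height %d) does not build on checkpoint %s (height %d): %w",
			base.key, base.height, checkpoint.key, checkpoint.height, errForkCheckpoint)
	}
	return nil
}

func main() {
	chkpt := &tipset{key: "chk", height: 100}
	fmt.Println(checkForkAgainstCheckpoint(&tipset{key: "forkA", height: 90}, chkpt, false))  // rejected
	fmt.Println(checkForkAgainstCheckpoint(&tipset{key: "forkB", height: 150}, chkpt, false)) // <nil>
	fmt.Println(checkForkAgainstCheckpoint(&tipset{key: "forkA", height: 90}, chkpt, true))   // <nil>: checkpoint being moved
}
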
5 changes: 1 addition & 4 deletions app/submodule/chain/chain_submodule.go
@@ -21,7 +21,6 @@ import (
"github.com/filecoin-project/venus/pkg/vmsupport"
v0api "github.com/filecoin-project/venus/venus-shared/api/chain/v0"
v1api "github.com/filecoin-project/venus/venus-shared/api/chain/v1"
"github.com/filecoin-project/venus/venus-shared/types"
)

// ChainSubmodule enhances the `Node` with chain capabilities.
@@ -33,8 +32,7 @@ type ChainSubmodule struct { //nolint
SystemCall vm.SyscallsImpl
CirculatingSupplyCalculator *chain.CirculatingSupplyCalculator

CheckPoint types.TipSetKey
Drand beacon.Schedule
Drand beacon.Schedule

config chainConfig

@@ -92,7 +90,6 @@ func NewChainSubmodule(ctx context.Context,
Drand: drand,
config: config,
Waiter: waiter,
CheckPoint: chainStore.GetCheckPoint(),
}
err = store.ChainReader.Load(context.TODO())
if err != nil {
30 changes: 20 additions & 10 deletions pkg/chain/store.go
@@ -100,7 +100,7 @@ type Store struct {
// head is the tipset at the head of the best known chain.
head *types.TipSet

checkPoint types.TipSetKey
checkPoint *types.TipSet
// Protects head and genesisCid.
mu sync.RWMutex

@@ -143,7 +143,6 @@ func NewStore(chainDs repo.Datastore,
bsstore: bsstore,
headEvents: pubsub.New(64),

checkPoint: types.EmptyTSK,
genesis: genesisCid,
reorgNotifeeCh: make(chan ReorgNotifee),
tsCache: tsCache,
@@ -156,11 +155,22 @@

val, err := store.ds.Get(context.TODO(), CheckPoint)
if err != nil {
store.checkPoint = types.NewTipSetKey(genesisCid)
store.checkPoint, err = store.GetTipSet(context.TODO(), types.NewTipSetKey(genesisCid))
if err != nil {
panic(fmt.Errorf("cannot get genesis tipset: %w", err))
}
} else {
_ = store.checkPoint.UnmarshalCBOR(bytes.NewReader(val)) //nolint:staticcheck
var checkPointTSK types.TipSetKey
err := checkPointTSK.UnmarshalCBOR(bytes.NewReader(val))
if err != nil {
panic(fmt.Errorf("cannot unmarshal checkpoint %s: %w", string(val), err))
}
store.checkPoint, err = store.GetTipSet(context.TODO(), checkPointTSK)
if err != nil {
panic(fmt.Errorf("cannot get checkpoint tipset: %w", err))
}
}
log.Infof("check point value: %v", store.checkPoint)
log.Infof("load check point height: %d, key: %v", store.checkPoint.Height(), store.checkPoint.Key())

store.reorgCh = store.reorgWorker(context.TODO())
return store
@@ -1112,8 +1122,8 @@ func (store *Store) SetCheckpoint(ctx context.Context, ts *types.TipSet) error {
return err
}

store.mu.RLock()
defer store.mu.RUnlock()
store.mu.Lock()
defer store.mu.Unlock()

finality := store.head.Height() - policy.ChainFinality
targetChain, currentChain := ts, store.head
@@ -1167,7 +1177,7 @@ func (store *Store) SetCheckpoint(ctx context.Context, ts *types.TipSet) error {
if err := store.ds.Put(ctx, CheckPoint, buf.Bytes()); err != nil {
return fmt.Errorf("checkpoint failed: failed to record checkpoint in the datastore: %w", err)
}
store.checkPoint = ts.Key()
store.checkPoint = ts

return nil
}
@@ -1187,7 +1197,7 @@ func (store *Store) IsAncestorOf(ctx context.Context, a, b *types.TipSet) (bool,
}

// GetCheckPoint get the check point from store or disk.
func (store *Store) GetCheckPoint() types.TipSetKey {
func (store *Store) GetCheckPoint() *types.TipSet {
store.mu.RLock()
defer store.mu.RUnlock()

@@ -1722,7 +1732,7 @@ func (store *Store) exceedsForkLength(ctx context.Context, synced, external *typ
}

// Now check to see if we've walked back to the checkpoint.
if synced.Key().Equals(store.checkPoint) {
if synced.Key().Equals(store.checkPoint.Key()) {
return true, nil
}

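
Note on the locking change above: SetCheckpoint previously took store.mu.RLock but now assigns store.checkPoint, so it must hold the exclusive write lock while GetCheckPoint keeps the shared read lock. A standalone sketch of that RWMutex pattern with hypothetical types (not the venus Store):

// Hypothetical sketch of the read/write lock split used by the store's
// cached checkpoint; types are stand-ins, not the venus ones.
package main

import (
	"fmt"
	"sync"
)

type tipset struct{ height uint64 }

type store struct {
	mu         sync.RWMutex
	checkpoint *tipset
}

// GetCheckpoint only reads the cached value, so a shared lock suffices.
func (s *store) GetCheckpoint() *tipset {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.checkpoint
}

// SetCheckpoint mutates the cached value, so it needs the exclusive lock;
// holding only RLock here (as the old code did) would race with other writers.
func (s *store) SetCheckpoint(ts *tipset) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.checkpoint = ts
}

func main() {
	s := &store{checkpoint: &tipset{height: 0}}
	s.SetCheckpoint(&tipset{height: 100})
	fmt.Println(s.GetCheckpoint().height) // 100
}
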
2 changes: 2 additions & 0 deletions pkg/chain/store_test.go
@@ -43,6 +43,8 @@ func (cbor *CborBlockStore) PutBlocks(ctx context.Context, blocks []*types.Block
func newChainStore(r repo.Repo, genTS *types.TipSet) *CborBlockStore {
tempBlock := r.Datastore()
cborStore := cbor.NewCborStore(tempBlock)
blkBytes, _ := genTS.Blocks()[0].ToStorageBlock()
_ = tempBlock.Put(context.Background(), blkBytes)
return &CborBlockStore{
Store: chain.NewStore(r.ChainDatastore(), tempBlock, genTS.At(0).Cid(), chainselector.Weight),
cborStore: cborStore,
110 changes: 69 additions & 41 deletions pkg/chainsync/syncer/syncer.go
@@ -60,6 +60,7 @@ var (
ErrNewChainTooLong = errors.New("input chain forked from best chain past finality limit")
// ErrUnexpectedStoreState indicates that the syncer's chain bsstore is violating expected invariants.
ErrUnexpectedStoreState = errors.New("the chain bsstore is in an unexpected state")
ErrForkCheckpoint = fmt.Errorf("fork would require us to diverge from checkpointed block")

logSyncer = logging.Logger("chainsync.syncer")
)
@@ -137,8 +138,7 @@ type Syncer struct {

clock clock.Clock

bsstore blockstoreutil.Blockstore
checkPoint types.TipSetKey
bsstore blockstoreutil.Blockstore

fork fork.IFork

@@ -199,45 +199,41 @@ func (syncer *Syncer) syncOne(ctx context.Context, parent, next *types.TipSet) e
stopwatch := syncOneTimer.Start()
defer stopwatch(ctx)

var err error

if !parent.Key().Equals(syncer.checkPoint) {
var wg errgroup.Group
for i := 0; i < next.Len(); i++ {
blk := next.At(i)
wg.Go(func() error {
// Fetch the URL.
err := syncer.blockValidator.ValidateFullBlock(ctx, blk)
if err == nil {
if err := syncer.chainStore.AddToTipSetTracker(ctx, blk); err != nil {
return fmt.Errorf("failed to add validated header to tipset tracker: %w", err)
}
var wg errgroup.Group
for i := 0; i < next.Len(); i++ {
blk := next.At(i)
wg.Go(func() error {
// Fetch the URL.
err := syncer.blockValidator.ValidateFullBlock(ctx, blk)
if err == nil {
if err := syncer.chainStore.AddToTipSetTracker(ctx, blk); err != nil {
return fmt.Errorf("failed to add validated header to tipset tracker: %w", err)
}
return err
})
}
err = wg.Wait()
if err != nil {
var rootNotMatch bool // nolint

if merr, isok := err.(*multierror.Error); isok {
for _, e := range merr.Errors {
if isRootNotMatch(e) {
rootNotMatch = true
break
}
}
} else {
rootNotMatch = isRootNotMatch(err) // nolint
}
return err
})
}
err := wg.Wait()
if err != nil {
var rootNotMatch bool // nolint

if rootNotMatch { // nolint
// todo: should here rollback, and re-compute?
_ = syncer.stmgr.Rollback(ctx, parent, next)
if merr, isok := err.(*multierror.Error); isok {
for _, e := range merr.Errors {
if isRootNotMatch(e) {
rootNotMatch = true
break
}
}
} else {
rootNotMatch = isRootNotMatch(err) // nolint
}

return fmt.Errorf("validate mining failed %w", err)
if rootNotMatch { // nolint
// todo: should here rollback, and re-compute?
_ = syncer.stmgr.Rollback(ctx, parent, next)
}

return fmt.Errorf("validate mining failed %w", err)
}

syncer.chainStore.PersistTipSetKey(ctx, next.Key())
@@ -297,8 +293,25 @@ func (syncer *Syncer) HandleNewTipSet(ctx context.Context, target *syncTypes.Tar
return errors.New("do not sync to a target has synced before")
}

if target.Head.Height() == head.Height() {
// check if maybeHead is fully contained in headTipSet
// meaning we already synced all the blocks that are a part of maybeHead
// if that is the case, there is nothing for us to do
// we need to exit out early, otherwise checkpoint-fork logic might wrongly reject it
fullyContained := true
for _, c := range target.Head.Cids() {
if !head.Contains(c) {
fullyContained = false
break
}
}
if fullyContained {
return nil
}
}

syncer.exchangeClient.AddPeer(target.Sender)
tipsets, err := syncer.fetchChainBlocks(ctx, head, target.Head)
tipsets, err := syncer.fetchChainBlocks(ctx, head, target.Head, false)
if err != nil {
return errors.Wrapf(err, "failure fetching or validating headers")
}
@@ -346,7 +359,7 @@ func (syncer *Syncer) syncSegement(ctx context.Context, target *syncTypes.Target
errProcessChan <- processErr
return
}
if !parent.Key().Equals(syncer.checkPoint) {
if !parent.Key().Equals(syncer.chainStore.GetCheckPoint().Key()) {
logSyncer.Debugf("set chain head, height:%d, blocks:%d", parent.Height(), parent.Len())
if err := syncer.chainStore.RefreshHeaviestTipSet(ctx, parent.Height()); err != nil {
errProcessChan <- err
@@ -374,7 +387,7 @@ func (syncer *Syncer) syncSegement(ctx context.Context, target *syncTypes.Target
// if local db not exist, get block from network(libp2p),
// if there is a fork, get the common root tipset of knowntip and targettip, and return the block data from root tipset to targettip
// local(···->A->B) + incoming(C->D->E) => ···->A->B->C->D->E
func (syncer *Syncer) fetchChainBlocks(ctx context.Context, knownTip *types.TipSet, targetTip *types.TipSet) ([]*types.TipSet, error) {
func (syncer *Syncer) fetchChainBlocks(ctx context.Context, knownTip *types.TipSet, targetTip *types.TipSet, ignoreCheckpoint bool) ([]*types.TipSet, error) {
chainTipsets := []*types.TipSet{targetTip}
flushDB := func(saveTips []*types.TipSet) error {
bs := blockstoreutil.NewTemporary()
@@ -448,6 +461,13 @@ loop:
if err != nil {
return nil, fmt.Errorf("failed to load next local tipset: %w", err)
}

if !ignoreCheckpoint {
if chkpt := syncer.chainStore.GetCheckPoint(); chkpt != nil && base.Height() <= chkpt.Height() {
return nil, fmt.Errorf("merge point affecting the checkpoing: %w", ErrForkCheckpoint)
}
}

if base.IsChildOf(knownParent) {
// common case: receiving a block thats potentially part of the same tipset as our best block
chain.Reverse(chainTipsets)
@@ -456,7 +476,7 @@

logSyncer.Warnf("(fork detected) synced header chain, base: %v(%d), knownTip: %v(%d)", base.Key(), base.Height(),
knownTip.Key(), knownTip.Height())
fork, err := syncer.syncFork(ctx, base, knownTip)
fork, err := syncer.syncFork(ctx, base, knownTip, ignoreCheckpoint)
if err != nil {
if errors.Is(err, ErrForkTooLong) {
// TODO: we're marking this block bad in the same way that we mark invalid blocks bad. Maybe distinguish?
@@ -486,7 +506,15 @@
// D->E-F(targetTip)
// A => D->E>F
// B-C(knownTip)
func (syncer *Syncer) syncFork(ctx context.Context, incoming *types.TipSet, known *types.TipSet) ([]*types.TipSet, error) {
func (syncer *Syncer) syncFork(ctx context.Context, incoming *types.TipSet, known *types.TipSet, ignoreCheckpoint bool) ([]*types.TipSet, error) {
var chkpt *types.TipSet
if !ignoreCheckpoint {
chkpt = syncer.chainStore.GetCheckPoint()
if known.Equals(chkpt) {
return nil, ErrForkCheckpoint
}
}

incomingParentsTsk := incoming.Parents()
commonParent := false
for _, incomingParent := range incomingParentsTsk.Cids() {
@@ -701,7 +729,7 @@ func (syncer *Syncer) SyncCheckpoint(ctx context.Context, tsk types.TipSetKey) e
if anc, err := syncer.chainStore.IsAncestorOf(ctx, ts, head); err != nil {
return fmt.Errorf("failed to walk the chain when checkpointing: %w", err)
} else if !anc {
tipsets, err := syncer.fetchChainBlocks(ctx, head, target.Head)
tipsets, err := syncer.fetchChainBlocks(ctx, head, target.Head, true)
if err != nil {
return errors.Wrapf(err, "failure fetching or validating headers")
}
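
The early return added to HandleNewTipSet above exists so the new checkpoint-fork logic never runs against a head that is already fully synced: when the incoming head is at the same height and every one of its block CIDs is already in the current head, there is nothing to do. A small illustrative sketch with string stand-ins for CIDs (not the venus types):

// Hypothetical sketch of the "fully contained" early exit; the real code
// compares cid.Cid values from types.TipSet, here plain strings stand in.
package main

import "fmt"

// fullyContained reports whether every incoming block CID is already in head.
func fullyContained(head, incoming []string) bool {
	known := make(map[string]struct{}, len(head))
	for _, c := range head {
		known[c] = struct{}{}
	}
	for _, c := range incoming {
		if _, ok := known[c]; !ok {
			return false
		}
	}
	return true
}

func main() {
	head := []string{"blkA", "blkB", "blkC"}
	fmt.Println(fullyContained(head, []string{"blkA", "blkB"})) // true: nothing to sync
	fmt.Println(fullyContained(head, []string{"blkA", "blkD"})) // false: proceed with sync
}
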
