Skip to content

Commit

Permalink
fix headChangeNotifee for revert
Browse files Browse the repository at this point in the history
  • Loading branch information
ta0li committed Oct 21, 2021
1 parent 2ba17df commit 655fc8c
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 25 deletions.
13 changes: 13 additions & 0 deletions app/submodule/chain/chaininfo_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,19 @@ func (cia *chainInfoAPI) ChainExport(ctx context.Context, nroots abi.ChainEpoch,
return out, nil
}

// ChainGetPath returns a set of revert/apply operations needed to get from
// one tipset to another, for example:
//```
// to
// ^
// from tAA
// ^ ^
// tBA tAB
// ^---*--^
// ^
// tRR
//```
// Would return `[revert(tBA), apply(tAB), apply(tAA)]`
func (cia *chainInfoAPI) ChainGetPath(ctx context.Context, from types.TipSetKey, to types.TipSetKey) ([]*chain.HeadChange, error) {
fts, err := cia.chain.ChainReader.GetTipSet(from)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ require (
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d
github.com/awnumar/memguard v0.22.2
github.com/bluele/gcache v0.0.0-20190518031135-bc40bd653833
github.com/cskr/pubsub v1.0.2
github.com/detailyang/go-fallocate v0.0.0-20180908115635-432fa640bd2e
github.com/dgraph-io/badger/v2 v2.2007.2
github.com/docker/distribution v2.7.1+incompatible // indirect
Expand Down
61 changes: 37 additions & 24 deletions pkg/chain/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/venus/pkg/state"

"github.com/cskr/pubsub"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
Expand All @@ -26,6 +25,7 @@ import (
carutil "github.com/ipld/go-car/util"
"github.com/pkg/errors"
cbg "github.com/whyrusleeping/cbor-gen"
"github.com/whyrusleeping/pubsub"
"go.opencensus.io/trace"
"golang.org/x/xerrors"

Expand Down Expand Up @@ -334,25 +334,29 @@ func (store *Store) GetTipSet(key types.TipSetKey) (*types.TipSet, error) {
if key.IsEmpty() {
return store.GetHead(), nil
}
var blks []*types.BlockHeader

val, has := store.tsCache.Get(key)
if has {
return val.(*types.TipSet), nil
}

for _, id := range key.Cids() {
blk, err := store.GetBlock(context.TODO(), id)

cids := key.Cids()
blks := make([]*types.BlockHeader, len(cids))
for idx, c := range cids {
blk, err := store.GetBlock(context.TODO(), c)
if err != nil {
return nil, err
}
blks = append(blks, blk)

blks[idx] = blk
}

ts, err := types.NewTipSet(blks...)
if err != nil {
return nil, err
}
store.tsCache.Add(key, ts)

return ts, nil
}

Expand Down Expand Up @@ -548,7 +552,7 @@ func (store *Store) SetHead(ctx context.Context, newTS *types.TipSet) error {

//todo wrap by go function
Reverse(added)
Reverse(dropped)
//Reverse(dropped)

//do reorg
store.reorgCh <- reorg{
Expand All @@ -561,19 +565,20 @@ func (store *Store) SetHead(ctx context.Context, newTS *types.TipSet) error {
func (store *Store) reorgWorker(ctx context.Context) chan reorg {
headChangeNotifee := func(rev, app []*types.TipSet) error {
notif := make([]*HeadChange, len(rev)+len(app))
for i, apply := range rev {
for i, revert := range rev {
notif[i] = &HeadChange{
Type: HCRevert,
Val: apply,
Val: revert,
}
}

for i, revert := range app {
for i, apply := range app {
notif[i+len(rev)] = &HeadChange{
Type: HCApply,
Val: revert,
Val: apply,
}
}

// Publish an event that we have a new head.
store.headEvents.Pub(notif, HeadChangeTopic)
return nil
Expand Down Expand Up @@ -632,16 +637,17 @@ func (store *Store) reorgWorker(ctx context.Context) chan reorg {
// First message is guaranteed to be of len == 1, and type == 'current'.
// Then event in the message may be HCApply and HCRevert.
func (store *Store) SubHeadChanges(ctx context.Context) chan []*HeadChange {
out := make(chan []*HeadChange, 16)
store.mu.RLock()
subCh := store.headEvents.Sub(HeadChangeTopic)
head := store.head
store.mu.RUnlock()

out := make(chan []*HeadChange, 16)
out <- []*HeadChange{{
Type: HCCurrent,
Val: head,
}}

subCh := store.headEvents.Sub(HeadChangeTopic)
go func() {
defer close(out)
var unsubOnce sync.Once
Expand All @@ -653,12 +659,15 @@ func (store *Store) SubHeadChanges(ctx context.Context) chan []*HeadChange {
log.Warn("chain head sub exit loop")
return
}
if len(out) > 5 {
log.Warnf("head change sub is slow, has %d buffered entries", len(out))
}

select {
case out <- val.([]*HeadChange):
case <-ctx.Done():
default:
log.Errorf("closing head change subscription due to slow reader")
return
}
if len(out) > 5 {
log.Warnf("head change sub is slow, has %d buffered entries", len(out))
}
case <-ctx.Done():
unsubOnce.Do(func() {
Expand Down Expand Up @@ -1115,28 +1124,32 @@ func (store *Store) ReorgOps(a, b *types.TipSet) ([]*types.TipSet, []*types.TipS
return ReorgOps(store.GetTipSet, a, b)
}

// ReorgOps takes two tipsets (which can be at different heights), and walks
// their corresponding chains backwards one step at a time until we find
// a common ancestor. It then returns the respective chain segments that fork
// from the identified ancestor, in reverse order, where the first element of
// each slice is the supplied tipset, and the last element is the common
// ancestor.
//
// If an error happens along the way, we return the error with nil slices.
// todo should move this code into store.ReorgOps. anywhere use this function should invoke store.ReorgOps
func ReorgOps(lts func(types.TipSetKey) (*types.TipSet, error), a, b *types.TipSet) ([]*types.TipSet, []*types.TipSet, error) {
left := a
right := b

var leftChain, rightChain []*types.TipSet
for !left.Equals(right) {
lh := left.Height()
rh := right.Height()
if lh > rh {
if left.Height() > right.Height() {
leftChain = append(leftChain, left)
lKey := left.Parents()
par, err := lts(lKey)
par, err := lts(left.Parents())
if err != nil {
return nil, nil, err
}

left = par
} else {
rightChain = append(rightChain, right)
rKey := right.Parents()
par, err := lts(rKey)
par, err := lts(right.Parents())
if err != nil {
log.Infof("failed to fetch right.Parents: %s", err)
return nil, nil, err
Expand Down

0 comments on commit 655fc8c

Please sign in to comment.