Skip to content

Commit

Permalink
Merge pull request #5824 from filecoin-project/opt/lt/fast_chain_index
Browse files Browse the repository at this point in the history
feat: make chain tipset fetching 1000x faster
  • Loading branch information
simlecode authored Mar 16, 2023
2 parents 73746ce + 02acbd9 commit 511f1bd
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 28 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ on:
branches:
- '**'

env:
CHAIN_INDEX_CACHE: 2048

jobs:

test:
Expand Down
72 changes: 46 additions & 26 deletions pkg/chain/chain_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,31 @@ package chain
import (
"context"
"fmt"
"os"
"strconv"
"sync"

"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/venus/venus-shared/types"
lru "github.com/hashicorp/golang-lru"
)

var DefaultChainIndexCacheSize = 32 << 10
var DefaultChainIndexCacheSize = 32 << 15

func init() {
if s := os.Getenv("CHAIN_INDEX_CACHE"); s != "" {
lcic, err := strconv.Atoi(s)
if err != nil {
log.Errorf("failed to parse 'CHAIN_INDEX_CACHE' env var: %s", err)
}
DefaultChainIndexCacheSize = lcic
}
}

// ChainIndex tipset height index, used to getting tipset by height quickly
type ChainIndex struct { //nolint
skipCache *lru.ARCCache
indexCacheLk sync.Mutex
indexCache map[types.TipSetKey]*lbEntry

loadTipSet loadTipSetFunc

Expand All @@ -22,17 +36,14 @@ type ChainIndex struct { //nolint

// NewChainIndex return a new chain index with arc cache
func NewChainIndex(lts loadTipSetFunc) *ChainIndex {
sc, _ := lru.NewARC(DefaultChainIndexCacheSize)
return &ChainIndex{
skipCache: sc,
indexCache: make(map[types.TipSetKey]*lbEntry, DefaultChainIndexCacheSize),
loadTipSet: lts,
skipLength: 20,
}
}

type lbEntry struct {
ts *types.TipSet
parentHeight abi.ChainEpoch
targetHeight abi.ChainEpoch
target types.TipSetKey
}
Expand All @@ -48,26 +59,36 @@ func (ci *ChainIndex) GetTipSetByHeight(ctx context.Context, from *types.TipSet,

rounded, err := ci.roundDown(ctx, from)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to round down: %w", err)
}

ci.indexCacheLk.Lock()
defer ci.indexCacheLk.Unlock()
cur := rounded.Key()
// cur := from.Key()
for {
cval, ok := ci.skipCache.Get(cur)
lbe, ok := ci.indexCache[cur]
if !ok {
fc, err := ci.fillCache(ctx, cur)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to fill cache: %w", err)
}
cval = fc
lbe = fc
}

lbe := cval.(*lbEntry)
if lbe.ts.Height() == to || lbe.parentHeight < to {
return lbe.ts, nil
} else if to > lbe.targetHeight {
return ci.walkBack(ctx, lbe.ts, to)
if to == lbe.targetHeight {
ts, err := ci.loadTipSet(ctx, lbe.target)
if err != nil {
return nil, fmt.Errorf("failed to load tipset: %w", err)
}

return ts, nil
}
if to > lbe.targetHeight {
ts, err := ci.loadTipSet(ctx, cur)
if err != nil {
return nil, fmt.Errorf("failed to load tipset: %w", err)
}
return ci.walkBack(ctx, ts, to)
}

cur = lbe.target
Expand All @@ -79,16 +100,17 @@ func (ci *ChainIndex) GetTipsetByHeightWithoutCache(ctx context.Context, from *t
return ci.walkBack(ctx, from, to)
}

// Caller must hold indexCacheLk
func (ci *ChainIndex) fillCache(ctx context.Context, tsk types.TipSetKey) (*lbEntry, error) {
ts, err := ci.loadTipSet(ctx, tsk)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to load tipset: %w", err)
}

if ts.Height() == 0 {
return &lbEntry{
ts: ts,
parentHeight: 0,
targetHeight: 0,
target: tsk,
}, nil
}

Expand All @@ -111,17 +133,15 @@ func (ci *ChainIndex) fillCache(ctx context.Context, tsk types.TipSetKey) (*lbEn
} else {
skipTarget, err = ci.walkBack(ctx, parent, rheight)
if err != nil {
return nil, fmt.Errorf("fillCache walkback: %s", err)
return nil, fmt.Errorf("fillCache walkback: %w", err)
}
}

lbe := &lbEntry{
ts: ts,
parentHeight: parent.Height(),
targetHeight: skipTarget.Height(),
target: skipTarget.Key(),
}
ci.skipCache.Add(tsk, lbe)
ci.indexCache[tsk] = lbe

return lbe, nil
}
Expand All @@ -136,7 +156,7 @@ func (ci *ChainIndex) roundDown(ctx context.Context, ts *types.TipSet) (*types.T

rounded, err := ci.walkBack(ctx, ts, target)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to walk back: %w", err)
}

return rounded, nil
Expand All @@ -156,7 +176,7 @@ func (ci *ChainIndex) walkBack(ctx context.Context, from *types.TipSet, to abi.C
for {
pts, err := ci.loadTipSet(ctx, ts.Parents())
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to load tipset: %w", err)
}

if to > pts.Height() {
Expand Down
3 changes: 2 additions & 1 deletion pkg/chain/chain_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"math/rand"
"os"
"testing"

"github.com/filecoin-project/go-address"
Expand Down Expand Up @@ -32,7 +33,7 @@ func TestChainIndex(t *testing.T) {

head := links[linksCount-1]

DefaultChainIndexCacheSize = 10
_ = os.Setenv("CHAIN_INDEX_CACHE", "10")
chainIndex := NewChainIndex(builder.GetTipSet)
chainIndex.skipLength = 10

Expand Down
3 changes: 2 additions & 1 deletion pkg/chain/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"math/rand"
"os"
"testing"

"github.com/filecoin-project/go-address"
Expand Down Expand Up @@ -397,7 +398,7 @@ func TestLoadTipsetMeta(t *testing.T) {
requirePutBlocksToCborStore(t, cst, ts.ToSlice()...)
}

chain.DefaultChainIndexCacheSize = 2
_ = os.Setenv("CHAIN_INDEX_CACHE", "2")
chain.DefaultTipsetLruCacheSize = 2

cs := chain.NewStore(ds, bs, genTS.At(0).Cid(), chain.NewMockCirculatingSupplyCalculator())
Expand Down

0 comments on commit 511f1bd

Please sign in to comment.