Skip to content

Commit

Permalink
feat:splitstore:retain tipset references in hot store (#9960)
Browse files Browse the repository at this point in the history
* Retain tipset reference

Co-authored-by: zenground0 <[email protected]>
  • Loading branch information
ZenGround0 and ZenGround0 authored Jan 6, 2023
1 parent ccd08c2 commit 06393da
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 1 deletion.
23 changes: 22 additions & 1 deletion blockstore/splitstore/splitstore_compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,10 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
walkCnt := new(int64)
scanCnt := new(int64)

tsRef := func(blkCids []cid.Cid) (cid.Cid, error) {
return types.NewTipSetKey(blkCids...).Cid()
}

stopWalk := func(_ cid.Cid) error { return errStopWalk }

walkBlock := func(c cid.Cid) error {
Expand All @@ -926,11 +930,19 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
err = s.view(c, func(data []byte) error {
return hdr.UnmarshalCBOR(bytes.NewBuffer(data))
})

if err != nil {
return xerrors.Errorf("error unmarshaling block header (cid: %s): %w", c, err)
}

// tipset CID references are retained
pRef, err := tsRef(hdr.Parents)
if err != nil {
return xerrors.Errorf("error computing cid reference to parent tipset")
}
if err := s.walkObjectIncomplete(pRef, visitor, fHot, stopWalk); err != nil {
return xerrors.Errorf("error walking parent tipset cid reference")
}

// message are retained if within the inclMsgs boundary
if hdr.Height >= inclMsgs && hdr.Height > 0 {
if inclMsgs < inclState {
Expand Down Expand Up @@ -981,6 +993,15 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
return nil
}

// retain ref to chain head
hRef, err := tsRef(ts.Cids())
if err != nil {
return xerrors.Errorf("error computing cid reference to parent tipset")
}
if err := s.walkObjectIncomplete(hRef, visitor, fHot, stopWalk); err != nil {
return xerrors.Errorf("error walking parent tipset cid reference")
}

for len(toWalk) > 0 {
// walking can take a while, so check this with every opportunity
if err := s.checkClosing(); err != nil {
Expand Down
53 changes: 53 additions & 0 deletions itests/splitstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,49 @@ func TestMessagesMode(t *testing.T) {
assert.True(g.t, g.Exists(ctx, garbageM), "Garbage message not found in splitstore")
}

func TestCompactRetainsTipSetRef(t *testing.T) {
ctx := context.Background()
// disable sync checking because efficient itests require that the node is out of sync : /
splitstore.CheckSyncGap = false
opts := []interface{}{kit.MockProofs(), kit.SplitstoreDiscard()}
full, genesisMiner, ens := kit.EnsembleMinimal(t, opts...)
bm := ens.InterconnectAll().BeginMining(4 * time.Millisecond)[0]
_ = genesisMiner
_ = bm

check, err := full.ChainHead(ctx)
require.NoError(t, err)
e := check.Height()
checkRef, err := check.Key().Cid()
require.NoError(t, err)
assert.True(t, ipldExists(ctx, t, checkRef, full)) // reference to tipset key should be persisted before compaction

// Determine index of compaction that covers tipset "check" and wait for compaction
for {
bm.Pause()
if splitStoreCompacting(ctx, t, full) {
bm.Restart()
time.Sleep(1 * time.Second)
} else {
break
}
}
lastCompactionEpoch := splitStoreBaseEpoch(ctx, t, full)
garbageCompactionIndex := splitStoreCompactionIndex(ctx, t, full) + 1
boundary := lastCompactionEpoch + splitstore.CompactionThreshold - splitstore.CompactionBoundary

for e > boundary {
boundary += splitstore.CompactionThreshold - splitstore.CompactionBoundary
garbageCompactionIndex++
}
bm.Restart()

// wait for compaction to occur
waitForCompaction(ctx, t, garbageCompactionIndex, full)
assert.True(t, ipldExists(ctx, t, checkRef, full)) // reference to tipset key should be persisted after compaction
bm.Stop()
}

func waitForCompaction(ctx context.Context, t *testing.T, cIdx int64, n *kit.TestFullNode) {
for {
if splitStoreCompactionIndex(ctx, t, n) >= cIdx {
Expand Down Expand Up @@ -307,6 +350,16 @@ func splitStorePruneIndex(ctx context.Context, t *testing.T, n *kit.TestFullNode
return pruneIndex
}

func ipldExists(ctx context.Context, t *testing.T, c cid.Cid, n *kit.TestFullNode) bool {
_, err := n.ChainReadObj(ctx, c)
if ipld.IsNotFound(err) {
return false
} else if err != nil {
t.Fatalf("ChainReadObj failure on existence check: %s", err)
}
return true
}

// Create on chain unreachable garbage for a network to exercise splitstore
// one garbage cid created at a time
//
Expand Down

0 comments on commit 06393da

Please sign in to comment.