Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

piece doctor and sector state manager refactor #1463

Merged
merged 42 commits into from
Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
1f90ebd
fix timer.Reset and improve logs
nonsense May 23, 2023
1b2264c
revert randomization
nonsense May 24, 2023
8409e5b
piece doc: handle errors
nonsense May 24, 2023
a9a7b07
adjust piece check
nonsense May 24, 2023
3918e94
refactor unsealsectormanager
nonsense May 25, 2023
dfbecb9
refactor piece doctor
nonsense May 25, 2023
9a8e89c
add random ports
nonsense May 25, 2023
7554280
ignore tests
nonsense May 25, 2023
cdf4192
add version to boostd-data
nonsense May 25, 2023
155a103
fix ctx in Start
nonsense May 26, 2023
356853c
fix: add reader mock to fix tests
dirkmc May 26, 2023
e5f68d5
fix: pass new piece directory to provider on test restart
dirkmc May 26, 2023
b92eed2
fix synchronisation
nonsense May 26, 2023
c9e4d2b
Merge branch 'nonsense/piece-doctor-logs' of ssh://github.com/filecoi…
nonsense May 26, 2023
f9bb53e
note that panics are not propagated in tests
nonsense May 26, 2023
1161a14
carv1 panics piece directory
nonsense May 26, 2023
4817efb
print panics
nonsense May 26, 2023
39724ee
fix: use reader that supports Seek in piece reader mock
dirkmc May 26, 2023
5a94a70
fix: reset mock car reader on each invocation
dirkmc May 26, 2023
2a59356
fix: TestOfflineDealDataCleanup
dirkmc May 26, 2023
0f720fe
add check for nil cancel func
nonsense May 26, 2023
85a9241
bump min check period for LevelDB to 5 minutes
nonsense May 26, 2023
22e7eaf
check if sector state mgr is initialised
nonsense May 26, 2023
ef31c83
debug line for unflagging
nonsense May 26, 2023
4c24fbc
commenting out TestMultipleDealsConcurrent -- flaky test -- works loc…
nonsense May 26, 2023
b44c1b8
add SectorStateUpdates pubsub
nonsense May 30, 2023
f8b3cf4
add close for pubsub
nonsense May 30, 2023
eed2b5a
add mock sectorstatemgr
nonsense May 30, 2023
b18bcd5
add wrapper tests
nonsense May 30, 2023
289ccb4
fixup
nonsense May 30, 2023
db908c3
cleanup
nonsense May 30, 2023
36aa47b
cleanup
nonsense May 31, 2023
afc055e
better names
nonsense May 31, 2023
174678e
t.Skip for test
nonsense May 31, 2023
6e9ca99
remove TODO above println for panic
nonsense May 31, 2023
ecce469
add unit tests for refreshState
nonsense May 31, 2023
e572b06
rename tests
nonsense May 31, 2023
6627cbd
more cases
nonsense Jun 1, 2023
38d084c
more tests
nonsense Jun 1, 2023
734ac01
update description
nonsense Jun 1, 2023
f91ae9f
better comment
nonsense Jun 1, 2023
8f67c82
better names and comments
nonsense Jun 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/boostd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ func before(cctx *cli.Context) error {
_ = logging.SetLogLevel("piecedir", "INFO")
_ = logging.SetLogLevel("index-provider-wrapper", "INFO")
_ = logging.SetLogLevel("unsmgr", "INFO")
_ = logging.SetLogLevel("piecedoc", "INFO")
_ = logging.SetLogLevel("piecedirectory", "INFO")

if cliutil.IsVeryVerbose {
_ = logging.SetLogLevel("boostd", "DEBUG")
Expand All @@ -83,6 +85,8 @@ func before(cctx *cli.Context) error {
_ = logging.SetLogLevel("piecedir", "DEBUG")
_ = logging.SetLogLevel("fxlog", "DEBUG")
_ = logging.SetLogLevel("unsmgr", "DEBUG")
_ = logging.SetLogLevel("piecedoc", "DEBUG")
_ = logging.SetLogLevel("piecedirectory", "DEBUG")
}

return nil
Expand Down
9 changes: 5 additions & 4 deletions cmd/boostd/recover.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/filecoin-project/boost/cmd/lib"
"github.com/filecoin-project/boost/db"
"github.com/filecoin-project/boost/piecedirectory"
bdclient "github.com/filecoin-project/boostd-data/client"
"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-commp-utils/writer"
Expand Down Expand Up @@ -190,14 +191,14 @@ func action(cctx *cli.Context) error {
if ignoreLID {
pd = nil
} else {
pdClient := piecedirectory.NewStore()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This store is really the boostd-data service and NOT the piece directory.

defer pdClient.Close(ctx)
err = pdClient.Dial(ctx, cctx.String("api-lid"))
cl := bdclient.NewStore()
defer cl.Close(ctx)
err = cl.Dial(ctx, cctx.String("api-lid"))
if err != nil {
return fmt.Errorf("connecting to local index directory service: %w", err)
}
pr := &piecedirectory.SectorAccessorAsPieceReader{SectorAccessor: sa}
pd = piecedirectory.NewPieceDirectory(pdClient, pr, cctx.Int("add-index-throttle"))
pd = piecedirectory.NewPieceDirectory(cl, pr, cctx.Int("add-index-throttle"))
pd.Start(ctx)
}

Expand Down
13 changes: 7 additions & 6 deletions cmd/booster-bitswap/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/filecoin-project/boost/cmd/lib/remoteblockstore"
"github.com/filecoin-project/boost/metrics"
"github.com/filecoin-project/boost/piecedirectory"
bdclient "github.com/filecoin-project/boostd-data/client"
"github.com/filecoin-project/boostd-data/shared/tracing"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -157,9 +158,9 @@ var runCmd = &cli.Command{
defer storageCloser()

// Connect to the local index directory service
pdClient := piecedirectory.NewStore()
defer pdClient.Close(ctx)
err = pdClient.Dial(ctx, cctx.String("api-lid"))
cl := bdclient.NewStore()
defer cl.Close(ctx)
err = cl.Dial(ctx, cctx.String("api-lid"))
if err != nil {
return fmt.Errorf("connecting to local index directory service: %w", err)
}
Expand Down Expand Up @@ -196,8 +197,8 @@ var runCmd = &cli.Command{
return fmt.Errorf("starting block filter: %w", err)
}
pr := &piecedirectory.SectorAccessorAsPieceReader{SectorAccessor: sa}
piecedirectory := piecedirectory.NewPieceDirectory(pdClient, pr, cctx.Int("add-index-throttle"))
remoteStore := remoteblockstore.NewRemoteBlockstore(piecedirectory, &bitswapBlockMetrics)
pd := piecedirectory.NewPieceDirectory(cl, pr, cctx.Int("add-index-throttle"))
remoteStore := remoteblockstore.NewRemoteBlockstore(pd, &bitswapBlockMetrics)
server := NewBitswapServer(remoteStore, host, multiFilter)

var proxyAddrInfo *peer.AddrInfo
Expand All @@ -210,7 +211,7 @@ var runCmd = &cli.Command{
}

// Start the local index directory
piecedirectory.Start(ctx)
pd.Start(ctx)

// Start the bitswap server
log.Infof("Starting booster-bitswap node on port %d", port)
Expand Down
15 changes: 8 additions & 7 deletions cmd/booster-http/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/filecoin-project/boost/cmd/lib/remoteblockstore"
"github.com/filecoin-project/boost/metrics"
"github.com/filecoin-project/boost/piecedirectory"
bdclient "github.com/filecoin-project/boostd-data/client"
"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/boostd-data/shared/tracing"
"github.com/filecoin-project/dagstore/mount"
Expand Down Expand Up @@ -127,9 +128,9 @@ var runCmd = &cli.Command{

// Connect to the local index directory service
ctx := lcli.ReqContext(cctx)
pdClient := piecedirectory.NewStore()
defer pdClient.Close(ctx)
err := pdClient.Dial(ctx, cctx.String("api-lid"))
cl := bdclient.NewStore()
defer cl.Close(ctx)
err := cl.Dial(ctx, cctx.String("api-lid"))
if err != nil {
return fmt.Errorf("connecting to local index directory service: %w", err)
}
Expand Down Expand Up @@ -168,7 +169,7 @@ var runCmd = &cli.Command{

// Create the server API
pr := &piecedirectory.SectorAccessorAsPieceReader{SectorAccessor: sa}
piecedirectory := piecedirectory.NewPieceDirectory(pdClient, pr, cctx.Int("add-index-throttle"))
pd := piecedirectory.NewPieceDirectory(cl, pr, cctx.Int("add-index-throttle"))

opts := &HttpServerOptions{
ServePieces: servePieces,
Expand Down Expand Up @@ -199,11 +200,11 @@ var runCmd = &cli.Command{
GetSizeFailResponseCount: metrics.HttpRblsGetSizeFailResponseCount,
GetSizeSuccessResponseCount: metrics.HttpRblsGetSizeSuccessResponseCount,
}
rbs := remoteblockstore.NewRemoteBlockstore(piecedirectory, &httpBlockMetrics)
rbs := remoteblockstore.NewRemoteBlockstore(pd, &httpBlockMetrics)
filtered := filters.NewFilteredBlockstore(rbs, multiFilter)
opts.Blockstore = filtered
}
sapi := serverApi{ctx: ctx, piecedirectory: piecedirectory, sa: sa}
sapi := serverApi{ctx: ctx, piecedirectory: pd, sa: sa}
server := NewHttpServer(
cctx.String("base-path"),
cctx.Int("port"),
Expand All @@ -212,7 +213,7 @@ var runCmd = &cli.Command{
)

// Start the local index directory
piecedirectory.Start(ctx)
pd.Start(ctx)

// Start the server
log.Infof("Starting booster-http node on port %d with base path '%s'",
Expand Down
28 changes: 28 additions & 0 deletions extern/boostd-data/clientutil/clientutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package clientutil

import (
"context"
"fmt"

"github.com/filecoin-project/boostd-data/client"
"github.com/filecoin-project/boostd-data/svc"
)

func NewTestStore(ctx context.Context) *client.Store {
bdsvc, err := svc.NewLevelDB("")
if err != nil {
panic(err)
}
ln, err := bdsvc.Start(ctx, "localhost:0")
if err != nil {
panic(err)
}

cl := client.NewStore()
err = cl.Dial(ctx, fmt.Sprintf("ws://%s", ln.String()))
if err != nil {
panic(err)
}

return cl
}
2 changes: 1 addition & 1 deletion extern/boostd-data/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func runAction(cctx *cli.Context, dbType string, store *svc.Service) error {

// Start the server
addr := cctx.String("addr")
err = store.Start(ctx, addr)
_, err = store.Start(ctx, addr)
if err != nil {
return fmt.Errorf("starting %s store: %w", dbType, err)
}
Expand Down
8 changes: 4 additions & 4 deletions extern/boostd-data/svc/svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ func MakeLevelDBDir(repoPath string) (string, error) {
return repoPath, nil
}

func (s *Service) Start(ctx context.Context, addr string) error {
func (s *Service) Start(ctx context.Context, addr string) (net.Addr, error) {
ln, err := net.Listen("tcp", addr)
if err != nil {
return fmt.Errorf("setting up listener for local index directory service: %w", err)
return nil, fmt.Errorf("setting up listener for local index directory service: %w", err)
}

err = s.Impl.Start(ctx)
if err != nil {
return fmt.Errorf("starting local index directory service: %w", err)
return nil, fmt.Errorf("starting local index directory service: %w", err)
}

server := jsonrpc.NewServer()
Expand Down Expand Up @@ -97,5 +97,5 @@ func (s *Service) Start(ctx context.Context, addr string) error {
<-done
}()

return nil
return ln.Addr(), nil
}
36 changes: 28 additions & 8 deletions extern/boostd-data/svc/svc_size_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"math/rand"
"testing"
"time"

"github.com/filecoin-project/boost/testutil"
"github.com/filecoin-project/boostd-data/client"
"github.com/filecoin-project/boostd-data/model"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-car/v2/index"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
"math"
"math/rand"
"testing"
"time"
)

var tlg = logging.Logger("tlg")
Expand All @@ -33,7 +36,7 @@ func TestSizeLimit(t *testing.T) {
bdsvc, err := NewLevelDB("")
require.NoError(t, err)

testSizeLimit(ctx, t, bdsvc, "localhost:8042")
testSizeLimit(ctx, t, bdsvc, "localhost:0")
})

t.Run("yugabyte", func(t *testing.T) {
Expand All @@ -43,17 +46,17 @@ func TestSizeLimit(t *testing.T) {

bdsvc := NewYugabyte(TestYugabyteSettings)

addr := "localhost:8044"
addr := "localhost:0"
testSizeLimit(ctx, t, bdsvc, addr)
})
}

func testSizeLimit(ctx context.Context, t *testing.T, bdsvc *Service, addr string) {
err := bdsvc.Start(ctx, addr)
ln, err := bdsvc.Start(ctx, addr)
require.NoError(t, err)

cl := client.NewStore()
err = cl.Dial(context.Background(), fmt.Sprintf("ws://%s", addr))
err = cl.Dial(context.Background(), fmt.Sprintf("ws://%s", ln.String()))
require.NoError(t, err)
defer cl.Close(ctx)

Expand Down Expand Up @@ -119,3 +122,20 @@ func generateRandomCid(baseCid []byte) (cid.Cid, error) {

return c, nil
}

func toEntries(idx index.Index) (map[string]uint64, bool) {
it, ok := idx.(index.IterableIndex)
if !ok {
return nil, false
}

entries := make(map[string]uint64)
err := it.ForEach(func(mh multihash.Multihash, o uint64) error {
entries[mh.String()] = o
return nil
})
if err != nil {
return nil, false
}
return entries, true
}
Loading