Skip to content

Commit

Permalink
refactor piece doctor
Browse files Browse the repository at this point in the history
  • Loading branch information
nonsense committed May 25, 2023
1 parent 3918e94 commit dfbecb9
Show file tree
Hide file tree
Showing 17 changed files with 353 additions and 356 deletions.
9 changes: 5 additions & 4 deletions cmd/boostd/recover.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/filecoin-project/boost/cmd/lib"
"github.com/filecoin-project/boost/db"
"github.com/filecoin-project/boost/piecedirectory"
bdclient "github.com/filecoin-project/boostd-data/client"
"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-commp-utils/writer"
Expand Down Expand Up @@ -190,14 +191,14 @@ func action(cctx *cli.Context) error {
if ignoreLID {
pd = nil
} else {
pdClient := piecedirectory.NewStore()
defer pdClient.Close(ctx)
err = pdClient.Dial(ctx, cctx.String("api-lid"))
cl := bdclient.NewStore()
defer cl.Close(ctx)
err = cl.Dial(ctx, cctx.String("api-lid"))
if err != nil {
return fmt.Errorf("connecting to local index directory service: %w", err)
}
pr := &piecedirectory.SectorAccessorAsPieceReader{SectorAccessor: sa}
pd = piecedirectory.NewPieceDirectory(pdClient, pr, cctx.Int("add-index-throttle"))
pd = piecedirectory.NewPieceDirectory(cl, pr, cctx.Int("add-index-throttle"))
pd.Start(ctx)
}

Expand Down
13 changes: 7 additions & 6 deletions cmd/booster-bitswap/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/filecoin-project/boost/cmd/lib/remoteblockstore"
"github.com/filecoin-project/boost/metrics"
"github.com/filecoin-project/boost/piecedirectory"
bdclient "github.com/filecoin-project/boostd-data/client"
"github.com/filecoin-project/boostd-data/shared/tracing"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -157,9 +158,9 @@ var runCmd = &cli.Command{
defer storageCloser()

// Connect to the local index directory service
pdClient := piecedirectory.NewStore()
defer pdClient.Close(ctx)
err = pdClient.Dial(ctx, cctx.String("api-lid"))
cl := bdclient.NewStore()
defer cl.Close(ctx)
err = cl.Dial(ctx, cctx.String("api-lid"))
if err != nil {
return fmt.Errorf("connecting to local index directory service: %w", err)
}
Expand Down Expand Up @@ -196,8 +197,8 @@ var runCmd = &cli.Command{
return fmt.Errorf("starting block filter: %w", err)
}
pr := &piecedirectory.SectorAccessorAsPieceReader{SectorAccessor: sa}
piecedirectory := piecedirectory.NewPieceDirectory(pdClient, pr, cctx.Int("add-index-throttle"))
remoteStore := remoteblockstore.NewRemoteBlockstore(piecedirectory, &bitswapBlockMetrics)
pd := piecedirectory.NewPieceDirectory(cl, pr, cctx.Int("add-index-throttle"))
remoteStore := remoteblockstore.NewRemoteBlockstore(pd, &bitswapBlockMetrics)
server := NewBitswapServer(remoteStore, host, multiFilter)

var proxyAddrInfo *peer.AddrInfo
Expand All @@ -210,7 +211,7 @@ var runCmd = &cli.Command{
}

// Start the local index directory
piecedirectory.Start(ctx)
pd.Start(ctx)

// Start the bitswap server
log.Infof("Starting booster-bitswap node on port %d", port)
Expand Down
15 changes: 8 additions & 7 deletions cmd/booster-http/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/filecoin-project/boost/cmd/lib/remoteblockstore"
"github.com/filecoin-project/boost/metrics"
"github.com/filecoin-project/boost/piecedirectory"
bdclient "github.com/filecoin-project/boostd-data/client"
"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/boostd-data/shared/tracing"
"github.com/filecoin-project/dagstore/mount"
Expand Down Expand Up @@ -127,9 +128,9 @@ var runCmd = &cli.Command{

// Connect to the local index directory service
ctx := lcli.ReqContext(cctx)
pdClient := piecedirectory.NewStore()
defer pdClient.Close(ctx)
err := pdClient.Dial(ctx, cctx.String("api-lid"))
cl := bdclient.NewStore()
defer cl.Close(ctx)
err := cl.Dial(ctx, cctx.String("api-lid"))
if err != nil {
return fmt.Errorf("connecting to local index directory service: %w", err)
}
Expand Down Expand Up @@ -168,7 +169,7 @@ var runCmd = &cli.Command{

// Create the server API
pr := &piecedirectory.SectorAccessorAsPieceReader{SectorAccessor: sa}
piecedirectory := piecedirectory.NewPieceDirectory(pdClient, pr, cctx.Int("add-index-throttle"))
pd := piecedirectory.NewPieceDirectory(cl, pr, cctx.Int("add-index-throttle"))

opts := &HttpServerOptions{
ServePieces: servePieces,
Expand Down Expand Up @@ -199,11 +200,11 @@ var runCmd = &cli.Command{
GetSizeFailResponseCount: metrics.HttpRblsGetSizeFailResponseCount,
GetSizeSuccessResponseCount: metrics.HttpRblsGetSizeSuccessResponseCount,
}
rbs := remoteblockstore.NewRemoteBlockstore(piecedirectory, &httpBlockMetrics)
rbs := remoteblockstore.NewRemoteBlockstore(pd, &httpBlockMetrics)
filtered := filters.NewFilteredBlockstore(rbs, multiFilter)
opts.Blockstore = filtered
}
sapi := serverApi{ctx: ctx, piecedirectory: piecedirectory, sa: sa}
sapi := serverApi{ctx: ctx, piecedirectory: pd, sa: sa}
server := NewHttpServer(
cctx.String("base-path"),
cctx.Int("port"),
Expand All @@ -212,7 +213,7 @@ var runCmd = &cli.Command{
)

// Start the local index directory
piecedirectory.Start(ctx)
pd.Start(ctx)

// Start the server
log.Infof("Starting booster-http node on port %d with base path '%s'",
Expand Down
28 changes: 28 additions & 0 deletions extern/boostd-data/clientutil/clientutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package clientutil

import (
"context"
"fmt"

"github.com/filecoin-project/boostd-data/client"
"github.com/filecoin-project/boostd-data/svc"
)

func NewTestStore(ctx context.Context) *client.Store {
bdsvc, err := svc.NewLevelDB("")
if err != nil {
panic(err)
}
ln, err := bdsvc.Start(ctx, "localhost:0")
if err != nil {
panic(err)
}

cl := client.NewStore()
err = cl.Dial(ctx, fmt.Sprintf("ws://%s", ln.String()))
if err != nil {
panic(err)
}

return cl
}
2 changes: 1 addition & 1 deletion extern/boostd-data/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func runAction(cctx *cli.Context, dbType string, store *svc.Service) error {

// Start the server
addr := cctx.String("addr")
err = store.Start(ctx, addr)
_, err = store.Start(ctx, addr)
if err != nil {
return fmt.Errorf("starting %s store: %w", dbType, err)
}
Expand Down
8 changes: 4 additions & 4 deletions extern/boostd-data/svc/svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ func MakeLevelDBDir(repoPath string) (string, error) {
return repoPath, nil
}

func (s *Service) Start(ctx context.Context, addr string) error {
func (s *Service) Start(ctx context.Context, addr string) (net.Addr, error) {
ln, err := net.Listen("tcp", addr)
if err != nil {
return fmt.Errorf("setting up listener for local index directory service: %w", err)
return nil, fmt.Errorf("setting up listener for local index directory service: %w", err)
}

err = s.Impl.Start(ctx)
if err != nil {
return fmt.Errorf("starting local index directory service: %w", err)
return nil, fmt.Errorf("starting local index directory service: %w", err)
}

server := jsonrpc.NewServer()
Expand Down Expand Up @@ -97,5 +97,5 @@ func (s *Service) Start(ctx context.Context, addr string) error {
<-done
}()

return nil
return ln.Addr(), nil
}
36 changes: 28 additions & 8 deletions extern/boostd-data/svc/svc_size_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"math/rand"
"testing"
"time"

"github.com/filecoin-project/boost/testutil"
"github.com/filecoin-project/boostd-data/client"
"github.com/filecoin-project/boostd-data/model"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-car/v2/index"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
"math"
"math/rand"
"testing"
"time"
)

var tlg = logging.Logger("tlg")
Expand All @@ -33,7 +36,7 @@ func TestSizeLimit(t *testing.T) {
bdsvc, err := NewLevelDB("")
require.NoError(t, err)

testSizeLimit(ctx, t, bdsvc, "localhost:8042")
testSizeLimit(ctx, t, bdsvc, "localhost:0")
})

t.Run("yugabyte", func(t *testing.T) {
Expand All @@ -43,17 +46,17 @@ func TestSizeLimit(t *testing.T) {

bdsvc := NewYugabyte(TestYugabyteSettings)

addr := "localhost:8044"
addr := "localhost:0"
testSizeLimit(ctx, t, bdsvc, addr)
})
}

func testSizeLimit(ctx context.Context, t *testing.T, bdsvc *Service, addr string) {
err := bdsvc.Start(ctx, addr)
ln, err := bdsvc.Start(ctx, addr)
require.NoError(t, err)

cl := client.NewStore()
err = cl.Dial(context.Background(), fmt.Sprintf("ws://%s", addr))
err = cl.Dial(context.Background(), fmt.Sprintf("ws://%s", ln.String()))
require.NoError(t, err)
defer cl.Close(ctx)

Expand Down Expand Up @@ -119,3 +122,20 @@ func generateRandomCid(baseCid []byte) (cid.Cid, error) {

return c, nil
}

func toEntries(idx index.Index) (map[string]uint64, bool) {
it, ok := idx.(index.IterableIndex)
if !ok {
return nil, false
}

entries := make(map[string]uint64)
err := it.ForEach(func(mh multihash.Multihash, o uint64) error {
entries[mh.String()] = o
return nil
})
if err != nil {
return nil, false
}
return entries, true
}
17 changes: 0 additions & 17 deletions extern/boostd-data/svc/svc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,23 +449,6 @@ func compareIndices(subject, subjectDb index.Index) (bool, error) {
return equal, nil
}

func toEntries(idx index.Index) (map[string]uint64, bool) {
it, ok := idx.(index.IterableIndex)
if !ok {
return nil, false
}

entries := make(map[string]uint64)
err := it.ForEach(func(mh multihash.Multihash, o uint64) error {
entries[mh.String()] = o
return nil
})
if err != nil {
return nil, false
}
return entries, true
}

func TestCleanup(t *testing.T) {
_ = logging.SetLogLevel("*", "debug")

Expand Down
Loading

0 comments on commit dfbecb9

Please sign in to comment.