Skip to content

Commit

Permalink
Parse indexed deals for data segment index
Browse files Browse the repository at this point in the history
this makes boost aware of the format in
https://github.com/filecoin-project/go-data-segment/
for new deals.

this is the same logic as in filecoin-project/lotus#10674
  • Loading branch information
willscott committed Jun 27, 2023
1 parent d2a7df9 commit f12299d
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 17 deletions.
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ require (
github.com/filecoin-project/go-commp-utils v0.1.4
github.com/filecoin-project/go-data-transfer v1.15.4-boost
github.com/filecoin-project/go-fil-commcid v0.1.0
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0
github.com/filecoin-project/go-fil-commp-hashhash v0.2.0
github.com/filecoin-project/go-fil-markets v1.28.3
github.com/filecoin-project/go-jsonrpc v0.3.1
github.com/filecoin-project/go-padreader v0.0.1
github.com/filecoin-project/go-paramfetch v0.0.4
Expand Down Expand Up @@ -106,7 +107,7 @@ require (
go.uber.org/fx v1.19.3
go.uber.org/multierr v1.11.0
golang.org/x/crypto v0.10.0
golang.org/x/exp v0.0.0-20230321023759-10a507213a29
golang.org/x/exp v0.0.0-20230418202329-0354be287a23
golang.org/x/sync v0.2.0
golang.org/x/text v0.10.0
golang.org/x/tools v0.9.1
Expand Down Expand Up @@ -212,7 +213,6 @@ require (
github.com/ipfs/bbloom v0.0.4 // indirect
github.com/ipfs/go-bitfield v1.1.0 // indirect
github.com/ipfs/go-ds-badger2 v0.1.3 // indirect
github.com/ipfs/go-ds-leveldb v0.5.0
github.com/ipfs/go-ds-measure v0.2.0 // indirect
github.com/ipfs/go-fs-lock v0.0.7 // indirect
github.com/ipfs/go-ipfs-cmds v0.9.0 // indirect
Expand Down Expand Up @@ -335,10 +335,11 @@ require (
require (
github.com/filecoin-project/boost-gfm v1.26.7
github.com/filecoin-project/boost-graphsync v0.13.6
github.com/filecoin-project/go-data-segment v0.0.0-20230605095649-5d01fdd3e4a1
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc7
github.com/filecoin-project/go-fil-markets v1.28.3
github.com/filecoin-project/lotus v1.23.2-0.20230622154405-168d022018ce
github.com/ipfs/boxo v0.10.1
github.com/ipfs/go-ds-leveldb v0.5.0
github.com/ipfs/kubo v0.21.0-rc1
github.com/ipni/go-libipni v0.0.8
github.com/schollz/progressbar/v3 v3.13.1
Expand Down
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,8 @@ github.com/filecoin-project/go-commp-utils/nonffi v0.0.0-20220905160352-62059082
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-crypto v0.0.1 h1:AcvpSGGCgjaY8y1az6AMfKQWreF/pWO2JJGLl6gCq6o=
github.com/filecoin-project/go-crypto v0.0.1/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-data-segment v0.0.0-20230605095649-5d01fdd3e4a1 h1:yPNfH/4vjeLiip2N1QDN64kg+TFd9duY9kPIEKx2ujU=
github.com/filecoin-project/go-data-segment v0.0.0-20230605095649-5d01fdd3e4a1/go.mod h1:H0/NKbsRxmRFBcLibmABv+yFNHdmtl5AyplYLnb0Zv4=
github.com/filecoin-project/go-data-transfer v1.15.4-boost h1:rGsPDeDk0nbzLOPn/9iCIrhLNy69Vkr9tRBcetM4kd0=
github.com/filecoin-project/go-data-transfer v1.15.4-boost/go.mod h1:S5Es9uoD+3TveYyGjxZInAF6mSQtRjNzezV7Y7Sh8X0=
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc7 h1:v+zJS5B6pA3ptWZS4t8tbt1Hz9qENnN4nVr1w99aSWc=
Expand All @@ -372,8 +374,8 @@ github.com/filecoin-project/go-ds-versioning v0.1.2/go.mod h1:C9/l9PnB1+mwPa26BB
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
github.com/filecoin-project/go-fil-commcid v0.1.0 h1:3R4ds1A9r6cr8mvZBfMYxTS88OqLYEo6roi+GiIeOh8=
github.com/filecoin-project/go-fil-commcid v0.1.0/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 h1:imrrpZWEHRnNqqv0tN7LXep5bFEVOVmQWHJvl2mgsGo=
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0/go.mod h1:73S8WSEWh9vr0fDJVnKADhfIv/d6dCbAGaAGWbdJEI8=
github.com/filecoin-project/go-fil-commp-hashhash v0.2.0 h1:HYIUugzjq78YvV3vC6rL95+SfC/aSTVSnZSZiDV5pCk=
github.com/filecoin-project/go-fil-commp-hashhash v0.2.0/go.mod h1:VH3fAFOru4yyWar4626IoS5+VGE8SfZiBODJLUigEo4=
github.com/filecoin-project/go-fil-markets v1.28.3 h1:2cFu7tLZYrfNz4LnxjgERaVD7k5+Wwp0H76mnnTGPBk=
github.com/filecoin-project/go-fil-markets v1.28.3/go.mod h1:eryxo/oVgIxaR5g5CNr9PlvZOi+u/bak0IsPL/PT1hk=
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
Expand Down Expand Up @@ -2103,8 +2105,8 @@ golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EH
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20210615023648-acb5c1269671/go.mod h1:DVyR6MI7P4kEQgvZJSj1fQGrWIi2RzIrfYWycwheUAc=
golang.org/x/exp v0.0.0-20210714144626-1041f73d31d8/go.mod h1:DVyR6MI7P4kEQgvZJSj1fQGrWIi2RzIrfYWycwheUAc=
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug=
golang.org/x/exp v0.0.0-20230321023759-10a507213a29/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/exp v0.0.0-20230418202329-0354be287a23 h1:4NKENAGIctmZYLK9W+X1kDK8ObBFqOSCJM6WE7CvkJY=
golang.org/x/exp v0.0.0-20230418202329-0354be287a23/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
Expand Down
68 changes: 59 additions & 9 deletions piecedirectory/piecedirectory.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/boostd-data/shared/tracing"
bdtypes "github.com/filecoin-project/boostd-data/svc/types"
"github.com/filecoin-project/go-data-segment/datasegment"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/markets/dagstore"
"github.com/hashicorp/go-multierror"
Expand Down Expand Up @@ -200,13 +201,35 @@ func (ps *PieceDirectory) addIndexForPiece(ctx context.Context, pieceCid cid.Cid
return fmt.Errorf("getting reader over piece %s: %w", pieceCid, err)
}

// Iterate over all the blocks in the piece to extract the index records
// Try to parse data as containing a data segment index
log.Debugw("add index: read index", "pieceCid", pieceCid)
recs, err := parseShardWithDataSegmentIndex(ctx, pieceCid, int64(dealInfo.PieceLength), reader)
if err != nil {
log.Debugw("add index: data segment check failed. falling back to car", "pieceCid", pieceCid, "err", err)
// Iterate over all the blocks in the piece to extract the index records
recs, err = parseRecordsFromCar(reader)
if err != nil {
return fmt.Errorf("for piece %s: %w", pieceCid, err)
}
}

// Add mh => piece index to store: "which piece contains the multihash?"
// Add mh => offset index to store: "what is the offset of the multihash within the piece?"
log.Debugw("add index: store index in local index directory", "pieceCid", pieceCid)
if err := ps.store.AddIndex(ctx, pieceCid, recs, true); err != nil {
return fmt.Errorf("adding CAR index for piece %s: %w", pieceCid, err)
}

return nil
}

func parseRecordsFromCar(reader io.Reader) ([]model.Record, error) {
// Iterate over all the blocks in the piece to extract the index records
recs := make([]model.Record, 0)
opts := []carv2.Option{carv2.ZeroLengthSectionAsEOF(true)}
blockReader, err := carv2.NewBlockReader(reader, opts...)
if err != nil {
return fmt.Errorf("getting block reader over piece %s: %w", pieceCid, err)
return nil, fmt.Errorf("getting block reader over piece: %w", err)
}

blockMetadata, err := blockReader.SkipNext()
Expand All @@ -222,17 +245,44 @@ func (ps *PieceDirectory) addIndexForPiece(ctx context.Context, pieceCid cid.Cid
blockMetadata, err = blockReader.SkipNext()
}
if !errors.Is(err, io.EOF) {
return fmt.Errorf("generating index for piece %s: %w", pieceCid, err)
return nil, fmt.Errorf("generating index for piece: %w", err)
}
return recs, nil
}

func parseShardWithDataSegmentIndex(ctx context.Context, pieceCid cid.Cid, size int64, r types.SectionReader) ([]model.Record, error) {
ps := abi.UnpaddedPieceSize(size).Padded()
dsis := datasegment.DataSegmentIndexStartOffset(ps)
if _, err := r.Seek(int64(dsis), io.SeekStart); err != nil {
return nil, fmt.Errorf("could not seek to data segment index: %w", err)
}
dataSegments, err := datasegment.ParseDataSegmentIndex(r)
if err != nil {
return nil, fmt.Errorf("could not parse data segment index: %w", err)
}
segments, err := dataSegments.ValidEntries()
if err != nil {
return nil, fmt.Errorf("could not calculate valid entries: %w", err)
}
if len(segments) == 0 {
return nil, fmt.Errorf("no data segments found")
}

// Add mh => piece index to store: "which piece contains the multihash?"
// Add mh => offset index to store: "what is the offset of the multihash within the piece?"
log.Debugw("add index: store index in local index directory", "pieceCid", pieceCid)
if err := ps.store.AddIndex(ctx, pieceCid, recs, true); err != nil {
return fmt.Errorf("adding CAR index for piece %s: %w", pieceCid, err)
recs := make([]model.Record, 0)
for _, s := range segments {
segOffset := s.UnpaddedOffest()
segSize := s.UnpaddedLength()

lr := io.NewSectionReader(r, int64(segOffset), int64(segSize))
subRecs, err := parseRecordsFromCar(lr)
if err != nil {
log.Debugw("Unexpected index format on generation in shard", "piece", pieceCid, "offset", segOffset)
continue
}
recs = append(recs, subRecs...)
}

return nil
return recs, nil
}

func (ps *PieceDirectory) BuildIndexForPiece(ctx context.Context, pieceCid cid.Cid) error {
Expand Down

0 comments on commit f12299d

Please sign in to comment.