Skip to content

Commit

Permalink
Merge pull request #511 from ipfs-force-community/fix/generate-index
Browse files Browse the repository at this point in the history
fix: generate direct deal index
  • Loading branch information
LinZexiao authored Mar 14, 2024
2 parents e09d628 + 9fab0c6 commit ece8be5
Show file tree
Hide file tree
Showing 10 changed files with 288 additions and 24 deletions.
1 change: 1 addition & 0 deletions cli/direct-deal.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ var getDirectDeal = &cli.Command{
{"SectorID", deal.SectorID},
{"Offset", deal.Offset},
{"Length", deal.Length},
{"PayloadSize", deal.PayloadSize},
{"StartEpoch", deal.StartEpoch},
{"EndEpoch", deal.EndEpoch},
}
Expand Down
31 changes: 26 additions & 5 deletions dagstore/market_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (

"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/throttle"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/go-state-types/abi"
gatewayAPIV2 "github.com/filecoin-project/venus/venus-shared/api/gateway/v2"

marketMetrics "github.com/ipfs-force-community/droplet/v2/metrics"
Expand All @@ -31,8 +33,9 @@ type MarketAPI interface {
}

type marketAPI struct {
pieceStorageMgr *piecestorage.PieceStorageManager
pieceRepo repo.StorageDealRepo
pieceStorageMgr *piecestorage.PieceStorageManager
repo repo.Repo

useTransient bool
metricsCtx metrics.MetricsCtx
gatewayMarketClient gatewayAPIV2.IMarketClient
Expand All @@ -51,7 +54,7 @@ func NewMarketAPI(
concurrency int) MarketAPI {

return &marketAPI{
pieceRepo: repo.StorageDealRepo(),
repo: repo,
pieceStorageMgr: pieceStorageMgr,
useTransient: useTransient,
metricsCtx: ctx,
Expand All @@ -65,6 +68,24 @@ func (m *marketAPI) Start(_ context.Context) error {
return nil
}

func (m *marketAPI) getPieceInfo(ctx context.Context, pieceCID cid.Cid) (*piecestore.PieceInfo, error) {
pieceInfo, err := m.repo.StorageDealRepo().GetPieceInfo(ctx, pieceCID)
if err == nil {
return pieceInfo, nil
}

return m.repo.DirectDealRepo().GetPieceInfo(ctx, pieceCID)
}

func (m *marketAPI) getPieceSize(ctx context.Context, pieceCID cid.Cid) (uint64, abi.PaddedPieceSize, error) {
payloadSize, pieceSize, err := m.repo.StorageDealRepo().GetPieceSize(ctx, pieceCID)
if err == nil {
return payloadSize, pieceSize, nil
}

return m.repo.DirectDealRepo().GetPieceSize(ctx, pieceCID)
}

func (m *marketAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) {
_, err := m.pieceStorageMgr.FindStorageForRead(ctx, pieceCid.String())
if err != nil {
Expand All @@ -79,7 +100,7 @@ func (m *marketAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, err
}

func (m *marketAPI) FetchFromPieceStorage(ctx context.Context, pieceCid cid.Cid) (mount.Reader, error) {
payloadSize, pieceSize, err := m.pieceRepo.GetPieceSize(ctx, pieceCid)
payloadSize, pieceSize, err := m.getPieceSize(ctx, pieceCid)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -121,7 +142,7 @@ func (m *marketAPI) FetchFromPieceStorage(ctx context.Context, pieceCid cid.Cid)
}

func (m *marketAPI) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) {
pieceInfo, err := m.pieceRepo.GetPieceInfo(ctx, pieceCid)
pieceInfo, err := m.getPieceInfo(ctx, pieceCid)
if err != nil {
return 0, fmt.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
}
Expand Down
49 changes: 49 additions & 0 deletions models/badger/direct_deal.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ import (
"fmt"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-state-types/abi"
types "github.com/filecoin-project/venus/venus-shared/types/market"
"github.com/google/uuid"
"github.com/ipfs-force-community/droplet/v2/models/repo"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
)

Expand All @@ -22,6 +25,7 @@ type directDealRepo struct {

func (r *directDealRepo) SaveDeal(ctx context.Context, d *types.DirectDeal) error {
key := keyFromID(d.ID)
d.TimeStamp = makeRefreshedTimeStamp(&d.TimeStamp)
data, err := json.Marshal(d)
if err != nil {
return err
Expand Down Expand Up @@ -88,6 +92,51 @@ func (r *directDealRepo) GetDealsByMinerAndState(ctx context.Context, miner addr

return deals, nil
}

func (r *directDealRepo) GetPieceInfo(ctx context.Context, pieceCID cid.Cid) (*piecestore.PieceInfo, error) {
pieceInfo := piecestore.PieceInfo{
PieceCID: pieceCID,
Deals: nil,
}
var err error
if err = travelJSONAbleDS(ctx, r.ds, func(deal *types.DirectDeal) (bool, error) {
if deal.PieceCID.Equals(pieceCID) {
pieceInfo.Deals = append(pieceInfo.Deals, piecestore.DealInfo{
SectorID: deal.SectorID,
Offset: deal.Offset,
Length: deal.PieceSize,
})
}
return false, nil
}); err != nil {
return nil, err
}

if len(pieceInfo.Deals) == 0 {
err = repo.ErrNotFound
}

return &pieceInfo, err
}

func (r *directDealRepo) GetPieceSize(ctx context.Context, pieceCID cid.Cid) (uint64, abi.PaddedPieceSize, error) {
var deal *types.DirectDeal
err := travelJSONAbleDS(ctx, r.ds, func(inDeal *types.DirectDeal) (stop bool, err error) {
if inDeal.PieceCID == pieceCID && inDeal.State != types.DealExpired {
deal = inDeal
return true, nil
}
return false, nil
})
if err != nil {
return 0, 0, err
}
if deal == nil {
return 0, 0, repo.ErrNotFound
}
return deal.PayloadSize, deal.PieceSize, nil
}

func (r *directDealRepo) ListDeal(ctx context.Context, params types.DirectDealQueryParams) ([]*types.DirectDeal, error) {
var deals []*types.DirectDeal
end := params.Limit + params.Offset
Expand Down
63 changes: 63 additions & 0 deletions models/badger/direct_deal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"time"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/venus/venus-shared/testutil"
types "github.com/filecoin-project/venus/venus-shared/types/market"
"github.com/google/uuid"
"github.com/ipfs-force-community/droplet/v2/models/repo"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -109,3 +111,64 @@ func TestDirectDeal(t *testing.T) {
assert.Len(t, res, 10)
})
}

func prepareDirectDealTest(t *testing.T) (context.Context, repo.DirectDealRepo, []types.DirectDeal) {
ctx := context.Background()
repo := setup(t)
r := repo.DirectDealRepo()

dealCases := make([]types.DirectDeal, 10)
testutil.Provide(t, &dealCases)
dealCases[0].State = types.DealAllocation
return ctx, r, dealCases
}

func TestGetDirectDealPieceInfo(t *testing.T) {
ctx, r, dealCases := prepareDirectDealTest(t)

for _, deal := range dealCases {
err := r.SaveDeal(ctx, &deal)
assert.NoError(t, err)
}

// refresh UpdatedAt
for i := range dealCases {
res, err := r.GetDeal(ctx, dealCases[i].ID)
assert.NoError(t, err)
dealCases[i].UpdatedAt = res.UpdatedAt
}

res, err := r.GetPieceInfo(ctx, dealCases[0].PieceCID)
assert.NoError(t, err)
expect := piecestore.PieceInfo{
PieceCID: dealCases[0].PieceCID,
Deals: nil,
}
expect.Deals = append(expect.Deals, piecestore.DealInfo{
SectorID: dealCases[0].SectorID,
Offset: dealCases[0].Offset,
Length: dealCases[0].PieceSize,
})
assert.Equal(t, expect, *res)
}

func TestGetDirectDealPieceSize(t *testing.T) {
ctx, r, dealCases := prepareDirectDealTest(t)

for _, deal := range dealCases {
err := r.SaveDeal(ctx, &deal)
assert.NoError(t, err)
}

// refresh UpdatedAt
for i := range dealCases {
res, err := r.GetDeal(ctx, dealCases[i].ID)
assert.NoError(t, err)
dealCases[i].UpdatedAt = res.UpdatedAt
}

PLSize, PSize, err := r.GetPieceSize(ctx, dealCases[0].PieceCID)
assert.NoError(t, err)
assert.Equal(t, dealCases[0].PieceSize, PSize)
assert.Equal(t, dealCases[0].PayloadSize, PLSize)
}
2 changes: 1 addition & 1 deletion models/badger/storage_deal.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ func (sdr *storageDealRepo) GetPieceSize(ctx context.Context, pieceCID cid.Cid)
return false, nil
})
if err != nil {
return 0, 0, nil
return 0, 0, err
}
if deal == nil {
return 0, 0, repo.ErrNotFound
Expand Down
1 change: 1 addition & 0 deletions models/badger/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func WrapDbToRepo(db datastore.Batching) repo.Repo {
RetrAskDs: NewRetrievalAskDS(NewRetrievalProviderDS(db)),
CidInfoDs: NewCidInfoDs(NewPieceMetaDs(db)),
RetrievalDealsDs: NewRetrievalDealsDS(NewRetrievalProviderDS(db)),
DirectDealsDs: NewDirectDealsDS(db),
})
}

Expand Down
33 changes: 33 additions & 0 deletions models/mysql/direct_deal.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-state-types/abi"
types "github.com/filecoin-project/venus/venus-shared/types/market"
"github.com/google/uuid"
"github.com/ipfs-force-community/droplet/v2/models/repo"
"github.com/ipfs/go-cid"
"gorm.io/gorm"
)

Expand Down Expand Up @@ -151,6 +153,37 @@ func (ddr *directDealRepo) GetDealsByMinerAndState(ctx context.Context, miner ad
return out, nil
}

func (ddr *directDealRepo) GetPieceInfo(ctx context.Context, pieceCID cid.Cid) (*piecestore.PieceInfo, error) {
var deals []*directDeal
if err := ddr.DB.WithContext(ctx).Table(directDealTableName).Find(&deals, "piece_cid = ?", pieceCID.String()).Error; err != nil {
return nil, err
}

pieceInfo := piecestore.PieceInfo{
PieceCID: pieceCID,
Deals: nil,
}

for _, d := range deals {
pieceInfo.Deals = append(pieceInfo.Deals, piecestore.DealInfo{
SectorID: abi.SectorNumber(d.SectorID),
Offset: abi.PaddedPieceSize(d.Offset),
Length: abi.PaddedPieceSize(d.PieceSize),
})
}
return &pieceInfo, nil
}

func (ddr *directDealRepo) GetPieceSize(ctx context.Context, pieceCID cid.Cid) (uint64, abi.PaddedPieceSize, error) {
var deal directDeal
if err := ddr.WithContext(ctx).Table(directDealTableName).Take(&deal, "piece_cid = ? and state != ?",
pieceCID.String(), types.DealError).Error; err != nil {
return 0, 0, err
}

return deal.PayloadSize, abi.PaddedPieceSize(deal.PieceSize), nil
}

func (ddr *directDealRepo) ListDeal(ctx context.Context, params types.DirectDealQueryParams) ([]*types.DirectDeal, error) {
var deals []*directDeal

Expand Down
80 changes: 80 additions & 0 deletions models/mysql/direct_deal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/venus/venus-shared/testutil"
types "github.com/filecoin-project/venus/venus-shared/types/market"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -141,3 +143,81 @@ func TestListDirectDeal(t *testing.T) {

assert.NoError(t, closeDB(mock, sqlDB))
}

func TestGetDirectDealPieceSize(t *testing.T) {
ctx := context.Background()
r, mock, sqlDB := setup(t)

var deals []*types.DirectDeal
testutil.Provide(t, &deals)
dbDeals := make([]*directDeal, 0, len(deals))
for _, deal := range deals {
dbDeals = append(dbDeals, fromDirectDeal(deal))
}

deal := deals[0]
dbDeal := dbDeals[0]

db, err := getMysqlDryrunDB()
assert.NoError(t, err)

rows, err := getFullRows(dbDeal)
assert.NoError(t, err)

var nullDeal *directDeal
sql, vars, err := getSQL(db.Table(directDealTableName).Take(&nullDeal, "piece_cid = ? and state != ?", DBCid(deal.PieceCID).String(), types.DealError))
assert.NoError(t, err)

mock.ExpectQuery(regexp.QuoteMeta(sql)).WithArgs(vars...).WillReturnRows(rows)

playLoadSize, paddedPieceSize, err := r.DirectDealRepo().GetPieceSize(ctx, deal.PieceCID)
assert.NoError(t, err)
assert.Equal(t, dbDeal.PayloadSize, playLoadSize)
assert.Equal(t, abi.PaddedPieceSize(dbDeal.PieceSize), paddedPieceSize)

assert.NoError(t, closeDB(mock, sqlDB))
}

func TestGetDirectDealPieceInfo(t *testing.T) {
ctx := context.Background()
r, mock, sqlDB := setup(t)

var deals []*types.DirectDeal
testutil.Provide(t, &deals)
dbDeals := make([]*directDeal, 0, len(deals))
for _, deal := range deals {
dbDeals = append(dbDeals, fromDirectDeal(deal))
}

deal := deals[0]
dbDeal := dbDeals[0]

db, err := getMysqlDryrunDB()
assert.NoError(t, err)

rows, err := getFullRows(dbDeal)
assert.NoError(t, err)

var nullDeal *directDeal
sql, vars, err := getSQL(db.Table(directDealTableName).Find(&nullDeal, "piece_cid = ?", DBCid(deal.PieceCID).String()))
assert.NoError(t, err)

mock.ExpectQuery(regexp.QuoteMeta(sql)).WithArgs(vars...).WillReturnRows(rows)

pInfo := &piecestore.PieceInfo{
PieceCID: deal.PieceCID,
Deals: []piecestore.DealInfo{
{
Offset: deal.Offset,
Length: deal.PieceSize,
SectorID: deal.SectorID,
},
},
}

res, err := r.DirectDealRepo().GetPieceInfo(ctx, deal.PieceCID)
assert.NoError(t, err)
assert.Equal(t, pInfo, res)

assert.NoError(t, closeDB(mock, sqlDB))
}
2 changes: 2 additions & 0 deletions models/repo/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ type DirectDealRepo interface {
GetDeal(ctx context.Context, id uuid.UUID) (*types.DirectDeal, error)
GetDealByAllocationID(ctx context.Context, id uint64) (*types.DirectDeal, error)
GetDealsByMinerAndState(ctx context.Context, miner address.Address, state types.DirectDealState) ([]*types.DirectDeal, error)
GetPieceInfo(ctx context.Context, pieceCID cid.Cid) (*piecestore.PieceInfo, error)
GetPieceSize(ctx context.Context, pieceCID cid.Cid) (uint64, abi.PaddedPieceSize, error)
ListDeal(ctx context.Context, params types.DirectDealQueryParams) ([]*types.DirectDeal, error)
}

Expand Down
Loading

0 comments on commit ece8be5

Please sign in to comment.