Skip to content

Commit

Permalink
Dynamic Retrieval Pricing (#542)
Browse files Browse the repository at this point in the history
* refactor for dynamic pricing

* test differential pricing

* dynamic pricing

* changes for dynamic pricing

* Apply suggestions from code review

Co-authored-by: raulk <[email protected]>
Co-authored-by: dirkmc <[email protected]>

* changes as per review and test json

* Apply suggestions from code review

Co-authored-by: dirkmc <[email protected]>

* fix compilation

* changes as per review

* default retrieval pricing function

* fix compilation

* test the default pricing function

* fix bug in quoting price

* fix: go mod tidy

Co-authored-by: raulk <[email protected]>
Co-authored-by: dirkmc <[email protected]>
  • Loading branch information
3 people authored Jun 4, 2021
1 parent 672687c commit 32e5cce
Show file tree
Hide file tree
Showing 17 changed files with 1,224 additions and 130 deletions.
53 changes: 37 additions & 16 deletions retrievalmarket/impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,24 @@ func TestClientCanMakeQueryToProvider(t *testing.T) {
expectedQR.Status = retrievalmarket.QueryResponseUnavailable
expectedQR.Size = 0
actualQR, err := client.Query(bgCtx, retrievalPeer, missingPiece, retrievalmarket.QueryParams{})
actualQR.MaxPaymentInterval = expectedQR.MaxPaymentInterval
actualQR.MinPricePerByte = expectedQR.MinPricePerByte
actualQR.MaxPaymentIntervalIncrease = expectedQR.MaxPaymentIntervalIncrease
actualQR.UnsealPrice = expectedQR.UnsealPrice
assert.NoError(t, err)
assert.Equal(t, expectedQR, actualQR)
})

t.Run("when there is some other error, returns error", func(t *testing.T) {
unknownPiece := tut.GenerateCids(1)[0]
expectedQR.Status = retrievalmarket.QueryResponseError
expectedQR.Message = "get cid info: GetCIDInfo failed"
expectedQR.Message = "failed to fetch piece to retrieve from: get cid info: GetCIDInfo failed"
actualQR, err := client.Query(bgCtx, retrievalPeer, unknownPiece, retrievalmarket.QueryParams{})
assert.NoError(t, err)
actualQR.MaxPaymentInterval = expectedQR.MaxPaymentInterval
actualQR.MinPricePerByte = expectedQR.MinPricePerByte
actualQR.MaxPaymentIntervalIncrease = expectedQR.MaxPaymentIntervalIncrease
actualQR.UnsealPrice = expectedQR.UnsealPrice
assert.Equal(t, expectedQR, actualQR)
})

Expand Down Expand Up @@ -148,21 +156,29 @@ func requireSetupTestClientAndProvider(ctx context.Context, t *testing.T, payChA
testutil.StartAndWaitForReady(ctx, t, dt2)
require.NoError(t, err)
providerDs := namespace.Wrap(testData.Ds2, datastore.NewKey("/retrievals/provider"))
provider, err := retrievalimpl.NewProvider(paymentAddress, providerNode, nw2, pieceStore, testData.MultiStore2, dt2, providerDs)

priceFunc := func(ctx context.Context, dealPricingParams retrievalmarket.PricingInput) (retrievalmarket.Ask, error) {
ask := retrievalmarket.Ask{}
ask.PaymentInterval = expectedQR.MaxPaymentInterval
ask.PaymentIntervalIncrease = expectedQR.MaxPaymentIntervalIncrease
ask.PricePerByte = expectedQR.MinPricePerByte
ask.UnsealPrice = expectedQR.UnsealPrice
return ask, nil
}

provider, err := retrievalimpl.NewProvider(paymentAddress, providerNode, nw2, pieceStore, testData.MultiStore2, dt2, providerDs,
priceFunc)
require.NoError(t, err)

ask := provider.GetAsk()
ask.PaymentInterval = expectedQR.MaxPaymentInterval
ask.PaymentIntervalIncrease = expectedQR.MaxPaymentIntervalIncrease
ask.PricePerByte = expectedQR.MinPricePerByte
ask.UnsealPrice = expectedQR.UnsealPrice
provider.SetAsk(ask)
tut.StartAndWaitForReady(ctx, t, provider)
retrievalPeer := retrievalmarket.RetrievalPeer{
Address: paymentAddress,
ID: testData.Host2.ID(),
}
rcNode1.ExpectKnownAddresses(retrievalPeer, nil)

expectedQR.Size = uint64(abi.PaddedPieceSize(expectedQR.Size).Unpadded())

return client, expectedCIDs, missingCID, expectedQR, retrievalPeer, provider
}

Expand Down Expand Up @@ -384,12 +400,14 @@ func TestClientCanMakeDealWithProvider(t *testing.T) {
PieceCID: tut.GenerateCids(1)[0],
Deals: []piecestore.DealInfo{
{
DealID: abi.DealID(100),
SectorID: sectorID,
Offset: offset,
Length: abi.UnpaddedPieceSize(len(carData)).Padded(),
},
},
}
providerNode.ExpectPricingParams(pieceInfo.PieceCID, []abi.DealID{100})
if testCase.failsUnseal {
providerNode.ExpectFailedUnseal(sectorID, offset.Unpadded(), abi.UnpaddedPieceSize(len(carData)))
} else {
Expand Down Expand Up @@ -664,18 +682,21 @@ func setupProvider(
if disableNewDeals {
opts = append(opts, retrievalimpl.DisableNewDeals())
}

priceFunc := func(ctx context.Context, dealPricingParams retrievalmarket.PricingInput) (retrievalmarket.Ask, error) {
ask := retrievalmarket.Ask{}
ask.PaymentInterval = expectedQR.MaxPaymentInterval
ask.PaymentIntervalIncrease = expectedQR.MaxPaymentIntervalIncrease
ask.PricePerByte = expectedQR.MinPricePerByte
ask.UnsealPrice = expectedQR.UnsealPrice
return ask, nil
}

provider, err := retrievalimpl.NewProvider(providerPaymentAddr, providerNode, nw2,
pieceStore, testData.MultiStore2, dt2, providerDs,
pieceStore, testData.MultiStore2, dt2, providerDs, priceFunc,
opts...)
require.NoError(t, err)

ask := provider.GetAsk()

ask.PaymentInterval = expectedQR.MaxPaymentInterval
ask.PaymentIntervalIncrease = expectedQR.MaxPaymentIntervalIncrease
ask.PricePerByte = expectedQR.MinPricePerByte
ask.UnsealPrice = expectedQR.UnsealPrice
provider.SetAsk(ask)
return provider
}

Expand Down
177 changes: 139 additions & 38 deletions retrievalmarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package retrievalimpl
import (
"context"
"errors"
"fmt"
"time"

"github.com/hannahhoward/go-pubsub"
"github.com/ipfs/go-cid"
Expand All @@ -15,6 +17,8 @@ import (
versioning "github.com/filecoin-project/go-ds-versioning/pkg"
versionedfsm "github.com/filecoin-project/go-ds-versioning/pkg/fsm"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-statemachine/fsm"

"github.com/filecoin-project/go-fil-markets/piecestore"
Expand All @@ -34,6 +38,10 @@ type RetrievalProviderOption func(p *Provider)
// DealDecider is a function that makes a decision about whether to accept a deal
type DealDecider func(ctx context.Context, state retrievalmarket.ProviderDealState) (bool, string, error)

type RetrievalPricingFunc func(ctx context.Context, dealPricingParams retrievalmarket.PricingInput) (retrievalmarket.Ask, error)

var queryTimeout = 5 * time.Second

// Provider is the production implementation of the RetrievalProvider interface
type Provider struct {
multiStore *multistore.MultiStore
Expand All @@ -51,6 +59,7 @@ type Provider struct {
dealDecider DealDecider
askStore retrievalmarket.AskStore
disableNewDeals bool
retrievalPricingFunc RetrievalPricingFunc
}

type internalProviderEvent struct {
Expand Down Expand Up @@ -95,24 +104,31 @@ func NewProvider(minerAddress address.Address,
multiStore *multistore.MultiStore,
dataTransfer datatransfer.Manager,
ds datastore.Batching,
retrievalPricingFunc RetrievalPricingFunc,
opts ...RetrievalProviderOption,
) (retrievalmarket.RetrievalProvider, error) {

if retrievalPricingFunc == nil {
return nil, xerrors.New("retrievalPricingFunc is nil")
}

p := &Provider{
multiStore: multiStore,
dataTransfer: dataTransfer,
node: node,
network: network,
minerAddress: minerAddress,
pieceStore: pieceStore,
subscribers: pubsub.New(providerDispatcher),
readySub: pubsub.New(shared.ReadyDispatcher),
multiStore: multiStore,
dataTransfer: dataTransfer,
node: node,
network: network,
minerAddress: minerAddress,
pieceStore: pieceStore,
subscribers: pubsub.New(providerDispatcher),
readySub: pubsub.New(shared.ReadyDispatcher),
retrievalPricingFunc: retrievalPricingFunc,
}

err := shared.MoveKey(ds, "retrieval-ask", "retrieval-ask/latest")
if err != nil {
return nil, err
}

askStore, err := askstore.NewAskStore(namespace.Wrap(ds, datastore.NewKey("retrieval-ask")), datastore.NewKey("latest"))
if err != nil {
return nil, err
Expand Down Expand Up @@ -234,6 +250,7 @@ func (p *Provider) GetAsk() *retrievalmarket.Ask {

// SetAsk sets the deal parameters this provider accepts
func (p *Provider) SetAsk(ask *retrievalmarket.Ask) {

err := p.askStore.SetAsk(ask)

if err != nil {
Expand Down Expand Up @@ -268,63 +285,128 @@ A Provider handling a retrieval `Query` does the following:
The connection is kept open only as long as the query-response exchange.
*/
func (p *Provider) HandleQueryStream(stream rmnet.RetrievalQueryStream) {
ctx, cancel := context.WithTimeout(context.TODO(), queryTimeout)
defer cancel()

defer stream.Close()
query, err := stream.ReadQuery()
if err != nil {
return
}

ask := p.GetAsk()
sendResp := func(resp retrievalmarket.QueryResponse) {
if err := stream.WriteQueryResponse(resp); err != nil {
log.Errorf("Retrieval query: writing query response: %s", err)
}
}

answer := retrievalmarket.QueryResponse{
Status: retrievalmarket.QueryResponseUnavailable,
PieceCIDFound: retrievalmarket.QueryItemUnavailable,
MinPricePerByte: ask.PricePerByte,
MaxPaymentInterval: ask.PaymentInterval,
MaxPaymentIntervalIncrease: ask.PaymentIntervalIncrease,
UnsealPrice: ask.UnsealPrice,
Status: retrievalmarket.QueryResponseUnavailable,
PieceCIDFound: retrievalmarket.QueryItemUnavailable,
MinPricePerByte: big.Zero(),
UnsealPrice: big.Zero(),
}

ctx := context.TODO()

// get chain head to query actor states.
tok, _, err := p.node.GetChainHead(ctx)
if err != nil {
log.Errorf("Retrieval query: GetChainHead: %s", err)
return
}

// fetch the payment address the client should send the payment to.
paymentAddress, err := p.node.GetMinerWorkerAddress(ctx, p.minerAddress, tok)
if err != nil {
log.Errorf("Retrieval query: Lookup Payment Address: %s", err)
answer.Status = retrievalmarket.QueryResponseError
answer.Message = err.Error()
} else {
answer.PaymentAddress = paymentAddress

pieceCID := cid.Undef
if query.PieceCID != nil {
pieceCID = *query.PieceCID
answer.Message = fmt.Sprintf("failed to look up payment address: %s", err)
sendResp(answer)
return
}
answer.PaymentAddress = paymentAddress

// fetch the piece from which the payload will be retrieved.
// if user has specified the Piece in the request, we use that.
// Otherwise, we prefer a Piece which can retrieved from an unsealed sector.
pieceCID := cid.Undef
if query.PieceCID != nil {
pieceCID = *query.PieceCID
}
pieceInfo, isUnsealed, err := getPieceInfoFromCid(ctx, p.node, p.pieceStore, query.PayloadCID, pieceCID)
if err != nil {
log.Errorf("Retrieval query: getPieceInfoFromCid: %s", err)
if !xerrors.Is(err, retrievalmarket.ErrNotFound) {
answer.Status = retrievalmarket.QueryResponseError
answer.Message = fmt.Sprintf("failed to fetch piece to retrieve from: %s", err)
}
pieceInfo, err := getPieceInfoFromCid(p.pieceStore, query.PayloadCID, pieceCID)

if err == nil && len(pieceInfo.Deals) > 0 {
answer.Status = retrievalmarket.QueryResponseAvailable
// TODO: get price, look for already unsealed ref to reduce work
answer.Size = uint64(pieceInfo.Deals[0].Length) // TODO: verify on intermediate
answer.PieceCIDFound = retrievalmarket.QueryItemAvailable
}
sendResp(answer)
return
}

if err != nil && !xerrors.Is(err, retrievalmarket.ErrNotFound) {
log.Errorf("Retrieval query: GetRefs: %s", err)
answer.Status = retrievalmarket.QueryResponseError
answer.Message = err.Error()
}
answer.Status = retrievalmarket.QueryResponseAvailable
answer.Size = uint64(pieceInfo.Deals[0].Length.Unpadded()) // TODO: verify on intermediate
answer.PieceCIDFound = retrievalmarket.QueryItemAvailable

storageDeals, err := storageDealsForPiece(query.PieceCID != nil, query.PayloadCID, pieceInfo, p.pieceStore)
if err != nil {
log.Errorf("Retrieval query: storageDealsForPiece: %s", err)
answer.Status = retrievalmarket.QueryResponseError
answer.Message = fmt.Sprintf("failed to fetch storage deals containing payload: %s", err)
sendResp(answer)
return
}

input := retrievalmarket.PricingInput{
// piece from which the payload will be retrieved
// If user hasn't given a PieceCID, we try to choose an unsealed piece in the call to `getPieceInfoFromCid` above.
PieceCID: pieceInfo.PieceCID,

PayloadCID: query.PayloadCID,
Unsealed: isUnsealed,
Client: stream.RemotePeer(),
}
if err := stream.WriteQueryResponse(answer); err != nil {
log.Errorf("Retrieval query: WriteCborRPC: %s", err)
ask, err := p.GetDynamicAsk(ctx, input, storageDeals)
if err != nil {
log.Errorf("Retrieval query: GetAsk: %s", err)
answer.Status = retrievalmarket.QueryResponseError
answer.Message = fmt.Sprintf("failed to price deal: %s", err)
sendResp(answer)
return
}

answer.MinPricePerByte = ask.PricePerByte
answer.MaxPaymentInterval = ask.PaymentInterval
answer.MaxPaymentIntervalIncrease = ask.PaymentIntervalIncrease
answer.UnsealPrice = ask.UnsealPrice
sendResp(answer)
}

// GetDynamicAsk quotes a dynamic price for the retrieval deal by calling the user configured
// dynamic pricing function. It passes the static price parameters set in the Ask Store to the pricing function.
func (p *Provider) GetDynamicAsk(ctx context.Context, input retrievalmarket.PricingInput, storageDeals []abi.DealID) (retrievalmarket.Ask, error) {
dp, err := p.node.GetRetrievalPricingInput(ctx, input.PieceCID, storageDeals)
if err != nil {
return retrievalmarket.Ask{}, xerrors.Errorf("GetRetrievalPricingInput: %s", err)
}
// currAsk cannot be nil as we initialize the ask store with a default ask.
// Users can then change the values in the ask store using SetAsk but not remove it.
currAsk := p.GetAsk()
if currAsk == nil {
return retrievalmarket.Ask{}, xerrors.New("no ask configured in ask-store")
}

dp.PayloadCID = input.PayloadCID
dp.PieceCID = input.PieceCID
dp.Unsealed = input.Unsealed
dp.Client = input.Client
dp.CurrentAsk = *currAsk

ask, err := p.retrievalPricingFunc(ctx, dp)
if err != nil {
return retrievalmarket.Ask{}, xerrors.Errorf("retrievalPricingFunc: %w", err)
}
return ask, nil
}

// Configure reconfigures a provider after initialization
Expand All @@ -342,3 +424,22 @@ var ProviderFSMParameterSpec = fsm.Parameters{
Events: providerstates.ProviderEvents,
StateEntryFuncs: providerstates.ProviderStateEntryFuncs,
}

// DefaultPricingFunc is the default pricing policy that will be used to price retrieval deals.
var DefaultPricingFunc = func(VerifiedDealsFreeTransfer bool) func(ctx context.Context, pricingInput retrievalmarket.PricingInput) (retrievalmarket.Ask, error) {
return func(ctx context.Context, pricingInput retrievalmarket.PricingInput) (retrievalmarket.Ask, error) {
ask := pricingInput.CurrentAsk

// don't charge for Unsealing if we have an Unsealed copy.
if pricingInput.Unsealed {
ask.UnsealPrice = big.Zero()
}

// don't charge for data transfer for verified deals if it's been configured to do so.
if pricingInput.VerifiedDeal && VerifiedDealsFreeTransfer {
ask.PricePerByte = big.Zero()
}

return ask, nil
}
}
Loading

0 comments on commit 32e5cce

Please sign in to comment.