Skip to content

Commit

Permalink
fix: update deal state
Browse files Browse the repository at this point in the history
  • Loading branch information
simlecode committed Aug 11, 2023
1 parent ffd7a18 commit 278f898
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 29 deletions.
9 changes: 4 additions & 5 deletions models/badger/retrieval_deal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,17 @@ import (
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
types "github.com/filecoin-project/venus/venus-shared/types/market"
"github.com/ipfs-force-community/droplet/v2/models/repo"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
"github.com/libp2p/go-libp2p/core/peer"
cbg "github.com/whyrusleeping/cbor-gen"

"github.com/filecoin-project/venus/venus-shared/testutil"
"github.com/stretchr/testify/assert"
)

func init() {
testutil.MustRegisterDefaultValueProvier(func(t *testing.T) *cbg.Deferred {
return &cbg.Deferred{
Raw: make([]byte, 1),
}
testutil.MustRegisterDefaultValueProvier(func(t *testing.T) retrievalmarket.CborGenCompatibleNode {
allSelector := selectorparse.CommonSelector_ExploreAllRecursively
return retrievalmarket.CborGenCompatibleNode{Node: allSelector}
})
}

Expand Down
35 changes: 25 additions & 10 deletions retrievalprovider/datatransfer_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ type IDatatransferHandler interface {
HandleCompleteFor(context.Context, rm.ProviderDealIdentifier) error
HandleAcceptFor(context.Context, rm.ProviderDealIdentifier, datatransfer.ChannelID) error
HandleDisconnectFor(context.Context, rm.ProviderDealIdentifier, error) error
HandlePaymentRequested(context.Context, rm.ProviderDealIdentifier) error
HandleProcessPayment(context.Context, rm.ProviderDealIdentifier) error
HandleLastPayment(context.Context, rm.ProviderDealIdentifier) error
UpdateFunding(context.Context, rm.ProviderDealIdentifier) error

HandleCancelForDeal(context.Context, rm.ProviderDealIdentifier) error
HandleErrorForDeal(context.Context, rm.ProviderDealIdentifier, error) error
Expand Down Expand Up @@ -76,33 +77,47 @@ func (d *DataTransferHandler) HandleDisconnectFor(ctx context.Context, identifie
return d.retrievalDealHandler.Error(ctx, deal, errIn)
}

func (d *DataTransferHandler) HandleLastPayment(ctx context.Context, identifier rm.ProviderDealIdentifier) error {
func (d *DataTransferHandler) HandlePaymentRequested(ctx context.Context, identifier rm.ProviderDealIdentifier) error {
deal, err := d.retrievalDealStore.GetDeal(ctx, identifier.Receiver, identifier.DealID)
if err != nil {
return err
}
if deal.Status == rm.DealStatusUnsealed || deal.Status == rm.DealStatusOngoing {
deal.Status = rm.DealStatusFundsNeededLastPayment
if deal.Status == rm.DealStatusOngoing || deal.Status == rm.DealStatusUnsealed {
deal.Status = rm.DealStatusFundsNeeded
if err := d.retrievalDealStore.SaveDeal(ctx, deal); err != nil {
return err
}
return d.retrievalDealHandler.UpdateFunding(ctx, deal)
}
return nil
if deal.Status == rm.DealStatusNew {
deal.Status = rm.DealStatusFundsNeededUnseal
if err := d.retrievalDealStore.SaveDeal(ctx, deal); err != nil {
return err
}
}
return d.retrievalDealHandler.UpdateFunding(ctx, deal)
}

func (d *DataTransferHandler) UpdateFunding(ctx context.Context, identifier rm.ProviderDealIdentifier) error {
func (d *DataTransferHandler) HandleProcessPayment(ctx context.Context, identifier rm.ProviderDealIdentifier) error {
deal, err := d.retrievalDealStore.GetDeal(ctx, identifier.Receiver, identifier.DealID)
if err != nil {
return err
}
if deal.Status == rm.DealStatusOngoing || deal.Status == rm.DealStatusUnsealed {
deal.Status = rm.DealStatusFundsNeeded
return d.retrievalDealHandler.UpdateFunding(ctx, deal)
}

func (d *DataTransferHandler) HandleLastPayment(ctx context.Context, identifier rm.ProviderDealIdentifier) error {
deal, err := d.retrievalDealStore.GetDeal(ctx, identifier.Receiver, identifier.DealID)
if err != nil {
return err
}
if deal.Status == rm.DealStatusUnsealed || deal.Status == rm.DealStatusOngoing {
deal.Status = rm.DealStatusFundsNeededLastPayment
if err := d.retrievalDealStore.SaveDeal(ctx, deal); err != nil {
return err
}
return d.retrievalDealHandler.UpdateFunding(ctx, deal)
}
return d.retrievalDealHandler.UpdateFunding(ctx, deal)
return nil
}

func (d *DataTransferHandler) HandleCancelForDeal(ctx context.Context, identifier rm.ProviderDealIdentifier) error {
Expand Down
14 changes: 8 additions & 6 deletions retrievalprovider/provider_datatransfer_sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,17 @@ func ProviderDataTransferSubscriber(deals IDatatransferHandler) datatransfer.Sub
return
}

mlog := log.With("event", datatransfer.Events[event.Code], "dealID", dealProposal.ID, "peer", channelState.OtherPeer())

identify := rm.ProviderDealIdentifier{DealID: dealProposal.ID, Receiver: channelState.Recipient()}
if channelState.Status() == datatransfer.Completed {
mlog.Debugf("deal completed")
err := deals.HandleCompleteFor(ctx, identify)
if err != nil {
log.Errorf("processing dt event: %s", err)
}
}

mlog := log.With("event", datatransfer.Events[event.Code], "dealID", dealProposal.ID, "peer", channelState.OtherPeer())
switch event.Code {
case datatransfer.Accept:
mlog.With("retrievalEvent", rm.ProviderEventDealAccepted)
Expand All @@ -58,9 +60,9 @@ func ProviderDataTransferSubscriber(deals IDatatransferHandler) datatransfer.Sub
case datatransfer.DataLimitExceeded:
// DataLimitExceeded indicates it's time to wait for a payment
mlog.With("retrievalEvent", rm.ProviderEventPaymentRequested)
err := deals.UpdateFunding(ctx, identify)
err := deals.HandlePaymentRequested(ctx, identify)
if err != nil {
log.Errorf("processing dt event %v: %s", datatransfer.DataLimitExceeded.String(), err)
log.Errorf("processing dt event: %s", err)
}
case datatransfer.BeginFinalizing:
// BeginFinalizing indicates it's time to wait for a final payment
Expand All @@ -70,14 +72,14 @@ func ProviderDataTransferSubscriber(deals IDatatransferHandler) datatransfer.Sub
mlog.With("retrievalEvent", rm.ProviderEventLastPaymentRequested)
err := deals.HandleLastPayment(ctx, identify)
if err != nil {
log.Errorf("processing dt event %v: %s", datatransfer.BeginFinalizing.String(), err)
log.Errorf("processing dt event: %s", err)
}
case datatransfer.NewVoucher:
// NewVoucher indicates a potential new payment we should attempt to process
mlog.With("retrievalEvent", rm.ProviderEventProcessPayment)
err := deals.UpdateFunding(ctx, identify)
err := deals.HandleProcessPayment(ctx, identify)
if err != nil {
log.Errorf("processing dt event %v: %s", datatransfer.NewVoucher.String(), err)
log.Errorf("processing dt event: %s", err)
}
case datatransfer.Cancel:
mlog.With("retrievalEvent", rm.ProviderEventClientCancelled)
Expand Down
35 changes: 27 additions & 8 deletions retrievalprovider/retrieval_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ func (p *RetrievalDealHandler) updateFunding(ctx context.Context,
// process payment, determining how many more funds we have then the current deal.FundsReceived
received, err := p.processLastVoucher(ctx, channelState, deal)
if err != nil {
_ = p.CancelDeal(ctx, deal)
return errorDealResponse(deal.Identifier(), err)
}

Expand All @@ -247,9 +248,6 @@ func (p *RetrievalDealHandler) updateFunding(ctx context.Context,
if received.GreaterThan(big.Zero()) {
log.Debugf("provider: owed %d: sending partial payment request", owed)
deal.FundsReceived = big.Add(deal.FundsReceived, received)
if err := p.retrievalDealStore.SaveDeal(ctx, deal); err != nil {
log.Errorf("save deal FundsReceived failed: %v", err)
}
}
// sending this response voucher is primarily to cover for current client logic --
// our client expects a voucher requesting payment before it sends anything
Expand All @@ -264,8 +262,11 @@ func (p *RetrievalDealHandler) updateFunding(ctx context.Context,
} else {
// send an event to record payment received
deal.FundsReceived = big.Add(deal.FundsReceived, received)
if err := p.retrievalDealStore.SaveDeal(ctx, deal); err != nil {
log.Errorf("save deal FundsReceived failed: %v", err)
if deal.Status == rm.DealStatusFundsNeeded {
deal.Status = rm.DealStatusOngoing
}
if deal.Status == rm.DealStatusFundsNeededUnseal {
deal.Status = rm.DealStatusUnsealing
}
if deal.Status == rm.DealStatusFundsNeededLastPayment {
log.Debugf("provider: funds needed: last payment")
Expand All @@ -276,9 +277,16 @@ func (p *RetrievalDealHandler) updateFunding(ctx context.Context,
ID: deal.ID,
Status: rm.DealStatusCompleted,
}
deal.Status = rm.DealStatusFinalizing
}
}

if err := p.retrievalDealStore.SaveDeal(ctx, deal); err != nil {
log.Infof("save retrieval deal %d status failed: %v", deal.ID, err)
_ = p.CancelDeal(ctx, deal)
return errorDealResponse(deal.Identifier(), err)
}

vr := datatransfer.ValidationResult{
Accepted: true,
ForcePause: deal.Status == rm.DealStatusUnsealing || deal.Status == rm.DealStatusFundsNeededUnseal,
Expand All @@ -293,16 +301,27 @@ func (p *RetrievalDealHandler) updateFunding(ctx context.Context,
}

func (p *RetrievalDealHandler) savePayment(ctx context.Context, payment *rm.DealPayment, deal *mktypes.ProviderDealState) (abi.TokenAmount, error) {
updateDeal := func(err error) {
if deal.Status == rm.DealStatusFundsNeededUnseal || deal.Status == rm.DealStatusFundsNeeded || deal.Status == rm.DealStatusFundsNeededLastPayment {
deal.Status = rm.DealStatusFailing
}
deal.Message = err.Error()
if err := p.retrievalDealStore.SaveDeal(ctx, deal); err != nil {
log.Infof("save retrieval deal %d status failed: %v", deal.ID, err)
}
}

tok, _, err := p.env.GetChainHead(context.TODO())
if err != nil {
_ = p.CancelDeal(ctx, deal)
updateDeal(err)
return big.Zero(), err
}
// Save voucher
received, err := p.env.SavePaymentVoucher(context.TODO(), payment.PaymentChannel, payment.PaymentVoucher, nil, big.Zero(), tok)
if err != nil {
_ = p.CancelDeal(ctx, deal)
return big.Zero(), fmt.Errorf("save payment voucher failed: %v", err)
err = fmt.Errorf("save payment voucher failed: %v", err)
updateDeal(err)
return big.Zero(), err
}
return received, nil
}
Expand Down

0 comments on commit 278f898

Please sign in to comment.