Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: manual publish storage deal message #1585

Merged
merged 12 commits into from
Jul 24, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion gql/resolver_dealpublish.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (

"github.com/filecoin-project/boost/gql/types"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/google/uuid"
"github.com/graph-gophers/graphql-go"
"github.com/ipfs/go-cid"
)

// basicDealResolver just has simple types (as opposed to dealResolver which
Expand All @@ -32,6 +34,7 @@ type basicDealResolver struct {
}

type dealPublishResolver struct {
ManualPSD bool
Start graphql.Time
Period int32
MaxDealsPerMsg int32
Expand Down Expand Up @@ -151,15 +154,47 @@ func (r *resolver) DealPublish(ctx context.Context) (*dealPublishResolver, error
}

return &dealPublishResolver{
ManualPSD: r.publisher.ManualPSD(),
Deals: basicDeals,
Period: int32(pending.PublishPeriod.Seconds()),
Start: graphql.Time{Time: pending.PublishPeriodStart},
MaxDealsPerMsg: int32(r.cfg.LotusDealmaking.MaxDealsPerPublishMsg),
}, nil
}

// mutation: dealPublishNow(): bool
func (r *resolver) DealPublishNow(ctx context.Context) (bool, error) {
r.publisher.ForcePublishPendingDeals()
return true, nil
}

// mutation: publishPendingDeals([ID!]!): [ID!]!
func (r *resolver) PublishPendingDeals(ctx context.Context, args struct{ IDs []graphql.ID }) ([]graphql.ID, error) {
var pcids []cid.Cid
uuidToPcid := make(map[cid.Cid]uuid.UUID)
var ret []graphql.ID

for _, id := range args.IDs {
dealId, err := toUuid(id)
if err != nil {
return nil, err
}
deal, err := r.dealsDB.ByID(ctx, dealId)
if err != nil {
return nil, fmt.Errorf("failed to get deal details from DB %s: %w", dealId.String(), err)
}
signedProp, err := cborutil.AsIpld(&deal.ClientDealProposal)
if err != nil {
return nil, fmt.Errorf("error in generating proposal cid for deal %s: %w", dealId.String(), err)
}
pcid := signedProp.Cid()
uuidToPcid[pcid] = dealId
pcids = append(pcids, pcid)
}

publishedCids := r.publisher.PublishQueuedDeals(pcids)
for _, c := range publishedCids {
ret = append(ret, graphql.ID(uuidToPcid[c].String()))
}

return ret, nil
}
4 changes: 4 additions & 0 deletions gql/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ type FundsLog {
}

type DealPublish {
ManualPSD: Boolean!
Period: Int!
Start: Time!
MaxDealsPerMsg: Int!
Expand Down Expand Up @@ -609,6 +610,9 @@ type RootMutation {

"""Update the Storage Ask (price of doing a storage deal)"""
storageAskUpdate(update: StorageAskUpdate!): Boolean!

"""Publish the deal for the supplied deal UUIDs"""
publishPendingDeals(ids: [ID!]!): [ID!]!
}

type RootSubscription {
Expand Down
102 changes: 93 additions & 9 deletions markets/storageadapter/dealpublisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/ipfs/go-cid"
"go.uber.org/fx"
"golang.org/x/xerrors"
Expand Down Expand Up @@ -53,6 +54,7 @@ type DealPublisher struct {
ctx context.Context
Shutdown context.CancelFunc

manualPSD bool
maxDealsPerPublishMsg uint64
publishPeriod time.Duration
publishSpec *api.MessageSendSpec
Expand All @@ -66,9 +68,10 @@ type DealPublisher struct {

// A deal that is queued to be published
type pendingDeal struct {
ctx context.Context
deal market.ClientDealProposal
Result chan publishResult
ctx context.Context
deal market.ClientDealProposal
Result chan publishResult
proposalCid cid.Cid
}

// The result of publishing a deal
Expand All @@ -77,12 +80,20 @@ type publishResult struct {
err error
}

func newPendingDeal(ctx context.Context, deal market.ClientDealProposal) *pendingDeal {
return &pendingDeal{
ctx: ctx,
deal: deal,
Result: make(chan publishResult),
func newPendingDeal(ctx context.Context, deal market.ClientDealProposal) (*pendingDeal, error) {

// Generate unique identifier for the deal
signedProp, err := cborutil.AsIpld(&deal)
if err != nil {
return nil, fmt.Errorf("failed to compute signed deal proposal ipld node: %w", err)
}

return &pendingDeal{
ctx: ctx,
deal: deal,
Result: make(chan publishResult),
proposalCid: signedProp.Cid(),
}, nil
}

type PublishMsgConfig struct {
Expand All @@ -94,6 +105,9 @@ type PublishMsgConfig struct {
MaxDealsPerMsg uint64
// Minimum start epoch buffer to give time for sealing of sector with deal
StartEpochSealingBuffer uint64
// Enable ExternalDealPublishControl to allow users to choose when to send Publish messages
// for each deal
ExternalDealPublishControl bool
LexLuthr marked this conversation as resolved.
Show resolved Hide resolved
}

func NewDealPublisher(
Expand Down Expand Up @@ -133,6 +147,7 @@ func newDealPublisher(
publishPeriod: publishMsgCfg.Period,
startEpochSealingBuffer: abi.ChainEpoch(publishMsgCfg.StartEpochSealingBuffer),
publishSpec: publishSpec,
manualPSD: publishMsgCfg.ExternalDealPublishControl,
}
}

Expand Down Expand Up @@ -172,7 +187,10 @@ func (p *DealPublisher) ForcePublishPendingDeals() {
}

func (p *DealPublisher) Publish(ctx context.Context, deal market.ClientDealProposal) (cid.Cid, error) {
pdeal := newPendingDeal(ctx, deal)
pdeal, err := newPendingDeal(ctx, deal)
if err != nil {
return cid.Undef, fmt.Errorf("failed create pending deal: %w", err)
}

// Add the deal to the queue
p.processNewDeal(pdeal)
Expand Down Expand Up @@ -209,6 +227,11 @@ func (p *DealPublisher) processNewDeal(pdeal *pendingDeal) {
log.Infof("add deal with piece CID %s to publish deals queue - %d deals in queue (max queue size %d)",
pdeal.deal.Proposal.PieceCID, len(p.pending), p.maxDealsPerPublishMsg)

// Return from here if manual PSD is enabled
if p.manualPSD {
return
}

// If the maximum number of deals per message has been reached or we're not batching, send a
// publish message
if uint64(len(p.pending)) >= p.maxDealsPerPublishMsg || p.publishPeriod == 0 {
Expand Down Expand Up @@ -257,6 +280,7 @@ func (p *DealPublisher) waitForMoreDeals() {
}

func (p *DealPublisher) publishAllDeals() {

// If the timeout hasn't yet been cancelled, cancel it
if p.cancelWaitForMoreDeals != nil {
p.cancelWaitForMoreDeals()
Expand Down Expand Up @@ -443,3 +467,63 @@ func (p *DealPublisher) filterCancelledDeals() {
}
p.pending = filtered
}

func (p *DealPublisher) PublishQueuedDeals(deals []cid.Cid) []cid.Cid {
p.lk.Lock()
var ret []cid.Cid
var toPublish []*pendingDeal

// Check that each deal is part of the queue
for _, c := range deals {
found := false
for _, pd := range p.pending {
if c.Equals(pd.proposalCid) {
found = true
toPublish = append(toPublish, pd)
break
}
}
if found {
ret = append(ret, c)
} else {
log.Debugf("failed to find the proposal %S in pending deals", c)
LexLuthr marked this conversation as resolved.
Show resolved Hide resolved
}
}

LexLuthr marked this conversation as resolved.
Show resolved Hide resolved
p.lk.Unlock()

log.Infof("publishing deal proposals: %s", ret)

// Remove the deal from Pending
p.cleanupPending(deals)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest we clean up pending deals under the lock also, so that the whole operation is atomic.
Otherwise two threads could interfere with each other.
Maybe you can even refactor so that it splits the list into the items to be published and the items that will remain (you're doing this in separate for loops at the moment, probably you can do both in the same for loop).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have refactored the code a little. Unique identifier don't go well with slice.
The new code looks more clean. It resulted in some minor changes in other functions as well.


// Send the publish message
go p.publishReady(toPublish)
return ret
}

func (p *DealPublisher) cleanupPending(dealIds []cid.Cid) {
p.lk.Lock()
defer p.lk.Unlock()

var newPending []*pendingDeal

for _, dp := range p.pending {
found := false
for _, id := range dealIds {
if id == dp.proposalCid {
found = true
}
}
if !found {
newPending = append(newPending, dp)
}
}

p.pending = newPending

}

func (p *DealPublisher) ManualPSD() bool {
return p.manualPSD
}
56 changes: 56 additions & 0 deletions markets/storageadapter/dealpublisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/ipfs/go-cid"
"github.com/raulk/clock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -232,6 +233,61 @@ func TestForcePublish(t *testing.T) {
checkPublishedDeals(t, dpapi, dealsToPublish, []int{2})
}

func TestPublishPendingDeals(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also add a test that only publishes some of the deals instead of all of them.
eg there are three pending deals, only publish two of them and verify that there is one remaining pending deal.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have refactored the test to send 4 deals. 1 with ctx.Error and other 3 normal. It will try to Publish 3 deals total (2 correct ones and 1 random CID which is not in Publisher). Test will result in 2 deal Publish, 1 debug log with error about random CID (not found in publisher) and returning 1 pending deal.

//stm: @MARKET_DEAL_PUBLISHER_PUBLISH_001, @MARKET_DEAL_PUBLISHER_GET_PENDING_DEALS_001
//stm: @MARKET_DEAL_PUBLISHER_FORCE_PUBLISH_ALL_001
dpapi := newDPAPI(t)

// Create a deal publisher
start := build.Clock.Now()
publishPeriod := time.Hour
dp := newDealPublisher(dpapi, nil, PublishMsgConfig{
Period: publishPeriod,
MaxDealsPerMsg: 10,
}, &api.MessageSendSpec{MaxFee: abi.NewTokenAmount(1)})

// Queue three deals for publishing, one with a cancelled context
var dealsToPublish []markettypes.ClientDealProposal
// 1. Regular deal
deal := publishDeal(t, dp, 0, false, false)
dealsToPublish = append(dealsToPublish, deal)
// 2. Deal with cancelled context
publishDeal(t, dp, 0, true, false)
// 3. Regular deal
deal = publishDeal(t, dp, 0, false, false)
dealsToPublish = append(dealsToPublish, deal)

// Allow a moment for them to be queued
build.Clock.Sleep(10 * time.Millisecond)

// Should be two deals in the pending deals list
LexLuthr marked this conversation as resolved.
Show resolved Hide resolved
// (deal with cancelled context is ignored)
pendingInfo := dp.PendingDeals()
require.Len(t, pendingInfo.Deals, 2)
require.Equal(t, publishPeriod, pendingInfo.PublishPeriod)
require.True(t, pendingInfo.PublishPeriodStart.After(start))
require.True(t, pendingInfo.PublishPeriodStart.Before(build.Clock.Now()))

var pcids []cid.Cid
props := pendingInfo.Deals
for _, p := range props {
signedProp, err := cborutil.AsIpld(&p)
require.NoError(t, err)
pcids = append(pcids, signedProp.Cid())
}

// Publish all pending deals and verify all have been published
LexLuthr marked this conversation as resolved.
Show resolved Hide resolved
publishedDeals := dp.PublishQueuedDeals(pcids)
require.Equal(t, pcids, publishedDeals)

// Should be no pending deals
LexLuthr marked this conversation as resolved.
Show resolved Hide resolved
pendingInfo = dp.PendingDeals()
require.Len(t, pendingInfo.Deals, 0)

// Make sure the expected deals were published
checkPublishedDeals(t, dpapi, dealsToPublish, []int{2})
}

func publishDeal(t *testing.T, dp *DealPublisher, invalid int, ctxCancelled bool, expired bool) markettypes.ClientDealProposal {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
Expand Down
7 changes: 4 additions & 3 deletions node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,9 +626,10 @@ func ConfigBoost(cfg *config.Boost) Option {
),

Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(&legacyFees, storageadapter.PublishMsgConfig{
Period: time.Duration(cfg.LotusDealmaking.PublishMsgPeriod),
MaxDealsPerMsg: cfg.LotusDealmaking.MaxDealsPerPublishMsg,
StartEpochSealingBuffer: cfg.LotusDealmaking.StartEpochSealingBuffer,
Period: time.Duration(cfg.LotusDealmaking.PublishMsgPeriod),
MaxDealsPerMsg: cfg.LotusDealmaking.MaxDealsPerPublishMsg,
StartEpochSealingBuffer: cfg.LotusDealmaking.StartEpochSealingBuffer,
ExternalDealPublishControl: cfg.Dealmaking.ManualDealPublish,
})),

Override(new(sealer.Unsealer), From(new(lotus_modules.MinerStorageService))),
Expand Down
1 change: 1 addition & 0 deletions node/config/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ func DefaultBoost() *Boost {
SealingPipelineCacheTimeout: Duration(30 * time.Second),
FundsTaggingEnabled: true,
EnableLegacyStorageDeals: false,
ManualDealPublish: false,
},

LotusDealmaking: lotus_config.DealmakingConfig{
Expand Down
8 changes: 8 additions & 0 deletions node/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions node/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ type DealmakingConfig struct {
// Whether to enable legacy deals on the Boost node or not. We recommend keeping
// them disabled. These will be completely deprecated soon.
EnableLegacyStorageDeals bool

// When set to true, the user is responsible for publishing deals manually.
// The values of MaxDealsPerPublishMsg and PublishMsgPeriod will be
// ignored, and deals will remain in the pending state until manually published.
ManualDealPublish bool
}

type ContractDealsConfig struct {
Expand Down
Loading