Skip to content

Commit

Permalink
feat: manual publish storage deal message (#1585)
Browse files Browse the repository at this point in the history
* external PSD control

* fix proposalCid, update UI and error

* fix config comment

* apply suggestions, modify test

* fix docsgen, clean up mutations

* fix resolver arg, return

* add test

* refactor

* apply suggestions

* init map in publisher
  • Loading branch information
LexLuthr authored Jul 24, 2023
1 parent b178d6f commit 53fdf99
Show file tree
Hide file tree
Showing 10 changed files with 217 additions and 15 deletions.
37 changes: 36 additions & 1 deletion gql/resolver_dealpublish.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (

"github.com/filecoin-project/boost/gql/types"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/google/uuid"
"github.com/graph-gophers/graphql-go"
"github.com/ipfs/go-cid"
)

// basicDealResolver just has simple types (as opposed to dealResolver which
Expand All @@ -32,6 +34,7 @@ type basicDealResolver struct {
}

type dealPublishResolver struct {
ManualPSD bool
Start graphql.Time
Period int32
MaxDealsPerMsg int32
Expand Down Expand Up @@ -151,15 +154,47 @@ func (r *resolver) DealPublish(ctx context.Context) (*dealPublishResolver, error
}

return &dealPublishResolver{
ManualPSD: r.publisher.ManualPSD(),
Deals: basicDeals,
Period: int32(pending.PublishPeriod.Seconds()),
Start: graphql.Time{Time: pending.PublishPeriodStart},
MaxDealsPerMsg: int32(r.cfg.LotusDealmaking.MaxDealsPerPublishMsg),
}, nil
}

// mutation: dealPublishNow(): bool
func (r *resolver) DealPublishNow(ctx context.Context) (bool, error) {
r.publisher.ForcePublishPendingDeals()
return true, nil
}

// mutation: publishPendingDeals([ID!]!): [ID!]!
func (r *resolver) PublishPendingDeals(ctx context.Context, args struct{ IDs []graphql.ID }) ([]graphql.ID, error) {
var pcids []cid.Cid
uuidToPcid := make(map[cid.Cid]uuid.UUID)
var ret []graphql.ID

for _, id := range args.IDs {
dealId, err := toUuid(id)
if err != nil {
return nil, err
}
deal, err := r.dealsDB.ByID(ctx, dealId)
if err != nil {
return nil, fmt.Errorf("failed to get deal details from DB %s: %w", dealId.String(), err)
}
signedProp, err := cborutil.AsIpld(&deal.ClientDealProposal)
if err != nil {
return nil, fmt.Errorf("error in generating proposal cid for deal %s: %w", dealId.String(), err)
}
pcid := signedProp.Cid()
uuidToPcid[pcid] = dealId
pcids = append(pcids, pcid)
}

publishedCids := r.publisher.PublishQueuedDeals(pcids)
for _, c := range publishedCids {
ret = append(ret, graphql.ID(uuidToPcid[c].String()))
}

return ret, nil
}
4 changes: 4 additions & 0 deletions gql/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ type FundsLog {
}

type DealPublish {
ManualPSD: Boolean!
Period: Int!
Start: Time!
MaxDealsPerMsg: Int!
Expand Down Expand Up @@ -609,6 +610,9 @@ type RootMutation {

"""Update the Storage Ask (price of doing a storage deal)"""
storageAskUpdate(update: StorageAskUpdate!): Boolean!

"""Publish the deal for the supplied deal UUIDs"""
publishPendingDeals(ids: [ID!]!): [ID!]!
}

type RootSubscription {
Expand Down
81 changes: 67 additions & 14 deletions markets/storageadapter/dealpublisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/ipfs/go-cid"
"go.uber.org/fx"
"golang.org/x/xerrors"
Expand Down Expand Up @@ -53,12 +54,13 @@ type DealPublisher struct {
ctx context.Context
Shutdown context.CancelFunc

manualPSD bool
maxDealsPerPublishMsg uint64
publishPeriod time.Duration
publishSpec *api.MessageSendSpec

lk sync.Mutex
pending []*pendingDeal
pending map[cid.Cid]*pendingDeal
cancelWaitForMoreDeals context.CancelFunc
publishPeriodStart time.Time
startEpochSealingBuffer abi.ChainEpoch
Expand All @@ -77,12 +79,19 @@ type publishResult struct {
err error
}

func newPendingDeal(ctx context.Context, deal market.ClientDealProposal) *pendingDeal {
func newPendingDeal(ctx context.Context, deal market.ClientDealProposal) (*pendingDeal, cid.Cid, error) {

// Generate unique identifier for the deal
signedProp, err := cborutil.AsIpld(&deal)
if err != nil {
return nil, cid.Undef, fmt.Errorf("failed to compute signed deal proposal ipld node: %w", err)
}

return &pendingDeal{
ctx: ctx,
deal: deal,
Result: make(chan publishResult),
}
}, signedProp.Cid(), nil
}

type PublishMsgConfig struct {
Expand All @@ -94,6 +103,10 @@ type PublishMsgConfig struct {
MaxDealsPerMsg uint64
// Minimum start epoch buffer to give time for sealing of sector with deal
StartEpochSealingBuffer uint64
// When set to true, the user is responsible for publishing deals manually.
// The values of MaxDealsPerMsg and Period will be ignored, and deals will
// remain in the pending state until manually published.
ManualDealPublish bool
}

func NewDealPublisher(
Expand Down Expand Up @@ -133,6 +146,8 @@ func newDealPublisher(
publishPeriod: publishMsgCfg.Period,
startEpochSealingBuffer: abi.ChainEpoch(publishMsgCfg.StartEpochSealingBuffer),
publishSpec: publishSpec,
manualPSD: publishMsgCfg.ManualDealPublish,
pending: make(map[cid.Cid]*pendingDeal),
}
}

Expand Down Expand Up @@ -172,10 +187,13 @@ func (p *DealPublisher) ForcePublishPendingDeals() {
}

func (p *DealPublisher) Publish(ctx context.Context, deal market.ClientDealProposal) (cid.Cid, error) {
pdeal := newPendingDeal(ctx, deal)
pdeal, pcid, err := newPendingDeal(ctx, deal)
if err != nil {
return cid.Undef, fmt.Errorf("failed create pending deal: %w", err)
}

// Add the deal to the queue
p.processNewDeal(pdeal)
p.processNewDeal(pdeal, pcid)

// Wait for the deal to be submitted
select {
Expand All @@ -186,7 +204,7 @@ func (p *DealPublisher) Publish(ctx context.Context, deal market.ClientDealPropo
}
}

func (p *DealPublisher) processNewDeal(pdeal *pendingDeal) {
func (p *DealPublisher) processNewDeal(pdeal *pendingDeal, pcid cid.Cid) {
p.lk.Lock()
defer p.lk.Unlock()

Expand All @@ -205,10 +223,15 @@ func (p *DealPublisher) processNewDeal(pdeal *pendingDeal) {
}

// Add the new deal to the queue
p.pending = append(p.pending, pdeal)
p.pending[pcid] = pdeal
log.Infof("add deal with piece CID %s to publish deals queue - %d deals in queue (max queue size %d)",
pdeal.deal.Proposal.PieceCID, len(p.pending), p.maxDealsPerPublishMsg)

// Return from here if manual PSD is enabled
if p.manualPSD {
return
}

// If the maximum number of deals per message has been reached or we're not batching, send a
// publish message
if uint64(len(p.pending)) >= p.maxDealsPerPublishMsg || p.publishPeriod == 0 {
Expand Down Expand Up @@ -257,6 +280,7 @@ func (p *DealPublisher) waitForMoreDeals() {
}

func (p *DealPublisher) publishAllDeals() {

// If the timeout hasn't yet been cancelled, cancel it
if p.cancelWaitForMoreDeals != nil {
p.cancelWaitForMoreDeals()
Expand All @@ -266,10 +290,13 @@ func (p *DealPublisher) publishAllDeals() {

// Filter out any deals that have been cancelled
p.filterCancelledDeals()
deals := p.pending
p.pending = nil

// Send the publish message
var deals []*pendingDeal
for _, deal := range p.pending {
deals = append(deals, deal)
}
p.pending = make(map[cid.Cid]*pendingDeal)
go p.publishReady(deals)
}

Expand Down Expand Up @@ -434,12 +461,38 @@ func pieceCids(deals []market.ClientDealProposal) string {

// filter out deals that have been cancelled
func (p *DealPublisher) filterCancelledDeals() {
filtered := p.pending[:0]
for _, pd := range p.pending {
for c, pd := range p.pending {
if pd.ctx.Err() != nil {
continue
delete(p.pending, c)
}
filtered = append(filtered, pd)
}
p.pending = filtered
}

func (p *DealPublisher) PublishQueuedDeals(deals []cid.Cid) []cid.Cid {
p.lk.Lock()
defer p.lk.Unlock()
var ret []cid.Cid
var toPublish []*pendingDeal

p.filterCancelledDeals()

for _, c := range deals {
if p.pending[c] != nil {
toPublish = append(toPublish, p.pending[c])
ret = append(ret, c)
delete(p.pending, c)
} else {
log.Debugf("failed to find the proposal %s in pending deals", c)
}
}

log.Infof("publishing deal proposals: %s", ret)

// Send the publish message
go p.publishReady(toPublish)
return ret
}

func (p *DealPublisher) ManualPSD() bool {
return p.manualPSD
}
64 changes: 64 additions & 0 deletions markets/storageadapter/dealpublisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/ipfs/go-cid"
"github.com/raulk/clock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -232,6 +233,69 @@ func TestForcePublish(t *testing.T) {
checkPublishedDeals(t, dpapi, dealsToPublish, []int{2})
}

func TestPublishPendingDeals(t *testing.T) {
dpapi := newDPAPI(t)

// Create a deal publisher
publishPeriod := time.Hour
dp := newDealPublisher(dpapi, nil, PublishMsgConfig{
Period: publishPeriod,
MaxDealsPerMsg: 10,
ManualDealPublish: true,
}, &api.MessageSendSpec{MaxFee: abi.NewTokenAmount(1)})

// Queue three deals for publishing, one with a cancelled context
// 1. Regular deal
publishDeal(t, dp, 0, false, false)
// 2. Deal with cancelled context
publishDeal(t, dp, 0, true, false)
// 3. Regular deal
publishDeal(t, dp, 0, false, false)
// 4. Regular deal
publishDeal(t, dp, 0, false, false)

// Allow a moment for them to be queued
build.Clock.Sleep(10 * time.Millisecond)

// Should be three deals in the pending deals list
// (deal with cancelled context is ignored)
pendingInfo := dp.PendingDeals()
require.Len(t, pendingInfo.Deals, 3)

var pcids []cid.Cid
props := pendingInfo.Deals
for _, p := range props {
signedProp, err := cborutil.AsIpld(&p)
require.NoError(t, err)
pcids = append(pcids, signedProp.Cid())
}

toPublish := pcids[1:]
pending := []cid.Cid{pcids[0]}

// Send an additional CID not present in publisher
c, err := cid.Decode("bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4")
require.NoError(t, err)

// Publish three pending deals and verify all deals whose context has not expired have been published
publishedDeals := dp.PublishQueuedDeals(append(toPublish, c))
require.Equal(t, toPublish, publishedDeals)

// Should be one remaining pending deal
pendingInfo1 := dp.PendingDeals()
var ppcids []cid.Cid
require.Len(t, pendingInfo1.Deals, 1)
for _, p := range pendingInfo1.Deals {
signedProp, err := cborutil.AsIpld(&p)
require.NoError(t, err)
ppcids = append(ppcids, signedProp.Cid())
}
require.Equal(t, pending, ppcids)

// Make sure the expected deals were published
checkPublishedDeals(t, dpapi, props[1:], []int{2})
}

func publishDeal(t *testing.T, dp *DealPublisher, invalid int, ctxCancelled bool, expired bool) markettypes.ClientDealProposal {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
Expand Down
1 change: 1 addition & 0 deletions node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,7 @@ func ConfigBoost(cfg *config.Boost) Option {
Period: time.Duration(cfg.LotusDealmaking.PublishMsgPeriod),
MaxDealsPerMsg: cfg.LotusDealmaking.MaxDealsPerPublishMsg,
StartEpochSealingBuffer: cfg.LotusDealmaking.StartEpochSealingBuffer,
ManualDealPublish: cfg.Dealmaking.ManualDealPublish,
})),

Override(new(sealer.Unsealer), From(new(lotus_modules.MinerStorageService))),
Expand Down
1 change: 1 addition & 0 deletions node/config/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ func DefaultBoost() *Boost {
SealingPipelineCacheTimeout: Duration(30 * time.Second),
FundsTaggingEnabled: true,
EnableLegacyStorageDeals: false,
ManualDealPublish: false,
BitswapPublicAddresses: []string{},
},

Expand Down
8 changes: 8 additions & 0 deletions node/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions node/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ type DealmakingConfig struct {
// Whether to enable legacy deals on the Boost node or not. We recommend keeping
// them disabled. These will be completely deprecated soon.
EnableLegacyStorageDeals bool

// When set to true, the user is responsible for publishing deals manually.
// The values of MaxDealsPerPublishMsg and PublishMsgPeriod will be
// ignored, and deals will remain in the pending state until manually published.
ManualDealPublish bool
}

type ContractDealsConfig struct {
Expand Down
Loading

0 comments on commit 53fdf99

Please sign in to comment.