Skip to content

Commit

Permalink
fix(server): properly cancel graphsync requests (#1475)
Browse files Browse the repository at this point in the history
  • Loading branch information
hannahhoward authored May 30, 2023
1 parent d4acbed commit 592a701
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 0 deletions.
2 changes: 2 additions & 0 deletions retrievalmarket/server/channelstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"

"github.com/filecoin-project/boost-gfm/retrievalmarket"
graphsync "github.com/filecoin-project/boost-graphsync"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
Expand All @@ -22,6 +23,7 @@ type retrievalState struct {
retType RetrievalType
cs *channelState
mkts *retrievalmarket.ProviderDealState
gsReq graphsync.RequestID
}

func (r retrievalState) ChannelState() channelState { return *r.cs }
Expand Down
11 changes: 11 additions & 0 deletions retrievalmarket/server/gsunpaidretrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,18 @@ func (g *GraphsyncUnpaidRetrieval) CancelTransfer(ctx context.Context, id datatr

rcpt := state.cs.recipient
tID := state.cs.transferID
gsRequestID := state.gsReq
g.activeRetrievalsLk.Unlock()

// tell GraphSync to cancel the request
if (gsRequestID != graphsync.RequestID{}) {
err := g.Cancel(ctx, gsRequestID)
if err != nil {
log.Info("unable to force close graphsync request %s: %s", tID, err)
}
}

// send a message on data transfer
err := g.dtnet.SendMessage(ctx, rcpt, message.CancelResponse(tID))
g.failTransfer(state, errors.New("transfer cancelled by provider"))

Expand Down Expand Up @@ -326,6 +336,7 @@ func (g *GraphsyncUnpaidRetrieval) handleRetrievalDeal(peerID peer.ID, msg datat
retType: retType,
cs: cs,
mkts: mktsState,
gsReq: request.ID(),
}

// Record the data transfer ID so that we can intercept future
Expand Down
4 changes: 4 additions & 0 deletions retrievalmarket/server/gsunpaidretrieval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,10 @@ func runRequestTest(t *testing.T, tc testCase) {
} else {
require.NoError(t, err)
}

// final verification -- the server has no active graphsync requests
stats := gsupr.Stats()
require.Equal(t, stats.IncomingRequests.Active, uint64(0))
}

func createRetrievalProvider(ctx context.Context, t *testing.T, testData *tut.Libp2pTestData, pieceStore *tut.TestPieceStore, sectorAccessor *testnodes.TestSectorAccessor, dagstoreWrapper *tut.MockDagStoreWrapper, gs graphsync.GraphExchange, paymentAddress address.Address) retrievalmarket.RetrievalProvider {
Expand Down

0 comments on commit 592a701

Please sign in to comment.