Skip to content

Commit

Permalink
Fire a transfer queued event when a transfer is queued in Graphsync (#…
Browse files Browse the repository at this point in the history
…221)

* req queued event in GS

* fire a transfer queued event

* changes as per review and tests

* address nit

* update graphsync deps
  • Loading branch information
aarshkshah1992 authored Jun 9, 2021
1 parent 54e3630 commit 7794046
Show file tree
Hide file tree
Showing 11 changed files with 125 additions and 3 deletions.
4 changes: 4 additions & 0 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,10 @@ func (c *Channels) Accept(chid datatransfer.ChannelID) error {
return c.send(chid, datatransfer.Accept)
}

func (c *Channels) TransferRequestQueued(chid datatransfer.ChannelID) error {
return c.send(chid, datatransfer.TransferRequestQueued)
}

// Restart marks a data transfer as restarted
func (c *Channels) Restart(chid datatransfer.ChannelID) error {
return c.send(chid, datatransfer.Restart)
Expand Down
7 changes: 7 additions & 0 deletions channels/channels_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ var ChannelEvents = fsm.Events{
chst.AddLog("")
return nil
}),

fsm.Event(datatransfer.TransferRequestQueued).FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error {
chst.Message = ""
chst.AddLog("")
return nil
}),

fsm.Event(datatransfer.Restart).FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error {
chst.Message = ""
chst.AddLog("")
Expand Down
11 changes: 11 additions & 0 deletions channels/channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,17 @@ func TestChannels(t *testing.T) {
require.True(t, xerrors.As(err, new(*channels.ErrNotFound)))
})

t.Run("transfer queued", func(t *testing.T) {
state, err := channelList.GetByID(ctx, datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1})
require.NoError(t, err)
require.Equal(t, state.Status(), datatransfer.Ongoing)

err = channelList.TransferRequestQueued(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1})
require.NoError(t, err)
state = checkEvent(ctx, t, received, datatransfer.TransferRequestQueued)
require.Equal(t, state.Status(), datatransfer.Ongoing)
})

t.Run("updating send/receive values", func(t *testing.T) {
ds := dss.MutexWrap(datastore.NewMapDatastore())
dir := os.TempDir()
Expand Down
4 changes: 4 additions & 0 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ const (
// ReceiveDataError indicates that the transport layer had an error
// receiving data from the remote peer
ReceiveDataError

// TransferRequestQueued indicates that a new data transfer request has been queued in the transport layer
TransferRequestQueued
)

// Events are human readable names for data transfer events
Expand Down Expand Up @@ -134,6 +137,7 @@ var Events = map[EventCode]string{
RequestTimedOut: "RequestTimedOut",
SendDataError: "SendDataError",
ReceiveDataError: "ReceiveDataError",
TransferRequestQueued: "TransferRequestQueued",
}

// Event is a struct containing information about a data transfer event
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-ds-badger v0.2.3
github.com/ipfs/go-graphsync v0.6.1
github.com/ipfs/go-graphsync v0.6.4
github.com/ipfs/go-ipfs-blockstore v1.0.1
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ github.com/ipfs/go-ds-badger v0.2.3/go.mod h1:pEYw0rgg3FIrywKKnL+Snr+w/LjJZVMTBR
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-graphsync v0.6.1 h1:i9wN7YkBXWwIsUjVQeuaDxFB59yWZrG1xL564Nz7aGE=
github.com/ipfs/go-graphsync v0.6.1/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk=
github.com/ipfs/go-graphsync v0.6.4 h1:g6wFRK2BkLPnx8nfoSdnokp5gtpuGyWZjbqI6q3NGb8=
github.com/ipfs/go-graphsync v0.6.4/go.mod h1:5WyaeigpNdpiYQuW2vwpuecOoEfB4h747ZGEOKmAGTg=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
github.com/ipfs/go-ipfs-blockstore v0.1.4/go.mod h1:Jxm3XMVjh6R17WvxFEiyKBLUGr86HgIYJW/D/MwqeYQ=
Expand Down
4 changes: 4 additions & 0 deletions impl/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ func (m *manager) OnRequestReceived(chid datatransfer.ChannelID, request datatra
return nil, nil
}

func (m *manager) OnTransferQueued(chid datatransfer.ChannelID) {
m.channels.TransferRequestQueued(chid)
}

func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response datatransfer.Response) error {
if response.IsCancel() {
log.Infof("channel %s: received cancel response, cancelling channel", chid)
Expand Down
9 changes: 9 additions & 0 deletions testutil/fakegraphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ type FakeGraphSync struct {
OutgoingRequestHook graphsync.OnOutgoingRequestHook
IncomingBlockHook graphsync.OnIncomingBlockHook
OutgoingBlockHook graphsync.OnOutgoingBlockHook
IncomingRequestQueuedHook graphsync.OnIncomingRequestQueuedHook
IncomingRequestHook graphsync.OnIncomingRequestHook
CompletedResponseListener graphsync.OnResponseCompletedListener
RequestUpdatedHook graphsync.OnRequestUpdatedHook
Expand Down Expand Up @@ -286,6 +287,14 @@ func (fgs *FakeGraphSync) RegisterIncomingRequestHook(hook graphsync.OnIncomingR
}
}

// RegisterIncomingRequestQueuedHook adds a hook that runs when an incoming GS request is queued.
func (fgs *FakeGraphSync) RegisterIncomingRequestQueuedHook(hook graphsync.OnIncomingRequestQueuedHook) graphsync.UnregisterHookFunc {
fgs.IncomingRequestQueuedHook = hook
return func() {
fgs.IncomingRequestQueuedHook = nil
}
}

// RegisterIncomingResponseHook adds a hook that runs when a response is received
func (fgs *FakeGraphSync) RegisterIncomingResponseHook(hook graphsync.OnIncomingResponseHook) graphsync.UnregisterHookFunc {
fgs.IncomingResponseHook = hook
Expand Down
3 changes: 3 additions & 0 deletions transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type EventsHandler interface {
// OnDataSent is called when we send data for the given channel ID
OnDataSent(chid ChannelID, link ipld.Link, size uint64) error

// OnTransferQueued is called when a new data transfer request is queued in the transport layer.
OnTransferQueued(chid ChannelID)

// OnRequestReceived is called when we receive a new request to send data
// for the given channel ID
// return values are:
Expand Down
35 changes: 35 additions & 0 deletions transport/graphsync/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ func (t *Transport) SetEventHandler(events datatransfer.EventsHandler) error {
}
t.events = events

t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterIncomingRequestQueuedHook(t.gsReqQueuedHook))
t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterIncomingRequestHook(t.gsReqRecdHook))
t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterCompletedResponseListener(t.gsCompletedResponseListener))
t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterIncomingBlockHook(t.gsIncomingBlockHook))
Expand Down Expand Up @@ -448,6 +449,40 @@ func (t *Transport) gsOutgoingBlockHook(p peer.ID, request graphsync.RequestData
}
}

// gsReqQueuedHook is called when graphsync enqueues an incoming request for data
func (t *Transport) gsReqQueuedHook(p peer.ID, request graphsync.RequestData) {
msg, err := extension.GetTransferData(request, t.supportedExtensions)
if err != nil {
log.Errorf("failed GetTransferData, req=%+v, err=%s", request, err)
}
// extension not found; probably not our request.
if msg == nil {
return
}

var chid datatransfer.ChannelID
if msg.IsRequest() {
// when a data transfer request comes in on graphsync, the remote peer
// initiated a pull
chid = datatransfer.ChannelID{ID: msg.TransferID(), Initiator: p, Responder: t.peerID}
request := msg.(datatransfer.Request)
if request.IsNew() {
log.Infof("%s, pull request queued, req=%+v", chid, request)
t.events.OnTransferQueued(chid)
}
} else {
// when a data transfer response comes in on graphsync, this node
// initiated a push, and the remote peer responded with a request
// for data
chid = datatransfer.ChannelID{ID: msg.TransferID(), Initiator: t.peerID, Responder: p}
response := msg.(datatransfer.Response)
if response.IsNew() {
log.Infof("%s, GS pull request queued in response to our push, req=%+v", chid, request)
t.events.OnTransferQueued(chid)
}
}
}

// gsReqRecdHook is called when graphsync receives an incoming request for data
func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
// if this is a push request the sender is us.
Expand Down
45 changes: 45 additions & 0 deletions transport/graphsync/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ func TestManager(t *testing.T) {
require.NoError(t, gsData.outgoingBlockHookActions.TerminationError)
},
},

"incoming gs request with recognized dt response will record outgoing blocks": {
requestConfig: gsRequestConfig{
dtIsResponse: true,
Expand Down Expand Up @@ -485,6 +486,7 @@ func TestManager(t *testing.T) {
require.True(t, events.ChannelCompletedSuccess)
},
},

"recognized incoming request will record unsuccessful request completion": {
responseConfig: gsResponseConfig{
status: graphsync.RequestCompletedPartial,
Expand Down Expand Up @@ -586,6 +588,35 @@ func TestManager(t *testing.T) {
gsData.fgs.AssertNoPauseResponseReceived(t)
},
},

"incoming request can be queued": {
action: func(gsData *harness) {
gsData.incomingRequestQueuedHook()
},
check: func(t *testing.T, events *fakeEvents, gsData *harness) {
require.True(t, events.TransferQueuedCalled)
require.Equal(t, datatransfer.ChannelID{ID: gsData.transferID, Responder: gsData.self, Initiator: gsData.other},
events.TransferQueuedChannelID)
},
},

"incoming request with dtResponse can be queued": {
requestConfig: gsRequestConfig{
dtIsResponse: true,
},
responseConfig: gsResponseConfig{
dtIsResponse: true,
},
action: func(gsData *harness) {
gsData.incomingRequestQueuedHook()
},
check: func(t *testing.T, events *fakeEvents, gsData *harness) {
require.True(t, events.TransferQueuedCalled)
require.Equal(t, datatransfer.ChannelID{ID: gsData.transferID, Responder: gsData.other, Initiator: gsData.self},
events.TransferQueuedChannelID)
},
},

"recognized incoming request can be resumed": {
action: func(gsData *harness) {
gsData.incomingRequestHook()
Expand All @@ -600,6 +631,7 @@ func TestManager(t *testing.T) {
gsData.fgs.AssertResumeResponseReceived(gsData.ctx, t)
},
},

"unrecognized request cannot be resumed": {
check: func(t *testing.T, events *fakeEvents, gsData *harness) {
err := gsData.transport.ResumeChannel(gsData.ctx,
Expand Down Expand Up @@ -1018,6 +1050,9 @@ type fakeEvents struct {
OnReceiveDataErrorCalled bool
OnReceiveDataErrorChannelID datatransfer.ChannelID

TransferQueuedCalled bool
TransferQueuedChannelID datatransfer.ChannelID

ChannelCompletedSuccess bool
RequestReceivedRequest datatransfer.Request
RequestReceivedResponse datatransfer.Response
Expand All @@ -1037,6 +1072,11 @@ func (fe *fakeEvents) OnRequestTimedOut(chid datatransfer.ChannelID, err error)
return nil
}

func (fe *fakeEvents) OnTransferQueued(chid datatransfer.ChannelID) {
fe.TransferQueuedCalled = true
fe.TransferQueuedChannelID = chid
}

func (fe *fakeEvents) OnRequestDisconnected(chid datatransfer.ChannelID, err error) error {
return nil
}
Expand Down Expand Up @@ -1129,6 +1169,11 @@ func (ha *harness) outgoingBlockHook() {
func (ha *harness) incomingRequestHook() {
ha.fgs.IncomingRequestHook(ha.other, ha.request, ha.incomingRequestHookActions)
}

func (ha *harness) incomingRequestQueuedHook() {
ha.fgs.IncomingRequestQueuedHook(ha.other, ha.request)
}

func (ha *harness) requestUpdatedHook() {
ha.fgs.RequestUpdatedHook(ha.other, ha.request, ha.updatedRequest, ha.requestUpdatedHookActions)
}
Expand Down

0 comments on commit 7794046

Please sign in to comment.