Skip to content

Commit

Permalink
Channel monitor watches for errors instead of measuring data rate (#190)
Browse files Browse the repository at this point in the history
* feat: channel monitor watches for errors instead of measuring data rate

* refactor: better method name

* feat: add debounce to channel monitor

* fix: restart channel size
  • Loading branch information
dirkmc authored Apr 16, 2021
1 parent dbc57eb commit f14cc40
Show file tree
Hide file tree
Showing 16 changed files with 611 additions and 776 deletions.
501 changes: 202 additions & 299 deletions channelmonitor/channelmonitor.go

Large diffs are not rendered by default.

725 changes: 267 additions & 458 deletions channelmonitor/channelmonitor_test.go

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,12 @@ func (c *Channels) SendDataError(chid datatransfer.ChannelID, err error) error {
return c.send(chid, datatransfer.SendDataError, err)
}

// ReceiveDataError indicates that the transport layer had an error receiving
// data from the remote peer
func (c *Channels) ReceiveDataError(chid datatransfer.ChannelID, err error) error {
return c.send(chid, datatransfer.ReceiveDataError, err)
}

// HasChannel returns true if the given channel id is being tracked
func (c *Channels) HasChannel(chid datatransfer.ChannelID) (bool, error) {
return c.stateMachines.Has(chid)
Expand Down
5 changes: 5 additions & 0 deletions channels/channels_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ var ChannelEvents = fsm.Events{
chst.AddLog("data transfer send error: %s", chst.Message)
return nil
}),
fsm.Event(datatransfer.ReceiveDataError).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error {
chst.Message = err.Error()
chst.AddLog("data transfer receive error: %s", chst.Message)
return nil
}),
fsm.Event(datatransfer.RequestTimedOut).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error {
chst.Message = err.Error()
chst.AddLog("data transfer request timed out: %s", chst.Message)
Expand Down
7 changes: 7 additions & 0 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ const (
// SendDataError indicates that the transport layer had an error trying
// to send data to the remote peer
SendDataError

// ReceiveDataError indicates that the transport layer had an error
// receiving data from the remote peer
ReceiveDataError
)

// Events are human readable names for data transfer events
Expand Down Expand Up @@ -127,6 +131,9 @@ var Events = map[EventCode]string{
DataQueuedProgress: "DataQueuedProgress",
DataSentProgress: "DataSentProgress",
DataReceivedProgress: "DataReceivedProgress",
RequestTimedOut: "RequestTimedOut",
SendDataError: "SendDataError",
ReceiveDataError: "ReceiveDataError",
}

// Event is a struct containing information about a data transfer event
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/filecoin-project/go-data-transfer
go 1.14

require (
github.com/bep/debounce v1.2.0
github.com/filecoin-project/go-ds-versioning v0.1.0
github.com/filecoin-project/go-statemachine v0.0.0-20200925024713-05bd7c71fbfe
github.com/hannahhoward/cbor-gen-for v0.0.0-20200817222906-ea96cece81f1
Expand All @@ -12,7 +13,7 @@ require (
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-ds-badger v0.2.3
github.com/ipfs/go-graphsync v0.6.0
github.com/ipfs/go-graphsync v0.6.1
github.com/ipfs/go-ipfs-blockstore v1.0.1
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ github.com/Stebalien/go-bitfield v0.0.1 h1:X3kbSSPUaJK60wV2hjOPZwmpljr6VGCqdq4cB
github.com/Stebalien/go-bitfield v0.0.1/go.mod h1:GNjFpasyUVkHMsfEOk8EFLJ9syQ6SI+XWrX9Wf2XH0s=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/bep/debounce v1.2.0 h1:wXds8Kq8qRfwAOpAxHrJDbCXgC5aHSzgQb/0gKsHQqo=
github.com/bep/debounce v1.2.0/go.mod h1:H8yggRPQKLUhUoqrJC1bO2xNya7vanpDl7xR3ISbCJ0=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8=
github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI=
github.com/btcsuite/btcd v0.0.0-20190605094302-a0d1e3e36d50/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI=
Expand Down Expand Up @@ -213,8 +215,8 @@ github.com/ipfs/go-ds-badger v0.2.3/go.mod h1:pEYw0rgg3FIrywKKnL+Snr+w/LjJZVMTBR
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-graphsync v0.6.0 h1:x6UvDUGA7wjaKNqx5Vbo7FGT8aJ5ryYA0dMQ5jN3dF0=
github.com/ipfs/go-graphsync v0.6.0/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk=
github.com/ipfs/go-graphsync v0.6.1 h1:i9wN7YkBXWwIsUjVQeuaDxFB59yWZrG1xL564Nz7aGE=
github.com/ipfs/go-graphsync v0.6.1/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
github.com/ipfs/go-ipfs-blockstore v0.1.4/go.mod h1:Jxm3XMVjh6R17WvxFEiyKBLUGr86HgIYJW/D/MwqeYQ=
Expand Down
5 changes: 5 additions & 0 deletions impl/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,11 @@ func (m *manager) OnSendDataError(chid datatransfer.ChannelID, err error) error
return m.channels.SendDataError(chid, err)
}

func (m *manager) OnReceiveDataError(chid datatransfer.ChannelID, err error) error {
log.Warnf("channel %+v had transport receive error: %s", chid, err)
return m.channels.ReceiveDataError(chid, err)
}

// OnChannelCompleted is called
// - by the requester when all data for a transfer has been received
// - by the responder when all data for a transfer has been sent
Expand Down
13 changes: 11 additions & 2 deletions impl/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,9 @@ func NewDataTransfer(ds datastore.Batching, cidListsDir string, dataTransferNetw
option(m)
}

// Start push / pull channel monitor after applying config options as the config
// Create push / pull channel monitor after applying config options as the config
// options may apply to the monitor
m.channelMonitor = channelmonitor.NewMonitor(m, m.channelMonitorCfg)
m.channelMonitor.Start()

return m, nil
}
Expand Down Expand Up @@ -320,6 +319,12 @@ func (m *manager) CloseDataTransferChannel(ctx context.Context, chid datatransfe
return nil
}

// ConnectTo opens a connection to a peer on the data-transfer protocol,
// retrying if necessary
func (m *manager) ConnectTo(ctx context.Context, p peer.ID) error {
return m.dataTransferNetwork.ConnectWithRetry(ctx, p)
}

// close an open channel and fire an error event
func (m *manager) CloseDataTransferChannelWithError(ctx context.Context, chid datatransfer.ChannelID, cherr error) error {
log.Infof("close channel %s with error %s", chid, cherr)
Expand Down Expand Up @@ -511,3 +516,7 @@ func (m *manager) channelDataTransferType(channel datatransfer.ChannelState) Cha
// we received a push channel
return ManagerPeerReceivePush
}

func (m *manager) PeerID() peer.ID {
return m.peerID
}
8 changes: 2 additions & 6 deletions impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ func TestManyReceiversAtOnce(t *testing.T) {
}
}

// disconnectCoordinator is used by TestPushRequestAutoRestart to allow
// disconnectCoordinator is used by TestAutoRestart to allow
// test cases to signal when a disconnect should start, and whether
// to wait for the disconnect to take effect before continuing
type disconnectCoordinator struct {
Expand Down Expand Up @@ -724,14 +724,10 @@ func TestAutoRestart(t *testing.T) {

// Set up
restartConf := ChannelRestartConfig(channelmonitor.Config{
MonitorPushChannels: tc.isPush,
MonitorPullChannels: !tc.isPush,
AcceptTimeout: 100 * time.Millisecond,
Interval: 100 * time.Millisecond,
MinBytesTransferred: 1,
ChecksPerInterval: 10,
RestartBackoff: 500 * time.Millisecond,
MaxConsecutiveRestarts: 5,
RestartAckTimeout: 100 * time.Millisecond,
CompleteTimeout: 100 * time.Millisecond,
})
initiator, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, initiatorGSTspt, restartConf)
Expand Down
5 changes: 5 additions & 0 deletions network/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ type DataTransferNetwork interface {
// ConnectTo establishes a connection to the given peer
ConnectTo(context.Context, peer.ID) error

// ConnectWithRetry establishes a connection to the given peer, retrying if
// necessary, and opens a stream on the data-transfer protocol to verify
// the peer will accept messages on the protocol
ConnectWithRetry(ctx context.Context, p peer.ID) error

// ID returns the peer id of this libp2p host
ID() peer.ID
}
Expand Down
16 changes: 16 additions & 0 deletions network/libp2p_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,22 @@ func (dtnet *libp2pDataTransferNetwork) ConnectTo(ctx context.Context, p peer.ID
return dtnet.host.Connect(ctx, peer.AddrInfo{ID: p})
}

// ConnectWithRetry establishes a connection to the given peer, retrying if
// necessary, and opens a stream on the data-transfer protocol to verify
// the peer will accept messages on the protocol
func (dtnet *libp2pDataTransferNetwork) ConnectWithRetry(ctx context.Context, p peer.ID) error {
// Open a stream over the data-transfer protocol, to make sure that the
// peer is listening on the protocol
s, err := dtnet.openStream(ctx, p, dtnet.dtProtocols...)
if err != nil {
return err
}

// We don't actually use the stream, we just open it to verify it's
// possible to connect over the data-transfer protocol, so we close it here
return s.Close()
}

// handleNewStream receives a new stream from the network.
func (dtnet *libp2pDataTransferNetwork) handleNewStream(s network.Stream) {
defer s.Close() // nolint: errcheck,gosec
Expand Down
4 changes: 4 additions & 0 deletions testutil/testnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func (fn *FakeNetwork) ConnectTo(_ context.Context, _ peer.ID) error {
panic("not implemented")
}

func (fn *FakeNetwork) ConnectWithRetry(ctx context.Context, p peer.ID) error {
panic("implement me")
}

// ID returns a stubbed id for host of this network
func (fn *FakeNetwork) ID() peer.ID {
return fn.PeerID
Expand Down
4 changes: 4 additions & 0 deletions transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ type EventsHandler interface {
// OnSendDataError is called when a network error occurs sending data
// at the transport layer
OnSendDataError(chid ChannelID, err error) error

// OnReceiveDataError is called when a network error occurs receiving data
// at the transport layer
OnReceiveDataError(chid ChannelID, err error) error
}

/*
Expand Down
26 changes: 25 additions & 1 deletion transport/graphsync/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ func (t *Transport) SetEventHandler(events datatransfer.EventsHandler) error {
t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterRequestUpdatedHook(t.gsRequestUpdatedHook))
t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterRequestorCancelledListener(t.gsRequestorCancelledListener))
t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterNetworkErrorListener(t.gsNetworkSendErrorListener))
t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterReceiverNetworkErrorListener(t.gsNetworkReceiveErrorListener))
return nil
}

Expand Down Expand Up @@ -809,13 +810,36 @@ func (t *Transport) gsNetworkSendErrorListener(p peer.ID, request graphsync.Requ
t.dataLock.Lock()
defer t.dataLock.Unlock()

// Fire an error if the graphsync request was made by this node or the remote peer
chid, ok := t.graphsyncRequestMap[graphsyncKey{request.ID(), p}]
if !ok {
return
chid, ok = t.graphsyncRequestMap[graphsyncKey{request.ID(), t.peerID}]
if !ok {
return
}
}

err := t.events.OnSendDataError(chid, gserr)
if err != nil {
log.Errorf("failed to fire transport send error %s: %s", gserr, err)
}
}

// Called when there is a graphsync error receiving data
func (t *Transport) gsNetworkReceiveErrorListener(p peer.ID, gserr error) {
t.dataLock.Lock()
defer t.dataLock.Unlock()

// Fire a receive data error on all ongoing graphsync transfers with that
// peer
for _, chid := range t.graphsyncRequestMap {
if chid.Initiator != p && chid.Responder != p {
continue
}

err := t.events.OnReceiveDataError(chid, gserr)
if err != nil {
log.Errorf("failed to fire transport receive error %s: %s", gserr, err)
}
}
}
53 changes: 46 additions & 7 deletions transport/graphsync/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func TestManager(t *testing.T) {
require.False(t, events.OnDataQueuedCalled)
},
},
"outgoing data send error will terminate request": {
"outgoing data queued error will terminate request": {
events: fakeEvents{
OnDataQueuedError: errors.New("something went wrong"),
},
Expand All @@ -345,7 +345,7 @@ func TestManager(t *testing.T) {
require.Error(t, gsData.outgoingBlockHookActions.TerminationError)
},
},
"outgoing data send error == pause will pause request": {
"outgoing data queued error == pause will pause request": {
events: fakeEvents{
OnDataQueuedError: datatransfer.ErrPause,
},
Expand Down Expand Up @@ -626,7 +626,7 @@ func TestManager(t *testing.T) {
assertHasOutgoingMessage(t, gsData.incomingRequestHookActions.SentExtensions, gsData.incoming)
},
},
"recognized incoming request will record network error": {
"recognized incoming request will record network send error": {
action: func(gsData *harness) {
gsData.incomingRequestHook()
gsData.networkErrorListener(errors.New("something went wrong"))
Expand All @@ -636,6 +636,34 @@ func TestManager(t *testing.T) {
require.True(t, events.OnSendDataErrorCalled)
},
},
"recognized outgoing request will record network send error": {
action: func(gsData *harness) {
gsData.outgoingRequestHook()
gsData.networkErrorListener(errors.New("something went wrong"))
},
check: func(t *testing.T, events *fakeEvents, gsData *harness) {
require.True(t, events.OnSendDataErrorCalled)
},
},
"recognized incoming request will record network receive error": {
action: func(gsData *harness) {
gsData.incomingRequestHook()
gsData.receiverNetworkErrorListener(errors.New("something went wrong"))
},
check: func(t *testing.T, events *fakeEvents, gsData *harness) {
require.Equal(t, 1, events.OnRequestReceivedCallCount)
require.True(t, events.OnReceiveDataErrorCalled)
},
},
"recognized outgoing request will record network receive error": {
action: func(gsData *harness) {
gsData.outgoingRequestHook()
gsData.receiverNetworkErrorListener(errors.New("something went wrong"))
},
check: func(t *testing.T, events *fakeEvents, gsData *harness) {
require.True(t, events.OnReceiveDataErrorCalled)
},
},
"open channel adds doNotSendCids to the DoNotSend extension": {
action: func(gsData *harness) {
cids := testutil.GenerateCids(2)
Expand Down Expand Up @@ -974,10 +1002,12 @@ type fakeEvents struct {
OnDataQueuedMessage datatransfer.Message
OnDataQueuedError error

OnRequestTimedOutCalled bool
OnRequestTimedOutChannelId datatransfer.ChannelID
OnSendDataErrorCalled bool
OnSendDataErrorChannelID datatransfer.ChannelID
OnRequestTimedOutCalled bool
OnRequestTimedOutChannelId datatransfer.ChannelID
OnSendDataErrorCalled bool
OnSendDataErrorChannelID datatransfer.ChannelID
OnReceiveDataErrorCalled bool
OnReceiveDataErrorChannelID datatransfer.ChannelID

ChannelCompletedSuccess bool
RequestReceivedRequest datatransfer.Request
Expand Down Expand Up @@ -1008,6 +1038,12 @@ func (fe *fakeEvents) OnSendDataError(chid datatransfer.ChannelID, err error) er
return nil
}

func (fe *fakeEvents) OnReceiveDataError(chid datatransfer.ChannelID, err error) error {
fe.OnReceiveDataErrorCalled = true
fe.OnReceiveDataErrorChannelID = chid
return nil
}

func (fe *fakeEvents) OnChannelOpened(chid datatransfer.ChannelID) error {
fe.ChannelOpenedChannelID = chid
return fe.OnChannelOpenedError
Expand Down Expand Up @@ -1099,6 +1135,9 @@ func (ha *harness) requestorCancelledListener() {
func (ha *harness) networkErrorListener(err error) {
ha.fgs.NetworkErrorListener(ha.other, ha.request, err)
}
func (ha *harness) receiverNetworkErrorListener(err error) {
ha.fgs.ReceiverNetworkErrorListener(ha.other, err)
}

type dtConfig struct {
dtExtensionMissing bool
Expand Down

0 comments on commit f14cc40

Please sign in to comment.