diff --git a/channelmonitor/channelmonitor.go b/channelmonitor/channelmonitor.go
index cbba90f8..e58a9892 100644
--- a/channelmonitor/channelmonitor.go
+++ b/channelmonitor/channelmonitor.go
@@ -6,6 +6,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/bep/debounce"
 	logging "github.com/ipfs/go-log/v2"
 	"github.com/libp2p/go-libp2p-core/peer"
 	"golang.org/x/xerrors"
@@ -39,6 +40,8 @@ type Monitor struct {
 type Config struct {
 	// Max time to wait for other side to accept open channel request before attempting restart
 	AcceptTimeout time.Duration
+	// Debounce when restart is triggered by multiple errors
+	RestartDebounce time.Duration
 	// Backoff after restarting
 	RestartBackoff time.Duration
 	// Number of times to try to restart before failing
@@ -152,18 +155,20 @@ func (m *Monitor) enabled() bool {
 type monitoredChannel struct {
 	// The parentCtx is used when sending a close message for a channel, so
 	// that operation can continue even after the monitoredChannel is shutdown
-	parentCtx  context.Context
-	ctx        context.Context
-	cancel     context.CancelFunc
-	mgr        monitorAPI
-	chid       datatransfer.ChannelID
-	cfg        *Config
-	unsub      datatransfer.Unsubscribe
-	onShutdown func(datatransfer.ChannelID)
-	shutdownLk sync.Mutex
+	parentCtx               context.Context
+	ctx                     context.Context
+	cancel                  context.CancelFunc
+	mgr                     monitorAPI
+	chid                    datatransfer.ChannelID
+	cfg                     *Config
+	unsub                   datatransfer.Unsubscribe
+	restartChannelDebounced func()
+	onShutdown              func(datatransfer.ChannelID)
+	shutdownLk              sync.Mutex
 
 	restartLk           sync.RWMutex
 	restartedAt         time.Time
+	restartQueued       bool
 	consecutiveRestarts int
 }
 
@@ -184,6 +189,8 @@ func newMonitoredChannel(
 		cfg:        cfg,
 		onShutdown: onShutdown,
 	}
+	debouncer := debounce.New(cfg.RestartDebounce)
+	mpc.restartChannelDebounced = func() { debouncer(mpc.restartChannel) }
 	mpc.start()
 	return mpc
 }
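
Reviewer note on the hunk above: github.com/bep/debounce exposes a single constructor, debounce.New(d), which returns a func(f func()); every call re-arms the timer, and the most recently passed f runs once d elapses with no further calls. A minimal standalone sketch of the behaviour the monitor relies on (the window and the printed message are illustrative, not values from this patch):

package main

import (
	"fmt"
	"time"

	"github.com/bep/debounce"
)

func main() {
	// 10ms quiet window (illustrative; the monitor uses cfg.RestartDebounce).
	debounced := debounce.New(10 * time.Millisecond)

	restart := func() { fmt.Println("restarting channel") }

	// A burst of transport errors calls the debounced function repeatedly...
	for i := 0; i < 5; i++ {
		debounced(restart)
	}

	// ...but restart fires only once, 10ms after the last trigger.
	time.Sleep(50 * time.Millisecond)
}
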
@@ -243,12 +250,12 @@ func (mc *monitoredChannel) start() {
 			// If the transport layer reports an error sending data over the wire,
 			// attempt to restart the channel
 			log.Warnf("%s: data transfer transport send error, restarting data transfer", mc.chid)
-			go mc.restartChannel()
+			go mc.restartChannelDebounced()
 		case datatransfer.ReceiveDataError:
 			// If the transport layer reports an error receiving data over the wire,
 			// attempt to restart the channel
 			log.Warnf("%s: data transfer transport receive error, restarting data transfer", mc.chid)
-			go mc.restartChannel()
+			go mc.restartChannelDebounced()
 		case datatransfer.FinishTransfer:
 			// The channel initiator has finished sending / receiving all data.
 			// Watch to make sure that the responder sends a message to acknowledge
@@ -297,7 +304,7 @@ func (mc *monitoredChannel) watchForResponderComplete() {
 	// When the Complete message is received, the channel shuts down and
 	// its context is cancelled
 	case <-timer.C:
-		// Timer expired before we received a Complete from the responder
+		// Timer expired before we received a Complete message from the responder
 		err := xerrors.Errorf("%s: timed out waiting %s for Complete message from remote peer",
 			mc.chid, mc.cfg.AcceptTimeout)
 		mc.closeChannelAndShutdown(err)
@@ -321,52 +328,94 @@ func (mc *monitoredChannel) isRestarting() bool {
 	return !mc.restartedAt.IsZero()
 }
 
+// Send a restart message for the channel
 func (mc *monitoredChannel) restartChannel() {
-	var restartCount int
 	var restartedAt time.Time
 	mc.restartLk.Lock()
 	{
-		// If the channel is not already being restarted, record the restart
-		// time and increment the consecutive restart count
 		restartedAt = mc.restartedAt
 		if mc.restartedAt.IsZero() {
+			// If there is not already a restart in progress, we'll restart now
 			mc.restartedAt = time.Now()
-			mc.consecutiveRestarts++
-			restartCount = mc.consecutiveRestarts
+		} else {
+			// There is already a restart in progress, so queue up a restart
+			// for after the current one has completed
+			mc.restartQueued = true
 		}
 	}
 	mc.restartLk.Unlock()
 
 	// Check if channel is already being restarted
 	if !restartedAt.IsZero() {
-		log.Debugf("%s: restart called but already restarting channel (for %s so far; restart backoff is %s)",
+		log.Infof("%s: restart called but already restarting channel, "+
+			"waiting to restart again (since %s; restart backoff is %s)",
 			mc.chid, time.Since(restartedAt), mc.cfg.RestartBackoff)
 		return
 	}
 
+	for {
+		// Send the restart message
+		err := mc.doRestartChannel()
+		if err != nil {
+			// If there was an error restarting, close the channel and shutdown
+			// the monitor
+			mc.closeChannelAndShutdown(err)
+			return
+		}
+
+		// Restart has completed, check if there's another restart queued up
+		restartAgain := false
+		mc.restartLk.Lock()
+		{
+			if mc.restartQueued {
+				// There's another restart queued up, so reset the restart time
+				// to now
+				mc.restartedAt = time.Now()
+				restartAgain = true
+				mc.restartQueued = false
+			} else {
+				// No other restarts queued up, so clear the restart time
+				mc.restartedAt = time.Time{}
+			}
+		}
+		mc.restartLk.Unlock()
+
+		if !restartAgain {
+			// No restart queued, we're done
+			return
+		}
+
+		// There was a restart queued, restart again
+		log.Infof("%s: restart was queued - restarting again", mc.chid)
+	}
+}
+
+func (mc *monitoredChannel) doRestartChannel() error {
+	// Keep track of the number of consecutive restarts with no data
+	// transferred
+	mc.restartLk.Lock()
+	mc.consecutiveRestarts++
+	restartCount := mc.consecutiveRestarts
+	mc.restartLk.Unlock()
+
 	if uint32(restartCount) > mc.cfg.MaxConsecutiveRestarts {
-		// If no data has been transferred since the last transfer, and we've
-		// reached the consecutive restart limit, close the channel and
-		// shutdown the monitor
-		err := xerrors.Errorf("%s: after %d consecutive restarts failed to transfer any data", mc.chid, restartCount)
-		mc.closeChannelAndShutdown(err)
-		return
+		// If no data has been transferred since the last restart, and we've
+		// reached the consecutive restart limit, return an error
+		return xerrors.Errorf("%s: after %d consecutive restarts failed to transfer any data", mc.chid, restartCount)
 	}
 
 	// Send the restart message
+	log.Infof("%s: restarting (%d consecutive restarts)", mc.chid, restartCount)
 	err := mc.sendRestartMessage(restartCount)
 	if err != nil {
-		// If the restart message could not be sent, close the channel and
-		// shutdown the monitor
-		mc.closeChannelAndShutdown(err)
-		return
+		log.Warnf("%s: restart failed, trying again: %s", mc.chid, err)
+		// If the restart message could not be sent, or there was a timeout
+		// waiting for the restart to be acknowledged, try again
+		return mc.doRestartChannel()
 	}
+	log.Infof("%s: restart completed successfully", mc.chid)
 
-	// Restart complete, so clear the restart time so that another restart
-	// can begin
-	mc.restartLk.Lock()
-	mc.restartedAt = time.Time{}
-	mc.restartLk.Unlock()
+	return nil
 }
 
 func (mc *monitoredChannel) sendRestartMessage(restartCount int) error {
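
Reviewer note: restartChannel above is a single-flight loop. The first goroutine to find restartedAt zero becomes the restarter; any goroutine arriving while a restart is in flight just sets restartQueued and returns, and the restarter loops until the queued flag stays clear. The pattern in isolation, with hypothetical names (work stands in for doRestartChannel):

package main

import (
	"fmt"
	"sync"
)

// singleFlight coalesces concurrent triggers: one runner loops, and any
// number of later triggers collapse into a single queued re-run.
type singleFlight struct {
	lk      sync.Mutex
	running bool
	queued  bool
}

func (s *singleFlight) trigger(work func() error) {
	s.lk.Lock()
	if s.running {
		// A run is already in progress; queue exactly one more.
		s.queued = true
		s.lk.Unlock()
		return
	}
	s.running = true
	s.lk.Unlock()

	for {
		if err := work(); err != nil {
			return // the monitor closes the channel and shuts down here
		}
		s.lk.Lock()
		if !s.queued {
			s.running = false
			s.lk.Unlock()
			return
		}
		s.queued = false // consume the queued trigger and run again
		s.lk.Unlock()
	}
}

func main() {
	var sf singleFlight
	sf.trigger(func() error { fmt.Println("restart"); return nil })
}
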
@@ -383,7 +432,7 @@ func (mc *monitoredChannel) sendRestartMessage(restartCount int) error {
 	}
 	log.Infof("%s: re-established connection to %s in %s", mc.chid, p, time.Since(start))
 
-	// Send a restart message for the channel.
+	// Send a restart message for the channel
 	restartResult := mc.waitForRestartResponse()
 	log.Infof("%s: sending restart message to %s (%d consecutive restarts)", mc.chid, p, restartCount)
 	err = mc.mgr.RestartDataTransferChannel(mc.ctx, mc.chid)
@@ -430,7 +479,7 @@ func (mc *monitoredChannel) closeChannelAndShutdown(cherr error) {
 	}
 
 	// Close the data transfer channel and fire an error
-	log.Errorf("closing data-transfer channel: %s", cherr)
+	log.Errorf("%s: closing data-transfer channel: %s", mc.chid, cherr)
 	err := mc.mgr.CloseDataTransferChannelWithError(mc.parentCtx, mc.chid, cherr)
 	if err != nil {
 		log.Errorf("error closing data-transfer channel %s: %s", mc.chid, err)
@@ -440,7 +489,7 @@
 // Wait for the peer to send an acknowledgement to the restart request
 func (mc *monitoredChannel) waitForRestartResponse() chan error {
 	restartFired := make(chan struct{})
-	restarted := make(chan error)
+	restarted := make(chan error, 3)
 	timer := time.NewTimer(mc.cfg.RestartAckTimeout)
 
 	unsub := mc.mgr.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
@@ -472,8 +521,9 @@
 		// Timer expired before receiving a restart ack from peer
 		case <-timer.C:
 			p := mc.chid.OtherParty(mc.mgr.PeerID())
-			restarted <- xerrors.Errorf("did not receive response to restart request from %s after %s",
+			err := xerrors.Errorf("did not receive response to restart request from %s after %s",
 				p, mc.cfg.RestartAckTimeout)
+			restarted <- err
 		}
 	}()
 
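
Reviewer note: the switch to make(chan error, 3) in waitForRestartResponse reads as a goroutine-leak guard rather than a functional change. The caller reads at most one value from restarted, but more than one code path (restart acknowledged, restart failed, ack timeout above) may try to send; with an unbuffered channel a late sender would block forever. A reduced illustration of the hazard, with hypothetical names:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Buffered so that every would-be reporter can complete its send and
	// exit, even though only the first value is ever read.
	restarted := make(chan error, 3)

	for i := 0; i < 3; i++ {
		go func(i int) {
			// With an unbuffered channel, the two losing goroutines
			// would block here forever.
			restarted <- fmt.Errorf("outcome %d", i)
		}(i)
	}

	fmt.Println("first outcome:", <-restarted)
	time.Sleep(10 * time.Millisecond) // give the other senders time to finish
}
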
diff --git a/channelmonitor/channelmonitor_test.go b/channelmonitor/channelmonitor_test.go
index ce72ee36..13785ca9 100644
--- a/channelmonitor/channelmonitor_test.go
+++ b/channelmonitor/channelmonitor_test.go
@@ -59,7 +59,7 @@ func TestChannelMonitorAutoRestart(t *testing.T) {
 	m := NewMonitor(mockAPI, &Config{
 		AcceptTimeout:          time.Hour,
 		MaxConsecutiveRestarts: 3,
-		RestartAckTimeout:      50 * time.Millisecond,
+		RestartAckTimeout:      20 * time.Millisecond,
 		CompleteTimeout:        time.Hour,
 	})
 
@@ -93,11 +93,8 @@ func TestChannelMonitorAutoRestart(t *testing.T) {
 	}
 
 	// Verify that restart message is sent
-	select {
-	case <-time.After(100 * time.Millisecond):
-		require.Fail(t, "failed to restart channel")
-	case <-mockAPI.restartMessages:
-	}
+	err := mockAPI.awaitRestartSent()
+	require.NoError(t, err)
 
 	// If simulating a restart ack timeout, don't fire the restart
 	// ack event and expect the channel to be closed with an error
@@ -217,6 +214,63 @@ func awaitRestartComplete(mch *monitoredChannel) error {
 	return xerrors.Errorf("restart did not complete after 10ms")
 }
 
+func TestChannelMonitorQueuedRestart(t *testing.T) {
+	runTest := func(name string, isPush bool) {
+		t.Run(name, func(t *testing.T) {
+			ch := &mockChannelState{chid: ch1}
+			mockAPI := newMockMonitorAPI(ch, false, false)
+
+			triggerErrorEvent := func() {
+				if isPush {
+					mockAPI.sendDataErrorEvent()
+				} else {
+					mockAPI.receiveDataErrorEvent()
+				}
+			}
+
+			m := NewMonitor(mockAPI, &Config{
+				AcceptTimeout:          time.Hour,
+				RestartDebounce:        10 * time.Millisecond,
+				MaxConsecutiveRestarts: 3,
+				RestartAckTimeout:      time.Hour,
+				CompleteTimeout:        time.Hour,
+			})
+
+			if isPush {
+				m.AddPushChannel(ch1)
+
+				mockAPI.dataQueued(10)
+				mockAPI.dataSent(5)
+			} else {
+				m.AddPullChannel(ch1)
+
+				mockAPI.dataReceived(5)
+			}
+
+			// Trigger an error event, should cause a restart
+			triggerErrorEvent()
+			// Wait for restart to occur
+			err := mockAPI.awaitRestartSent()
+			require.NoError(t, err)
+
+			// Trigger another error event before the restart has completed
+			triggerErrorEvent()
+
+			// Simulate receiving restart ack from peer (for first restart)
+			mockAPI.restartEvent()
+
+			// A second restart should be sent because of the second error
+			err = mockAPI.awaitRestartSent()
+			require.NoError(t, err)
+		})
+	}
+
+	// test push channel
+	runTest("push", true)
+	// test pull channel
+	runTest("pull", false)
+}
+
 func TestChannelMonitorTimeouts(t *testing.T) {
 	type testCase struct {
 		name string
@@ -324,8 +378,8 @@ func verifyChannelShutdown(t *testing.T, shutdownCtx context.Context) {
 type mockMonitorAPI struct {
 	ch *mockChannelState
 
-	connectErrors chan error
-	restartErrors chan error
+	connectErrors   bool
+	restartErrors   bool
 	restartMessages chan struct{}
 	closeErr        chan error
 
@@ -334,28 +388,14 @@
 }
 
 func newMockMonitorAPI(ch *mockChannelState, errOnReconnect, errOnRestart bool) *mockMonitorAPI {
-	m := &mockMonitorAPI{
+	return &mockMonitorAPI{
 		ch:              ch,
-		restartMessages: make(chan struct{}, 1),
+		connectErrors:   errOnReconnect,
+		restartErrors:   errOnRestart,
+		restartMessages: make(chan struct{}, 100),
 		closeErr:        make(chan error, 1),
-		connectErrors:   make(chan error, 1),
-		restartErrors:   make(chan error, 1),
 		subscribers:     make(map[int]datatransfer.Subscriber),
 	}
-
-	var connectErr error
-	if errOnReconnect {
-		connectErr = xerrors.Errorf("connect err")
-	}
-	m.connectErrors <- connectErr
-
-	var restartErr error
-	if errOnRestart {
-		restartErr = xerrors.Errorf("restart err")
-	}
-	m.restartErrors <- restartErr
-
-	return m
 }
 
 func (m *mockMonitorAPI) SubscribeToEvents(subscriber datatransfer.Subscriber) datatransfer.Unsubscribe {
@@ -374,18 +414,19 @@ func (m *mockMonitorAPI) SubscribeToEvents(subscriber datatransfer.Subscriber) datatransfer.Unsubscribe {
 }
 
 func (m *mockMonitorAPI) fireEvent(e datatransfer.Event, state datatransfer.ChannelState) {
+	m.lk.Lock()
+	defer m.lk.Unlock()
+
 	for _, subscriber := range m.subscribers {
 		subscriber(e, state)
 	}
 }
 
 func (m *mockMonitorAPI) ConnectTo(ctx context.Context, id peer.ID) error {
-	select {
-	case err := <-m.connectErrors:
-		return err
-	default:
-		return nil
+	if m.connectErrors {
+		return xerrors.Errorf("connect err")
 	}
+	return nil
 }
 
 func (m *mockMonitorAPI) PeerID() peer.ID {
@@ -397,18 +438,17 @@ func (m *mockMonitorAPI) RestartDataTransferChannel(ctx context.Context, chid datatransfer.ChannelID) error {
 		m.restartMessages <- struct{}{}
 	}()
 
-	select {
-	case err := <-m.restartErrors:
-		return err
-	default:
-		return nil
+	if m.restartErrors {
+		return xerrors.Errorf("restart err")
 	}
+	return nil
 }
 
 func (m *mockMonitorAPI) awaitRestartSent() error {
+	timeout := 100 * time.Millisecond
 	select {
-	case <-time.After(10 * time.Millisecond):
-		return xerrors.Errorf("failed to restart channel")
+	case <-time.After(timeout):
+		return xerrors.Errorf("failed to restart channel after %s", timeout)
 	case <-m.restartMessages:
 		return nil
 	}
@@ -421,9 +461,10 @@ func (m *mockMonitorAPI) CloseDataTransferChannelWithError(ctx context.Context, chid datatransfer.ChannelID, cherr error) error {
 }
 
 func (m *mockMonitorAPI) verifyChannelClosed(t *testing.T, expectErr bool) {
 	// Verify channel has been closed
+	closeTimeout := 100 * time.Millisecond
 	select {
-	case <-time.After(100 * time.Millisecond):
-		require.Fail(t, "failed to close channel within 100ms")
+	case <-time.After(closeTimeout):
+		require.Fail(t, fmt.Sprintf("failed to close channel within %s", closeTimeout))
 	case err := <-m.closeErr:
 		if expectErr && err == nil {
 			require.Fail(t, "expected error on close")
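
Reviewer note: fireEvent now holds m.lk while ranging over m.subscribers, which presumably pairs with locking in SubscribeToEvents (its body is not shown in these hunks). Go maps are unsafe under concurrent read and write, and the monitor subscribes and unsubscribes from its own goroutines while the test goroutine fires events. A reduced sketch of the guarded subscriber-map pattern, with hypothetical names:

package main

import (
	"fmt"
	"sync"
)

type bus struct {
	lk          sync.Mutex
	subscribers map[int]func(string)
	nextID      int
}

func (b *bus) subscribe(f func(string)) func() {
	b.lk.Lock()
	defer b.lk.Unlock()
	id := b.nextID
	b.nextID++
	b.subscribers[id] = f
	// The unsubscribe closure removes the entry under the same lock.
	return func() {
		b.lk.Lock()
		defer b.lk.Unlock()
		delete(b.subscribers, id)
	}
}

func (b *bus) fire(ev string) {
	// Hold the lock while iterating so a concurrent subscribe/unsubscribe
	// cannot mutate the map mid-range.
	b.lk.Lock()
	defer b.lk.Unlock()
	for _, f := range b.subscribers {
		f(ev)
	}
}

func main() {
	b := &bus{subscribers: map[int]func(string){}}
	unsub := b.subscribe(func(ev string) { fmt.Println("got", ev) })
	b.fire("restart")
	unsub()
}
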
diff --git a/go.mod b/go.mod
index 5f587e0c..b0295286 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,7 @@ module github.com/filecoin-project/go-data-transfer
 go 1.14
 
 require (
+	github.com/bep/debounce v1.2.0
 	github.com/filecoin-project/go-ds-versioning v0.1.0
 	github.com/filecoin-project/go-statemachine v0.0.0-20200925024713-05bd7c71fbfe
 	github.com/hannahhoward/cbor-gen-for v0.0.0-20200817222906-ea96cece81f1
@@ -12,7 +13,7 @@ require (
 	github.com/ipfs/go-cid v0.0.7
 	github.com/ipfs/go-datastore v0.4.5
 	github.com/ipfs/go-ds-badger v0.2.3
-	github.com/ipfs/go-graphsync v0.6.1-0.20210407112122-089b2abad957
+	github.com/ipfs/go-graphsync v0.6.1
 	github.com/ipfs/go-ipfs-blockstore v1.0.1
 	github.com/ipfs/go-ipfs-blocksutil v0.0.1
 	github.com/ipfs/go-ipfs-chunker v0.0.5
diff --git a/go.sum b/go.sum
index da32ec7c..4dd52060 100644
--- a/go.sum
+++ b/go.sum
@@ -28,6 +28,8 @@ github.com/Stebalien/go-bitfield v0.0.1 h1:X3kbSSPUaJK60wV2hjOPZwmpljr6VGCqdq4cB
 github.com/Stebalien/go-bitfield v0.0.1/go.mod h1:GNjFpasyUVkHMsfEOk8EFLJ9syQ6SI+XWrX9Wf2XH0s=
 github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
 github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
+github.com/bep/debounce v1.2.0 h1:wXds8Kq8qRfwAOpAxHrJDbCXgC5aHSzgQb/0gKsHQqo=
+github.com/bep/debounce v1.2.0/go.mod h1:H8yggRPQKLUhUoqrJC1bO2xNya7vanpDl7xR3ISbCJ0=
 github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8=
 github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI=
 github.com/btcsuite/btcd v0.0.0-20190605094302-a0d1e3e36d50/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI=
@@ -213,8 +215,8 @@ github.com/ipfs/go-ds-badger v0.2.3/go.mod h1:pEYw0rgg3FIrywKKnL+Snr+w/LjJZVMTBR
 github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
 github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
 github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
-github.com/ipfs/go-graphsync v0.6.1-0.20210407112122-089b2abad957 h1:JMQvhEKMk8kz31F7GcQba4XCwrO35zXad0/pmhyxfwk=
-github.com/ipfs/go-graphsync v0.6.1-0.20210407112122-089b2abad957/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk=
+github.com/ipfs/go-graphsync v0.6.1 h1:i9wN7YkBXWwIsUjVQeuaDxFB59yWZrG1xL564Nz7aGE=
+github.com/ipfs/go-graphsync v0.6.1/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk=
 github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
 github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
 github.com/ipfs/go-ipfs-blockstore v0.1.4/go.mod h1:Jxm3XMVjh6R17WvxFEiyKBLUGr86HgIYJW/D/MwqeYQ=
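
For context, a caller opts into the debounced behaviour simply by populating the new Config field alongside the existing ones. A minimal sketch of such wiring inside this package; the enableMonitoring helper, the durations, and the mgr/chid parameters are illustrative, not part of the patch:

// Hypothetical in-package helper showing where RestartDebounce sits in the
// monitor configuration; mgr and chid are assumed to be supplied by the
// data transfer manager.
func enableMonitoring(mgr monitorAPI, chid datatransfer.ChannelID) *Monitor {
	m := NewMonitor(mgr, &Config{
		AcceptTimeout:          30 * time.Second, // illustrative values throughout
		RestartDebounce:        time.Second,
		RestartBackoff:         20 * time.Second,
		MaxConsecutiveRestarts: 5,
		RestartAckTimeout:      30 * time.Second,
		CompleteTimeout:        time.Minute,
	})
	m.AddPushChannel(chid)
	return m
}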