feat: add debounce to channel monitor
dirkmc committed Apr 16, 2021
1 parent 10de9cb commit 6798477
Showing 4 changed files with 174 additions and 80 deletions.
124 changes: 87 additions & 37 deletions channelmonitor/channelmonitor.go
@@ -6,6 +6,7 @@ import (
     "sync"
     "time"

+    "github.com/bep/debounce"
     logging "github.com/ipfs/go-log/v2"
     "github.com/libp2p/go-libp2p-core/peer"
     "golang.org/x/xerrors"
@@ -39,6 +40,8 @@ type Monitor struct {
 type Config struct {
     // Max time to wait for other side to accept open channel request before attempting restart
     AcceptTimeout time.Duration
+    // Debounce when restart is triggered by multiple errors
+    RestartDebounce time.Duration
     // Backoff after restarting
     RestartBackoff time.Duration
     // Number of times to try to restart before failing
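For orientation, a Config that exercises the new field might look like the sketch below. The field set is taken from this diff; the package name follows the file path, and the duration values are purely illustrative, not documented defaults.

    cfg := &channelmonitor.Config{
        AcceptTimeout:          30 * time.Second, // wait this long for the responder to accept
        RestartDebounce:        10 * time.Second, // coalesce restart triggers arriving within 10s
        RestartBackoff:         20 * time.Second,
        MaxConsecutiveRestarts: 5,
        RestartAckTimeout:      30 * time.Second,
    }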
@@ -152,18 +155,20 @@ func (m *Monitor) enabled() bool {
 type monitoredChannel struct {
     // The parentCtx is used when sending a close message for a channel, so
     // that operation can continue even after the monitoredChannel is shutdown
-    parentCtx  context.Context
-    ctx        context.Context
-    cancel     context.CancelFunc
-    mgr        monitorAPI
-    chid       datatransfer.ChannelID
-    cfg        *Config
-    unsub      datatransfer.Unsubscribe
-    onShutdown func(datatransfer.ChannelID)
-    shutdownLk sync.Mutex
+    parentCtx               context.Context
+    ctx                     context.Context
+    cancel                  context.CancelFunc
+    mgr                     monitorAPI
+    chid                    datatransfer.ChannelID
+    cfg                     *Config
+    unsub                   datatransfer.Unsubscribe
+    restartChannelDebounced func()
+    onShutdown              func(datatransfer.ChannelID)
+    shutdownLk              sync.Mutex

     restartLk           sync.RWMutex
     restartedAt         time.Time
+    restartQueued       bool
     consecutiveRestarts int
 }

@@ -184,6 +189,8 @@ func newMonitoredChannel(
         cfg:        cfg,
         onShutdown: onShutdown,
     }
+    debouncer := debounce.New(cfg.RestartDebounce)
+    mpc.restartChannelDebounced = func() { debouncer(mpc.restartChannel) }
     mpc.start()
     return mpc
 }
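newMonitoredChannel wraps restartChannel in a debouncer from github.com/bep/debounce: debounce.New(d) returns a function that, each time it is handed a callback, resets a timer and runs only the most recent callback once calls have been quiet for d. A self-contained sketch of that behavior (not code from this repository):

    package main

    import (
        "fmt"
        "time"

        "github.com/bep/debounce"
    )

    func main() {
        debounced := debounce.New(100 * time.Millisecond)
        for i := 0; i < 3; i++ {
            debounced(func() { fmt.Println("restart") }) // three triggers in quick succession
        }
        time.Sleep(200 * time.Millisecond) // prints "restart" exactly once
    }

Since every trigger hands in the same mpc.restartChannel, collapsing a burst of transport errors into a single restart is exactly the intended effect.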
@@ -243,12 +250,12 @@ func (mc *monitoredChannel) start() {
         // If the transport layer reports an error sending data over the wire,
         // attempt to restart the channel
         log.Warnf("%s: data transfer transport send error, restarting data transfer", mc.chid)
-        go mc.restartChannel()
+        go mc.restartChannelDebounced()
     case datatransfer.ReceiveDataError:
         // If the transport layer reports an error receiving data over the wire,
         // attempt to restart the channel
         log.Warnf("%s: data transfer transport receive error, restarting data transfer", mc.chid)
-        go mc.restartChannel()
+        go mc.restartChannelDebounced()
     case datatransfer.FinishTransfer:
         // The channel initiator has finished sending / receiving all data.
         // Watch to make sure that the responder sends a message to acknowledge
@@ -297,7 +304,7 @@ func (mc *monitoredChannel) watchForResponderComplete() {
     // When the Complete message is received, the channel shuts down and
     // its context is cancelled
     case <-timer.C:
-        // Timer expired before we received a Complete from the responder
+        // Timer expired before we received a Complete message from the responder
         err := xerrors.Errorf("%s: timed out waiting %s for Complete message from remote peer",
             mc.chid, mc.cfg.AcceptTimeout)
         mc.closeChannelAndShutdown(err)
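The surrounding function is a watchdog: wait up to AcceptTimeout for the responder's Complete message, and escalate if it never arrives. Distilled to its skeleton (the names here are ours, not the package's):

    func watchdog(ctx context.Context, timeout time.Duration, onTimeout func()) {
        timer := time.NewTimer(timeout)
        defer timer.Stop()

        select {
        case <-ctx.Done():
            // Channel completed or shut down in time; nothing to do.
        case <-timer.C:
            // Deadline passed without the expected message; escalate.
            onTimeout()
        }
    }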
@@ -321,52 +328,94 @@ func (mc *monitoredChannel) isRestarting() bool {
     return !mc.restartedAt.IsZero()
 }

 // Send a restart message for the channel
 func (mc *monitoredChannel) restartChannel() {
-    var restartCount int
     var restartedAt time.Time
     mc.restartLk.Lock()
     {
-        // If the channel is not already being restarted, record the restart
-        // time and increment the consecutive restart count
         restartedAt = mc.restartedAt
         if mc.restartedAt.IsZero() {
+            // If there is not already a restart in progress, we'll restart now
             mc.restartedAt = time.Now()
-            mc.consecutiveRestarts++
-            restartCount = mc.consecutiveRestarts
+        } else {
+            // There is already a restart in progress, so queue up a restart
+            // for after the current one has completed
+            mc.restartQueued = true
         }
     }
     mc.restartLk.Unlock()

     // Check if channel is already being restarted
     if !restartedAt.IsZero() {
-        log.Debugf("%s: restart called but already restarting channel (for %s so far; restart backoff is %s)",
+        log.Infof("%s: restart called but already restarting channel, "+
+            "waiting to restart again (since %s; restart backoff is %s)",
             mc.chid, time.Since(restartedAt), mc.cfg.RestartBackoff)
         return
     }

+    for {
+        // Send the restart message
+        err := mc.doRestartChannel()
+        if err != nil {
+            // If there was an error restarting, close the channel and shutdown
+            // the monitor
+            mc.closeChannelAndShutdown(err)
+            return
+        }
+
+        // Restart has completed, check if there's another restart queued up
+        restartAgain := false
+        mc.restartLk.Lock()
+        {
+            if mc.restartQueued {
+                // There's another restart queued up, so reset the restart time
+                // to now
+                mc.restartedAt = time.Now()
+                restartAgain = true
+                mc.restartQueued = false
+            } else {
+                // No other restarts queued up, so clear the restart time
+                mc.restartedAt = time.Time{}
+            }
+        }
+        mc.restartLk.Unlock()
+
+        if !restartAgain {
+            // No restart queued, we're done
+            return
+        }
+
+        // There was a restart queued, restart again
+        log.Infof("%s: restart was queued - restarting again", mc.chid)
+    }
+}
+
+func (mc *monitoredChannel) doRestartChannel() error {
+    // Keep track of the number of consecutive restarts with no data
+    // transferred
+    mc.restartLk.Lock()
+    mc.consecutiveRestarts++
+    restartCount := mc.consecutiveRestarts
+    mc.restartLk.Unlock()
+
     if uint32(restartCount) > mc.cfg.MaxConsecutiveRestarts {
-        // If no data has been transferred since the last transfer, and we've
-        // reached the consecutive restart limit, close the channel and
-        // shutdown the monitor
-        err := xerrors.Errorf("%s: after %d consecutive restarts failed to transfer any data", mc.chid, restartCount)
-        mc.closeChannelAndShutdown(err)
-        return
+        // If no data has been transferred since the last restart, and we've
+        // reached the consecutive restart limit, return an error
+        return xerrors.Errorf("%s: after %d consecutive restarts failed to transfer any data", mc.chid, restartCount)
     }

     // Send the restart message
     log.Infof("%s: restarting (%d consecutive restarts)", mc.chid, restartCount)
     err := mc.sendRestartMessage(restartCount)
     if err != nil {
-        // If the restart message could not be sent, close the channel and
-        // shutdown the monitor
-        mc.closeChannelAndShutdown(err)
-        return
+        log.Warnf("%s: restart failed, trying again: %s", mc.chid, err)
+        // If the restart message could not be sent, or there was a timeout
+        // waiting for the restart to be acknowledged, try again
+        return mc.doRestartChannel()
     }
+    log.Infof("%s: restart completed successfully", mc.chid)

     // Restart complete, so clear the restart time so that another restart
     // can begin
     mc.restartLk.Lock()
     mc.restartedAt = time.Time{}
     mc.restartLk.Unlock()
+    return nil
 }

 func (mc *monitoredChannel) sendRestartMessage(restartCount int) error {
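The restart path now has two layers: debouncing in front (a burst of triggers collapses into one call) and queueing behind (at most one restart request is remembered while a restart is in flight, via restartQueued and the for loop above). The queueing half, distilled into a standalone shape with our own names:

    type coalescer struct {
        mu      sync.Mutex
        running bool
        queued  bool
    }

    // trigger runs work now, or remembers to run it once more after the
    // current run finishes; any number of triggers during a run collapse
    // into a single rerun.
    func (c *coalescer) trigger(work func() error) {
        c.mu.Lock()
        if c.running {
            c.queued = true
            c.mu.Unlock()
            return
        }
        c.running = true
        c.mu.Unlock()

        for {
            err := work()
            c.mu.Lock()
            if err != nil || !c.queued {
                c.running = false
                c.queued = false
                c.mu.Unlock()
                return
            }
            c.queued = false // consume the queued request and run again
            c.mu.Unlock()
        }
    }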
@@ -383,7 +432,7 @@ func (mc *monitoredChannel) sendRestartMessage(restartCount int) error {
     }
     log.Infof("%s: re-established connection to %s in %s", mc.chid, p, time.Since(start))

-    // Send a restart message for the channel.
+    // Send a restart message for the channel
     restartResult := mc.waitForRestartResponse()
     log.Infof("%s: sending restart message to %s (%d consecutive restarts)", mc.chid, p, restartCount)
     err = mc.mgr.RestartDataTransferChannel(mc.ctx, mc.chid)
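Note the ordering here: waitForRestartResponse registers its event subscription before RestartDataTransferChannel sends the request, so an acknowledgement that arrives immediately cannot be lost in the gap between sending and subscribing. The general shape, with illustrative names:

    func requestWithAck(subscribe func() <-chan error, send func() error) error {
        ack := subscribe() // listen first, so even an instant response is caught
        if err := send(); err != nil {
            return err
        }
        return <-ack // nil once acknowledged; an error on timeout
    }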
@@ -430,7 +479,7 @@ func (mc *monitoredChannel) closeChannelAndShutdown(cherr error) {
     }

     // Close the data transfer channel and fire an error
-    log.Errorf("closing data-transfer channel: %s", cherr)
+    log.Errorf("%s: closing data-transfer channel: %s", mc.chid, cherr)
     err := mc.mgr.CloseDataTransferChannelWithError(mc.parentCtx, mc.chid, cherr)
     if err != nil {
         log.Errorf("error closing data-transfer channel %s: %s", mc.chid, err)
@@ -440,7 +489,7 @@ func (mc *monitoredChannel) closeChannelAndShutdown(cherr error) {
 // Wait for the peer to send an acknowledgement to the restart request
 func (mc *monitoredChannel) waitForRestartResponse() chan error {
     restartFired := make(chan struct{})
-    restarted := make(chan error)
+    restarted := make(chan error, 3)
     timer := time.NewTimer(mc.cfg.RestartAckTimeout)

     unsub := mc.mgr.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
@@ -472,8 +521,9 @@ func (mc *monitoredChannel) waitForRestartResponse() chan error {
         // Timer expired before receiving a restart ack from peer
         case <-timer.C:
             p := mc.chid.OtherParty(mc.mgr.PeerID())
-            restarted <- xerrors.Errorf("did not receive response to restart request from %s after %s",
+            err := xerrors.Errorf("did not receive response to restart request from %s after %s",
                 p, mc.cfg.RestartAckTimeout)
+            restarted <- err
         }
     }()

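The result channel is created with capacity 3, presumably one slot per path that can report an outcome (restart succeeded, restart failed, and the timeout above). A buffered send succeeds even when nobody is receiving any more, so no reporting path can block forever; with the old unbuffered channel, a second sender would leak a goroutine. A minimal illustration (not code from this repository):

    done := make(chan error, 3) // buffered: sends succeed without a waiting receiver

    go func() { done <- nil }()                              // e.g. restart acknowledged
    go func() { done <- errors.New("restart timed out") }()  // e.g. timer fired

    // The caller takes the first result; any later send is absorbed by the
    // buffer instead of blocking its goroutine forever.
    fmt.Println(<-done)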
