Expect test for fast catchup (#1186)
When switching to catchup mode, the SetCatchpointCatchupMode function is called. That function can block on the node mutex, which might be held by Stop, and that could lead to a deadlock.
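
For context, a minimal sketch of the old, deadlock-prone shape (the node type and its fields here are illustrative stand-ins, not the actual go-algorand types):

package sketch

import (
	"context"
	"sync"
)

type node struct {
	mu  sync.Mutex
	ctx context.Context
}

// In its old synchronous form, the mode switch takes the node mutex. If Stop
// already holds that mutex while waiting for the catchup service to exit, and
// the catchup service is blocked inside this call, the two wait on each other
// forever.
func (n *node) SetCatchpointCatchupMode(enabled bool) context.Context {
	n.mu.Lock() // blocks for as long as Stop holds n.mu
	defer n.mu.Unlock()
	return n.ctx
}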

To resolve the above, we return a channel from SetCatchpointCatchupMode instead. The operation is carried out by a goroutine, which at the end of the operation writes the new node context to the channel. This allows the caller to select between the reception of the new context and the expiration of the service.
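
A minimal sketch of the new pattern (nodeServices and catchupService are illustrative stand-ins, not the actual go-algorand types):

package sketch

import "context"

// nodeServices is a stand-in for the node interface described above.
type nodeServices interface {
	SetCatchpointCatchupMode(enabled bool) <-chan context.Context
}

type catchupService struct {
	node      nodeServices
	ctx       context.Context
	cancelCtx context.CancelFunc
}

// updateMode waits for either the new node context or the expiration of the
// service's own context, whichever comes first.
func (svc *catchupService) updateMode(enabled bool) {
	newCtxCh := svc.node.SetCatchpointCatchupMode(enabled)
	select {
	case newCtx, open := <-newCtxCh:
		if open {
			// the node finished switching modes; adopt the new context.
			svc.ctx, svc.cancelCtx = context.WithCancel(newCtx)
		}
		// a closed channel means the node is stopping.
	case <-svc.ctx.Done():
		// the service expired first; drain the unbuffered channel in the
		// background so the node's goroutine doesn't block on the send.
		go func() { <-newCtxCh }()
	}
}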

The second issue addressed in this PR is the timeout context handling. The existing context-cancelation check compared the returned, propagated error with the context error (err == cs.ctx.Err()). This would have worked correctly if the context error were retained across all the handlers. However, the http client wraps the underlying context error with its own when aborting due to context expiration/cancelation. This means that in order to correctly detect context expiration, we need to explicitly test cs.ctx.Err() != nil.
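
A small runnable illustration of the difference (the wrapping format here is made up for the example; the real http client wraps the context error inside its own error type):

package main

import (
	"context"
	"fmt"
)

// fetch stands in for an http call aborted by context cancelation. Like the
// real http client, it wraps the context error instead of returning it as-is.
func fetch(ctx context.Context) error {
	return fmt.Errorf(`Get "http://relay": %w`, ctx.Err())
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	err := fetch(ctx)
	fmt.Println(err == ctx.Err()) // false: the error wraps ctx.Err(), so equality fails
	fmt.Println(ctx.Err() != nil) // true: asking the context directly always works
}

On Go 1.13+, errors.Is(err, context.Canceled) would also detect the wrapped error, but testing cs.ctx.Err() directly works regardless of how the handler wraps it.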
tsachiherman authored Jun 24, 2020
1 parent b02ff66 commit d868c91
Showing 23 changed files with 774 additions and 212 deletions.
39 changes: 31 additions & 8 deletions catchup/catchpointService.go
@@ -35,7 +35,7 @@ import (
// CatchpointCatchupNodeServices defines the external node support needed
// for the catchpoint service to switch the node between "regular" operational mode and catchup mode.
type CatchpointCatchupNodeServices interface {
SetCatchpointCatchupMode(bool) (newCtx context.Context)
SetCatchpointCatchupMode(bool) (newContextCh <-chan context.Context)
}

// CatchpointCatchupStats is used for querying and reporting the current state of the catchpoint catchup process
@@ -270,7 +270,7 @@ func (cs *CatchpointCatchupService) processStageLedgerDownload() (err error) {

err = cs.ledgerAccessor.ResetStagingBalances(cs.ctx, true)
if err != nil {
if err == cs.ctx.Err() {
if cs.ctx.Err() != nil {
return cs.stopOrAbort()
}
return cs.abort(fmt.Errorf("processStageLedgerDownload failed to reset staging balances : %v", err))
@@ -279,7 +279,10 @@ func (cs *CatchpointCatchupService) processStageLedgerDownload() (err error) {
if err == nil {
break
}
if err == cs.ctx.Err() {
// instead of testing for err == cs.ctx.Err(), we check the context itself.
// this is more robust, as the http client library sometimes wraps the context
// canceled error with other errors.
if cs.ctx.Err() != nil {
return cs.stopOrAbort()
}

@@ -319,7 +322,7 @@ func (cs *CatchpointCatchupService) processStageLastestBlockDownload() (err error) {
fetcher := fetcherFactory.New()
blk, _, client, err = fetcher.FetchBlock(cs.ctx, blockRound)
if err != nil {
if err == cs.ctx.Err() {
if cs.ctx.Err() != nil {
return cs.stopOrAbort()
}
if attemptsCount <= cs.config.CatchupBlockDownloadRetryAttempts {
@@ -360,7 +363,7 @@ func (cs *CatchpointCatchupService) processStageLastestBlockDownload() (err error) {
// verify that the catchpoint is valid.
err = cs.ledgerAccessor.VerifyCatchpoint(cs.ctx, blk)
if err != nil {
if err == cs.ctx.Err() {
if cs.ctx.Err() != nil {
return cs.stopOrAbort()
}
if attemptsCount <= cs.config.CatchupBlockDownloadRetryAttempts {
@@ -458,7 +461,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
fetcher := fetcherFactory.New()
blk, _, client, err = fetcher.FetchBlock(cs.ctx, topBlock.Round()-basics.Round(blocksFetched))
if err != nil {
if err == cs.ctx.Err() {
if cs.ctx.Err() != nil {
return cs.stopOrAbort()
}
if attemptsCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) {
@@ -584,9 +587,29 @@ func (cs *CatchpointCatchupService) updateStage(newStage ledger.CatchpointCatchupState) (err error) {
return nil
}

// updateNodeCatchupMode requests the node to change its operational mode from
// catchup mode to normal mode and vice versa.
func (cs *CatchpointCatchupService) updateNodeCatchupMode(catchupModeEnabled bool) {
newCtx := cs.node.SetCatchpointCatchupMode(catchupModeEnabled)
cs.ctx, cs.cancelCtxFunc = context.WithCancel(newCtx)
newCtxCh := cs.node.SetCatchpointCatchupMode(catchupModeEnabled)
select {
case newCtx, open := <-newCtxCh:
if open {
cs.ctx, cs.cancelCtxFunc = context.WithCancel(newCtx)
} else {
// the channel is closed, which means the node is stopping
}
case <-cs.ctx.Done():
// the node context was canceled before the SetCatchpointCatchupMode goroutine had
// a chance to complete. At this point, the service is shutting down; however,
// we don't know how long it will take until the node mutex becomes available.
// since SetCatchpointCatchupMode gave us a non-buffered channel, its goroutine would
// block on the send unless we drain that channel. To resolve that, we create
// another goroutine here which drains the channel.
go func() {
// wait here for the SetCatchpointCatchupMode goroutine to complete:
<-newCtxCh
}()
}
}

func (cs *CatchpointCatchupService) updateLedgerFetcherProgress(fetcherStats *ledger.CatchpointCatchupAccessorProgress) {
10 changes: 9 additions & 1 deletion cmd/goal/network.go
@@ -23,6 +23,7 @@ import (

"github.com/spf13/cobra"

"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/netdeploy"
"github.com/algorand/go-algorand/util"
)
@@ -93,7 +94,14 @@ var networkCreateCmd = &cobra.Command{
panic(err)
}

network, err := netdeploy.CreateNetworkFromTemplate(networkName, networkRootDir, networkTemplateFile, binDir, !noImportKeys, nil, nil)
dataDir := maybeSingleDataDir()
var consensus config.ConsensusProtocols
if dataDir != "" {
// try to load the consensus from there. If there is none, we can just use the built-in one.
consensus, _ = config.PreloadConfigurableConsensusProtocols(dataDir)
}

network, err := netdeploy.CreateNetworkFromTemplate(networkName, networkRootDir, networkTemplateFile, binDir, !noImportKeys, nil, consensus)
if err != nil {
if noClean {
reportInfof(" ** failed ** - Preserving network rootdir '%s'", networkRootDir)
120 changes: 71 additions & 49 deletions node/node.go
@@ -857,6 +857,7 @@ func (node *AlgorandFullNode) StartCatchup(catchpoint string) error {
return err
}
node.catchpointCatchupService.Start(node.ctx)
node.log.Infof("starting catching up toward catchpoint %s", catchpoint)
return nil
}

@@ -876,63 +877,84 @@ func (node *AlgorandFullNode) AbortCatchup(catchpoint string) error {
return nil
}

// SetCatchpointCatchupMode change the node's operational mode from catchpoint catchup mode and back
func (node *AlgorandFullNode) SetCatchpointCatchupMode(catchpointCatchupMode bool) (newCtx context.Context) {
node.mu.Lock()
if catchpointCatchupMode {
// stop..
defer func() {
// SetCatchpointCatchupMode changes the node's operational mode from catchpoint catchup mode and back. It returns a
// channel which contains the updated node context. This function needs to work asynchronously so that the caller can
// detect and handle the case where the node is being shut down while we're switching to/from catchup mode, without
// deadlocking on the shared node mutex.
func (node *AlgorandFullNode) SetCatchpointCatchupMode(catchpointCatchupMode bool) (outCtxCh <-chan context.Context) {
// create a non-buffered channel to return the newly created context. The fact that it's non-buffered here
// is important, as it allows us to synchronize the "receiving" of the new context before canceling the previous
// one.
ctxCh := make(chan context.Context)
outCtxCh = ctxCh
go func() {
node.mu.Lock()
// check that the node wasn't canceled. If it has been canceled, it means that node.Stop() was called, in which case
// we should close the channel.
if node.ctx.Err() == context.Canceled {
close(ctxCh)
node.mu.Unlock()
node.waitMonitoringRoutines()
}()
node.net.ClearHandlers()
node.txHandler.Stop()
node.agreementService.Shutdown()
node.catchupService.Stop()
node.txPoolSyncerService.Stop()
node.blockService.Stop()
node.ledgerService.Stop()
node.wsFetcherService.Stop()

node.cancelCtx()
return
}
if catchpointCatchupMode {
// stop..
defer func() {
node.mu.Unlock()
node.waitMonitoringRoutines()
}()
node.net.ClearHandlers()
node.txHandler.Stop()
node.agreementService.Shutdown()
node.catchupService.Stop()
node.txPoolSyncerService.Stop()
node.blockService.Stop()
node.ledgerService.Stop()
node.wsFetcherService.Stop()

prevNodeCancelFunc := node.cancelCtx

// Set up a context we can use to cancel goroutines on Stop()
node.ctx, node.cancelCtx = context.WithCancel(context.Background())
ctxCh <- node.ctx

prevNodeCancelFunc()
return
}
defer node.mu.Unlock()
// start
node.transactionPool.Reset()
node.wsFetcherService.Start()
node.catchupService.Start()
node.agreementService.Start()
node.txPoolSyncerService.Start(node.catchupService.InitialSyncDone)
node.blockService.Start()
node.ledgerService.Start()
node.txHandler.Start()

// Set up a context we can use to cancel goroutines on Stop()
node.ctx, node.cancelCtx = context.WithCancel(context.Background())
return node.ctx
}
defer node.mu.Unlock()
// start
node.transactionPool.Reset()
node.wsFetcherService.Start()
node.catchupService.Start()
node.agreementService.Start()
node.txPoolSyncerService.Start(node.catchupService.InitialSyncDone)
node.blockService.Start()
node.ledgerService.Start()
node.txHandler.Start()

// start indexer
if idx, err := node.Indexer(); err == nil {
err := idx.Start()
if err != nil {
node.log.Errorf("indexer failed to start, turning it off - %v", err)
node.config.IsIndexerActive = false
// start indexer
if idx, err := node.Indexer(); err == nil {
err := idx.Start()
if err != nil {
node.log.Errorf("indexer failed to start, turning it off - %v", err)
node.config.IsIndexerActive = false
} else {
node.log.Info("Indexer was started successfully")
}
} else {
node.log.Info("Indexer was started successfully")
node.log.Infof("Indexer is not available - %v", err)
}
} else {
node.log.Infof("Indexer is not available - %v", err)
}

// Set up a context we can use to cancel goroutines on Stop()
node.ctx, node.cancelCtx = context.WithCancel(context.Background())
// Set up a context we can use to cancel goroutines on Stop()
node.ctx, node.cancelCtx = context.WithCancel(context.Background())

node.startMonitoringRoutines()
node.startMonitoringRoutines()

// at this point, the catchpoint catchup is done ( either successfully or not.. )
node.catchpointCatchupService = nil
// at this point, the catchpoint catchup is done ( either successfully or not.. )
node.catchpointCatchupService = nil

return node.ctx
ctxCh <- node.ctx
}()
return

}

5 changes: 4 additions & 1 deletion scripts/release/test/deb/testDebian.exp
@@ -29,7 +29,10 @@ if { [catch {

source /expectdir/goalExpectCommon.exp

# Create network
::AlgorandGoal::CreateNetwork $NETWORK_NAME $NETWORK_TEMPLATE $TEST_ALGO_DIR $TEST_ROOT_DIR

# Start network
::AlgorandGoal::StartNetwork $NETWORK_NAME $NETWORK_TEMPLATE $TEST_ALGO_DIR $TEST_ROOT_DIR

set TEST_ROOT_DIR_LS_OUTPUT [ eval exec ls $TEST_ROOT_DIR ]
3 changes: 3 additions & 0 deletions test/e2e-go/cli/goal/expect/basicGoalTest.exp
@@ -18,6 +18,9 @@ if { [catch {
set NETWORK_TEMPLATE "$TEST_DATA_DIR/nettemplates/TwoNodes50Each.json"

# Create network
::AlgorandGoal::CreateNetwork $NETWORK_NAME $NETWORK_TEMPLATE $TEST_ALGO_DIR $TEST_ROOT_DIR

# Start network
::AlgorandGoal::StartNetwork $NETWORK_NAME $NETWORK_TEMPLATE $TEST_ALGO_DIR $TEST_ROOT_DIR

set PRIMARY_NODE_ADDRESS [ ::AlgorandGoal::GetAlgodNetworkAddress $TEST_PRIMARY_NODE_DIR ]