diff --git a/catchup/catchpointService.go b/catchup/catchpointService.go index 5d01fa9608..47947f6d18 100644 --- a/catchup/catchpointService.go +++ b/catchup/catchpointService.go @@ -19,10 +19,11 @@ package catchup import ( "context" "fmt" - "github.com/algorand/go-algorand/stateproof" "sync" "time" + "github.com/algorand/go-algorand/stateproof" + "github.com/algorand/go-deadlock" "github.com/algorand/go-algorand/config" @@ -497,6 +498,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) { if lookback < lookbackForStateProofSupport { lookback = lookbackForStateProofSupport } + // in case the effective lookback is going before our rounds count, trim it there. // ( a catchpoint is generated starting round MaxBalLookback, and this is a possible in any round in the range of MaxBalLookback..MaxTxnLife) if lookback >= uint64(topBlock.Round()) { @@ -586,9 +588,12 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) { return cs.abort(fmt.Errorf("processStageBlocksDownload: downloaded block content does not match downloaded block header")) } - cs.updateBlockRetrievalStatistics(0, 1) - peerRank := cs.blocksDownloadPeerSelector.peerDownloadDurationToRank(psp, blockDownloadDuration) - cs.blocksDownloadPeerSelector.rankPeer(psp, peerRank) + if psp != nil { + // the block might have been retrieved from the local ledger, nothing to rank + cs.updateBlockRetrievalStatistics(0, 1) + peerRank := cs.blocksDownloadPeerSelector.peerDownloadDurationToRank(psp, blockDownloadDuration) + cs.blocksDownloadPeerSelector.rankPeer(psp, peerRank) + } // all good, persist and move on. 
err = cs.ledgerAccessor.StoreBlock(cs.ctx, blk) diff --git a/test/e2e-go/cli/goal/expect/catchpointCatchupWebProxy/webproxy.go b/test/e2e-go/cli/goal/expect/catchpointCatchupWebProxy/webproxy.go index 335d5bafb2..8c574c4b1e 100644 --- a/test/e2e-go/cli/goal/expect/catchpointCatchupWebProxy/webproxy.go +++ b/test/e2e-go/cli/goal/expect/catchpointCatchupWebProxy/webproxy.go @@ -28,6 +28,7 @@ import ( "github.com/algorand/go-deadlock" + "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/test/framework/fixtures" ) @@ -49,14 +50,15 @@ func main() { return } var mu deadlock.Mutex - wp, err := fixtures.MakeWebProxy(*webProxyDestination, func(response http.ResponseWriter, request *http.Request, next http.HandlerFunc) { + log := logging.Base() + wp, err := fixtures.MakeWebProxy(*webProxyDestination, log, func(response http.ResponseWriter, request *http.Request, next http.HandlerFunc) { mu.Lock() time.Sleep(time.Duration(*webProxyRequestDelay) * time.Millisecond) mu.Unlock() // prevent requests for block #2 to go through. 
if strings.HasSuffix(request.URL.String(), "/block/2") { - response.Write([]byte("webProxy prevents block 2 from serving")) response.WriteHeader(http.StatusBadRequest) + response.Write([]byte("webProxy prevents block 2 from serving")) return } if *webProxyLogFile != "" { diff --git a/test/e2e-go/features/catchup/catchpointCatchup_test.go b/test/e2e-go/features/catchup/catchpointCatchup_test.go index 98efc0b266..e5eba0e97c 100644 --- a/test/e2e-go/features/catchup/catchpointCatchup_test.go +++ b/test/e2e-go/features/catchup/catchpointCatchup_test.go @@ -18,7 +18,6 @@ package catchup import ( "fmt" - generatedV2 "github.com/algorand/go-algorand/daemon/algod/api/server/v2/generated" "net/http" "os/exec" "path/filepath" @@ -32,6 +31,9 @@ import ( "github.com/algorand/go-algorand/config" algodclient "github.com/algorand/go-algorand/daemon/algod/api/client" + generatedV2 "github.com/algorand/go-algorand/daemon/algod/api/server/v2/generated" + "github.com/algorand/go-algorand/data/basics" + "github.com/algorand/go-algorand/ledger/ledgercore" "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/nodecontrol" "github.com/algorand/go-algorand/protocol" @@ -116,11 +118,11 @@ func TestBasicCatchpointCatchup(t *testing.T) { // Overview of this test: // Start a two-node network (primary has 100%, secondary has 0%) - // Nodes are having a consensus allowing balances history of 32 rounds and transaction history of 33 rounds. - // Let it run for 37 rounds. - // create a web proxy, and connect it to the primary node, blocking all requests for round #1. ( and allowing everything else ) - // start a secondary node, and instuct it to catchpoint catchup from the proxy. ( which would be for round 36 ) - // wait until the clone node cought up, skipping the "impossible" hole of round #1. + // Nodes are having a consensus allowing balances history of 8 rounds and transaction history of 13 rounds. + // Let it run for 21 rounds. 
+ // create a web proxy, and connect it to the primary node, blocking all requests for round #2. ( and allowing everything else ) + // start a secondary node, and instruct it to catchpoint catchup from the proxy. ( which would be for round 20 ) + // wait until the clone node caught up, skipping the "impossible" hole of round #2. consensus := make(config.ConsensusProtocols) const consensusCatchpointCatchupTestProtocol = protocol.ConsensusVersion("catchpointtestingprotocol") @@ -129,9 +131,9 @@ func TestBasicCatchpointCatchup(t *testing.T) { // MaxBalLookback = 2 x SeedRefreshInterval x SeedLookback // ref. https://github.com/algorandfoundation/specs/blob/master/dev/abft.md catchpointCatchupProtocol.SeedLookback = 2 - catchpointCatchupProtocol.SeedRefreshInterval = 8 - catchpointCatchupProtocol.MaxBalLookback = 2 * catchpointCatchupProtocol.SeedLookback * catchpointCatchupProtocol.SeedRefreshInterval // 32 - catchpointCatchupProtocol.MaxTxnLife = 33 + catchpointCatchupProtocol.SeedRefreshInterval = 2 + catchpointCatchupProtocol.MaxBalLookback = 2 * catchpointCatchupProtocol.SeedLookback * catchpointCatchupProtocol.SeedRefreshInterval // 8 + catchpointCatchupProtocol.MaxTxnLife = 13 catchpointCatchupProtocol.CatchpointLookback = catchpointCatchupProtocol.MaxBalLookback catchpointCatchupProtocol.EnableOnlineAccountCatchpoints = true @@ -161,13 +163,16 @@ func TestBasicCatchpointCatchup(t *testing.T) { // prepare it's configuration file to set it to generate a catchpoint every 4 rounds. 
cfg, err := config.LoadConfigFromDisk(primaryNode.GetDataDir()) a.NoError(err) - cfg.CatchpointInterval = 4 + const catchpointInterval = 4 + cfg.CatchpointInterval = catchpointInterval cfg.MaxAcctLookback = 2 cfg.SaveToDisk(primaryNode.GetDataDir()) cfg.Archival = false + cfg.CatchpointInterval = 0 cfg.NetAddress = "" cfg.EnableLedgerService = false cfg.EnableBlockService = false + cfg.BaseLoggerDebugLevel = uint32(logging.Debug) cfg.SaveToDisk(secondNode.GetDataDir()) // start the primary node @@ -180,10 +185,15 @@ func TestBasicCatchpointCatchup(t *testing.T) { ExitErrorCallback: errorsCollector.nodeExitWithError, }) a.NoError(err) + defer primaryNode.StopAlgod() // Let the network make some progress currentRound := uint64(1) - const targetRound = uint64(37) + expectedBlocksToDownload := catchpointCatchupProtocol.MaxTxnLife + catchpointCatchupProtocol.DeeperBlockHeaderHistory + const restrictedBlock = 2 // block number that is rejected to be downloaded to ensure fast catchup and not regular catchup is running + // calculate the target round: this is the next round after catchpoint that is greater than expectedBlocksToDownload before the restrictedBlock block number + targetCatchpointRound := (basics.Round(expectedBlocksToDownload+restrictedBlock)/catchpointInterval + 1) * catchpointInterval + targetRound := uint64(targetCatchpointRound) + 1 // 21 primaryNodeRestClient := fixture.GetAlgodClientForController(primaryNode) primaryNodeRestClient.SetAPIVersionAffinity(algodclient.APIVersionV2) log.Infof("Building ledger history..") @@ -194,16 +204,17 @@ func TestBasicCatchpointCatchup(t *testing.T) { break } currentRound++ - } log.Infof("done building!\n") + primaryListeningAddress, err := primaryNode.GetListeningAddress() a.NoError(err) - wp, err := fixtures.MakeWebProxy(primaryListeningAddress, func(response http.ResponseWriter, request *http.Request, next http.HandlerFunc) { + wp, err := fixtures.MakeWebProxy(primaryListeningAddress, log, func(response 
http.ResponseWriter, request *http.Request, next http.HandlerFunc) { // prevent requests for block #2 to go through. if request.URL.String() == "/v1/test-v1/block/2" { response.WriteHeader(http.StatusBadRequest) + response.Write([]byte("webProxy prevents block 2 from serving")) return } next(response, request) @@ -222,6 +233,7 @@ func TestBasicCatchpointCatchup(t *testing.T) { ExitErrorCallback: errorsCollector.nodeExitWithError, }) a.NoError(err) + defer secondNode.StopAlgod() // wait until node is caught up. secondNodeRestClient := fixture.GetAlgodClientForController(secondNode) @@ -240,10 +252,32 @@ func TestBasicCatchpointCatchup(t *testing.T) { } log.Infof(" - done catching up!\n") - status, err := awaitCatchpointCreation(primaryNodeRestClient, &fixture, 3) - a.NoError(err) + // ensure the catchpoint is created for targetCatchpointRound + var status generatedV2.NodeStatusResponse + timer := time.NewTimer(10 * time.Second) +outer: + for { + status, err = primaryNodeRestClient.Status() + a.NoError(err) - log.Infof("primary node latest catchpoint - %s!\n", status.LastCatchpoint) + var round basics.Round + if status.LastCatchpoint != nil && len(*status.LastCatchpoint) > 0 { + round, _, err = ledgercore.ParseCatchpointLabel(*status.LastCatchpoint) + a.NoError(err) + if round >= targetCatchpointRound { + break + } + } + select { + case <-timer.C: + a.Failf("timeout waiting a catchpoint", "target: %d, got %d", targetCatchpointRound, round) + break outer + default: + time.Sleep(250 * time.Millisecond) + } + } + + log.Infof("primary node latest catchpoint - %s!\n", *status.LastCatchpoint) _, err = secondNodeRestClient.Catchup(*status.LastCatchpoint) a.NoError(err) @@ -260,9 +294,6 @@ func TestBasicCatchpointCatchup(t *testing.T) { currentRound++ } log.Infof("done catching up!\n") - - secondNode.StopAlgod() - primaryNode.StopAlgod() } func TestCatchpointLabelGeneration(t *testing.T) { @@ -295,9 +326,9 @@ func TestCatchpointLabelGeneration(t *testing.T) { // 
MaxBalLookback = 2 x SeedRefreshInterval x SeedLookback // ref. https://github.com/algorandfoundation/specs/blob/master/dev/abft.md catchpointCatchupProtocol.SeedLookback = 2 - catchpointCatchupProtocol.SeedRefreshInterval = 8 - catchpointCatchupProtocol.MaxBalLookback = 2 * catchpointCatchupProtocol.SeedLookback * catchpointCatchupProtocol.SeedRefreshInterval // 32 - catchpointCatchupProtocol.MaxTxnLife = 33 + catchpointCatchupProtocol.SeedRefreshInterval = 2 + catchpointCatchupProtocol.MaxBalLookback = 2 * catchpointCatchupProtocol.SeedLookback * catchpointCatchupProtocol.SeedRefreshInterval // 8 + catchpointCatchupProtocol.MaxTxnLife = 13 catchpointCatchupProtocol.CatchpointLookback = catchpointCatchupProtocol.MaxBalLookback catchpointCatchupProtocol.EnableOnlineAccountCatchpoints = true @@ -338,10 +369,11 @@ func TestCatchpointLabelGeneration(t *testing.T) { ExitErrorCallback: errorsCollector.nodeExitWithError, }) a.NoError(err) + defer primaryNode.StopAlgod() // Let the network make some progress currentRound := uint64(1) - targetRound := uint64(41) + targetRound := uint64(21) primaryNodeRestClient := fixture.GetAlgodClientForController(primaryNode) primaryNodeRestClient.SetAPIVersionAffinity(algodclient.APIVersionV2) log.Infof("Building ledger history..") @@ -364,7 +396,6 @@ func TestCatchpointLabelGeneration(t *testing.T) { } else { a.Empty(*primaryNodeStatus.LastCatchpoint) } - primaryNode.StopAlgod() }) } } diff --git a/test/framework/fixtures/webProxyFixture.go b/test/framework/fixtures/webProxyFixture.go index 94cee4ce7c..0b9e12d936 100644 --- a/test/framework/fixtures/webProxyFixture.go +++ b/test/framework/fixtures/webProxyFixture.go @@ -17,10 +17,11 @@ package fixtures import ( - "fmt" "net" "net/http" "strings" + + "github.com/algorand/go-algorand/logging" ) // WebProxyInterceptFunc expose the web proxy intercept function @@ -32,16 +33,16 @@ type WebProxy struct { listener net.Listener destination string intercept WebProxyInterceptFunc + log 
logging.Logger } // MakeWebProxy creates an instance of the web proxy -func MakeWebProxy(destination string, intercept WebProxyInterceptFunc) (wp *WebProxy, err error) { - if strings.HasPrefix(destination, "http://") { - destination = destination[7:] - } +func MakeWebProxy(destination string, log logging.Logger, intercept WebProxyInterceptFunc) (wp *WebProxy, err error) { + destination = strings.TrimPrefix(destination, "http://") wp = &WebProxy{ destination: destination, intercept: intercept, + log: log, } wp.server = &http.Server{ Handler: wp, @@ -63,6 +64,7 @@ func (wp *WebProxy) GetListenAddress() string { // Close release the web proxy resources func (wp *WebProxy) Close() { + wp.log.Debugln("webproxy: quiting") // we can't use shutdown, since we have tunneled websocket, which is a hijacked connection // that http.Server doens't know how to handle. wp.server.Close() @@ -70,7 +72,7 @@ func (wp *WebProxy) Close() { // ServeHTTP serves a single HTTP request func (wp *WebProxy) ServeHTTP(response http.ResponseWriter, request *http.Request) { - //fmt.Printf("incoming request for %v\n", request.URL) + wp.log.Debugf("webproxy: incoming request for %v", request.URL) if wp.intercept == nil { wp.Passthrough(response, request) return @@ -86,7 +88,7 @@ func (wp *WebProxy) Passthrough(response http.ResponseWriter, request *http.Requ clientRequestURL.Host = wp.destination clientRequest, err := http.NewRequest(request.Method, clientRequestURL.String(), request.Body) if err != nil { - fmt.Printf("Passthrough request assembly error %v (%#v)\n", err, clientRequestURL) + wp.log.Debugf("Passthrough request assembly error %v (%#v)", err, clientRequestURL) response.WriteHeader(http.StatusInternalServerError) return } @@ -99,7 +101,7 @@ func (wp *WebProxy) Passthrough(response http.ResponseWriter, request *http.Requ } clientResponse, err := client.Do(clientRequest) if err != nil { - fmt.Printf("Passthrough request error %v (%v)\n", err, request.URL.String()) + 
wp.log.Debugf("Passthrough request error %v (%v)", err, request.URL.String()) response.WriteHeader(http.StatusInternalServerError) return }