Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions catchup/catchpointService.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ package catchup
import (
"context"
"fmt"
"github.com/algorand/go-algorand/stateproof"
"sync"
"time"

"github.com/algorand/go-algorand/stateproof"
Comment thread
algorandskiy marked this conversation as resolved.

"github.com/algorand/go-deadlock"

"github.com/algorand/go-algorand/config"
Expand Down Expand Up @@ -497,6 +498,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
if lookback < lookbackForStateProofSupport {
lookback = lookbackForStateProofSupport
}

// in case the effective lookback is going before our rounds count, trim it there.
// ( a catchpoint is generated starting round MaxBalLookback, and this is a possible in any round in the range of MaxBalLookback..MaxTxnLife)
if lookback >= uint64(topBlock.Round()) {
Expand Down Expand Up @@ -586,9 +588,12 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
return cs.abort(fmt.Errorf("processStageBlocksDownload: downloaded block content does not match downloaded block header"))
}

cs.updateBlockRetrievalStatistics(0, 1)
peerRank := cs.blocksDownloadPeerSelector.peerDownloadDurationToRank(psp, blockDownloadDuration)
cs.blocksDownloadPeerSelector.rankPeer(psp, peerRank)
if psp != nil {
// the block might have been retrieved from the local ledger, nothing to rank
cs.updateBlockRetrievalStatistics(0, 1)
peerRank := cs.blocksDownloadPeerSelector.peerDownloadDurationToRank(psp, blockDownloadDuration)
cs.blocksDownloadPeerSelector.rankPeer(psp, peerRank)
}

// all good, persist and move on.
err = cs.ledgerAccessor.StoreBlock(cs.ctx, blk)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/algorand/go-deadlock"

"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/test/framework/fixtures"
)

Expand All @@ -49,14 +50,15 @@ func main() {
return
}
var mu deadlock.Mutex
wp, err := fixtures.MakeWebProxy(*webProxyDestination, func(response http.ResponseWriter, request *http.Request, next http.HandlerFunc) {
log := logging.Base()
wp, err := fixtures.MakeWebProxy(*webProxyDestination, log, func(response http.ResponseWriter, request *http.Request, next http.HandlerFunc) {
mu.Lock()
time.Sleep(time.Duration(*webProxyRequestDelay) * time.Millisecond)
mu.Unlock()
// prevent requests for block #2 to go through.
if strings.HasSuffix(request.URL.String(), "/block/2") {
response.Write([]byte("webProxy prevents block 2 from serving"))
response.WriteHeader(http.StatusBadRequest)
response.Write([]byte("webProxy prevents block 2 from serving"))
return
}
if *webProxyLogFile != "" {
Expand Down
79 changes: 55 additions & 24 deletions test/e2e-go/features/catchup/catchpointCatchup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package catchup

import (
"fmt"
generatedV2 "github.com/algorand/go-algorand/daemon/algod/api/server/v2/generated"
"net/http"
"os/exec"
"path/filepath"
Expand All @@ -32,6 +31,9 @@ import (

"github.com/algorand/go-algorand/config"
algodclient "github.com/algorand/go-algorand/daemon/algod/api/client"
generatedV2 "github.com/algorand/go-algorand/daemon/algod/api/server/v2/generated"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/nodecontrol"
"github.com/algorand/go-algorand/protocol"
Expand Down Expand Up @@ -116,11 +118,11 @@ func TestBasicCatchpointCatchup(t *testing.T) {

// Overview of this test:
// Start a two-node network (primary has 100%, secondary has 0%)
// Nodes are having a consensus allowing balances history of 32 rounds and transaction history of 33 rounds.
// Let it run for 37 rounds.
// create a web proxy, and connect it to the primary node, blocking all requests for round #1. ( and allowing everything else )
// start a secondary node, and instuct it to catchpoint catchup from the proxy. ( which would be for round 36 )
// wait until the clone node cought up, skipping the "impossible" hole of round #1.
// Nodes are having a consensus allowing balances history of 8 rounds and transaction history of 13 rounds.
// Let it run for 21 rounds.
// create a web proxy, and connect it to the primary node, blocking all requests for round #2. ( and allowing everything else )
// start a secondary node, and instuct it to catchpoint catchup from the proxy. ( which would be for round 20 )
// wait until the clone node cought up, skipping the "impossible" hole of round #2.

consensus := make(config.ConsensusProtocols)
const consensusCatchpointCatchupTestProtocol = protocol.ConsensusVersion("catchpointtestingprotocol")
Expand All @@ -129,9 +131,9 @@ func TestBasicCatchpointCatchup(t *testing.T) {
// MaxBalLookback = 2 x SeedRefreshInterval x SeedLookback
// ref. https://github.com/algorandfoundation/specs/blob/master/dev/abft.md
catchpointCatchupProtocol.SeedLookback = 2
catchpointCatchupProtocol.SeedRefreshInterval = 8
catchpointCatchupProtocol.MaxBalLookback = 2 * catchpointCatchupProtocol.SeedLookback * catchpointCatchupProtocol.SeedRefreshInterval // 32
catchpointCatchupProtocol.MaxTxnLife = 33
catchpointCatchupProtocol.SeedRefreshInterval = 2
catchpointCatchupProtocol.MaxBalLookback = 2 * catchpointCatchupProtocol.SeedLookback * catchpointCatchupProtocol.SeedRefreshInterval // 8
catchpointCatchupProtocol.MaxTxnLife = 13
catchpointCatchupProtocol.CatchpointLookback = catchpointCatchupProtocol.MaxBalLookback
catchpointCatchupProtocol.EnableOnlineAccountCatchpoints = true

Expand Down Expand Up @@ -161,13 +163,16 @@ func TestBasicCatchpointCatchup(t *testing.T) {
// prepare it's configuration file to set it to generate a catchpoint every 4 rounds.
cfg, err := config.LoadConfigFromDisk(primaryNode.GetDataDir())
a.NoError(err)
cfg.CatchpointInterval = 4
const catchpointInterval = 4
cfg.CatchpointInterval = catchpointInterval
cfg.MaxAcctLookback = 2
cfg.SaveToDisk(primaryNode.GetDataDir())
cfg.Archival = false
cfg.CatchpointInterval = 0
cfg.NetAddress = ""
cfg.EnableLedgerService = false
cfg.EnableBlockService = false
cfg.BaseLoggerDebugLevel = uint32(logging.Debug)
cfg.SaveToDisk(secondNode.GetDataDir())

// start the primary node
Expand All @@ -180,10 +185,15 @@ func TestBasicCatchpointCatchup(t *testing.T) {
ExitErrorCallback: errorsCollector.nodeExitWithError,
})
a.NoError(err)
defer primaryNode.StopAlgod()

// Let the network make some progress
currentRound := uint64(1)
const targetRound = uint64(37)
expectedBlocksToDownload := catchpointCatchupProtocol.MaxTxnLife + catchpointCatchupProtocol.DeeperBlockHeaderHistory
const restrictedBlock = 2 // block number that is rejected to be downloaded to ensure fast catchup and not regular catchup is running
// calculate the target round: this is the next round after catchpoint that is greater than expectedBlocksToDownload before the restrictedBlock block number
targetCatchpointRound := (basics.Round(expectedBlocksToDownload+restrictedBlock)/catchpointInterval + 1) * catchpointInterval
targetRound := uint64(targetCatchpointRound) + 1 // 21
primaryNodeRestClient := fixture.GetAlgodClientForController(primaryNode)
primaryNodeRestClient.SetAPIVersionAffinity(algodclient.APIVersionV2)
log.Infof("Building ledger history..")
Expand All @@ -194,16 +204,17 @@ func TestBasicCatchpointCatchup(t *testing.T) {
break
}
currentRound++

}
log.Infof("done building!\n")

primaryListeningAddress, err := primaryNode.GetListeningAddress()
a.NoError(err)

wp, err := fixtures.MakeWebProxy(primaryListeningAddress, func(response http.ResponseWriter, request *http.Request, next http.HandlerFunc) {
wp, err := fixtures.MakeWebProxy(primaryListeningAddress, log, func(response http.ResponseWriter, request *http.Request, next http.HandlerFunc) {
// prevent requests for block #2 to go through.
if request.URL.String() == "/v1/test-v1/block/2" {
response.WriteHeader(http.StatusBadRequest)
response.Write([]byte("webProxy prevents block 2 from serving"))
return
}
next(response, request)
Expand All @@ -222,6 +233,7 @@ func TestBasicCatchpointCatchup(t *testing.T) {
ExitErrorCallback: errorsCollector.nodeExitWithError,
})
a.NoError(err)
defer secondNode.StopAlgod()

// wait until node is caught up.
secondNodeRestClient := fixture.GetAlgodClientForController(secondNode)
Expand All @@ -240,10 +252,32 @@ func TestBasicCatchpointCatchup(t *testing.T) {
}
log.Infof(" - done catching up!\n")

status, err := awaitCatchpointCreation(primaryNodeRestClient, &fixture, 3)
a.NoError(err)
// ensure the catchpoint is created for targetCatchpointRound
var status generatedV2.NodeStatusResponse
timer := time.NewTimer(10 * time.Second)
outer:
for {
status, err = primaryNodeRestClient.Status()
a.NoError(err)

log.Infof("primary node latest catchpoint - %s!\n", status.LastCatchpoint)
var round basics.Round
if status.LastCatchpoint != nil && len(*status.LastCatchpoint) > 0 {
round, _, err = ledgercore.ParseCatchpointLabel(*status.LastCatchpoint)
a.NoError(err)
if round >= targetCatchpointRound {
break
}
}
select {
case <-timer.C:
a.Failf("timeout waiting a catchpoint", "target: %d, got %d", targetCatchpointRound, round)
break outer
default:
time.Sleep(250 * time.Millisecond)
}
}

log.Infof("primary node latest catchpoint - %s!\n", *status.LastCatchpoint)
_, err = secondNodeRestClient.Catchup(*status.LastCatchpoint)
a.NoError(err)

Comment thread
michaeldiamant marked this conversation as resolved.
Expand All @@ -260,9 +294,6 @@ func TestBasicCatchpointCatchup(t *testing.T) {
currentRound++
}
log.Infof("done catching up!\n")

secondNode.StopAlgod()
primaryNode.StopAlgod()
}

func TestCatchpointLabelGeneration(t *testing.T) {
Expand Down Expand Up @@ -295,9 +326,9 @@ func TestCatchpointLabelGeneration(t *testing.T) {
// MaxBalLookback = 2 x SeedRefreshInterval x SeedLookback
// ref. https://github.com/algorandfoundation/specs/blob/master/dev/abft.md
catchpointCatchupProtocol.SeedLookback = 2
catchpointCatchupProtocol.SeedRefreshInterval = 8
catchpointCatchupProtocol.MaxBalLookback = 2 * catchpointCatchupProtocol.SeedLookback * catchpointCatchupProtocol.SeedRefreshInterval // 32
catchpointCatchupProtocol.MaxTxnLife = 33
catchpointCatchupProtocol.SeedRefreshInterval = 2
catchpointCatchupProtocol.MaxBalLookback = 2 * catchpointCatchupProtocol.SeedLookback * catchpointCatchupProtocol.SeedRefreshInterval // 8
catchpointCatchupProtocol.MaxTxnLife = 13
catchpointCatchupProtocol.CatchpointLookback = catchpointCatchupProtocol.MaxBalLookback
catchpointCatchupProtocol.EnableOnlineAccountCatchpoints = true

Expand Down Expand Up @@ -338,10 +369,11 @@ func TestCatchpointLabelGeneration(t *testing.T) {
ExitErrorCallback: errorsCollector.nodeExitWithError,
})
a.NoError(err)
defer primaryNode.StopAlgod()

// Let the network make some progress
currentRound := uint64(1)
targetRound := uint64(41)
targetRound := uint64(21)
primaryNodeRestClient := fixture.GetAlgodClientForController(primaryNode)
primaryNodeRestClient.SetAPIVersionAffinity(algodclient.APIVersionV2)
log.Infof("Building ledger history..")
Expand All @@ -364,7 +396,6 @@ func TestCatchpointLabelGeneration(t *testing.T) {
} else {
a.Empty(*primaryNodeStatus.LastCatchpoint)
}
primaryNode.StopAlgod()
})
}
}
18 changes: 10 additions & 8 deletions test/framework/fixtures/webProxyFixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
package fixtures

import (
"fmt"
"net"
"net/http"
"strings"

"github.com/algorand/go-algorand/logging"
)

// WebProxyInterceptFunc expose the web proxy intercept function
Expand All @@ -32,16 +33,16 @@ type WebProxy struct {
listener net.Listener
destination string
intercept WebProxyInterceptFunc
log logging.Logger
}

// MakeWebProxy creates an instance of the web proxy
func MakeWebProxy(destination string, intercept WebProxyInterceptFunc) (wp *WebProxy, err error) {
if strings.HasPrefix(destination, "http://") {
destination = destination[7:]
}
func MakeWebProxy(destination string, log logging.Logger, intercept WebProxyInterceptFunc) (wp *WebProxy, err error) {
destination = strings.TrimPrefix(destination, "http://")
wp = &WebProxy{
destination: destination,
intercept: intercept,
log: log,
}
wp.server = &http.Server{
Handler: wp,
Expand All @@ -63,14 +64,15 @@ func (wp *WebProxy) GetListenAddress() string {

// Close release the web proxy resources
func (wp *WebProxy) Close() {
wp.log.Debugln("webproxy: quiting")
// we can't use shutdown, since we have tunneled websocket, which is a hijacked connection
// that http.Server doens't know how to handle.
wp.server.Close()
}

// ServeHTTP serves a single HTTP request
func (wp *WebProxy) ServeHTTP(response http.ResponseWriter, request *http.Request) {
//fmt.Printf("incoming request for %v\n", request.URL)
wp.log.Debugf("webproxy: incoming request for %v", request.URL)
if wp.intercept == nil {
wp.Passthrough(response, request)
return
Expand All @@ -86,7 +88,7 @@ func (wp *WebProxy) Passthrough(response http.ResponseWriter, request *http.Requ
clientRequestURL.Host = wp.destination
clientRequest, err := http.NewRequest(request.Method, clientRequestURL.String(), request.Body)
if err != nil {
fmt.Printf("Passthrough request assembly error %v (%#v)\n", err, clientRequestURL)
wp.log.Debugf("Passthrough request assembly error %v (%#v)", err, clientRequestURL)
response.WriteHeader(http.StatusInternalServerError)
return
}
Expand All @@ -99,7 +101,7 @@ func (wp *WebProxy) Passthrough(response http.ResponseWriter, request *http.Requ
}
clientResponse, err := client.Do(clientRequest)
if err != nil {
fmt.Printf("Passthrough request error %v (%v)\n", err, request.URL.String())
wp.log.Debugf("Passthrough request error %v (%v)", err, request.URL.String())
response.WriteHeader(http.StatusInternalServerError)
return
}
Expand Down