Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions agreement/voteAggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ func (agg *voteAggregator) filterBundle(ub unauthenticatedBundle, freshData fres

// voteStepFresh is a helper function for vote relay rules. Votes from steps
// [soft, next] are always propagated, as are votes from [s-1, s+1] where s is
// the current/last concluding step.
// the current/last concluding step. Set mine to 0 to effectively disable allowing
// votes adjacent to the current/last concluding step.
func voteStepFresh(descr string, proto protocol.ConsensusVersion, mine, vote step) error {
if vote <= next {
// always propagate first recovery vote to ensure synchronous block of periods after partition
Expand All @@ -248,8 +249,12 @@ func voteFresh(proto protocol.ConsensusVersion, freshData freshnessData, vote un
return fmt.Errorf("filtered vote from bad round: player.Round=%v; vote.Round=%v", freshData.PlayerRound, vote.R.Round)
}

if freshData.PlayerRound+1 == vote.R.Round && (vote.R.Period > 0 || vote.R.Step > next) {
return fmt.Errorf("filtered future vote from bad period or step: player.Round=%v; vote.(Round,Period,Step)=(%v,%v,%v)", freshData.PlayerRound, vote.R.Round, vote.R.Period, vote.R.Step)
if freshData.PlayerRound+1 == vote.R.Round {
if (vote.R.Period > 0) {
return fmt.Errorf("filtered future vote from bad period: player.Round=%v; vote.(Round,Period,Step)=(%v,%v,%v)", freshData.PlayerRound, vote.R.Round, vote.R.Period, vote.R.Step)
}
// pipeline votes from next round period 0
return voteStepFresh("from next round", proto, 0, vote.R.Step)
}

switch vote.R.Period {
Expand Down
62 changes: 62 additions & 0 deletions agreement/voteAggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,3 +777,65 @@ func TestVoteAggregatorFiltersVotePresentPeriod(t *testing.T) {
require.NoError(t, err)
require.NoErrorf(t, res, "VotePresent not correctly filtered")
}

func TestVoteAggregatorFiltersVoteNextRound(t *testing.T) {
// Set up a composed test machine
rRouter := new(rootRouter)
rRouter.update(player{}, 0, false)
voteM := &ioAutomataConcrete{
listener: rRouter.voteRoot,
routerCtx: rRouter,
}
helper := voteMakerHelper{}
helper.Setup()
b := testCaseBuilder{}

// define a current player state for freshness testing
lastConcludingStep := next
msgTemplate := filterableMessageEvent{
FreshnessData: freshnessData{
PlayerRound: round(10),
PlayerPeriod: period(10),
PlayerStep: next + 5,
PlayerLastConcluding: lastConcludingStep,
},
}
// generate old next vote in next round, period 0, step 1; make sure it is accepted
pV := helper.MakeRandomProposalValue()
uv := helper.MakeUnauthenticatedVote(t, 0, round(11), period(0), soft, *pV)
inMsg := msgTemplate // copy
inMsg.messageEvent = messageEvent{
T: votePresent,
Input: message{
UnauthenticatedVote: uv,
},
}
b.AddInOutPair(inMsg, emptyEvent{})

// next round, period 0, step > next should be rejected
uv = helper.MakeUnauthenticatedVote(t, 1, round(11), period(0), next+1, *pV)
inMsg = msgTemplate // copy
inMsg.messageEvent = messageEvent{
T: votePresent,
Input: message{
UnauthenticatedVote: uv,
},
}
b.AddInOutPair(inMsg, filteredEvent{T: voteFiltered})

// next round, period 1 should be rejected
uv = helper.MakeUnauthenticatedVote(t, 1, round(11), period(1), soft, *pV)
inMsg = msgTemplate // copy
inMsg.messageEvent = messageEvent{
T: votePresent,
Input: message{
UnauthenticatedVote: uv,
},
}
b.AddInOutPair(inMsg, filteredEvent{T: voteFiltered})

// finalize
res, err := b.Build().Validate(voteM)
require.NoError(t, err)
require.NoErrorf(t, res, "Votes from next round not correctly filtered")
}
3 changes: 2 additions & 1 deletion auction/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"math"
"sync"

"github.com/algorand/go-deadlock"
Expand Down Expand Up @@ -289,7 +290,7 @@ func (am *Tracker) LiveUpdateWithContext(ctx context.Context, wg *sync.WaitGroup
log.Debugf("Getting transactions for %d-%d",
am.LastRound+1, status.LastRound)

transactions, err := rc.TransactionsByAddr(am.AuctionKey.GetChecksumAddress().String(), am.LastRound+1, status.LastRound)
transactions, err := rc.TransactionsByAddr(am.AuctionKey.GetChecksumAddress().String(), am.LastRound+1, status.LastRound, math.MaxUint64)
if err != nil {
log.Error(err)
fmt.Println(err)
Expand Down
22 changes: 19 additions & 3 deletions cmd/algod/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/logging/telemetryspec"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/util/metrics"
"github.com/algorand/go-algorand/util/tokens"
)

Expand Down Expand Up @@ -71,13 +72,20 @@ func main() {
rand.Seed(time.Now().UnixNano())
}

version := config.GetCurrentVersion()
if *versionCheck {
version := config.GetCurrentVersion()
fmt.Printf("%d\n%s.%s [%s] (commit #%s)\n%s\n", version.AsUInt64(), version.String(),
version.Channel, version.Branch, version.GetCommitHash(), config.GetLicenseInfo())
return
}

heartbeatGauge := metrics.MakeStringGauge()
heartbeatGauge.Set("version", version.String())
heartbeatGauge.Set("version-num", strconv.FormatUint(version.AsUInt64(), 10))
heartbeatGauge.Set("channel", version.Channel)
heartbeatGauge.Set("branch", version.Branch)
heartbeatGauge.Set("commit-hash", version.GetCommitHash())

if *branchCheck {
fmt.Println(config.Branch)
return
Expand Down Expand Up @@ -183,10 +191,18 @@ func main() {

log.EventWithDetails(telemetryspec.ApplicationState, telemetryspec.StartupEvent, startupDetails)
// Send a heartbeat event every 10 minutes as a sign of life
ticker := time.NewTicker(10 * time.Minute)
go func() {
values := make(map[string]string)
for {
log.Event(telemetryspec.ApplicationState, telemetryspec.HeartbeatEvent)
<-time.After(10 * time.Minute)
metrics.DefaultRegistry().AddMetrics(values)

heartbeatDetails := telemetryspec.HeartbeatEventDetails{
Metrics: values,
}

log.EventWithDetails(telemetryspec.ApplicationState, telemetryspec.HeartbeatEvent, heartbeatDetails)
<-ticker.C
}
}()
}
Expand Down
75 changes: 46 additions & 29 deletions cmd/algoh/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (c *stdCollector) Write(p []byte) (n int, err error) {
}

func main() {
blockWatcherInitialized := false
flag.Parse()
nc := getNodeController()

Expand Down Expand Up @@ -95,11 +96,11 @@ func main() {
reportErrorf("Data directory %s does not appear to be valid\n", dataDir)
}

config, err := algoh.LoadConfigFromFile(filepath.Join(dataDir, algoh.ConfigFilename))
algohConfig, err := algoh.LoadConfigFromFile(filepath.Join(dataDir, algoh.ConfigFilename))
if err != nil && !os.IsNotExist(err) {
reportErrorf("Error loading configuration, %v\n", err)
}
validateConfig(config)
validateConfig(algohConfig)

log := logging.Base()
configureLogging(genesisID, log, absolutePath)
Expand All @@ -126,35 +127,27 @@ func main() {

err = cmd.Start()
if err != nil {
reportErrorf("Error starting algod: %v", err)
reportErrorf("error starting algod: %v", err)
}
err = cmd.Wait()
if err != nil {
reportErrorf("error waiting for algod: %v", err)
}
cmd.Wait()
close(done)

// capture logs if algod terminated prior to blockWatcher starting
if !blockWatcherInitialized {
captureErrorLogs(algohConfig, errorOutput, output, absolutePath, true)
}

log.Infoln("++++++++++++++++++++++++++++++++++++++++")
log.Infoln("algod exited. Exiting...")
log.Infoln("++++++++++++++++++++++++++++++++++++++++")
}()

// Set up error capturing in case algod exits before we can get REST client
// Set up error capturing
defer func() {
if errorOutput.output != "" {
fmt.Fprintf(os.Stderr, errorOutput.output)
details := telemetryspec.ErrorOutputEventDetails{
Error: errorOutput.output,
Output: output.output,
}
log.EventWithDetails(telemetryspec.HostApplicationState, telemetryspec.ErrorOutputEvent, details)

// Write stdout & stderr streams to disk
ioutil.WriteFile(filepath.Join(absolutePath, nodecontrol.StdOutFilename), []byte(output.output), os.ModePerm)
ioutil.WriteFile(filepath.Join(absolutePath, nodecontrol.StdErrFilename), []byte(errorOutput.output), os.ModePerm)

if config.UploadOnError {
fmt.Fprintf(os.Stdout, "Uploading logs...\n")
sendLogs()
}
}
captureErrorLogs(algohConfig, errorOutput, output, absolutePath, false)
}()

// Handle signals cleanly
Expand All @@ -167,28 +160,30 @@ func main() {
os.Exit(0)
}()

client, err := waitForClient(nc, done)
algodClient, err := waitForClient(nc, done)
if err != nil {
reportErrorf("error creating Rest Client: %v\n", err)
}

var wg sync.WaitGroup

deadMan := makeDeadManWatcher(config.DeadManTimeSec, client, config.UploadOnError, done, &wg)
deadMan := makeDeadManWatcher(algohConfig.DeadManTimeSec, algodClient, algohConfig.UploadOnError, done, &wg)
wg.Add(1)

listeners := []blockListener{deadMan}
if config.SendBlockStats {
if algohConfig.SendBlockStats {
// Note: Resume can be implemented here. Store blockListener state and set curBlock based on latestBlock/lastBlock.
listeners = append(listeners, &blockstats{log: logging.Base()})
}

delayBetweenStatusChecks := time.Duration(config.StatusDelayMS) * time.Millisecond
stallDetectionDelay := time.Duration(config.StallDelayMS) * time.Millisecond
delayBetweenStatusChecks := time.Duration(algohConfig.StatusDelayMS) * time.Millisecond
stallDetectionDelay := time.Duration(algohConfig.StallDelayMS) * time.Millisecond

runBlockWatcher(listeners, client, done, &wg, delayBetweenStatusChecks, stallDetectionDelay)
runBlockWatcher(listeners, algodClient, done, &wg, delayBetweenStatusChecks, stallDetectionDelay)
wg.Add(1)

blockWatcherInitialized = true

wg.Wait()
fmt.Println("Exiting algoh normally...")
}
Expand All @@ -202,7 +197,7 @@ func waitForClient(nc nodecontrol.NodeController, abort chan struct{}) (client c

select {
case <-abort:
err = fmt.Errorf("Aborted waiting for client")
err = fmt.Errorf("aborted waiting for client")
return
case <-time.After(100 * time.Millisecond):
}
Expand Down Expand Up @@ -323,6 +318,28 @@ func initTelemetry(genesisID string, log logging.Logger, dataDirectory string) {
}
}

// capture algod error output and optionally upload logs
func captureErrorLogs(algohConfig algoh.HostConfig, errorOutput stdCollector, output stdCollector, absolutePath string, errorCondition bool) {
if errorOutput.output != "" {
fmt.Fprintf(os.Stdout, "errorOutput.output: `%s`\n", errorOutput.output)
errorCondition = true
fmt.Fprintf(os.Stderr, errorOutput.output)
details := telemetryspec.ErrorOutputEventDetails{
Error: errorOutput.output,
Output: output.output,
}
log.EventWithDetails(telemetryspec.HostApplicationState, telemetryspec.ErrorOutputEvent, details)

// Write stdout & stderr streams to disk
_ = ioutil.WriteFile(filepath.Join(absolutePath, nodecontrol.StdOutFilename), []byte(output.output), os.ModePerm)
_ = ioutil.WriteFile(filepath.Join(absolutePath, nodecontrol.StdErrFilename), []byte(errorOutput.output), os.ModePerm)
}
if errorCondition && algohConfig.UploadOnError {
fmt.Fprintf(os.Stdout, "Uploading logs...\n")
sendLogs()
}
}

func reportErrorf(format string, args ...interface{}) {
fmt.Fprintf(os.Stderr, format, args...)
logging.Base().Fatalf(format, args...)
Expand Down
Loading