Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 46 additions & 29 deletions cmd/algoh/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (c *stdCollector) Write(p []byte) (n int, err error) {
}

func main() {
blockWatcherInitialized := false
flag.Parse()
nc := getNodeController()

Expand Down Expand Up @@ -95,11 +96,11 @@ func main() {
reportErrorf("Data directory %s does not appear to be valid\n", dataDir)
}

config, err := algoh.LoadConfigFromFile(filepath.Join(dataDir, algoh.ConfigFilename))
algohConfig, err := algoh.LoadConfigFromFile(filepath.Join(dataDir, algoh.ConfigFilename))
if err != nil && !os.IsNotExist(err) {
reportErrorf("Error loading configuration, %v\n", err)
}
validateConfig(config)
validateConfig(algohConfig)

log := logging.Base()
configureLogging(genesisID, log, absolutePath)
Expand All @@ -126,35 +127,27 @@ func main() {

err = cmd.Start()
if err != nil {
reportErrorf("Error starting algod: %v", err)
reportErrorf("error starting algod: %v", err)
}
err = cmd.Wait()
if err != nil {
reportErrorf("error waiting for algod: %v", err)
}
cmd.Wait()
close(done)

// capture logs if algod terminated prior to blockWatcher starting
if !blockWatcherInitialized {
captureErrorLogs(algohConfig, errorOutput, output, absolutePath, true)
}

log.Infoln("++++++++++++++++++++++++++++++++++++++++")
log.Infoln("algod exited. Exiting...")
log.Infoln("++++++++++++++++++++++++++++++++++++++++")
}()

// Set up error capturing in case algod exits before we can get REST client
// Set up error capturing
defer func() {
if errorOutput.output != "" {
fmt.Fprintf(os.Stderr, errorOutput.output)
details := telemetryspec.ErrorOutputEventDetails{
Error: errorOutput.output,
Output: output.output,
}
log.EventWithDetails(telemetryspec.HostApplicationState, telemetryspec.ErrorOutputEvent, details)

// Write stdout & stderr streams to disk
ioutil.WriteFile(filepath.Join(absolutePath, nodecontrol.StdOutFilename), []byte(output.output), os.ModePerm)
ioutil.WriteFile(filepath.Join(absolutePath, nodecontrol.StdErrFilename), []byte(errorOutput.output), os.ModePerm)

if config.UploadOnError {
fmt.Fprintf(os.Stdout, "Uploading logs...\n")
sendLogs()
}
}
captureErrorLogs(algohConfig, errorOutput, output, absolutePath, false)
}()

// Handle signals cleanly
Expand All @@ -167,28 +160,30 @@ func main() {
os.Exit(0)
}()

client, err := waitForClient(nc, done)
algodClient, err := waitForClient(nc, done)
if err != nil {
reportErrorf("error creating Rest Client: %v\n", err)
}

var wg sync.WaitGroup

deadMan := makeDeadManWatcher(config.DeadManTimeSec, client, config.UploadOnError, done, &wg)
deadMan := makeDeadManWatcher(algohConfig.DeadManTimeSec, algodClient, algohConfig.UploadOnError, done, &wg)
wg.Add(1)

listeners := []blockListener{deadMan}
if config.SendBlockStats {
if algohConfig.SendBlockStats {
// Note: Resume can be implemented here. Store blockListener state and set curBlock based on latestBlock/lastBlock.
listeners = append(listeners, &blockstats{log: logging.Base()})
}

delayBetweenStatusChecks := time.Duration(config.StatusDelayMS) * time.Millisecond
stallDetectionDelay := time.Duration(config.StallDelayMS) * time.Millisecond
delayBetweenStatusChecks := time.Duration(algohConfig.StatusDelayMS) * time.Millisecond
stallDetectionDelay := time.Duration(algohConfig.StallDelayMS) * time.Millisecond

runBlockWatcher(listeners, client, done, &wg, delayBetweenStatusChecks, stallDetectionDelay)
runBlockWatcher(listeners, algodClient, done, &wg, delayBetweenStatusChecks, stallDetectionDelay)
wg.Add(1)

blockWatcherInitialized = true

wg.Wait()
fmt.Println("Exiting algoh normally...")
}
Expand All @@ -202,7 +197,7 @@ func waitForClient(nc nodecontrol.NodeController, abort chan struct{}) (client c

select {
case <-abort:
err = fmt.Errorf("Aborted waiting for client")
err = fmt.Errorf("aborted waiting for client")
return
case <-time.After(100 * time.Millisecond):
}
Expand Down Expand Up @@ -323,6 +318,28 @@ func initTelemetry(genesisID string, log logging.Logger, dataDirectory string) {
}
}

// capture algod error output and optionally upload logs
func captureErrorLogs(algohConfig algoh.HostConfig, errorOutput stdCollector, output stdCollector, absolutePath string, errorCondition bool) {
if errorOutput.output != "" {
fmt.Fprintf(os.Stdout, "errorOutput.output: `%s`\n", errorOutput.output)
errorCondition = true
fmt.Fprintf(os.Stderr, errorOutput.output)
details := telemetryspec.ErrorOutputEventDetails{
Error: errorOutput.output,
Output: output.output,
}
log.EventWithDetails(telemetryspec.HostApplicationState, telemetryspec.ErrorOutputEvent, details)

// Write stdout & stderr streams to disk
_ = ioutil.WriteFile(filepath.Join(absolutePath, nodecontrol.StdOutFilename), []byte(output.output), os.ModePerm)
_ = ioutil.WriteFile(filepath.Join(absolutePath, nodecontrol.StdErrFilename), []byte(errorOutput.output), os.ModePerm)
}
if errorCondition && algohConfig.UploadOnError {
fmt.Fprintf(os.Stdout, "Uploading logs...\n")
sendLogs()
}
}

func reportErrorf(format string, args ...interface{}) {
fmt.Fprintf(os.Stderr, format, args...)
logging.Base().Fatalf(format, args...)
Expand Down