Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,17 @@ buildsrc: $(SRCPATH)/crypto/lib/libsodium.a node_exporter NONGO_BIN deps $(ALGOD
cd $(SRCPATH) && \
go vet $(UNIT_TEST_SOURCES) $(E2E_TEST_SOURCES)

SOURCES_RACE := github.com/algorand/go-algorand/cmd/kmd

## Build binaries with the race detector enabled in them.
## This allows us to run e2e tests with race detection.
## We overwrite bin-race/kmd with a non -race version due to
## the incredible performance impact of -race on Scrypt.
build-race: build
@mkdir -p $(GOPATH)/bin-race
cd $(SRCPATH) && \
GOBIN=$(GOPATH)/bin-race go install $(GOTAGS) -ldflags="$(GOLDFLAGS)" $(SOURCES)
GOBIN=$(GOPATH)/bin-race go install $(GOTAGS) -race -ldflags="$(GOLDFLAGS)" $(SOURCES) && \
GOBIN=$(GOPATH)/bin-race go install $(GOTAGS) -ldflags="$(GOLDFLAGS)" $(SOURCES_RACE)

NONGO_BIN_FILES=$(GOPATH)/bin/find-nodes.sh $(GOPATH)/bin/update.sh $(GOPATH)/bin/updatekey.json $(GOPATH)/bin/COPYING

Expand Down
3 changes: 3 additions & 0 deletions cmd/goal/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ func init() {
// ledger.go
rootCmd.AddCommand(ledgerCmd)

// completion.go
rootCmd.AddCommand(completionCmd)

// Config
defaultDataDirValue := []string{""}
rootCmd.PersistentFlags().StringArrayVarP(&dataDirs, "datadir", "d", defaultDataDirValue, "Data directory for the node")
Expand Down
59 changes: 59 additions & 0 deletions cmd/goal/completion.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (C) 2019 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package main

import (
"os"

"github.com/spf13/cobra"
)

func init() {
completionCmd.AddCommand(bashCompletionCmd)
completionCmd.AddCommand(zshCompletionCmd)
}

var completionCmd = &cobra.Command{
Use: "completion",
Short: "Shell completion helper",
Long: "Shell completion helper",
Args: validateNoPosArgsFn,
Run: func(cmd *cobra.Command, args []string) {
// If no arguments passed, we should fallback to help
cmd.HelpFunc()(cmd, args)
},
}

var bashCompletionCmd = &cobra.Command{
Use: "bash",
Short: "Generate bash completion commands",
Long: "Generate bash completion commands",
Args: validateNoPosArgsFn,
Run: func(cmd *cobra.Command, _ []string) {
rootCmd.GenBashCompletion(os.Stdout)
},
}

var zshCompletionCmd = &cobra.Command{
Use: "zsh",
Short: "Generate zsh completion commands",
Long: "Generate zsh completion commands",
Args: validateNoPosArgsFn,
Run: func(cmd *cobra.Command, _ []string) {
rootCmd.GenZshCompletion(os.Stdout)
},
}
2 changes: 1 addition & 1 deletion cmd/goal/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,6 @@ var supplyCmd = &cobra.Command{
reportErrorf(errorRequestFail, err)
}

fmt.Printf("Round: %v microAlgos\nTotal Money: %v microAlgos\nOnline Money: %v microAlgos\n", response.Round, response.TotalMoney, response.OnlineMoney)
fmt.Printf("Round: %v\nTotal Money: %v microAlgos\nOnline Money: %v microAlgos\n", response.Round, response.TotalMoney, response.OnlineMoney)
},
}
5 changes: 4 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ type Local struct {
// the max size the sync server would return
TxSyncServeResponseSize int

// IsIndexerActive indicates wheather to activate the indexer for fast retrieval of transactions
// IsIndexerActive indicates whether to activate the indexer for fast retrieval of transactions
// Note -- Indexer cannot operate on non Archival nodes
IsIndexerActive bool

Expand All @@ -678,6 +678,9 @@ type Local struct {
// proxy vendor provides another header field. In the case of CloudFlare proxy, the "CF-Connecting-IP" header
// field can be used.
UseXForwardedForAddressField string

// ForceRelayMessages indicates whether the network library relay messages even in the case that no NetAddress was specified.
ForceRelayMessages bool
}

// Filenames of config files within the configdir (e.g. ~/.algorand)
Expand Down
7 changes: 7 additions & 0 deletions daemon/algod/api/server/lib/middlewares/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ func Auth(log logging.Logger, apiToken string) func(http.Handler) http.Handler {
// Handle debug route
// Grab the apiToken from the HTTP header
providedToken := []byte(r.Header.Get(TokenHeader))
if len(providedToken) == 0 {
// Accept tokens provided in a bearer token format.
authentication := strings.SplitN(r.Header.Get("Authorization"), " ", 2)
if len(authentication) == 2 && strings.EqualFold("Bearer", authentication[0]) {
providedToken = []byte(authentication[1])
}
}
if route.GetName() == debugRouteName {
// For debug routes, we place the apiToken in the path itself
providedToken = []byte(mux.Vars(r)["apiToken"])
Expand Down
16 changes: 7 additions & 9 deletions daemon/kmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,13 @@ func ValidateConfig(cfg WalletServerConfig) error {

// MakeWalletServer takes a WalletServerConfig, and returns a validated,
// configured WalletServer.
func MakeWalletServer(config WalletServerConfig) (WalletServer, error) {
var ws WalletServer

func MakeWalletServer(config WalletServerConfig) (*WalletServer, error) {
err := ValidateConfig(config)
if err != nil {
return ws, err
return nil, err
}

ws = WalletServer{
ws := &WalletServer{
WalletServerConfig: config,
netPath: filepath.Join(config.DataDir, NetFilename),
pidPath: filepath.Join(config.DataDir, PIDFilename),
Expand Down Expand Up @@ -144,7 +142,7 @@ func (ws *WalletServer) releaseFileLock() error {
}

// Write out a file containing the address kmd is listening on
func (ws WalletServer) writeStateFiles(netAddr string) (err error) {
func (ws *WalletServer) writeStateFiles(netAddr string) (err error) {
// netPath file contains path to sock file
err = ioutil.WriteFile(ws.netPath, []byte(netAddr), 0640)
if err != nil {
Expand All @@ -156,15 +154,15 @@ func (ws WalletServer) writeStateFiles(netAddr string) (err error) {
}

// Delete the state files generated by writeStateFiles
func (ws WalletServer) deleteStateFiles() {
func (ws *WalletServer) deleteStateFiles() {
os.Remove(ws.pidPath)
os.Remove(ws.netPath)
}

// makeWatchdogCallback generates a callback function that either 1. does
// nothing if ws.Timeout is nil, or 2. kicks a watchdog timer that will kill
// kmd when it expires.
func (ws WalletServer) makeWatchdogCallback(kill chan os.Signal) func() {
func (ws *WalletServer) makeWatchdogCallback(kill chan os.Signal) func() {
// If Timeout is nil, then we will not kill kmd after a timeout
if ws.Timeout == nil {
return func() {}
Expand Down Expand Up @@ -193,7 +191,7 @@ func (ws WalletServer) makeWatchdogCallback(kill chan os.Signal) func() {
// returns an error if it was unable to start the server. It reads from the
// `kill` channel in order to shut down the server gracefully, and returns a
// `died` channel that will be written after the server exits.
func (ws WalletServer) Start(kill chan os.Signal) (died chan error, sock string, err error) {
func (ws *WalletServer) Start(kill chan os.Signal) (died chan error, sock string, err error) {
// Ensure we're the only instance of kmd running in this data directory
err = ws.acquireFileLock()
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion installer/config.json.example
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,6 @@
"TxPoolSize": 50000,
"TxSyncIntervalSeconds": 60,
"TxSyncServeResponseSize": 1000000,
"TxSyncTimeoutSeconds": 30
"TxSyncTimeoutSeconds": 30,
"ForceRelayMessages": false
}
39 changes: 21 additions & 18 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ func (wn *WebsocketNetwork) setup() {
wn.server.IdleTimeout = httpServerIdleTimeout
wn.server.MaxHeaderBytes = httpServerMaxHeaderBytes
wn.ctx, wn.ctxCancel = context.WithCancel(context.Background())
wn.relayMessages = wn.config.NetAddress != "" || wn.config.ForceRelayMessages
// roughly estimate the number of messages that could be sent over the lifespan of a single round.
wn.outgoingMessagesBufferSize = int(config.Consensus[protocol.ConsensusCurrentVersion].NumProposers*2 +
config.Consensus[protocol.ConsensusCurrentVersion].SoftCommitteeSize +
Expand Down Expand Up @@ -801,11 +802,24 @@ func (wn *WebsocketNetwork) updateURLHost(originalRootURL string, originIP net.I

// ServerHTTP handles the gossip network functions over websockets
func (wn *WebsocketNetwork) ServeHTTP(response http.ResponseWriter, request *http.Request) {
remoteHost, _, err := net.SplitHostPort(request.RemoteAddr)
if err != nil {
// this error should not happen. The go framework is responsible for populating the RemoteAddr using the incoming TCP connection
// information.
wn.log.Errorf("could not parse request.RemoteAddr=%v, %s", request.RemoteAddr, err)
response.WriteHeader(http.StatusServiceUnavailable)
return
}
originIP := wn.getForwardedConnectionAddress(request.Header)
if originIP != nil {
remoteHost = originIP.String()
}

if wn.numIncomingPeers() >= wn.config.IncomingConnectionsLimit {
networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "incoming_connection_limit"})
wn.log.EventWithDetails(telemetryspec.Network, telemetryspec.ConnectPeerFailEvent,
telemetryspec.ConnectPeerFailEventDetails{
Address: justHost(request.RemoteAddr),
Address: remoteHost,
HostName: request.Header.Get(TelemetryIDHeader),
Incoming: true,
InstanceName: request.Header.Get(InstanceNameHeader),
Expand All @@ -814,21 +828,12 @@ func (wn *WebsocketNetwork) ServeHTTP(response http.ResponseWriter, request *htt
response.WriteHeader(http.StatusServiceUnavailable)
return
}
remoteHost, _, err := net.SplitHostPort(request.RemoteAddr)
if err != nil {
wn.log.Errorf("could not parse request.RemoteAddr=%v, %s", request.RemoteAddr, err)
response.WriteHeader(http.StatusServiceUnavailable)
return
}
originIP := wn.getForwardedConnectionAddress(request.Header)
if originIP != nil {
remoteHost = originIP.String()
}

if wn.connectedForIP(remoteHost) >= wn.config.MaxConnectionsPerIP {
networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "incoming_connection_limit"})
networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "incoming_connection_per_ip_limit"})
wn.log.EventWithDetails(telemetryspec.Network, telemetryspec.ConnectPeerFailEvent,
telemetryspec.ConnectPeerFailEventDetails{
Address: justHost(request.RemoteAddr),
Address: remoteHost,
HostName: request.Header.Get(TelemetryIDHeader),
Incoming: true,
InstanceName: request.Header.Get(InstanceNameHeader),
Expand Down Expand Up @@ -878,7 +883,7 @@ func (wn *WebsocketNetwork) ServeHTTP(response http.ResponseWriter, request *htt
wn.log.With("event", "ConnectedIn").With("remote", otherPublicAddr).With("local", localAddr).Infof("Accepted incoming connection from peer %s", otherPublicAddr)
wn.log.EventWithDetails(telemetryspec.Network, telemetryspec.ConnectPeerEvent,
telemetryspec.PeerEventDetails{
Address: justHost(request.RemoteAddr),
Address: remoteHost,
HostName: otherTelemetryGUID,
Incoming: true,
InstanceName: otherInstanceName,
Expand Down Expand Up @@ -1528,8 +1533,6 @@ func NewWebsocketNetwork(log logging.Logger, config config.Local, phonebook Phon
outerPhonebook := &MultiPhonebook{phonebooks: []Phonebook{phonebook}}
wn = &WebsocketNetwork{log: log, config: config, phonebook: outerPhonebook, GenesisID: genesisID, NetworkID: networkID}

// TODO - add config parameter to allow non-relays to enable relaying.
wn.relayMessages = config.NetAddress != ""
wn.setup()
return wn, nil
}
Expand All @@ -1554,9 +1557,9 @@ func (wn *WebsocketNetwork) removePeer(peer *wsPeer, reason disconnectReason) {
// definitely don't change this to do the logging while holding the lock.
localAddr, _ := wn.Address()
wn.log.With("event", "Disconnected").With("remote", peer.rootURL).With("local", localAddr).Infof("Peer %v disconnected", peer.rootURL)
peerAddr := ""
peerAddr := peer.OriginAddress()
// we might be able to get addr out of conn, or it might be closed
if peer.conn != nil {
if peerAddr == "" && peer.conn != nil {
paddr := peer.conn.RemoteAddr()
if paddr != nil {
peerAddr = justHost(paddr.String())
Expand Down
83 changes: 83 additions & 0 deletions network/wsNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1464,3 +1464,86 @@ func TestSlowPeerDisconnection(t *testing.T) {
time.Sleep(time.Millisecond * 5)
}
}

func TestForceMessageRelaying(t *testing.T) {
log := logging.TestingLog(t)
log.SetLevel(logging.Level(defaultConfig.BaseLoggerDebugLevel))
wn := &WebsocketNetwork{
log: log,
config: defaultConfig,
phonebook: emptyPhonebookSingleton,
GenesisID: "go-test-network-genesis",
NetworkID: config.Devtestnet,
}
wn.setup()
wn.eventualReadyDelay = time.Second

netA := wn
netA.config.GossipFanout = 1

defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }()

counter := newMessageCounter(t, 5)
counterDone := counter.done
netA.RegisterHandlers([]TaggedMessageHandler{TaggedMessageHandler{Tag: debugTag, MessageHandler: counter}})
netA.Start()
addrA, postListen := netA.Address()
require.Truef(t, postListen, "Listening network failed to start")

noAddressConfig := defaultConfig
noAddressConfig.NetAddress = ""
netB := makeTestWebsocketNodeWithConfig(t, noAddressConfig)
netB.config.GossipFanout = 1
netB.phonebook = &oneEntryPhonebook{addrA}
netB.Start()
defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()

noAddressConfig.ForceRelayMessages = true
netC := makeTestWebsocketNodeWithConfig(t, noAddressConfig)
netC.config.GossipFanout = 1
netC.phonebook = &oneEntryPhonebook{addrA}
netC.Start()
defer func() { t.Log("stopping C"); netB.Stop(); t.Log("C done") }()

readyTimeout := time.NewTimer(2 * time.Second)
waitReady(t, netA, readyTimeout.C)
waitReady(t, netB, readyTimeout.C)
waitReady(t, netC, readyTimeout.C)

// send 5 messages from both netB and netC to netA
for i := 0; i < 5; i++ {
err := netB.Relay(context.Background(), debugTag, []byte{1, 2, 3}, true, nil)
require.NoError(t, err)
err = netC.Relay(context.Background(), debugTag, []byte{1, 2, 3}, true, nil)
require.NoError(t, err)
}

select {
case <-counterDone:
case <-time.After(2 * time.Second):
if counter.count < 5 {
require.Failf(t, "One or more messages failed to reach destination network", "%d > %d", 5, counter.count)
} else if counter.count > 5 {
require.Failf(t, "One or more messages that were expected to be dropped, reached destination network", "%d < %d", 5, counter.count)
}
}
netA.ClearHandlers()
counter = newMessageCounter(t, 10)
counterDone = counter.done
netA.RegisterHandlers([]TaggedMessageHandler{TaggedMessageHandler{Tag: debugTag, MessageHandler: counter}})

// hack the relayMessages on the netB so that it would start sending messages.
netB.relayMessages = true
// send additional 10 messages from netB
for i := 0; i < 10; i++ {
err := netB.Relay(context.Background(), debugTag, []byte{1, 2, 3}, true, nil)
require.NoError(t, err)
}

select {
case <-counterDone:
case <-time.After(2 * time.Second):
require.Failf(t, "One or more messages failed to reach destination network", "%d > %d", 10, counter.count)
}

}
8 changes: 8 additions & 0 deletions test/framework/fixtures/libgoalFixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,14 @@ func (f *LibGoalFixture) ShutdownImpl(preserveData bool) {
}
}

// intercept baseFixture.failOnError so we can clean up any algods that are still alive
func (f *LibGoalFixture) failOnError(err error, message string) {
if err != nil {
f.network.Stop(f.binDir)
f.baseFixture.failOnError(err, message)
}
}

// PrimaryDataDir returns the data directory for the PrimaryNode for the network
func (f *LibGoalFixture) PrimaryDataDir() string {
return f.network.PrimaryDataDir()
Expand Down