diff --git a/config/config.go b/config/config.go index a8beb05800..2d5d0bdbfe 100644 --- a/config/config.go +++ b/config/config.go @@ -268,6 +268,7 @@ const ( dnssecSRV = 1 << iota dnssecRelayAddr dnssecTelemetryAddr + dnssecTXT ) const ( diff --git a/config/localTemplate.go b/config/localTemplate.go index 1ebdab3870..e748d5eb43 100644 --- a/config/localTemplate.go +++ b/config/localTemplate.go @@ -362,8 +362,9 @@ type Local struct { // 0x01 (dnssecSRV) - validate SRV response // 0x02 (dnssecRelayAddr) - validate relays' names to addresses resolution // 0x04 (dnssecTelemetryAddr) - validate telemetry and metrics names to addresses resolution + // 0x08 (dnssecTXT) - validate TXT response // ... - DNSSecurityFlags uint32 `version[6]:"1"` + DNSSecurityFlags uint32 `version[6]:"1" version[34]:"9"` // EnablePingHandler controls whether the gossip node would respond to ping messages with a pong message. EnablePingHandler bool `version[6]:"true"` @@ -688,11 +689,16 @@ func (cfg Local) DNSSecurityRelayAddrEnforced() bool { return cfg.DNSSecurityFlags&dnssecRelayAddr != 0 } -// DNSSecurityTelemeryAddrEnforced returns true if relay name to ip addr resolution enforced -func (cfg Local) DNSSecurityTelemeryAddrEnforced() bool { +// DNSSecurityTelemetryAddrEnforced returns true if relay name to ip addr resolution enforced +func (cfg Local) DNSSecurityTelemetryAddrEnforced() bool { return cfg.DNSSecurityFlags&dnssecTelemetryAddr != 0 } +// DNSSecurityTXTEnforced returns true if TXT response verification enforced +func (cfg Local) DNSSecurityTXTEnforced() bool { + return cfg.DNSSecurityFlags&dnssecTXT != 0 +} + // CatchupVerifyCertificate returns true if certificate verification is needed func (cfg Local) CatchupVerifyCertificate() bool { return cfg.CatchupBlockValidateMode&catchupValidationModeCertificate == 0 diff --git a/config/local_defaults.go b/config/local_defaults.go index ca9b8f8509..232d4ae5df 100644 --- a/config/local_defaults.go +++ b/config/local_defaults.go @@ -51,7 
+51,7 @@ var defaultLocal = Local{ ConnectionsRateLimitingWindowSeconds: 1, CrashDBDir: "", DNSBootstrapID: ".algorand.network?backup=.algorand.net&dedup=.algorand-.(network|net)", - DNSSecurityFlags: 1, + DNSSecurityFlags: 9, DeadlockDetection: 0, DeadlockDetectionThreshold: 30, DisableAPIAuth: false, diff --git a/go.mod b/go.mod index bfddb8e3db..e4d8c5b8d5 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( github.com/google/go-querystring v1.0.0 github.com/gorilla/mux v1.8.0 github.com/ipfs/go-log v1.0.5 + github.com/ipfs/go-log/v2 v2.5.1 github.com/jmoiron/sqlx v1.2.0 github.com/karalabe/usb v0.0.2 github.com/labstack/echo/v4 v4.9.1 @@ -47,6 +48,7 @@ require ( github.com/spf13/cobra v1.5.0 github.com/stretchr/testify v1.8.4 go.opencensus.io v0.24.0 + go.uber.org/zap v1.27.0 golang.org/x/crypto v0.21.0 golang.org/x/exp v0.0.0-20240213143201-ec583247a57a golang.org/x/sync v0.6.0 @@ -101,7 +103,6 @@ require ( github.com/ipfs/boxo v0.10.0 // indirect github.com/ipfs/go-cid v0.4.1 // indirect github.com/ipfs/go-datastore v0.6.0 // indirect - github.com/ipfs/go-log/v2 v2.5.1 // indirect github.com/ipld/go-ipld-prime v0.20.0 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect @@ -171,7 +172,6 @@ require ( go.uber.org/fx v1.20.1 // indirect go.uber.org/mock v0.4.0 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.0 // indirect golang.org/x/mod v0.15.0 // indirect golang.org/x/net v0.23.0 // indirect golang.org/x/term v0.18.0 // indirect diff --git a/installer/config.json.example b/installer/config.json.example index d695679067..b8701ccab3 100644 --- a/installer/config.json.example +++ b/installer/config.json.example @@ -30,7 +30,7 @@ "ConnectionsRateLimitingWindowSeconds": 1, "CrashDBDir": "", "DNSBootstrapID": ".algorand.network?backup=.algorand.net&dedup=.algorand-.(network|net)", - "DNSSecurityFlags": 1, + "DNSSecurityFlags": 9, "DeadlockDetection": 0, 
"DeadlockDetectionThreshold": 30, "DisableAPIAuth": false, diff --git a/logging/log.go b/logging/log.go index 48f83c1b9e..47a14ec722 100644 --- a/logging/log.go +++ b/logging/log.go @@ -148,6 +148,9 @@ type Logger interface { // source adds file, line and function fields to the event source() *logrus.Entry + // Entry returns the logrus raw entry + Entry() *logrus.Entry + // Adds a hook to the logger AddHook(hook logrus.Hook) @@ -319,6 +322,10 @@ func (l logger) SetJSONFormatter() { l.entry.Logger.Formatter = &logrus.JSONFormatter{TimestampFormat: "2006-01-02T15:04:05.000000Z07:00"} } +func (l logger) Entry() *logrus.Entry { + return l.entry +} + func (l logger) source() *logrus.Entry { event := l.entry diff --git a/netdeploy/remote/nodeConfig.go b/netdeploy/remote/nodeConfig.go index 2c6e0e423f..4880d76eb9 100644 --- a/netdeploy/remote/nodeConfig.go +++ b/netdeploy/remote/nodeConfig.go @@ -34,6 +34,7 @@ type NodeConfig struct { DashboardEndpoint string `json:",omitempty"` DeadlockOverride int `json:",omitempty"` // -1 = Disable deadlock detection, 0 = Use Default for build, 1 = Enable ConfigJSONOverride string `json:",omitempty"` // Raw json to merge into config.json after other modifications are complete + P2PBootstrap bool // True if this node should be a p2p bootstrap node and registered in DNS // NodeNameMatchRegex is tested against Name in generated configs and if matched the rest of the configs in this record are applied as a template NodeNameMatchRegex string `json:",omitempty"` diff --git a/netdeploy/remote/nodecfg/nodeConfigurator.go b/netdeploy/remote/nodecfg/nodeConfigurator.go index 5ab43d5ff7..842570bfc8 100644 --- a/netdeploy/remote/nodecfg/nodeConfigurator.go +++ b/netdeploy/remote/nodecfg/nodeConfigurator.go @@ -42,6 +42,7 @@ type nodeConfigurator struct { bootstrappedTrackerDir string relayEndpoints []srvEntry metricsEndpoints []srvEntry + p2pBootstrapEndpoints []txtEntry } type srvEntry struct { @@ -49,6 +50,11 @@ type srvEntry struct { port 
string } +type txtEntry struct { + netAddress string + peerID string +} + // ApplyConfigurationToHost attempts to apply the provided configuration to the local host, // based on the configuration specified for the provided hostName, with node // directories being created / updated under the specified rootNodeDir @@ -248,6 +254,31 @@ func (nc *nodeConfigurator) registerDNSRecords() (err error) { return } } + + dnsaddrsFrom := fmt.Sprintf("_dnsaddr.%s.algodev.network", nc.genesisData.Network) + for _, entry := range nc.p2pBootstrapEndpoints { + port, parseErr := strconv.ParseInt(strings.Split(entry.netAddress, ":")[1], 10, 64) + if parseErr != nil { + return parseErr + } + var addrType string + if isIP { + addrType = "ip4" + } else { + addrType = "dnsaddr" + } + addrInfoString := fmt.Sprintf("/%s/%s/tcp/%d/p2p/%s", addrType, nc.dnsName, port, entry.peerID) + to := fmt.Sprintf("dnsaddr=%s", addrInfoString) + + fmt.Fprintf(os.Stdout, "...... Adding P2P TXT Record '%s' -> '%s' .\n", dnsaddrsFrom, to) + const priority = 1 + const proxied = false + dnsErr := cloudflareDNS.CreateDNSRecord(context.Background(), "TXT", dnsaddrsFrom, to, cloudflare.AutomaticTTL, priority, proxied) + if dnsErr != nil { + return dnsErr + } + } + return } @@ -281,3 +312,7 @@ func (nc *nodeConfigurator) addRelaySrv(srvRecord string, port string) { func (nc *nodeConfigurator) registerMetricsSrv(srvRecord string, port string) { nc.metricsEndpoints = append(nc.metricsEndpoints, srvEntry{srvRecord, port}) } + +func (nc *nodeConfigurator) addP2PBootstrap(netAddress string, peerID string) { + nc.p2pBootstrapEndpoints = append(nc.p2pBootstrapEndpoints, txtEntry{netAddress, peerID}) +} diff --git a/netdeploy/remote/nodecfg/nodeDir.go b/netdeploy/remote/nodecfg/nodeDir.go index 9bd13343c1..26b1c9b8d1 100644 --- a/netdeploy/remote/nodecfg/nodeDir.go +++ b/netdeploy/remote/nodecfg/nodeDir.go @@ -18,6 +18,7 @@ package nodecfg import ( "encoding/json" + "errors" "fmt" "net/url" "os" @@ -27,6 +28,7 @@ import ( 
"github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/netdeploy/remote" + "github.com/algorand/go-algorand/network/p2p" "github.com/algorand/go-algorand/shared/algoh" "github.com/algorand/go-algorand/util/tokens" ) @@ -101,6 +103,12 @@ func (nd *nodeDir) configure() (err error) { fmt.Fprintf(os.Stdout, "Error during configureNetAddress: %s\n", err) return } + + if err = nd.configureP2PDNSBootstrap(nd.P2PBootstrap); err != nil { + fmt.Fprintf(os.Stdout, "Error during configureP2PDNSBootstrap: %s\n", err) + return + } + fmt.Println("Done configuring node directory.") return } @@ -156,6 +164,45 @@ func (nd *nodeDir) configureNetAddress() (err error) { return } +func (nd *nodeDir) configureP2PDNSBootstrap(p2pBootstrap bool) error { + if !p2pBootstrap { + return nil + } + fmt.Fprintf(os.Stdout, " - Configuring P2P DNS Bootstrap: %s\n", nd.Name) + if err := nd.ensureConfig(); err != nil { + return err + } + // ensure p2p config params set are what is expected: + // - EnableP2P or EnableP2PHybridMode + // - NetAddress or P2PListenAddress is set + // - EnableGossipService + if !nd.config.EnableP2P && !nd.config.EnableP2PHybridMode { + return errors.New("p2p bootstrap requires EnableP2P or EnableP2PHybridMode to be set") + } + if nd.NetAddress == "" && nd.config.P2PListenAddress == "" { + return errors.New("p2p bootstrap requires NetAddress or P2PListenAddress to be set") + } + if !nd.config.EnableGossipService { + return errors.New("p2p bootstrap requires EnableGossipService to be set") + } + + netAddress := nd.NetAddress + if nd.config.P2PListenAddress != "" { + netAddress = nd.config.P2PListenAddress + } + + key, err := p2p.GetPrivKey(config.Local{P2PPersistPeerID: true}, nd.dataDir) + if err != nil { + return err + } + peerID, err := p2p.PeerIDFromPublicKey(key.GetPublic()) + if err != nil { + return err + } + nd.configurator.addP2PBootstrap(netAddress, peerID.String()) + return nil +} + func (nd *nodeDir) 
configureAPIEndpoint(address string) (err error) { if err = nd.ensureConfig(); err != nil { return diff --git a/network/p2p/http.go b/network/p2p/http.go index d12b51034d..eea40c127e 100644 --- a/network/p2p/http.go +++ b/network/p2p/http.go @@ -21,6 +21,7 @@ import ( "sync" "time" + "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/network/limitcaller" "github.com/gorilla/mux" "github.com/libp2p/go-libp2p" @@ -62,6 +63,7 @@ func MakeHTTPClient(addrInfo *peer.AddrInfo) (*http.Client, error) { if err != nil { return nil, err } + logging.Base().Debugf("MakeHTTPClient made a new P2P host %s for %s", clientStreamHost.ID(), addrInfo.String()) client := libp2phttp.Host{StreamHost: clientStreamHost} diff --git a/network/p2p/logger.go b/network/p2p/logger.go new file mode 100644 index 0000000000..8ac9dc7b97 --- /dev/null +++ b/network/p2p/logger.go @@ -0,0 +1,118 @@ +// Copyright (C) 2019-2024 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +// This package implement a zap.Core in order to wrap lip2p logger into algod's logger. 
+ +package p2p + +import ( + "runtime" + + p2plogging "github.com/ipfs/go-log/v2" + "github.com/sirupsen/logrus" + "go.uber.org/zap/zapcore" + + "github.com/algorand/go-algorand/logging" +) + +// var levelsMap = map[logging.Level]zapcore.Level{ +// logging.Debug: zapcore.DebugLevel, +// logging.Info: zapcore.InfoLevel, +// logging.Warn: zapcore.WarnLevel, +// logging.Error: zapcore.ErrorLevel, +// logging.Fatal: zapcore.FatalLevel, +// logging.Panic: zapcore.PanicLevel, +// } + +var levelsMap = map[zapcore.Level]logging.Level{ + zapcore.DebugLevel: logging.Debug, + zapcore.InfoLevel: logging.Info, + zapcore.WarnLevel: logging.Warn, + zapcore.ErrorLevel: logging.Error, + zapcore.FatalLevel: logging.Fatal, + zapcore.PanicLevel: logging.Panic, +} + +// loggingCore implements zapcore.Core +type loggingCore struct { + log logging.Logger + level logging.Level + fields []zapcore.Field + zapcore.Core +} + +// EnableP2PLogging enables libp2p logging into the provided logger with the provided level. +func EnableP2PLogging(log logging.Logger, l logging.Level) { + core := loggingCore{ + log: log, + level: l, + } + for p2pLevel, logLevel := range levelsMap { + if logLevel == l { + p2plogging.SetAllLoggers(p2plogging.LogLevel(p2pLevel)) + break + } + } + p2plogging.SetPrimaryCore(&core) +} + +func (c *loggingCore) Enabled(l zapcore.Level) bool { + return c.log.IsLevelEnabled(c.level) +} + +func (c *loggingCore) With(fields []zapcore.Field) zapcore.Core { + return &loggingCore{ + log: c.log, + level: c.level, + fields: append(c.fields, fields...), + } +} + +func (c *loggingCore) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry { + if c.Enabled(e.Level) { + return ce.AddCore(e, c) + } + return ce +} + +func (c *loggingCore) Write(e zapcore.Entry, fields []zapcore.Field) error { + allFields := append(c.fields, fields...) 
+ loggingFields := make(logging.Fields, len(allFields)) + + for _, f := range allFields { + if len(f.String) > 0 { + loggingFields[f.Key] = f.String + } else if f.Interface != nil { + loggingFields[f.Key] = f.Interface + } else { + loggingFields[f.Key] = f.Integer + } + } + event := c.log.WithFields(loggingFields).With("libp2p", e.LoggerName) + event = event.WithFields(logrus.Fields{ + "file": e.Caller.File, + "line": e.Caller.Line, + }) + if function := runtime.FuncForPC(e.Caller.PC); function != nil { + event = event.With("function", function.Name()) + } + event.Entry().Log(logrus.Level(levelsMap[e.Level]), e.Message) + return nil +} + +func (c *loggingCore) Sync() error { + return nil +} diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index 9fd64e3aad..9ee9d3fa43 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -245,10 +245,17 @@ func (s *serviceImpl) IDSigner() *PeerIDChallengeSigner { func (s *serviceImpl) DialPeersUntilTargetCount(targetConnCount int) { ps := s.host.Peerstore().(*pstore.PeerStore) peerIDs := ps.GetAddresses(targetConnCount, phonebook.PhoneBookEntryRelayRole) + conns := s.host.Network().Conns() + var numOutgoingConns int + for _, conn := range conns { + if conn.Stat().Direction == network.DirOutbound { + numOutgoingConns++ + } + } for _, peerInfo := range peerIDs { peerInfo := peerInfo.(*peer.AddrInfo) // if we are at our target count stop trying to connect - if len(s.host.Network().Conns()) == targetConnCount { + if numOutgoingConns >= targetConnCount { return } // if we are already connected to this peer, skip it diff --git a/network/p2p/pubsub.go b/network/p2p/pubsub.go index 5d1c144632..8e67ac6725 100644 --- a/network/p2p/pubsub.go +++ b/network/p2p/pubsub.go @@ -93,7 +93,9 @@ func makePubSub(ctx context.Context, cfg config.Local, host host.Host) (*pubsub. 
pubsub.WithSubscriptionFilter(pubsub.WrapLimitSubscriptionFilter(pubsub.NewAllowlistSubscriptionFilter(TXTopicName), 100)), // pubsub.WithEventTracer(jsonTracer), pubsub.WithValidateQueueSize(256), + pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign), // pubsub.WithValidateThrottle(cfg.TxBacklogSize), + pubsub.WithValidateWorkers(20), // match to number wsNetwork workers } return pubsub.NewGossipSub(ctx, host, options...) diff --git a/network/p2p/streams.go b/network/p2p/streams.go index 0961141a0c..d16633adfd 100644 --- a/network/p2p/streams.go +++ b/network/p2p/streams.go @@ -75,7 +75,7 @@ func (n *streamManager) streamHandler(stream network.Stream) { n.streams[stream.Conn().RemotePeer()] = stream // streamHandler is supposed to be called for accepted streams, so we expect incoming here - incoming := stream.Stat().Direction == network.DirInbound + incoming := stream.Conn().Stat().Direction == network.DirInbound if !incoming { if stream.Stat().Direction == network.DirUnknown { n.log.Warnf("Unknown direction for a steam %s to/from %s", stream.ID(), remotePeer) @@ -93,7 +93,7 @@ func (n *streamManager) streamHandler(stream network.Stream) { // no old stream n.streams[stream.Conn().RemotePeer()] = stream // streamHandler is supposed to be called for accepted streams, so we expect incoming here - incoming := stream.Stat().Direction == network.DirInbound + incoming := stream.Conn().Stat().Direction == network.DirInbound if !incoming { if stream.Stat().Direction == network.DirUnknown { n.log.Warnf("streamHandler: unknown direction for a steam %s to/from %s", stream.ID(), remotePeer) @@ -142,7 +142,7 @@ func (n *streamManager) Connected(net network.Network, conn network.Conn) { stream, err := n.host.NewStream(n.ctx, remotePeer, AlgorandWsProtocol) if err != nil { - n.log.Infof("Failed to open stream to %s: %v", remotePeer, err) + n.log.Infof("Failed to open stream to %s (%s): %v", remotePeer, conn.RemoteMultiaddr().String(), err) return } n.streams[remotePeer] = 
stream @@ -153,7 +153,7 @@ func (n *streamManager) Connected(net network.Network, conn network.Conn) { n.streamsLock.Unlock() // a new stream created above, expected direction is outbound - incoming := stream.Stat().Direction == network.DirInbound + incoming := stream.Conn().Stat().Direction == network.DirInbound if incoming { n.log.Warnf("Unexpected incoming stream in streamHandler for connection %s (%s): %s vs %s stream", stream.Conn().ID(), remotePeer, stream.Conn().Stat().Direction, stream.Stat().Direction.String()) } else { diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index 5eaf6ec36f..22da1915f3 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -18,6 +18,7 @@ package network import ( "context" + "math/rand" "net" "net/http" "strings" @@ -39,6 +40,7 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/p2p/discovery/backoff" "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" ) @@ -191,8 +193,9 @@ type p2pPeerStats struct { } type gossipSubPeer struct { - peerID peer.ID - net GossipNode + peerID peer.ID + net GossipNode + routingAddr [8]byte } func (p gossipSubPeer) GetNetwork() GossipNode { return p.net } @@ -206,6 +209,10 @@ func (p gossipSubPeer) OnClose(f func()) { } } +func (p gossipSubPeer) RoutingAddr() []byte { + return p.routingAddr[:] +} + // NewP2PNetwork returns an instance of GossipNode that uses the p2p.Service func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebookAddresses []string, genesisID string, networkID protocol.NetworkID, node NodeInfo) (*P2PNetwork, error) { const readBufferLen = 2048 @@ -255,6 +262,8 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo broadcastQueueBulk: make(chan broadcastRequest, 100), } + p2p.EnableP2PLogging(log, logging.Level(cfg.BaseLoggerDebugLevel)) + h, la, err := 
p2p.MakeHost(cfg, datadir, pstore) if err != nil { return nil, err @@ -270,7 +279,7 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo cfg: cfg, networkID: networkID, phonebookPeers: addrInfo, - resolveController: dnsaddr.NewMultiaddrDNSResolveController(cfg.DNSSecuritySRVEnforced(), ""), + resolveController: dnsaddr.NewMultiaddrDNSResolveController(cfg.DNSSecurityTXTEnforced(), ""), log: net.log, } net.bootstrapperStart = bootstrapper.start @@ -393,12 +402,12 @@ func (n *P2PNetwork) innerStop() { } // meshThreadInner fetches nodes from DHT and attempts to connect to them -func (n *P2PNetwork) meshThreadInner() { +func (n *P2PNetwork) meshThreadInner() int { defer n.service.DialPeersUntilTargetCount(n.config.GossipFanout) // fetch peers from DNS var dnsPeers, dhtPeers []peer.AddrInfo - dnsPeers = dnsLookupBootstrapPeers(n.log, n.config, n.networkID, dnsaddr.NewMultiaddrDNSResolveController(n.config.DNSSecuritySRVEnforced(), "")) + dnsPeers = dnsLookupBootstrapPeers(n.log, n.config, n.networkID, dnsaddr.NewMultiaddrDNSResolveController(n.config.DNSSecurityTXTEnforced(), "")) // discover peers from DHT if n.capabilitiesDiscovery != nil { @@ -406,7 +415,6 @@ func (n *P2PNetwork) meshThreadInner() { dhtPeers, err = n.capabilitiesDiscovery.PeersForCapability(p2p.Gossip, n.config.GossipFanout) if err != nil { n.log.Warnf("Error getting relay nodes from capabilities discovery: %v", err) - return } n.log.Debugf("Discovered %d gossip peers from DHT", len(dhtPeers)) } @@ -416,21 +424,36 @@ func (n *P2PNetwork) meshThreadInner() { for i := range peers { replace = append(replace, &peers[i]) } - n.pstore.ReplacePeerList(replace, string(n.networkID), phonebook.PhoneBookEntryRelayRole) + if len(peers) > 0 { + n.pstore.ReplacePeerList(replace, string(n.networkID), phonebook.PhoneBookEntryRelayRole) + } + return len(peers) } func (n *P2PNetwork) meshThread() { defer n.wg.Done() + timer := time.NewTicker(1) // start immediately and reset after + + // 
Add exponential backoff with jitter to the mesh thread to handle new networks startup + // when no DNS or DHT peers are available. + // The parameters produce approximate the following delays (although they are random but the sequence give the idea): + // 2 2.4 4.6 9 20 19.5 28 24 14 14 35 60 60 + ebf := backoff.NewExponentialDecorrelatedJitter(2*time.Second, meshThreadInterval, 3.0, rand.NewSource(rand.Int63())) + eb := ebf() + defer timer.Stop() - var resetTimer bool for { select { case <-timer.C: - n.meshThreadInner() - if !resetTimer { + numPeers := n.meshThreadInner() + if numPeers > 0 { + // found something, reset timer to the default value timer.Reset(meshThreadInterval) - resetTimer = true + eb.Reset() + } else { + // no peers found, backoff + timer.Reset(eb.Delay()) } case <-n.ctx.Done(): return @@ -717,9 +740,9 @@ func (n *P2PNetwork) GetHTTPRequestConnection(request *http.Request) (conn Deadl // wsStreamHandler is a callback that the p2p package calls when a new peer connects and establishes a // stream for the websocket protocol. 
-func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2ppeer peer.ID, stream network.Stream, incoming bool) { +func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2pPeer peer.ID, stream network.Stream, incoming bool) { if stream.Protocol() != p2p.AlgorandWsProtocol { - n.log.Warnf("unknown protocol %s", stream.Protocol()) + n.log.Warnf("unknown protocol %s from peer%s", stream.Protocol(), p2pPeer) return } @@ -727,7 +750,7 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2ppeer peer.ID, strea var initMsg [1]byte rn, err := stream.Read(initMsg[:]) if rn == 0 || err != nil { - n.log.Warnf("wsStreamHandler: error reading initial message: %s", err) + n.log.Warnf("wsStreamHandler: error reading initial message: %s, peer %s (%s)", err, p2pPeer, stream.Conn().RemoteMultiaddr().String()) return } } else { @@ -743,7 +766,7 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2ppeer peer.ID, strea if numOutgoingPeers >= n.config.GossipFanout { // this appears to be some auxiliary connection made by libp2p itself like DHT connection. // skip this connection since there are already enough peers - n.log.Debugf("skipping outgoing connection to peer %s: num outgoing %d > fanout %d ", p2ppeer, numOutgoingPeers, n.config.GossipFanout) + n.log.Debugf("skipping outgoing connection to peer %s: num outgoing %d > fanout %d ", p2pPeer, numOutgoingPeers, n.config.GossipFanout) stream.Close() return } @@ -759,11 +782,11 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2ppeer peer.ID, strea ma := stream.Conn().RemoteMultiaddr() addr := ma.String() if addr == "" { - n.log.Warnf("Could not get address for peer %s", p2ppeer) + n.log.Warnf("Could not get address for peer %s", p2pPeer) } // create a wsPeer for this stream and added it to the peers map. 
- addrInfo := &peer.AddrInfo{ID: p2ppeer, Addrs: []multiaddr.Multiaddr{ma}} + addrInfo := &peer.AddrInfo{ID: p2pPeer, Addrs: []multiaddr.Multiaddr{ma}} maxIdleConnsPerHost := int(n.config.ConnectionsRateLimitingCount) client, err := p2p.MakeHTTPClientWithRateLimit(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout, maxIdleConnsPerHost) if err != nil { @@ -775,16 +798,16 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2ppeer peer.ID, strea conn: &wsPeerConnP2PImpl{stream: stream}, outgoing: !incoming, } - protos, err := n.pstore.GetProtocols(p2ppeer) + protos, err := n.pstore.GetProtocols(p2pPeer) if err != nil { - n.log.Warnf("Error getting protocols for peer %s: %v", p2ppeer, err) + n.log.Warnf("Error getting protocols for peer %s: %v", p2pPeer, err) } wsp.TelemetryGUID, wsp.InstanceName = p2p.GetPeerTelemetryInfo(protos) wsp.init(n.config, outgoingMessagesBufferSize) n.wsPeersLock.Lock() - n.wsPeers[p2ppeer] = wsp - n.wsPeersToIDs[wsp] = p2ppeer + n.wsPeers[p2pPeer] = wsp + n.wsPeersToIDs[wsp] = p2pPeer n.wsPeersLock.Unlock() n.wsPeersChangeCounter.Add(1) @@ -794,8 +817,11 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2ppeer peer.ID, strea event = "ConnectedIn" msg = "Accepted incoming connection from peer %s" } - localAddr, _ := n.Address() - n.log.With("event", event).With("remote", addr).With("local", localAddr).Infof(msg, p2ppeer.String()) + localAddr, has := n.Address() + if !has { + n.log.Warn("Could not get local address") + } + n.log.With("event", event).With("remote", addr).With("local", localAddr).Infof(msg, p2pPeer.String()) if n.log.GetLevel() >= logging.Debug { n.log.Debugf("streams for %s conn %s ", stream.Conn().Stat().Direction.String(), stream.Conn().ID()) @@ -912,8 +938,18 @@ func (n *P2PNetwork) txTopicHandleLoop() { // txTopicValidator calls txHandler to validate and process incoming transactions. 
func (n *P2PNetwork) txTopicValidator(ctx context.Context, peerID peer.ID, msg *pubsub.Message) pubsub.ValidationResult { + var routingAddr [8]byte + n.wsPeersLock.Lock() + if wsp, ok := n.wsPeers[peerID]; ok { + copy(routingAddr[:], wsp.RoutingAddr()) + } else { + // well, otherwise use last 8 bytes of peerID + copy(routingAddr[:], peerID[len(peerID)-8:]) + } + n.wsPeersLock.Unlock() + inmsg := IncomingMessage{ - Sender: gossipSubPeer{peerID: msg.ReceivedFrom, net: n}, + Sender: gossipSubPeer{peerID: msg.ReceivedFrom, net: n, routingAddr: routingAddr}, Tag: protocol.TxnTag, Data: msg.Data, Net: n, diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index 7cc590ff02..0206871a54 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -36,6 +36,7 @@ import ( "github.com/algorand/go-algorand/network/phonebook" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/test/partitiontest" + "github.com/algorand/go-algorand/util" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/crypto" @@ -1106,3 +1107,22 @@ func TestMergeP2PAddrInfoResolvedAddresses(t *testing.T) { }) } } + +// TestP2PGossipSubPeerCasts checks that gossipSubPeer implements the ErlClient and IPAddressable interfaces +// needed by TxHandler +func TestP2PGossipSubPeerCasts(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + var g interface{} = gossipSubPeer{} + _, ok := g.(util.ErlClient) + require.True(t, ok) + + _, ok = g.(IPAddressable) + require.True(t, ok) + + // check that gossipSubPeer is hashable as ERL wants + var m map[util.ErlClient]struct{} + require.Equal(t, m[gossipSubPeer{}], struct{}{}) + require.Equal(t, m[g.(util.ErlClient)], struct{}{}) +} diff --git a/network/websocketProxy_test.go b/network/websocketProxy_test.go index cefe4b687a..96628acb69 100644 --- a/network/websocketProxy_test.go +++ b/network/websocketProxy_test.go @@ -317,7 +317,7 @@ func TestWebsocketProxyWsNet(t *testing.T) { 
peerB := netA.peers[0] require.NotEmpty(t, peerB.originAddress) require.Equal(t, fakeXForwardedFor, peerB.originAddress) - require.NotEqual(t, peerB.RoutingAddr(), peerB.IPAddr()) + require.NotEqual(t, peerB.RoutingAddr(), peerB.ipAddr()) fakeXForwardedForParsed := net.ParseIP(fakeXForwardedFor) require.NotEqual(t, fakeXForwardedForParsed, peerB.RoutingAddr()) } diff --git a/network/wsPeer.go b/network/wsPeer.go index 7606168687..2b302f071f 100644 --- a/network/wsPeer.go +++ b/network/wsPeer.go @@ -329,7 +329,6 @@ type HTTPPeer interface { // IPAddressable is addressable with either IPv4 or IPv6 address type IPAddressable interface { - IPAddr() []byte RoutingAddr() []byte } @@ -385,7 +384,7 @@ func (wp *wsPeer) Version() string { return wp.version } -func (wp *wsPeer) IPAddr() []byte { +func (wp *wsPeer) ipAddr() []byte { remote := wp.conn.RemoteAddr() if remote == nil { return nil @@ -420,7 +419,7 @@ func (wp *wsPeer) RoutingAddr() []byte { if wp.wsPeerCore.originAddress != "" { ip = net.ParseIP(wp.wsPeerCore.originAddress) } else { - ip = wp.IPAddr() + ip = wp.ipAddr() } if len(ip) != net.IPv6len { diff --git a/network/wsPeer_test.go b/network/wsPeer_test.go index d1f32302a0..973c027b16 100644 --- a/network/wsPeer_test.go +++ b/network/wsPeer_test.go @@ -288,32 +288,32 @@ func TestWsPeerIPAddr(t *testing.T) { } // some raw IPv4 address conn.addr.IP = []byte{127, 0, 0, 1} - require.Equal(t, []byte{127, 0, 0, 1}, peer.IPAddr()) + require.Equal(t, []byte{127, 0, 0, 1}, peer.ipAddr()) require.Equal(t, []byte{127, 0, 0, 1}, peer.RoutingAddr()) // IPv4 constructed from net.IPv4 conn.addr.IP = net.IPv4(127, 0, 0, 2) - require.Equal(t, []byte{127, 0, 0, 2}, peer.IPAddr()) + require.Equal(t, []byte{127, 0, 0, 2}, peer.ipAddr()) require.Equal(t, []byte{127, 0, 0, 2}, peer.RoutingAddr()) // some IPv6 address conn.addr.IP = net.IPv6linklocalallrouters - require.Equal(t, []byte(net.IPv6linklocalallrouters), peer.IPAddr()) + require.Equal(t, 
[]byte(net.IPv6linklocalallrouters), peer.ipAddr()) require.Equal(t, []byte(net.IPv6linklocalallrouters[0:8]), peer.RoutingAddr()) // embedded IPv4 into IPv6 conn.addr.IP = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 127, 0, 0, 3} require.Equal(t, 16, len(conn.addr.IP)) - require.Equal(t, []byte{127, 0, 0, 3}, peer.IPAddr()) + require.Equal(t, []byte{127, 0, 0, 3}, peer.ipAddr()) require.Equal(t, []byte{127, 0, 0, 3}, peer.RoutingAddr()) conn.addr.IP = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 127, 0, 0, 4} require.Equal(t, 16, len(conn.addr.IP)) - require.Equal(t, []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 127, 0, 0, 4}, peer.IPAddr()) + require.Equal(t, []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 127, 0, 0, 4}, peer.ipAddr()) require.Equal(t, []byte{127, 0, 0, 4}, peer.RoutingAddr()) // check incoming peer with originAddress set conn.addr.IP = []byte{127, 0, 0, 1} peer.wsPeerCore.originAddress = "127.0.0.2" - require.Equal(t, []byte{127, 0, 0, 1}, peer.IPAddr()) + require.Equal(t, []byte{127, 0, 0, 1}, peer.ipAddr()) require.Equal(t, []byte{127, 0, 0, 2}, peer.RoutingAddr()) } diff --git a/node/node.go b/node/node.go index 206d2fa087..bfb32ba3ff 100644 --- a/node/node.go +++ b/node/node.go @@ -199,14 +199,13 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd // tie network, block fetcher, and agreement services together var p2pNode network.GossipNode if cfg.EnableP2PHybridMode { - p2pNode, err = network.NewHybridP2PNetwork(node.log, node.config, node.genesisDirs.RootGenesisDir, phonebookAddresses, genesis.ID(), genesis.Network, node) + p2pNode, err = network.NewHybridP2PNetwork(node.log, node.config, rootDir, phonebookAddresses, genesis.ID(), genesis.Network, node) if err != nil { log.Errorf("could not create hybrid p2p node: %v", err) return nil, err } } else if cfg.EnableP2P { - // TODO: pass more appropriate genesisDir (hot/cold). Presently this is just used to store a peerID key. 
- p2pNode, err = network.NewP2PNetwork(node.log, node.config, node.genesisDirs.RootGenesisDir, phonebookAddresses, genesis.ID(), genesis.Network, node) + p2pNode, err = network.NewP2PNetwork(node.log, node.config, rootDir, phonebookAddresses, genesis.ID(), genesis.Network, node) if err != nil { log.Errorf("could not create p2p node: %v", err) return nil, err diff --git a/node/node_test.go b/node/node_test.go index d4e2a08ebd..361f3478f7 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -891,9 +891,7 @@ func TestNodeHybridTopology(t *testing.T) { cfg.EnableP2PHybridMode = true cfg.EnableDHTProviders = true cfg.P2PPersistPeerID = true - genesisDirs, err := cfg.EnsureAndResolveGenesisDirs(ni.rootDir, ni.genesis.ID(), nil) - require.NoError(t, err) - privKey, err := p2p.GetPrivKey(cfg, genesisDirs.RootGenesisDir) + privKey, err := p2p.GetPrivKey(cfg, ni.rootDir) require.NoError(t, err) ni.p2pID, err = p2p.PeerIDFromPublicKey(privKey.GetPublic()) require.NoError(t, err) @@ -984,9 +982,7 @@ func TestNodeP2PRelays(t *testing.T) { cfg.EnableDHTProviders = true cfg.P2PPersistPeerID = true - genesisDirs, err := cfg.EnsureAndResolveGenesisDirs(ni.rootDir, ni.genesis.ID(), nil) - require.NoError(t, err) - privKey, err := p2p.GetPrivKey(cfg, genesisDirs.RootGenesisDir) + privKey, err := p2p.GetPrivKey(cfg, ni.rootDir) require.NoError(t, err) ni.p2pID, err = p2p.PeerIDFromPublicKey(privKey.GetPublic()) require.NoError(t, err) diff --git a/test/heapwatch/agreement-log.py b/test/heapwatch/agreement-log.py new file mode 100644 index 0000000000..4109b37a71 --- /dev/null +++ b/test/heapwatch/agreement-log.py @@ -0,0 +1,187 @@ +""" +Agreement logs parser, takes either separate node.log files from a directory and guessing names from the file names, +or parses the e2e test failure log file watching for node names as " libgoalFixture.go:376: Relay0/node.log:" strings. + +This tool similar a bit to carpenter but takes multiple log files at once. 
+To force colors when outputting to a file, set FORCE_COLOR=1 in the environment. +""" + +import argparse +from datetime import datetime, timedelta +import glob +import json +import logging +import os +import time + +from termcolor import COLORS, colored + +logger = logging.getLogger(__name__) + +filtered_events = frozenset(['Persisted']) + +def process_json_line(line: str, node_name: str, by_node: dict, events: list): + """Handles a single line of json log file, returns parsed event or None if it's not an agreement event. + + line is a single line of json log file. + node_name is a name of the node that produced this line. + by_node is dict with unique nodes meta information. + events is a list of all parsed events. It is appended in this function to keep the caller code clean. + """ + try: + evt = json.loads(line) + except json.JSONDecodeError: + logger.error('failed to parse json: %s', line) + return None + if evt.get('Context') == 'Agreement' and evt.get('Type'): + if evt['Type'] in filtered_events: + return None + dt = datetime.strptime(evt['time'], '%Y-%m-%dT%H:%M:%S.%f%z') + sender = evt.get('Sender') + sender = sender[:12] if sender else '' + h = evt.get('Hash') + h = h[:8] if h else '' + w = evt.get('Weight', '-') if not evt['Type'].startswith('Proposal') else ' ' + wt = evt.get('WeightTotal', '-') if not evt['Type'].startswith('Proposal') else ' ' + if evt['Type'] in ('StepTimeout', 'VoteAttest', 'BlockAssembled', 'BlockPipelined'): + w, wt = ' ', ' ' + result = { + 'time': dt, + 'type': evt.get('Type'), + 'round': evt.get('Round', '-'), + 'period': evt.get('Period', '-'), + 'step': evt.get('Step', '-'), + 'object_round': evt.get('ObjectRound', '-'), + 'object_period': evt.get('ObjectPeriod', '-'), + 'object_step': evt.get('ObjectStep', '-'), + 'hash': h, + 'sender': sender, + 'weight': w, + 'weight_total': wt, + 'node': node_name, + } + events.append(result) + metadata = by_node.get(node_name) + if not metadata: + metadata = { + 'type': evt.get('Type'), 
+ 'time': dt + } + by_node[node_name] = metadata + else: + if evt.get('Type') == 'RoundConcluded': + rt = dt - metadata['time'] + result['round_time_ms'] = rt / timedelta(milliseconds=1) + elif evt.get('Type') == 'RoundStart': + metadata['time'] = dt + metadata['type'] = 'RoundStart' + by_node[node_name] = metadata + + return result + return None + +def main(): + os.environ['TZ'] = 'UTC' + time.tzset() + + ap = argparse.ArgumentParser() + ap.add_argument('test_log_or_dir', help='Dir with log files or a single log file from e2e tests') + ap.add_argument('-e', '--end-round', type=int, help=f'Round to end at') + args = ap.parse_args() + + by_node = {} + events = [] + if os.path.isdir(args.test_log_or_dir): + logger.info('processing directory %s', args.test_log_or_dir) + log_files = sorted(glob.glob(os.path.join(args.test_log_or_dir, '*-node.log'))) + if not log_files: + logger.error('no log files found in %s', args.test_log_or_dir) + return 1 + for filename in os.listdir(args.test_log_or_dir): + if filename.endswith("-node.log"): + with open(os.path.join(args.test_log_or_dir, filename), 'r') as file: + node_name = filename[:len(filename) - len('-node.log')] + node_name = node_name.replace('relay', 'R') + node_name = node_name.replace('nonParticipatingNode', 'NPN') + node_name = node_name.replace('node', 'N') + for line in file: + event = process_json_line(line, node_name, by_node, events) + if event and args.end_round and \ + isinstance(event['round'], int) and event['round'] >= args.end_round: + break + + else: + logger.info('processing file %s', args.test_log_or_dir) + with open(args.test_log_or_dir, 'r') as file: + line0 = None + while not line0: + line0 = file.readline() + line0 = line0.strip() + + if line0[0] == '{': + # regular json line + node_name = 'node' + process_json_line(line, node_name, by_node, events) + for line in file: + line = line.strip() + event = process_json_line(line, node_name, by_node, events) + if event and args.end_round and \ + 
isinstance(event['round'], int) and event['round'] >= args.end_round: + break + else: + # looks like e2e test output with lines line this: + """ + libgoalFixture.go:374: ===================... + libgoalFixture.go:376: Relay0/node.log: + libgoalFixture.go:379: {"file":"server.go"... + """ + node_name = None + if line0.endswith('node.log:'): + node_name = line0.split(' ')[1].split('/')[0] + logger.info('found node name: %s', node_name) + for line in file: + line = line.strip() + if line.endswith('node.log:'): + node_name = line.split(' ')[1].split('/')[0] + logger.info('found node name: %s', node_name) + if node_name: + for line in file: + json_start = line.find('{') + if json_start == -1: + # end of continuous json block + node_name = None + break + line = line[json_start:] + event = process_json_line(line, node_name, by_node, events) + if event and args.end_round and \ + isinstance(event['round'], int) and event['round'] >= args.end_round: + break + + log = sorted(events, key=lambda x: x['time']) + + # num_nodes = len(by_node) + colors = list(COLORS) + colors = colors[colors.index('light_grey'):] + if len(colors) < len(by_node): + colors = colors * (len(by_node) // len(colors) + 1) + node_color = {k: v for k, v in zip(by_node.keys(), colors)} + + fmt = '%15s (%s,%s,%s) (%s,%s,%s) %4s|%-4s %-8s %-18s %8s %12s %5s' + print(fmt % ('TS', 'R', 'P', 'S', 'r', 'p', 's', 'W', 'WT', 'NODE', 'EVENT TYPE', 'HASH', 'SENDER', 'RT ms')) + for e in log: + color = node_color[e['node']] + text = colored(fmt % ( + e['time'].strftime('%H:%M:%S.%f'), + e['round'], e['period'], e['step'], + e['object_round'], e['object_period'], e['object_step'], + e['weight'], e['weight_total'], + e['node'][:8], + e['type'], e['hash'], e['sender'], + int(e['round_time_ms']) if 'round_time_ms' in e else ''), + color, + ) + print(text) + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + main() diff --git a/test/heapwatch/block_history_plot.py b/test/heapwatch/block_history_plot.py 
index 7de45e21b0..d8c86b454f 100644 --- a/test/heapwatch/block_history_plot.py +++ b/test/heapwatch/block_history_plot.py @@ -138,19 +138,21 @@ def process(path, args): min(tpsv[start:end]), max(tpsv[start:end]), )) print('long round times: {}'.format(' '.join(list(map(str,filter(lambda x: x >= 9,dtv[start:end])))))) - fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2,2) - ax1.set_title('round time (seconds)') + fig, ((ax1, ax2, ax3), (ax4, ax5, ax6)) = plt.subplots(2,3, figsize=(10, 5)) + ax1.set_title('round time histogram (sec)') ax1.hist(list(filter(lambda x: x < 9,dtv[start:end])),bins=20) - if args.rtime: - ax2.set_title('round time') - ax2.plot(dtv) - else: - ax2.set_title('TPS') - ax2.hist(tpsv[start:end],bins=20) + ax4.set_title('round time') + ax4.plot(dtv[start:end]) + + ax2.set_title('txn/block histogram') + ax2.hist(txnv[start:end],bins=20) + + ax5.set_title('txn/block') + ax5.plot(txnv[start:end]) - ax3.set_title('txn/block') - ax3.hist(txnv[start:end],bins=20) + ax3.set_title('TPS') + ax3.hist(tpsv[start:end],bins=20) # 10 round moving average TPS tpsv10 = [] @@ -165,12 +167,12 @@ def process(path, args): dtxn = tca-tc0 tpsv10.append(dtxn/dt) if args.tps1: - ax4.set_title('TPS') - ax4.plot(tpsv[start:end]) + ax6.set_title('TPS') + ax6.plot(tpsv[start:end]) print('fullish block sizes: {}'.format(list(filter(lambda x: x > 100, txnv)))) else: - ax4.set_title('TPS(10 round window)') - ax4.plot(tpsv10) + ax6.set_title('TPS(10 round window)') + ax6.plot(tpsv10) fig.tight_layout() plt.savefig(path + '_hist.svg', format='svg') plt.savefig(path + '_hist.png', format='png') diff --git a/test/heapwatch/client_ram_report.py b/test/heapwatch/client_ram_report.py index 97a1171630..f16fbeaa3f 100644 --- a/test/heapwatch/client_ram_report.py +++ b/test/heapwatch/client_ram_report.py @@ -202,6 +202,10 @@ def main(): heap_totals = get_heap_inuse_totals(args.dir) heap_details = get_heap_metrics(args.dir) + if not heap_totals and not heap_details: + print('no data found', 
file=sys.stderr) + return 0 + if args.csv: if args.csv == '-': csvf = sys.stdout diff --git a/test/heapwatch/metrics_aggs.py b/test/heapwatch/metrics_aggs.py new file mode 100644 index 0000000000..0189634be5 --- /dev/null +++ b/test/heapwatch/metrics_aggs.py @@ -0,0 +1,175 @@ +#!/usr/bin/env python3 +# Copyright (C) 2019-2024 Algorand, Inc. +# This file is part of go-algorand +# +# go-algorand is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# go-algorand is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with go-algorand. If not, see . +# +### +# +# Process and aggregate /metrics data captured by heapWatch.py +# Useful for metrics with labels and bandwidth analysis. 
+# +import argparse +import glob +import logging +import os +import time +import sys + +import dash +from dash import dcc, html +import plotly.graph_objs as go +from plotly.subplots import make_subplots + + +from metrics_lib import MetricType, parse_metrics, gather_metrics_files_by_nick + +logger = logging.getLogger(__name__) + + +def main(): + os.environ['TZ'] = 'UTC' + time.tzset() + default_img_filename = 'metrics_aggs.png' + default_html_filename = 'metrics_aggs.html' + + ap = argparse.ArgumentParser() + ap.add_argument('metrics_names', nargs='+', default=None, help='metric name(s) to track') + ap.add_argument('-d', '--dir', type=str, default=None, help='dir path to find /*.metrics in') + ap.add_argument('-l', '--list-nodes', default=False, action='store_true', help='list available node names with metrics') + ap.add_argument('-t', '--tags', action='append', default=[], help='tag/label pairs in a=b format to aggregate by, may be repeated. Empty means aggregation by metric name') + ap.add_argument('--nick-re', action='append', default=[], help='regexp to filter node names, may be repeated') + ap.add_argument('--nick-lre', action='append', default=[], help='label:regexp to filter node names, may be repeated') + ap.add_argument('-s', '--save', type=str, choices=['png', 'html'], help=f'save plot to \'{default_img_filename}\' or \'{default_html_filename}\' file instead of showing it') + ap.add_argument('--verbose', default=False, action='store_true') + + args = ap.parse_args() + if args.verbose: + logging.basicConfig(level=logging.DEBUG) + else: + logging.basicConfig(level=logging.INFO) + + tags = {} + if args.tags: + for tag in args.tags: + if '=' not in tag: + raise (f'Invalid tag: {tag}') + k, v = tag.split('=', 1) + tags[k] = v + tag_keys = set(tags.keys()) + + metrics_files = sorted(glob.glob(os.path.join(args.dir, '*.metrics'))) + metrics_files.extend(glob.glob(os.path.join(args.dir, 'terraform-inventory.host'))) + filesByNick = 
gather_metrics_files_by_nick(metrics_files, args.nick_re, args.nick_lre) + + if args.list_nodes: + print('Available nodes:', ', '.join(sorted(filesByNick.keys()))) + return 0 + + app = dash.Dash(__name__) + app.layout = html.Div( + html.Div([ + html.H4('Algod Metrics'), + html.Div(id='text'), + dcc.Graph(id='graph'), + ]) + ) + metrics_names = set(args.metrics_names) + nrows = len(metrics_names) + + fig = make_subplots( + rows=nrows, cols=1, + vertical_spacing=0.03, shared_xaxes=True, + subplot_titles=[f'{name}' for name in sorted(metrics_names)], + ) + + fig['layout']['margin'] = { + 'l': 30, 'r': 10, 'b': 10, 't': 20 + } + fig['layout']['height'] = 500 * nrows + + + for nick, files_by_date in filesByNick.items(): + active_metrics = {} + data = {'time': []} + raw_series = {} + raw_times = {} + idx = 0 + for dt, metrics_file in files_by_date.items(): + data['time'].append(dt) + with open(metrics_file, 'rt') as f: + metrics = parse_metrics(f, nick, metrics_names) + for metric_name, metrics_seq in metrics.items(): + active_metric_names = [] + raw_value = 0 + for metric in metrics_seq: + if metric.type != MetricType.COUNTER: + raise RuntimeError('Only COUNT metrics are supported') + if tags is None or tags is not None and metric.has_tags(tag_keys, tags): + raw_value += metric.value + full_name = metric.string(set(tag_keys).union({'n'})) + + if full_name is None: + continue + + if full_name not in data: + # handle gaps in data, sometimes metric file might miss a value + # but the chart requires matching x and y series (time and metric value) + # data is what does into the chart, and raw_series is used to calculate + data[full_name] = [0] * len(files_by_date) + raw_series[full_name] = [] + raw_times[full_name] = [] + + metric_value = raw_value + if len(raw_series[full_name]) > 0 and len(raw_times[full_name]) > 0: + metric_value = (metric_value - raw_series[full_name][-1]) / (dt - raw_times[full_name][-1]).total_seconds() + else: + metric_value = 0 + + 
data[full_name][idx] = metric_value + raw_series[full_name].append(raw_value) + raw_times[full_name].append(dt) + + active_metric_names.append(full_name) + + active_metric_names.sort() + active_metrics[full_name] = active_metric_names + idx += 1 + + for i, metric_pair in enumerate(sorted(active_metrics.items())): + metric_name, metric_fullnames = metric_pair + for metric_fullname in metric_fullnames: + fig.append_trace(go.Scatter( + x=data['time'], + y=data[metric_fullname], + name=metric_fullname, + mode='lines+markers', + line=dict(width=1), + ), i+1, 1) + + if args.save: + if args.save == 'html': + target_path = os.path.join(args.dir, default_html_filename) + fig.write_html(target_path) + else: + target_path = os.path.join(args.dir, default_img_filename) + fig.write_image(target_path) + print(f'Saved plot to {target_path}') + else: + fig.show() + + return 0 + +if __name__ == '__main__': + sys.exit(main()) \ No newline at end of file diff --git a/test/heapwatch/metrics_delta.py b/test/heapwatch/metrics_delta.py index 50b1e9e2e3..2d64ee097a 100644 --- a/test/heapwatch/metrics_delta.py +++ b/test/heapwatch/metrics_delta.py @@ -22,7 +22,6 @@ # Generate text report on bandwidth in and out of relays/PN/NPN import argparse -import configparser import contextlib import csv import glob @@ -36,42 +35,10 @@ import sys import time -logger = logging.getLogger(__name__) +from metrics_lib import num, hunum, terraform_inventory_ip_not_names, \ + metric_line_re, test_metric_line_re -def num(x): - if '.' 
in x: - return float(x) - return int(x) - -def hunum(x): - if x >= 10000000000: - return '{:.1f}G'.format(x / 1000000000.0) - if x >= 1000000000: - return '{:.2f}G'.format(x / 1000000000.0) - if x >= 10000000: - return '{:.1f}M'.format(x / 1000000.0) - if x >= 1000000: - return '{:.2f}M'.format(x / 1000000.0) - if x >= 10000: - return '{:.1f}k'.format(x / 1000.0) - if x >= 1000: - return '{:.2f}k'.format(x / 1000.0) - return '{:.2f}x'.format(x) - -metric_line_re = re.compile(r'(\S+\{[^}]*\})\s+(.*)') - -def test_metric_line_re(): - testlines = ( - ('algod_network_connections_dropped_total{reason="write err"} 1', 1), - #('algod_network_sent_bytes_MS 274992', 274992), # handled by split - ) - for line, n in testlines: - try: - m = metric_line_re.match(line) - assert int(m.group(2)) == n - except: - logger.error('failed on line %r', line, exc_info=True) - raise +logger = logging.getLogger(__name__) def parse_metrics(fin): out = dict() @@ -86,10 +53,15 @@ def parse_metrics(fin): continue m = metric_line_re.match(line) if m: - out[m.group(1)] = num(m.group(2)) + key = m.group(1) + val = m.group(2) else: ab = line.split() - out[ab[0]] = num(ab[1]) + key = ab[0] + val = ab[1] + if key.endswith('{}'): + key = key[:-2] + out[key] = num(val) except: print(f'An exception occurred in parse_metrics: {sys.exc_info()}') pass @@ -371,21 +343,6 @@ def process_nick_re(nre, filesByNick, nick_to_tfname, rsum, args, grsum): 'npn': (.7,.7,0), } -def terraform_inventory_ip_not_names(tf_inventory_path): - """return ip to nickname mapping""" - tf_inventory = configparser.ConfigParser(allow_no_value=True) - tf_inventory.read(tf_inventory_path) - ip_to_name = {} - for k, sub in tf_inventory.items(): - if k.startswith('name_'): - for ip in sub: - if ip in ip_to_name: - logger.warning('ip %r already named %r, also got %r', ip, ip_to_name[ip], k) - ip_to_name[ip] = k - #logger.debug('names: %r', sorted(ip_to_name.values())) - #logger.debug('ip to name %r', ip_to_name) - return ip_to_name - def 
main(): os.environ['TZ'] = 'UTC' time.tzset() @@ -541,7 +498,7 @@ def __init__(self): self.txPLists = {} self.txPSums = {} self.times = [] - # algod_tx_pool_count{} + # algod_tx_pool_count self.txPool = [] # objectBytes = [(curtime, algod_go_memory_classes_heap_objects_bytes), ...] self.objectBytes = [] @@ -601,13 +558,13 @@ def process_files(self, args, nick=None, metrics_files=None, bisource=None): bi = bisource.get(curtime) if bi is None: logger.warning('%s no blockinfo', path) - self.txPool.append(cur.get('algod_tx_pool_count{}')) + self.txPool.append(cur.get('algod_tx_pool_count')) objectBytes = cur.get('algod_go_memory_classes_heap_objects_bytes') if objectBytes: self.objectBytes.append((curtime, objectBytes)) #logger.debug('%s: %r', path, cur) - verifyGood = cur.get('algod_agreement_proposal_verify_good{}') - verifyMs = cur.get('algod_agreement_proposal_verify_ms{}') + verifyGood = cur.get('algod_agreement_proposal_verify_good') + verifyMs = cur.get('algod_agreement_proposal_verify_ms') if verifyGood and verifyMs: # last writer wins self.verifyMillis = verifyMs / verifyGood @@ -626,8 +583,8 @@ def process_files(self, args, nick=None, metrics_files=None, bisource=None): rounds = (bi.get('block',{}).get('rnd', 0) - prevbi.get('block',{}).get('rnd', 0)) if rounds != 0: blocktime = dt/rounds - txBytes = d.get('algod_network_sent_bytes_total{}',0) - rxBytes = d.get('algod_network_received_bytes_total{}',0) + txBytes = d.get('algod_network_sent_bytes_total',0) + rxBytes = d.get('algod_network_received_bytes_total',0) txBytesPerSec = txBytes / dt rxBytesPerSec = rxBytes / dt # TODO: gather algod_network_sent_bytes_* and algod_network_received_bytes_* diff --git a/test/heapwatch/metrics_lib.py b/test/heapwatch/metrics_lib.py new file mode 100644 index 0000000000..fbda555b90 --- /dev/null +++ b/test/heapwatch/metrics_lib.py @@ -0,0 +1,272 @@ +#!/usr/bin/env python3 +# Copyright (C) 2019-2024 Algorand, Inc. 
+# This file is part of go-algorand +# +# go-algorand is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# go-algorand is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with go-algorand. If not, see . +# +### +# +# Common functions for processing /metrics data captured by heapWatch.py +# +import configparser +from datetime import datetime +from enum import Enum +import logging +import os +import re +import sys +from typing import Dict, Iterable, List, Optional, Tuple, Union +from urllib.parse import urlparse + + +from client_ram_report import dapp + + +logger = logging.getLogger(__name__) +metric_line_re = re.compile(r'(\S+\{[^}]*\})\s+(.*)') + +def num(x): + if '.' 
in x: + return float(x) + return int(x) + +def hunum(x): + if x >= 10000000000: + return '{:.1f}G'.format(x / 1000000000.0) + if x >= 1000000000: + return '{:.2f}G'.format(x / 1000000000.0) + if x >= 10000000: + return '{:.1f}M'.format(x / 1000000.0) + if x >= 1000000: + return '{:.2f}M'.format(x / 1000000.0) + if x >= 10000: + return '{:.1f}k'.format(x / 1000.0) + if x >= 1000: + return '{:.2f}k'.format(x / 1000.0) + return '{:.2f}x'.format(x) + + +def test_metric_line_re(): + testlines = ( + ('algod_network_connections_dropped_total{reason="write err"} 1', 1), + #('algod_network_sent_bytes_MS 274992', 274992), # handled by split + ) + for line, n in testlines: + try: + m = metric_line_re.match(line) + assert int(m.group(2)) == n + except: + print('failed on line %r', line) + raise + +def terraform_inventory_ip_not_names(tf_inventory_path): + """return ip to nickname mapping""" + tf_inventory = configparser.ConfigParser(allow_no_value=True) + tf_inventory.read(tf_inventory_path) + ip_to_name = {} + for k, sub in tf_inventory.items(): + if k.startswith('name_'): + for ip in sub: + if ip in ip_to_name: + logger.warning('ip %r already named %r, also got %r', ip, ip_to_name[ip], k) + ip_to_name[ip] = k + #logger.debug('names: %r', sorted(ip_to_name.values())) + #logger.debug('ip to name %r', ip_to_name) + return ip_to_name + +metrics_fname_re = re.compile(r'(.*?)\.(\d+_\d+)\.metrics') + +def gather_metrics_files_by_nick( + metrics_files: Iterable[str], nick_res: List[str], nick_lres: List[str] +) -> Dict[str, Dict[datetime, str]]: + """return {"node nickname": {datetime: path, ...}, ...}} + after resolving ip addresses into nodes nick names and applying nick_re and nick_lre filters. 
+ """ + filesByNick = {} + tf_inventory_path = None + for path in metrics_files: + fname = os.path.basename(path) + if fname == 'terraform-inventory.host': + tf_inventory_path = path + continue + m = metrics_fname_re.match(fname) + if not m: + continue + nick = m.group(1) + timestamp = m.group(2) + timestamp = datetime.strptime(timestamp, '%Y%m%d_%H%M%S') + dapp(filesByNick, nick, timestamp, path) + + if tf_inventory_path: + # remap ip addresses to node names + ip_to_name = terraform_inventory_ip_not_names(tf_inventory_path) + filesByNick2 = {} + for nick in filesByNick.keys(): + parsed = urlparse('//' + nick) + name: str = ip_to_name.get(parsed.hostname) + val = filesByNick[nick] + filesByNick2[name] = val + + filesByNick = filesByNick2 + filesByNick2 = {} + + for nick in filesByNick.keys(): + if nick_res or not nick_res and not nick_lres: + # filter by regexp or apply default renaming + for nick_re in nick_res: + if re.match(nick_re, nick): + break + else: + if nick_res: + # regex is given but not matched, continue to the next node + continue + + # apply default renaming + name = nick + idx = name.find('_') + if idx != -1: + name = name[idx+1:] + val = filesByNick[nick] + filesByNick2[name] = val + + elif nick_lres: + # filter by label:regexp + label = None + for nick_lre in nick_lres: + label, nick_re = nick_lre.split(':') + if re.match(nick_re, nick): + break + else: + if nick_lres: + # regex is given but not matched, continue to the next node + continue + + val = filesByNick[nick] + filesByNick2[label] = val + else: + raise RuntimeError('unexpected options combination') + + if filesByNick2: + filesByNick = filesByNick2 + + return filesByNick + +class MetricType(Enum): + GAUGE = 0 + COUNTER = 1 + +class Metric: + """Metric with tags""" + def __init__(self, metric_name: str, type: MetricType, value: Union[int, float]): + full_name = metric_name.strip() + self.name = full_name + self.value = value + self.type = type + self.tags: Dict[str, str] = {} + 
self.tag_keys: set = set() + + det_idx = self.name.find('{') + if det_idx != -1: + self.name = self.name[:det_idx] + # ensure that the last character is '}' + idx = full_name.index('}') + if idx != len(full_name) - 1: + raise ValueError(f'Invalid metric name: {full_name}') + raw_tags = full_name[full_name.find('{')+1:full_name.find('}')] + tags = raw_tags.split(',') + for tag in tags: + key, value = tag.split('=') + if value[0] == '"' and value[-1] == '"': + value = value[1:-1] + self.tags[key] = value + self.tag_keys.add(key) + + def short_name(self): + return self.name + + def __str__(self): + return self.string() + + def string(self, tags: Optional[set[str]]=None): + result = self.name + if self.tags: + if not tags: + tags = self.tags + result += '{' + ','.join([f'{k}={v}' for k, v in sorted(self.tags.items()) if k in tags]) + '}' + return result + + def add_tag(self, key: str, value: str): + self.tags[key] = value + self.tag_keys.add(key) + + def has_tags(self, tag_keys: set, tags: Dict[str, str]): + """return True if all tags are present in the metric tags + tag_keys are not strictly needed but used as an optimization + """ + if self.tag_keys.intersection(tag_keys) != tag_keys: + return False + for k, v in tags.items(): + if self.tags.get(k) != v: + return False + return True + +def parse_metrics( + fin: Iterable[str], nick: str, metrics_names: set=None, diff: bool=None +) -> Dict[str, List[Metric]]: + """Parse metrics file and return dicts of metric names (no tags) and list of Metric objects + each containing the metric name, value and tags. 
+ """ + out = {} + try: + last_type = None + for line in fin: + if not line: + continue + line = line.strip() + if not line: + continue + if line[0] == '#': + if line.startswith('# TYPE'): + tpe = line.split()[-1] + if tpe == 'gauge': + last_type = MetricType.GAUGE + elif tpe == 'counter': + last_type = MetricType.COUNTER + continue + m = metric_line_re.match(line) + if m: + name = m.group(1) + value = num(m.group(2)) + else: + ab = line.split() + name = ab[0] + value = num(ab[1]) + + metric = Metric(name, last_type, value) + metric.add_tag('n', nick) + if not metrics_names or metric.name in metrics_names: + if metric.name not in out: + out[metric.name] = [metric] + else: + out[metric.name].append(metric) + except: + print(f'An exception occurred in parse_metrics: {sys.exc_info()}') + pass + if diff and metrics_names and len(metrics_names) == 2 and len(out) == 2: + m = list(out.keys()) + name = f'{m[0]}_-_{m[1]}' + metric = Metric(name, MetricType.GAUGE, out[m[0]].value - out[m[1]].value) + out = [{name: metric}] + + return out diff --git a/test/heapwatch/metrics_viz.py b/test/heapwatch/metrics_viz.py index 584fc0ae59..741aa2dd73 100644 --- a/test/heapwatch/metrics_viz.py +++ b/test/heapwatch/metrics_viz.py @@ -11,13 +11,11 @@ """ import argparse -from datetime import datetime import glob import logging import os import re import time -from typing import Dict, Iterable, Tuple import sys import dash @@ -25,95 +23,24 @@ import plotly.graph_objs as go from plotly.subplots import make_subplots -from metrics_delta import metric_line_re, num, terraform_inventory_ip_not_names -from client_ram_report import dapp +from metrics_lib import MetricType, parse_metrics, gather_metrics_files_by_nick logger = logging.getLogger(__name__) -metrics_fname_re = re.compile(r'(.*?)\.(\d+_\d+)\.metrics') - -def gather_metrics_files_by_nick(metrics_files: Iterable[str]) -> Dict[str, Dict[datetime, str]]: - """return {"node nickname": {datetime: path, ...}, ...}}""" - filesByNick = {} - 
tf_inventory_path = None - for path in metrics_files: - fname = os.path.basename(path) - if fname == 'terraform-inventory.host': - tf_inventory_path = path - continue - m = metrics_fname_re.match(fname) - if not m: - continue - nick = m.group(1) - timestamp = m.group(2) - timestamp = datetime.strptime(timestamp, '%Y%m%d_%H%M%S') - dapp(filesByNick, nick, timestamp, path) - return tf_inventory_path, filesByNick - - -TYPE_GAUGE = 0 -TYPE_COUNTER = 1 - -def parse_metrics(fin: Iterable[str], nick: str, metrics_names: set=None, diff: bool=None) -> Tuple[Dict[str, float], Dict[str, int]]: - """Parse metrics file and return dicts of values and types""" - out = {} - types = {} - try: - last_type = None - for line in fin: - if not line: - continue - line = line.strip() - if not line: - continue - if line[0] == '#': - if line.startswith('# TYPE'): - tpe = line.split()[-1] - if tpe == 'gauge': - last_type = TYPE_GAUGE - elif tpe == 'counter': - last_type = TYPE_COUNTER - continue - m = metric_line_re.match(line) - if m: - name = m.group(1) - value = num(m.group(2)) - else: - ab = line.split() - name = ab[0] - value = num(ab[1]) - - det_idx = name.find('{') - if det_idx != -1: - name = name[:det_idx] - fullname = f'{name}{{n={nick}}}' - if not metrics_names or name in metrics_names: - out[fullname] = value - types[fullname] = last_type - except: - print(f'An exception occurred in parse_metrics: {sys.exc_info()}') - pass - if diff and metrics_names and len(metrics_names) == 2 and len(out) == 2: - m = list(out.keys()) - name = f'{m[0]}_-_{m[1]}' - new_out = {name: out[m[0]] - out[m[1]]} - new_types = {name: TYPE_GAUGE} - out = new_out - types = new_types - - return out, types - def main(): os.environ['TZ'] = 'UTC' time.tzset() - default_output_file = 'metrics_viz.png' + default_img_filename = 'metrics_viz.png' + default_html_filename = 'metrics_viz.html' ap = argparse.ArgumentParser() ap.add_argument('metrics_names', nargs='+', default=None, help='metric name(s) to track') 
ap.add_argument('-d', '--dir', type=str, default=None, help='dir path to find /*.metrics in') ap.add_argument('-l', '--list-nodes', default=False, action='store_true', help='list available node names with metrics') - ap.add_argument('-s', '--save', action='store_true', default=None, help=f'save plot to \'{default_output_file}\' file instead of showing it') + ap.add_argument('--nick-re', action='append', default=[], help='regexp to filter node names, may be repeated') + ap.add_argument('--nick-lre', action='append', default=[], help='label:regexp to filter node names, may be repeated') + ap.add_argument('-s', '--save', type=str, choices=['png', 'html'], help=f'save plot to \'{default_img_filename}\' or \'{default_html_filename}\' file instead of showing it') ap.add_argument('--diff', action='store_true', default=None, help='diff two gauge metrics instead of plotting their values. Requires two metrics names to be set') ap.add_argument('--verbose', default=False, action='store_true') @@ -128,16 +55,8 @@ def main(): return 1 metrics_files = sorted(glob.glob(os.path.join(args.dir, '*.metrics'))) - tf_inventory_path, filesByNick = gather_metrics_files_by_nick(metrics_files) - if tf_inventory_path: - # remap ip addresses to node names - ip_to_name = terraform_inventory_ip_not_names(tf_inventory_path) - for nick in filesByNick.keys(): - name = ip_to_name.get(nick) - if name: - val = filesByNick[nick] - filesByNick[name] = val - del filesByNick[nick] + metrics_files.extend(glob.glob(os.path.join(args.dir, 'terraform-inventory.host'))) + filesByNick = gather_metrics_files_by_nick(metrics_files, args.nick_re, args.nick_lre) if args.list_nodes: print('Available nodes:', ', '.join(sorted(filesByNick.keys()))) @@ -156,50 +75,76 @@ def main(): fig = make_subplots( rows=nrows, cols=1, - vertical_spacing=0.03, shared_xaxes=True) + vertical_spacing=0.03, shared_xaxes=True, + subplot_titles=[f'{name}' for name in sorted(metrics_names)], + ) fig['layout']['margin'] = { - 'l': 30, 'r': 
10, 'b': 10, 't': 10 + 'l': 30, 'r': 10, 'b': 10, 't': 20 } fig['layout']['height'] = 500 * nrows # fig.update_layout(template="plotly_dark") - data = { - 'time': [], - } - raw_series = {} - for nick, items in filesByNick.items(): - active_metrics = set() - for dt, metrics_file in items.items(): + for nick, files_by_date in filesByNick.items(): + active_metrics = {} + data = {'time': []} + raw_series = {} + raw_times = {} + idx = 0 + for dt, metrics_file in files_by_date.items(): data['time'].append(dt) with open(metrics_file, 'rt') as f: - metrics, types = parse_metrics(f, nick, metrics_names, args.diff) - for metric_name, metric_value in metrics.items(): - raw_value = metric_value - if metric_name not in data: - data[metric_name] = [] - raw_series[metric_name] = [] - if types[metric_name] == TYPE_COUNTER: - if len(raw_series[metric_name]) > 0: - metric_value = (metric_value - raw_series[metric_name][-1]) / (dt - data['time'][-2]).total_seconds() - else: - metric_value = 0 - data[metric_name].append(metric_value) - raw_series[metric_name].append(raw_value) - - active_metrics.add(metric_name) - - for i, metric in enumerate(sorted(active_metrics)): - fig.append_trace(go.Scatter( - x=data['time'], - y=data[metric], - name=metric, - mode='lines+markers', - line=dict(width=1), - ), i+1, 1) + metrics = parse_metrics(f, nick, metrics_names, args.diff) + for metric_name, metrics_seq in metrics.items(): + active_metric_names = [] + for metric in metrics_seq: + raw_value = metric.value + + full_name = metric.string() + if full_name not in data: + # handle gaps in data, sometimes metric file might miss a value + # but the chart requires matching x and y series (time and metric value) + # data is what does into the chart, and raw_series is used to calculate + data[full_name] = [0] * len(files_by_date) + raw_series[full_name] = [] + raw_times[full_name] = [] + + metric_value = metric.value + if metric.type == MetricType.COUNTER: + if len(raw_series[full_name]) > 0 and 
len(raw_times[full_name]) > 0: + metric_value = (metric_value - raw_series[full_name][-1]) / (dt - raw_times[full_name][-1]).total_seconds() + else: + metric_value = 0 + + data[full_name][idx] = metric_value + raw_series[full_name].append(raw_value) + raw_times[full_name].append(dt) + + active_metric_names.append(full_name) + + active_metric_names.sort() + active_metrics[metric_name] = active_metric_names + idx += 1 + + for i, metric_pair in enumerate(sorted(active_metrics.items())): + metric_name, metric_fullnames = metric_pair + for metric_fullname in metric_fullnames: + fig.append_trace(go.Scatter( + x=data['time'], + y=data[metric_fullname], + name=metric_fullname, + mode='lines+markers', + line=dict(width=1), + ), i+1, 1) if args.save: - fig.write_image(os.path.join(args.dir, default_output_file)) + if args.save == 'html': + target_path = os.path.join(args.dir, default_html_filename) + fig.write_html(target_path) + else: + target_path = os.path.join(args.dir, default_img_filename) + fig.write_image(target_path) + print(f'Saved plot to {target_path}') else: fig.show() diff --git a/test/heapwatch/requirements.txt b/test/heapwatch/requirements.txt index d4d68874dd..db92372c6d 100644 --- a/test/heapwatch/requirements.txt +++ b/test/heapwatch/requirements.txt @@ -5,3 +5,6 @@ matplotlib==3.7.2 plotly==5.16.0 py-algorand-sdk==2.3.0 kaleido==0.2.1 +networkx==3.3 +gravis==0.1.0 +termcolor==2.4.0 diff --git a/test/heapwatch/topology-extract-p2p.py b/test/heapwatch/topology-extract-p2p.py new file mode 100644 index 0000000000..41f2be9ffc --- /dev/null +++ b/test/heapwatch/topology-extract-p2p.py @@ -0,0 +1,104 @@ +""" +P2P network topology extraction script from node.log files. + +1. Run P2P scenario like scenario1s-p2p +2. Fetch logs with `algonet play fetch_node_logs` +3. Extract logs +``` +cd nodelog +find . -name 'nodelog.tar.gz' -print | xargs -I{} tar -zxf {} +``` +4. Run this script `python3 topology-extract-p2p.py -o top.json nodelog` +5. 
Run the visualizer `topology-viz.py top.json` +""" +import argparse +from datetime import datetime +import json +import re +import os +import sys + + +def main(): + # Regex patterns to find node IDs and connections + node_pattern = r"P2P host created: peer ID (\w{52})" + edge_pattern = r"Made outgoing connection to peer (\w{52})" + + ap = argparse.ArgumentParser() + ap.add_argument('log_dir_path', help='logs directory path') + ap.add_argument('-o', '--output', type=argparse.FileType('wt', encoding='utf-8'), help=f'save topology to the file specified instead of showing it') + ap.add_argument('-t', '--timestamp', action='store_true', help=f'store connection timestamp for each edge') + + args = ap.parse_args() + + # Directory containing log files + log_dir_path = args.log_dir_path + + nodes = [] + edges = [] + mapping = {} + + # Iterate through all files in the specified directory + for filename in os.listdir(log_dir_path): + if filename.endswith("-node.log"): + with open(os.path.join(log_dir_path, filename), 'r') as file: + mapped = filename[:len(filename) - len('-node.log')] + mapped = mapped.replace('relay', 'R') + mapped = mapped.replace('nonParticipatingNode', 'NPN') + mapped = mapped.replace('node', 'N') + node_id = None + for line in file: + # Check if line contains relevant substrings before parsing as JSON + if "P2P host created" in line or "Made outgoing connection to peer" in line: + data = json.loads(line.strip()) + + # Check for node creation + if "P2P host created" in data.get("msg", ""): + match = re.search(node_pattern, data["msg"]) + if match: + node_id = match.group(1) + nodes.append(node_id) + mapping[node_id] = mapped + + # Check for connections + elif "Made outgoing connection to peer" in data.get("msg", ""): + match = re.search(edge_pattern, data["msg"]) + if match: + target_node_id = match.group(1) + match = re.findall(r"/p2p/(\w{52})", data["local"]) + if match: + source_node_id = match[0] + else: + print('WARN: no local addr set', data, 
file=sys.stderr) + source_node_id = node_id + + if args.timestamp: + # datetime is not serializable, so we store it as string for now + edge = (source_node_id, target_node_id, {'dt': data["time"]}) + else: + edge = (source_node_id, target_node_id) + + edges.append(edge) + + result = { + "mapping": mapping, + "nodes": nodes, + "edges": edges + } + + if args.timestamp and not args.output: + edges = sorted(edges, key=lambda x: x[2]['dt']) + for edge in edges: + ts = datetime.strptime(edge[2]['dt'], "%Y-%m-%dT%H:%M:%S.%f%z") + print('%15s %5s -> %-5s' % (ts.strftime('%H:%M:%S.%f'), mapping[edge[0]], mapping[edge[1]])) + return + + if args.output: + json.dump(result, args.output, indent=2) + else: + json.dump(result, sys.stdout, indent=2) + print(file=sys.stdout) + + +if __name__ == '__main__': + main() diff --git a/test/heapwatch/topology-extract-ws.py b/test/heapwatch/topology-extract-ws.py new file mode 100644 index 0000000000..75f1d99f57 --- /dev/null +++ b/test/heapwatch/topology-extract-ws.py @@ -0,0 +1,115 @@ +""" +WSNet network topology extraction script from node.log files. + +1. Run cluster scenario like scenario1s +2. Fetch logs with `algonet play fetch_node_logs` +3. Extract logs +``` +cd nodelog +find . -name 'nodelog.tar.gz' -print | xargs -I{} tar -zxf {} +``` +4. Run this script `python3 topology-extract-ws.py -o top.json -i ../terraform-inventory.json nodelog` +5. 
Run the visualizer `topology-viz.py top.json` +""" +import argparse +from datetime import datetime +import json +import os +import sys + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument('log_dir_path', help='logs directory path') + ap.add_argument('-i', '--inventory-file', type=argparse.FileType('rt', encoding='utf-8'), required=True, help='terraform inventory file path') + ap.add_argument('-o', '--output', type=argparse.FileType('wt', encoding='utf-8'), help=f'save topology to the file specified instead of showing it') + ap.add_argument('-t', '--timestamp', action='store_true', help=f'store connection timestamp for each edge') + + args = ap.parse_args() + + # Directory containing log files + log_dir_path = args.log_dir_path + inventory_file = args.inventory_file + + nodes = [] + edges = [] + mapping = {} + + inventory = json.load(inventory_file) + + ip_to_name = {} + for k, v in inventory.items(): + if k.startswith('name_'): + name = k.split('_')[1].upper() + if not isinstance(v, list) or len(v) != 1: + raise RuntimeError(f"Invalid inventory entry, expected a single item list: {k}={v}") + ip = v[0] + ip_to_name[ip] = name + # no need for mapping but keep the data compatible with the topology-viz script + mapping[name] = name + + # Iterate through all files in the specified directory + for filename in os.listdir(log_dir_path): + if filename.endswith('-node.log'): + with open(os.path.join(log_dir_path, filename), 'r') as file: + mapped = filename[:len(filename) - len('-node.log')] + mapped = mapped.replace('relay', 'R') + mapped = mapped.replace('nonParticipatingNode', 'NPN') + mapped = mapped.replace('node', 'N') + nodes.append(mapped) + for line in file: + # Check if line contains relevant substrings before parsing as JSON + if "Accepted incoming connection from peer" in line or "Made outgoing connection to peer" in line: + data = json.loads(line.strip()) + + # Check for incoming connections + if "Accepted incoming connection from peer" in 
data.get("msg", ""): + remote = data['remote'] + remote_ip = remote.split(':')[0] + remote_name = ip_to_name[remote_ip] + source = remote_name + target = mapped + edges.append((source, target)) + + # Check for outgoing connections + elif "Made outgoing connection to peer" in data.get('msg', ""): + remote = data['remote'] + name: str = remote.split('.')[0] + # check ip or name + if name.isdigit(): + remote_ip = remote.split(':')[0] + remote_name = ip_to_name[remote_ip] + target = remote_name + source = mapped + else: + target = name.upper() + source = mapped + + if args.timestamp: + # datetime is not serializable, so we store it as string for now + edge = (source, target, {'dt': data["time"]}) + else: + edge = (source, target) + + edges.append(edge) + + result = { + "mapping": mapping, + "nodes": nodes, + "edges": edges + } + + if args.timestamp and not args.output: + edges = sorted(edges, key=lambda x: x[2]['dt']) + for edge in edges: + ts = datetime.strptime(edge[2]['dt'], "%Y-%m-%dT%H:%M:%S.%f%z") + print('%15s %5s -> %-5s' % (ts.strftime('%H:%M:%S.%f'), edge[0], edge[1])) + return + + if args.output: + json.dump(result, args.output, indent=2) + else: + json.dump(result, sys.stdout, indent=2) + print(file=sys.stdout) + +if __name__ == '__main__': + main() diff --git a/test/heapwatch/topology-viz.py b/test/heapwatch/topology-viz.py new file mode 100644 index 0000000000..1393421696 --- /dev/null +++ b/test/heapwatch/topology-viz.py @@ -0,0 +1,75 @@ +""" +P2P network topology visualization script. +See topology-extract-p2p[-ws].py for details. 
+""" +import argparse +import json +import sys + +import gravis as gv +import networkx as nx + +ap = argparse.ArgumentParser() +ap.add_argument('topology_filename', help='topology json file') +ap.add_argument('-o', '--output', type=argparse.FileType('wt', encoding='utf-8'), help=f'save plot to the file specified instead of showing it') + +args = ap.parse_args() + +with open(args.topology_filename, 'rt') as f: + topology = json.load(f) + +# Create a new directed graph +G = nx.DiGraph() + +G.add_edges_from(topology['edges']) +nx.relabel_nodes(G, topology['mapping'], copy=False) + +# Set node colors +for node in G: + if node.startswith('R'): + G.nodes[node]['color'] = 'red' + elif node.startswith('NPN'): + G.nodes[node]['color'] = 'blue' + elif node.startswith('N'): + G.nodes[node]['color'] = 'green' + else: + raise RuntimeError(f"Unknown node type: {node}") + +# Calculate in-degrees +in_degrees = dict(G.in_degree()) +out_degrees = dict(G.out_degree()) +degree_centrality = nx.degree_centrality(G) +load_centrality = nx.algorithms.load_centrality(G) + +for node in G: + size = max(2, in_degrees[node]) + G.nodes[node]['size'] = size + G.nodes[node]['in_degree'] = in_degrees[node] + G.nodes[node]['out_degree'] = out_degrees[node] + hover = f'In: {in_degrees[node]}, Out: {out_degrees[node]}' + hover += f'\nDegree centrality: {degree_centrality[node]:.2f}' + hover += f'\nLoad centrality: {load_centrality[node]:.2f}' + G.nodes[node]['hover'] = hover + +print('Transitivity:', nx.transitivity(G)) +print('Clustering coefficient:', nx.average_clustering(G)) +print('Avg shortest path length:', nx.average_shortest_path_length(G.to_undirected(as_view=True))) + +res = gv.d3( + G, + node_hover_tooltip=True, + node_size_data_source='size', + node_label_size_factor=0.5, + use_node_size_normalization=True, + node_size_normalization_max=20, + use_edge_size_normalization=True, + edge_curvature=0.1 + ) + +if not args.output: + res.display() + sys.exit(0) + +# Save to file +data = 
res.to_html() +args.output.write(data) diff --git a/test/testdata/configs/config-v34.json b/test/testdata/configs/config-v34.json index d695679067..b8701ccab3 100644 --- a/test/testdata/configs/config-v34.json +++ b/test/testdata/configs/config-v34.json @@ -30,7 +30,7 @@ "ConnectionsRateLimitingWindowSeconds": 1, "CrashDBDir": "", "DNSBootstrapID": ".algorand.network?backup=.algorand.net&dedup=.algorand-.(network|net)", - "DNSSecurityFlags": 1, + "DNSSecurityFlags": 9, "DeadlockDetection": 0, "DeadlockDetectionThreshold": 30, "DisableAPIAuth": false, diff --git a/test/testdata/deployednettemplates/recipes/hello-world-small-p2p/genesis.json b/test/testdata/deployednettemplates/recipes/hello-world-small-p2p/genesis.json new file mode 100644 index 0000000000..7ae67edf88 --- /dev/null +++ b/test/testdata/deployednettemplates/recipes/hello-world-small-p2p/genesis.json @@ -0,0 +1,30 @@ +{ + "NetworkName": "hello-p2p", + "VersionModifier": "", + "ConsensusProtocol": "future", + "FirstPartKeyRound": 0, + "LastPartKeyRound": 5000, + "PartKeyDilution": 0, + "Wallets": [ + { + "Name": "Wallet1", + "Stake": 25, + "Online": true + }, + { + "Name": "Wallet2", + "Stake": 25, + "Online": true + }, + { + "Name": "Wallet3", + "Stake": 25, + "Online": true + }, + { + "Name": "Wallet4", + "Stake": 25, + "Online": false + } + ] +} diff --git a/test/testdata/deployednettemplates/recipes/hello-world-small-p2p/net.json b/test/testdata/deployednettemplates/recipes/hello-world-small-p2p/net.json new file mode 100644 index 0000000000..423d31c1a4 --- /dev/null +++ b/test/testdata/deployednettemplates/recipes/hello-world-small-p2p/net.json @@ -0,0 +1,107 @@ +{ + "Hosts": [ + { + "Name": "R1", + "Nodes": [ + { + "Name": "relay1", + "IsRelay": true, + "Wallets": [ + { + "Name": "Wallet1", + "ParticipationOnly": false + } + ], + "NetAddress": "{{NetworkPort}}", + "APIEndpoint": "{{APIEndpoint}}", + "APIToken": "{{APIToken}}", + "AdminAPIToken": "{{AdminAPIToken}}", + "EnableTelemetry": true, + 
"TelemetryURI": "{{TelemetryURI}}", + "EnableMetrics": true, + "MetricsURI": "{{MetricsURI}}", + "EnableService": false, + "EnableBlockStats": true, + "P2PBootstrap": true, + "ConfigJSONOverride": "{ \"DNSBootstrapID\": \".algodev.network\", \"DeadlockDetection\": -1, \"PeerPingPeriodSeconds\": 30, \"EnableAgreementReporting\": true, \"EnableAgreementTimeMetrics\": true, \"EnableAssembleStats\": true, \"EnableProcessBlockStats\": true, \"EnableProfiler\": true, \"EnableRuntimeMetrics\": true, \"EnableExperimentalAPI\": true, \"EnableAccountUpdatesStats\": true, \"EnableP2P\": true }" + } + ] + }, + { + "Name": "R2", + "Nodes": [ + { + "Name": "relay2", + "IsRelay": true, + "Wallets": [ + { + "Name": "Wallet2", + "ParticipationOnly": false + } + ], + "NetAddress": "{{NetworkPort}}", + "APIEndpoint": "{{APIEndpoint}}", + "APIToken": "{{APIToken}}", + "AdminAPIToken": "{{AdminAPIToken}}", + "EnableTelemetry": true, + "TelemetryURI": "{{TelemetryURI}}", + "EnableMetrics": true, + "MetricsURI": "{{MetricsURI}}", + "EnableService": false, + "EnableBlockStats": true, + "P2PBootstrap": true, + "ConfigJSONOverride": "{ \"DNSBootstrapID\": \".algodev.network\",\"DeadlockDetection\": -1, \"PeerPingPeriodSeconds\": 30, \"EnableAgreementReporting\": true, \"EnableAgreementTimeMetrics\": true, \"EnableAssembleStats\": true, \"EnableProcessBlockStats\": true, \"EnableProfiler\": true, \"EnableRuntimeMetrics\": true, \"EnableExperimentalAPI\": true, \"EnableAccountUpdatesStats\": true, \"EnableP2P\": true }" + } + ] + }, + { + "Name": "N1", + "Group": "", + "Nodes": [ + { + "Name": "node1", + "Wallets": [ + { + "Name": "Wallet3", + "ParticipationOnly": false + } + ], + "APIEndpoint": "{{APIEndpoint}}", + "APIToken": "{{APIToken}}", + "AdminAPIToken": "{{AdminAPIToken}}", + "EnableTelemetry": false, + "TelemetryURI": "{{TelemetryURI}}", + "EnableMetrics": true, + "MetricsURI": "{{MetricsURI}}", + "EnableService": false, + "EnableBlockStats": false, + "ConfigJSONOverride": "{ 
\"TxPoolExponentialIncreaseFactor\": 1, \"DNSBootstrapID\": \".algodev.network\", \"DeadlockDetection\": -1, \"PeerPingPeriodSeconds\": 30, \"BaseLoggerDebugLevel\": 4, \"EnableProfiler\": true, \"CadaverSizeTarget\": 0, \"EnableAccountUpdatesStats\": true, \"EnableProfiler\": true, \"EnableRuntimeMetrics\": true, \"EnableExperimentalAPI\": true, \"EnableAccountUpdatesStats\": true, \"EnableP2P\": true }" + } + ] + }, + { + "Name": "NPN1", + "Group": "", + "Nodes": [ + { + "Name": "nonParticipatingNode1", + "Wallets": [ + { + "Name": "Wallet4", + "ParticipationOnly": false + } + ], + "APIEndpoint": "{{APIEndpoint}}", + "APIToken": "{{APIToken}}", + "AdminAPIToken": "{{AdminAPIToken}}", + "EnableTelemetry": false, + "EnableMetrics": true, + "MetricsURI": "{{MetricsURI}}", + "EnableService": false, + "EnableBlockStats": false, + "ConfigJSONOverride": "{ \"TxPoolExponentialIncreaseFactor\": 1, \"DNSBootstrapID\": \".algodev.network\", \"DeadlockDetection\": -1, \"BaseLoggerDebugLevel\": 4, \"CadaverSizeTarget\": 0, \"EnableProfiler\": true, \"EnableRuntimeMetrics\": true, \"EnableExperimentalAPI\": true, \"EnableAccountUpdatesStats\": true, \"EnableP2P\": true}" + } + ] + } + ] +} diff --git a/test/testdata/deployednettemplates/recipes/hello-world-small-p2p/recipe.json b/test/testdata/deployednettemplates/recipes/hello-world-small-p2p/recipe.json new file mode 100644 index 0000000000..a2f88f63b4 --- /dev/null +++ b/test/testdata/deployednettemplates/recipes/hello-world-small-p2p/recipe.json @@ -0,0 +1,7 @@ +{ + "GenesisFile":"genesis.json", + "NetworkFile":"net.json", + "ConfigFile": "../../configs/reference.json", + "HostTemplatesFile": "../../hosttemplates/hosttemplates.json", + "TopologyFile": "topology.json" +} diff --git a/test/testdata/deployednettemplates/recipes/hello-world-small-p2p/topology.json b/test/testdata/deployednettemplates/recipes/hello-world-small-p2p/topology.json new file mode 100644 index 0000000000..acc7cca9ec --- /dev/null +++ 
b/test/testdata/deployednettemplates/recipes/hello-world-small-p2p/topology.json @@ -0,0 +1,20 @@ +{ + "Hosts": [ + { + "Name": "R1", + "Template": "AWS-US-EAST-1-Small" + }, + { + "Name": "R2", + "Template": "AWS-US-EAST-1-Small" + }, + { + "Name": "N1", + "Template": "AWS-US-EAST-1-Small" + }, + { + "Name": "NPN1", + "Template": "AWS-US-EAST-1-Small" + } + ] +} diff --git a/test/testdata/deployednettemplates/recipes/hello-world-tiny-p2p/genesis.json b/test/testdata/deployednettemplates/recipes/hello-world-tiny-p2p/genesis.json new file mode 100644 index 0000000000..7ae67edf88 --- /dev/null +++ b/test/testdata/deployednettemplates/recipes/hello-world-tiny-p2p/genesis.json @@ -0,0 +1,30 @@ +{ + "NetworkName": "hello-p2p", + "VersionModifier": "", + "ConsensusProtocol": "future", + "FirstPartKeyRound": 0, + "LastPartKeyRound": 5000, + "PartKeyDilution": 0, + "Wallets": [ + { + "Name": "Wallet1", + "Stake": 25, + "Online": true + }, + { + "Name": "Wallet2", + "Stake": 25, + "Online": true + }, + { + "Name": "Wallet3", + "Stake": 25, + "Online": true + }, + { + "Name": "Wallet4", + "Stake": 25, + "Online": false + } + ] +} diff --git a/test/testdata/deployednettemplates/recipes/hello-world/hosttemplates.json b/test/testdata/deployednettemplates/recipes/hello-world-tiny-p2p/hosttemplates.json similarity index 100% rename from test/testdata/deployednettemplates/recipes/hello-world/hosttemplates.json rename to test/testdata/deployednettemplates/recipes/hello-world-tiny-p2p/hosttemplates.json diff --git a/test/testdata/deployednettemplates/recipes/hello-world-tiny-p2p/net.json b/test/testdata/deployednettemplates/recipes/hello-world-tiny-p2p/net.json new file mode 100644 index 0000000000..8ea8328c62 --- /dev/null +++ b/test/testdata/deployednettemplates/recipes/hello-world-tiny-p2p/net.json @@ -0,0 +1,101 @@ +{ + "Hosts": [ + { + "Name": "R1", + "Nodes": [ + { + "Name": "relay1", + "IsRelay": true, + "Wallets": [ + { + "Name": "Wallet1", + "ParticipationOnly": false + } + 
], + "NetAddress": "{{NetworkPort}}", + "APIEndpoint": "{{APIEndpoint}}", + "APIToken": "{{APIToken}}", + "EnableTelemetry": true, + "TelemetryURI": "{{TelemetryURI}}", + "EnableMetrics": true, + "MetricsURI": "{{MetricsURI}}", + "EnableService": false, + "EnableBlockStats": true, + "P2PBootstrap": true, + "ConfigJSONOverride": "{ \"DNSBootstrapID\": \".algodev.network\", \"DeadlockDetection\": -1, \"PeerPingPeriodSeconds\": 30, \"EnableAgreementReporting\": true, \"EnableAgreementTimeMetrics\": true, \"EnableAssembleStats\": true, \"EnableProcessBlockStats\": true, \"EnableP2P\": true }" + } + ] + }, + { + "Name": "R2", + "Nodes": [ + { + "Name": "relay2", + "IsRelay": true, + "Wallets": [ + { + "Name": "Wallet2", + "ParticipationOnly": false + } + ], + "NetAddress": "{{NetworkPort}}", + "APIEndpoint": "{{APIEndpoint}}", + "APIToken": "{{APIToken}}", + "EnableTelemetry": true, + "TelemetryURI": "{{TelemetryURI}}", + "EnableMetrics": true, + "MetricsURI": "{{MetricsURI}}", + "EnableService": false, + "EnableBlockStats": true, + "ConfigJSONOverride": "{ \"DNSBootstrapID\": \".algodev.network\",\"DeadlockDetection\": -1, \"PeerPingPeriodSeconds\": 30, \"EnableAgreementReporting\": true, \"EnableAgreementTimeMetrics\": true, \"EnableAssembleStats\": true, \"EnableProcessBlockStats\": true, \"EnableP2P\": true }" + } + ] + }, + { + "Name": "N1", + "Group": "", + "Nodes": [ + { + "Name": "node1", + "Wallets": [ + { + "Name": "Wallet3", + "ParticipationOnly": false + } + ], + "APIEndpoint": "{{APIEndpoint}}", + "APIToken": "{{APIToken}}", + "EnableTelemetry": false, + "TelemetryURI": "{{TelemetryURI}}", + "EnableMetrics": false, + "MetricsURI": "{{MetricsURI}}", + "EnableService": false, + "EnableBlockStats": false, + "ConfigJSONOverride": "{ \"TxPoolExponentialIncreaseFactor\": 1, \"DNSBootstrapID\": \".algodev.network\", \"DeadlockDetection\": -1, \"PeerPingPeriodSeconds\": 30, \"BaseLoggerDebugLevel\": 4, \"EnableProfiler\": true, \"CadaverSizeTarget\": 0, 
\"EnableAccountUpdatesStats\": true, \"EnableP2P\": true }" + } + ] + }, + { + "Name": "NPN1", + "Group": "", + "Nodes": [ + { + "Name": "nonParticipatingNode1", + "Wallets": [ + { + "Name": "Wallet4", + "ParticipationOnly": false + } + ], + "APIEndpoint": "{{APIEndpoint}}", + "APIToken": "{{APIToken}}", + "EnableTelemetry": false, + "EnableMetrics": false, + "EnableService": false, + "EnableBlockStats": false, + "ConfigJSONOverride": "{ \"TxPoolExponentialIncreaseFactor\": 1, \"DNSBootstrapID\": \".algodev.network\", \"DeadlockDetection\": -1, \"BaseLoggerDebugLevel\": 4, \"CadaverSizeTarget\": 0, \"EnableP2P\": true }" + } + ] + } + ] +} diff --git a/test/testdata/deployednettemplates/recipes/hello-world-tiny-p2p/recipe.json b/test/testdata/deployednettemplates/recipes/hello-world-tiny-p2p/recipe.json new file mode 100644 index 0000000000..be6b71ec55 --- /dev/null +++ b/test/testdata/deployednettemplates/recipes/hello-world-tiny-p2p/recipe.json @@ -0,0 +1,7 @@ +{ + "GenesisFile":"genesis.json", + "NetworkFile":"net.json", + "ConfigFile": "../../configs/reference.json", + "HostTemplatesFile": "../../hosttemplates/t2micro-useast1.json", + "TopologyFile": "topology.json" +} diff --git a/test/testdata/deployednettemplates/recipes/hello-world-tiny-p2p/topology.json b/test/testdata/deployednettemplates/recipes/hello-world-tiny-p2p/topology.json new file mode 100644 index 0000000000..acc7cca9ec --- /dev/null +++ b/test/testdata/deployednettemplates/recipes/hello-world-tiny-p2p/topology.json @@ -0,0 +1,20 @@ +{ + "Hosts": [ + { + "Name": "R1", + "Template": "AWS-US-EAST-1-Small" + }, + { + "Name": "R2", + "Template": "AWS-US-EAST-1-Small" + }, + { + "Name": "N1", + "Template": "AWS-US-EAST-1-Small" + }, + { + "Name": "NPN1", + "Template": "AWS-US-EAST-1-Small" + } + ] +} diff --git a/test/testdata/deployednettemplates/recipes/hello-world/genesis.json b/test/testdata/deployednettemplates/recipes/hello-world/genesis.json index 218b694d5f..b7fdd9502b 100644 --- 
a/test/testdata/deployednettemplates/recipes/hello-world/genesis.json +++ b/test/testdata/deployednettemplates/recipes/hello-world/genesis.json @@ -3,7 +3,7 @@ "VersionModifier": "", "ConsensusProtocol": "future", "FirstPartKeyRound": 0, - "LastPartKeyRound": 1000300, + "LastPartKeyRound": 5000, "PartKeyDilution": 0, "Wallets": [ { diff --git a/test/testdata/deployednettemplates/recipes/scenario1s-p2p/Makefile b/test/testdata/deployednettemplates/recipes/scenario1s-p2p/Makefile new file mode 100644 index 0000000000..f4ec4b3c1f --- /dev/null +++ b/test/testdata/deployednettemplates/recipes/scenario1s-p2p/Makefile @@ -0,0 +1,23 @@ +# scenario1s is scenario1 but smaller, (100 nodes, 100 wallets) -> (20 nodes, 20 wallets), each algod gets single tenancy on a smaller ec2 instance +PARAMS=-w 20 -R 8 -N 20 -n 20 --npn-algod-nodes 10 --node-template node.json --relay-template relay.json --non-participating-node-template nonPartNode.json + +.PHONY: clean all + +all: net.json genesis.json topology.json + +node.json nonPartNode.json relay.json: + python3 copy-node-configs.py + +net.json: node.json nonPartNode.json relay.json ${GOPATH}/bin/netgoal Makefile + netgoal generate -t net -r /tmp/wat -o net.json ${PARAMS} + +genesis.json: ${GOPATH}/bin/netgoal Makefile + netgoal generate -t genesis -r /tmp/wat -o genesis.l.json ${PARAMS} + jq '.LastPartKeyRound=5000|.NetworkName="s1s-p2p"|.ConsensusProtocol="future"' < genesis.l.json > genesis.json + rm genesis.l.json + +topology.json: ../scenario1s/gen_topology.py + python3 ../scenario1s/gen_topology.py + +clean: + rm -f net.json genesis.json node.json nonPartNode.json relay.json topology.json diff --git a/test/testdata/deployednettemplates/recipes/scenario1s-p2p/README.md b/test/testdata/deployednettemplates/recipes/scenario1s-p2p/README.md new file mode 100644 index 0000000000..1cad95bc2d --- /dev/null +++ b/test/testdata/deployednettemplates/recipes/scenario1s-p2p/README.md @@ -0,0 +1,16 @@ +# Scenario1s for P2P testing + +This 
is a copy of scenario1s with the following changes in node configuration: +1. All nodes get `"EnableP2P": true` in their config. +1. All relays additionally get `"P2PBootstrap": true` added to their netgoal config. + +## Build + +```sh +export GOPATH=~/go +make +``` + +## Run + +Run as a usual cluster test scenario with algonet. diff --git a/test/testdata/deployednettemplates/recipes/scenario1s-p2p/copy-node-configs.py b/test/testdata/deployednettemplates/recipes/scenario1s-p2p/copy-node-configs.py new file mode 100644 index 0000000000..6ffbc01d8d --- /dev/null +++ b/test/testdata/deployednettemplates/recipes/scenario1s-p2p/copy-node-configs.py @@ -0,0 +1,55 @@ +""" +Copies node.json, relay.json and nonPartNode.json from scenario1s: +1. Append \"EnableP2P\": true to all configs +2. Set P2PBootstrap: true to relay.json +3. Set DNSSecurityFlags: 0x8000 (an otherwise-unused value) in all configs +""" + +import json +import os + +CURRENT_DIR = os.path.dirname(os.path.realpath(__file__)) +SCENARIO1S_DIR = os.path.join(CURRENT_DIR, "..", "scenario1s") + +def main(): + """main""" + with open(os.path.join(SCENARIO1S_DIR, "node.json"), "r") as f: + node = json.load(f) + with open(os.path.join(SCENARIO1S_DIR, "relay.json"), "r") as f: + relay = json.load(f) + with open(os.path.join(SCENARIO1S_DIR, "nonPartNode.json"), "r") as f: + non_part_node = json.load(f) + + # make all relays P2PBootstrap'able + relay["P2PBootstrap"] = True + + # enable P2P for all configs + for config in (node, relay, non_part_node): + override = config.get("ConfigJSONOverride") + if override: + override_json = json.loads(override) + override_json["EnableP2P"] = True + override_json["DNSSecurityFlags"] = 0x8000 # set to some unused value otherwise 0 would be migrated to default that enables DNSSEC + config["ConfigJSONOverride"] = json.dumps(override_json) + altconfigs = config.get("AltConfigs", []) + if altconfigs: + for i, altconfig in enumerate(altconfigs): + override = altconfig.get("ConfigJSONOverride") + if override: + override_json =
json.loads(override) + override_json["EnableP2P"] = True + override_json["DNSSecurityFlags"] = 0x8000 # set to some unused value otherwise 0 would be migrated to default that enables DNSSEC + altconfigs[i]["ConfigJSONOverride"] = json.dumps(override_json) + config["AltConfigs"] = altconfigs + + with open("node.json", "w") as f: + json.dump(node, f, indent=4) + with open("relay.json", "w") as f: + json.dump(relay, f, indent=4) + with open("nonPartNode.json", "w") as f: + json.dump(non_part_node, f, indent=4) + + print("Done!") + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/test/testdata/deployednettemplates/recipes/scenario1s-p2p/recipe.json b/test/testdata/deployednettemplates/recipes/scenario1s-p2p/recipe.json new file mode 100644 index 0000000000..a2f88f63b4 --- /dev/null +++ b/test/testdata/deployednettemplates/recipes/scenario1s-p2p/recipe.json @@ -0,0 +1,7 @@ +{ + "GenesisFile":"genesis.json", + "NetworkFile":"net.json", + "ConfigFile": "../../configs/reference.json", + "HostTemplatesFile": "../../hosttemplates/hosttemplates.json", + "TopologyFile": "topology.json" +} diff --git a/test/testdata/deployednettemplates/recipes/scenario1s/Makefile b/test/testdata/deployednettemplates/recipes/scenario1s/Makefile index ed8a70132e..8b83c38b6c 100644 --- a/test/testdata/deployednettemplates/recipes/scenario1s/Makefile +++ b/test/testdata/deployednettemplates/recipes/scenario1s/Makefile @@ -1,14 +1,14 @@ # scenario1s is scenario1 but smaller, (100 nodes, 100 wallets) -> (20 nodes, 20 wallets), each algod gets single tenancy on a smaller ec2 instance PARAMS=-w 20 -R 8 -N 20 -n 20 --npn-algod-nodes 10 --node-template node.json --relay-template relay.json --non-participating-node-template nonPartNode.json -all: net.json genesis.json topology.json bootstrappedFile.json +all: net.json genesis.json topology.json net.json: node.json nonPartNode.json ${GOPATH}/bin/netgoal Makefile netgoal generate -t net -r /tmp/wat -o net.json ${PARAMS} 
genesis.json: ${GOPATH}/bin/netgoal Makefile netgoal generate -t genesis -r /tmp/wat -o genesis.l.json ${PARAMS} - jq '.LastPartKeyRound=22000|.NetworkName="s1s"|.ConsensusProtocol="future"' < genesis.l.json > genesis.json + jq '.LastPartKeyRound=5000|.NetworkName="s1s"|.ConsensusProtocol="future"' < genesis.l.json > genesis.json rm genesis.l.json topology.json: gen_topology.py diff --git a/util/metrics/opencensus.go b/util/metrics/opencensus.go index 6d38436e6a..fefb1d054b 100644 --- a/util/metrics/opencensus.go +++ b/util/metrics/opencensus.go @@ -77,7 +77,8 @@ type statCounter struct { // WriteMetric outputs Prometheus metrics for all labels/values in statCounter func (st *statCounter) WriteMetric(buf *strings.Builder, parentLabels string) { - counter := makeCounter(MetricName{st.name, st.description}) + name := sanitizePrometheusName(st.name) + counter := makeCounter(MetricName{name, st.description}) for i := 0; i < len(st.labels); i++ { counter.AddUint64(uint64(st.values[i]), st.labels[i]) } @@ -103,7 +104,8 @@ type statDistribution struct { // WriteMetric outputs Prometheus metrics for all labels/values in statCounter func (st *statDistribution) WriteMetric(buf *strings.Builder, parentLabels string) { - gauge := makeGauge(MetricName{st.name, st.description}) + name := sanitizePrometheusName(st.name) + gauge := makeGauge(MetricName{name, st.description}) for i := 0; i < len(st.labels); i++ { gauge.SetLabels(uint64(st.values[i]), st.labels[i]) } diff --git a/util/metrics/opencensus_test.go b/util/metrics/opencensus_test.go index 7a20d75334..b1f5ff102a 100644 --- a/util/metrics/opencensus_test.go +++ b/util/metrics/opencensus_test.go @@ -57,6 +57,7 @@ func TestDHTOpenCensusMetrics(t *testing.T) { err := view.Register(receivedBytesView, sentMessagesView) require.NoError(t, err) + defer view.Unregister(receivedBytesView, sentMessagesView) ctx := context.Background() tags1 := []tag.Mutator{ @@ -132,4 +133,15 @@ func TestDHTOpenCensusMetrics(t *testing.T) { } } } + 
+ // ensure the exported gatherer works + reg := MakeRegistry() + reg.Register(&OpencensusDefaultMetrics) + defer reg.Deregister(&OpencensusDefaultMetrics) + + var buf strings.Builder + reg.WriteMetrics(&buf, "") + + require.Contains(t, buf.String(), "my_sent_messages") + require.Contains(t, buf.String(), "my_received_bytes") } diff --git a/util/metrics/prometheus.go b/util/metrics/prometheus.go index edce668b93..b55f931001 100644 --- a/util/metrics/prometheus.go +++ b/util/metrics/prometheus.go @@ -33,7 +33,7 @@ type defaultPrometheusGatherer struct { // WriteMetric return prometheus converted to algorand format. // Supports only counter and gauge types and ignores go_ metrics. func (pg *defaultPrometheusGatherer) WriteMetric(buf *strings.Builder, parentLabels string) { - metrics := collectOpenCensusMetrics(pg.names) + metrics := collectPrometheusMetrics(pg.names) for _, metric := range metrics { metric.WriteMetric(buf, parentLabels) } diff --git a/util/metrics/prometheus_test.go b/util/metrics/prometheus_test.go index 18313ee220..75ef94f97e 100644 --- a/util/metrics/prometheus_test.go +++ b/util/metrics/prometheus_test.go @@ -74,6 +74,11 @@ func TestPrometheusMetrics(t *testing.T) { prometheus.DefaultRegisterer.MustRegister(counterLabels) prometheus.DefaultRegisterer.MustRegister(counter) + defer prometheus.DefaultRegisterer.Unregister(gaugeLabels) + defer prometheus.DefaultRegisterer.Unregister(gauge) + defer prometheus.DefaultRegisterer.Unregister(counterLabels) + defer prometheus.DefaultRegisterer.Unregister(counter) + // set some values tags := []string{"outbound", "protocol", "/test/proto"} gaugeLabels.WithLabelValues(tags...).Set(float64(1)) @@ -127,4 +132,17 @@ func TestPrometheusMetrics(t *testing.T) { m.AddMetric(values) require.Len(t, values, 1) } + + // ensure the exported gatherer works + reg := MakeRegistry() + reg.Register(&PrometheusDefaultMetrics) + defer reg.Deregister(&PrometheusDefaultMetrics) + + var buf strings.Builder + 
reg.WriteMetrics(&buf, "") + + require.Contains(t, buf.String(), metricNamespace+"_streams") + require.Contains(t, buf.String(), metricNamespace+"_protocols_count") + require.Contains(t, buf.String(), metricNamespace+"_identify_total") + require.Contains(t, buf.String(), metricNamespace+"_counter_total") }