diff --git a/dot/metrics/collector.go b/dot/metrics/collector.go deleted file mode 100644 index 87887e0505..0000000000 --- a/dot/metrics/collector.go +++ /dev/null @@ -1,130 +0,0 @@ -// Copyright 2021 ChainSafe Systems (ON) -// SPDX-License-Identifier: LGPL-3.0-only - -package metrics - -import ( - "context" - "runtime" - "sync" - "time" - - ethmetrics "github.com/ethereum/go-ethereum/metrics" -) - -// GaugeMetrics interface allows the exportation of many gauge metrics -// the implementer could exports -type GaugeMetrics interface { - CollectGauge() map[string]int64 -} - -// Collector struct controls the metrics and executes polling to extract the values -type Collector struct { - ctx context.Context - gauges []GaugeMetrics - wg sync.WaitGroup -} - -// NewCollector creates a new Collector -func NewCollector(ctx context.Context) *Collector { - return &Collector{ - ctx: ctx, - wg: sync.WaitGroup{}, - gauges: make([]GaugeMetrics, 0), - } -} - -// Start will start one goroutine to collect all the gauges registered and -// a separate goroutine to collect process metrics -func (c *Collector) Start() { - ethmetrics.Enabled = true - c.wg.Add(2) - - go c.startCollectProccessMetrics() - go c.startCollectGauges() - - c.wg.Wait() -} - -// AddGauge adds a GaugeMetrics implementer on gauges list -func (c *Collector) AddGauge(g GaugeMetrics) { - c.gauges = append(c.gauges, g) -} - -func (c *Collector) startCollectGauges() { - //TODO: Should we better add individual RefreshInterval for each `GaugeMetrics`or `label inside the gauges map`? - t := time.NewTicker(RefreshInterval) - defer func() { - t.Stop() - c.wg.Done() - }() - - for { - select { - case <-c.ctx.Done(): - return - case <-t.C: - for _, g := range c.gauges { - m := g.CollectGauge() - - for label, value := range m { - gauge := ethmetrics.GetOrRegisterGauge(label, nil) - gauge.Update(value) - } - } - } - } -} - -func (c *Collector) startCollectProccessMetrics() { - //TODO: Should we better add individual RefreshInterval for each `GaugeMetrics`or `label inside the gauges map`? - cpuStats := make([]*ethmetrics.CPUStats, 2) - memStats := make([]*runtime.MemStats, 2) - for i := 0; i < len(memStats); i++ { - cpuStats[i] = new(ethmetrics.CPUStats) - memStats[i] = new(runtime.MemStats) - } - - // Define the various metrics to collect - var ( - cpuSysLoad = ethmetrics.GetOrRegisterGauge("system/cpu/sysload", ethmetrics.DefaultRegistry) - cpuSysWait = ethmetrics.GetOrRegisterGauge("system/cpu/syswait", ethmetrics.DefaultRegistry) - cpuProcLoad = ethmetrics.GetOrRegisterGauge("system/cpu/procload", ethmetrics.DefaultRegistry) - cpuGoroutines = ethmetrics.GetOrRegisterGauge("system/cpu/goroutines", ethmetrics.DefaultRegistry) - - memPauses = ethmetrics.GetOrRegisterMeter("system/memory/pauses", ethmetrics.DefaultRegistry) - memAlloc = ethmetrics.GetOrRegisterMeter("system/memory/allocs", ethmetrics.DefaultRegistry) - memFrees = ethmetrics.GetOrRegisterMeter("system/memory/frees", ethmetrics.DefaultRegistry) - memHeld = ethmetrics.GetOrRegisterGauge("system/memory/held", ethmetrics.DefaultRegistry) - memUsed = ethmetrics.GetOrRegisterGauge("system/memory/used", ethmetrics.DefaultRegistry) - ) - - t := time.NewTicker(RefreshInterval) - defer func() { - t.Stop() - c.wg.Done() - }() - - for i := 1; ; i++ { - select { - case <-c.ctx.Done(): - return - case <-t.C: - location1 := i % 2 - location2 := (i - 1) % 2 - - ethmetrics.ReadCPUStats(cpuStats[location1]) - cpuSysLoad.Update((cpuStats[location1].GlobalTime - cpuStats[location2].GlobalTime) / refreshFreq) - cpuSysWait.Update((cpuStats[location1].GlobalWait - cpuStats[location2].GlobalWait) / refreshFreq) - cpuProcLoad.Update((cpuStats[location1].LocalTime - cpuStats[location2].LocalTime) / refreshFreq) - cpuGoroutines.Update(int64(runtime.NumGoroutine())) - - runtime.ReadMemStats(memStats[location1]) - memPauses.Mark(int64(memStats[location1].PauseTotalNs - memStats[location2].PauseTotalNs)) - memAlloc.Mark(int64(memStats[location1].Mallocs - memStats[location2].Mallocs)) - memFrees.Mark(int64(memStats[location1].Frees - memStats[location2].Frees)) - memHeld.Update(int64(memStats[location1].HeapSys - memStats[location1].HeapReleased)) - memUsed.Update(int64(memStats[location1].Alloc)) - } - } -} diff --git a/dot/metrics/metrics.go b/dot/metrics/metrics.go deleted file mode 100644 index 84437ff0f0..0000000000 --- a/dot/metrics/metrics.go +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright 2021 ChainSafe Systems (ON) -// SPDX-License-Identifier: LGPL-3.0-only - -package metrics - -import ( - "net/http" - "time" - - "github.com/ChainSafe/gossamer/internal/log" - ethmetrics "github.com/ethereum/go-ethereum/metrics" - "github.com/ethereum/go-ethereum/metrics/prometheus" -) - -var logger log.LeveledLogger = log.NewFromGlobal(log.AddContext("pkg", "metrics")) - -const ( - // RefreshInterval is the refresh time for publishing metrics. - RefreshInterval = time.Second * 10 - refreshFreq = int64(RefreshInterval / time.Second) -) - -// PublishMetrics function will export the /metrics endpoint to prometheus process -func PublishMetrics(address string) { - ethmetrics.Enabled = true - setupMetricsServer(address) -} - -// setupMetricsServer starts a dedicated metrics server at the given address. -func setupMetricsServer(address string) { - m := http.NewServeMux() - m.Handle("/metrics", prometheus.Handler(ethmetrics.DefaultRegistry)) - logger.Info("Starting metrics server at http://" + address + "/metrics") - go func() { - if err := http.ListenAndServe(address, m); err != nil { - logger.Errorf("Metrics HTTP server crashed: %s", err) - } - }() -} diff --git a/dot/network/config.go b/dot/network/config.go index 42011aff4e..4ac1c6412d 100644 --- a/dot/network/config.go +++ b/dot/network/config.go @@ -12,6 +12,7 @@ import ( "github.com/ChainSafe/gossamer/dot/telemetry" "github.com/ChainSafe/gossamer/internal/log" + "github.com/ChainSafe/gossamer/internal/metrics" ) const ( @@ -93,9 +94,6 @@ type Config struct { // privateKey the private key for the network p2p identity privateKey crypto.PrivKey - // PublishMetrics enables collection of network metrics - PublishMetrics bool - // telemetryInterval how often to send telemetry metrics telemetryInterval time.Duration @@ -107,6 +105,7 @@ type Config struct { SlotDuration time.Duration Telemetry telemetry.Client + Metrics metrics.IntervalConfig } // build checks the configuration, sets up the private key for the network service, diff --git a/dot/network/service.go b/dot/network/service.go index 25464f4b02..f9e49c3ffd 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -11,17 +11,17 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/metrics" - libp2pnetwork "github.com/libp2p/go-libp2p-core/network" - "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-core/protocol" - - gssmrmetrics "github.com/ChainSafe/gossamer/dot/metrics" "github.com/ChainSafe/gossamer/dot/peerset" "github.com/ChainSafe/gossamer/dot/telemetry" "github.com/ChainSafe/gossamer/internal/log" + "github.com/ChainSafe/gossamer/internal/metrics" "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/services" + libp2pnetwork "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" ) const ( @@ -35,14 +35,58 @@ const ( transactionsID = "/transactions/1" maxMessageSize = 1024 * 63 // 63kb for now - - gssmrIsMajorSyncMetric = "gossamer/network/is_major_syncing" ) var ( _ services.Service = &Service{} logger = log.NewFromGlobal(log.AddContext("pkg", "network")) maxReads = 256 + + peerCountGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "gossamer_network_node", + Name: "peer_count_total", + Help: "total peer count", + }) + connectionsGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "gossamer_network_node", + Name: "connections_total", + Help: "total number of connections", + }) + nodeLatencyGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "gossamer_network_node", + Name: "latency_ms", + Help: "average node latency in milliseconds", + }) + inboundBlockAnnounceStreamsGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "gossamer_network_streams_block_announce", + Name: "inbound_total", + Help: "total number of inbound block announce streams", + }) + outboundBlockAnnounceStreamsGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "gossamer_network_streams_block_announce", + Name: "outbound_total", + Help: "total number of outbound block announce streams", + }) + inboundGrandpaStreamsGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "gossamer_network_streams_grandpa", + Name: "inbound_total", + Help: "total number of inbound grandpa streams", + }) + outboundGrandpaStreamsGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "gossamer_network_streams_grandpa", + Name: "outbound_total", + Help: "total number of outbound grandpa streams", + }) + inboundStreamsGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "gossamer_network_streams", + Name: "inbound_total", + Help: "total number of inbound streams", + }) + outboundStreamsGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "gossamer_network_streams", + Name: "outbound_total", + Help: "total number of outbound streams", + }) ) type ( @@ -83,6 +127,8 @@ type Service struct { noMDNS bool noGossip bool // internal option + Metrics metrics.IntervalConfig + // telemetry telemetryInterval time.Duration closeCh chan struct{} @@ -166,6 +212,7 @@ func NewService(cfg *Config) (*Service, error) { streamManager: newStreamManager(ctx), blockResponseBuf: make([]byte, maxBlockResponseSize), telemetry: cfg.Telemetry, + Metrics: cfg.Metrics, } return network, err @@ -277,8 +324,8 @@ func (s *Service) Start() error { logger.Info("started network service with supported protocols " + strings.Join(s.host.protocols(), ", ")) - if s.cfg.PublishMetrics { - go s.collectNetworkMetrics() + if s.Metrics.Publish { + go s.updateMetrics() } go s.logPeerCount() @@ -289,44 +336,27 @@ func (s *Service) Start() error { return nil } -func (s *Service) collectNetworkMetrics() { +func (s *Service) updateMetrics() { + ticker := time.NewTicker(s.Metrics.Interval) + defer ticker.Stop() for { - peerCount := metrics.GetOrRegisterGauge("network/node/peerCount", metrics.DefaultRegistry) - totalConn := metrics.GetOrRegisterGauge("network/node/totalConnection", metrics.DefaultRegistry) - networkLatency := metrics.GetOrRegisterGauge("network/node/latency", metrics.DefaultRegistry) - syncedBlocks := metrics.GetOrRegisterGauge( - "service/blocks/sync", - metrics.DefaultRegistry) - numInboundBlockAnnounceStreams := metrics.GetOrRegisterGauge( - "network/streams/block_announce/inbound", - metrics.DefaultRegistry) - numOutboundBlockAnnounceStreams := metrics.GetOrRegisterGauge( - "network/streams/block_announce/outbound", - metrics.DefaultRegistry) - numInboundGrandpaStreams := metrics.GetOrRegisterGauge("network/streams/grandpa/inbound", metrics.DefaultRegistry) - numOutboundGrandpaStreams := metrics.GetOrRegisterGauge("network/streams/grandpa/outbound", metrics.DefaultRegistry) - totalInboundStreams := metrics.GetOrRegisterGauge("network/streams/total/inbound", metrics.DefaultRegistry) - totalOutboundStreams := metrics.GetOrRegisterGauge("network/streams/total/outbound", metrics.DefaultRegistry) - - peerCount.Update(int64(s.host.peerCount())) - totalConn.Update(int64(len(s.host.h.Network().Conns()))) - networkLatency.Update(int64(s.host.h.Peerstore().LatencyEWMA(s.host.id()))) - - numInboundBlockAnnounceStreams.Update(s.getNumStreams(BlockAnnounceMsgType, true)) - numOutboundBlockAnnounceStreams.Update(s.getNumStreams(BlockAnnounceMsgType, false)) - numInboundGrandpaStreams.Update(s.getNumStreams(ConsensusMsgType, true)) - numOutboundGrandpaStreams.Update(s.getNumStreams(ConsensusMsgType, false)) - totalInboundStreams.Update(s.getTotalStreams(true)) - totalOutboundStreams.Update(s.getTotalStreams(false)) - - num, err := s.blockState.BestBlockNumber() - if err != nil { - syncedBlocks.Update(0) - } else { - syncedBlocks.Update(num.Int64()) + select { + case <-s.ctx.Done(): + return + case <-ticker.C: + peerCountGauge.Set(float64(s.host.peerCount())) + connectionsGauge.Set(float64(len(s.host.h.Network().Conns()))) + nodeLatencyGauge.Set(float64( + s.host.h.Peerstore().LatencyEWMA(s.host.id()).Milliseconds())) + inboundBlockAnnounceStreamsGauge.Set(float64( + s.getNumStreams(BlockAnnounceMsgType, true))) + outboundBlockAnnounceStreamsGauge.Set(float64( + s.getNumStreams(BlockAnnounceMsgType, false))) + inboundGrandpaStreamsGauge.Set(float64(s.getNumStreams(ConsensusMsgType, true))) + outboundGrandpaStreamsGauge.Set(float64(s.getNumStreams(ConsensusMsgType, false))) + inboundStreamsGauge.Set(float64(s.getTotalStreams(true))) + outboundStreamsGauge.Set(float64(s.getTotalStreams(false))) } - - time.Sleep(gssmrmetrics.RefreshInterval) } } @@ -615,18 +645,6 @@ func (s *Service) NodeRoles() byte { return s.cfg.Roles } -// CollectGauge will be used to collect countable metrics from network service -func (s *Service) CollectGauge() map[string]int64 { - var isSynced int64 - if !s.syncer.IsSynced() { - isSynced = 1 - } - - return map[string]int64{ - gssmrIsMajorSyncMetric: isSynced, - } -} - // HighestBlock returns the highest known block number func (*Service) HighestBlock() int64 { // TODO: refactor this to get the data from the sync service (#1857) diff --git a/dot/network/service_test.go b/dot/network/service_test.go index f7474d3e34..b9774ce194 100644 --- a/dot/network/service_test.go +++ b/dot/network/service_test.go @@ -405,24 +405,3 @@ func TestHandleConn(t *testing.T) { } require.NoError(t, err) } - -func TestSerivceIsMajorSyncMetrics(t *testing.T) { - t.Parallel() - - ctrl := gomock.NewController(t) - mocksyncer := NewMockSyncer(ctrl) - - node := &Service{ - syncer: mocksyncer, - } - - mocksyncer.EXPECT().IsSynced().Return(false) - m := node.CollectGauge() - - require.Equal(t, int64(1), m[gssmrIsMajorSyncMetric]) - - mocksyncer.EXPECT().IsSynced().Return(true) - m = node.CollectGauge() - - require.Equal(t, int64(0), m[gssmrIsMajorSyncMetric]) -} diff --git a/dot/node.go b/dot/node.go index cecf0c7719..8b870314b9 100644 --- a/dot/node.go +++ b/dot/node.go @@ -14,7 +14,6 @@ import ( "syscall" "time" - "github.com/ChainSafe/gossamer/dot/metrics" "github.com/ChainSafe/gossamer/dot/network" "github.com/ChainSafe/gossamer/dot/rpc" "github.com/ChainSafe/gossamer/dot/state" @@ -22,6 +21,7 @@ import ( "github.com/ChainSafe/gossamer/dot/telemetry" "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/internal/log" + "github.com/ChainSafe/gossamer/internal/metrics" "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/genesis" "github.com/ChainSafe/gossamer/lib/keystore" @@ -34,10 +34,11 @@ var logger = log.NewFromGlobal(log.AddContext("pkg", "dot")) // Node is a container for all the components of a node. type Node struct { - Name string - Services *services.ServiceRegistry // registry of all node services - wg sync.WaitGroup - started chan struct{} + Name string + Services *services.ServiceRegistry // registry of all node services + wg sync.WaitGroup + started chan struct{} + metricsServer *metrics.Server } // InitNode initialises a new dot node from the provided dot node configuration @@ -87,6 +88,7 @@ func InitNode(cfg *Config) error { RetainedBlocks: cfg.Global.RetainBlocks, }, Telemetry: telemetryMailer, + Metrics: metrics.NewIntervalConfig(cfg.Global.PublishMetrics), } // create new state service @@ -341,16 +343,13 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore) (*Node, error) { } if cfg.Global.PublishMetrics { - c := metrics.NewCollector(context.Background()) - c.AddGauge(fg) - c.AddGauge(stateSrvc) - c.AddGauge(networkSrvc) - - go c.Start() - address := fmt.Sprintf("%s:%d", cfg.RPC.Host, cfg.Global.MetricsPort) logger.Info("Enabling stand-alone metrics HTTP endpoint at address " + address) - metrics.PublishMetrics(address) + node.metricsServer = metrics.NewServer(address) + err := node.metricsServer.Start(address) + if err != nil { + return nil, fmt.Errorf("cannot start metrics server: %w", err) + } } return node, nil @@ -424,6 +423,12 @@ func (n *Node) Stop() { // stop all node services n.Services.StopAll() n.wg.Done() + if n.metricsServer != nil { + err := n.metricsServer.Stop() + if err != nil { + log.Errorf("cannot stop metrics server: %s", err) + } + } } func loadRuntime(cfg *Config, ns *runtime.NodeStorage, diff --git a/dot/peerset/peerset.go b/dot/peerset/peerset.go index ff38daf64a..accdcd4a7e 100644 --- a/dot/peerset/peerset.go +++ b/dot/peerset/peerset.go @@ -255,7 +255,6 @@ func reputationTick(reput Reputation) Reputation { } else if diff == 0 && reput > 0 { diff = 1 } - return reput.sub(diff) } diff --git a/dot/services.go b/dot/services.go index 1afd937f76..a07d32948f 100644 --- a/dot/services.go +++ b/dot/services.go @@ -22,6 +22,7 @@ import ( "github.com/ChainSafe/gossamer/dot/telemetry" "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/internal/log" + "github.com/ChainSafe/gossamer/internal/metrics" "github.com/ChainSafe/gossamer/internal/pprof" "github.com/ChainSafe/gossamer/lib/babe" "github.com/ChainSafe/gossamer/lib/common" @@ -47,6 +48,7 @@ func createStateService(cfg *Config) (*state.Service, error) { config := state.Config{ Path: cfg.Global.BasePath, LogLevel: cfg.Log.StateLvl, + Metrics: metrics.NewIntervalConfig(cfg.Global.PublishMetrics), } stateSrvc := state.NewService(config) @@ -283,13 +285,13 @@ func createNetworkService(cfg *Config, stateSrvc *state.Service, NoMDNS: cfg.Network.NoMDNS, MinPeers: cfg.Network.MinPeers, MaxPeers: cfg.Network.MaxPeers, - PublishMetrics: cfg.Global.PublishMetrics, PersistentPeers: cfg.Network.PersistentPeers, DiscoveryInterval: cfg.Network.DiscoveryInterval, SlotDuration: slotDuration, PublicIP: cfg.Network.PublicIP, Telemetry: telemetryMailer, PublicDNS: cfg.Network.PublicDNS, + Metrics: metrics.NewIntervalConfig(cfg.Global.PublishMetrics), } networkSrvc, err := network.NewService(&networkConfig) diff --git a/dot/state/block.go b/dot/state/block.go index 48aca54bd2..6ad1ec04a8 100644 --- a/dot/state/block.go +++ b/dot/state/block.go @@ -19,6 +19,8 @@ import ( "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/runtime" "github.com/ChainSafe/gossamer/pkg/scale" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" rtstorage "github.com/ChainSafe/gossamer/lib/runtime/storage" "github.com/ChainSafe/gossamer/lib/runtime/wasmer" @@ -39,6 +41,12 @@ var ( justificationPrefix = []byte("jcp") // justificationPrefix + hash -> justification errNilBlockBody = errors.New("block body is nil") + + syncedBlocksGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "gossamer_network_syncer", + Name: "blocks_synced_total", + Help: "total number of blocks synced", + }) ) // BlockState contains the historical block data of the blockchain, including block headers and bodies. @@ -480,12 +488,17 @@ func (bs *BlockState) BestBlockHash() common.Hash { // BestBlockHeader returns the block header of the current head of the chain func (bs *BlockState) BestBlockHeader() (*types.Header, error) { - return bs.GetHeader(bs.BestBlockHash()) + header, err := bs.GetHeader(bs.BestBlockHash()) + if err != nil { + return nil, fmt.Errorf("cannot get header of best block: %w", err) + } + syncedBlocksGauge.Set(float64(header.Number.Int64())) + return header, nil } // BestBlockStateRoot returns the state root of the current head of the chain func (bs *BlockState) BestBlockStateRoot() (common.Hash, error) { - header, err := bs.GetHeader(bs.BestBlockHash()) + header, err := bs.BestBlockHeader() if err != nil { return common.Hash{}, err } @@ -506,7 +519,7 @@ func (bs *BlockState) GetBlockStateRoot(bhash common.Hash) ( // BestBlockNumber returns the block number of the current head of the chain func (bs *BlockState) BestBlockNumber() (*big.Int, error) { - header, err := bs.GetHeader(bs.BestBlockHash()) + header, err := bs.BestBlockHeader() if err != nil { return nil, err } diff --git a/dot/state/service.go b/dot/state/service.go index 0babdfa169..48c03f99b7 100644 --- a/dot/state/service.go +++ b/dot/state/service.go @@ -12,6 +12,7 @@ import ( "github.com/ChainSafe/gossamer/dot/telemetry" "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/internal/log" + "github.com/ChainSafe/gossamer/internal/metrics" "github.com/ChainSafe/gossamer/lib/blocktree" "github.com/ChainSafe/gossamer/lib/trie" "github.com/ChainSafe/gossamer/lib/utils" @@ -19,12 +20,6 @@ import ( "github.com/ChainSafe/chaindb" ) -const ( - readyPoolTransactionsMetrics = "gossamer/ready/pool/transaction/metrics" - readyPriorityQueueTransactions = "gossamer/ready/queue/transaction/metrics" - substrateNumberLeaves = "gossamer/substrate_number_leaves/metrics" -) - var logger = log.NewFromGlobal( log.AddContext("pkg", "state"), ) @@ -43,14 +38,12 @@ type Service struct { Grandpa *GrandpaState closeCh chan interface{} + PrunerCfg pruner.Config + Telemetry telemetry.Client + // Below are for testing only. BabeThresholdNumerator uint64 BabeThresholdDenominator uint64 - - // Below are for state trie online pruner - PrunerCfg pruner.Config - - Telemetry telemetry.Client } // Config is the default configuration used by state service. @@ -59,6 +52,7 @@ type Config struct { LogLevel log.Level PrunerCfg pruner.Config Telemetry telemetry.Client + Metrics metrics.IntervalConfig } // NewService create a new instance of Service @@ -175,6 +169,7 @@ func (s *Service) Start() error { // Start background goroutine to GC pruned keys. go s.Storage.pruneStorage(s.closeCh) + return nil } @@ -357,12 +352,3 @@ func (s *Service) Import(header *types.Header, t *trie.Trie, firstSlot uint64) e return s.db.Close() } - -// CollectGauge exports 2 metrics related to valid transaction pool and queue -func (s *Service) CollectGauge() map[string]int64 { - return map[string]int64{ - readyPoolTransactionsMetrics: int64(s.Transaction.pool.Len()), - readyPriorityQueueTransactions: int64(s.Transaction.queue.Len()), - substrateNumberLeaves: int64(len(s.Block.Leaves())), - } -} diff --git a/dot/state/service_test.go b/dot/state/service_test.go index 11652ff8a6..d65605813c 100644 --- a/dot/state/service_test.go +++ b/dot/state/service_test.go @@ -4,25 +4,21 @@ package state import ( - "context" "fmt" "math/big" "testing" "time" - "github.com/ChainSafe/gossamer/dot/metrics" "github.com/ChainSafe/gossamer/dot/state/pruner" "github.com/ChainSafe/gossamer/dot/telemetry" "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/internal/log" "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/genesis" - "github.com/ChainSafe/gossamer/lib/transaction" "github.com/ChainSafe/gossamer/lib/trie" "github.com/golang/mock/gomock" "github.com/ChainSafe/chaindb" - ethmetrics "github.com/ethereum/go-ethereum/metrics" "github.com/stretchr/testify/require" ) @@ -422,57 +418,3 @@ func TestService_Import(t *testing.T) { err = serv.Stop() require.NoError(t, err) } - -func TestStateServiceMetrics(t *testing.T) { - ctrl := gomock.NewController(t) - telemetryMock := NewMockClient(ctrl) - telemetryMock.EXPECT().SendMessage(gomock.Any()).AnyTimes() - - config := Config{ - Path: t.TempDir(), - LogLevel: log.Info, - Telemetry: telemetryMock, - } - - ethmetrics.Enabled = true - serv := NewService(config) - serv.Transaction = NewTransactionState(telemetryMock) - serv.Block = newTestBlockState(t, testGenesisHeader) - - m := metrics.NewCollector(context.Background()) - m.AddGauge(serv) - go m.Start() - - vtxs := []*transaction.ValidTransaction{ - { - Extrinsic: []byte("a"), - Validity: &transaction.Validity{Priority: 1}, - }, - { - Extrinsic: []byte("b"), - Validity: &transaction.Validity{Priority: 4}, - }, - } - - hashes := make([]common.Hash, len(vtxs)) - for i, v := range vtxs { - h := serv.Transaction.pool.Insert(v) - serv.Transaction.queue.Push(v) - - hashes[i] = h - } - - time.Sleep(time.Second + metrics.RefreshInterval) - gpool := ethmetrics.GetOrRegisterGauge(readyPoolTransactionsMetrics, nil) - gqueue := ethmetrics.GetOrRegisterGauge(readyPriorityQueueTransactions, nil) - - require.Equal(t, int64(2), gpool.Value()) - require.Equal(t, int64(2), gqueue.Value()) - - serv.Transaction.pool.Remove(hashes[0]) - serv.Transaction.queue.Pop() - - time.Sleep(time.Second + metrics.RefreshInterval) - require.Equal(t, int64(1), gpool.Value()) - require.Equal(t, int64(1), gqueue.Value()) -} diff --git a/dot/sync/chain_sync.go b/dot/sync/chain_sync.go index 70cf52b0da..bdecd68c90 100644 --- a/dot/sync/chain_sync.go +++ b/dot/sync/chain_sync.go @@ -15,6 +15,8 @@ import ( "github.com/ChainSafe/chaindb" "github.com/libp2p/go-libp2p-core/peer" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/ChainSafe/gossamer/dot/network" "github.com/ChainSafe/gossamer/dot/peerset" @@ -48,7 +50,14 @@ func (s chainSyncState) String() string { } } -var pendingBlocksLimit = maxResponseSize * 32 +var ( + pendingBlocksLimit = maxResponseSize * 32 + isSyncedGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "gossamer_network_syncer", + Name: "is_synced", + Help: "bool representing whether the node is synced to the head of the chain", + }) +) // peerState tracks our peers's best reported blocks type peerState struct { @@ -194,6 +203,8 @@ func (cs *chainSync) start() { time.Sleep(time.Millisecond * 100) } + isSyncedGauge.Set(float64(cs.state)) + pendingBlockDoneCh := make(chan struct{}) cs.pendingBlockDoneCh = pendingBlockDoneCh go cs.pendingBlocks.run(pendingBlockDoneCh) @@ -527,6 +538,7 @@ func (cs *chainSync) setMode(mode chainSyncState) { } cs.state = mode + isSyncedGauge.Set(float64(cs.state)) logger.Debugf("switched sync mode to %d", mode) } diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go new file mode 100644 index 0000000000..ac766a3772 --- /dev/null +++ b/internal/metrics/metrics.go @@ -0,0 +1,98 @@ +// Copyright 2022 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package metrics + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/ChainSafe/gossamer/internal/httpserver" + "github.com/ChainSafe/gossamer/internal/log" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +const ( + defaultInterval = 10 * time.Second + defaultTimeoutInterval = 30 * time.Second +) + +var ( + logger log.LeveledLogger = log.NewFromGlobal(log.AddContext("pkg", "metrics")) + ErrServerEndedUnexpectedly = fmt.Errorf("metrics server exited unexpectedly") + ErrServerStopTimeout = fmt.Errorf("metrics server exit timeout") +) + +// IntervalConfig for interval collection +type IntervalConfig struct { + Publish bool + Interval time.Duration +} + +// NewIntervalConfig is constructor for IntervalConfig, and uses default metrics interval +func NewIntervalConfig(publish bool) IntervalConfig { + return IntervalConfig{ + Publish: publish, + Interval: defaultInterval, + } +} + +// Server is a metrics http server +type Server struct { + cancel context.CancelFunc + server *httpserver.Server + done chan error +} + +// NewServer is a constructor for metrics server +func NewServer(address string) (s *Server) { + m := http.NewServeMux() + m.Handle("/metrics", promhttp.Handler()) + return &Server{ + server: httpserver.New("metrics", address, m, logger), + } +} + +// Start will start a dedicated metrics server at the given address. +func (s *Server) Start(address string) (err error) { + logger.Infof("Starting metrics server at http://%s/metrics", address) + + ctx, cancel := context.WithCancel(context.Background()) + s.cancel = cancel + ready := make(chan struct{}) + s.done = make(chan error) + + go s.server.Run(ctx, ready, s.done) + + select { + case <-ready: + return nil + case err := <-s.done: + close(s.done) + if err != nil { + return err + } + return ErrServerEndedUnexpectedly + } +} + +// Stop will stop the metrics server +func (s *Server) Stop() (err error) { + s.cancel() + timeout := time.NewTimer(defaultTimeoutInterval) + select { + case err := <-s.done: + close(s.done) + if !timeout.Stop() { + <-timeout.C + } + if err != nil { + return err + } + return nil + case <-timeout.C: + return ErrServerStopTimeout + } +} diff --git a/internal/metrics/metrics_test.go b/internal/metrics/metrics_test.go new file mode 100644 index 0000000000..8db85cbfab --- /dev/null +++ b/internal/metrics/metrics_test.go @@ -0,0 +1,102 @@ +// Copyright 2022 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package metrics + +import ( + "net/http" + "reflect" + "testing" + + "github.com/ChainSafe/gossamer/internal/httpserver" + "github.com/stretchr/testify/assert" +) + +func TestServer_Start(t *testing.T) { + type fields struct { + server *httpserver.Server + } + type args struct { + address string + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + { + name: "happy path", + fields: fields{ + server: httpserver.New("metrics", ":0", http.NewServeMux(), logger), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &Server{ + server: tt.fields.server, + } + if err := s.Start(tt.args.address); (err != nil) != tt.wantErr { + t.Errorf("Server.Start() error = %v, wantErr %v", err, tt.wantErr) + } + err := s.Stop() + if err != nil { + t.Errorf("unexpected error after stopping: %v", err) + } + }) + } +} + +func TestNewServer(t *testing.T) { + type args struct { + address string + } + tests := []struct { + name string + args args + wantS *Server + }{ + { + name: "happy path", + args: args{ + address: "someAddress", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := NewServer(tt.args.address) + assert.NotNil(t, s) + }) + } +} + +func TestNewIntervalConfig(t *testing.T) { + type args struct { + publish bool + } + tests := []struct { + name string + args args + want IntervalConfig + }{ + { + name: "happy path", + args: args{ + publish: true, + }, + want: IntervalConfig{ + Publish: true, + Interval: defaultInterval, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := NewIntervalConfig(tt.args.publish); !reflect.DeepEqual(got, tt.want) { + t.Errorf("NewIntervalConfig() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/lib/blocktree/blocktree.go b/lib/blocktree/blocktree.go index f3aaf88da7..1c1450f9bf 100644 --- a/lib/blocktree/blocktree.go +++ b/lib/blocktree/blocktree.go @@ -13,8 +13,16 @@ import ( "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/runtime" "github.com/disiqueira/gotree" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" ) +var leavesGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "gossamer_block", + Name: "leaves_total", + Help: "total number of blocktree leaves", +}) + // Hash common.Hash type Hash = common.Hash @@ -86,6 +94,8 @@ func (bt *BlockTree) AddBlock(header *types.Header, arrivalTime time.Time) error parent.addChild(n) bt.leaves.replace(parent, n) + + leavesGauge.Set(float64(len(bt.leaves.nodes()))) return nil } @@ -160,6 +170,7 @@ func (bt *BlockTree) Prune(finalised Hash) (pruned []Hash) { bt.runtime.Delete(hash) } + leavesGauge.Set(float64(len(bt.leaves.nodes()))) return pruned } diff --git a/lib/grandpa/grandpa.go b/lib/grandpa/grandpa.go index 4f80c62323..bc2dcd6b71 100644 --- a/lib/grandpa/grandpa.go +++ b/lib/grandpa/grandpa.go @@ -21,19 +21,23 @@ import ( "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/crypto/ed25519" "github.com/ChainSafe/gossamer/pkg/scale" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" ) const ( - finalityGrandpaRoundMetrics = "gossamer/finality/grandpa/round" - defaultGrandpaInterval = time.Second + defaultGrandpaInterval = time.Second ) var ( logger = log.NewFromGlobal(log.AddContext("pkg", "grandpa")) -) -var ( ErrUnsupportedSubround = errors.New("unsupported subround") + roundGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "gossamer_grandpa", + Name: "round", + Help: "current grandpa round", + }) ) // Service represents the current state of the grandpa protocol @@ -199,6 +203,7 @@ func (s *Service) Start() error { }() go s.sendNeighbourMessage() + return nil } @@ -231,16 +236,6 @@ func (s *Service) authorities() []*types.Authority { return ad } -// CollectGauge returns the map between metrics label and value -func (s *Service) CollectGauge() map[string]int64 { - s.roundLock.Lock() - defer s.roundLock.Unlock() - - return map[string]int64{ - finalityGrandpaRoundMetrics: int64(s.state.round), - } -} - // updateAuthorities updates the grandpa voter set, increments the setID, and resets the round numbers func (s *Service) updateAuthorities() error { currSetID, err := s.grandpaState.GetCurrentSetID() @@ -264,6 +259,7 @@ func (s *Service) updateAuthorities() error { // setting to 0 before incrementing indicates // the setID has been increased s.state.round = 0 + roundGauge.Set(float64(s.state.round)) s.sendTelemetryAuthoritySet() @@ -313,6 +309,7 @@ func (s *Service) initiateRound() error { "found block finalised in higher round, updating our round to be %d...", round) s.state.round = round + roundGauge.Set(float64(s.state.round)) err = s.grandpaState.SetLatestRound(round) if err != nil { return err @@ -1310,9 +1307,10 @@ func (s *Service) GetSetID() uint64 { // GetRound return the current round number func (s *Service) GetRound() uint64 { + // Tim: I don't think we need to lock in this case. Reading an int will + // not produce a concurrent read/write panic s.roundLock.Lock() defer s.roundLock.Unlock() - return s.state.round } diff --git a/lib/grandpa/grandpa_test.go b/lib/grandpa/grandpa_test.go index 016a2354ad..6a438bebea 100644 --- a/lib/grandpa/grandpa_test.go +++ b/lib/grandpa/grandpa_test.go @@ -4,7 +4,6 @@ package grandpa import ( - "context" "math/big" "math/rand" "sort" @@ -12,7 +11,6 @@ import ( "testing" "time" - "github.com/ChainSafe/gossamer/dot/metrics" "github.com/ChainSafe/gossamer/dot/state" "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/common" @@ -28,8 +26,6 @@ import ( "github.com/stretchr/testify/require" "github.com/ChainSafe/gossamer/lib/grandpa/mocks" - - ethmetrics "github.com/ethereum/go-ethereum/metrics" ) // testGenesisHeader is a test block header @@ -1261,24 +1257,6 @@ func TestGetGrandpaGHOST_MultipleCandidates(t *testing.T) { require.Equal(t, block, pv) } -func TestFinalRoundGaugeMetric(t *testing.T) { - gs, _ := newTestService(t) - ethmetrics.Enabled = true - - gs.state.round = uint64(180) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - coll := metrics.NewCollector(ctx) - coll.AddGauge(gs) - - go coll.Start() - - time.Sleep(metrics.RefreshInterval + time.Second) - gauge := ethmetrics.GetOrRegisterGauge(finalityGrandpaRoundMetrics, nil) - require.Equal(t, gauge.Value(), int64(180)) -} - func TestGrandpaServiceCreateJustification_ShouldCountEquivocatoryVotes(t *testing.T) { // setup granpda service gs, st := newTestService(t) diff --git a/lib/transaction/pool.go b/lib/transaction/pool.go index de4c44de0e..0e01559e27 100644 --- a/lib/transaction/pool.go +++ b/lib/transaction/pool.go @@ -7,8 +7,16 @@ import ( "sync" "github.com/ChainSafe/gossamer/lib/common" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" ) +var transactionPoolGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "gossamer_state_transaction", + Name: "pool_total", + Help: "total number of transactions in ready pool", +}) + // Pool represents the transaction pool type Pool struct { transactions map[common.Hash]*ValidTransaction @@ -43,6 +51,7 @@ func (p *Pool) Insert(tx *ValidTransaction) common.Hash { p.mu.Lock() defer p.mu.Unlock() p.transactions[hash] = tx + transactionPoolGauge.Set(float64(len(p.transactions))) return hash } @@ -51,6 +60,7 @@ func (p *Pool) Remove(hash common.Hash) { p.mu.Lock() defer p.mu.Unlock() delete(p.transactions, hash) + transactionPoolGauge.Set(float64(len(p.transactions))) } // Len return the current length of the pool diff --git a/lib/transaction/priority_queue.go b/lib/transaction/priority_queue.go index 68ac140166..0a4d83dd36 100644 --- a/lib/transaction/priority_queue.go +++ b/lib/transaction/priority_queue.go @@ -10,11 +10,19 @@ import ( "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/common" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" ) // ErrTransactionExists is returned when trying to add a transaction to the queue that already exists var ErrTransactionExists = errors.New("transaction is already in queue") +var transactionQueueGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "gossamer_state_transaction", + Name: "queue_total", + Help: "total number of transactions in ready queue", +}) + // An Item is something we manage in a priority queue. type Item struct { data *ValidTransaction @@ -121,6 +129,7 @@ func (spq *PriorityQueue) Push(txn *ValidTransaction) (common.Hash, error) { heap.Push(&spq.pq, item) spq.txs[hash] = item + transactionQueueGauge.Set(float64(spq.pq.Len())) return hash, nil } @@ -135,6 +144,8 @@ func (spq *PriorityQueue) Pop() *ValidTransaction { item := heap.Pop(&spq.pq).(*Item) delete(spq.txs, item.hash) + + transactionQueueGauge.Set(float64(spq.pq.Len())) return item.data } diff --git a/tests/stress/stress_test.go b/tests/stress/stress_test.go index 30554b6e42..58afc5823d 100644 --- a/tests/stress/stress_test.go +++ b/tests/stress/stress_test.go @@ -102,7 +102,7 @@ func TestSync_SingleBlockProducer(t *testing.T) { require.NoError(t, err) nodes = append(nodes, node) - time.Sleep(time.Second * 20) + time.Sleep(time.Second * 30) defer func() { errList := utils.StopNodes(t, nodes)