Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ledger/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (mt *metricsTracker) close() {

func (mt *metricsTracker) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) {
rnd := blk.Round()
mt.ledgerRound.Set(float64(rnd), map[string]string{})
mt.ledgerRound.Set(float64(rnd))
mt.ledgerTransactionsTotal.Add(float64(len(blk.Payset)), map[string]string{})
// TODO rewards: need to provide meaningful metric here.
mt.ledgerRewardClaimsTotal.Add(float64(1), map[string]string{})
Expand Down
18 changes: 9 additions & 9 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -1162,8 +1162,8 @@ func (wn *WebsocketNetwork) ServeHTTP(response http.ResponseWriter, request *htt

wn.maybeSendMessagesOfInterest(peer, nil)

peers.Set(float64(wn.NumPeers()), nil)
incomingPeers.Set(float64(wn.numIncomingPeers()), nil)
peers.Set(float64(wn.NumPeers()))
incomingPeers.Set(float64(wn.numIncomingPeers()))
}

func (wn *WebsocketNetwork) maybeSendMessagesOfInterest(peer *wsPeer, messagesOfInterestEnc []byte) {
Expand Down Expand Up @@ -2103,8 +2103,8 @@ func (wn *WebsocketNetwork) tryConnect(addr, gossipAddr string) {

wn.maybeSendMessagesOfInterest(peer, nil)

peers.Set(float64(wn.NumPeers()), nil)
outgoingPeers.Set(float64(wn.numOutgoingPeers()), nil)
peers.Set(float64(wn.NumPeers()))
outgoingPeers.Set(float64(wn.numOutgoingPeers()))

if wn.prioScheme != nil {
challenge := response.Header.Get(PriorityChallengeHeader)
Expand Down Expand Up @@ -2217,9 +2217,9 @@ func (wn *WebsocketNetwork) removePeer(peer *wsPeer, reason disconnectReason) {
Reason: string(reason),
})

peers.Set(float64(wn.NumPeers()), nil)
incomingPeers.Set(float64(wn.numIncomingPeers()), nil)
outgoingPeers.Set(float64(wn.numOutgoingPeers()), nil)
peers.Set(float64(wn.NumPeers()))
incomingPeers.Set(float64(wn.numIncomingPeers()))
outgoingPeers.Set(float64(wn.numOutgoingPeers()))

wn.peersLock.Lock()
defer wn.peersLock.Unlock()
Expand Down Expand Up @@ -2284,8 +2284,8 @@ func (wn *WebsocketNetwork) countPeersSetGauges() {
numIn++
}
}
networkIncomingConnections.Set(float64(numIn), nil)
networkOutgoingConnections.Set(float64(numOut), nil)
networkIncomingConnections.Set(float64(numIn))
networkOutgoingConnections.Set(float64(numOut))
}

func justHost(hostPort string) string {
Expand Down
4 changes: 2 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,7 +984,7 @@ func insertStateProofToRegistry(part account.PersistedParticipation, node *Algor

}

var txPoolGuage = metrics.MakeGauge(metrics.MetricName{Name: "algod_tx_pool_count", Description: "current number of available transactions in pool"})
var txPoolGauge = metrics.MakeGauge(metrics.MetricName{Name: "algod_tx_pool_count", Description: "current number of available transactions in pool"})

func (node *AlgorandFullNode) txPoolGaugeThread(done <-chan struct{}) {
defer node.monitoringRoutinesWaitGroup.Done()
Expand All @@ -993,7 +993,7 @@ func (node *AlgorandFullNode) txPoolGaugeThread(done <-chan struct{}) {
for true {
select {
case <-ticker.C:
txPoolGuage.Set(float64(node.transactionPool.PendingCount()), nil)
txPoolGauge.Set(float64(node.transactionPool.PendingCount()))
case <-done:
return
}
Expand Down
122 changes: 17 additions & 105 deletions util/metrics/gauge.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package metrics

import (
"math"
"strconv"
"strings"

Expand All @@ -27,25 +26,16 @@ import (
// Gauge represent a single gauge variable.
type Gauge struct {
deadlock.Mutex
name string
description string
labels map[string]int // map each label ( i.e. httpErrorCode ) to an index.
valuesIndices map[int]*gaugeValues // maps each set of labels into a concrete gauge
}

type gaugeValues struct {
gauge float64
labels map[string]string
formattedLabels string
name string
description string
value float64
}

// MakeGauge create a new gauge with the provided name and description.
func MakeGauge(metric MetricName) *Gauge {
c := &Gauge{
description: metric.Description,
name: metric.Name,
labels: make(map[string]int),
valuesIndices: make(map[int]*gaugeValues),
description: metric.Description,
name: metric.Name,
}
c.Register(nil)
return c
Expand All @@ -70,123 +60,45 @@ func (gauge *Gauge) Deregister(reg *Registry) {
}

// Add increases gauge by x
func (gauge *Gauge) Add(x float64, labels map[string]string) {
func (gauge *Gauge) Add(x float64) {
gauge.Lock()
defer gauge.Unlock()

labelIndex := gauge.findLabelIndex(labels)

// find where we have the same labels.
if gaugeObj, has := gauge.valuesIndices[labelIndex]; !has {
// we need to add a new gauge.
val := &gaugeValues{
gauge: x,
labels: labels,
}
val.createFormattedLabel()
gauge.valuesIndices[labelIndex] = val
} else {
// update existing value.
gaugeObj.gauge += x
}
gauge.value += x
}

// Set sets gauge to x
func (gauge *Gauge) Set(x float64, labels map[string]string) {
func (gauge *Gauge) Set(x float64) {
gauge.Lock()
defer gauge.Unlock()

labelIndex := gauge.findLabelIndex(labels)

// find where we have the same labels.
if gaugeObj, has := gauge.valuesIndices[labelIndex]; !has {
// we need to add a new gauge.
val := &gaugeValues{
gauge: x,
labels: labels,
}
val.createFormattedLabel()
gauge.valuesIndices[labelIndex] = val
} else {
// update existing value.
gaugeObj.gauge = x
}
}

func (gauge *Gauge) findLabelIndex(labels map[string]string) int {
accumulatedIndex := 0
for k, v := range labels {
t := k + ":" + v
// do we already have this key ( label ) in our map ?
if i, has := gauge.labels[t]; has {
// yes, we do. use this index.
accumulatedIndex += i
} else {
// no, we don't have it.
gauge.labels[t] = int(math.Exp2(float64(len(gauge.labels))))
accumulatedIndex += gauge.labels[t]
}
}
return accumulatedIndex
}

func (cv *gaugeValues) createFormattedLabel() {
var buf strings.Builder
if len(cv.labels) < 1 {
return
}
for k, v := range cv.labels {
buf.WriteString("," + k + "=\"" + v + "\"")
}

cv.formattedLabels = buf.String()[1:]
gauge.value = x
}

// WriteMetric writes the metric into the output stream
func (gauge *Gauge) WriteMetric(buf *strings.Builder, parentLabels string) {
gauge.Lock()
defer gauge.Unlock()

if len(gauge.valuesIndices) < 1 {
return
}
buf.WriteString("# HELP ")
buf.WriteString(gauge.name)
buf.WriteString(" ")
buf.WriteString(gauge.description)
buf.WriteString("\n# TYPE ")
buf.WriteString(gauge.name)
buf.WriteString(" gauge\n")
for _, l := range gauge.valuesIndices {
buf.WriteString(gauge.name)
buf.WriteString("{")
if len(parentLabels) > 0 {
buf.WriteString(parentLabels)
if len(l.formattedLabels) > 0 {
buf.WriteString(",")
}
}
buf.WriteString(l.formattedLabels)
buf.WriteString("} ")
buf.WriteString(strconv.FormatFloat(l.gauge, 'f', -1, 32))
buf.WriteString("\n")
buf.WriteString(gauge.name)
buf.WriteString("{")
if len(parentLabels) > 0 {
buf.WriteString(parentLabels)
}
buf.WriteString("} ")
buf.WriteString(strconv.FormatFloat(gauge.value, 'f', -1, 32))
buf.WriteString("\n")
}

// AddMetric adds the metric into the map
func (gauge *Gauge) AddMetric(values map[string]float64) {
gauge.Lock()
defer gauge.Unlock()

if len(gauge.valuesIndices) < 1 {
return
}

for _, l := range gauge.valuesIndices {
var suffix string
if len(l.formattedLabels) > 0 {
suffix = ":" + l.formattedLabels
}
values[sanitizeTelemetryName(gauge.name+suffix)] = l.gauge
}
values[sanitizeTelemetryName(gauge.name)] = gauge.value
}
41 changes: 19 additions & 22 deletions util/metrics/gauge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,13 @@ func TestMetricGauge(t *testing.T) {
"session_id": "AFX-229"},
})
metricService.Start(context.Background())

gauge := MakeGauge(MetricName{Name: "metric_test_name1", Description: "this is the metric test for counter object"})

for i := 0; i < 20; i++ {
gauge.Set(float64(i*10), map[string]string{"pid": "123", "data_host": fmt.Sprintf("host%d", i%5)})
gauges := make([]*Gauge, 3)
for i := 0; i < 3; i++ {
gauges[i] = MakeGauge(MetricName{Name: fmt.Sprintf("gauge_%d", i), Description: "this is the metric test for gauge object"})
}
for i := 0; i < 9; i++ {
gauges[i%3].Set(float64(i * 100))
gauges[i%3].Add(float64(i))
// wait half-a cycle
time.Sleep(test.sampleRate / 2)
}
Expand All @@ -60,32 +62,27 @@ func TestMetricGauge(t *testing.T) {
time.Sleep(test.sampleRate * 2)

metricService.Shutdown()
gauge.Deregister(nil)
for _, gauge := range gauges {
gauge.Deregister(nil)
}
// test the metrics values.

test.Lock()
defer test.Unlock()

// the the loop above we've created a single metric name with five different labels set ( host0, host1 .. host 4)
// let's see if we received all the 5 different labels.
require.Equal(t, 5, len(test.metrics), "Missing metric counts were reported: %+v", test.metrics)
// the the loop above we've created 3 separate gauges
// let's see if we received all 3 metrics
require.Equal(t, 3, len(test.metrics), "Missing metric counts were reported: %+v", test.metrics)

// iterate through the metrics and check the each of the metrics reached it's correct count.
for k, v := range test.metrics {
if strings.Contains(k, "host0") {
require.Equal(t, "150", v, fmt.Sprintf("The metric '%s' reached value '%s'", k, v))
}
if strings.Contains(k, "host1") {
require.Equal(t, "160", v, fmt.Sprintf("The metric '%s' reached value '%s'", k, v))
}
if strings.Contains(k, "host2") {
require.Equal(t, "170", v, fmt.Sprintf("The metric '%s' reached value '%s'", k, v))
if strings.Contains(k, "gauge_0") {
require.Equal(t, "606", v, fmt.Sprintf("The metric '%s' reached value '%s'", k, v))
}
if strings.Contains(k, "host3") {
require.Equal(t, "180", v, fmt.Sprintf("The metric '%s' reached value '%s'", k, v))
if strings.Contains(k, "gauge_1") {
require.Equal(t, "707", v, fmt.Sprintf("The metric '%s' reached value '%s'", k, v))
}
if strings.Contains(k, "host4") {
require.Equal(t, "190", v, fmt.Sprintf("The metric '%s' reached value '%s'", k, v))
if strings.Contains(k, "gauge_2") {
require.Equal(t, "808", v, fmt.Sprintf("The metric '%s' reached value '%s'", k, v))
}
}
}