Skip to content

Commit

Permalink
[refactor] Minor clean-up of jaegertracing#2657 (jaegertracing#2728)
Browse files Browse the repository at this point in the history
* [refactor] Minor clean-up of jaegertracing#2657

Signed-off-by: Yuri Shkuro <[email protected]>

* add test

Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
yurishkuro authored and bhiravabhatla committed Jan 25, 2021
1 parent 00b7daa commit b980e87
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 81 deletions.
36 changes: 24 additions & 12 deletions cmd/agent/app/reporter/connect_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
package reporter

import (
"expvar"

"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
)

type connectMetrics struct {
Expand All @@ -29,26 +30,37 @@ type connectMetrics struct {

// ConnectMetrics include connectMetrics necessary params if want to modify metrics of connectMetrics, must via ConnectMetrics API
type ConnectMetrics struct {
Logger *zap.Logger // required
MetricsFactory metrics.Factory // required
connectMetrics *connectMetrics
metrics connectMetrics
target *expvar.String
}

// NewConnectMetrics will be initialize ConnectMetrics
func (r *ConnectMetrics) NewConnectMetrics() {
r.connectMetrics = new(connectMetrics)
r.MetricsFactory = r.MetricsFactory.Namespace(metrics.NSOptions{Name: "connection_status"})
metrics.MustInit(r.connectMetrics, r.MetricsFactory, nil)
func NewConnectMetrics(mf metrics.Factory) *ConnectMetrics {
cm := &ConnectMetrics{}
metrics.MustInit(&cm.metrics, mf.Namespace(metrics.NSOptions{Name: "connection_status"}), nil)

if r := expvar.Get("gRPCTarget"); r == nil {
cm.target = expvar.NewString("gRPCTarget")
} else {
cm.target = r.(*expvar.String)
}

return cm
}

// OnConnectionStatusChange used for pass the status parameter when connection is changed
// 0 is disconnected, 1 is connected
// For quick view status via use `sum(jaeger_agent_connection_status_collector_connected{}) by (instance) > bool 0`
func (r *ConnectMetrics) OnConnectionStatusChange(connected bool) {
func (cm *ConnectMetrics) OnConnectionStatusChange(connected bool) {
if connected {
r.connectMetrics.Status.Update(1)
r.connectMetrics.Reconnects.Inc(1)
cm.metrics.Status.Update(1)
cm.metrics.Reconnects.Inc(1)
} else {
r.connectMetrics.Status.Update(0)
cm.metrics.Status.Update(0)
}
}

// RecordTarget writes the current connection target to an expvar field.
func (cm *ConnectMetrics) RecordTarget(target string) {
cm.target.Set(target)
}
73 changes: 26 additions & 47 deletions cmd/agent/app/reporter/connect_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,67 +15,46 @@
package reporter

import (
"expvar"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/uber/jaeger-lib/metrics/metricstest"
)

type connectMetricsTest struct {
mf *metricstest.Factory
}

func testConnectMetrics(fn func(tr *connectMetricsTest, r *ConnectMetrics)) {
testConnectMetricsWithParams(&ConnectMetrics{}, fn)
}

func testConnectMetricsWithParams(cm *ConnectMetrics, fn func(tr *connectMetricsTest, r *ConnectMetrics)) {
func TestConnectMetrics(t *testing.T) {
mf := metricstest.NewFactory(time.Hour)
cm.MetricsFactory = mf
cm.NewConnectMetrics()
cm := NewConnectMetrics(mf)

tr := &connectMetricsTest{
mf: mf,
getGauge := func() map[string]int64 {
_, gauges := mf.Snapshot()
return gauges
}

fn(tr, cm)
}

func testCollectorConnected(r *ConnectMetrics) {
r.OnConnectionStatusChange(true)
}

func testCollectorAborted(r *ConnectMetrics) {
r.OnConnectionStatusChange(false)
}

func TestConnectMetrics(t *testing.T) {

testConnectMetrics(func(tr *connectMetricsTest, r *ConnectMetrics) {
getGauge := func() map[string]int64 {
_, gauges := tr.mf.Snapshot()
return gauges
}
getCount := func() map[string]int64 {
counts, _ := mf.Snapshot()
return counts
}

getCount := func() map[string]int64 {
counts, _ := tr.mf.Snapshot()
return counts
}
// no connection
cm.OnConnectionStatusChange(false)
assert.EqualValues(t, 0, getGauge()["connection_status.collector_connected"])

// testing connect aborted
testCollectorAborted(r)
assert.EqualValues(t, 0, getGauge()["connection_status.collector_connected"])
// first connection
cm.OnConnectionStatusChange(true)
assert.EqualValues(t, 1, getGauge()["connection_status.collector_connected"])
assert.EqualValues(t, 1, getCount()["connection_status.collector_reconnects"])

// testing connect connected
testCollectorConnected(r)
assert.EqualValues(t, 1, getGauge()["connection_status.collector_connected"])
assert.EqualValues(t, 1, getCount()["connection_status.collector_reconnects"])
// reconnect
cm.OnConnectionStatusChange(false)
cm.OnConnectionStatusChange(true)
assert.EqualValues(t, 2, getCount()["connection_status.collector_reconnects"])

// testing reconnect counts
testCollectorAborted(r)
testCollectorConnected(r)
assert.EqualValues(t, 2, getCount()["connection_status.collector_reconnects"])
cm.RecordTarget("collector-host")
assert.Equal(t, `"collector-host"`, expvar.Get("gRPCTarget").String())

})
// since expvars are singletons, the second constructor should grab the same var
cm2 := NewConnectMetrics(mf)
assert.Same(t, cm.target, cm2.target)
}
28 changes: 6 additions & 22 deletions cmd/agent/app/reporter/grpc/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@ package grpc
import (
"context"
"errors"
"expvar"
"fmt"
"strings"

"github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand All @@ -47,9 +46,6 @@ type ConnBuilder struct {
DiscoveryMinPeers int
Notifier discovery.Notifier
Discoverer discovery.Discoverer

// for unit test and provide ConnectMetrics and outside call
ConnectMetrics *reporter.ConnectMetrics
}

// NewConnBuilder creates a new grpc connection builder.
Expand Down Expand Up @@ -104,38 +100,26 @@ func (b *ConnBuilder) CreateConnection(logger *zap.Logger, mFactory metrics.Fact
return nil, err
}

if b.ConnectMetrics == nil {
cm := reporter.ConnectMetrics{
Logger: logger,
MetricsFactory: mFactory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"protocol": "grpc"}}),
}
cm.NewConnectMetrics()
b.ConnectMetrics = &cm
}
connectMetrics := reporter.NewConnectMetrics(
mFactory.Namespace(metrics.NSOptions{Tags: map[string]string{"protocol": "grpc"}}),
)

go func(cc *grpc.ClientConn, cm *reporter.ConnectMetrics) {
logger.Info("Checking connection to collector")
var egt *expvar.String
r := expvar.Get("gRPCTarget")
if r == nil {
egt = expvar.NewString("gRPCTarget")
} else {
egt = r.(*expvar.String)
}

for {
s := cc.GetState()
if s == connectivity.Ready {
cm.OnConnectionStatusChange(true)
egt.Set(cc.Target())
cm.RecordTarget(cc.Target())
} else {
cm.OnConnectionStatusChange(false)
}

logger.Info("Agent collector connection state change", zap.String("dialTarget", dialTarget), zap.Stringer("status", s))
cc.WaitForStateChange(context.Background(), s)
}
}(conn, b.ConnectMetrics)
}(conn, connectMetrics)

return conn, nil
}

0 comments on commit b980e87

Please sign in to comment.