Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func createClient(backend string, prefix string, cfg StoreConfig, codec codec.Co

switch backend {
case "consul":
client, err = consul.NewClient(cfg.Consul, codec, logger)
client, err = consul.NewClient(cfg.Consul, codec, logger, reg)

case "etcd":
client, err = etcd.New(cfg.Etcd, codec, logger)
Expand All @@ -138,7 +138,7 @@ func createClient(backend string, prefix string, cfg StoreConfig, codec codec.Co
// If we use the in-memory store, make sure everyone gets the same instance
// within the same process.
inmemoryStoreInit.Do(func() {
inmemoryStore, _ = consul.NewInMemoryClient(codec, logger)
inmemoryStore, _ = consul.NewInMemoryClient(codec, logger, reg)
})
client = inmemoryStore

Expand Down Expand Up @@ -205,5 +205,5 @@ func buildMultiClient(cfg StoreConfig, codec codec.Codec, reg prometheus.Registe
{client: secondary, name: cfg.Multi.Secondary},
}

return NewMultiClient(cfg.Multi, clients, logger), nil
return NewMultiClient(cfg.Multi, clients, logger, reg), nil
}
38 changes: 20 additions & 18 deletions kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ multi:
func Test_createClient_multiBackend_withSingleRing(t *testing.T) {
storeCfg, testCodec := newConfigsForTest()
require.NotPanics(t, func() {
_, err := createClient("multi", "/collector", storeCfg, testCodec, Primary, prometheus.NewRegistry(), testLogger{})
_, err := createClient("multi", "/collector", storeCfg, testCodec, Primary, prometheus.NewPedanticRegistry(), testLogger{})
require.NoError(t, err)
})
}

func Test_createClient_multiBackend_withMultiRing(t *testing.T) {
storeCfg1, testCodec := newConfigsForTest()
storeCfg2 := StoreConfig{}
reg := prometheus.NewRegistry()
reg := prometheus.NewPedanticRegistry()

require.NotPanics(t, func() {
_, err := createClient("multi", "/test", storeCfg1, testCodec, Primary, reg, testLogger{})
Expand All @@ -61,7 +61,7 @@ func Test_createClient_multiBackend_withMultiRing(t *testing.T) {

func Test_createClient_singleBackend_mustContainRoleAndTypeLabels(t *testing.T) {
storeCfg, testCodec := newConfigsForTest()
reg := prometheus.NewRegistry()
reg := prometheus.NewPedanticRegistry()
client, err := createClient("mock", "/test1", storeCfg, testCodec, Primary, reg, testLogger{})
require.NoError(t, err)
require.NoError(t, client.CAS(context.Background(), "/test", func(_ interface{}) (out interface{}, retry bool, err error) {
Expand All @@ -70,7 +70,7 @@ func Test_createClient_singleBackend_mustContainRoleAndTypeLabels(t *testing.T)
return
}))

actual := typeToRoleMap(t, reg)
actual := typeToRoleMapHistogramLabels(t, reg, "kv_request_duration_seconds")
require.Len(t, actual, 1)
require.Equal(t, "primary", actual["mock"])
}
Expand All @@ -79,7 +79,7 @@ func Test_createClient_multiBackend_mustContainRoleAndTypeLabels(t *testing.T) {
storeCfg, testCodec := newConfigsForTest()
storeCfg.Multi.MirrorEnabled = true
storeCfg.Multi.MirrorTimeout = 10 * time.Second
reg := prometheus.NewRegistry()
reg := prometheus.NewPedanticRegistry()
client, err := createClient("multi", "/test1", storeCfg, testCodec, Primary, reg, testLogger{})
require.NoError(t, err)
require.NoError(t, client.CAS(context.Background(), "/test", func(_ interface{}) (out interface{}, retry bool, err error) {
Expand All @@ -88,7 +88,7 @@ func Test_createClient_multiBackend_mustContainRoleAndTypeLabels(t *testing.T) {
return
}))

actual := typeToRoleMap(t, reg)
actual := typeToRoleMapHistogramLabels(t, reg, "kv_request_duration_seconds")
// expected multi-primary, inmemory-primary and mock-secondary
require.Len(t, actual, 3)
require.Equal(t, "primary", actual["multi"])
Expand All @@ -97,24 +97,26 @@ func Test_createClient_multiBackend_mustContainRoleAndTypeLabels(t *testing.T) {

}

func typeToRoleMap(t *testing.T, reg prometheus.Gatherer) map[string]string {
func typeToRoleMapHistogramLabels(t *testing.T, reg prometheus.Gatherer, histogramWithRoleLabels string) map[string]string {
mfs, err := reg.Gather()
require.NoError(t, err)
result := map[string]string{}
for _, mf := range mfs {
for _, m := range mf.GetMetric() {
backendType := ""
role := ""
for _, l := range m.GetLabel() {
if l.GetName() == "role" {
role = l.GetValue()
} else if l.GetName() == "type" {
backendType = l.GetValue()
if mf.GetName() == histogramWithRoleLabels {
for _, m := range mf.GetMetric() {
backendType := ""
role := ""
for _, l := range m.GetLabel() {
if l.GetName() == "role" {
role = l.GetValue()
} else if l.GetName() == "type" {
backendType = l.GetValue()
}
}
require.NotEmpty(t, backendType)
require.NotEmpty(t, role)
result[backendType] = role
}
require.NotEmpty(t, backendType)
require.NotEmpty(t, role)
result[backendType] = role
}
}
return result
Expand Down
25 changes: 15 additions & 10 deletions kv/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/go-kit/kit/log/level"
consul "github.com/hashicorp/consul/api"
"github.com/hashicorp/go-cleanhttp"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/instrument"
"golang.org/x/time/rate"

Expand Down Expand Up @@ -61,9 +62,10 @@ type kv interface {
// Client is a KV.Client for Consul.
type Client struct {
kv
codec codec.Codec
cfg Config
logger log.Logger
codec codec.Codec
cfg Config
logger log.Logger
consulMetrics *consulMetrics
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand All @@ -78,7 +80,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, prefix string) {
}

// NewClient returns a new Client.
func NewClient(cfg Config, codec codec.Codec, logger log.Logger) (*Client, error) {
func NewClient(cfg Config, codec codec.Codec, logger log.Logger, registerer prometheus.Registerer) (*Client, error) {
client, err := consul.NewClient(&consul.Config{
Address: cfg.Host,
Token: cfg.ACLToken,
Expand All @@ -92,11 +94,14 @@ func NewClient(cfg Config, codec codec.Codec, logger log.Logger) (*Client, error
if err != nil {
return nil, err
}
consulMetrics := newConsulMetrics(registerer)

c := &Client{
kv: consulMetrics{client.KV()},
codec: codec,
cfg: cfg,
logger: logger,
kv: consulInstrumentation{client.KV(), consulMetrics},
codec: codec,
cfg: cfg,
logger: logger,
consulMetrics: consulMetrics,
}
return c, nil
}
Expand All @@ -108,7 +113,7 @@ func (c *Client) Put(ctx context.Context, key string, value interface{}) error {
return err
}

return instrument.CollectedRequest(ctx, "Put", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
return instrument.CollectedRequest(ctx, "Put", c.consulMetrics.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
_, err := c.kv.Put(&consul.KVPair{
Key: key,
Value: bytes,
Expand All @@ -120,7 +125,7 @@ func (c *Client) Put(ctx context.Context, key string, value interface{}) error {
// CAS atomically modifies a value in a callback.
// If value doesn't exist you'll get nil as an argument to your callback.
func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error {
return instrument.CollectedRequest(ctx, "CAS loop", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
return instrument.CollectedRequest(ctx, "CAS loop", c.consulMetrics.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
return c.cas(ctx, key, f)
})
}
Expand Down
9 changes: 5 additions & 4 deletions kv/consul/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

consul "github.com/hashicorp/consul/api"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -33,7 +34,7 @@ func TestWatchKeyWithRateLimit(t *testing.T) {
c, closer := NewInMemoryClientWithConfig(codec.String{}, Config{
WatchKeyRateLimit: 5.0,
WatchKeyBurstSize: 1,
}, testLogger{})
}, testLogger{}, prometheus.NewPedanticRegistry())
t.Cleanup(func() {
assert.NoError(t, closer.Close())
})
Expand Down Expand Up @@ -67,7 +68,7 @@ func TestWatchKeyWithRateLimit(t *testing.T) {
func TestWatchKeyNoRateLimit(t *testing.T) {
c, closer := NewInMemoryClientWithConfig(codec.String{}, Config{
WatchKeyRateLimit: 0,
}, testLogger{})
}, testLogger{}, prometheus.NewPedanticRegistry())
t.Cleanup(func() {
assert.NoError(t, closer.Close())
})
Expand All @@ -89,7 +90,7 @@ func TestWatchKeyNoRateLimit(t *testing.T) {
}

func TestReset(t *testing.T) {
c, closer := NewInMemoryClient(codec.String{}, testLogger{})
c, closer := NewInMemoryClient(codec.String{}, testLogger{}, prometheus.NewPedanticRegistry())
t.Cleanup(func() {
assert.NoError(t, closer.Close())
})
Expand Down Expand Up @@ -148,7 +149,7 @@ func observeValueForSomeTime(t *testing.T, client *Client, key string, timeout t
}

func TestWatchKeyWithNoStartValue(t *testing.T) {
c, closer := NewInMemoryClient(codec.String{}, testLogger{})
c, closer := NewInMemoryClient(codec.String{}, testLogger{}, prometheus.NewPedanticRegistry())
t.Cleanup(func() {
assert.NoError(t, closer.Close())
})
Expand Down
44 changes: 25 additions & 19 deletions kv/consul/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,33 @@ import (

consul "github.com/hashicorp/consul/api"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/instrument"
)

var consulRequestDuration = instrument.NewHistogramCollector(prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "consul_request_duration_seconds",
Help: "Time spent on consul requests.",
Buckets: prometheus.DefBuckets,
}, []string{"operation", "status_code"}))

func init() {
consulRequestDuration.Register()
type consulInstrumentation struct {
kv kv
consulMetrics *consulMetrics
}

type consulMetrics struct {
kv
consulRequestDuration *instrument.HistogramCollector
}

func newConsulMetrics(registerer prometheus.Registerer) *consulMetrics {
consulRequestDurationCollector := instrument.NewHistogramCollector(promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
Name: "consul_request_duration_seconds",
Help: "Time spent on consul requests.",
Buckets: prometheus.DefBuckets,
}, []string{"operation", "status_code"}))
consulMetrics := consulMetrics{consulRequestDurationCollector}
return &consulMetrics
}

func (c consulMetrics) CAS(p *consul.KVPair, options *consul.WriteOptions) (bool, *consul.WriteMeta, error) {
func (c consulInstrumentation) CAS(p *consul.KVPair, options *consul.WriteOptions) (bool, *consul.WriteMeta, error) {
var ok bool
var result *consul.WriteMeta
err := instrument.CollectedRequest(options.Context(), "CAS", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
err := instrument.CollectedRequest(options.Context(), "CAS", c.consulMetrics.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
options = options.WithContext(ctx)
var err error
ok, result, err = c.kv.CAS(p, options)
Expand All @@ -34,10 +40,10 @@ func (c consulMetrics) CAS(p *consul.KVPair, options *consul.WriteOptions) (bool
return ok, result, err
}

func (c consulMetrics) Get(key string, options *consul.QueryOptions) (*consul.KVPair, *consul.QueryMeta, error) {
func (c consulInstrumentation) Get(key string, options *consul.QueryOptions) (*consul.KVPair, *consul.QueryMeta, error) {
var kvp *consul.KVPair
var meta *consul.QueryMeta
err := instrument.CollectedRequest(options.Context(), "Get", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
err := instrument.CollectedRequest(options.Context(), "Get", c.consulMetrics.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
options = options.WithContext(ctx)
var err error
kvp, meta, err = c.kv.Get(key, options)
Expand All @@ -46,10 +52,10 @@ func (c consulMetrics) Get(key string, options *consul.QueryOptions) (*consul.KV
return kvp, meta, err
}

func (c consulMetrics) List(path string, options *consul.QueryOptions) (consul.KVPairs, *consul.QueryMeta, error) {
func (c consulInstrumentation) List(path string, options *consul.QueryOptions) (consul.KVPairs, *consul.QueryMeta, error) {
var kvps consul.KVPairs
var meta *consul.QueryMeta
err := instrument.CollectedRequest(options.Context(), "List", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
err := instrument.CollectedRequest(options.Context(), "List", c.consulMetrics.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
options = options.WithContext(ctx)
var err error
kvps, meta, err = c.kv.List(path, options)
Expand All @@ -58,9 +64,9 @@ func (c consulMetrics) List(path string, options *consul.QueryOptions) (consul.K
return kvps, meta, err
}

func (c consulMetrics) Delete(key string, options *consul.WriteOptions) (*consul.WriteMeta, error) {
func (c consulInstrumentation) Delete(key string, options *consul.WriteOptions) (*consul.WriteMeta, error) {
var meta *consul.WriteMeta
err := instrument.CollectedRequest(options.Context(), "Delete", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
err := instrument.CollectedRequest(options.Context(), "Delete", c.consulMetrics.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
options = options.WithContext(ctx)
var err error
meta, err = c.kv.Delete(key, options)
Expand All @@ -69,9 +75,9 @@ func (c consulMetrics) Delete(key string, options *consul.WriteOptions) (*consul
return meta, err
}

func (c consulMetrics) Put(p *consul.KVPair, options *consul.WriteOptions) (*consul.WriteMeta, error) {
func (c consulInstrumentation) Put(p *consul.KVPair, options *consul.WriteOptions) (*consul.WriteMeta, error) {
var result *consul.WriteMeta
err := instrument.CollectedRequest(options.Context(), "Put", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
err := instrument.CollectedRequest(options.Context(), "Put", c.consulMetrics.consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
options = options.WithContext(ctx)
var err error
result, err = c.kv.Put(p, options)
Expand Down
17 changes: 10 additions & 7 deletions kv/consul/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
consul "github.com/hashicorp/consul/api"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/dskit/closer"
"github.com/grafana/dskit/kv/codec"
Expand All @@ -28,12 +29,13 @@ type mockKV struct {
}

// NewInMemoryClient makes a new mock consul client.
func NewInMemoryClient(codec codec.Codec, logger log.Logger) (*Client, io.Closer) {
return NewInMemoryClientWithConfig(codec, Config{}, logger)
func NewInMemoryClient(codec codec.Codec, logger log.Logger, registerer prometheus.Registerer) (*Client, io.Closer) {
return NewInMemoryClientWithConfig(codec, Config{}, logger, registerer)
}

// NewInMemoryClientWithConfig makes a new mock consul client with supplied Config.
func NewInMemoryClientWithConfig(codec codec.Codec, cfg Config, logger log.Logger) (*Client, io.Closer) {
func NewInMemoryClientWithConfig(codec codec.Codec, cfg Config, logger log.Logger, registerer prometheus.Registerer) (*Client, io.Closer) {

m := mockKV{
kvps: map[string]*consul.KVPair{},
// Always start from 1, we NEVER want to report back index 0 in the responses.
Expand All @@ -58,10 +60,11 @@ func NewInMemoryClientWithConfig(codec codec.Codec, cfg Config, logger log.Logge
go m.loop()

return &Client{
kv: &m,
codec: codec,
cfg: cfg,
logger: logger,
kv: &m,
codec: codec,
cfg: cfg,
logger: logger,
consulMetrics: newConsulMetrics(registerer),
}, closer
}

Expand Down
2 changes: 1 addition & 1 deletion kv/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func withFixtures(t *testing.T, f func(*testing.T, Client)) {
factory func() (Client, io.Closer, error)
}{
{"consul", func() (Client, io.Closer, error) {
client, closer := consul.NewInMemoryClient(codec.String{}, testLogger{})
client, closer := consul.NewInMemoryClient(codec.String{}, testLogger{}, nil)
return client, closer, nil
}},
{"etcd", func() (Client, io.Closer, error) {
Expand Down
2 changes: 1 addition & 1 deletion kv/kvtls/test/tls_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func newIntegrationClientServer(
) {
// server registers some metrics to default registry
savedRegistry := prometheus.DefaultRegisterer
prometheus.DefaultRegisterer = prometheus.NewRegistry()
prometheus.DefaultRegisterer = prometheus.NewPedanticRegistry()
defer func() {
prometheus.DefaultRegisterer = savedRegistry
}()
Expand Down
Loading