Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,5 +205,5 @@ func buildMultiClient(cfg StoreConfig, codec codec.Codec, reg prometheus.Registe
{client: secondary, name: cfg.Multi.Secondary},
}

return NewMultiClient(cfg.Multi, clients, logger), nil
return NewMultiClient(cfg.Multi, clients, logger, reg), nil
}
28 changes: 15 additions & 13 deletions kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func Test_createClient_singleBackend_mustContainRoleAndTypeLabels(t *testing.T)
return
}))

actual := typeToRoleMap(t, reg)
actual := typeToRoleMapHistogramLabels(t, reg)
require.Len(t, actual, 1)
require.Equal(t, "primary", actual["mock"])
}
Expand All @@ -88,7 +88,7 @@ func Test_createClient_multiBackend_mustContainRoleAndTypeLabels(t *testing.T) {
return
}))

actual := typeToRoleMap(t, reg)
actual := typeToRoleMapHistogramLabels(t, reg)
// expected multi-primary, inmemory-primary and mock-secondary
require.Len(t, actual, 3)
require.Equal(t, "primary", actual["multi"])
Expand All @@ -97,24 +97,26 @@ func Test_createClient_multiBackend_mustContainRoleAndTypeLabels(t *testing.T) {

}

func typeToRoleMap(t *testing.T, reg prometheus.Gatherer) map[string]string {
func typeToRoleMapHistogramLabels(t *testing.T, reg prometheus.Gatherer) map[string]string {
mfs, err := reg.Gather()
require.NoError(t, err)
result := map[string]string{}
for _, mf := range mfs {
for _, m := range mf.GetMetric() {
backendType := ""
role := ""
for _, l := range m.GetLabel() {
if l.GetName() == "role" {
role = l.GetValue()
} else if l.GetName() == "type" {
backendType = l.GetValue()
if m.GetHistogram() != nil {
backendType := ""
role := ""
for _, l := range m.GetLabel() {
if l.GetName() == "role" {
role = l.GetValue()
} else if l.GetName() == "type" {
backendType = l.GetValue()
}
}
require.NotEmpty(t, backendType)
require.NotEmpty(t, role)
result[backendType] = role
}
require.NotEmpty(t, backendType)
require.NotEmpty(t, role)
result[backendType] = role
}
}
return result
Expand Down
1 change: 1 addition & 0 deletions kv/consul/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/weaveworks/common/instrument"
)

// Needs a with call
var consulRequestDuration = instrument.NewHistogramCollector(prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "consul_request_duration_seconds",
Help: "Time spent on consul requests.",
Expand Down
8 changes: 6 additions & 2 deletions kv/memberlist/kv_init_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/go-kit/kit/log"
"github.com/hashicorp/memberlist"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"

"github.com/grafana/dskit/services"
Expand All @@ -28,6 +29,7 @@ type KVInitService struct {
cfg *KVConfig
logger log.Logger
dnsProvider DNSProvider
registerer prometheus.Registerer

// init function, to avoid multiple initializations.
init sync.Once
Expand All @@ -38,11 +40,13 @@ type KVInitService struct {
watcher *services.FailureWatcher
}

func NewKVInitService(cfg *KVConfig, logger log.Logger, dnsProvider DNSProvider) *KVInitService {
func NewKVInitService(cfg *KVConfig, logger log.Logger, registerer prometheus.Registerer, dnsProvider DNSProvider) *KVInitService {
kvinit := &KVInitService{

cfg: cfg,
watcher: services.NewFailureWatcher(),
logger: logger,
registerer: registerer,
dnsProvider: dnsProvider,
}
kvinit.Service = services.NewBasicService(nil, kvinit.running, kvinit.stopping).WithName("memberlist KV service")
Expand All @@ -52,7 +56,7 @@ func NewKVInitService(cfg *KVConfig, logger log.Logger, dnsProvider DNSProvider)
// GetMemberlistKV will initialize Memberlist.KV on first call, and add it to service failure watcher.
func (kvs *KVInitService) GetMemberlistKV() (*KV, error) {
kvs.init.Do(func() {
kv := NewKV(*kvs.cfg, kvs.logger, kvs.dnsProvider)
kv := NewKV(*kvs.cfg, kvs.logger, kvs.registerer, kvs.dnsProvider)
kvs.watcher.WatchService(kv)
kvs.err = kv.StartAsync(context.Background())

Expand Down
3 changes: 2 additions & 1 deletion kv/memberlist/kv_init_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/hashicorp/memberlist"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"

"github.com/grafana/dskit/flagext"
Expand Down Expand Up @@ -56,6 +57,6 @@ func TestPage(t *testing.T) {
func TestStop(t *testing.T) {
var cfg KVConfig
flagext.DefaultValues(&cfg)
kvinit := NewKVInitService(&cfg, nil, &dnsProviderMock{})
kvinit := NewKVInitService(&cfg, nil, prometheus.NewRegistry(), &dnsProviderMock{})
require.NoError(t, kvinit.stopping(nil))
}
19 changes: 11 additions & 8 deletions kv/memberlist/memberlist_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,9 @@ func generateRandomSuffix(logger log.Logger) string {
type KV struct {
services.Service

cfg KVConfig
logger log.Logger
cfg KVConfig
logger log.Logger
registerer prometheus.Registerer

// dns discovery provider
provider DNSProvider
Expand Down Expand Up @@ -326,14 +327,16 @@ var (
// gossiping part. Only after service is in Running state, it is really gossiping. Starting the service will also
// trigger connecting to the existing memberlist cluster. If that fails and AbortIfJoinFails is true, error is returned
// and service enters Failed state.
func NewKV(cfg KVConfig, logger log.Logger, dnsProvider DNSProvider) *KV {
func NewKV(cfg KVConfig, logger log.Logger, registerer prometheus.Registerer, dnsProvider DNSProvider) *KV {
cfg.TCPTransport.MetricsRegisterer = cfg.MetricsRegisterer
cfg.TCPTransport.MetricsNamespace = cfg.MetricsNamespace

mlkv := &KV{
cfg: cfg,
logger: logger,
provider: dnsProvider,
cfg: cfg,
logger: logger,
registerer: registerer,
provider: dnsProvider,

store: make(map[string]valueDesc),
codecs: make(map[string]codec.Codec),
watchers: make(map[string][]chan string),
Expand All @@ -342,7 +345,7 @@ func NewKV(cfg KVConfig, logger log.Logger, dnsProvider DNSProvider) *KV {
maxCasRetries: maxCasRetries,
}

mlkv.createAndRegisterMetrics()
mlkv.createAndRegisterMetrics(mlkv.registerer)

for _, c := range cfg.Codecs {
mlkv.codecs[c.CodecID()] = c
Expand All @@ -357,7 +360,7 @@ func defaultMemberlistConfig() *memberlist.Config {
}

func (m *KV) buildMemberlistConfig() (*memberlist.Config, error) {
tr, err := NewTCPTransport(m.cfg.TCPTransport, m.logger)
tr, err := NewTCPTransport(m.cfg.TCPTransport, m.logger, m.registerer)
if err != nil {
return nil, fmt.Errorf("failed to create transport: %v", err)
}
Expand Down
29 changes: 15 additions & 14 deletions kv/memberlist/memberlist_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import (
"math"
"math/rand"
"net"
reflect "reflect"
"reflect"
"sort"
"sync"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -246,7 +247,7 @@ func TestBasicGetAndCas(t *testing.T) {
}
cfg.Codecs = []codec.Codec{c}

mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{})
mkv := NewKV(cfg, log.NewNopLogger(), prometheus.NewRegistry(), &dnsProviderMock{})
require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv))
defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck

Expand Down Expand Up @@ -302,7 +303,7 @@ func withFixtures(t *testing.T, testFN func(t *testing.T, kv *Client)) {
cfg.TCPTransport = TCPTransportConfig{}
cfg.Codecs = []codec.Codec{c}

mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{})
mkv := NewKV(cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry(), &dnsProviderMock{})
require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv))
defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck

Expand Down Expand Up @@ -446,7 +447,7 @@ func TestMultipleCAS(t *testing.T) {
flagext.DefaultValues(&cfg)
cfg.Codecs = []codec.Codec{c}

mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{})
mkv := NewKV(cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry(), &dnsProviderMock{})
mkv.maxCasRetries = 20
require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv))
defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck
Expand Down Expand Up @@ -548,7 +549,7 @@ func TestMultipleClients(t *testing.T) {

cfg.Codecs = []codec.Codec{c}

mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{})
mkv := NewKV(cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry(), &dnsProviderMock{})
require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv))

kv, err := NewClient(mkv, c)
Expand Down Expand Up @@ -701,7 +702,7 @@ func TestJoinMembersWithRetryBackoff(t *testing.T) {
time.Sleep(1 * time.Second)
}

mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}) // Not started yet.
mkv := NewKV(cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry(), &dnsProviderMock{}) // Not started yet.
watcher.WatchService(mkv)

kv, err := NewClient(mkv, c)
Expand Down Expand Up @@ -775,7 +776,7 @@ func TestMemberlistFailsToJoin(t *testing.T) {

cfg.Codecs = []codec.Codec{c}

mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{})
mkv := NewKV(cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry(), &dnsProviderMock{})
require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv))

ctxTimeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand Down Expand Up @@ -946,7 +947,7 @@ func TestMultipleCodecs(t *testing.T) {
distributedCounterCodec{},
}

mkv1 := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{})
mkv1 := NewKV(cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry(), &dnsProviderMock{})
require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv1))
defer services.StopAndAwaitTerminated(context.Background(), mkv1) //nolint:errcheck

Expand Down Expand Up @@ -989,7 +990,7 @@ func TestMultipleCodecs(t *testing.T) {
require.NoError(t, err)

// We will read values from second KV, which will join the first one
mkv2 := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{})
mkv2 := NewKV(cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry(), &dnsProviderMock{})
require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv2))
defer services.StopAndAwaitTerminated(context.Background(), mkv2) //nolint:errcheck

Expand Down Expand Up @@ -1041,11 +1042,11 @@ func TestRejoin(t *testing.T) {
cfg2.JoinMembers = []string{fmt.Sprintf("localhost:%d", ports[0])}
cfg2.RejoinInterval = 1 * time.Second

mkv1 := NewKV(cfg1, log.NewNopLogger(), &dnsProviderMock{})
mkv1 := NewKV(cfg1, log.NewNopLogger(), prometheus.NewPedanticRegistry(), &dnsProviderMock{})
require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv1))
defer services.StopAndAwaitTerminated(context.Background(), mkv1) //nolint:errcheck

mkv2 := NewKV(cfg2, log.NewNopLogger(), &dnsProviderMock{})
mkv2 := NewKV(cfg2, log.NewNopLogger(), prometheus.NewPedanticRegistry(), &dnsProviderMock{})
require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv2))
defer services.StopAndAwaitTerminated(context.Background(), mkv2) //nolint:errcheck

Expand All @@ -1062,7 +1063,7 @@ func TestRejoin(t *testing.T) {
poll(t, 5*time.Second, 1, membersFunc)

// Let's start first KV again. It is not configured to join the cluster, but KV2 is rejoining.
mkv1 = NewKV(cfg1, log.NewNopLogger(), &dnsProviderMock{})
mkv1 = NewKV(cfg1, log.NewNopLogger(), prometheus.NewPedanticRegistry(), &dnsProviderMock{})
require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv1))
defer services.StopAndAwaitTerminated(context.Background(), mkv1) //nolint:errcheck

Expand Down Expand Up @@ -1094,7 +1095,7 @@ func TestNotifyMsgResendsOnlyChanges(t *testing.T) {
cfg.RetransmitMult = 1
cfg.Codecs = append(cfg.Codecs, codec)

kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{})
kv := NewKV(cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry(), &dnsProviderMock{})
require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv))
defer services.StopAndAwaitTerminated(context.Background(), kv) //nolint:errcheck

Expand Down Expand Up @@ -1157,7 +1158,7 @@ func TestSendingOldTombstoneShouldNotForwardMessage(t *testing.T) {
cfg.LeftIngestersTimeout = 5 * time.Minute
cfg.Codecs = append(cfg.Codecs, codec)

kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{})
kv := NewKV(cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry(), &dnsProviderMock{})
require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv))
defer services.StopAndAwaitTerminated(context.Background(), kv) //nolint:errcheck

Expand Down
Loading