Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Token gauge metrics implementation. #9239

Merged
merged 13 commits into from
Jun 23, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ IMPROVEMENTS:
* auth/aws: Add support for Web Identity credentials [[GH-7738](https://github.com/hashicorp/vault/pull/7738)]
* core: Add the Go version used to build a Vault binary to the server message output. [[GH-9078](https://github.com/hashicorp/vault/pull/9078)]
* core: Added Password Policies for user-configurable password generation [[GH-8637](https://github.com/hashicorp/vault/pull/8637)]
* core: New telemetry metrics covering token counts, token creation, KV secret counts, lease creation. [[GH-9239](https://github.com/hashicorp/vault/pull/9239)] [[GH-9250](https://github.com/hashicorp/vault/pull/9250)] [[GH-9244](https://github.com/hashicorp/vault/pull/9244)] [[GH-9052](https://github.com/hashicorp/vault/pull/9052)]
* cli: Support reading TLS parameters from file for the `vault operator raft join` command. [[GH-9060](https://github.com/hashicorp/vault/pull/9060)]
* plugin: Add SDK method, `Sys.ReloadPlugin`, and CLI command, `vault plugin reload`,
for reloading plugins. [[GH-8777](https://github.com/hashicorp/vault/pull/8777)]
Expand Down
4 changes: 2 additions & 2 deletions helper/metricsutil/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var bucketBoundaries = []struct {
{30 * 24 * time.Hour, "30d"},
}

const overflowBucket = "+Inf"
const OverflowBucket = "+Inf"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of +Inf, do we want to do +30d?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While writing the RFC, I thought about ">30d" or "30d+". I think the main problem I have is that they are visually very similar to the bare "30d", so in a visualization it would be unclear which bucket was which.


// TTLBucket computes the label to apply for a token TTL.
func TTLBucket(ttl time.Duration) string {
Expand All @@ -31,7 +31,7 @@ func TTLBucket(ttl time.Duration) string {
},
)
if upperBound >= len(bucketBoundaries) {
return overflowBucket
return OverflowBucket
} else {
return bucketBoundaries[upperBound].Label
}
Expand Down
55 changes: 0 additions & 55 deletions vault/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -2087,61 +2087,6 @@ func stopReplicationImpl(c *Core) error {
return nil
}

// emitMetrics is used to periodically expose metrics while running.
// It loops until stopCh fires, servicing three periodic jobs:
//   - every second: expiration manager metrics and the core "unsealed" gauge
//   - every c.counters.syncInterval: sync or persist the request counters
//   - every ten minutes: the identity entity count gauge
func (c *Core) emitMetrics(stopCh chan struct{}) {
	// Explicit tickers are used instead of time.Tick so that they can be
	// stopped when this function returns; time.Tick provides no way to
	// release the ticker it creates.
	emitTicker := time.NewTicker(time.Second)
	defer emitTicker.Stop()

	identityCountTicker := time.NewTicker(time.Minute * 10)
	defer identityCountTicker.Stop()

	// time.Tick returns a nil channel for a non-positive interval, while
	// time.NewTicker panics. Preserve the nil-channel behavior: the select
	// case below then simply never fires.
	var writeTimer <-chan time.Time
	if interval := c.counters.syncInterval; interval > 0 {
		writeTicker := time.NewTicker(interval)
		defer writeTicker.Stop()
		writeTimer = writeTicker.C
	}

	for {
		select {
		case <-emitTicker.C:
			c.metricsMutex.Lock()
			if c.expiration != nil {
				c.expiration.emitMetrics()
			}
			// Refresh the sealed gauge
			if c.Sealed() {
				c.metricSink.SetGaugeWithLabels([]string{"core", "unsealed"}, 0, nil)
			} else {
				c.metricSink.SetGaugeWithLabels([]string{"core", "unsealed"}, 1, nil)
			}
			c.metricsMutex.Unlock()

		case <-writeTimer:
			if stopped := grabLockOrStop(c.stateLock.RLock, c.stateLock.RUnlock, stopCh); stopped {
				// Go through the loop again, this time the stop channel case
				// should trigger
				continue
			}
			if c.perfStandby {
				syncCounter(c)
			} else {
				err := c.saveCurrentRequestCounters(context.Background(), time.Now())
				if err != nil {
					c.logger.Error("writing request counters to barrier", "err", err)
				}
			}
			c.stateLock.RUnlock()

		case <-identityCountTicker.C:
			// Count in a separate goroutine, bounded by a five second
			// timeout, so the loop keeps servicing the other timers.
			go func() {
				ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
				defer cancel()
				entities, err := c.countActiveEntities(ctx)
				if err != nil {
					c.logger.Error("error counting identity entities", "err", err)
				} else {
					metrics.SetGauge([]string{"identity", "num_entities"}, float32(entities.Entities.Total))
				}
			}()

		case <-stopCh:
			return
		}
	}
}

func (c *Core) ReplicationState() consts.ReplicationState {
return consts.ReplicationState(atomic.LoadUint32(c.replicationState))
}
Expand Down
190 changes: 186 additions & 4 deletions vault/core_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,196 @@ package vault

import (
"context"
"errors"
"os"
"strings"
"time"

"github.com/armon/go-metrics"
"github.com/hashicorp/vault/helper/metricsutil"
"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/sdk/logical"
)

// TODO: move emitMetrics into this file.
// metricsLoop runs the periodic core metrics jobs until stopCh fires.
//
// This loop covers
//   vault.expire.num_leases
//   vault.core.unsealed
//   vault.identity.num_entities
// and the non-telemetry request counters shown in the UI.
func (c *Core) metricsLoop(stopCh chan struct{}) {
	// Explicit tickers are used instead of time.Tick so that they can be
	// stopped when this function returns; time.Tick provides no way to
	// release the ticker it creates.
	emitTicker := time.NewTicker(time.Second)
	defer emitTicker.Stop()

	identityCountTicker := time.NewTicker(time.Minute * 10)
	defer identityCountTicker.Stop()

	// time.Tick returns a nil channel for a non-positive interval, while
	// time.NewTicker panics. Preserve the nil-channel behavior: the select
	// case below then simply never fires.
	var writeTimer <-chan time.Time
	if interval := c.counters.syncInterval; interval > 0 {
		writeTicker := time.NewTicker(interval)
		defer writeTicker.Stop()
		writeTimer = writeTicker.C
	}

	for {
		select {
		case <-emitTicker.C:
			c.metricsMutex.Lock()
			if c.expiration != nil {
				c.expiration.emitMetrics()
			}
			// Refresh the sealed gauge
			if c.Sealed() {
				c.metricSink.SetGaugeWithLabels([]string{"core", "unsealed"}, 0, nil)
			} else {
				c.metricSink.SetGaugeWithLabels([]string{"core", "unsealed"}, 1, nil)
			}
			c.metricsMutex.Unlock()

		case <-writeTimer:
			if stopped := grabLockOrStop(c.stateLock.RLock, c.stateLock.RUnlock, stopCh); stopped {
				// Go through the loop again, this time the stop channel case
				// should trigger
				continue
			}
			if c.perfStandby {
				syncCounter(c)
			} else {
				err := c.saveCurrentRequestCounters(context.Background(), time.Now())
				if err != nil {
					c.logger.Error("writing request counters to barrier", "err", err)
				}
			}
			c.stateLock.RUnlock()

		case <-identityCountTicker.C:
			// TODO: this can be replaced by the identity gauge counter; we need to
			// sum across all namespaces.
			//
			// Count in a separate goroutine, bounded by a five second
			// timeout, so the loop keeps servicing the other timers.
			go func() {
				ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
				defer cancel()
				entities, err := c.countActiveEntities(ctx)
				if err != nil {
					c.logger.Error("error counting identity entities", "err", err)
				} else {
					metrics.SetGauge([]string{"identity", "num_entities"}, float32(entities.Entities.Total))
				}
			}()

		case <-stopCh:
			return
		}
	}
}

// These wrappers are responsible for redirecting to the current instance of
// TokenStore; there is one per method because an additional level of abstraction
// seems confusing.

// tokenGaugeCollector looks up the current token store and delegates to its
// gaugeCollector. It returns an empty slice and an error when no token store
// is set.
func (c *Core) tokenGaugeCollector(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
	// stateLock or authLock protects the tokenStore pointer; only the pointer
	// read happens under the lock, the collection itself runs after release.
	c.stateLock.RLock()
	store := c.tokenStore
	c.stateLock.RUnlock()

	if store == nil {
		return []metricsutil.GaugeLabelValues{}, errors.New("nil token store")
	}
	return store.gaugeCollector(ctx)
}

// tokenGaugePolicyCollector looks up the current token store and delegates to
// its gaugeCollectorByPolicy. It returns an empty slice and an error when no
// token store is set.
func (c *Core) tokenGaugePolicyCollector(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
	// Only the pointer read happens under the state lock; the collection
	// itself runs after the lock is released.
	c.stateLock.RLock()
	store := c.tokenStore
	c.stateLock.RUnlock()

	if store == nil {
		return []metricsutil.GaugeLabelValues{}, errors.New("nil token store")
	}
	return store.gaugeCollectorByPolicy(ctx)
}

// tokenGaugeMethodCollector looks up the current token store and delegates to
// its gaugeCollectorByMethod. It returns an empty slice and an error when no
// token store is set.
func (c *Core) tokenGaugeMethodCollector(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
	// Only the pointer read happens under the state lock; the collection
	// itself runs after the lock is released.
	c.stateLock.RLock()
	store := c.tokenStore
	c.stateLock.RUnlock()

	if store == nil {
		return []metricsutil.GaugeLabelValues{}, errors.New("nil token store")
	}
	return store.gaugeCollectorByMethod(ctx)
}

// tokenGaugeTtlCollector looks up the current token store and delegates to
// its gaugeCollectorByTtl. It returns an empty slice and an error when no
// token store is set.
func (c *Core) tokenGaugeTtlCollector(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
	// Only the pointer read happens under the state lock; the collection
	// itself runs after the lock is released.
	c.stateLock.RLock()
	store := c.tokenStore
	c.stateLock.RUnlock()

	if store == nil {
		return []metricsutil.GaugeLabelValues{}, errors.New("nil token store")
	}
	return store.gaugeCollectorByTtl(ctx)
}

// emitMetrics starts all of the periodic metrics jobs and then blocks in
// metricsLoop; every gauge collection process started here is stopped when
// metricsLoop returns.
func (c *Core) emitMetrics(stopCh chan struct{}) {
	// The gauge collection processes are started and stopped here
	// because there's more than one TokenManager created during startup,
	// but we only want one set of gauges.
	gauges := []struct {
		MetricName    []string
		MetadataLabel []metrics.Label
		CollectorFunc metricsutil.GaugeCollector
		DisableEnvVar string
	}{
		{
			MetricName:    []string{"token", "count"},
			MetadataLabel: []metrics.Label{{Name: "gauge", Value: "token_by_namespace"}},
			CollectorFunc: c.tokenGaugeCollector,
			DisableEnvVar: "",
		},
		{
			MetricName:    []string{"token", "count", "by_policy"},
			MetadataLabel: []metrics.Label{{Name: "gauge", Value: "token_by_policy"}},
			CollectorFunc: c.tokenGaugePolicyCollector,
			DisableEnvVar: "",
		},
		{
			MetricName:    []string{"token", "count", "by_auth"},
			MetadataLabel: []metrics.Label{{Name: "gauge", Value: "token_by_auth"}},
			CollectorFunc: c.tokenGaugeMethodCollector,
			DisableEnvVar: "",
		},
		{
			MetricName:    []string{"token", "count", "by_ttl"},
			MetadataLabel: []metrics.Label{{Name: "gauge", Value: "token_by_ttl"}},
			CollectorFunc: c.tokenGaugeTtlCollector,
			DisableEnvVar: "",
		},
		{
			MetricName:    []string{"secret", "kv", "count"},
			MetadataLabel: []metrics.Label{{Name: "gauge", Value: "kv_secrets_by_mountpoint"}},
			CollectorFunc: c.kvSecretGaugeCollector,
			DisableEnvVar: "VAULT_DISABLE_KV_GAUGE",
		},
	}

	if c.MetricSink().GaugeInterval == 0 {
		c.logger.Info("usage gauge collection is disabled")
	} else {
		for _, gauge := range gauges {
			// A gauge that names an environment variable is skipped when
			// that variable is set to any non-empty value.
			if gauge.DisableEnvVar != "" && os.Getenv(gauge.DisableEnvVar) != "" {
				c.logger.Info("usage gauge collection is disabled for",
					"metric", gauge.MetricName)
				continue
			}

			proc, err := c.MetricSink().NewGaugeCollectionProcess(
				gauge.MetricName,
				gauge.MetadataLabel,
				gauge.CollectorFunc,
				c.logger,
			)
			if err != nil {
				c.logger.Error("failed to start collector", "metric", gauge.MetricName, "error", err)
				continue
			}

			go proc.Run()
			// Deliberately deferred inside the loop: every collector must
			// keep running until this function returns.
			defer proc.Stop()
		}
	}

	// When this returns, all the defers set up above will fire.
	c.metricsLoop(stopCh)
}

type kvMount struct {
Namespace *namespace.Namespace
Expand Down Expand Up @@ -53,9 +234,9 @@ func (c *Core) kvCollectionErrorCount() {
func (c *Core) walkKvMountSecrets(ctx context.Context, m *kvMount) {
var subdirectories []string
if m.Version == "1" {
subdirectories = []string{m.MountPoint}
subdirectories = []string{m.Namespace.Path + m.MountPoint}
} else {
subdirectories = []string{m.MountPoint + "metadata/"}
subdirectories = []string{m.Namespace.Path + m.MountPoint + "metadata/"}
}

for len(subdirectories) > 0 {
Expand Down Expand Up @@ -115,7 +296,8 @@ func (c *Core) kvSecretGaugeCollector(ctx context.Context) ([]metricsutil.GaugeL
mounts := c.findKvMounts()
results := make([]metricsutil.GaugeLabelValues, len(mounts))

// Context must have root namespace
// Use a root namespace, so include namespace path
// in any queries.
ctx = namespace.RootContext(ctx)

// Route list requests to all the identified mounts.
Expand Down
13 changes: 11 additions & 2 deletions vault/expiration.go
Original file line number Diff line number Diff line change
Expand Up @@ -1901,10 +1901,17 @@ func (m *ExpirationManager) emitMetrics() {
// type (though most likely we would only call this from within the "vault" core package.)
type ExpirationWalkFunction = func(leaseID string, auth *logical.Auth, path string) bool

// ErrInRestoreMode is the error WalkTokens returns when it is called while
// the expiration manager is in restore mode.
var ErrInRestoreMode = errors.New("expiration manager in restore mode")

// WalkTokens extracts the Auth structure from leases corresponding to tokens.
// Returning false from the walk function terminates the iteration.
// TODO: signal if reload hasn't finished yet?
func (m *ExpirationManager) WalkTokens(walkFn ExpirationWalkFunction) {
func (m *ExpirationManager) WalkTokens(walkFn ExpirationWalkFunction) error {
if m.inRestoreMode() {
return ErrInRestoreMode
}

callback := func(key, value interface{}) bool {
p := value.(pendingInfo)
if p.cachedLeaseInfo == nil {
Expand All @@ -1919,6 +1926,8 @@ func (m *ExpirationManager) WalkTokens(walkFn ExpirationWalkFunction) {

m.pending.Range(callback)
m.nonexpiring.Range(callback)

return nil
}

// leaseEntry is used to structure the values the expiration
Expand Down
19 changes: 19 additions & 0 deletions vault/expiration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2092,6 +2092,8 @@ func TestExpiration_WalkTokens(t *testing.T) {
sampleToken(t, exp, "auth/github/login", false, "root"),
}

waitForRestore(t, exp)

for true {
// Count before and after each revocation
t.Logf("Counting %d tokens.", len(tokenEntries))
Expand Down Expand Up @@ -2133,6 +2135,22 @@ func TestExpiration_WalkTokens(t *testing.T) {

}

// waitForRestore blocks until exp reports that it is no longer in restore
// mode, re-checking on a 5ms tick. It fails the test if the manager is still
// in restore mode when the 200ms timeout fires.
func waitForRestore(t *testing.T, exp *ExpirationManager) {
	t.Helper()

	// Use a stoppable timer and ticker rather than time.After / time.Tick so
	// both are released as soon as the helper returns.
	timeout := time.NewTimer(200 * time.Millisecond)
	defer timeout.Stop()

	ticker := time.NewTicker(5 * time.Millisecond)
	defer ticker.Stop()

	for exp.inRestoreMode() {
		select {
		case <-timeout.C:
			t.Fatal("Timeout waiting for expiration manager to recover.")
		case <-ticker.C:
			// Fall through to re-evaluate the loop condition.
		}
	}
}

func TestExpiration_CachedPolicyIsShared(t *testing.T) {
exp := mockExpiration(t)

Expand All @@ -2144,6 +2162,7 @@ func TestExpiration_CachedPolicyIsShared(t *testing.T) {

var policies [][]string

waitForRestore(t, exp)
exp.WalkTokens(func(leaseId string, auth *logical.Auth, path string) bool {
policies = append(policies, auth.Policies)
return true
Expand Down
Loading