Token gauge metrics implementation. (#9239)
* Token gauge metrics implementation.
* Enable gauges only when interval is nonzero.
* Added count by TTL
* Handle "in restore mode" error specifically.
* Refactored initialization code for gauge collection processes.
* Fixed for multiple namespaces.
* Ability to disable individual gauges with environment variable.
* changelog++
Mark Gritter authored Jun 23, 2020
1 parent 2b0b33f commit 9f143ca
Showing 8 changed files with 549 additions and 64 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -19,6 +19,7 @@ IMPROVEMENTS:
* auth/kubernetes: Allow disabling `iss` validation [[GH-91](https://github.com/hashicorp/vault-plugin-auth-kubernetes/pull/91)]
* core: Add the Go version used to build a Vault binary to the server message output. [[GH-9078](https://github.com/hashicorp/vault/pull/9078)]
* core: Added Password Policies for user-configurable password generation [[GH-8637](https://github.com/hashicorp/vault/pull/8637)]
* core: New telemetry metrics covering token counts, token creation, KV secret counts, lease creation. [[GH-9239](https://github.com/hashicorp/vault/pull/9239)] [[GH-9250](https://github.com/hashicorp/vault/pull/9250)] [[GH-9244](https://github.com/hashicorp/vault/pull/9244)] [[GH-9052](https://github.com/hashicorp/vault/pull/9052)]
* cli: Support reading TLS parameters from file for the `vault operator raft join` command. [[GH-9060](https://github.com/hashicorp/vault/pull/9060)]
* plugin: Add SDK method, `Sys.ReloadPlugin`, and CLI command, `vault plugin reload`,
for reloading plugins. [[GH-8777](https://github.com/hashicorp/vault/pull/8777)]
4 changes: 2 additions & 2 deletions helper/metricsutil/bucket.go
@@ -20,7 +20,7 @@ var bucketBoundaries = []struct {
{30 * 24 * time.Hour, "30d"},
}

const overflowBucket = "+Inf"
const OverflowBucket = "+Inf"

// TTLBucket computes the label to apply for a token TTL.
func TTLBucket(ttl time.Duration) string {
@@ -31,7 +31,7 @@ func TTLBucket(ttl time.Duration) string {
},
)
if upperBound >= len(bucketBoundaries) {
return overflowBucket
return OverflowBucket
} else {
return bucketBoundaries[upperBound].Label
}
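
The only functional change in bucket.go is exporting the overflow label so other packages can recognize the catch-all bucket. A hedged usage sketch, assuming (per the excerpt above) that 30d is the largest configured boundary:

package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/vault/helper/metricsutil"
)

func main() {
	// TTLBucket maps a token TTL onto a coarse label such as "30d"; any TTL
	// beyond the largest boundary falls into the exported OverflowBucket
	// sentinel ("+Inf").
	label := metricsutil.TTLBucket(45 * 24 * time.Hour)
	if label == metricsutil.OverflowBucket {
		fmt.Println("TTL is longer than every configured bucket boundary")
	} else {
		fmt.Printf("token TTL bucket: %s\n", label)
	}
}
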
55 changes: 0 additions & 55 deletions vault/core.go
@@ -2086,61 +2086,6 @@ func stopReplicationImpl(c *Core) error {
return nil
}

// emitMetrics is used to periodically expose metrics while running
func (c *Core) emitMetrics(stopCh chan struct{}) {
emitTimer := time.Tick(time.Second)
writeTimer := time.Tick(c.counters.syncInterval)
identityCountTimer := time.Tick(time.Minute * 10)

for {
select {
case <-emitTimer:
c.metricsMutex.Lock()
if c.expiration != nil {
c.expiration.emitMetrics()
}
//Refresh the sealed gauge
if c.Sealed() {
c.metricSink.SetGaugeWithLabels([]string{"core", "unsealed"}, 0, nil)
} else {
c.metricSink.SetGaugeWithLabels([]string{"core", "unsealed"}, 1, nil)
}

c.metricsMutex.Unlock()

case <-writeTimer:
if stopped := grabLockOrStop(c.stateLock.RLock, c.stateLock.RUnlock, stopCh); stopped {
// Go through the loop again, this time the stop channel case
// should trigger
continue
}
if c.perfStandby {
syncCounter(c)
} else {
err := c.saveCurrentRequestCounters(context.Background(), time.Now())
if err != nil {
c.logger.Error("writing request counters to barrier", "err", err)
}
}
c.stateLock.RUnlock()
case <-identityCountTimer:
go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
entities, err := c.countActiveEntities(ctx)
if err != nil {
c.logger.Error("error counting identity entities", "err", err)
} else {
metrics.SetGauge([]string{"identity", "num_entities"}, float32(entities.Entities.Total))
}
}()

case <-stopCh:
return
}
}
}

func (c *Core) ReplicationState() consts.ReplicationState {
return consts.ReplicationState(atomic.LoadUint32(c.replicationState))
}
190 changes: 186 additions & 4 deletions vault/core_metrics.go
@@ -2,15 +2,196 @@ package vault

import (
"context"
"errors"
"os"
"strings"
"time"

"github.com/armon/go-metrics"
"github.com/hashicorp/vault/helper/metricsutil"
"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/sdk/logical"
)

// TODO: move emitMetrics into this file.
func (c *Core) metricsLoop(stopCh chan struct{}) {
emitTimer := time.Tick(time.Second)
writeTimer := time.Tick(c.counters.syncInterval)
identityCountTimer := time.Tick(time.Minute * 10)

// This loop covers
// vault.expire.num_leases
// vault.core.unsealed
// vault.identity.num_entities
// and the non-telemetry request counters shown in the UI.
for {
select {
case <-emitTimer:
c.metricsMutex.Lock()
if c.expiration != nil {
c.expiration.emitMetrics()
}
// Refresh the sealed gauge
if c.Sealed() {
c.metricSink.SetGaugeWithLabels([]string{"core", "unsealed"}, 0, nil)
} else {
c.metricSink.SetGaugeWithLabels([]string{"core", "unsealed"}, 1, nil)
}
c.metricsMutex.Unlock()

case <-writeTimer:
if stopped := grabLockOrStop(c.stateLock.RLock, c.stateLock.RUnlock, stopCh); stopped {
// Go through the loop again, this time the stop channel case
// should trigger
continue
}
if c.perfStandby {
syncCounter(c)
} else {
err := c.saveCurrentRequestCounters(context.Background(), time.Now())
if err != nil {
c.logger.Error("writing request counters to barrier", "err", err)
}
}
c.stateLock.RUnlock()
case <-identityCountTimer:
// TODO: this can be replaced by the identity gauge counter; we need to
// sum across all namespaces.
go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
entities, err := c.countActiveEntities(ctx)
if err != nil {
c.logger.Error("error counting identity entities", "err", err)
} else {
metrics.SetGauge([]string{"identity", "num_entities"}, float32(entities.Entities.Total))
}
}()

case <-stopCh:
return
}
}
}

// These wrappers are responsible for redirecting to the current instance of
// TokenStore; there is one per method because an additional level of abstraction
// seems confusing.
func (c *Core) tokenGaugeCollector(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
// stateLock or authLock protects the tokenStore pointer
c.stateLock.RLock()
ts := c.tokenStore
c.stateLock.RUnlock()
if ts == nil {
return []metricsutil.GaugeLabelValues{}, errors.New("nil token store")
}
return ts.gaugeCollector(ctx)
}

func (c *Core) tokenGaugePolicyCollector(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
c.stateLock.RLock()
ts := c.tokenStore
c.stateLock.RUnlock()
if ts == nil {
return []metricsutil.GaugeLabelValues{}, errors.New("nil token store")
}
return ts.gaugeCollectorByPolicy(ctx)
}

func (c *Core) tokenGaugeMethodCollector(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
c.stateLock.RLock()
ts := c.tokenStore
c.stateLock.RUnlock()
if ts == nil {
return []metricsutil.GaugeLabelValues{}, errors.New("nil token store")
}
return ts.gaugeCollectorByMethod(ctx)
}

func (c *Core) tokenGaugeTtlCollector(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
c.stateLock.RLock()
ts := c.tokenStore
c.stateLock.RUnlock()
if ts == nil {
return []metricsutil.GaugeLabelValues{}, errors.New("nil token store")
}
return ts.gaugeCollectorByTtl(ctx)
}
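
For orientation, a minimal illustrative collector with the metricsutil.GaugeCollector shape shared by the four wrappers above; it is not part of this commit, it assumes the same imports as the rest of core_metrics.go, and the GaugeLabelValues field names (Labels, Value) are assumed rather than shown in this diff:

// exampleGaugeCollector is a hypothetical collector: it is called once per
// collection interval and returns one gauge value per label set.
func exampleGaugeCollector(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
	// A real collector would walk tokens, leases, or mounts; this one reports
	// a single constant value labeled with the root namespace.
	return []metricsutil.GaugeLabelValues{
		{
			Labels: []metrics.Label{{Name: "namespace", Value: "root"}},
			Value:  42, // float32 gauge value
		},
	}, nil
}
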

// emitMetrics is used to start all the periodic metrics; all of them should
// be shut down when stopCh is closed.
func (c *Core) emitMetrics(stopCh chan struct{}) {
// The gauge collection processes are started and stopped here
// because there's more than one TokenManager created during startup,
// but we only want one set of gauges.

metricsInit := []struct {
MetricName []string
MetadataLabel []metrics.Label
CollectorFunc metricsutil.GaugeCollector
DisableEnvVar string
}{
{
[]string{"token", "count"},
[]metrics.Label{{"gauge", "token_by_namespace"}},
c.tokenGaugeCollector,
"",
},
{
[]string{"token", "count", "by_policy"},
[]metrics.Label{{"gauge", "token_by_policy"}},
c.tokenGaugePolicyCollector,
"",
},
{
[]string{"token", "count", "by_auth"},
[]metrics.Label{{"gauge", "token_by_auth"}},
c.tokenGaugeMethodCollector,
"",
},
{
[]string{"token", "count", "by_ttl"},
[]metrics.Label{{"gauge", "token_by_ttl"}},
c.tokenGaugeTtlCollector,
"",
},
{
[]string{"secret", "kv", "count"},
[]metrics.Label{{"gauge", "kv_secrets_by_mountpoint"}},
c.kvSecretGaugeCollector,
"VAULT_DISABLE_KV_GAUGE",
},
}

if c.MetricSink().GaugeInterval == time.Duration(0) {
c.logger.Info("usage gauge collection is disabled")
} else {
for _, init := range metricsInit {
if init.DisableEnvVar != "" {
if os.Getenv(init.DisableEnvVar) != "" {
c.logger.Info("usage gauge collection is disabled for",
"metric", init.MetricName)
continue
}
}

proc, err := c.MetricSink().NewGaugeCollectionProcess(
init.MetricName,
init.MetadataLabel,
init.CollectorFunc,
c.logger,
)
if err != nil {
c.logger.Error("failed to start collector", "metric", init.MetricName, "error", err)
} else {
go proc.Run()
defer proc.Stop()
}
}
}

// When this returns, all the defers set up above will fire.
c.metricsLoop(stopCh)
}

type kvMount struct {
Namespace *namespace.Namespace
@@ -53,9 +234,9 @@ func (c *Core) kvCollectionErrorCount() {
func (c *Core) walkKvMountSecrets(ctx context.Context, m *kvMount) {
var subdirectories []string
if m.Version == "1" {
subdirectories = []string{m.MountPoint}
subdirectories = []string{m.Namespace.Path + m.MountPoint}
} else {
subdirectories = []string{m.MountPoint + "metadata/"}
subdirectories = []string{m.Namespace.Path + m.MountPoint + "metadata/"}
}

for len(subdirectories) > 0 {
@@ -115,7 +296,8 @@ func (c *Core) kvSecretGaugeCollector(ctx context.Context) ([]metricsutil.GaugeL
mounts := c.findKvMounts()
results := make([]metricsutil.GaugeLabelValues, len(mounts))

// Context must have root namespace
// Use a root namespace, so include namespace path
// in any queries.
ctx = namespace.RootContext(ctx)

// Route list requests to all the identified mounts.
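
The Namespace.Path changes in walkKvMountSecrets above (together with the updated comment in kvSecretGaugeCollector) are the "Fixed for multiple namespaces" item from the commit message: list requests are issued under a root-namespace context, so a mount living in a child namespace is only reachable when its namespace path is prepended to the mount point. A runnable illustration with made-up names:

package main

import "fmt"

func main() {
	// Hypothetical KV v2 mount "secret/" inside the child namespace
	// "engineering/", listed from the root namespace context.
	nsPath := "engineering/" // m.Namespace.Path
	mountPoint := "secret/"  // m.MountPoint

	// Before the fix the walk listed the mount-relative path, which from the
	// root namespace points at a different (or nonexistent) mount.
	oldPath := mountPoint + "metadata/" // "secret/metadata/"
	// With the fix the namespace path is included.
	newPath := nsPath + mountPoint + "metadata/" // "engineering/secret/metadata/"
	fmt.Println(oldPath, newPath)
}
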
13 changes: 11 additions & 2 deletions vault/expiration.go
@@ -1901,10 +1901,17 @@ func (m *ExpirationManager) emitMetrics() {
// type (though most likely we would only call this from within the "vault" core package.)
type ExpirationWalkFunction = func(leaseID string, auth *logical.Auth, path string) bool

var (
ErrInRestoreMode = errors.New("expiration manager in restore mode")
)

// WalkTokens extracts the Auth structure from leases corresponding to tokens.
// Returning false from the walk function terminates the iteration.
// TODO: signal if reload hasn't finished yet?
func (m *ExpirationManager) WalkTokens(walkFn ExpirationWalkFunction) {
func (m *ExpirationManager) WalkTokens(walkFn ExpirationWalkFunction) error {
if m.inRestoreMode() {
return ErrInRestoreMode
}

callback := func(key, value interface{}) bool {
p := value.(pendingInfo)
if p.cachedLeaseInfo == nil {
@@ -1919,6 +1926,8 @@ func (m *ExpirationManager) WalkTokens(walkFn ExpirationWalkFunction) {

m.pending.Range(callback)
m.nonexpiring.Range(callback)

return nil
}

// leaseEntry is used to structure the values the expiration
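
Since WalkTokens now reports restore mode instead of silently walking a partially restored set, callers are expected to treat ErrInRestoreMode as a retryable condition (the "Handle 'in restore mode' error specifically" item in the commit message). A hedged sketch of such a caller, written as if it lived in the vault package; the function name is hypothetical:

// countTokenLeases is an illustrative caller of WalkTokens. While the
// expiration manager is still restoring leases, WalkTokens returns
// ErrInRestoreMode; propagating it lets a gauge collection cycle be skipped
// and retried on the next interval rather than emitting a partial count.
func countTokenLeases(m *ExpirationManager) (int, error) {
	count := 0
	err := m.WalkTokens(func(leaseID string, auth *logical.Auth, path string) bool {
		count++
		return true // keep iterating over every token lease
	})
	if err != nil {
		// Only ErrInRestoreMode is returned today; surface it unchanged.
		return 0, err
	}
	return count, nil
}
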
19 changes: 19 additions & 0 deletions vault/expiration_test.go
@@ -2092,6 +2092,8 @@ func TestExpiration_WalkTokens(t *testing.T) {
sampleToken(t, exp, "auth/github/login", false, "root"),
}

waitForRestore(t, exp)

for true {
// Count before and after each revocation
t.Logf("Counting %d tokens.", len(tokenEntries))
@@ -2133,6 +2135,22 @@

}

func waitForRestore(t *testing.T, exp *ExpirationManager) {
t.Helper()

timeout := time.After(200 * time.Millisecond)
ticker := time.Tick(5 * time.Millisecond)

for exp.inRestoreMode() {
select {
case <-timeout:
t.Fatalf("Timeout waiting for expiration manager to recover.")
case <-ticker:
continue
}
}
}

func TestExpiration_CachedPolicyIsShared(t *testing.T) {
exp := mockExpiration(t)

@@ -2144,6 +2162,7 @@ func TestExpiration_CachedPolicyIsShared(t *testing.T) {

var policies [][]string

waitForRestore(t, exp)
exp.WalkTokens(func(leaseId string, auth *logical.Auth, path string) bool {
policies = append(policies, auth.Policies)
return true
(Diffs for the two remaining changed files are not shown here.)
