Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Token gauge metrics implementation. #9239

Merged
merged 13 commits into from
Jun 23, 2020
44 changes: 44 additions & 0 deletions vault/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -2093,6 +2093,49 @@ func (c *Core) emitMetrics(stopCh chan struct{}) {
writeTimer := time.Tick(c.counters.syncInterval)
identityCountTimer := time.Tick(time.Minute * 10)

// The gauge collection processes are started and stopped here
// because there's more than one TokenManager created during startup,
// but we only want one set of gauges.
if c.MetricSink().GaugeInterval != time.Duration(0) {
tokenCount1, err := c.MetricSink().NewGaugeCollectionProcess(
[]string{"token", "count"},
[]metrics.Label{{"gauge", "token_by_namespace"}},
func(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
if c.tokenStore == nil {
return []metricsutil.GaugeLabelValues{}, errors.New("nil token store")
} else {
return c.tokenStore.gaugeCollector(ctx)
}
},
c.logger,
)
if err != nil {
c.logger.Error("failed to start token count", "error", err)
} else {
go tokenCount1.Run()
defer tokenCount1.Stop()
}

tokenCount2, err := c.MetricSink().NewGaugeCollectionProcess(
[]string{"token", "count", "by_policy"},
[]metrics.Label{{"gauge", "token_by_policy"}},
func(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
if c.tokenStore == nil {
return []metricsutil.GaugeLabelValues{}, errors.New("nil token store")
} else {
return c.tokenStore.gaugeCollectorByPolicy(ctx)
}
},
c.logger,
)
if err != nil {
c.logger.Error("failed to start token count", "error", err)
} else {
go tokenCount2.Run()
defer tokenCount2.Stop()
}
}

for {
select {
case <-emitTimer:
Expand Down Expand Up @@ -2140,6 +2183,7 @@ func (c *Core) emitMetrics(stopCh chan struct{}) {
return
}
}

}

func (c *Core) ReplicationState() consts.ReplicationState {
Expand Down
9 changes: 7 additions & 2 deletions vault/expiration.go
Original file line number Diff line number Diff line change
Expand Up @@ -1903,8 +1903,11 @@ type ExpirationWalkFunction = func(leaseID string, auth *logical.Auth, path stri

// WalkTokens extracts the Auth structure from leases corresponding to tokens.
// Returning false from the walk function terminates the iteration.
// TODO: signal if reload hasn't finished yet?
func (m *ExpirationManager) WalkTokens(walkFn ExpirationWalkFunction) {
func (m *ExpirationManager) WalkTokens(walkFn ExpirationWalkFunction) error {
if m.inRestoreMode() {
return errors.New("expiration manager in restore mode")
}

callback := func(key, value interface{}) bool {
p := value.(pendingInfo)
if p.cachedLeaseInfo == nil {
Expand All @@ -1919,6 +1922,8 @@ func (m *ExpirationManager) WalkTokens(walkFn ExpirationWalkFunction) {

m.pending.Range(callback)
m.nonexpiring.Range(callback)

return nil
}

// leaseEntry is used to structure the values the expiration
Expand Down
19 changes: 19 additions & 0 deletions vault/expiration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2092,6 +2092,8 @@ func TestExpiration_WalkTokens(t *testing.T) {
sampleToken(t, exp, "auth/github/login", false, "root"),
}

waitForRestore(t, exp)

for true {
// Count before and after each revocation
t.Logf("Counting %d tokens.", len(tokenEntries))
Expand Down Expand Up @@ -2133,6 +2135,22 @@ func TestExpiration_WalkTokens(t *testing.T) {

}

// waitForRestore blocks until exp reports that it is no longer in restore
// mode, re-checking every 5ms. If the manager is still restoring after 200ms
// the calling test is failed immediately.
func waitForRestore(t *testing.T, exp *ExpirationManager) {
	t.Helper()

	// Use an explicit timer and ticker instead of time.After/time.Tick so that
	// both are released as soon as the helper returns; a ticker created by
	// time.Tick can never be stopped.
	timeout := time.NewTimer(200 * time.Millisecond)
	defer timeout.Stop()
	ticker := time.NewTicker(5 * time.Millisecond)
	defer ticker.Stop()

	for exp.inRestoreMode() {
		select {
		case <-timeout.C:
			t.Fatalf("Timeout waiting for expiration manager to recover.")
		case <-ticker.C:
			// Fall through to the loop condition and poll again.
		}
	}
}

func TestExpiration_CachedPolicyIsShared(t *testing.T) {
exp := mockExpiration(t)

Expand All @@ -2144,6 +2162,7 @@ func TestExpiration_CachedPolicyIsShared(t *testing.T) {

var policies [][]string

waitForRestore(t, exp)
exp.WalkTokens(func(leaseId string, auth *logical.Auth, path string) bool {
policies = append(policies, auth.Policies)
return true
Expand Down
155 changes: 154 additions & 1 deletion vault/token_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/armon/go-metrics"
"github.com/armon/go-radix"
"github.com/golang/protobuf/proto"
"github.com/hashicorp/errwrap"
log "github.com/hashicorp/go-hclog"
Expand Down Expand Up @@ -3419,7 +3420,7 @@ func (ts *TokenStore) gaugeCollector(ctx context.Context) ([]metricsutil.GaugeLa
namespacePosition[ns.ID] = i
}

ts.expiration.WalkTokens(func(leaseID string, auth *logical.Auth, path string) bool {
err := ts.expiration.WalkTokens(func(leaseID string, auth *logical.Auth, path string) bool {
select {
// Abort and return empty collection if it's taking too much time, nonblocking check.
case <-ctx.Done():
Expand All @@ -3439,10 +3440,16 @@ func (ts *TokenStore) gaugeCollector(ctx context.Context) ([]metricsutil.GaugeLa
}
})

if err != nil {
return []metricsutil.GaugeLabelValues{}, nil
}

// If collection was cancelled, return an empty array.
select {
case <-ctx.Done():
return []metricsutil.GaugeLabelValues{}, nil
default:
break
}

for i := range values {
Expand All @@ -3452,6 +3459,152 @@ func (ts *TokenStore) gaugeCollector(ctx context.Context) ([]metricsutil.GaugeLa

}

// gaugeCollectorByPolicy counts the tokens reported by the expiration manager,
// grouped first by namespace and then by policy name, and returns one gauge
// value per (namespace, policy) pair.
//
// A token contributes one count to every policy listed in its Auth, so a token
// with several policies is counted once under each of them. Only namespaces
// returned by collectNamespaces appear in the result.
//
// An error is returned only when the token store has no expiration manager.
// If the walk itself is rejected (WalkTokens returns an error) or ctx is done
// before collection finishes, an empty slice and a nil error are returned so
// that no partial counts are reported.
func (ts *TokenStore) gaugeCollectorByPolicy(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
	if ts.expiration == nil {
		return []metricsutil.GaugeLabelValues{}, errors.New("expiration manager is nil")
	}

	allNamespaces := ts.core.collectNamespaces()
	// Namespace ID -> policy name -> number of tokens carrying that policy.
	byNsAndPolicy := make(map[string]map[string]int)

	err := ts.expiration.WalkTokens(func(leaseID string, auth *logical.Auth, path string) bool {
		// Abort the walk if it's taking too much time, nonblocking check.
		select {
		case <-ctx.Done():
			return false
		default:
		}

		_, nsID := namespace.SplitIDFromString(leaseID)
		if nsID == "" {
			nsID = namespace.RootNamespaceID
		}
		policyMap, ok := byNsAndPolicy[nsID]
		if !ok {
			policyMap = make(map[string]int)
			byNsAndPolicy[nsID] = policyMap
		}
		for _, policy := range auth.Policies {
			policyMap[policy]++
		}
		return true
	})
	if err != nil {
		// Deliberately best-effort: a walk that could not be performed is
		// reported as "no data" rather than as a collection failure.
		return []metricsutil.GaugeLabelValues{}, nil
	}

	// If collection was cancelled, return an empty array.
	select {
	case <-ctx.Done():
		return []metricsutil.GaugeLabelValues{}, nil
	default:
	}

	// Size the result up front: one entry per (namespace, policy) pair.
	total := 0
	for _, ns := range allNamespaces {
		total += len(byNsAndPolicy[ns.ID])
	}

	flattenedResults := make([]metricsutil.GaugeLabelValues, 0, total)
	for _, ns := range allNamespaces {
		for policy, count := range byNsAndPolicy[ns.ID] {
			flattenedResults = append(flattenedResults,
				metricsutil.GaugeLabelValues{
					Labels: []metrics.Label{
						metricsutil.NamespaceLabel(ns),
						{Name: "policy", Value: policy},
					},
					Value: float32(count),
				})
		}
	}
	return flattenedResults, nil
}

func (ts *TokenStore) gaugeCollectorByMethod(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
if ts.expiration == nil {
return []metricsutil.GaugeLabelValues{}, errors.New("expiration manager is nil")
}

rootContext := namespace.RootContext(ctx)
allNamespaces := ts.core.collectNamespaces()
byNsAndMethod := make(map[string]map[string]int)

// Cache the prefixes that we find locally rather than
// hitting the shared mount table every time
prefixTree := radix.New()

pathToPrefix := func(path string) string {
_, method, ok := prefixTree.LongestPrefix(path)
if ok {
return method.(string)
}

// This method needs a context that has a namespace
// FIXME: does it need to match the namespace in the lease?
mountEntry := ts.core.router.MatchingMountEntry(rootContext, path)
if mountEntry == nil {
return "unknown"
}

// FIXME: is there a better way to do this, rather than accessing
// the router twice? mountEntry.Path is incomplete.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may need to join the ns path with the mount path. Is that what you meant by incomplete?

Copy link
Contributor Author

@mgritter mgritter Jun 19, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The specific issue here was that mountEntry.Path doesn't include auth/, but the path stored in the lease does.

I run into the same problem in various places; it makes a certain amount of sense because the auth mount table is separate and the mounts can only be under auth/. I should probably make a function that converts the mountEntry back to its user-visible version, but I'm not sure if there is some existing functionality that does the job.

I was a little wary of just prepending auth/ to the name, because of namespaces, but I do need to test that this works correctly in that case; maybe it does not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Verified I don't need to join the namespace (see example output in a comment.)

matchingMount := ts.core.router.MatchingMount(rootContext, path)
if matchingMount == "" {
return "unknown"
}

prefixTree.Insert(matchingMount, mountEntry.Type)
return mountEntry.Type
}

err := ts.expiration.WalkTokens(func(leaseID string, auth *logical.Auth, path string) bool {
select {
// Abort and return empty collection if it's taking too much time, nonblocking check.
case <-ctx.Done():
return false
default:
_, nsID := namespace.SplitIDFromString(leaseID)
if nsID == "" {
nsID = namespace.RootNamespaceID
}
methodMap, ok := byNsAndMethod[nsID]
if !ok {
methodMap = make(map[string]int)
byNsAndMethod[nsID] = methodMap
}
method := pathToPrefix(path)
methodMap[method] = methodMap[method] + 1
return true
}
})

if err != nil {
return []metricsutil.GaugeLabelValues{}, nil
}

// If collection was cancelled, return an empty array.
select {
case <-ctx.Done():
return []metricsutil.GaugeLabelValues{}, nil
default:
break
}

// TODO: can we estimate the needed size?
flattenedResults := make([]metricsutil.GaugeLabelValues, 0)
for _, ns := range allNamespaces {
for method, count := range byNsAndMethod[ns.ID] {
flattenedResults = append(flattenedResults,
metricsutil.GaugeLabelValues{
Labels: []metrics.Label{
metricsutil.NamespaceLabel(ns),
{"auth_method", method},
},
Value: float32(count),
})
}
}
return flattenedResults, nil
}

const (
tokenTidyHelp = `
This endpoint performs cleanup tasks that can be run if certain error
Expand Down