Skip to content

Commit f39fd11

Browse files
captncraigrfratto
andauthored
flow: kubernetes SD (#1941)
* initial k8s_sd * working k8s sd * pull out dedicated discovery helper * clean up discovery logic * apply defaults * move config to common package * fmt * add river tags * embed HTTPClientConfig * use more convert functions * can't count on parent context existing * more river tags * more river tags * add file * fix issue where new exports sometimes didn't propagate during graph re-eval * handle empty proxy string * update comment * fix merge * lint comments * lint issues * remove unused: * code review * comments * use last discoverer pattern * lint Co-authored-by: Robert Fratto <[email protected]>
1 parent 53e8cf3 commit f39fd11

File tree

6 files changed

+457
-136
lines changed

6 files changed

+457
-136
lines changed

component/all/all.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22
package all
33

44
import (
5-
_ "github.com/grafana/agent/component/local/file" // Import local.file
6-
_ "github.com/grafana/agent/component/metrics/mutate" // Import metrics.mutate
7-
_ "github.com/grafana/agent/component/metrics/remotewrite" // Import metrics.remotewrite
8-
_ "github.com/grafana/agent/component/metrics/scrape" // Import metrics.scrape
9-
_ "github.com/grafana/agent/component/remote/s3" // Import s3.file
10-
_ "github.com/grafana/agent/component/targets/mutate" // Import targets.mutate
5+
_ "github.com/grafana/agent/component/discovery/kubernetes" // Import discovery.k8s
6+
_ "github.com/grafana/agent/component/local/file" // Import local.file
7+
_ "github.com/grafana/agent/component/metrics/mutate" // Import metrics.mutate
8+
_ "github.com/grafana/agent/component/metrics/remotewrite" // Import metrics.remotewrite
9+
_ "github.com/grafana/agent/component/metrics/scrape" // Import metrics.scrape
10+
_ "github.com/grafana/agent/component/remote/s3" // Import s3.file
11+
_ "github.com/grafana/agent/component/targets/mutate" // Import targets.mutate
1112
)

component/common/config/types.go

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
// Package config contains types from github.com/prometheus/common/config,
2+
// but modifiys them to be serializable with River.
3+
package config
4+
5+
import (
6+
"fmt"
7+
"net/url"
8+
9+
"github.com/grafana/agent/pkg/flow/rivertypes"
10+
"github.com/prometheus/common/config"
11+
)
12+
13+
// HTTPClientConfig mirrors config.HTTPClientConfig
14+
type HTTPClientConfig struct {
15+
BasicAuth *BasicAuth `river:"basic_auth,block,optional"`
16+
Authorization *Authorization `river:"authorization,block,optional"`
17+
OAuth2 *OAuth2Config `river:"oauth2,block,optional"`
18+
BearerToken rivertypes.Secret `river:"bearer_token,attr,optional"`
19+
BearerTokenFile string `river:"bearer_token_file,attr,optional"`
20+
ProxyURL URL `river:"proxy_url,attr,optional"`
21+
TLSConfig TLSConfig `river:"tls_config,block,optional"`
22+
FollowRedirects bool `river:"follow_redirects,attr,optional"`
23+
EnableHTTP2 bool `river:"enable_http2,attr,optional"`
24+
}
25+
26+
// Convert converts our type to the native prometheus type
27+
func (h *HTTPClientConfig) Convert() *config.HTTPClientConfig {
28+
return &config.HTTPClientConfig{
29+
BasicAuth: h.BasicAuth.Convert(),
30+
Authorization: h.Authorization.Convert(),
31+
OAuth2: h.OAuth2.Convert(),
32+
BearerToken: config.Secret(h.BearerToken),
33+
BearerTokenFile: h.BearerTokenFile,
34+
ProxyURL: h.ProxyURL.Convert(),
35+
TLSConfig: *h.TLSConfig.Convert(),
36+
FollowRedirects: h.FollowRedirects,
37+
EnableHTTP2: h.EnableHTTP2,
38+
}
39+
}
40+
41+
// DefaultHTTPClientConfig for initializing objects
42+
var DefaultHTTPClientConfig = HTTPClientConfig{
43+
FollowRedirects: true,
44+
EnableHTTP2: true,
45+
}
46+
47+
// BasicAuth configures Basic HTTP authentication credentials.
48+
type BasicAuth struct {
49+
Username string `river:"username,attr,optional"`
50+
Password rivertypes.Secret `river:"password,attr,optional"`
51+
PasswordFile string `river:"password_file,attr,optional"`
52+
}
53+
54+
// Convert converts our type to the native prometheus type
55+
func (b *BasicAuth) Convert() *config.BasicAuth {
56+
if b == nil {
57+
return nil
58+
}
59+
return &config.BasicAuth{
60+
Username: b.Username,
61+
Password: config.Secret(b.Password),
62+
PasswordFile: b.PasswordFile,
63+
}
64+
}
65+
66+
// URL mirrors config.URL
67+
type URL struct {
68+
*url.URL
69+
}
70+
71+
// MarshalText implements encoding.TextMarshaler
72+
func (u URL) MarshalText() (text []byte, err error) {
73+
u2 := &config.URL{
74+
URL: u.URL,
75+
}
76+
if u.URL != nil {
77+
return []byte(u2.Redacted()), nil
78+
}
79+
return nil, nil
80+
}
81+
82+
// UnmarshalText implements encoding.TextUnmarshaler
83+
func (u *URL) UnmarshalText(text []byte) error {
84+
s := string(text)
85+
urlp, err := url.Parse(s)
86+
if err != nil {
87+
return err
88+
}
89+
u.URL = urlp
90+
return nil
91+
}
92+
93+
// Convert converts our type to the native prometheus type
94+
func (u URL) Convert() config.URL {
95+
return config.URL{URL: u.URL}
96+
}
97+
98+
// Authorization sets up HTTP authorization credentials.
99+
type Authorization struct {
100+
Type string `river:"type,attr,optional"`
101+
Credentials rivertypes.Secret `river:"credentials,attr,optional"`
102+
CredentialsFile string `river:"credentials_file,attr,optional"`
103+
}
104+
105+
// Convert converts our type to the native prometheus type
106+
func (a *Authorization) Convert() *config.Authorization {
107+
if a == nil {
108+
return nil
109+
}
110+
return &config.Authorization{
111+
Type: a.Type,
112+
Credentials: config.Secret(a.Credentials),
113+
CredentialsFile: a.CredentialsFile,
114+
}
115+
}
116+
117+
// TLSVersion mirrors config.TLSVersion
118+
type TLSVersion uint16
119+
120+
// MarshalText implements encoding.TextMarshaler
121+
func (tv TLSVersion) MarshalText() (text []byte, err error) {
122+
for s, v := range config.TLSVersions {
123+
if config.TLSVersion(tv) == v {
124+
return []byte(s), nil
125+
}
126+
}
127+
return nil, fmt.Errorf("unknown TLS version: %d", tv)
128+
}
129+
130+
// UnmarshalText implements encoding.TextUnmarshaler
131+
func (tv *TLSVersion) UnmarshalText(text []byte) error {
132+
if v, ok := config.TLSVersions[string(text)]; ok {
133+
*tv = TLSVersion(v)
134+
return nil
135+
}
136+
return fmt.Errorf("unknown TLS version: %s", string(text))
137+
}
138+
139+
// TLSConfig sets up options for TLS connections.
140+
type TLSConfig struct {
141+
CAFile string `river:"ca_file,attr,optional"`
142+
CertFile string `river:"cert_file,attr,optional"`
143+
KeyFile string `river:"key_file,attr,optional"`
144+
ServerName string `river:"server_name,attr,optional"`
145+
InsecureSkipVerify bool `river:"insecure_skip_verify,attr,optional"`
146+
MinVersion TLSVersion `river:"min_version,attr,optional"`
147+
}
148+
149+
// Convert converts our type to the native prometheus type
150+
func (t *TLSConfig) Convert() *config.TLSConfig {
151+
if t == nil {
152+
return nil
153+
}
154+
return &config.TLSConfig{
155+
CAFile: t.CAFile,
156+
CertFile: t.CertFile,
157+
KeyFile: t.KeyFile,
158+
ServerName: t.ServerName,
159+
InsecureSkipVerify: t.InsecureSkipVerify,
160+
MinVersion: config.TLSVersion(t.MinVersion),
161+
}
162+
}
163+
164+
// OAuth2Config sets up the OAuth2 client.
165+
type OAuth2Config struct {
166+
ClientID string `river:"client_id,attr,optional"`
167+
ClientSecret rivertypes.Secret `river:"client_secret,attr,optional"`
168+
ClientSecretFile string `river:"client_secret_file,attr,optional"`
169+
Scopes []string `river:"scopes,attr,optional"`
170+
TokenURL string `river:"token_url,attr,optional"`
171+
EndpointParams map[string]string `river:"endpoint_params,attr,optional"`
172+
ProxyURL URL `river:"proxy_url,attr,optional"`
173+
TLSConfig *TLSConfig `river:"tls_config,attr,optional"`
174+
}
175+
176+
// Convert converts our type to the native prometheus type
177+
func (o *OAuth2Config) Convert() *config.OAuth2 {
178+
if o == nil {
179+
return nil
180+
}
181+
return &config.OAuth2{
182+
ClientID: o.ClientID,
183+
ClientSecret: config.Secret(o.ClientSecret),
184+
ClientSecretFile: o.ClientSecretFile,
185+
Scopes: o.Scopes,
186+
TokenURL: o.TokenURL,
187+
EndpointParams: o.EndpointParams,
188+
ProxyURL: o.ProxyURL.Convert(),
189+
TLSConfig: *o.TLSConfig.Convert(),
190+
}
191+
}

component/discovery/discovery.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package discovery
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/grafana/agent/component/metrics/scrape"
8+
"github.com/prometheus/prometheus/discovery"
9+
"github.com/prometheus/prometheus/discovery/targetgroup"
10+
)
11+
12+
// maxUpdateFrequency is the minimum time to wait between updating targets.
13+
// Currently not settable, since prometheus uses a static threshold, but
14+
// we could reconsider later.
15+
const maxUpdateFrequency = 5 * time.Second
16+
17+
// Discoverer is an alias for Prometheus' Discoverer interface, so users of this package don't need
18+
// to import github.com/prometheus/prometheus/discover as well.
19+
type Discoverer discovery.Discoverer
20+
21+
// RunDiscovery is a utility for consuming and forwarding target groups from a discoverer.
22+
// It will handle collating targets (and clearing), as well as time based throttling of updates.
23+
// f should be a function that updates the component's exports, most likely calling `opts.OnStateChange()`.
24+
func RunDiscovery(ctx context.Context, d Discoverer, f func([]scrape.Target)) {
25+
// all targets we have seen so far
26+
cache := map[string]*targetgroup.Group{}
27+
28+
ch := make(chan []*targetgroup.Group)
29+
go d.Run(ctx, ch)
30+
31+
// function to convert and send targets in format scraper expects
32+
send := func() {
33+
allTargets := []scrape.Target{}
34+
for _, group := range cache {
35+
for _, target := range group.Targets {
36+
labels := map[string]string{}
37+
// first add the group labels, and then the
38+
// target labels, so that target labels take precedence.
39+
for k, v := range group.Labels {
40+
labels[string(k)] = string(v)
41+
}
42+
for k, v := range target {
43+
labels[string(k)] = string(v)
44+
}
45+
allTargets = append(allTargets, labels)
46+
}
47+
}
48+
f(allTargets)
49+
}
50+
51+
ticker := time.NewTicker(maxUpdateFrequency)
52+
// true if we have received new targets and need to send.
53+
haveUpdates := false
54+
for {
55+
select {
56+
case <-ticker.C:
57+
if haveUpdates {
58+
send()
59+
haveUpdates = false
60+
}
61+
case <-ctx.Done():
62+
send()
63+
return
64+
case groups := <-ch:
65+
for _, group := range groups {
66+
// Discoverer will send an empty target set to indicate the group (keyed by Source field)
67+
// should be removed
68+
if len(group.Targets) == 0 {
69+
delete(cache, group.Source)
70+
} else {
71+
cache[group.Source] = group
72+
}
73+
}
74+
haveUpdates = true
75+
}
76+
}
77+
}

0 commit comments

Comments
 (0)