Skip to content

Commit ded52ff

Browse files
Merge pull request #52302 from smarterclayton/simplify_metrics_registration
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.. Collapse all metrics handlers into common code Remove the MonitorRequest method and replace with a method that takes request.RequestInfo, which is our default way to talk about API objects. Preserves existing semantics for calls. Not for 1.8, but fixes the ugliness and code duplication in #52237 Kubernetes-commit: 30f015a6fc13979193a6105890ce91ace651eb5c
2 parents 0f3e92f + 4175a07 commit ded52ff

File tree

9 files changed

+351
-318
lines changed

9 files changed

+351
-318
lines changed

Godeps/Godeps.json

+220-220
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/endpoints/handlers/proxy.go

+26-34
Original file line numberDiff line numberDiff line change
@@ -54,22 +54,18 @@ type ProxyHandler struct {
5454

5555
func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
5656
reqStart := time.Now()
57-
proxyHandlerTraceID := rand.Int63()
5857

59-
var verb, apiResource, subresource, scope string
6058
var httpCode int
61-
59+
var requestInfo *request.RequestInfo
6260
defer func() {
6361
responseLength := 0
6462
if rw, ok := w.(*metrics.ResponseWriterDelegator); ok {
6563
responseLength = rw.ContentLength()
64+
if httpCode == 0 {
65+
httpCode = rw.Status()
66+
}
6667
}
67-
metrics.Monitor(
68-
verb, apiResource, subresource, scope,
69-
net.GetHTTPClient(req),
70-
w.Header().Get("Content-Type"),
71-
httpCode, responseLength, reqStart,
72-
)
68+
metrics.Record(req, requestInfo, w.Header().Get("Content-Type"), httpCode, responseLength, time.Now().Sub(reqStart))
7369
}()
7470

7571
ctx, ok := r.Mapper.Get(req)
@@ -79,32 +75,32 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
7975
return
8076
}
8177

82-
requestInfo, ok := request.RequestInfoFrom(ctx)
78+
requestInfo, ok = request.RequestInfoFrom(ctx)
8379
if !ok {
8480
responsewriters.InternalError(w, req, errors.New("Error getting RequestInfo from context"))
8581
httpCode = http.StatusInternalServerError
8682
return
8783
}
84+
85+
metrics.RecordLongRunning(req, requestInfo, func() {
86+
httpCode = r.serveHTTP(w, req, ctx, requestInfo)
87+
})
88+
}
89+
90+
// serveHTTP performs proxy handling and returns the status code of the operation.
91+
func (r *ProxyHandler) serveHTTP(w http.ResponseWriter, req *http.Request, ctx request.Context, requestInfo *request.RequestInfo) int {
92+
proxyHandlerTraceID := rand.Int63()
93+
8894
if !requestInfo.IsResourceRequest {
8995
responsewriters.NotFound(w, req)
90-
httpCode = http.StatusNotFound
91-
return
92-
}
93-
verb = requestInfo.Verb
94-
namespace, resource, subresource, parts := requestInfo.Namespace, requestInfo.Resource, requestInfo.Subresource, requestInfo.Parts
95-
scope = "cluster"
96-
if namespace != "" {
97-
scope = "namespace"
98-
}
99-
if requestInfo.Name != "" {
100-
scope = "resource"
96+
return http.StatusNotFound
10197
}
98+
namespace, resource, parts := requestInfo.Namespace, requestInfo.Resource, requestInfo.Parts
10299

103100
ctx = request.WithNamespace(ctx, namespace)
104101
if len(parts) < 2 {
105102
responsewriters.NotFound(w, req)
106-
httpCode = http.StatusNotFound
107-
return
103+
return http.StatusNotFound
108104
}
109105
id := parts[1]
110106
remainder := ""
@@ -122,31 +118,26 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
122118
if !ok {
123119
httplog.LogOf(req, w).Addf("'%v' has no storage object", resource)
124120
responsewriters.NotFound(w, req)
125-
httpCode = http.StatusNotFound
126-
return
121+
return http.StatusNotFound
127122
}
128-
apiResource = resource
129123

130124
gv := schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}
131125

132126
redirector, ok := storage.(rest.Redirector)
133127
if !ok {
134128
httplog.LogOf(req, w).Addf("'%v' is not a redirector", resource)
135-
httpCode = responsewriters.ErrorNegotiated(ctx, apierrors.NewMethodNotSupported(schema.GroupResource{Resource: resource}, "proxy"), r.Serializer, gv, w, req)
136-
return
129+
return responsewriters.ErrorNegotiated(ctx, apierrors.NewMethodNotSupported(schema.GroupResource{Resource: resource}, "proxy"), r.Serializer, gv, w, req)
137130
}
138131

139132
location, roundTripper, err := redirector.ResourceLocation(ctx, id)
140133
if err != nil {
141134
httplog.LogOf(req, w).Addf("Error getting ResourceLocation: %v", err)
142-
httpCode = responsewriters.ErrorNegotiated(ctx, err, r.Serializer, gv, w, req)
143-
return
135+
return responsewriters.ErrorNegotiated(ctx, err, r.Serializer, gv, w, req)
144136
}
145137
if location == nil {
146138
httplog.LogOf(req, w).Addf("ResourceLocation for %v returned nil", id)
147139
responsewriters.NotFound(w, req)
148-
httpCode = http.StatusNotFound
149-
return
140+
return http.StatusNotFound
150141
}
151142

152143
if roundTripper != nil {
@@ -179,7 +170,7 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
179170
// https://github.com/openshift/origin/blob/master/pkg/util/httpproxy/upgradeawareproxy.go.
180171
// That proxy needs to be modified to support multiple backends, not just 1.
181172
if r.tryUpgrade(ctx, w, req, newReq, location, roundTripper, gv) {
182-
return
173+
return http.StatusSwitchingProtocols
183174
}
184175

185176
// Redirect requests of the form "/{resource}/{name}" to "/{resource}/{name}/"
@@ -192,7 +183,7 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
192183
}
193184
w.Header().Set("Location", req.URL.Path+"/"+queryPart)
194185
w.WriteHeader(http.StatusMovedPermanently)
195-
return
186+
return http.StatusMovedPermanently
196187
}
197188

198189
start := time.Now()
@@ -224,6 +215,7 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
224215
proxy.Transport = roundTripper
225216
proxy.FlushInterval = 200 * time.Millisecond
226217
proxy.ServeHTTP(w, newReq)
218+
return 0
227219
}
228220

229221
// tryUpgrade returns true if the request was handled.

pkg/endpoints/handlers/responsewriters/BUILD

+1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ go_library(
4242
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
4343
"//vendor/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
4444
"//vendor/k8s.io/apiserver/pkg/endpoints/handlers/negotiation:go_default_library",
45+
"//vendor/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
4546
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
4647
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
4748
"//vendor/k8s.io/apiserver/pkg/storage:go_default_library",

pkg/endpoints/handlers/responsewriters/writers.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2929
"k8s.io/apiserver/pkg/audit"
3030
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
31+
"k8s.io/apiserver/pkg/endpoints/metrics"
3132
"k8s.io/apiserver/pkg/endpoints/request"
3233
"k8s.io/apiserver/pkg/registry/rest"
3334
"k8s.io/apiserver/pkg/util/flushwriter"
@@ -42,7 +43,10 @@ import (
4243
func WriteObject(ctx request.Context, statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSerializer, object runtime.Object, w http.ResponseWriter, req *http.Request) {
4344
stream, ok := object.(rest.ResourceStreamer)
4445
if ok {
45-
StreamObject(ctx, statusCode, gv, s, stream, w, req)
46+
requestInfo, _ := request.RequestInfoFrom(ctx)
47+
metrics.RecordLongRunning(req, requestInfo, func() {
48+
StreamObject(ctx, statusCode, gv, s, stream, w, req)
49+
})
4650
return
4751
}
4852
WriteObjectNegotiated(ctx, s, gv, w, req, statusCode, object)

pkg/endpoints/handlers/rest.go

+14-7
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"k8s.io/apiserver/pkg/audit"
4747
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
4848
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
49+
"k8s.io/apiserver/pkg/endpoints/metrics"
4950
"k8s.io/apiserver/pkg/endpoints/request"
5051
"k8s.io/apiserver/pkg/registry/rest"
5152
utiltrace "k8s.io/apiserver/pkg/util/trace"
@@ -261,12 +262,15 @@ func ConnectResource(connecter rest.Connecter, scope RequestScope, admit admissi
261262
return
262263
}
263264
}
264-
handler, err := connecter.Connect(ctx, name, opts, &responder{scope: scope, req: req, w: w})
265-
if err != nil {
266-
scope.err(err, w, req)
267-
return
268-
}
269-
handler.ServeHTTP(w, req)
265+
requestInfo, _ := request.RequestInfoFrom(ctx)
266+
metrics.RecordLongRunning(req, requestInfo, func() {
267+
handler, err := connecter.Connect(ctx, name, opts, &responder{scope: scope, req: req, w: w})
268+
if err != nil {
269+
scope.err(err, w, req)
270+
return
271+
}
272+
handler.ServeHTTP(w, req)
273+
})
270274
}
271275
}
272276

@@ -366,7 +370,10 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch
366370
scope.err(err, w, req)
367371
return
368372
}
369-
serveWatch(watcher, scope, req, w, timeout)
373+
requestInfo, _ := request.RequestInfoFrom(ctx)
374+
metrics.RecordLongRunning(req, requestInfo, func() {
375+
serveWatch(watcher, scope, req, w, timeout)
376+
})
370377
return
371378
}
372379

pkg/endpoints/metrics/BUILD

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ go_library(
1919
"//vendor/github.com/emicklei/go-restful:go_default_library",
2020
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
2121
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
22+
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
2223
],
2324
)
2425

pkg/endpoints/metrics/metrics.go

+82-28
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727
"time"
2828

2929
utilnet "k8s.io/apimachinery/pkg/util/net"
30-
//utilruntime "k8s.io/apimachinery/pkg/util/runtime"
30+
"k8s.io/apiserver/pkg/endpoints/request"
3131

3232
"github.com/emicklei/go-restful"
3333
"github.com/prometheus/client_golang/prometheus"
@@ -43,6 +43,13 @@ var (
4343
},
4444
[]string{"verb", "resource", "subresource", "scope", "client", "contentType", "code"},
4545
)
46+
longRunningRequestGauge = prometheus.NewGaugeVec(
47+
prometheus.GaugeOpts{
48+
Name: "apiserver_longrunning_gauge",
49+
Help: "Gauge of all active long-running apiserver requests broken out by verb, API resource, and scope. Not all requests are tracked this way.",
50+
},
51+
[]string{"verb", "resource", "subresource", "scope"},
52+
)
4653
requestLatencies = prometheus.NewHistogramVec(
4754
prometheus.HistogramOpts{
4855
Name: "apiserver_request_latencies",
@@ -77,43 +84,59 @@ var (
7784
// Register all metrics.
7885
func Register() {
7986
prometheus.MustRegister(requestCounter)
87+
prometheus.MustRegister(longRunningRequestGauge)
8088
prometheus.MustRegister(requestLatencies)
8189
prometheus.MustRegister(requestLatenciesSummary)
8290
prometheus.MustRegister(responseSizes)
8391
}
8492

85-
// Monitor records a request to the apiserver endpoints that follow the Kubernetes API conventions. verb must be
86-
// uppercase to be backwards compatible with existing monitoring tooling.
87-
func Monitor(verb, resource, subresource, scope, client, contentType string, httpCode, respSize int, reqStart time.Time) {
88-
elapsed := float64((time.Since(reqStart)) / time.Microsecond)
89-
requestCounter.WithLabelValues(verb, resource, subresource, scope, client, contentType, codeToString(httpCode)).Inc()
90-
requestLatencies.WithLabelValues(verb, resource, subresource, scope).Observe(elapsed)
91-
requestLatenciesSummary.WithLabelValues(verb, resource, subresource, scope).Observe(elapsed)
92-
// We are only interested in response sizes of read requests.
93-
if verb == "GET" || verb == "LIST" {
94-
responseSizes.WithLabelValues(verb, resource, subresource, scope).Observe(float64(respSize))
93+
// Record records a single request to the standard metrics endpoints. For use by handlers that perform their own
94+
// processing. All API paths should use InstrumentRouteFunc implicitly. Use this instead of MonitorRequest if
95+
// you already have a RequestInfo object.
96+
func Record(req *http.Request, requestInfo *request.RequestInfo, contentType string, code int, responseSizeInBytes int, elapsed time.Duration) {
97+
if requestInfo == nil {
98+
requestInfo = &request.RequestInfo{Verb: req.Method, Path: req.URL.Path}
99+
}
100+
scope := cleanScope(requestInfo)
101+
if requestInfo.IsResourceRequest {
102+
MonitorRequest(req, strings.ToUpper(requestInfo.Verb), requestInfo.Resource, requestInfo.Subresource, contentType, scope, code, responseSizeInBytes, elapsed)
103+
} else {
104+
MonitorRequest(req, strings.ToUpper(requestInfo.Verb), "", requestInfo.Path, contentType, scope, code, responseSizeInBytes, elapsed)
95105
}
96106
}
97107

98-
// MonitorRequest handles standard transformations for client and the reported verb and then invokes Monitor to record
99-
// a request. verb must be uppercase to be backwards compatible with existing monitoring tooling.
100-
func MonitorRequest(request *http.Request, verb, resource, subresource, scope, contentType string, httpCode, respSize int, reqStart time.Time) {
101-
reportedVerb := verb
102-
if verb == "LIST" {
103-
// see apimachinery/pkg/runtime/conversion.go Convert_Slice_string_To_bool
104-
if values := request.URL.Query()["watch"]; len(values) > 0 {
105-
if value := strings.ToLower(values[0]); value != "0" && value != "false" {
106-
reportedVerb = "WATCH"
107-
}
108-
}
108+
// RecordLongRunning tracks the execution of a long running request against the API server. It provides an accurate count
109+
// of the total number of open long running requests. requestInfo may be nil if the caller is not in the normal request flow.
110+
func RecordLongRunning(req *http.Request, requestInfo *request.RequestInfo, fn func()) {
111+
if requestInfo == nil {
112+
requestInfo = &request.RequestInfo{Verb: req.Method, Path: req.URL.Path}
109113
}
110-
// normalize the legacy WATCHLIST to WATCH to ensure users aren't surprised by metrics
111-
if verb == "WATCHLIST" {
112-
reportedVerb = "WATCH"
114+
var g prometheus.Gauge
115+
scope := cleanScope(requestInfo)
116+
reportedVerb := cleanVerb(strings.ToUpper(requestInfo.Verb), req)
117+
if requestInfo.IsResourceRequest {
118+
g = longRunningRequestGauge.WithLabelValues(reportedVerb, requestInfo.Resource, requestInfo.Subresource, scope)
119+
} else {
120+
g = longRunningRequestGauge.WithLabelValues(reportedVerb, "", requestInfo.Path, scope)
113121
}
122+
g.Inc()
123+
defer g.Dec()
124+
fn()
125+
}
114126

115-
client := cleanUserAgent(utilnet.GetHTTPClient(request))
116-
Monitor(reportedVerb, resource, subresource, scope, client, contentType, httpCode, respSize, reqStart)
127+
// MonitorRequest handles standard transformations for client and the reported verb and then invokes Monitor to record
128+
// a request. verb must be uppercase to be backwards compatible with existing monitoring tooling.
129+
func MonitorRequest(req *http.Request, verb, resource, subresource, scope, contentType string, httpCode, respSize int, elapsed time.Duration) {
130+
reportedVerb := cleanVerb(verb, req)
131+
client := cleanUserAgent(utilnet.GetHTTPClient(req))
132+
elapsedMicroseconds := float64(elapsed / time.Microsecond)
133+
requestCounter.WithLabelValues(reportedVerb, resource, subresource, scope, client, contentType, codeToString(httpCode)).Inc()
134+
requestLatencies.WithLabelValues(reportedVerb, resource, subresource, scope).Observe(elapsedMicroseconds)
135+
requestLatenciesSummary.WithLabelValues(reportedVerb, resource, subresource, scope).Observe(elapsedMicroseconds)
136+
// We are only interested in response sizes of read requests.
137+
if verb == "GET" || verb == "LIST" {
138+
responseSizes.WithLabelValues(reportedVerb, resource, subresource, scope).Observe(float64(respSize))
139+
}
117140
}
118141

119142
func Reset() {
@@ -144,10 +167,41 @@ func InstrumentRouteFunc(verb, resource, subresource, scope string, routeFunc re
144167

145168
routeFunc(request, response)
146169

147-
MonitorRequest(request.Request, verb, resource, subresource, scope, delegate.Header().Get("Content-Type"), delegate.Status(), delegate.ContentLength(), now)
170+
MonitorRequest(request.Request, verb, resource, subresource, scope, delegate.Header().Get("Content-Type"), delegate.Status(), delegate.ContentLength(), time.Now().Sub(now))
148171
})
149172
}
150173

174+
func cleanScope(requestInfo *request.RequestInfo) string {
175+
if requestInfo.Namespace != "" {
176+
return "namespace"
177+
}
178+
if requestInfo.Name != "" {
179+
return "resource"
180+
}
181+
if requestInfo.IsResourceRequest {
182+
return "cluster"
183+
}
184+
// this is the empty scope
185+
return ""
186+
}
187+
188+
func cleanVerb(verb string, request *http.Request) string {
189+
reportedVerb := verb
190+
if verb == "LIST" {
191+
// see apimachinery/pkg/runtime/conversion.go Convert_Slice_string_To_bool
192+
if values := request.URL.Query()["watch"]; len(values) > 0 {
193+
if value := strings.ToLower(values[0]); value != "0" && value != "false" {
194+
reportedVerb = "WATCH"
195+
}
196+
}
197+
}
198+
// normalize the legacy WATCHLIST to WATCH to ensure users aren't surprised by metrics
199+
if verb == "WATCHLIST" {
200+
reportedVerb = "WATCH"
201+
}
202+
return reportedVerb
203+
}
204+
151205
func cleanUserAgent(ua string) string {
152206
// We collapse all "web browser"-type user agents into one "browser" to reduce metric cardinality.
153207
if strings.HasPrefix(ua, "Mozilla/") {

pkg/server/filters/maxinflight.go

+1-14
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@ package filters
1919
import (
2020
"fmt"
2121
"net/http"
22-
"strings"
23-
"time"
2422

2523
"k8s.io/apimachinery/pkg/util/sets"
2624
"k8s.io/apiserver/pkg/authentication/user"
@@ -108,18 +106,7 @@ func WithMaxInFlightLimit(
108106
}
109107
}
110108
}
111-
scope := "cluster"
112-
if requestInfo.Namespace != "" {
113-
scope = "namespace"
114-
}
115-
if requestInfo.Name != "" {
116-
scope = "resource"
117-
}
118-
if requestInfo.IsResourceRequest {
119-
metrics.MonitorRequest(r, strings.ToUpper(requestInfo.Verb), requestInfo.Resource, requestInfo.Subresource, "", scope, http.StatusTooManyRequests, 0, time.Now())
120-
} else {
121-
metrics.MonitorRequest(r, strings.ToUpper(requestInfo.Verb), "", requestInfo.Path, "", scope, http.StatusTooManyRequests, 0, time.Now())
122-
}
109+
metrics.Record(r, requestInfo, "", http.StatusTooManyRequests, 0, 0)
123110
tooManyRequests(r, w)
124111
}
125112
}

0 commit comments

Comments
 (0)