Skip to content

Commit 7670970

Browse files
tkashembertinatto
authored andcommitted
UPSTREAM: 115328: annotate early and late requests
UPSTREAM: <carry>: add shutdown annotation to response header If it is useful we will combine this with the following carry: 20caad9: UPSTREAM: 115328: annotate early and late requests UPSTREAM: <carry>: add conditional shutdown response header UPSTREAM: <carry>: Fix tests after backport When backporting some changes from upstream's master branch a unit test ended up being broken. The code covered by this test does not seem to exist upstream, neither the test file. The code this commit fixes was introduced by 57c60d8 and we should squash this one with it once the time for rebasing arrives. Even though 57c60d8 description reads "UPSTREAM: 115328: annotate early and late requests" the upstream PR 115328 was closed without merging.
1 parent 092a4a6 commit 7670970

File tree

5 files changed

+653
-12
lines changed

5 files changed

+653
-12
lines changed

staging/src/k8s.io/apiserver/pkg/server/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1095,6 +1095,8 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
10951095
handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator, c.LongRunningFunc)
10961096
handler = filterlatency.TrackStarted(handler, c.TracerProvider, "audit")
10971097

1098+
handler = genericfilters.WithStartupEarlyAnnotation(handler, c.lifecycleSignals.HasBeenReady)
1099+
10981100
failedHandler := genericapifilters.Unauthorized(c.Serializer)
10991101
failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyRuleEvaluator)
11001102

@@ -1135,6 +1137,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
11351137
handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.NotAcceptingNewRequest.Signaled())
11361138
}
11371139
handler = genericfilters.WithOptInRetryAfter(handler, c.newServerFullyInitializedFunc())
1140+
handler = genericfilters.WithShutdownResponseHeader(handler, c.lifecycleSignals.ShutdownInitiated, c.ShutdownDelayDuration, c.APIServerID)
11381141
handler = genericfilters.WithHTTPLogging(handler)
11391142
handler = genericapifilters.WithLatencyTrackers(handler)
11401143
// WithRoutine will execute future handlers in a separate goroutine and serving
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
Copyright 2023 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package filters
18+
19+
import (
20+
"fmt"
21+
"net"
22+
"net/http"
23+
"strings"
24+
"time"
25+
26+
"k8s.io/apiserver/pkg/audit"
27+
"k8s.io/apiserver/pkg/authentication/user"
28+
"k8s.io/apiserver/pkg/endpoints/request"
29+
clockutils "k8s.io/utils/clock"
30+
netutils "k8s.io/utils/net"
31+
)
32+
33+
type lifecycleEvent interface {
34+
// Name returns the name of the signal, useful for logging.
35+
Name() string
36+
37+
// Signaled returns a channel that is closed when the underlying event
38+
// has been signaled. Successive calls to Signaled return the same value.
39+
Signaled() <-chan struct{}
40+
41+
// SignaledAt returns the time the event was signaled. If SignaledAt is
42+
// invoked before the event is signaled nil will be returned.
43+
SignaledAt() *time.Time
44+
}
45+
46+
type shouldExemptFunc func(*http.Request) bool
47+
48+
var (
49+
// the health probes are not annotated by default
50+
healthProbes = []string{
51+
"/readyz",
52+
"/healthz",
53+
"/livez",
54+
}
55+
)
56+
57+
func exemptIfHealthProbe(r *http.Request) bool {
58+
path := "/" + strings.TrimLeft(r.URL.Path, "/")
59+
for _, probe := range healthProbes {
60+
if path == probe {
61+
return true
62+
}
63+
}
64+
return false
65+
}
66+
67+
// WithShutdownResponseHeader, if added to the handler chain, adds a header
68+
// 'X-OpenShift-Disruption' to the response with the following information:
69+
//
70+
// shutdown={true|false} shutdown-delay-duration=%s elapsed=%s host=%s
71+
// shutdown: whether the server is currently shutting down gracefully.
72+
// shutdown-delay-duration: value of --shutdown-delay-duration server run option
73+
// elapsed: how much time has elapsed since the server received a TERM signal
74+
// host: host name of the server, it is used to identify the server instance
75+
// from the others.
76+
//
77+
// This handler will add the response header only if the client opts in by
78+
// adding the 'X-Openshift-If-Disruption' header to the request.
79+
func WithShutdownResponseHeader(handler http.Handler, shutdownInitiated lifecycleEvent, delayDuration time.Duration, apiServerID string) http.Handler {
80+
return withShutdownResponseHeader(handler, shutdownInitiated, delayDuration, apiServerID, clockutils.RealClock{})
81+
}
82+
83+
// WithStartupEarlyAnnotation annotates the request with an annotation keyed as
84+
// 'apiserver.k8s.io/startup' if the request arrives early (the server is not
85+
// fully initialized yet). It should be placed after (in order of execution)
86+
// the 'WithAuthentication' filter.
87+
func WithStartupEarlyAnnotation(handler http.Handler, hasBeenReady lifecycleEvent) http.Handler {
88+
return withStartupEarlyAnnotation(handler, hasBeenReady, exemptIfHealthProbe)
89+
}
90+
91+
func withShutdownResponseHeader(handler http.Handler, shutdownInitiated lifecycleEvent, delayDuration time.Duration, apiServerID string, clock clockutils.PassiveClock) http.Handler {
92+
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
93+
if len(req.Header.Get("X-Openshift-If-Disruption")) == 0 {
94+
handler.ServeHTTP(w, req)
95+
return
96+
}
97+
98+
msgFn := func(shutdown bool, elapsed time.Duration) string {
99+
return fmt.Sprintf("shutdown=%t shutdown-delay-duration=%s elapsed=%s host=%s",
100+
shutdown, delayDuration.Round(time.Second).String(), elapsed.Round(time.Second).String(), apiServerID)
101+
}
102+
103+
select {
104+
case <-shutdownInitiated.Signaled():
105+
default:
106+
w.Header().Set("X-OpenShift-Disruption", msgFn(false, time.Duration(0)))
107+
handler.ServeHTTP(w, req)
108+
return
109+
}
110+
111+
shutdownInitiatedAt := shutdownInitiated.SignaledAt()
112+
if shutdownInitiatedAt == nil {
113+
w.Header().Set("X-OpenShift-Disruption", msgFn(true, time.Duration(0)))
114+
handler.ServeHTTP(w, req)
115+
return
116+
}
117+
118+
w.Header().Set("X-OpenShift-Disruption", msgFn(true, clock.Since(*shutdownInitiatedAt)))
119+
handler.ServeHTTP(w, req)
120+
})
121+
}
122+
123+
func withStartupEarlyAnnotation(handler http.Handler, hasBeenReady lifecycleEvent, shouldExemptFn shouldExemptFunc) http.Handler {
124+
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
125+
select {
126+
case <-hasBeenReady.Signaled():
127+
handler.ServeHTTP(w, req)
128+
return
129+
default:
130+
}
131+
132+
// NOTE: some upstream unit tests have authentication disabled and will
133+
// fail if we require the requestor to be present in the request
134+
// context. Fixing those unit tests will increase the chance of merge
135+
// conflict during rebase.
136+
// This also implies that this filter must be placed after (in order of
137+
// execution) the 'WithAuthentication' filter.
138+
self := "self="
139+
if requestor, exists := request.UserFrom(req.Context()); exists && requestor != nil {
140+
if requestor.GetName() == user.APIServerUser {
141+
handler.ServeHTTP(w, req)
142+
return
143+
}
144+
self = fmt.Sprintf("%s%t", self, false)
145+
}
146+
147+
audit.AddAuditAnnotation(req.Context(), "apiserver.k8s.io/startup",
148+
fmt.Sprintf("early=true %s loopback=%t", self, isLoopback(req.RemoteAddr)))
149+
150+
handler.ServeHTTP(w, req)
151+
})
152+
}
153+
154+
func isLoopback(address string) bool {
155+
host, _, err := net.SplitHostPort(address)
156+
if err != nil {
157+
// if the address is missing a port, SplitHostPort will return an error
158+
// with an empty host, and port value. For such an error, we should
159+
// continue and try to parse the original address.
160+
host = address
161+
}
162+
if ip := netutils.ParseIPSloppy(host); ip != nil {
163+
return ip.IsLoopback()
164+
}
165+
166+
return false
167+
}

0 commit comments

Comments
 (0)