From 0e6de313a98fc5569f9f697a6e45ced70375f77c Mon Sep 17 00:00:00 2001 From: Abu Kashem Date: Fri, 7 May 2021 12:47:35 -0400 Subject: [PATCH 1/2] UPSTREAM: 100523: refactor rest.FinishRequest function --- .../pkg/endpoints/handlers/create.go | 3 +- .../pkg/endpoints/handlers/delete.go | 5 +- .../endpoints/handlers/finisher/finisher.go | 109 ++++++++++++ .../handlers/finisher/finisher_test.go | 158 ++++++++++++++++++ .../apiserver/pkg/endpoints/handlers/patch.go | 3 +- .../apiserver/pkg/endpoints/handlers/rest.go | 55 ------ .../pkg/endpoints/handlers/rest_test.go | 118 ------------- .../pkg/endpoints/handlers/update.go | 3 +- 8 files changed, 276 insertions(+), 178 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/endpoints/handlers/finisher/finisher.go create mode 100644 staging/src/k8s.io/apiserver/pkg/endpoints/handlers/finisher/finisher_test.go diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/create.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/create.go index b2e167f26fd17..79fc5afabd01d 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/create.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/create.go @@ -36,6 +36,7 @@ import ( "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager" + "k8s.io/apiserver/pkg/endpoints/handlers/finisher" "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/features" @@ -157,7 +158,7 @@ func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Int } // Dedup owner references before updating managed fields dedupOwnerReferencesAndAddWarning(obj, req.Context(), false) - result, err := finishRequest(ctx, func() (runtime.Object, error) { + result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) { if scope.FieldManager != nil { liveObj, err := scope.Creater.New(scope.Kind) if err != nil { diff --git 
a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/delete.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/delete.go index 545ed897c2bfb..c1a1fc987eee6 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/delete.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/delete.go @@ -32,6 +32,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/audit" + "k8s.io/apiserver/pkg/endpoints/handlers/finisher" "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/features" @@ -124,7 +125,7 @@ func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope *RequestSc wasDeleted := true userInfo, _ := request.UserFrom(ctx) staticAdmissionAttrs := admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Delete, options, dryrun.IsDryRun(options.DryRun), userInfo) - result, err := finishRequest(ctx, func() (runtime.Object, error) { + result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) { obj, deleted, err := r.Delete(ctx, name, rest.AdmissionToValidateObjectDeleteFunc(admit, staticAdmissionAttrs, scope), options) wasDeleted = deleted return obj, err @@ -267,7 +268,7 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope *RequestSc admit = admission.WithAudit(admit, ae) userInfo, _ := request.UserFrom(ctx) staticAdmissionAttrs := admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, "", scope.Resource, scope.Subresource, admission.Delete, options, dryrun.IsDryRun(options.DryRun), userInfo) - result, err := finishRequest(ctx, func() (runtime.Object, error) { + result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) { return r.DeleteCollection(ctx, rest.AdmissionToValidateObjectDeleteFunc(admit, staticAdmissionAttrs, scope), options, &listOptions) }) if err != nil { diff --git 
a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/finisher/finisher.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/finisher/finisher.go new file mode 100644 index 0000000000000..42b81c7f70340 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/finisher/finisher.go @@ -0,0 +1,109 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package finisher + +import ( + "context" + "fmt" + "net/http" + goruntime "runtime" + + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" +) + +// ResultFunc is a function that returns a rest result and can be run in a goroutine +type ResultFunc func() (runtime.Object, error) + +// result stores the return values or panic from a ResultFunc function +type result struct { + // object stores the response returned by the ResultFunc function + object runtime.Object + // err stores the error returned by the ResultFunc function + err error + // reason stores the reason from a panic thrown by the ResultFunc function + reason interface{} +} + +// Return processes the result returned by a ResultFunc function +func (r *result) Return() (runtime.Object, error) { + switch { + case r.reason != nil: + // panic has higher precedence, the goroutine executing ResultFunc has panic'd, + // so propagate a panic to the caller. 
+ panic(r.reason) + case r.err != nil: + return nil, r.err + default: + // if we are here, it means neither a panic, nor an error + if status, ok := r.object.(*metav1.Status); ok { + // An api.Status object with status != success is considered an "error", + // which interrupts the normal response flow. + if status.Status != metav1.StatusSuccess { + return nil, errors.FromObject(status) + } + } + return r.object, nil + } +} + +// FinishRequest makes a given ResultFunc asynchronous and handles errors returned by the response. +func FinishRequest(ctx context.Context, fn ResultFunc) (runtime.Object, error) { + // the channel needs to be buffered to prevent the goroutine below from hanging indefinitely + // when the select statement reads something other than the one the goroutine sends on. + resultCh := make(chan *result, 1) + + go func() { + result := &result{} + + // panics don't cross goroutine boundaries, so we have to handle ourselves + defer func() { + reason := recover() + if reason != nil { + // do not wrap the sentinel ErrAbortHandler panic value + if reason != http.ErrAbortHandler { + // Same as stdlib http server code. Manually allocate stack + // trace buffer size to prevent excessively large logs + const size = 64 << 10 + buf := make([]byte, size) + buf = buf[:goruntime.Stack(buf, false)] + reason = fmt.Sprintf("%v\n%s", reason, buf) + } + + // store the panic reason into the result. 
+ result.reason = reason + } + + // Propagate the result to the parent goroutine + resultCh <- result + }() + + if object, err := fn(); err != nil { + result.err = err + } else { + result.object = object + } + }() + + select { + case result := <-resultCh: + return result.Return() + case <-ctx.Done(): + return nil, errors.NewTimeoutError(fmt.Sprintf("request did not complete within requested timeout %s", ctx.Err()), 0) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/finisher/finisher_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/finisher/finisher_test.go new file mode 100644 index 0000000000000..f4e667aaf6c47 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/finisher/finisher_test.go @@ -0,0 +1,158 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package finisher + +import ( + "context" + "fmt" + "net/http" + "reflect" + "strings" + "testing" + "time" + + apiequality "k8s.io/apimachinery/pkg/api/equality" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apiserver/pkg/apis/example" +) + +func TestFinishRequest(t *testing.T) { + exampleObj := &example.Pod{} + exampleErr := fmt.Errorf("error") + successStatusObj := &metav1.Status{Status: metav1.StatusSuccess, Message: "success message"} + errorStatusObj := &metav1.Status{Status: metav1.StatusFailure, Message: "error message"} + timeoutFunc := func() (context.Context, context.CancelFunc) { + return context.WithTimeout(context.TODO(), time.Second) + } + + testcases := []struct { + name string + timeout func() (context.Context, context.CancelFunc) + fn ResultFunc + expectedObj runtime.Object + expectedErr error + expectedPanic string + + expectedPanicObj interface{} + }{ + { + name: "Expected obj is returned", + timeout: timeoutFunc, + fn: func() (runtime.Object, error) { + return exampleObj, nil + }, + expectedObj: exampleObj, + expectedErr: nil, + }, + { + name: "Expected error is returned", + timeout: timeoutFunc, + fn: func() (runtime.Object, error) { + return nil, exampleErr + }, + expectedObj: nil, + expectedErr: exampleErr, + }, + { + name: "No expected error or object or panic", + timeout: timeoutFunc, + fn: func() (runtime.Object, error) { + return nil, nil + }, + }, + { + name: "Successful status object is returned as expected", + timeout: timeoutFunc, + fn: func() (runtime.Object, error) { + return successStatusObj, nil + }, + expectedObj: successStatusObj, + expectedErr: nil, + }, + { + name: "Error status object is converted to StatusError", + timeout: timeoutFunc, + fn: func() (runtime.Object, error) { + return errorStatusObj, nil + }, + expectedObj: nil, + expectedErr: apierrors.FromObject(errorStatusObj), + }, + { + name: "Panic is propagated up", 
+ timeout: timeoutFunc, + fn: func() (runtime.Object, error) { + panic("my panic") + }, + expectedObj: nil, + expectedErr: nil, + expectedPanic: "my panic", + }, + { + name: "Panic is propagated with stack", + timeout: timeoutFunc, + fn: func() (runtime.Object, error) { + panic("my panic") + }, + expectedObj: nil, + expectedErr: nil, + expectedPanic: "finisher_test.go", + }, + { + name: "http.ErrAbortHandler panic is propagated without wrapping with stack", + timeout: timeoutFunc, + fn: func() (runtime.Object, error) { + panic(http.ErrAbortHandler) + }, + expectedObj: nil, + expectedErr: nil, + expectedPanic: http.ErrAbortHandler.Error(), + expectedPanicObj: http.ErrAbortHandler, + }, + } + for i, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := tc.timeout() + defer func() { + cancel() + + r := recover() + switch { + case r == nil && len(tc.expectedPanic) > 0: + t.Errorf("expected panic containing '%s', got none", tc.expectedPanic) + case r != nil && len(tc.expectedPanic) == 0: + t.Errorf("unexpected panic: %v", r) + case r != nil && len(tc.expectedPanic) > 0 && !strings.Contains(fmt.Sprintf("%v", r), tc.expectedPanic): + t.Errorf("expected panic containing '%s', got '%v'", tc.expectedPanic, r) + } + + if tc.expectedPanicObj != nil && !reflect.DeepEqual(tc.expectedPanicObj, r) { + t.Errorf("expected panic obj %#v, got %#v", tc.expectedPanicObj, r) + } + }() + obj, err := FinishRequest(ctx, tc.fn) + if (err == nil && tc.expectedErr != nil) || (err != nil && tc.expectedErr == nil) || (err != nil && tc.expectedErr != nil && err.Error() != tc.expectedErr.Error()) { + t.Errorf("%d: unexpected err. expected: %v, got: %v", i, tc.expectedErr, err) + } + if !apiequality.Semantic.DeepEqual(obj, tc.expectedObj) { + t.Errorf("%d: unexpected obj. 
expected %#v, got %#v", i, tc.expectedObj, obj) + } + }) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/patch.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/patch.go index 903307fc28520..1abcccaa17000 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/patch.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/patch.go @@ -43,6 +43,7 @@ import ( "k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/authorization/authorizer" "k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager" + "k8s.io/apiserver/pkg/endpoints/handlers/finisher" "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/features" @@ -590,7 +591,7 @@ func (p *patcher) patchResource(ctx context.Context, scope *RequestScope) (runti wasCreated = created return updateObject, updateErr } - result, err := finishRequest(ctx, func() (runtime.Object, error) { + result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) { result, err := requestFunc() // If the object wasn't committed to storage because it's serialized size was too large, // it is safe to remove managedFields (which can be large) and try again. diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go index 783ab96b9b981..f618b3867d6d4 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go @@ -24,7 +24,6 @@ import ( "io/ioutil" "net/http" "net/url" - goruntime "runtime" "strings" "time" @@ -225,60 +224,6 @@ func (r *responder) Error(err error) { r.scope.err(err, r.w, r.req) } -// resultFunc is a function that returns a rest result and can be run in a goroutine -type resultFunc func() (runtime.Object, error) - -// finishRequest makes a given resultFunc asynchronous and handles errors returned by the response. 
-// An api.Status object with status != success is considered an "error", which interrupts the normal response flow. -func finishRequest(ctx context.Context, fn resultFunc) (result runtime.Object, err error) { - // these channels need to be buffered to prevent the goroutine below from hanging indefinitely - // when the select statement reads something other than the one the goroutine sends on. - ch := make(chan runtime.Object, 1) - errCh := make(chan error, 1) - panicCh := make(chan interface{}, 1) - go func() { - // panics don't cross goroutine boundaries, so we have to handle ourselves - defer func() { - panicReason := recover() - if panicReason != nil { - // do not wrap the sentinel ErrAbortHandler panic value - if panicReason != http.ErrAbortHandler { - // Same as stdlib http server code. Manually allocate stack - // trace buffer size to prevent excessively large logs - const size = 64 << 10 - buf := make([]byte, size) - buf = buf[:goruntime.Stack(buf, false)] - panicReason = fmt.Sprintf("%v\n%s", panicReason, buf) - } - // Propagate to parent goroutine - panicCh <- panicReason - } - }() - - if result, err := fn(); err != nil { - errCh <- err - } else { - ch <- result - } - }() - - select { - case result = <-ch: - if status, ok := result.(*metav1.Status); ok { - if status.Status != metav1.StatusSuccess { - return nil, errors.FromObject(status) - } - } - return result, nil - case err = <-errCh: - return nil, err - case p := <-panicCh: - panic(p) - case <-ctx.Done(): - return nil, errors.NewTimeoutError(fmt.Sprintf("request did not complete within requested timeout %s", ctx.Err()), 0) - } -} - // transformDecodeError adds additional information into a bad-request api error when a decode fails. 
func transformDecodeError(typer runtime.ObjectTyper, baseErr error, into runtime.Object, gvk *schema.GroupVersionKind, body []byte) error { objGVKs, _, err := typer.ObjectKinds(into) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest_test.go index 5bd34f004f0fa..a2e27076e5241 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest_test.go @@ -826,124 +826,6 @@ func TestHasUID(t *testing.T) { } } -func TestFinishRequest(t *testing.T) { - exampleObj := &example.Pod{} - exampleErr := fmt.Errorf("error") - successStatusObj := &metav1.Status{Status: metav1.StatusSuccess, Message: "success message"} - errorStatusObj := &metav1.Status{Status: metav1.StatusFailure, Message: "error message"} - timeoutFunc := func() (context.Context, context.CancelFunc) { - return context.WithTimeout(context.TODO(), time.Second) - } - - testcases := []struct { - name string - timeout func() (context.Context, context.CancelFunc) - fn resultFunc - expectedObj runtime.Object - expectedErr error - expectedPanic string - - expectedPanicObj interface{} - }{ - { - name: "Expected obj is returned", - timeout: timeoutFunc, - fn: func() (runtime.Object, error) { - return exampleObj, nil - }, - expectedObj: exampleObj, - expectedErr: nil, - }, - { - name: "Expected error is returned", - timeout: timeoutFunc, - fn: func() (runtime.Object, error) { - return nil, exampleErr - }, - expectedObj: nil, - expectedErr: exampleErr, - }, - { - name: "Successful status object is returned as expected", - timeout: timeoutFunc, - fn: func() (runtime.Object, error) { - return successStatusObj, nil - }, - expectedObj: successStatusObj, - expectedErr: nil, - }, - { - name: "Error status object is converted to StatusError", - timeout: timeoutFunc, - fn: func() (runtime.Object, error) { - return errorStatusObj, nil - }, - expectedObj: nil, - expectedErr: 
apierrors.FromObject(errorStatusObj), - }, - { - name: "Panic is propagated up", - timeout: timeoutFunc, - fn: func() (runtime.Object, error) { - panic("my panic") - }, - expectedObj: nil, - expectedErr: nil, - expectedPanic: "my panic", - }, - { - name: "Panic is propagated with stack", - timeout: timeoutFunc, - fn: func() (runtime.Object, error) { - panic("my panic") - }, - expectedObj: nil, - expectedErr: nil, - expectedPanic: "rest_test.go", - }, - { - name: "http.ErrAbortHandler panic is propagated without wrapping with stack", - timeout: timeoutFunc, - fn: func() (runtime.Object, error) { - panic(http.ErrAbortHandler) - }, - expectedObj: nil, - expectedErr: nil, - expectedPanic: http.ErrAbortHandler.Error(), - expectedPanicObj: http.ErrAbortHandler, - }, - } - for i, tc := range testcases { - t.Run(tc.name, func(t *testing.T) { - ctx, cancel := tc.timeout() - defer func() { - cancel() - - r := recover() - switch { - case r == nil && len(tc.expectedPanic) > 0: - t.Errorf("expected panic containing '%s', got none", tc.expectedPanic) - case r != nil && len(tc.expectedPanic) == 0: - t.Errorf("unexpected panic: %v", r) - case r != nil && len(tc.expectedPanic) > 0 && !strings.Contains(fmt.Sprintf("%v", r), tc.expectedPanic): - t.Errorf("expected panic containing '%s', got '%v'", tc.expectedPanic, r) - } - - if tc.expectedPanicObj != nil && !reflect.DeepEqual(tc.expectedPanicObj, r) { - t.Errorf("expected panic obj %#v, got %#v", tc.expectedPanicObj, r) - } - }() - obj, err := finishRequest(ctx, tc.fn) - if (err == nil && tc.expectedErr != nil) || (err != nil && tc.expectedErr == nil) || (err != nil && tc.expectedErr != nil && err.Error() != tc.expectedErr.Error()) { - t.Errorf("%d: unexpected err. expected: %v, got: %v", i, tc.expectedErr, err) - } - if !apiequality.Semantic.DeepEqual(obj, tc.expectedObj) { - t.Errorf("%d: unexpected obj. 
expected %#v, got %#v", i, tc.expectedObj, obj) - } - }) - } -} - func setTcPod(tcPod *example.Pod, name string, namespace string, uid types.UID, resourceVersion string, apiVersion string, activeDeadlineSeconds *int64, nodeName string) { tcPod.Name = name tcPod.Namespace = namespace diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/update.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/update.go index 57daefd9cf1ec..b66b57a8b62aa 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/update.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/update.go @@ -34,6 +34,7 @@ import ( "k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/authorization/authorizer" "k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager" + "k8s.io/apiserver/pkg/endpoints/handlers/finisher" "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/features" @@ -198,7 +199,7 @@ func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interfa } // Dedup owner references before updating managed fields dedupOwnerReferencesAndAddWarning(obj, req.Context(), false) - result, err := finishRequest(ctx, func() (runtime.Object, error) { + result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) { result, err := requestFunc() // If the object wasn't committed to storage because it's serialized size was too large, // it is safe to remove managedFields (which can be large) and try again. 
From 5a556a1e846587e4c09872062650457bf262e179 Mon Sep 17 00:00:00 2001 From: Abu Kashem Date: Fri, 7 May 2021 12:54:27 -0400 Subject: [PATCH 2/2] UPSTREAM: 97428: add more context to post timeout request activities --- .../endpoints/handlers/finisher/finisher.go | 73 ++++++++++- .../handlers/finisher/finisher_test.go | 120 ++++++++++++++++++ .../pkg/endpoints/metrics/metrics.go | 55 ++++++++ .../apiserver/pkg/server/filters/timeout.go | 17 ++- .../pkg/server/filters/timeout_test.go | 26 +++- 5 files changed, 278 insertions(+), 13 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/finisher/finisher.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/finisher/finisher.go index 42b81c7f70340..dd7651718b90e 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/finisher/finisher.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/finisher/finisher.go @@ -21,10 +21,14 @@ import ( "fmt" "net/http" goruntime "runtime" + "time" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apiserver/pkg/endpoints/metrics" + "k8s.io/klog/v2" ) // ResultFunc is a function that returns a rest result and can be run in a goroutine @@ -62,10 +66,27 @@ func (r *result) Return() (runtime.Object, error) { } } +// PostTimeoutLoggerFunc is a function that can be used to log the result returned +// by a ResultFunc after the request had timed out. +// timedOutAt is the time the request had been timed out. +// r is the result returned by the child goroutine. +type PostTimeoutLoggerFunc func(timedOutAt time.Time, r *result) + +const ( + // how much time the post-timeout receiver goroutine will wait for the sender + // (child goroutine executing ResultFunc) to send a result after the request. + // had timed out. 
+ postTimeoutLoggerWait = 5 * time.Minute +) + // FinishRequest makes a given ResultFunc asynchronous and handles errors returned by the response. func FinishRequest(ctx context.Context, fn ResultFunc) (runtime.Object, error) { - // the channel needs to be buffered to prevent the goroutine below from hanging indefinitely - // when the select statement reads something other than the one the goroutine sends on. + return finishRequest(ctx, fn, postTimeoutLoggerWait, logPostTimeoutResult) +} + +func finishRequest(ctx context.Context, fn ResultFunc, postTimeoutWait time.Duration, postTimeoutLogger PostTimeoutLoggerFunc) (runtime.Object, error) { + // the channel needs to be buffered since the post-timeout receiver goroutine + // waits up to 5 minutes for the child goroutine to return. resultCh := make(chan *result, 1) go func() { @@ -104,6 +125,52 @@ func FinishRequest(ctx context.Context, fn ResultFunc) (runtime.Object, error) { case result := <-resultCh: return result.Return() case <-ctx.Done(): - return nil, errors.NewTimeoutError(fmt.Sprintf("request did not complete within requested timeout %s", ctx.Err()), 0) + // we are going to send a timeout response to the caller, but the asynchronous goroutine + // (sender) is still executing the ResultFunc function. + // kick off a goroutine (receiver) here to wait for the sender (goroutine executing ResultFunc) + // to send the result and then log details of the result. + defer func() { + go func() { + timedOutAt := time.Now() + + var result *result + select { + case result = <-resultCh: + case <-time.After(postTimeoutWait): + // we will not wait forever, if we are here then we know that some sender + // goroutines are taking longer than postTimeoutWait. 
+ } + postTimeoutLogger(timedOutAt, result) + }() + }() + return nil, errors.NewTimeoutError(fmt.Sprintf("request did not complete within requested timeout - %s", ctx.Err()), 0) } } + +// logPostTimeoutResult logs a panic or an error from the result that the sender (goroutine that is +// executing the ResultFunc function) has sent to the receiver after the request had timed out. +// timedOutAt is the time the request had been timed out +func logPostTimeoutResult(timedOutAt time.Time, r *result) { + if r == nil { + // we are using r == nil to indicate that the child goroutine never returned a result. + metrics.RecordRequestPostTimeout(metrics.PostTimeoutSourceRestHandler, metrics.PostTimeoutHandlerPending) + klog.Errorf("FinishRequest: post-timeout activity, waited for %s, child goroutine has not returned yet", time.Since(timedOutAt)) + return + } + + var status string + switch { + case r.reason != nil: + // a non empty reason inside a result object indicates that there was a panic. + status = metrics.PostTimeoutHandlerPanic + case r.err != nil: + status = metrics.PostTimeoutHandlerError + default: + status = metrics.PostTimeoutHandlerOK + } + + metrics.RecordRequestPostTimeout(metrics.PostTimeoutSourceRestHandler, status) + err := fmt.Errorf("FinishRequest: post-timeout activity - time-elapsed: %s, panicked: %t, err: %v, panic-reason: %v", + time.Since(timedOutAt), r.reason != nil, r.err, r.reason) + utilruntime.HandleError(err) +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/finisher/finisher_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/finisher/finisher_test.go index f4e667aaf6c47..7da783489498e 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/finisher/finisher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/finisher/finisher_test.go @@ -18,6 +18,7 @@ package finisher import ( "context" + "errors" "fmt" "net/http" "reflect" @@ -30,6 +31,8 @@ import ( metav1 
"k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apiserver/pkg/apis/example" + + "github.com/google/go-cmp/cmp" ) func TestFinishRequest(t *testing.T) { @@ -156,3 +159,120 @@ func TestFinishRequest(t *testing.T) { }) } } + +func TestFinishRequestWithPostTimeoutTracker(t *testing.T) { + tests := []struct { + name string + object runtime.Object + postTimeoutWait time.Duration + childGoroutineNeverReturns bool + err error + reason string + }{ + { + name: "ResultFunc function returns a result after the request had timed out", + object: &example.Pod{}, + postTimeoutWait: 5 * time.Minute, + }, + { + name: "ResultFunc function returns an error after the request had timed out", + err: errors.New("my error"), + postTimeoutWait: 5 * time.Minute, + }, + { + name: "ResultFunc function panics after the request had timed out", + reason: "my panic", + postTimeoutWait: 5 * time.Minute, + }, + { + name: "ResultFunc function never returns, parent gives up after postTimeoutWait", + postTimeoutWait: 1 * time.Second, + childGoroutineNeverReturns: true, + }, + } + + expectedTimeoutErr := apierrors.NewTimeoutError(fmt.Sprintf("request did not complete within requested timeout - %s", + context.DeadlineExceeded), 0) + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.TODO(), time.Millisecond) + defer cancel() + + timeoutAsDesignedCh, resultFuncDoneCh := make(chan struct{}), make(chan struct{}) + resultFn := func() (runtime.Object, error) { + defer func() { + if test.childGoroutineNeverReturns { + // sleep a bit more than test.postTimeoutWait so the + // post-timeout monitor gives up. 
+ time.Sleep(test.postTimeoutWait + time.Second) + } + close(resultFuncDoneCh) + }() + + // it will block here + <-timeoutAsDesignedCh + + if len(test.reason) > 0 { + panic(test.reason) + } + if test.err != nil && test.object != nil { + t.Fatal("both result and err are set, wrong test setup") + } + + return test.object, test.err + } + + var resultGot *result + postTimeoutLoggerCompletedCh := make(chan struct{}) + decoratedPostTimeoutLogger := func(timedOutAt time.Time, r *result) { + defer func() { + resultGot = r + close(postTimeoutLoggerCompletedCh) + }() + + logPostTimeoutResult(timedOutAt, r) + } + + _, err := finishRequest(ctx, resultFn, test.postTimeoutWait, decoratedPostTimeoutLogger) + if err == nil || err.Error() != expectedTimeoutErr.Error() { + t.Errorf("expected timeout error: %v, but got: %v", expectedTimeoutErr, err) + } + + // the rest ResultFunc is still running, let's unblock it so it can complete + close(timeoutAsDesignedCh) + + t.Log("waiting for the ResultFunc rest function to finish") + <-resultFuncDoneCh + + t.Log("waiting for the post-timeout logger to return") + <-postTimeoutLoggerCompletedCh + + switch { + case test.childGoroutineNeverReturns && resultGot != nil: + t.Fatal("expected the result for the post-timeout logger to be nil") + case test.childGoroutineNeverReturns: + // resultGot is nil, nothing more to verify + return + case !test.childGoroutineNeverReturns && resultGot == nil: + t.Fatal("expected a result for the post-timeout logger, but got nil") + } + + if test.object != resultGot.object { + t.Errorf("expected object to match, diff: %s", cmp.Diff(test.object, resultGot.object)) + } + if test.err != resultGot.err { + t.Errorf("expected err: %v, but got: %v", test.err, resultGot.err) + } + + switch { + case len(test.reason) == 0: + if resultGot.reason != nil { + t.Errorf("unexpected panic: %v", resultGot.reason) + } + case !strings.Contains(fmt.Sprintf("%v", resultGot.reason), test.reason): + t.Errorf("expected panic to contain: 
%q, but got: %v", test.reason, resultGot.reason) + } + }) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go index a278566438ce6..92acf6381d402 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go @@ -211,6 +211,26 @@ var ( []string{"verb", "group", "version", "resource", "subresource", "scope"}, ) + // requestPostTimeoutTotal tracks the activity of the executing request handler after the associated request + // has been timed out by the apiserver. + // source: the name of the handler that is recording this metric. Currently, we have two: + // - timeout-handler: the "executing" handler returns after the timeout filter times out the request. + // - rest-handler: the "executing" handler returns after the rest layer times out the request. + // status: whether the handler panicked or threw an error, possible values: + // - 'panic': the handler panicked + // - 'error': the handler returned an error + // - 'ok': the handler returned a result (no error and no panic) + // - 'pending': the handler is still running in the background and it did not return + // within the wait threshold. + requestPostTimeoutTotal = compbasemetrics.NewCounterVec( + &compbasemetrics.CounterOpts{ + Name: "apiserver_request_post_timeout_total", + Help: "Tracks the activity of the request handlers after the associated requests have been timed out by the apiserver", + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{"source", "status"}, + ) + metrics = []resettableCollector{ deprecatedRequestGauge, requestCounter, @@ -228,6 +248,7 @@ var ( apiSelfRequestCounter, requestFilterDuration, requestAbortsTotal, + requestPostTimeoutTotal, } // these are the valid request methods which we report in our metrics.
Any other request methods @@ -271,6 +292,36 @@ const ( removedReleaseAnnotationKey = "k8s.io/removed-release" ) +const ( + // The source that is recording the apiserver_request_post_timeout_total metric. + // The "executing" request handler returns after the timeout filter times out the request. + PostTimeoutSourceTimeoutHandler = "timeout-handler" + + // The source that is recording the apiserver_request_post_timeout_total metric. + // The "executing" request handler returns after the rest layer times out the request. + PostTimeoutSourceRestHandler = "rest-handler" +) + +const ( + // The executing request handler panicked after the request had + // been timed out by the apiserver. + PostTimeoutHandlerPanic = "panic" + + // The executing request handler has returned an error to the post-timeout + // receiver after the request had been timed out by the apiserver. + PostTimeoutHandlerError = "error" + + // The executing request handler has returned a result to the post-timeout + // receiver after the request had been timed out by the apiserver. + PostTimeoutHandlerOK = "ok" + + // The executing request handler has not panicked or returned any error/result to + // the post-timeout receiver yet after the request had been timed out by the apiserver. + // The post-timeout receiver gives up after waiting for a certain threshold and if the + // executing request handler has not returned yet we use the following label. + PostTimeoutHandlerPending = "pending" +) + var registerMetrics sync.Once // Register all metrics. @@ -308,6 +359,10 @@ func RecordFilterLatency(ctx context.Context, name string, elapsed time.Duration requestFilterDuration.WithContext(ctx).WithLabelValues(name).Observe(elapsed.Seconds()) } +func RecordRequestPostTimeout(source string, status string) { + requestPostTimeoutTotal.WithLabelValues(source, status).Inc() +} + // RecordRequestAbort records that the request was aborted possibly due to a timeout.
func RecordRequestAbort(req *http.Request, requestInfo *request.RequestInfo) { if requestInfo == nil { diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/timeout.go b/staging/src/k8s.io/apiserver/pkg/server/filters/timeout.go index ccbed60dba5d3..69e4fd4f21cee 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/timeout.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/timeout.go @@ -24,6 +24,7 @@ import ( "net/http" "runtime" "sync" + "time" apierrors "k8s.io/apimachinery/pkg/api/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -119,15 +120,19 @@ func (t *timeoutHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // the work needs to send to it. This is defer'd to ensure it runs // ever if the post timeout work itself panics. go func() { + timedOutAt := time.Now() res := <-resultCh + + status := metrics.PostTimeoutHandlerOK if res != nil { - switch t := res.(type) { - case error: - utilruntime.HandleError(t) - default: - utilruntime.HandleError(fmt.Errorf("%v", res)) - } + // a non nil res indicates that there was a panic. 
+ status = metrics.PostTimeoutHandlerPanic } + + metrics.RecordRequestPostTimeout(metrics.PostTimeoutSourceTimeoutHandler, status) + err := fmt.Errorf("post-timeout activity - time-elapsed: %s, %v %q result: %v", + time.Since(timedOutAt), r.Method, r.URL.Path, res) + utilruntime.HandleError(err) }() }() diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/timeout_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/timeout_test.go index 1cd243584137b..d3fbf0380880b 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/timeout_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/timeout_test.go @@ -260,8 +260,10 @@ func TestErrConnKilled(t *testing.T) { // we should only get one line for this, not the big stack from before capturedOutput := readStdErr() - if strings.Count(capturedOutput, "\n") != 1 { - t.Errorf("unexpected output captured actual = %v", capturedOutput) + + // We don't expect stack trace from the panic to be included in the log. + if isStackTraceLoggedByRuntime(capturedOutput) { + t.Errorf("unexpected stack trace in log, actual = %v", capturedOutput) } if !strings.Contains(capturedOutput, `timeout or abort while handling: method=GET URI="/" audit-ID=""`) { t.Errorf("unexpected output captured actual = %v", capturedOutput) @@ -350,8 +352,10 @@ func TestErrConnKilledHTTP2(t *testing.T) { // we should only get one line for this, not the big stack from before capturedOutput := readStdErr() - if strings.Count(capturedOutput, "\n") != 1 { - t.Errorf("unexpected output captured actual = %v", capturedOutput) + + // We don't expect stack trace from the panic to be included in the log. 
+ if isStackTraceLoggedByRuntime(capturedOutput) { + t.Errorf("unexpected stack trace in log, actual = %v", capturedOutput) } if !strings.Contains(capturedOutput, `timeout or abort while handling: method=GET URI="/" audit-ID=""`) { t.Errorf("unexpected output captured actual = %v", capturedOutput) @@ -366,6 +370,20 @@ func TestErrConnKilledHTTP2(t *testing.T) { } } +func isStackTraceLoggedByRuntime(message string) bool { + // Check the captured output for the following patterns to find out if the + // stack trace is included in the log: + // - 'Observed a panic' (apimachinery runtime.go logs panic with this message) + // - 'goroutine 44 [running]:' (stack trace always starts with this) + if strings.Contains(message, "Observed a panic") && + strings.Contains(message, "goroutine") && + strings.Contains(message, "[running]:") { + return true + } + + return false +} + var tsCrt = []byte(`-----BEGIN CERTIFICATE----- MIIDTjCCAjagAwIBAgIJAJdcQEBN2CjoMA0GCSqGSIb3DQEBCwUAMFAxCzAJBgNV BAYTAlBMMQ8wDQYDVQQIDAZQb2xhbmQxDzANBgNVBAcMBkdkYW5zazELMAkGA1UE