Skip to content

Commit 9d82ffc

Browse files
committed
handle watch for unsafe delete
1 parent 35c2c86 commit 9d82ffc

File tree

8 files changed

+233
-12
lines changed

8 files changed

+233
-12
lines changed

staging/src/k8s.io/apiserver/pkg/storage/etcd3/errors.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@ limitations under the License.
1717
package etcd3
1818

1919
import (
20+
goerrors "errors"
21+
"net/http"
22+
2023
"k8s.io/apimachinery/pkg/api/errors"
24+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2125
"k8s.io/apiserver/pkg/storage"
2226

2327
etcdrpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
@@ -29,6 +33,19 @@ func interpretWatchError(err error) error {
2933
case err == etcdrpc.ErrCompacted:
3034
return errors.NewResourceExpired("The resourceVersion for the provided watch is too old.")
3135
}
36+
37+
var corruptobjDeletedErr *corruptObjectDeletedError
38+
if goerrors.As(err, &corruptobjDeletedErr) {
39+
return &errors.StatusError{
40+
ErrStatus: metav1.Status{
41+
Status: metav1.StatusFailure,
42+
Code: http.StatusInternalServerError,
43+
Reason: metav1.StatusReasonStoreReadError,
44+
Message: corruptobjDeletedErr.Error(),
45+
},
46+
}
47+
}
48+
3249
return err
3350
}
3451

staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,28 @@ func (s *storeWithPrefixTransformer) UpdatePrefixTransformer(modifier storagetes
194194
}
195195
}
196196

197+
type corruptedTransformer struct {
198+
value.Transformer
199+
}
200+
201+
func (f *corruptedTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) (out []byte, stale bool, err error) {
202+
return nil, true, &corruptObjectError{err: fmt.Errorf("bits flipped"), errType: untransformable}
203+
}
204+
205+
type storeWithCorruptedTransformer struct {
206+
*store
207+
}
208+
209+
func (s *storeWithCorruptedTransformer) CorruptTransformer() func() {
210+
ct := &corruptedTransformer{Transformer: s.transformer}
211+
s.transformer = ct
212+
s.watcher.transformer = ct
213+
return func() {
214+
s.transformer = ct.Transformer
215+
s.watcher.transformer = ct.Transformer
216+
}
217+
}
218+
197219
func TestGuaranteedUpdate(t *testing.T) {
198220
ctx, store, client := testSetup(t)
199221
storagetesting.RunTestGuaranteedUpdate(ctx, t, &storeWithPrefixTransformer{store}, checkStorageInvariants(client.Client, store.codec))

staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -686,18 +686,39 @@ func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtim
686686
if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) {
687687
data, _, err := wc.watcher.transformer.TransformFromStorage(wc.ctx, e.prevValue, authenticatedDataString(e.key))
688688
if err != nil {
689-
return nil, nil, err
689+
return nil, nil, wc.watcher.transformIfCorruptObjectError(e, err)
690690
}
691691
// Note that this sends the *old* object with the etcd revision for the time at
692692
// which it gets deleted.
693693
oldObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, data, e.rev)
694694
if err != nil {
695-
return nil, nil, err
695+
return nil, nil, wc.watcher.transformIfCorruptObjectError(e, err)
696696
}
697697
}
698698
return curObj, oldObj, nil
699699
}
700700

701+
type corruptObjectDeletedError struct {
702+
err error
703+
}
704+
705+
func (e *corruptObjectDeletedError) Error() string {
706+
return fmt.Sprintf("corrupt object has been deleted -%v", e.err.Error())
707+
}
708+
func (e *corruptObjectDeletedError) Unwrap() error { return e.err }
709+
710+
func (w *watcher) transformIfCorruptObjectError(e *event, err error) error {
711+
var corruptObjErr *corruptObjectError
712+
if !e.isDeleted || !errors.As(err, &corruptObjErr) {
713+
return err
714+
}
715+
716+
// if we are here it means we received a DELETED event but the object
717+
// associated with it was corrupt, since we don't have the object data
718+
// wrap the original error so we can include the partial object in the error chain
719+
return &corruptObjectDeletedError{err: corruptObjErr}
720+
}
721+
701722
func decodeObj(codec runtime.Codec, versioner storage.Versioner, data []byte, rev int64) (_ runtime.Object, err error) {
702723
obj, err := runtime.Decode(codec, []byte(data))
703724
if err != nil {

staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,11 @@ func TestProgressNotify(t *testing.T) {
112112
storagetesting.RunOptionalTestProgressNotify(ctx, t, store)
113113
}
114114

115+
func TestWatchWithUnsafeDelete(t *testing.T) {
116+
ctx, store, _ := testSetup(t)
117+
storagetesting.RunTestWatchWithUnsafeDelete(ctx, t, &storeWithCorruptedTransformer{store})
118+
}
119+
115120
// TestWatchDispatchBookmarkEvents makes sure that
116121
// setting allowWatchBookmarks query param against
117122
// etcd implementation doesn't have any effect.

staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2147,6 +2147,11 @@ type InterfaceWithPrefixTransformer interface {
21472147
UpdatePrefixTransformer(PrefixTransformerModifier) func()
21482148
}
21492149

2150+
type InterfaceWithCorruptTransformer interface {
2151+
storage.Interface
2152+
CorruptTransformer() func()
2153+
}
2154+
21502155
func RunTestListResourceVersionMatch(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer) {
21512156
nextPod := func(index uint32) (string, *example.Pod) {
21522157
obj := &example.Pod{

staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222
"net/http"
23+
"strings"
2324
"sync"
2425
"testing"
2526
"time"
@@ -407,6 +408,61 @@ func RunTestWatchError(ctx context.Context, t *testing.T, store InterfaceWithPre
407408
testCheckEventType(t, w, watch.Error)
408409
}
409410

411+
func RunTestWatchWithUnsafeDelete(ctx context.Context, t *testing.T, store InterfaceWithCorruptTransformer) {
412+
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}}
413+
key := computePodKey(obj)
414+
415+
out := &example.Pod{}
416+
if err := store.Create(ctx, key, obj, out, 0); err != nil {
417+
t.Fatalf("failed to create object in the store: %v", err)
418+
}
419+
420+
// Compute the initial resource version from which we can start watching later.
421+
list := &example.PodList{}
422+
storageOpts := storage.ListOptions{
423+
ResourceVersion: "0",
424+
Predicate: storage.Everything,
425+
Recursive: true,
426+
}
427+
if err := store.GetList(ctx, "/pods", storageOpts, list); err != nil {
428+
t.Errorf("Unexpected error: %v", err)
429+
}
430+
431+
// Now trigger watch error by injecting failing transformer.
432+
revertTransformer := store.CorruptTransformer()
433+
defer revertTransformer()
434+
435+
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: list.ResourceVersion, Predicate: storage.Everything})
436+
if err != nil {
437+
t.Fatalf("Watch failed: %v", err)
438+
}
439+
440+
// normal deletetion should fail
441+
if err := store.Delete(ctx, key, &example.Pod{}, &storage.Preconditions{}, storage.ValidateAllObjectFunc, nil, storage.DeleteOptions{}); err == nil {
442+
t.Fatalf("Expected normal Delete to fail")
443+
}
444+
if err := store.Delete(ctx, key, &example.Pod{}, &storage.Preconditions{}, storage.ValidateAllObjectFunc, nil, storage.DeleteOptions{IgnoreStoreReadError: true}); err != nil {
445+
t.Fatalf("Expected unsafe Delete to succeed, but got: %v", err)
446+
}
447+
448+
testCheckResultFunc(t, w, func(got watch.Event) {
449+
if want, got := watch.Error, got.Type; want != got {
450+
t.Errorf("Expected event type: %q, but got: %q", want, got)
451+
}
452+
switch v := got.Object.(type) {
453+
case *metav1.Status:
454+
if want, got := metav1.StatusReasonStoreReadError, v.Reason; want != got {
455+
t.Errorf("Expected reason: %q, but got: %q", want, got)
456+
}
457+
if want := "corrupt object has been deleted"; !strings.Contains(v.Message, want) {
458+
t.Errorf("Expected Message to contain: %q, but got: %q", want, v.Message)
459+
}
460+
default:
461+
t.Errorf("expected an metav1 Status object, but got: %v", got.Object)
462+
}
463+
})
464+
}
465+
410466
func RunTestWatchContextCancel(ctx context.Context, t *testing.T, store storage.Interface) {
411467
canceledCtx, cancel := context.WithCancel(ctx)
412468
cancel()

staging/src/k8s.io/client-go/tools/cache/reflector_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"errors"
2222
"fmt"
2323
"math/rand"
24+
"net/http"
2425
"reflect"
2526
goruntime "runtime"
2627
"strconv"
@@ -1753,6 +1754,63 @@ func TestReflectorListExtract(t *testing.T) {
17531754
}
17541755
}
17551756

1757+
func TestReflectorWithUnsafeDelete(t *testing.T) {
1758+
mkPod := func(id string, rv string) *v1.Pod {
1759+
return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: id, ResourceVersion: rv}}
1760+
}
1761+
mkList := func(rv string, pods ...*v1.Pod) *v1.PodList {
1762+
list := &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: rv}}
1763+
for _, pod := range pods {
1764+
list.Items = append(list.Items, *pod)
1765+
}
1766+
return list
1767+
}
1768+
makeStatus := func() *metav1.Status {
1769+
return &metav1.Status{
1770+
Status: metav1.StatusFailure,
1771+
Code: http.StatusInternalServerError,
1772+
Reason: metav1.StatusReasonStoreReadError,
1773+
Message: "failed to prepare current and previous objects: corrupt object has been deleted",
1774+
}
1775+
}
1776+
1777+
list := mkList("1")
1778+
events := []watch.Event{
1779+
{Type: watch.Added, Object: mkPod("foo", "2")},
1780+
// the object gets corrupt, and it gets deleted
1781+
{Type: watch.Error, Object: makeStatus()},
1782+
}
1783+
1784+
s := NewFIFO(MetaNamespaceKeyFunc)
1785+
stopCh := make(chan struct{})
1786+
lw := &testLW{
1787+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
1788+
fw := watch.NewFake()
1789+
go func() {
1790+
for _, e := range events {
1791+
fw.Action(e.Type, e.Object)
1792+
}
1793+
// Because FakeWatcher doesn't buffer events, it's safe to
1794+
// close the stop channel immediately without missing events.
1795+
// But usually, the event producer would instead close the
1796+
// result channel, and wait for the consumer to stop the
1797+
// watcher, to avoid race conditions.
1798+
// TODO: Fix the FakeWatcher to separate watcher.Stop from close(resultCh)
1799+
close(stopCh)
1800+
}()
1801+
return fw, nil
1802+
},
1803+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
1804+
return list, nil
1805+
},
1806+
}
1807+
1808+
r := NewReflector(lw, &v1.Pod{}, s, 0)
1809+
if err := r.ListAndWatch(stopCh); err != nil {
1810+
t.Errorf("unexpected error from ListAndWatch: %v", err)
1811+
}
1812+
}
1813+
17561814
func BenchmarkExtractList(b *testing.B) {
17571815
_, _, podList := getPodListItems(0, fakeItemsNum)
17581816
_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)

0 commit comments

Comments
 (0)