Skip to content

Commit 25efc8f

Browse files
tkashem (Abu Kashem)
authored and committed
handle watch for unsafe delete
1 parent 9932dbe commit 25efc8f

File tree

8 files changed

+351
-12
lines changed

8 files changed

+351
-12
lines changed

staging/src/k8s.io/apiserver/pkg/storage/etcd3/errors.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@ limitations under the License.
1717
package etcd3
1818

1919
import (
20+
goerrors "errors"
21+
"net/http"
22+
2023
"k8s.io/apimachinery/pkg/api/errors"
24+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2125
"k8s.io/apiserver/pkg/storage"
2226

2327
etcdrpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
@@ -29,6 +33,19 @@ func interpretWatchError(err error) error {
2933
case err == etcdrpc.ErrCompacted:
3034
return errors.NewResourceExpired("The resourceVersion for the provided watch is too old.")
3135
}
36+
37+
var corruptobjDeletedErr *corruptObjectDeletedError
38+
if goerrors.As(err, &corruptobjDeletedErr) {
39+
return &errors.StatusError{
40+
ErrStatus: metav1.Status{
41+
Status: metav1.StatusFailure,
42+
Code: http.StatusInternalServerError,
43+
Reason: metav1.StatusReasonStoreReadError,
44+
Message: corruptobjDeletedErr.Error(),
45+
},
46+
}
47+
}
48+
3249
return err
3350
}
3451

staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,28 @@ func (s *storeWithPrefixTransformer) UpdatePrefixTransformer(modifier storagetes
194194
}
195195
}
196196

197+
type corruptedTransformer struct {
198+
value.Transformer
199+
}
200+
201+
func (f *corruptedTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) (out []byte, stale bool, err error) {
202+
return nil, true, &corruptObjectError{err: fmt.Errorf("bits flipped"), errType: untransformable}
203+
}
204+
205+
type storeWithCorruptedTransformer struct {
206+
*store
207+
}
208+
209+
func (s *storeWithCorruptedTransformer) CorruptTransformer() func() {
210+
ct := &corruptedTransformer{Transformer: s.transformer}
211+
s.transformer = ct
212+
s.watcher.transformer = ct
213+
return func() {
214+
s.transformer = ct.Transformer
215+
s.watcher.transformer = ct.Transformer
216+
}
217+
}
218+
197219
func TestGuaranteedUpdate(t *testing.T) {
198220
ctx, store, client := testSetup(t)
199221
storagetesting.RunTestGuaranteedUpdate(ctx, t, &storeWithPrefixTransformer{store}, checkStorageInvariants(client.Client, store.codec))

staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -686,18 +686,40 @@ func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtim
686686
if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) {
687687
data, _, err := wc.watcher.transformer.TransformFromStorage(wc.ctx, e.prevValue, authenticatedDataString(e.key))
688688
if err != nil {
689-
return nil, nil, err
689+
return nil, nil, wc.watcher.transformIfCorruptObjectError(e, err)
690690
}
691691
// Note that this sends the *old* object with the etcd revision for the time at
692692
// which it gets deleted.
693693
oldObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, data, e.rev)
694694
if err != nil {
695-
return nil, nil, err
695+
return nil, nil, wc.watcher.transformIfCorruptObjectError(e, err)
696696
}
697697
}
698698
return curObj, oldObj, nil
699699
}
700700

701+
// corruptObjectDeletedError reports that a DELETED watch event was seen for
// an object whose data could not be transformed or decoded. It wraps the
// underlying error so callers can still inspect it with errors.Is/errors.As.
type corruptObjectDeletedError struct {
	err error
}

// Compile-time check that the pointer type satisfies the error interface.
var _ error = (*corruptObjectDeletedError)(nil)

// Error returns a fixed explanatory prefix followed by the wrapped error.
func (e *corruptObjectDeletedError) Error() string {
	return fmt.Sprintf("saw a DELETED event, but object data is corrupt - %v", e.err)
}

// Unwrap exposes the wrapped error to the standard errors helpers.
func (e *corruptObjectDeletedError) Unwrap() error {
	return e.err
}
709+
710+
func (w *watcher) transformIfCorruptObjectError(e *event, err error) error {
711+
var corruptObjErr *corruptObjectError
712+
if !e.isDeleted || !errors.As(err, &corruptObjErr) {
713+
return err
714+
}
715+
716+
// if we are here it means we received a DELETED event but the object
717+
// associated with it is corrupt because we failed to transform or
718+
// decode the data associated with the object.
719+
// wrap the original error so we can send a proper watch Error event.
720+
return &corruptObjectDeletedError{err: corruptObjErr}
721+
}
722+
701723
func decodeObj(codec runtime.Codec, versioner storage.Versioner, data []byte, rev int64) (_ runtime.Object, err error) {
702724
obj, err := runtime.Decode(codec, []byte(data))
703725
if err != nil {

staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,12 @@ func TestProgressNotify(t *testing.T) {
112112
storagetesting.RunOptionalTestProgressNotify(ctx, t, store)
113113
}
114114

115+
func TestWatchWithUnsafeDelete(t *testing.T) {
116+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.AllowUnsafeMalformedObjectDeletion, true)
117+
ctx, store, _ := testSetup(t)
118+
storagetesting.RunTestWatchWithUnsafeDelete(ctx, t, &storeWithCorruptedTransformer{store})
119+
}
120+
115121
// TestWatchDispatchBookmarkEvents makes sure that
116122
// setting allowWatchBookmarks query param against
117123
// etcd implementation doesn't have any effect.

staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2147,6 +2147,11 @@ type InterfaceWithPrefixTransformer interface {
21472147
UpdatePrefixTransformer(PrefixTransformerModifier) func()
21482148
}
21492149

2150+
type InterfaceWithCorruptTransformer interface {
2151+
storage.Interface
2152+
CorruptTransformer() func()
2153+
}
2154+
21502155
func RunTestListResourceVersionMatch(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer) {
21512156
nextPod := func(index uint32) (string, *example.Pod) {
21522157
obj := &example.Pod{

staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222
"net/http"
23+
"strings"
2324
"sync"
2425
"testing"
2526
"time"
@@ -407,6 +408,61 @@ func RunTestWatchError(ctx context.Context, t *testing.T, store InterfaceWithPre
407408
testCheckEventType(t, w, watch.Error)
408409
}
409410

411+
func RunTestWatchWithUnsafeDelete(ctx context.Context, t *testing.T, store InterfaceWithCorruptTransformer) {
412+
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}}
413+
key := computePodKey(obj)
414+
415+
out := &example.Pod{}
416+
if err := store.Create(ctx, key, obj, out, 0); err != nil {
417+
t.Fatalf("failed to create object in the store: %v", err)
418+
}
419+
420+
// Compute the initial resource version from which we can start watching later.
421+
list := &example.PodList{}
422+
storageOpts := storage.ListOptions{
423+
ResourceVersion: "0",
424+
Predicate: storage.Everything,
425+
Recursive: true,
426+
}
427+
if err := store.GetList(ctx, "/pods", storageOpts, list); err != nil {
428+
t.Errorf("Unexpected error: %v", err)
429+
}
430+
431+
// Now trigger watch error by injecting failing transformer.
432+
revertTransformer := store.CorruptTransformer()
433+
defer revertTransformer()
434+
435+
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: list.ResourceVersion, Predicate: storage.Everything})
436+
if err != nil {
437+
t.Fatalf("Watch failed: %v", err)
438+
}
439+
440+
// normal deletetion should fail
441+
if err := store.Delete(ctx, key, &example.Pod{}, nil, storage.ValidateAllObjectFunc, nil, storage.DeleteOptions{}); err == nil {
442+
t.Fatalf("Expected normal Delete to fail")
443+
}
444+
if err := store.Delete(ctx, key, &example.Pod{}, nil, storage.ValidateAllObjectFunc, nil, storage.DeleteOptions{IgnoreStoreReadError: true}); err != nil {
445+
t.Fatalf("Expected unsafe Delete to succeed, but got: %v", err)
446+
}
447+
448+
testCheckResultFunc(t, w, func(got watch.Event) {
449+
if want, got := watch.Error, got.Type; want != got {
450+
t.Errorf("Expected event type: %q, but got: %q", want, got)
451+
}
452+
switch v := got.Object.(type) {
453+
case *metav1.Status:
454+
if want, got := metav1.StatusReasonStoreReadError, v.Reason; want != got {
455+
t.Errorf("Expected reason: %q, but got: %q", want, got)
456+
}
457+
if want := "saw a DELETED event, but object data is corrupt"; !strings.Contains(v.Message, want) {
458+
t.Errorf("Expected Message to contain: %q, but got: %q", want, v.Message)
459+
}
460+
default:
461+
t.Errorf("expected an metav1 Status object, but got: %v", got.Object)
462+
}
463+
})
464+
}
465+
410466
func RunTestWatchContextCancel(ctx context.Context, t *testing.T, store storage.Interface) {
411467
canceledCtx, cancel := context.WithCancel(ctx)
412468
cancel()

staging/src/k8s.io/client-go/tools/cache/reflector_test.go

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,12 @@ import (
2121
"errors"
2222
"fmt"
2323
"math/rand"
24+
"net/http"
2425
"reflect"
2526
goruntime "runtime"
2627
"strconv"
28+
"sync"
29+
"sync/atomic"
2730
"syscall"
2831
"testing"
2932
"time"
@@ -1753,6 +1756,152 @@ func TestReflectorListExtract(t *testing.T) {
17531756
}
17541757
}
17551758

1759+
func TestReflectorReplacesStoreOnUnsafeDelete(t *testing.T) {
1760+
mkPod := func(id string, rv string) *v1.Pod {
1761+
return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: id, ResourceVersion: rv}}
1762+
}
1763+
mkList := func(rv string, pods ...*v1.Pod) *v1.PodList {
1764+
list := &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: rv}}
1765+
for _, pod := range pods {
1766+
list.Items = append(list.Items, *pod)
1767+
}
1768+
return list
1769+
}
1770+
makeStatus := func() *metav1.Status {
1771+
return &metav1.Status{
1772+
Status: metav1.StatusFailure,
1773+
Code: http.StatusInternalServerError,
1774+
Reason: metav1.StatusReasonStoreReadError,
1775+
Message: "failed to prepare current and previous objects: corrupt object has been deleted",
1776+
}
1777+
}
1778+
1779+
// these pods preexist and never get updated/deleted
1780+
preExisting := mkPod("foo-1", "1")
1781+
pods := []*v1.Pod{preExisting, mkPod("foo-2", "2"), mkPod("foo-3", "3")}
1782+
lastExpectedRV := "5"
1783+
lists := []*v1.PodList{
1784+
mkList("3", pods...), // initial list
1785+
mkList(lastExpectedRV, pods...), // re-list due to watch error
1786+
}
1787+
corruptObj := mkPod("foo", "4")
1788+
events := []watch.Event{
1789+
{Type: watch.Added, Object: corruptObj},
1790+
// the object becomes corrupt, and it gets unsafe-deleted, and
1791+
// watch sends the following Error event, note the RV has
1792+
// advanced to "5" in the storage due to the delete operation
1793+
{Type: watch.Error, Object: makeStatus()},
1794+
}
1795+
1796+
s := NewFIFO(MetaNamespaceKeyFunc)
1797+
var replaceInvoked atomic.Int32
1798+
store := &fakeStore{
1799+
Store: s,
1800+
beforeReplace: func(list []interface{}, rv string) {
1801+
// interested in the Replace call that happens after the Error event
1802+
if rv == lastExpectedRV {
1803+
replaceInvoked.Add(1)
1804+
_, exists, err := s.Get(corruptObj)
1805+
if err != nil || !exists {
1806+
t.Errorf("expected the object to exist in the store, exists: %t, err: %v", exists, err)
1807+
}
1808+
_, exists, err = s.Get(preExisting)
1809+
if err != nil || !exists {
1810+
t.Errorf("expected the pre-existing object to be in the store, exists: %t, err: %v", exists, err)
1811+
}
1812+
}
1813+
},
1814+
afterReplace: func(rv string, err error) {
1815+
if rv == lastExpectedRV {
1816+
replaceInvoked.Add(1)
1817+
if err != nil {
1818+
t.Errorf("expected Replace to have succeeded, but got error: %v", err)
1819+
}
1820+
_, exists, err := s.Get(corruptObj)
1821+
if err != nil || exists {
1822+
t.Errorf("expected the object to have been removed from the store, exists: %t, err: %v", exists, err)
1823+
}
1824+
// show that a pre-existing pod is still in the cache
1825+
_, exists, err = s.Get(preExisting)
1826+
if err != nil || !exists {
1827+
t.Errorf("expected the pre-existing object to be in the store, exists: %t, err: %v", exists, err)
1828+
}
1829+
}
1830+
},
1831+
}
1832+
1833+
var once sync.Once
1834+
lw := &testLW{
1835+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
1836+
fw := watch.NewFake()
1837+
go func() {
1838+
once.Do(func() {
1839+
for _, e := range events {
1840+
fw.Action(e.Type, e.Object)
1841+
}
1842+
})
1843+
}()
1844+
return fw, nil
1845+
},
1846+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
1847+
var list runtime.Object
1848+
if len(lists) > 0 {
1849+
list = lists[0]
1850+
lists = lists[1:]
1851+
}
1852+
return list, nil
1853+
},
1854+
}
1855+
1856+
r := NewReflector(lw, &v1.Pod{}, store, 0)
1857+
doneCh, stopCh := make(chan struct{}), make(chan struct{})
1858+
go func() {
1859+
defer close(doneCh)
1860+
r.Run(stopCh)
1861+
}()
1862+
1863+
// wait for the RV to sync to the version returned by the final list
1864+
err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
1865+
if rv := r.LastSyncResourceVersion(); rv == lastExpectedRV {
1866+
return true, nil
1867+
}
1868+
return false, nil
1869+
})
1870+
if err != nil {
1871+
t.Fatalf("reflector never caught up with expected revision: %q, err: %v", lastExpectedRV, err)
1872+
}
1873+
1874+
if want, got := lastExpectedRV, r.LastSyncResourceVersion(); want != got {
1875+
t.Errorf("expected LastSyncResourceVersion to be %q, but got: %q", want, got)
1876+
}
1877+
if want, got := 2, int(replaceInvoked.Load()); want != got {
1878+
t.Errorf("expected store Delete hooks to be invoked %d times, but got: %d", want, got)
1879+
}
1880+
if want, got := len(pods), len(s.List()); want != got {
1881+
t.Errorf("expected the store to have %d objects, but got: %d", want, got)
1882+
}
1883+
1884+
close(stopCh)
1885+
select {
1886+
case <-doneCh:
1887+
case <-time.After(wait.ForeverTestTimeout):
1888+
t.Errorf("timed out waiting for Run to return")
1889+
}
1890+
}
1891+
1892+
type fakeStore struct {
1893+
Store
1894+
beforeReplace func(list []interface{}, s string)
1895+
afterReplace func(rv string, err error)
1896+
}
1897+
1898+
func (f *fakeStore) Replace(list []interface{}, rv string) error {
1899+
f.beforeReplace(list, rv)
1900+
err := f.Store.Replace(list, rv)
1901+
f.afterReplace(rv, err)
1902+
return err
1903+
}
1904+
17561905
func BenchmarkExtractList(b *testing.B) {
17571906
_, _, podList := getPodListItems(0, fakeItemsNum)
17581907
_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)

0 commit comments

Comments
 (0)