Skip to content

Commit

Permalink
Merge pull request kubernetes#25490 from p0lyn0mial/release-4-5-incon…
Browse files Browse the repository at this point in the history
…sistent-list

Bug 1877346: Fix bug for inconsistent lists served from etcd

Origin-commit: 530288217294faea179bbcba066361b35818ee13
  • Loading branch information
k8s-publishing-bot committed Sep 27, 2020
2 parents 7f90a86 + 148ec40 commit 48767cf
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 3 deletions.
13 changes: 10 additions & 3 deletions staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor

newItemFunc := getNewItemFunc(listObj, v)

var returnedRV, continueRV int64
var returnedRV, continueRV, withRev int64
var continueKey string
switch {
case s.pagingEnabled && len(pred.Continue) > 0:
Expand All @@ -568,7 +568,7 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
// continueRV==0 is invalid.
// If continueRV < 0, the request is for the latest resource version.
if continueRV > 0 {
options = append(options, clientv3.WithRev(continueRV))
withRev = continueRV
returnedRV = continueRV
}
case s.pagingEnabled && pred.Limit > 0:
Expand All @@ -578,7 +578,7 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
}
if fromRV > 0 {
options = append(options, clientv3.WithRev(int64(fromRV)))
withRev = int64(fromRV)
}
returnedRV = int64(fromRV)
}
Expand All @@ -589,6 +589,9 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
default:
options = append(options, clientv3.WithPrefix())
}
if withRev != 0 {
options = append(options, clientv3.WithRev(withRev))
}

// loop until we have filled the requested limit from etcd or there are no more results
var lastKey []byte
Expand Down Expand Up @@ -650,6 +653,10 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
break
}
key = string(lastKey) + "\x00"
if withRev == 0 {
withRev = returnedRV
options = append(options, clientv3.WithRev(withRev))
}
}

// instruct the client to begin querying from immediately after the last key we returned
Expand Down
84 changes: 84 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1762,3 +1762,87 @@ func Test_growSlice(t *testing.T) {
})
}
}

// fancyTransformer creates next object on each call to
// TransformFromStorage call.
type fancyTransformer struct {
transformer value.Transformer
store *store

lock sync.Mutex
index int
}

func (t *fancyTransformer) TransformFromStorage(b []byte, ctx value.Context) ([]byte, bool, error) {
if err := t.createObject(); err != nil {
return nil, false, err
}
return t.transformer.TransformFromStorage(b, ctx)
}

func (t *fancyTransformer) TransformToStorage(b []byte, ctx value.Context) ([]byte, error) {
return t.transformer.TransformToStorage(b, ctx)
}

func (t *fancyTransformer) createObject() error {
t.lock.Lock()
defer t.lock.Unlock()

t.index++
key := fmt.Sprintf("pod-%d", t.index)
obj := &example.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: key,
Labels: map[string]string{
"even": strconv.FormatBool(t.index%2 == 0),
},
},
}
out := &example.Pod{}
return t.store.Create(context.TODO(), key, obj, out, 0)
}

func TestConsistentList(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)

transformer := &fancyTransformer{
transformer: &prefixTransformer{prefix: []byte(defaultTestPrefix)},
}
store := newStore(cluster.RandClient(), true, codec, "", transformer)
transformer.store = store

for i := 0; i < 5; i++ {
if err := transformer.createObject(); err != nil {
t.Fatalf("failed to create object: %v", err)
}
}

getAttrs := func(obj runtime.Object) (labels.Set, fields.Set, error) {
pod, ok := obj.(*example.Pod)
if !ok {
return nil, nil, fmt.Errorf("invalid object")
}
return labels.Set(pod.Labels), nil, nil
}
predicate := storage.SelectionPredicate{
Label: labels.Set{"even": "true"}.AsSelector(),
GetAttrs: getAttrs,
Limit: 4,
}

result1 := example.PodList{}
if err := store.List(context.TODO(), "/", "", predicate, &result1); err != nil {
t.Fatalf("failed to list objects: %v", err)
}

result2 := example.PodList{}
if err := store.List(context.TODO(), "/", result1.ResourceVersion, predicate, &result2); err != nil {
t.Fatalf("failed to list objects: %v", err)
}

if !reflect.DeepEqual(result1, result2) {
t.Errorf("inconsistent lists: %#v, %#v", result1, result2)
}
}

0 comments on commit 48767cf

Please sign in to comment.