257 changes: 190 additions & 67 deletions CHANGELOG/CHANGELOG-1.32.md

Large diffs are not rendered by default.

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion openshift-hack/images/hyperkube/Dockerfile.rhel
@@ -14,4 +14,4 @@ COPY --from=builder /tmp/build/* /usr/bin/
 LABEL io.k8s.display-name="OpenShift Kubernetes Server Commands" \
       io.k8s.description="OpenShift is a platform for developing, building, and deploying containerized applications." \
       io.openshift.tags="openshift,hyperkube" \
-      io.openshift.build.versions="kubernetes=1.32.3"
+      io.openshift.build.versions="kubernetes=1.32.4"
@@ -24,6 +24,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"slices"
 	"sync"
 	"time"
 
@@ -528,15 +529,21 @@ func (dswp *desiredStateOfWorldPopulator) getPVCExtractPV(
 		return nil, fmt.Errorf("failed to fetch PVC from API server: %v", err)
 	}
 
-	// Pods that uses a PVC that is being deleted must not be started.
+	// Pods that uses a PVC that is being deleted and not protected by
+	// kubernetes.io/pvc-protection must not be started.
 	//
-	// In case an old kubelet is running without this check or some kubelets
-	// have this feature disabled, the worst that can happen is that such
-	// pod is scheduled. This was the default behavior in 1.8 and earlier
-	// and users should not be that surprised.
+	// 1) In case an old kubelet is running without this check, the worst
+	// that can happen is that such pod is scheduled. This was the default
+	// behavior in 1.8 and earlier and users should not be that surprised.
 	// It should happen only in very rare case when scheduler schedules
 	// a pod and user deletes a PVC that's used by it at the same time.
-	if pvc.ObjectMeta.DeletionTimestamp != nil {
+	//
+	// 2) Adding a check for kubernetes.io/pvc-protection here to prevent
+	// the existing running pods from being affected during the rebuild of
+	// the desired state of the world cache when the kubelet is restarted.
+	// It is safe for kubelet to add this check here because the PVC will
+	// be stuck in Terminating state until the pod is deleted.
+	if pvc.ObjectMeta.DeletionTimestamp != nil && !slices.Contains(pvc.Finalizers, util.PVCProtectionFinalizer) {
 		return nil, errors.New("PVC is being deleted")
 	}

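Illustrative note (not part of the diff): the guard above only refuses to start a pod when its PVC carries a deletion timestamp and is no longer held by the kubernetes.io/pvc-protection finalizer; a protected PVC stays in Terminating until every pod using it is deleted, so a restarted kubelet can keep rebuilding mounts for pods that are already running. A minimal, self-contained sketch of that predicate, with hypothetical names and in-memory PVC objects constructed purely for illustration:

package main

import (
	"fmt"
	"slices"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Value of the protection finalizer referenced in the comment above.
const pvcProtectionFinalizer = "kubernetes.io/pvc-protection"

// rejectForNewPods mirrors the condition added in the diff: reject only when
// the PVC is being deleted AND the protection finalizer is already gone.
func rejectForNewPods(pvc *v1.PersistentVolumeClaim) bool {
	return pvc.ObjectMeta.DeletionTimestamp != nil &&
		!slices.Contains(pvc.Finalizers, pvcProtectionFinalizer)
}

func main() {
	now := metav1.Now()
	protected := &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{
		Name:              "data",
		DeletionTimestamp: &now,
		Finalizers:        []string{pvcProtectionFinalizer},
	}}
	unprotected := &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{
		Name:              "scratch",
		DeletionTimestamp: &now,
	}}
	fmt.Println(rejectForNewPods(protected))   // false: running pods keep their volume
	fmt.Println(rejectForNewPods(unprotected)) // true: the pod must not be started
}

The protected PVC is still accepted even though it is being deleted, which is what keeps already-running pods healthy across a kubelet restart.
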
71 changes: 40 additions & 31 deletions staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go
@@ -438,7 +438,12 @@ func (wc *watchChan) serialProcessEvents(wg *sync.WaitGroup) {
 	for {
 		select {
 		case e := <-wc.incomingEventChan:
-			res := wc.transform(e)
+			res, err := wc.transform(e)
+			if err != nil {
+				wc.sendError(err)
+				return
+			}
+
 			if res == nil {
 				continue
 			}
@@ -461,10 +466,8 @@ func (wc *watchChan) serialProcessEvents(wg *sync.WaitGroup) {
 
 func (wc *watchChan) concurrentProcessEvents(wg *sync.WaitGroup) {
 	p := concurrentOrderedEventProcessing{
-		input:           wc.incomingEventChan,
-		processFunc:     wc.transform,
-		output:          wc.resultChan,
-		processingQueue: make(chan chan *watch.Event, processEventConcurrency-1),
+		wc:              wc,
+		processingQueue: make(chan chan *processingResult, processEventConcurrency-1),
 
 		objectType:    wc.watcher.objectType,
 		groupResource: wc.watcher.groupResource,
@@ -481,12 +484,15 @@
 	}()
 }
 
+type processingResult struct {
+	event *watch.Event
+	err   error
+}
+
 type concurrentOrderedEventProcessing struct {
-	input       chan *event
-	processFunc func(*event) *watch.Event
-	output      chan watch.Event
+	wc *watchChan
 
-	processingQueue chan chan *watch.Event
+	processingQueue chan chan *processingResult
 	// Metadata for logging
 	objectType    string
 	groupResource schema.GroupResource
@@ -498,28 +504,29 @@ func (p *concurrentOrderedEventProcessing) scheduleEventProcessing(ctx context.C
 		select {
 		case <-ctx.Done():
 			return
-		case e = <-p.input:
+		case e = <-p.wc.incomingEventChan:
 		}
-		processingResponse := make(chan *watch.Event, 1)
+		processingResponse := make(chan *processingResult, 1)
 		select {
 		case <-ctx.Done():
 			return
 		case p.processingQueue <- processingResponse:
 		}
 		wg.Add(1)
-		go func(e *event, response chan<- *watch.Event) {
+		go func(e *event, response chan<- *processingResult) {
 			defer wg.Done()
+			responseEvent, err := p.wc.transform(e)
 			select {
 			case <-ctx.Done():
-			case response <- p.processFunc(e):
+			case response <- &processingResult{event: responseEvent, err: err}:
 			}
 		}(e, processingResponse)
 	}
 }
 
 func (p *concurrentOrderedEventProcessing) collectEventProcessing(ctx context.Context) {
-	var processingResponse chan *watch.Event
-	var e *watch.Event
+	var processingResponse chan *processingResult
+	var r *processingResult
 	for {
 		select {
 		case <-ctx.Done():
@@ -529,21 +536,25 @@ func (p *concurrentOrderedEventProcessing) collectEventProcessing(ctx context.Co
 		select {
 		case <-ctx.Done():
 			return
-		case e = <-processingResponse:
+		case r = <-processingResponse:
 		}
-		if e == nil {
+		if r.err != nil {
+			p.wc.sendError(r.err)
+			return
+		}
+		if r.event == nil {
 			continue
 		}
-		if len(p.output) == cap(p.output) {
-			klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", p.objectType, "groupResource", p.groupResource)
+		if len(p.wc.resultChan) == cap(p.wc.resultChan) {
+			klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", p.wc.watcher.objectType, "groupResource", p.wc.watcher.groupResource)
 		}
 		// If user couldn't receive results fast enough, we also block incoming events from watcher.
 		// Because storing events in local will cause more memory usage.
 		// The worst case would be closing the fast watcher.
 		select {
-		case <-ctx.Done():
+		case p.wc.resultChan <- *r.event:
+		case <-p.wc.ctx.Done():
 			return
-		case p.output <- *e:
 		}
 	}
 }
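
Illustrative note (not part of the diff): the concurrent path above keeps events ordered by enqueuing one buffered response channel per scheduled event, so workers may finish out of order while the collector still emits results in input order and stops at the first error. A minimal, self-contained sketch of that pattern with invented names (process, result, and the sample inputs are assumptions for this example; the real code additionally uses context cancellation to unwind the scheduler):

package main

import (
	"fmt"
	"strconv"
	"sync"
)

type result struct {
	value string
	err   error
}

// process fans work out to goroutines but emits results in input order by
// queueing one response channel per item, mirroring the split between
// scheduleEventProcessing and collectEventProcessing above.
func process(in <-chan int, out chan<- string, concurrency int) {
	queue := make(chan chan result, concurrency-1)
	var wg sync.WaitGroup

	// Scheduler: enqueue response channels in arrival order.
	go func() {
		for item := range in {
			resp := make(chan result, 1) // buffered so a worker never blocks
			queue <- resp
			wg.Add(1)
			go func(item int, resp chan<- result) {
				defer wg.Done()
				if item < 0 {
					resp <- result{err: fmt.Errorf("item %d is corrupted", item)}
					return
				}
				resp <- result{value: strconv.Itoa(item * item)}
			}(item, resp)
		}
		wg.Wait()
		close(queue)
	}()

	// Collector: emit in order and stop at the first error, so nothing that
	// was scheduled after a failed item ever reaches the consumer.
	defer close(out)
	for resp := range queue {
		r := <-resp
		if r.err != nil {
			out <- "error: " + r.err.Error()
			return
		}
		out <- r.value
	}
}

func main() {
	in := make(chan int)
	out := make(chan string)
	go func() {
		defer close(in)
		for _, v := range []int{1, 2, 3, -4, 5} {
			in <- v
		}
	}()
	go process(in, out, 3)
	for s := range out {
		fmt.Println(s) // 1, 4, 9, error: item -4 is corrupted
	}
}

Because each response channel is buffered with capacity one, a worker never blocks on a collector that has already stopped, and the queue capacity of concurrency-1 bounds how many items are in flight at once, matching processEventConcurrency-1 above.
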
@@ -561,25 +572,23 @@ func (wc *watchChan) acceptAll() bool {
 }
 
 // transform transforms an event into a result for user if not filtered.
-func (wc *watchChan) transform(e *event) (res *watch.Event) {
+func (wc *watchChan) transform(e *event) (res *watch.Event, err error) {
 	curObj, oldObj, err := wc.prepareObjs(e)
 	if err != nil {
 		klog.Errorf("failed to prepare current and previous objects: %v", err)
-		wc.sendError(err)
-		return nil
+		return nil, err
 	}
 
 	switch {
 	case e.isProgressNotify:
 		object := wc.watcher.newFunc()
 		if err := wc.watcher.versioner.UpdateObject(object, uint64(e.rev)); err != nil {
 			klog.Errorf("failed to propagate object version: %v", err)
-			return nil
+			return nil, fmt.Errorf("failed to propagate object resource version: %w", err)
 		}
 		if e.isInitialEventsEndBookmark {
 			if err := storage.AnnotateInitialEventsEndBookmark(object); err != nil {
-				wc.sendError(fmt.Errorf("error while accessing object's metadata gr: %v, type: %v, obj: %#v, err: %v", wc.watcher.groupResource, wc.watcher.objectType, object, err))
-				return nil
+				return nil, fmt.Errorf("error while accessing object's metadata gr: %v, type: %v, obj: %#v, err: %w", wc.watcher.groupResource, wc.watcher.objectType, object, err)
			}
 		}
 		res = &watch.Event{
@@ -588,15 +597,15 @@
 		}
 	case e.isDeleted:
 		if !wc.filter(oldObj) {
-			return nil
+			return nil, nil
 		}
 		res = &watch.Event{
 			Type:   watch.Deleted,
 			Object: oldObj,
 		}
 	case e.isCreated:
 		if !wc.filter(curObj) {
-			return nil
+			return nil, nil
 		}
 		res = &watch.Event{
 			Type:   watch.Added,
@@ -608,7 +617,7 @@
 			Type:   watch.Modified,
 			Object: curObj,
 		}
-		return res
+		return res, nil
 	}
 	curObjPasses := wc.filter(curObj)
 	oldObjPasses := wc.filter(oldObj)
@@ -630,7 +639,7 @@
 			}
 		}
 	}
-	return res
+	return res, nil
 }
 
 func transformErrorToEvent(err error) *watch.Event {
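
Illustrative note (not part of the diff): with transform now returning an error, both the serial and the concurrent paths call sendError and stop processing, so a client sees a single ERROR event followed by a closed result channel instead of events that were decoded after a corrupted object; the tests added below assert exactly that. A hedged sketch of a consumer treating that event as terminal, demonstrated with apimachinery's FakeWatcher (consumeUntilError and the sample statuses are invented for this example):

package main

import (
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
)

// consumeUntilError drains a watch until its channel closes, treating
// watch.Error as terminal: after it, no further Added/Modified/Deleted
// events are expected, so the caller should re-list and re-watch.
func consumeUntilError(w watch.Interface, handle func(watch.Event)) error {
	defer w.Stop()
	for ev := range w.ResultChan() {
		if ev.Type == watch.Error {
			// FromObject turns the status object carried by the error event
			// back into a Go error.
			return apierrors.FromObject(ev.Object)
		}
		handle(ev)
	}
	return nil
}

func main() {
	fw := watch.NewFake()
	go func() {
		fw.Add(&metav1.Status{Message: "ordinary event"})            // delivered to handle
		fw.Error(&metav1.Status{Message: "storage: data corrupted"}) // terminal
		fw.Stop()
	}()
	err := consumeUntilError(fw, func(ev watch.Event) {
		fmt.Println("got event:", ev.Type)
	})
	fmt.Println("watch ended with error:", err)
}
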
@@ -155,6 +155,11 @@ func TestWatchListMatchSingle(t *testing.T) {
 	storagetesting.RunWatchListMatchSingle(ctx, t, store)
 }
 
+func TestWatchErrorEventIsBlockingFurtherEvent(t *testing.T) {
+	ctx, store, _ := testSetup(t)
+	storagetesting.RunWatchErrorIsBlockingFurtherEvents(ctx, t, &storeWithPrefixTransformer{store})
+}
+
 // =======================================================================
 // Implementation-specific tests are following.
 // The following tests are exercising the details of the implementation
67 changes: 67 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go
@@ -1698,6 +1698,73 @@ func RunWatchListMatchSingle(ctx context.Context, t *testing.T, store storage.In
 	TestCheckNoMoreResultsWithIgnoreFunc(t, w, nil)
 }
 
+func RunWatchErrorIsBlockingFurtherEvents(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer) {
+	foo := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "foo"}}
+	fooKey := fmt.Sprintf("/pods/%s/%s", foo.Namespace, foo.Name)
+	fooCreated := &example.Pod{}
+	if err := store.Create(context.Background(), fooKey, foo, fooCreated, 0); err != nil {
+		t.Errorf("failed to create object: %v", err)
+	}
+	bar := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "bar"}}
+	barKey := fmt.Sprintf("/pods/%s/%s", bar.Namespace, bar.Name)
+	barCreated := &example.Pod{}
+	if err := store.Create(context.Background(), barKey, bar, barCreated, 0); err != nil {
+		t.Errorf("failed to create object: %v", err)
+	}
+
+	// Update transformer to ensure that foo will become effectively corrupted.
+	revertTransformer := store.UpdatePrefixTransformer(
+		func(transformer *PrefixTransformer) value.Transformer {
+			transformer.prefix = []byte("other-prefix")
+			return transformer
+		})
+	defer revertTransformer()
+
+	baz := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "baz"}}
+	bazKey := fmt.Sprintf("/pods/%s/%s", baz.Namespace, baz.Name)
+	bazCreated := &example.Pod{}
+	if err := store.Create(context.Background(), bazKey, baz, bazCreated, 0); err != nil {
+		t.Errorf("failed to create object: %v", err)
+	}
+
+	opts := storage.ListOptions{
+		ResourceVersion: fooCreated.ResourceVersion,
+		Predicate:       storage.Everything,
+		Recursive:       true,
+	}
+
+	// Run N concurrent watches. Given the asynchronous nature, we increase the
+	// probability of hitting the race in at least one of those watches.
+	concurrentWatches := 10
+	wg := sync.WaitGroup{}
+	for i := 0; i < concurrentWatches; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			w, err := store.Watch(ctx, "/pods", opts)
+			if err != nil {
+				t.Errorf("failed to create watch: %v", err)
+				return
+			}
+
+			// We issue the watch starting from object bar.
+			// The object fails TransformFromStorage and generates ERROR watch event.
+			// The further events (i.e. ADDED event for baz object) should not be
+			// emitted, so we verify no events other than ERROR type are emitted.
+			for {
+				event, ok := <-w.ResultChan()
+				if !ok {
+					break
+				}
+				if event.Type != watch.Error {
+					t.Errorf("unexpected event: %#v", event)
+				}
+			}
+		}()
+	}
+	wg.Wait()
+}
+
 func makePod(namePrefix string) *example.Pod {
 	return &example.Pod{
 		ObjectMeta: metav1.ObjectMeta{