Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions cmd/csi-attacher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ import (
"fmt"
"net/http"
"os"
"strings"
"time"

"k8s.io/apimachinery/pkg/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand All @@ -49,6 +51,7 @@ import (
"github.com/kubernetes-csi/external-attacher/pkg/attacher"
"github.com/kubernetes-csi/external-attacher/pkg/controller"
"google.golang.org/grpc"
utilflag "k8s.io/component-base/cli/flag"
)

const (
Expand Down Expand Up @@ -88,13 +91,18 @@ var (
kubeAPIBurst = flag.Int("kube-api-burst", 10, "Burst to use while communicating with the kubernetes apiserver. Defaults to 10.")

maxGRPCLogLength = flag.Int("max-grpc-log-length", -1, "The maximum amount of characters logged for every grpc responses. Defaults to no limit")

featureGates map[string]bool
)

var (
version = "unknown"
)

func main() {
flag.Var(utilflag.NewMapStringBool(&featureGates), "feature-gates", "A set of key=value pairs that describe feature gates for alpha/experimental features. "+
"Options are:\n"+strings.Join(utilfeature.DefaultFeatureGate.KnownFeatures(), "\n"))

fg := featuregate.NewFeatureGate()
logsapi.AddFeatureGates(fg)
c := logsapi.NewLoggingConfiguration()
Expand All @@ -108,6 +116,11 @@ func main() {
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(featureGates); err != nil {
logger.Error(err, "failed to store flag gates", "featureGates", featureGates)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

if *showVersion {
fmt.Println(os.Args[0], version)
return
Expand Down
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ require (
github.com/kubernetes-csi/csi-lib-utils v0.22.0
github.com/kubernetes-csi/csi-test/v5 v5.3.1
google.golang.org/grpc v1.72.1
k8s.io/api v0.33.1
k8s.io/apimachinery v0.33.1
k8s.io/client-go v0.33.1
k8s.io/component-base v0.33.1
k8s.io/api v0.33.3
k8s.io/apimachinery v0.33.3
k8s.io/apiserver v0.33.3
k8s.io/client-go v0.33.3
k8s.io/component-base v0.33.3
k8s.io/csi-translation-lib v0.33.0
k8s.io/klog/v2 v2.130.1
)
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ k8s.io/api v0.33.0 h1:yTgZVn1XEe6opVpP1FylmNrIFWuDqe2H0V8CT5gxfIU=
k8s.io/api v0.33.0/go.mod h1:CTO61ECK/KU7haa3qq8sarQ0biLq2ju405IZAd9zsiM=
k8s.io/apimachinery v0.33.0 h1:1a6kHrJxb2hs4t8EE5wuR/WxKDwGN1FKH3JvDtA0CIQ=
k8s.io/apimachinery v0.33.0/go.mod h1:BHW0YOu7n22fFv/JkYOEfkUYNRN0fj0BlvMFWA7b+SM=
k8s.io/apiserver v0.33.3 h1:Wv0hGc+QFdMJB4ZSiHrCgN3zL3QRatu56+rpccKC3J4=
k8s.io/apiserver v0.33.3/go.mod h1:05632ifFEe6TxwjdAIrwINHWE2hLwyADFk5mBsQa15E=
k8s.io/client-go v0.33.0 h1:UASR0sAYVUzs2kYuKn/ZakZlcs2bEHaizrrHUZg0G98=
k8s.io/client-go v0.33.0/go.mod h1:kGkd+l/gNGg8GYWAPr0xF1rRKvVWvzh9vmZAMXtaKOg=
k8s.io/component-base v0.33.0 h1:Ot4PyJI+0JAD9covDhwLp9UNkUja209OzsJ4FzScBNk=
Expand Down
15 changes: 14 additions & 1 deletion pkg/controller/csi_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@ import (

"github.com/kubernetes-csi/csi-lib-utils/connection"
"github.com/kubernetes-csi/external-attacher/pkg/attacher"
"github.com/kubernetes-csi/external-attacher/pkg/features"
"google.golang.org/grpc/status"
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
storagelisters "k8s.io/client-go/listers/storage/v1"
Expand Down Expand Up @@ -610,11 +613,21 @@ func (h *csiHandler) saveAttachError(ctx context.Context, va *storage.VolumeAtta
logger := klog.FromContext(ctx)
logger.V(4).Info("Saving attach error")
clone := va.DeepCopy()
clone.Status.AttachError = &storage.VolumeError{

volumeError := &storage.VolumeError{
Message: err.Error(),
Time: metav1.Now(),
}

if utilfeature.DefaultFeatureGate.Enabled(features.MutableCSINodeAllocatableCount) {
Comment thread
torredil marked this conversation as resolved.
if st, ok := status.FromError(err); ok {
errorCode := int32(st.Code())
volumeError.ErrorCode = &errorCode
}
}

clone.Status.AttachError = volumeError
Comment thread
torredil marked this conversation as resolved.

var newVa *storage.VolumeAttachment
if newVa, err = h.patchVA(ctx, va, clone, "status"); err != nil {
return va, err
Expand Down
72 changes: 71 additions & 1 deletion pkg/controller/csi_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,24 @@ import (
"testing"
"time"

"google.golang.org/grpc/codes"

"github.com/kubernetes-csi/csi-lib-utils/connection"
"github.com/kubernetes-csi/external-attacher/pkg/attacher"

"github.com/kubernetes-csi/external-attacher/pkg/features"
"google.golang.org/grpc/status"
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
core "k8s.io/client-go/testing"
featuregatetesting "k8s.io/component-base/featuregate/testing"
csitranslator "k8s.io/csi-translation-lib"
"k8s.io/klog/v2"
_ "k8s.io/klog/v2/ktesting/init"
Expand Down Expand Up @@ -269,6 +274,16 @@ func patch(original, new interface{}) []byte {
return patch
}

func vaWithAttachErrorAndCode(va *storage.VolumeAttachment, message string, code codes.Code) *storage.VolumeAttachment {
errorCode := int32(code)
va.Status.AttachError = &storage.VolumeError{
Message: message,
Time: metav1.Time{},
ErrorCode: &errorCode,
}
return va
}

func TestCSIHandler(t *testing.T) {
vaGroupResourceVersion := schema.GroupVersionResource{
Group: storage.GroupName,
Expand Down Expand Up @@ -1403,6 +1418,61 @@ func TestCSIHandler(t *testing.T) {
runTests(t, csiHandlerFactory, tests)
}

func TestVolumeAttachmentWithErrorCode(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MutableCSINodeAllocatableCount, true)

vaGroupResourceVersion := schema.GroupVersionResource{
Group: storage.GroupName,
Version: "v1",
Resource: "volumeattachments",
}

var noMetadata map[string]string
var noAttrs map[string]string
var noSecrets map[string]string
var notDetached = false
var success error
var readWrite = false

test := testCase{
name: "CSI attach fails with gRPC error -> controller saves ErrorCode and retries",
initialObjects: []runtime.Object{pvWithFinalizer(), csiNode()},
addedVA: va(false, "", nil),
expectedActions: []core.Action{
core.NewPatchAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName,
types.MergePatchType, patch(va(false, "", nil), va(false, fin, ann))),

// The CSI call fails, so the controller saves the error status.
core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone,
testPVName+"-"+testNodeName,
types.MergePatchType, patch(va(false, fin, ann),
vaWithAttachErrorAndCode(va(false, fin, ann), "rpc error: code = ResourceExhausted desc = mock rpc error", codes.ResourceExhausted)), "status"),

// On retry, the controller reads the original VA again and tries to re-apply the finalizer/annotation.
core.NewPatchAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName,
types.MergePatchType, patch(
vaWithAttachErrorAndCode(va(false, "", nil), "rpc error: code = ResourceExhausted desc = mock rpc error", codes.ResourceExhausted),
vaWithAttachErrorAndCode(va(false, fin, ann), "rpc error: code = ResourceExhausted desc = mock rpc error", codes.ResourceExhausted),
)),

// The CSI call succeeds now, and the controller clears the error and marks the VA as attached.
core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone,
testPVName+"-"+testNodeName,
types.MergePatchType, patch(
vaWithAttachErrorAndCode(va(false, fin, ann), "rpc error: code = ResourceExhausted desc = mock rpc error", codes.ResourceExhausted),
va(true /*attached*/, fin, ann),
),
"status"),
},
expectedCSICalls: []csiCall{
{"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, status.Error(codes.ResourceExhausted, "mock rpc error"), notDetached, noMetadata, 0},
{"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, notDetached, noMetadata, 0},
},
}

runTests(t, csiHandlerFactory, []testCase{test})
}

func TestCSIHandlerReconcileVA(t *testing.T) {
nID := map[string]string{
vaNodeIDAnnotation: testNodeID,
Expand Down
44 changes: 44 additions & 0 deletions pkg/features/features.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package features

import (
"k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/featuregate"
)

const (
// owner: @torredil @gnufied @msau42
// kep: https://kep.k8s.io/4876
// alpha: v1.33
// beta: v1.34
//
// Makes CSINode.Spec.Drivers[*].Allocatable.Count mutable, allowing CSI drivers to
// update the number of volumes that can be allocated on a node. Additionally, enables
// setting ErrorCode field in VolumeAttachment status.
MutableCSINodeAllocatableCount featuregate.Feature = "MutableCSINodeAllocatableCount"
)

func init() {
feature.DefaultMutableFeatureGate.Add(defaultKubernetesFeatureGates)
}

// defaultKubernetesFeatureGates consists of all known feature keys specific to external-attacher.
// To add a new feature, define a key for it above and add it here.
var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
MutableCSINodeAllocatableCount: {Default: false, PreRelease: featuregate.Beta},
}
Loading