Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 19 additions & 10 deletions pkg/attacher/attacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,8 @@ type Attacher interface {
// status.
Attach(ctx context.Context, volumeID string, readOnly bool, nodeID string, caps *csi.VolumeCapability, attributes, secrets map[string]string) (metadata map[string]string, detached bool, err error)

// Detach given volume from given node. Note that "detached" is returned on
// error and means that the volume is for sure detached from the node.
// "false" means that the volume may or may not be detached and caller
// should retry.
Detach(ctx context.Context, volumeID string, nodeID string, secrets map[string]string) (detached bool, err error)
// Detach given volume from given node.
Detach(ctx context.Context, volumeID string, nodeID string, secrets map[string]string) error
}

type attacher struct {
Expand Down Expand Up @@ -79,7 +76,7 @@ func (a *attacher) Attach(ctx context.Context, volumeID string, readOnly bool, n
return rsp.PublishContext, false, nil
}

func (a *attacher) Detach(ctx context.Context, volumeID string, nodeID string, secrets map[string]string) (detached bool, err error) {
func (a *attacher) Detach(ctx context.Context, volumeID string, nodeID string, secrets map[string]string) error {
client := csi.NewControllerClient(a.conn)

req := csi.ControllerUnpublishVolumeRequest{
Expand All @@ -88,11 +85,14 @@ func (a *attacher) Detach(ctx context.Context, volumeID string, nodeID string, s
Secrets: secrets,
}

_, err = client.ControllerUnpublishVolume(ctx, &req)
if err != nil {
return isFinalError(err), err
_, err := client.ControllerUnpublishVolume(ctx, &req)
if err != nil && isNotFound(err) {
// Do not change behavior of NotFound in stable branches.
// See https://github.com/kubernetes-csi/external-attacher/pull/165 and
// https://github.com/container-storage-interface/spec/pull/375
return nil
}
return true, nil
return err
}

func logGRPC(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
Expand Down Expand Up @@ -131,3 +131,12 @@ func isFinalError(err error) bool {
// even start or failed. It is for sure not in progress.
return true
}

func isNotFound(err error) bool {
st, ok := status.FromError(err)
if !ok {
// This is not gRPC error.
return false
}
return st.Code() == codes.NotFound
}
90 changes: 46 additions & 44 deletions pkg/attacher/attacher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,54 +291,59 @@ func TestDetachAttach(t *testing.T) {
}

tests := []struct {
name string
volumeID string
nodeID string
secrets map[string]string
input *csi.ControllerUnpublishVolumeRequest
output *csi.ControllerUnpublishVolumeResponse
injectError codes.Code
expectError bool
expectDetached bool
name string
volumeID string
nodeID string
secrets map[string]string
input *csi.ControllerUnpublishVolumeRequest
output *csi.ControllerUnpublishVolumeResponse
injectError codes.Code
expectError bool
}{
{
name: "success",
volumeID: defaultVolumeID,
nodeID: defaultNodeID,
input: defaultRequest,
output: &csi.ControllerUnpublishVolumeResponse{},
expectError: false,
expectDetached: true,
name: "success",
volumeID: defaultVolumeID,
nodeID: defaultNodeID,
input: defaultRequest,
output: &csi.ControllerUnpublishVolumeResponse{},
expectError: false,
},
{
name: "secrets",
volumeID: defaultVolumeID,
nodeID: defaultNodeID,
secrets: map[string]string{"foo": "bar"},
input: secretsRequest,
output: &csi.ControllerUnpublishVolumeResponse{},
expectError: false,
expectDetached: true,
name: "secrets",
volumeID: defaultVolumeID,
nodeID: defaultNodeID,
secrets: map[string]string{"foo": "bar"},
input: secretsRequest,
output: &csi.ControllerUnpublishVolumeResponse{},
expectError: false,
},
{
name: "gRPC final error",
volumeID: defaultVolumeID,
nodeID: defaultNodeID,
input: defaultRequest,
output: nil,
injectError: codes.NotFound,
expectError: true,
expectDetached: true,
name: "gRPC final error",
volumeID: defaultVolumeID,
nodeID: defaultNodeID,
input: defaultRequest,
output: nil,
injectError: codes.PermissionDenied,
expectError: true,
},
{
name: "gRPC transient error",
volumeID: defaultVolumeID,
nodeID: defaultNodeID,
input: defaultRequest,
output: nil,
injectError: codes.DeadlineExceeded,
expectError: true,
expectDetached: false,
name: "gRPC transient error",
volumeID: defaultVolumeID,
nodeID: defaultNodeID,
input: defaultRequest,
output: nil,
injectError: codes.DeadlineExceeded,
expectError: true,
},
{
// Explicitly test NotFound, it should be ignored.
name: "gRPC NotFound error",
volumeID: defaultVolumeID,
nodeID: defaultNodeID,
input: defaultRequest,
output: nil,
injectError: codes.NotFound,
expectError: false,
},
}

Expand All @@ -365,15 +370,12 @@ func TestDetachAttach(t *testing.T) {
}

a := NewAttacher(csiConn)
detached, err := a.Detach(context.Background(), test.volumeID, test.nodeID, test.secrets)
err := a.Detach(context.Background(), test.volumeID, test.nodeID, test.secrets)
if test.expectError && err == nil {
t.Errorf("test %q: Expected error, got none", test.name)
}
if !test.expectError && err != nil {
t.Errorf("test %q: got error: %v", test.name, err)
}
if detached != test.expectDetached {
t.Errorf("test %q: expected detached=%v, got %v", test.name, test.expectDetached, detached)
}
}
}
12 changes: 4 additions & 8 deletions pkg/controller/csi_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"k8s.io/klog"

"github.com/kubernetes-csi/external-attacher/pkg/attacher"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -359,17 +359,13 @@ func (h *csiHandler) csiDetach(va *storage.VolumeAttachment) (*storage.VolumeAtt

ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
defer cancel()
detached, err := h.attacher.Detach(ctx, volumeHandle, nodeID, secrets)
if err != nil && !detached {
err = h.attacher.Detach(ctx, volumeHandle, nodeID, secrets)
if err != nil {
// The volume may not be fully detached. Save the error and try again
// after backoff.
return va, err
}
if err != nil {
klog.V(2).Infof("Detached %q with error %s", va.Name, err.Error())
} else {
klog.V(2).Infof("Detached %q", va.Name)
}
klog.V(2).Infof("Detached %q", va.Name)

if va, err := markAsDetached(h.client, va); err != nil {
return va, fmt.Errorf("could not mark as detached: %s", err)
Expand Down
41 changes: 15 additions & 26 deletions pkg/controller/csi_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

"github.com/kubernetes-csi/external-attacher/pkg/attacher"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -244,10 +244,10 @@ func TestCSIHandler(t *testing.T) {
var noAttrs map[string]string
var noSecrets map[string]string
var notDetached = false
var detached = true
var success error
var readWrite = false
var readOnly = true
var ignored = false // the value is irrelevant for given call

tests := []testCase{
//
Expand Down Expand Up @@ -633,7 +633,7 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false /*attached*/, "", ann))),
},
expectedCSICalls: []csiCall{
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, detached, noMetadata, 0},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, ignored, noMetadata, 0},
},
},
{
Expand All @@ -645,7 +645,7 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false /*attached*/, "", ann))),
},
expectedCSICalls: []csiCall{
{"detach", testVolumeHandle, testNodeID, noAttrs, map[string]string{"foo": "bar"}, readWrite, success, detached, noMetadata, 0},
{"detach", testVolumeHandle, testNodeID, noAttrs, map[string]string{"foo": "bar"}, readWrite, success, ignored, noMetadata, 0},
},
},
{
Expand All @@ -657,7 +657,7 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false /*attached*/, "", ann))),
},
expectedCSICalls: []csiCall{
{"detach", testVolumeHandle, testNodeID, noAttrs, map[string]string{}, readWrite, success, detached, noMetadata, 0},
{"detach", testVolumeHandle, testNodeID, noAttrs, map[string]string{}, readWrite, success, ignored, noMetadata, 0},
},
},
{
Expand All @@ -671,16 +671,16 @@ func TestCSIHandler(t *testing.T) {
expectedCSICalls: []csiCall{},
},
{
name: "CSI detach fails with transient error -> controller retries",
name: "CSI detach fails with an error -> controller retries",
initialObjects: []runtime.Object{pvWithFinalizer(), node()},
addedVA: deleted(va(true, fin, ann)),
expectedActions: []core.Action{
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, vaWithDetachError(deleted(va(true /*attached*/, fin, ann)), "mock error")),
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false /*attached*/, "", ann))),
},
expectedCSICalls: []csiCall{
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, fmt.Errorf("mock error"), notDetached, noMetadata, 0},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, detached, noMetadata, 0},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, fmt.Errorf("mock error"), ignored, noMetadata, 0},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, ignored, noMetadata, 0},
},
},
{
Expand All @@ -691,19 +691,8 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false /*attached*/, "", ann))),
},
expectedCSICalls: []csiCall{
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, detached, noMetadata, 500 * time.Millisecond},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, detached, noMetadata, time.Duration(0)},
},
},
{
name: "CSI detach fails with final error -> controller does not retry",
initialObjects: []runtime.Object{pvWithFinalizer(), node()},
addedVA: deleted(va(true, fin, ann)),
expectedActions: []core.Action{
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false /*attached*/, "", ann))),
},
expectedCSICalls: []csiCall{
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, fmt.Errorf("mock error"), detached, noMetadata, 0},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, ignored, noMetadata, 500 * time.Millisecond},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, ignored, noMetadata, time.Duration(0)},
},
},
{
Expand Down Expand Up @@ -773,7 +762,7 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false /*attached*/, "", map[string]string{vaNodeIDAnnotation: "annotatedNodeID"}))),
},
expectedCSICalls: []csiCall{
{"detach", testVolumeHandle, "annotatedNodeID", noAttrs, noSecrets, readWrite, success, detached, noMetadata, 0},
{"detach", testVolumeHandle, "annotatedNodeID", noAttrs, noSecrets, readWrite, success, ignored, noMetadata, 0},
},
},
{
Expand All @@ -784,7 +773,7 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false /*attached*/, "", map[string]string{vaNodeIDAnnotation: "annotatedNodeID"}))),
},
expectedCSICalls: []csiCall{
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, detached, noMetadata, 0},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, ignored, noMetadata, 0},
},
},
{
Expand Down Expand Up @@ -816,8 +805,8 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false, "", ann))),
},
expectedCSICalls: []csiCall{
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, detached, noMetadata, 0},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, detached, noMetadata, 0},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, ignored, noMetadata, 0},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, ignored, noMetadata, 0},
},
},
{
Expand All @@ -830,7 +819,7 @@ func TestCSIHandler(t *testing.T) {
},
expectedCSICalls: []csiCall{
{"detach", "projects/UNSPECIFIED/zones/testZone/disks/testpd", testNodeID,
map[string]string{"partition": "0"}, noSecrets, readWrite, success, detached, noMetadata, 0},
map[string]string{"partition": "0"}, noSecrets, readWrite, success, ignored, noMetadata, 0},
},
},
//
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,10 +403,10 @@ func (f *fakeCSIConnection) Attach(ctx context.Context, volumeID string, readOnl
return call.metadata, call.detached, call.err
}

func (f *fakeCSIConnection) Detach(ctx context.Context, volumeID string, nodeID string, secrets map[string]string) (bool, error) {
func (f *fakeCSIConnection) Detach(ctx context.Context, volumeID string, nodeID string, secrets map[string]string) error {
if f.index >= len(f.calls) {
f.t.Errorf("Unexpected CSI Detach call: volume=%s, node=%s, index: %d, calls: %+v", volumeID, nodeID, f.index, f.calls)
return true, fmt.Errorf("unexpected call")
return fmt.Errorf("unexpected call")
}
call := f.calls[f.index]
f.index++
Expand Down Expand Up @@ -437,9 +437,9 @@ func (f *fakeCSIConnection) Detach(ctx context.Context, volumeID string, nodeID
}

if err != nil {
return true, err
return err
}
return call.detached, call.err
return call.err
}

func (f *fakeCSIConnection) Close() error {
Expand Down