diff --git a/CHANGELOG-0.x.md b/CHANGELOG-0.x.md index 8b209f3a20..a5fd0400ac 100644 --- a/CHANGELOG-0.x.md +++ b/CHANGELOG-0.x.md @@ -1,3 +1,8 @@ +# v1.1.1 + +### Bug fixes +- update inFlight cache to avoid race condition on volume operation ([#924])(https://github.com/kubernetes-sigs/aws-ebs-csi-driver/pull/924), [@AndyXiangLi](https://github.com/AndyXiangLi)) + # v1.1.0 ## Notable changes diff --git a/Makefile b/Makefile index 989c45dd6e..dfd3d7bcfa 100644 --- a/Makefile +++ b/Makefile @@ -14,7 +14,7 @@ PKG=github.com/kubernetes-sigs/aws-ebs-csi-driver IMAGE?=amazon/aws-ebs-csi-driver -VERSION=v1.0.0 +VERSION=v1.1.1 VERSION_AMAZONLINUX=$(VERSION)-amazonlinux GIT_COMMIT?=$(shell git rev-parse HEAD) BUILD_DATE?=$(shell date -u +"%Y-%m-%dT%H:%M:%SZ") diff --git a/charts/aws-ebs-csi-driver/CHANGELOG.md b/charts/aws-ebs-csi-driver/CHANGELOG.md new file mode 100644 index 0000000000..b82cf8631f --- /dev/null +++ b/charts/aws-ebs-csi-driver/CHANGELOG.md @@ -0,0 +1,6 @@ +# Helm chart + +# v1.2.4 +* Bump app/driver version to `v1.1.1` +* Install VolumeSnapshotClass, VolumeSnapshotContent, VolumeSnapshot CRDs if enableVolumeSnapshot is true +* Only run csi-snapshotter sidecar if enableVolumeSnapshot is true or if CRDs are already installed diff --git a/charts/aws-ebs-csi-driver/Chart.yaml b/charts/aws-ebs-csi-driver/Chart.yaml index 8bebbc9943..fdba841218 100644 --- a/charts/aws-ebs-csi-driver/Chart.yaml +++ b/charts/aws-ebs-csi-driver/Chart.yaml @@ -1,8 +1,8 @@ apiVersion: v1 -appVersion: "1.1.0" +appVersion: "1.1.1" name: aws-ebs-csi-driver description: A Helm chart for AWS EBS CSI Driver -version: 1.2.0 +version: 1.2.4 kubeVersion: ">=1.17.0-0" home: https://github.com/kubernetes-sigs/aws-ebs-csi-driver sources: diff --git a/charts/aws-ebs-csi-driver/values.yaml b/charts/aws-ebs-csi-driver/values.yaml index 5a9e6e2138..96ca4006cc 100644 --- a/charts/aws-ebs-csi-driver/values.yaml +++ b/charts/aws-ebs-csi-driver/values.yaml @@ -4,7 +4,7 @@ image: repository: k8s.gcr.io/provider-aws/aws-ebs-csi-driver - tag: "v1.1.0" + tag: "v1.1.1" pullPolicy: IfNotPresent sidecars: @@ -124,7 +124,6 @@ controller: # whenUnsatisfiable: ScheduleAnyway topologySpreadConstraints: [] - # Moving to values under node # The "maximum number of attachable volumes" per node volumeAttachLimit: diff --git a/deploy/kubernetes/base/controller.yaml b/deploy/kubernetes/base/controller.yaml index 9b8327223e..bb693effcd 100644 --- a/deploy/kubernetes/base/controller.yaml +++ b/deploy/kubernetes/base/controller.yaml @@ -31,7 +31,7 @@ spec: tolerationSeconds: 300 containers: - name: ebs-plugin - image: k8s.gcr.io/provider-aws/aws-ebs-csi-driver:v1.1.0 + image: k8s.gcr.io/provider-aws/aws-ebs-csi-driver:v1.1.1 imagePullPolicy: IfNotPresent args: # - {all,controller,node} # specify the driver mode diff --git a/deploy/kubernetes/base/node.yaml b/deploy/kubernetes/base/node.yaml index 038fc488e9..907f7e3d97 100644 --- a/deploy/kubernetes/base/node.yaml +++ b/deploy/kubernetes/base/node.yaml @@ -42,7 +42,7 @@ spec: - name: ebs-plugin securityContext: privileged: true - image: k8s.gcr.io/provider-aws/aws-ebs-csi-driver:v1.1.0 + image: k8s.gcr.io/provider-aws/aws-ebs-csi-driver:v1.1.1 args: - node - --endpoint=$(CSI_ENDPOINT) diff --git a/deploy/kubernetes/overlays/stable/ecr/kustomization.yaml b/deploy/kubernetes/overlays/stable/ecr/kustomization.yaml index d36d7ca71e..e5f2a1ddf4 100644 --- a/deploy/kubernetes/overlays/stable/ecr/kustomization.yaml +++ b/deploy/kubernetes/overlays/stable/ecr/kustomization.yaml @@ -5,7 +5,7 @@ bases: images: - name: k8s.gcr.io/provider-aws/aws-ebs-csi-driver newName: 602401143452.dkr.ecr.us-west-2.amazonaws.com/eks/aws-ebs-csi-driver - newTag: v1.0.0 + newTag: v1.1.1 - name: k8s.gcr.io/sig-storage/csi-provisioner newName: public.ecr.aws/eks-distro/kubernetes-csi/external-provisioner newTag: v2.1.1-eks-1-18-3 diff --git a/deploy/kubernetes/overlays/stable/kustomization.yaml b/deploy/kubernetes/overlays/stable/kustomization.yaml index 36f0134e75..50ced3ef5d 100644 --- a/deploy/kubernetes/overlays/stable/kustomization.yaml +++ b/deploy/kubernetes/overlays/stable/kustomization.yaml @@ -4,7 +4,7 @@ bases: - ../../base images: - name: k8s.gcr.io/provider-aws/aws-ebs-csi-driver - newTag: v1.1.0 + newTag: v1.1.1 - name: k8s.gcr.io/sig-storage/csi-provisioner newTag: v2.1.1 - name: k8s.gcr.io/sig-storage/csi-attacher diff --git a/docs/README.md b/docs/README.md index 5ef60d1814..7b1638b992 100644 --- a/docs/README.md +++ b/docs/README.md @@ -12,7 +12,7 @@ The [Amazon Elastic Block Store](https://aws.amazon.com/ebs/) Container Storage | AWS EBS CSI Driver \ CSI Version | v0.3.0| v1.0.0 | v1.1.0 | |----------------------------------------|-------|--------|--------| | master branch | no | no | yes | -| v1.1.0 | no | no | yes | +| v1.1.x | no | no | yes | | v1.0.0 | no | no | yes | | v0.10.x | no | no | yes | | v0.9.x | no | no | yes | @@ -96,7 +96,7 @@ Following sections are Kubernetes specific. If you are Kubernetes user, use foll ## Container Images: |AWS EBS CSI Driver Version | Image | |---------------------------|--------------------------------------------------| -|master branch |amazon/aws-ebs-csi-driver:latest | +|v1.1.1 |k8s.gcr.io/provider-aws/aws-ebs-csi-driver:v1.1.1 | |v1.1.0 |k8s.gcr.io/provider-aws/aws-ebs-csi-driver:v1.1.0 | |v1.0.0 |k8s.gcr.io/provider-aws/aws-ebs-csi-driver:v1.0.0 | |v0.10.1 |k8s.gcr.io/provider-aws/aws-ebs-csi-driver:v0.10.1| diff --git a/pkg/driver/controller.go b/pkg/driver/controller.go index 03a550deff..21f37eb97f 100644 --- a/pkg/driver/controller.go +++ b/pkg/driver/controller.go @@ -99,27 +99,21 @@ func newControllerService(driverOptions *DriverOptions) controllerService { func (d *controllerService) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { klog.V(4).Infof("CreateVolume: called with args %+v", *req) - volName := req.GetName() - if len(volName) == 0 { - return nil, status.Error(codes.InvalidArgument, "Volume name not provided") + if err := validateCreateVolumeRequest(req); err != nil { + return nil, err } - volSizeBytes, err := getVolSizeBytes(req) if err != nil { return nil, err } + volName := req.GetName() - volCaps := req.GetVolumeCapabilities() - if len(volCaps) == 0 { - return nil, status.Error(codes.InvalidArgument, "Volume capabilities not provided") - } - - if !isValidVolumeCapabilities(volCaps) { - modes := util.GetAccessModes(volCaps) - stringModes := strings.Join(*modes, ", ") - errString := "Volume capabilities " + stringModes + " not supported. Only AccessModes[ReadWriteOnce] supported." - return nil, status.Error(codes.InvalidArgument, errString) + // check if a request is already in-flight + if ok := d.inFlight.Insert(volName); !ok { + msg := fmt.Sprintf("Create volume request for %s is already in progress", volName) + return nil, status.Error(codes.Aborted, msg) } + defer d.inFlight.Delete(volName) disk, err := d.cloud.GetDiskByName(ctx, volName, volSizeBytes) if err != nil { @@ -217,13 +211,6 @@ func (d *controllerService) CreateVolume(ctx context.Context, req *csi.CreateVol return newCreateVolumeResponse(disk), nil } - // check if a request is already in-flight because the CreateVolume API is not idempotent - if ok := d.inFlight.Insert(req.String()); !ok { - msg := fmt.Sprintf("Create volume request for %s is already in progress", volName) - return nil, status.Error(codes.Aborted, msg) - } - defer d.inFlight.Delete(req.String()) - // create a new volume zone := pickAvailabilityZone(req.GetAccessibilityRequirements()) outpostArn := getOutpostArn(req.GetAccessibilityRequirements()) @@ -264,12 +251,40 @@ func (d *controllerService) CreateVolume(ctx context.Context, req *csi.CreateVol return newCreateVolumeResponse(disk), nil } +func validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error { + volName := req.GetName() + if len(volName) == 0 { + return status.Error(codes.InvalidArgument, "Volume name not provided") + } + + volCaps := req.GetVolumeCapabilities() + if len(volCaps) == 0 { + return status.Error(codes.InvalidArgument, "Volume capabilities not provided") + } + + if !isValidVolumeCapabilities(volCaps) { + modes := util.GetAccessModes(volCaps) + stringModes := strings.Join(*modes, ", ") + errString := "Volume capabilities " + stringModes + " not supported. Only AccessModes[ReadWriteOnce] supported." + return status.Error(codes.InvalidArgument, errString) + } + return nil +} + func (d *controllerService) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { klog.V(4).Infof("DeleteVolume: called with args: %+v", *req) + if err := validateDeleteVolumeRequest(req); err != nil { + return nil, err + } + volumeID := req.GetVolumeId() - if len(volumeID) == 0 { - return nil, status.Error(codes.InvalidArgument, "Volume ID not provided") + + // check if a request is already in-flight + if ok := d.inFlight.Insert(volumeID); !ok { + msg := fmt.Sprintf(internal.VolumeOperationAlreadyExistsErrorMsg, volumeID) + return nil, status.Error(codes.Aborted, msg) } + defer d.inFlight.Delete(volumeID) if _, err := d.cloud.DeleteDisk(ctx, volumeID); err != nil { if err == cloud.ErrNotFound { @@ -282,30 +297,21 @@ func (d *controllerService) DeleteVolume(ctx context.Context, req *csi.DeleteVol return &csi.DeleteVolumeResponse{}, nil } +func validateDeleteVolumeRequest(req *csi.DeleteVolumeRequest) error { + if len(req.GetVolumeId()) == 0 { + return status.Error(codes.InvalidArgument, "Volume ID not provided") + } + return nil +} + func (d *controllerService) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) { klog.V(4).Infof("ControllerPublishVolume: called with args %+v", *req) - volumeID := req.GetVolumeId() - if len(volumeID) == 0 { - return nil, status.Error(codes.InvalidArgument, "Volume ID not provided") + if err := validateControllerPublishVolumeRequest(req); err != nil { + return nil, err } + volumeID := req.GetVolumeId() nodeID := req.GetNodeId() - if len(nodeID) == 0 { - return nil, status.Error(codes.InvalidArgument, "Node ID not provided") - } - - volCap := req.GetVolumeCapability() - if volCap == nil { - return nil, status.Error(codes.InvalidArgument, "Volume capability not provided") - } - - caps := []*csi.VolumeCapability{volCap} - if !isValidVolumeCapabilities(caps) { - modes := util.GetAccessModes(caps) - stringModes := strings.Join(*modes, ", ") - errString := "Volume capabilities " + stringModes + " not supported. Only AccessModes[ReadWriteOnce] supported." - return nil, status.Error(codes.InvalidArgument, errString) - } if !d.cloud.IsExistInstance(ctx, nodeID) { return nil, status.Errorf(codes.NotFound, "Instance %q not found", nodeID) @@ -333,17 +339,38 @@ func (d *controllerService) ControllerPublishVolume(ctx context.Context, req *cs return &csi.ControllerPublishVolumeResponse{PublishContext: pvInfo}, nil } +func validateControllerPublishVolumeRequest(req *csi.ControllerPublishVolumeRequest) error { + if len(req.GetVolumeId()) == 0 { + return status.Error(codes.InvalidArgument, "Volume ID not provided") + } + + if len(req.GetNodeId()) == 0 { + return status.Error(codes.InvalidArgument, "Node ID not provided") + } + + volCap := req.GetVolumeCapability() + if volCap == nil { + return status.Error(codes.InvalidArgument, "Volume capability not provided") + } + + caps := []*csi.VolumeCapability{volCap} + if !isValidVolumeCapabilities(caps) { + modes := util.GetAccessModes(caps) + stringModes := strings.Join(*modes, ", ") + errString := "Volume capabilities " + stringModes + " not supported. Only AccessModes[ReadWriteOnce] supported." + return status.Error(codes.InvalidArgument, errString) + } + return nil +} + func (d *controllerService) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) { klog.V(4).Infof("ControllerUnpublishVolume: called with args %+v", *req) - volumeID := req.GetVolumeId() - if len(volumeID) == 0 { - return nil, status.Error(codes.InvalidArgument, "Volume ID not provided") + if err := validateControllerUnpublishVolumeRequest(req); err != nil { + return nil, err } + volumeID := req.GetVolumeId() nodeID := req.GetNodeId() - if len(nodeID) == 0 { - return nil, status.Error(codes.InvalidArgument, "Node ID not provided") - } if err := d.cloud.DetachDisk(ctx, volumeID, nodeID); err != nil { if err == cloud.ErrNotFound { @@ -356,6 +383,18 @@ func (d *controllerService) ControllerUnpublishVolume(ctx context.Context, req * return &csi.ControllerUnpublishVolumeResponse{}, nil } +func validateControllerUnpublishVolumeRequest(req *csi.ControllerUnpublishVolumeRequest) error { + if len(req.GetVolumeId()) == 0 { + return status.Error(codes.InvalidArgument, "Volume ID not provided") + } + + if len(req.GetNodeId()) == 0 { + return status.Error(codes.InvalidArgument, "Node ID not provided") + } + + return nil +} + func (d *controllerService) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) { klog.V(4).Infof("ControllerGetCapabilities: called with args %+v", *req) var caps []*csi.ControllerServiceCapability @@ -489,15 +528,20 @@ func isValidVolumeContext(volContext map[string]string) bool { func (d *controllerService) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) { klog.V(4).Infof("CreateSnapshot: called with args %+v", req) - snapshotName := req.GetName() - if len(snapshotName) == 0 { - return nil, status.Error(codes.InvalidArgument, "Snapshot name not provided") + if err := validateCreateSnapshotRequest(req); err != nil { + return nil, err } + snapshotName := req.GetName() volumeID := req.GetSourceVolumeId() - if len(volumeID) == 0 { - return nil, status.Error(codes.InvalidArgument, "Snapshot volume source ID not provided") + + // check if a request is already in-flight + if ok := d.inFlight.Insert(snapshotName); !ok { + msg := fmt.Sprintf(internal.VolumeOperationAlreadyExistsErrorMsg, snapshotName) + return nil, status.Error(codes.Aborted, msg) } + defer d.inFlight.Delete(snapshotName) + snapshot, err := d.cloud.GetSnapshotByName(ctx, snapshotName) if err != nil && err != cloud.ErrNotFound { klog.Errorf("Error looking for the snapshot %s: %v", snapshotName, err) @@ -535,12 +579,31 @@ func (d *controllerService) CreateSnapshot(ctx context.Context, req *csi.CreateS return newCreateSnapshotResponse(snapshot) } +func validateCreateSnapshotRequest(req *csi.CreateSnapshotRequest) error { + if len(req.GetName()) == 0 { + return status.Error(codes.InvalidArgument, "Snapshot name not provided") + } + + if len(req.GetSourceVolumeId()) == 0 { + return status.Error(codes.InvalidArgument, "Snapshot volume source ID not provided") + } + return nil +} + func (d *controllerService) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) { klog.V(4).Infof("DeleteSnapshot: called with args %+v", req) + if err := validateDeleteSnapshotRequest(req); err != nil { + return nil, err + } + snapshotID := req.GetSnapshotId() - if len(snapshotID) == 0 { - return nil, status.Error(codes.InvalidArgument, "Snapshot ID not provided") + + // check if a request is already in-flight + if ok := d.inFlight.Insert(snapshotID); !ok { + msg := fmt.Sprintf("DeleteSnapshot for Snapshot %s is already in progress", snapshotID) + return nil, status.Error(codes.Aborted, msg) } + defer d.inFlight.Delete(snapshotID) if _, err := d.cloud.DeleteSnapshot(ctx, snapshotID); err != nil { if err == cloud.ErrNotFound { @@ -553,6 +616,13 @@ func (d *controllerService) DeleteSnapshot(ctx context.Context, req *csi.DeleteS return &csi.DeleteSnapshotResponse{}, nil } +func validateDeleteSnapshotRequest(req *csi.DeleteSnapshotRequest) error { + if len(req.GetSnapshotId()) == 0 { + return status.Error(codes.InvalidArgument, "Snapshot ID not provided") + } + return nil +} + func (d *controllerService) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) { klog.V(4).Infof("ListSnapshots: called with args %+v", req) var snapshots []*cloud.Snapshot diff --git a/pkg/driver/controller_test.go b/pkg/driver/controller_test.go index 84d78db1b0..ca2f96115a 100644 --- a/pkg/driver/controller_test.go +++ b/pkg/driver/controller_test.go @@ -1536,11 +1536,10 @@ func TestCreateVolume(t *testing.T) { defer mockCtl.Finish() mockCloud := mocks.NewMockCloud(mockCtl) - mockCloud.EXPECT().GetDiskByName(gomock.Eq(ctx), gomock.Eq(req.Name), gomock.Eq(stdVolSize)).Return(nil, cloud.ErrNotFound) inFlight := internal.NewInFlight() - inFlight.Insert(req.String()) - defer inFlight.Delete(req.String()) + inFlight.Insert(req.GetName()) + defer inFlight.Delete(req.GetName()) awsDriver := controllerService{ cloud: mockCloud, @@ -1549,17 +1548,8 @@ func TestCreateVolume(t *testing.T) { } _, err := awsDriver.CreateVolume(ctx, req) - if err == nil { - t.Fatalf("Expected CreateVolume to fail but got no error") - } - srvErr, ok := status.FromError(err) - if !ok { - t.Fatalf("Could not get error status code from error: %v", srvErr) - } - if srvErr.Code() != codes.Aborted { - t.Fatalf("Expected Aborted but got: %s", srvErr.Code()) - } + checkAbortedErrorCode(t, err) }, }, { @@ -1714,6 +1704,31 @@ func TestDeleteVolume(t *testing.T) { } }, }, + { + name: "fail another request already in-flight", + testFunc: func(t *testing.T) { + req := &csi.DeleteVolumeRequest{ + VolumeId: "vol-test", + } + + ctx := context.Background() + mockCtl := gomock.NewController(t) + defer mockCtl.Finish() + + mockCloud := mocks.NewMockCloud(mockCtl) + inFlight := internal.NewInFlight() + inFlight.Insert(req.GetVolumeId()) + defer inFlight.Delete(req.GetVolumeId()) + awsDriver := controllerService{ + cloud: mockCloud, + inFlight: inFlight, + driverOptions: &DriverOptions{}, + } + _, err := awsDriver.DeleteVolume(ctx, req) + + checkAbortedErrorCode(t, err) + }, + }, } for _, tc := range testCases { @@ -2259,6 +2274,34 @@ func TestCreateSnapshot(t *testing.T) { } }, }, + { + name: "fail with another request in-flight", + testFunc: func(t *testing.T) { + req := &csi.CreateSnapshotRequest{ + Name: "test-snapshot", + Parameters: nil, + SourceVolumeId: "vol-test", + } + + mockCtl := gomock.NewController(t) + defer mockCtl.Finish() + + mockCloud := mocks.NewMockCloud(mockCtl) + + inFlight := internal.NewInFlight() + inFlight.Insert(req.GetName()) + defer inFlight.Delete(req.GetName()) + + awsDriver := controllerService{ + cloud: mockCloud, + inFlight: inFlight, + driverOptions: &DriverOptions{}, + } + _, err := awsDriver.CreateSnapshot(context.Background(), req) + + checkAbortedErrorCode(t, err) + }, + }, } for _, tc := range testCases { @@ -2321,6 +2364,34 @@ func TestDeleteSnapshot(t *testing.T) { } }, }, + { + name: "fail with another request in-flight", + testFunc: func(t *testing.T) { + ctx := context.Background() + + mockCtl := gomock.NewController(t) + defer mockCtl.Finish() + + mockCloud := mocks.NewMockCloud(mockCtl) + + req := &csi.DeleteSnapshotRequest{ + SnapshotId: "test-snapshotID", + } + inFlight := internal.NewInFlight() + inFlight.Insert(req.GetSnapshotId()) + defer inFlight.Delete(req.GetSnapshotId()) + + awsDriver := controllerService{ + cloud: mockCloud, + inFlight: inFlight, + driverOptions: &DriverOptions{}, + } + + _, err := awsDriver.DeleteSnapshot(ctx, req) + + checkAbortedErrorCode(t, err) + }, + }, } for _, tc := range testCases { @@ -3082,3 +3153,17 @@ func TestControllerExpandVolume(t *testing.T) { }) } } + +func checkAbortedErrorCode(t *testing.T, err error) { + if err == nil { + t.Fatalf("Expected operation to fail but got no error") + } + + srvErr, ok := status.FromError(err) + if !ok { + t.Fatalf("Could not get error status code from error: %v", srvErr) + } + if srvErr.Code() != codes.Aborted { + t.Fatalf("Expected Aborted but got: %s", srvErr.Code()) + } +} diff --git a/pkg/driver/internal/inflight.go b/pkg/driver/internal/inflight.go index 5f0d2a9ad7..9b45680fbc 100644 --- a/pkg/driver/internal/inflight.go +++ b/pkg/driver/internal/inflight.go @@ -30,6 +30,10 @@ type Idempotent interface { String() string } +const ( + VolumeOperationAlreadyExistsErrorMsg = "An operation with the given Volume %s already exists" +) + // InFlight is a struct used to manage in flight requests per volumeId. type InFlight struct { mux *sync.Mutex