diff --git a/Gopkg.lock b/Gopkg.lock index 27e494d47..9a3567c66 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -136,15 +136,16 @@ version = "v1.1.5" [[projects]] - digest = "1:b09c9ac14d93f481c768e73a12d4e8527ac25232d593f2ad918ffe462901b654" + digest = "1:a0892607b4f5385bb9fb12759facc8fad4e61b8b557384e4a078150c6ba43623" name = "github.com/kubernetes-csi/csi-lib-utils" packages = [ "connection", "protosanitizer", + "rpc", ] pruneopts = "NUT" - revision = "e581edfed23c6ce10243bc0efb2292cf68b83e1d" - version = "v0.3.1" + revision = "8053f37bf1d11d769c20f9514538c4b3b906e1f7" + version = "v0.4.0-rc1" [[projects]] digest = "1:0f47ba38b647bb8e7cddb71c3341134b2c2eaa8cef2af82291b5c9870ee7f572" @@ -688,6 +689,7 @@ "github.com/golang/protobuf/proto", "github.com/kubernetes-csi/csi-lib-utils/connection", "github.com/kubernetes-csi/csi-lib-utils/protosanitizer", + "github.com/kubernetes-csi/csi-lib-utils/rpc", "github.com/kubernetes-csi/csi-test/driver", "google.golang.org/grpc", "google.golang.org/grpc/codes", diff --git a/Gopkg.toml b/Gopkg.toml index 29b409cdd..679d66e02 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -43,7 +43,7 @@ [[constraint]] name = "github.com/kubernetes-csi/csi-lib-utils" - version = "0.3.1" + version = ">=0.4.0-rc1" [prune] non-go = true diff --git a/cmd/csi-attacher/main.go b/cmd/csi-attacher/main.go index e414715ac..d50de7907 100644 --- a/cmd/csi-attacher/main.go +++ b/cmd/csi-attacher/main.go @@ -31,8 +31,12 @@ import ( csiinformers "k8s.io/csi-api/pkg/client/informers/externalversions" "k8s.io/klog" - "github.com/kubernetes-csi/external-attacher/pkg/connection" + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/kubernetes-csi/csi-lib-utils/connection" + "github.com/kubernetes-csi/csi-lib-utils/rpc" + "github.com/kubernetes-csi/external-attacher/pkg/attacher" "github.com/kubernetes-csi/external-attacher/pkg/controller" + "google.golang.org/grpc" ) const ( @@ -103,20 +107,20 @@ func main() { var csiFactory csiinformers.SharedInformerFactory var handler controller.Handler - var attacher string + var csiAttacher string if *dummy { // Do not connect to any CSI, mark everything as attached. handler = controller.NewTrivialHandler(clientset) - attacher = dummyAttacherName + csiAttacher = dummyAttacherName } else { // Connect to CSI. - csiConn, err := connection.New(*csiAddress) + csiConn, err := connection.Connect(*csiAddress) if err != nil { klog.Error(err.Error()) os.Exit(1) } - err = csiConn.Probe(*timeout) + err = rpc.ProbeForever(csiConn, *timeout) if err != nil { klog.Error(err.Error()) os.Exit(1) @@ -125,14 +129,14 @@ func main() { // Find driver name. ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) defer cancel() - attacher, err = csiConn.GetDriverName(ctx) + csiAttacher, err = rpc.GetDriverName(ctx, csiConn) if err != nil { klog.Error(err.Error()) os.Exit(1) } - klog.V(2).Infof("CSI driver name: %q", attacher) + klog.V(2).Infof("CSI driver name: %q", csiAttacher) - supportsService, err := csiConn.SupportsPluginControllerService(ctx) + supportsService, err := supportsPluginControllerService(ctx, csiConn) if err != nil { klog.Error(err.Error()) os.Exit(1) @@ -142,7 +146,7 @@ func main() { klog.V(2).Infof("CSI driver does not support Plugin Controller Service, using trivial handler") } else { // Find out if the driver supports attach/detach. - supportsAttach, supportsReadOnly, err := csiConn.SupportsControllerPublish(ctx) + supportsAttach, supportsReadOnly, err := supportsControllerPublish(ctx, csiConn) if err != nil { klog.Error(err.Error()) os.Exit(1) @@ -153,7 +157,8 @@ func main() { vaLister := factory.Storage().V1beta1().VolumeAttachments().Lister() csiFactory := csiinformers.NewSharedInformerFactory(csiClientset, *resync) nodeInfoLister := csiFactory.Csi().V1alpha1().CSINodeInfos().Lister() - handler = controller.NewCSIHandler(clientset, csiClientset, attacher, csiConn, pvLister, nodeLister, nodeInfoLister, vaLister, timeout, supportsReadOnly) + attacher := attacher.NewAttacher(csiConn) + handler = controller.NewCSIHandler(clientset, csiClientset, csiAttacher, attacher, pvLister, nodeLister, nodeInfoLister, vaLister, timeout, supportsReadOnly) klog.V(2).Infof("CSI driver supports ControllerPublishUnpublish, using real CSI handler") } else { handler = controller.NewTrivialHandler(clientset) @@ -164,7 +169,7 @@ func main() { ctrl := controller.NewCSIAttachController( clientset, - attacher, + csiAttacher, handler, factory.Storage().V1beta1().VolumeAttachments(), factory.Core().V1().PersistentVolumes(), @@ -192,7 +197,7 @@ func main() { os.Exit(1) } // Name of config map with leader election lock - lockName := "external-attacher-leader-" + attacher + lockName := "external-attacher-leader-" + csiAttacher runAsLeader(clientset, *leaderElectionNamespace, *leaderElectionIdentity, lockName, run) } } @@ -203,3 +208,23 @@ func buildConfig(kubeconfig string) (*rest.Config, error) { } return rest.InClusterConfig() } + +func supportsControllerPublish(ctx context.Context, csiConn *grpc.ClientConn) (supportsControllerPublish bool, supportsPublishReadOnly bool, err error) { + caps, err := rpc.GetControllerCapabilities(ctx, csiConn) + if err != nil { + return false, false, err + } + + supportsControllerPublish = caps[csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME] + supportsPublishReadOnly = caps[csi.ControllerServiceCapability_RPC_PUBLISH_READONLY] + return supportsControllerPublish, supportsPublishReadOnly, nil +} + +func supportsPluginControllerService(ctx context.Context, csiConn *grpc.ClientConn) (bool, error) { + caps, err := rpc.GetPluginCapabilities(ctx, csiConn) + if err != nil { + return false, err + } + + return caps[csi.PluginCapability_Service_CONTROLLER_SERVICE], nil +} diff --git a/pkg/connection/connection.go b/pkg/attacher/attacher.go similarity index 59% rename from pkg/connection/connection.go rename to pkg/attacher/attacher.go index 356426339..555fe41e8 100644 --- a/pkg/connection/connection.go +++ b/pkg/attacher/attacher.go @@ -14,36 +14,22 @@ See the License for the specific language governing permissions and limitations under the License. */ -package connection +package attacher import ( "context" - "time" "github.com/container-storage-interface/spec/lib/go/csi" - "github.com/kubernetes-csi/csi-lib-utils/connection" "github.com/kubernetes-csi/csi-lib-utils/protosanitizer" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "k8s.io/klog" ) -// CSIConnection is gRPC connection to a remote CSI driver and abstracts all -// CSI calls. -type CSIConnection interface { - // GetDriverName returns driver name as discovered by GetPluginInfo() - // gRPC call. - GetDriverName(ctx context.Context) (string, error) - - // SupportsControllerPublish returns true if the CSI driver reports - // PUBLISH_UNPUBLISH_VOLUME in ControllerGetCapabilities() gRPC call. - SupportsControllerPublish(ctx context.Context) (supportsControllerPublish bool, supportsPublishReadOnly bool, err error) - - // SupportsPluginControllerService return true if the CSI driver reports - // CONTROLLER_SERVICE in GetPluginCapabilities() gRPC call. - SupportsPluginControllerService(ctx context.Context) (bool, error) - +// Attacher implements attach/detach operations against a remote CSI driver. +type Attacher interface { // Attach given volume to given node. Returns PublishVolumeInfo. Note that // "detached" is returned on error and means that the volume is for sure // detached from the node. "false" means that the volume may be either @@ -56,64 +42,26 @@ type CSIConnection interface { // "false" means that the volume may or may not be detached and caller // should retry. Detach(ctx context.Context, volumeID string, nodeID string, secrets map[string]string) (detached bool, err error) - - // Probe checks that the CSI driver is ready to process requests - Probe(singleProbeTimeout time.Duration) error - - // Close the connection - Close() error } -type csiConnection struct { +type attacher struct { conn *grpc.ClientConn capabilities []csi.ControllerServiceCapability } var ( - _ CSIConnection = &csiConnection{} + _ Attacher = &attacher{} ) -// New provides a new CSI connection object. -func New(address string) (CSIConnection, error) { - conn, err := connection.Connect(address, connection.OnConnectionLoss(connection.ExitOnConnectionLoss())) - if err != nil { - return nil, err - } - return &csiConnection{ +// NewAttacher provides a new Attacher object. +func NewAttacher(conn *grpc.ClientConn) Attacher { + return &attacher{ conn: conn, - }, nil -} - -func (c *csiConnection) GetDriverName(ctx context.Context) (string, error) { - return connection.GetDriverName(ctx, c.conn) -} - -func (c *csiConnection) Probe(singleProbeTimeout time.Duration) error { - return connection.ProbeForever(c.conn, singleProbeTimeout) -} - -func (c *csiConnection) SupportsControllerPublish(ctx context.Context) (supportsControllerPublish bool, supportsPublishReadOnly bool, err error) { - caps, err := connection.GetControllerCapabilities(ctx, c.conn) - if err != nil { - return false, false, err } - - supportsControllerPublish = caps[csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME] - supportsPublishReadOnly = caps[csi.ControllerServiceCapability_RPC_PUBLISH_READONLY] - return supportsControllerPublish, supportsPublishReadOnly, nil } -func (c *csiConnection) SupportsPluginControllerService(ctx context.Context) (bool, error) { - caps, err := connection.GetPluginCapabilities(ctx, c.conn) - if err != nil { - return false, err - } - - return caps[csi.PluginCapability_Service_CONTROLLER_SERVICE], nil -} - -func (c *csiConnection) Attach(ctx context.Context, volumeID string, readOnly bool, nodeID string, caps *csi.VolumeCapability, context, secrets map[string]string) (metadata map[string]string, detached bool, err error) { - client := csi.NewControllerClient(c.conn) +func (a *attacher) Attach(ctx context.Context, volumeID string, readOnly bool, nodeID string, caps *csi.VolumeCapability, context, secrets map[string]string) (metadata map[string]string, detached bool, err error) { + client := csi.NewControllerClient(a.conn) req := csi.ControllerPublishVolumeRequest{ VolumeId: volumeID, @@ -131,8 +79,8 @@ func (c *csiConnection) Attach(ctx context.Context, volumeID string, readOnly bo return rsp.PublishContext, false, nil } -func (c *csiConnection) Detach(ctx context.Context, volumeID string, nodeID string, secrets map[string]string) (detached bool, err error) { - client := csi.NewControllerClient(c.conn) +func (a *attacher) Detach(ctx context.Context, volumeID string, nodeID string, secrets map[string]string) (detached bool, err error) { + client := csi.NewControllerClient(a.conn) req := csi.ControllerUnpublishVolumeRequest{ VolumeId: volumeID, @@ -147,10 +95,6 @@ func (c *csiConnection) Detach(ctx context.Context, volumeID string, nodeID stri return true, nil } -func (c *csiConnection) Close() error { - return c.conn.Close() -} - func logGRPC(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { klog.V(5).Infof("GRPC call: %s", method) klog.V(5).Infof("GRPC request: %s", protosanitizer.StripSecrets(req)) diff --git a/pkg/connection/connection_test.go b/pkg/attacher/attacher_test.go similarity index 55% rename from pkg/connection/connection_test.go rename to pkg/attacher/attacher_test.go index 216410d11..06d6eb14e 100644 --- a/pkg/connection/connection_test.go +++ b/pkg/attacher/attacher_test.go @@ -1,5 +1,5 @@ /* -Copyright 2017 The Kubernetes Authors. +Copyright 2019 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package connection +package attacher import ( "context" @@ -28,7 +28,9 @@ import ( "github.com/container-storage-interface/spec/lib/go/csi" "github.com/golang/mock/gomock" "github.com/golang/protobuf/proto" + "github.com/kubernetes-csi/csi-lib-utils/connection" "github.com/kubernetes-csi/csi-test/driver" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -63,7 +65,7 @@ func tempDir(t *testing.T) string { return dir } -func createMockServer(t *testing.T, tmpdir string) (*gomock.Controller, *driver.MockCSIDriver, *driver.MockIdentityServer, *driver.MockControllerServer, CSIConnection, error) { +func createMockServer(t *testing.T, tmpdir string) (*gomock.Controller, *driver.MockCSIDriver, *driver.MockIdentityServer, *driver.MockControllerServer, *grpc.ClientConn, error) { // Start the mock server mockController := gomock.NewController(t) identityServer := driver.NewMockIdentityServer(mockController) @@ -77,7 +79,7 @@ func createMockServer(t *testing.T, tmpdir string) (*gomock.Controller, *driver. // Create a client connection to it addr := drv.Address() t.Logf("adds: %s", addr) - csiConn, err := New(addr) + csiConn, err := connection.Connect(addr) if err != nil { return nil, nil, nil, nil, nil, err } @@ -85,321 +87,6 @@ func createMockServer(t *testing.T, tmpdir string) (*gomock.Controller, *driver. return mockController, drv, identityServer, controllerServer, csiConn, nil } -func TestGetPluginInfo(t *testing.T) { - tests := []struct { - name string - output *csi.GetPluginInfoResponse - injectError bool - expectError bool - }{ - { - name: "success", - output: &csi.GetPluginInfoResponse{ - Name: "csi/example", - VendorVersion: "0.2.0", - Manifest: map[string]string{ - "hello": "world", - }, - }, - expectError: false, - }, - { - name: "gRPC error", - output: nil, - injectError: true, - expectError: true, - }, - { - name: "empty name", - output: &csi.GetPluginInfoResponse{ - Name: "", - }, - expectError: true, - }, - } - - tmpdir := tempDir(t) - defer os.RemoveAll(tmpdir) - mockController, driver, identityServer, _, csiConn, err := createMockServer(t, tmpdir) - if err != nil { - t.Fatal(err) - } - defer mockController.Finish() - defer driver.Stop() - defer csiConn.Close() - - for _, test := range tests { - - in := &csi.GetPluginInfoRequest{} - - out := test.output - var injectedErr error - if test.injectError { - injectedErr = fmt.Errorf("mock error") - } - - // Setup expectation - identityServer.EXPECT().GetPluginInfo(gomock.Any(), pbMatch(in)).Return(out, injectedErr).Times(1) - - name, err := csiConn.GetDriverName(context.Background()) - if test.expectError && err == nil { - t.Errorf("test %q: Expected error, got none", test.name) - } - if !test.expectError && err != nil { - t.Errorf("test %q: got error: %v", test.name, err) - } - if err == nil && name != "csi/example" { - t.Errorf("got unexpected name: %q", name) - } - } -} - -func TestSupportsControllerPublish(t *testing.T) { - tests := []struct { - name string - output *csi.ControllerGetCapabilitiesResponse - injectError bool - expectSupportsPublish bool - expectSupportsReadOnly bool - expectError bool - }{ - { - name: "success", - output: &csi.ControllerGetCapabilitiesResponse{ - Capabilities: []*csi.ControllerServiceCapability{ - { - Type: &csi.ControllerServiceCapability_Rpc{ - Rpc: &csi.ControllerServiceCapability_RPC{ - Type: csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, - }, - }, - }, - { - Type: &csi.ControllerServiceCapability_Rpc{ - Rpc: &csi.ControllerServiceCapability_RPC{ - Type: csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME, - }, - }, - }, - }, - }, - expectSupportsPublish: true, - expectSupportsReadOnly: false, - expectError: false, - }, - { - name: "supports read only", - output: &csi.ControllerGetCapabilitiesResponse{ - Capabilities: []*csi.ControllerServiceCapability{ - { - Type: &csi.ControllerServiceCapability_Rpc{ - Rpc: &csi.ControllerServiceCapability_RPC{ - Type: csi.ControllerServiceCapability_RPC_PUBLISH_READONLY, - }, - }, - }, - { - Type: &csi.ControllerServiceCapability_Rpc{ - Rpc: &csi.ControllerServiceCapability_RPC{ - Type: csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME, - }, - }, - }, - }, - }, - expectSupportsPublish: true, - expectSupportsReadOnly: true, - expectError: false, - }, - { - name: "gRPC error", - output: nil, - injectError: true, - expectError: true, - }, - { - name: "no publish", - output: &csi.ControllerGetCapabilitiesResponse{ - Capabilities: []*csi.ControllerServiceCapability{ - { - Type: &csi.ControllerServiceCapability_Rpc{ - Rpc: &csi.ControllerServiceCapability_RPC{ - Type: csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, - }, - }, - }, - }, - }, - expectSupportsPublish: false, - expectSupportsReadOnly: false, - expectError: false, - }, - { - name: "empty capability", - output: &csi.ControllerGetCapabilitiesResponse{ - Capabilities: []*csi.ControllerServiceCapability{ - { - Type: nil, - }, - }, - }, - expectSupportsPublish: false, - expectSupportsReadOnly: false, - expectError: false, - }, - { - name: "no capabilities", - output: &csi.ControllerGetCapabilitiesResponse{ - Capabilities: []*csi.ControllerServiceCapability{}, - }, - expectSupportsPublish: false, - expectSupportsReadOnly: false, - expectError: false, - }, - } - - tmpdir := tempDir(t) - defer os.RemoveAll(tmpdir) - mockController, driver, _, controllerServer, csiConn, err := createMockServer(t, tmpdir) - if err != nil { - t.Fatal(err) - } - defer mockController.Finish() - defer driver.Stop() - defer csiConn.Close() - - for _, test := range tests { - - in := &csi.ControllerGetCapabilitiesRequest{} - - out := test.output - var injectedErr error - if test.injectError { - injectedErr = fmt.Errorf("mock error") - } - - // Setup expectation - controllerServer.EXPECT().ControllerGetCapabilities(gomock.Any(), pbMatch(in)).Return(out, injectedErr).Times(1) - - supportsPublish, supportsReadOnly, err := csiConn.SupportsControllerPublish(context.Background()) - if test.expectError && err == nil { - t.Errorf("test %q: Expected error, got none", test.name) - } - if !test.expectError && err != nil { - t.Errorf("test %q: got error: %v", test.name, err) - } - if test.expectSupportsPublish != supportsPublish { - t.Errorf("test %q: expected PUBLISH_UNPUBLISH_VOLUME %t, got %t", test.name, test.expectSupportsPublish, supportsPublish) - } - if test.expectSupportsReadOnly != supportsReadOnly { - t.Errorf("test %q: expected PUBLISH_READONLY %t, got %t", test.name, test.expectSupportsReadOnly, supportsReadOnly) - } - } -} - -func TestSupportsPluginControllerService(t *testing.T) { - tests := []struct { - name string - output *csi.GetPluginCapabilitiesResponse - injectError bool - expectError bool - }{ - { - name: "success", - output: &csi.GetPluginCapabilitiesResponse{ - Capabilities: []*csi.PluginCapability{ - { - Type: &csi.PluginCapability_Service_{ - Service: &csi.PluginCapability_Service{ - Type: csi.PluginCapability_Service_CONTROLLER_SERVICE, - }, - }, - }, - { - Type: &csi.PluginCapability_Service_{ - Service: &csi.PluginCapability_Service{ - Type: csi.PluginCapability_Service_UNKNOWN, - }, - }, - }, - }, - }, - expectError: false, - }, - { - name: "gRPC error", - output: nil, - injectError: true, - expectError: true, - }, - { - name: "no controller service", - output: &csi.GetPluginCapabilitiesResponse{ - Capabilities: []*csi.PluginCapability{ - { - Type: &csi.PluginCapability_Service_{ - Service: &csi.PluginCapability_Service{ - Type: csi.PluginCapability_Service_UNKNOWN, - }, - }, - }, - }, - }, - expectError: false, - }, - { - name: "empty capability", - output: &csi.GetPluginCapabilitiesResponse{ - Capabilities: []*csi.PluginCapability{ - { - Type: nil, - }, - }, - }, - expectError: false, - }, - { - name: "no capabilities", - output: &csi.GetPluginCapabilitiesResponse{ - Capabilities: []*csi.PluginCapability{}, - }, - expectError: false, - }, - } - - tmpdir := tempDir(t) - defer os.RemoveAll(tmpdir) - mockController, driver, identityServer, _, csiConn, err := createMockServer(t, tmpdir) - if err != nil { - t.Fatal(err) - } - defer mockController.Finish() - defer driver.Stop() - defer csiConn.Close() - - for _, test := range tests { - - in := &csi.GetPluginCapabilitiesRequest{} - - out := test.output - var injectedErr error - if test.injectError { - injectedErr = fmt.Errorf("mock error") - } - - // Setup expectation - identityServer.EXPECT().GetPluginCapabilities(gomock.Any(), pbMatch(in)).Return(out, injectedErr).Times(1) - - _, err = csiConn.SupportsPluginControllerService(context.Background()) - if test.expectError && err == nil { - t.Errorf("test %q: Expected error, got none", test.name) - } - if !test.expectError && err != nil { - t.Errorf("test %q: got error: %v", test.name, err) - } - } -} - func TestAttach(t *testing.T) { defaultVolumeID := "myname" defaultNodeID := "MyNodeID" @@ -556,7 +243,6 @@ func TestAttach(t *testing.T) { } defer mockController.Finish() defer driver.Stop() - defer csiConn.Close() for _, test := range tests { in := test.input @@ -571,7 +257,8 @@ func TestAttach(t *testing.T) { controllerServer.EXPECT().ControllerPublishVolume(gomock.Any(), pbMatch(in)).Return(out, injectedErr).Times(1) } - publishInfo, detached, err := csiConn.Attach(context.Background(), test.volumeID, test.readonly, test.nodeID, test.caps, test.attributes, test.secrets) + a := NewAttacher(csiConn) + publishInfo, detached, err := a.Attach(context.Background(), test.volumeID, test.readonly, test.nodeID, test.caps, test.attributes, test.secrets) if test.expectError && err == nil { t.Errorf("test %q: Expected error, got none", test.name) } @@ -663,7 +350,6 @@ func TestDetachAttach(t *testing.T) { } defer mockController.Finish() defer driver.Stop() - defer csiConn.Close() for _, test := range tests { in := test.input @@ -678,7 +364,8 @@ func TestDetachAttach(t *testing.T) { controllerServer.EXPECT().ControllerUnpublishVolume(gomock.Any(), pbMatch(in)).Return(out, injectedErr).Times(1) } - detached, err := csiConn.Detach(context.Background(), test.volumeID, test.nodeID, test.secrets) + a := NewAttacher(csiConn) + detached, err := a.Detach(context.Background(), test.volumeID, test.nodeID, test.secrets) if test.expectError && err == nil { t.Errorf("test %q: Expected error, got none", test.name) } diff --git a/pkg/controller/csi_handler.go b/pkg/controller/csi_handler.go index e2c01b27e..d536e03f1 100644 --- a/pkg/controller/csi_handler.go +++ b/pkg/controller/csi_handler.go @@ -23,8 +23,7 @@ import ( "k8s.io/klog" - "github.com/kubernetes-csi/external-attacher/pkg/connection" - + "github.com/kubernetes-csi/external-attacher/pkg/attacher" "k8s.io/api/core/v1" storage "k8s.io/api/storage/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -45,7 +44,7 @@ type csiHandler struct { client kubernetes.Interface csiClientSet csiclient.Interface attacherName string - csiConnection connection.CSIConnection + attacher attacher.Attacher pvLister corelisters.PersistentVolumeLister nodeLister corelisters.NodeLister nodeInfoLister csilisters.CSINodeInfoLister @@ -62,7 +61,7 @@ func NewCSIHandler( client kubernetes.Interface, csiClientSet csiclient.Interface, attacherName string, - csiConnection connection.CSIConnection, + attacher attacher.Attacher, pvLister corelisters.PersistentVolumeLister, nodeLister corelisters.NodeLister, nodeInfoLister csilisters.CSINodeInfoLister, @@ -74,7 +73,7 @@ func NewCSIHandler( client: client, csiClientSet: csiClientSet, attacherName: attacherName, - csiConnection: csiConnection, + attacher: attacher, pvLister: pvLister, nodeLister: nodeLister, nodeInfoLister: nodeInfoLister, @@ -326,7 +325,7 @@ func (h *csiHandler) csiAttach(va *storage.VolumeAttachment) (*storage.VolumeAtt defer cancel() // We're not interested in `detached` return value, the controller will // issue Detach to be sure the volume is really detached. - publishInfo, _, err := h.csiConnection.Attach(ctx, volumeHandle, readOnly, nodeID, volumeCapabilities, attributes, secrets) + publishInfo, _, err := h.attacher.Attach(ctx, volumeHandle, readOnly, nodeID, volumeCapabilities, attributes, secrets) if err != nil { return va, nil, err } @@ -365,7 +364,7 @@ func (h *csiHandler) csiDetach(va *storage.VolumeAttachment) (*storage.VolumeAtt ctx, cancel := context.WithTimeout(context.Background(), h.timeout) defer cancel() - detached, err := h.csiConnection.Detach(ctx, volumeHandle, nodeID, secrets) + detached, err := h.attacher.Detach(ctx, volumeHandle, nodeID, secrets) if err != nil && !detached { // The volume may not be fully detached. Save the error and try again // after backoff. diff --git a/pkg/controller/csi_handler_test.go b/pkg/controller/csi_handler_test.go index 3c188e523..1d8aa5316 100644 --- a/pkg/controller/csi_handler_test.go +++ b/pkg/controller/csi_handler_test.go @@ -22,7 +22,7 @@ import ( "testing" "time" - "github.com/kubernetes-csi/external-attacher/pkg/connection" + "github.com/kubernetes-csi/external-attacher/pkg/attacher" "k8s.io/api/core/v1" storage "k8s.io/api/storage/v1beta1" @@ -51,7 +51,7 @@ var ( var timeout = 10 * time.Millisecond -func csiHandlerFactory(client kubernetes.Interface, csiClient csiclient.Interface, informerFactory informers.SharedInformerFactory, csiInformerFactory csiinformers.SharedInformerFactory, csi connection.CSIConnection) Handler { +func csiHandlerFactory(client kubernetes.Interface, csiClient csiclient.Interface, informerFactory informers.SharedInformerFactory, csiInformerFactory csiinformers.SharedInformerFactory, csi attacher.Attacher) Handler { return NewCSIHandler( client, csiClient, @@ -66,7 +66,7 @@ func csiHandlerFactory(client kubernetes.Interface, csiClient csiclient.Interfac ) } -func csiHandlerFactoryNoReadOnly(client kubernetes.Interface, csiClient csiclient.Interface, informerFactory informers.SharedInformerFactory, csiInformerFactory csiinformers.SharedInformerFactory, csi connection.CSIConnection) Handler { +func csiHandlerFactoryNoReadOnly(client kubernetes.Interface, csiClient csiclient.Interface, informerFactory informers.SharedInformerFactory, csiInformerFactory csiinformers.SharedInformerFactory, csi attacher.Attacher) Handler { return NewCSIHandler( client, csiClient, diff --git a/pkg/controller/framework_test.go b/pkg/controller/framework_test.go index 83a64c002..00e172168 100644 --- a/pkg/controller/framework_test.go +++ b/pkg/controller/framework_test.go @@ -26,7 +26,7 @@ import ( "github.com/container-storage-interface/spec/lib/go/csi" "github.com/davecgh/go-spew/spew" - "github.com/kubernetes-csi/external-attacher/pkg/connection" + "github.com/kubernetes-csi/external-attacher/pkg/attacher" "k8s.io/klog" "k8s.io/api/core/v1" @@ -102,7 +102,7 @@ type csiCall struct { delay time.Duration } -type handlerFactory func(client kubernetes.Interface, csiClient csiclient.Interface, informerFactory informers.SharedInformerFactory, csiInformerFactory csiinformers.SharedInformerFactory, csi connection.CSIConnection) Handler +type handlerFactory func(client kubernetes.Interface, csiClient csiclient.Interface, informerFactory informers.SharedInformerFactory, csiInformerFactory csiinformers.SharedInformerFactory, csi attacher.Attacher) Handler func runTests(t *testing.T, handlerFactory handlerFactory, tests []testCase) { for _, test := range tests { diff --git a/pkg/controller/trivial_handler_test.go b/pkg/controller/trivial_handler_test.go index b1b73699a..ca8c7744f 100644 --- a/pkg/controller/trivial_handler_test.go +++ b/pkg/controller/trivial_handler_test.go @@ -20,7 +20,7 @@ import ( "errors" "testing" - "github.com/kubernetes-csi/external-attacher/pkg/connection" + "github.com/kubernetes-csi/external-attacher/pkg/attacher" storage "k8s.io/api/storage/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -34,7 +34,7 @@ import ( csiinformers "k8s.io/csi-api/pkg/client/informers/externalversions" ) -func trivialHandlerFactory(client kubernetes.Interface, csiClient csiclient.Interface, informerFactory informers.SharedInformerFactory, csiInformerFactory csiinformers.SharedInformerFactory, csi connection.CSIConnection) Handler { +func trivialHandlerFactory(client kubernetes.Interface, csiClient csiclient.Interface, informerFactory informers.SharedInformerFactory, csiInformerFactory csiinformers.SharedInformerFactory, csi attacher.Attacher) Handler { return NewTrivialHandler(client) } diff --git a/vendor/github.com/kubernetes-csi/csi-lib-utils/rpc/common.go b/vendor/github.com/kubernetes-csi/csi-lib-utils/rpc/common.go new file mode 100644 index 000000000..bb4a5c448 --- /dev/null +++ b/vendor/github.com/kubernetes-csi/csi-lib-utils/rpc/common.go @@ -0,0 +1,160 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rpc + +import ( + "context" + "fmt" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/container-storage-interface/spec/lib/go/csi" + + "k8s.io/klog" +) + +const ( + // Interval of trying to call Probe() until it succeeds + probeInterval = 1 * time.Second +) + +// GetDriverName returns name of CSI driver. +func GetDriverName(ctx context.Context, conn *grpc.ClientConn) (string, error) { + client := csi.NewIdentityClient(conn) + + req := csi.GetPluginInfoRequest{} + rsp, err := client.GetPluginInfo(ctx, &req) + if err != nil { + return "", err + } + name := rsp.GetName() + if name == "" { + return "", fmt.Errorf("driver name is empty") + } + return name, nil +} + +// PluginCapabilitySet is set of CSI plugin capabilities. Only supported capabilities are in the map. +type PluginCapabilitySet map[csi.PluginCapability_Service_Type]bool + +// GetPluginCapabilities returns set of supported capabilities of CSI driver. +func GetPluginCapabilities(ctx context.Context, conn *grpc.ClientConn) (PluginCapabilitySet, error) { + client := csi.NewIdentityClient(conn) + req := csi.GetPluginCapabilitiesRequest{} + rsp, err := client.GetPluginCapabilities(ctx, &req) + if err != nil { + return nil, err + } + caps := PluginCapabilitySet{} + for _, cap := range rsp.GetCapabilities() { + if cap == nil { + continue + } + srv := cap.GetService() + if srv == nil { + continue + } + t := srv.GetType() + caps[t] = true + } + return caps, nil +} + +// ControllerCapabilitySet is set of CSI controller capabilities. Only supported capabilities are in the map. +type ControllerCapabilitySet map[csi.ControllerServiceCapability_RPC_Type]bool + +// GetControllerCapabilities returns set of supported controller capabilities of CSI driver. +func GetControllerCapabilities(ctx context.Context, conn *grpc.ClientConn) (ControllerCapabilitySet, error) { + client := csi.NewControllerClient(conn) + req := csi.ControllerGetCapabilitiesRequest{} + rsp, err := client.ControllerGetCapabilities(ctx, &req) + if err != nil { + return nil, err + } + + caps := ControllerCapabilitySet{} + for _, cap := range rsp.GetCapabilities() { + if cap == nil { + continue + } + rpc := cap.GetRpc() + if rpc == nil { + continue + } + t := rpc.GetType() + caps[t] = true + } + return caps, nil +} + +// ProbeForever calls Probe() of a CSI driver and waits until the driver becomes ready. +// Any error other than timeout is returned. +func ProbeForever(conn *grpc.ClientConn, singleProbeTimeout time.Duration) error { + for { + klog.Info("Probing CSI driver for readiness") + ready, err := probeOnce(conn, singleProbeTimeout) + if err != nil { + st, ok := status.FromError(err) + if !ok { + // This is not gRPC error. The probe must have failed before gRPC + // method was called, otherwise we would get gRPC error. + return fmt.Errorf("CSI driver probe failed: %s", err) + } + if st.Code() != codes.DeadlineExceeded { + return fmt.Errorf("CSI driver probe failed: %s", err) + } + // Timeout -> driver is not ready. Fall through to sleep() below. + klog.Warning("CSI driver probe timed out") + } else { + if ready { + return nil + } + klog.Warning("CSI driver is not ready") + } + // Timeout was returned or driver is not ready. + time.Sleep(probeInterval) + } +} + +// probeOnce is a helper to simplify defer cancel() +func probeOnce(conn *grpc.ClientConn, timeout time.Duration) (bool, error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + return Probe(ctx, conn) +} + +// Probe calls driver Probe() just once and returns its result without any processing. +func Probe(ctx context.Context, conn *grpc.ClientConn) (ready bool, err error) { + client := csi.NewIdentityClient(conn) + + req := csi.ProbeRequest{} + rsp, err := client.Probe(ctx, &req) + + if err != nil { + return false, err + } + + r := rsp.GetReady() + if r == nil { + // "If not present, the caller SHALL assume that the plugin is in a ready state" + return true, nil + } + return r.GetValue(), nil +}