Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@

[[constraint]]
name = "github.com/kubernetes-csi/csi-lib-utils"
version = "0.3.1"
version = ">=0.4.0-rc1"

[prune]
non-go = true
Expand Down
49 changes: 37 additions & 12 deletions cmd/csi-attacher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@ import (
csiinformers "k8s.io/csi-api/pkg/client/informers/externalversions"
"k8s.io/klog"

"github.com/kubernetes-csi/external-attacher/pkg/connection"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/csi-lib-utils/connection"
"github.com/kubernetes-csi/csi-lib-utils/rpc"
"github.com/kubernetes-csi/external-attacher/pkg/attacher"
"github.com/kubernetes-csi/external-attacher/pkg/controller"
"google.golang.org/grpc"
)

const (
Expand Down Expand Up @@ -103,20 +107,20 @@ func main() {
var csiFactory csiinformers.SharedInformerFactory
var handler controller.Handler

var attacher string
var csiAttacher string
if *dummy {
// Do not connect to any CSI, mark everything as attached.
handler = controller.NewTrivialHandler(clientset)
attacher = dummyAttacherName
csiAttacher = dummyAttacherName
} else {
// Connect to CSI.
csiConn, err := connection.New(*csiAddress)
csiConn, err := connection.Connect(*csiAddress)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
}

err = csiConn.Probe(*timeout)
err = rpc.ProbeForever(csiConn, *timeout)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
Expand All @@ -125,14 +129,14 @@ func main() {
// Find driver name.
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
attacher, err = csiConn.GetDriverName(ctx)
csiAttacher, err = rpc.GetDriverName(ctx, csiConn)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
}
klog.V(2).Infof("CSI driver name: %q", attacher)
klog.V(2).Infof("CSI driver name: %q", csiAttacher)

supportsService, err := csiConn.SupportsPluginControllerService(ctx)
supportsService, err := supportsPluginControllerService(ctx, csiConn)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
Expand All @@ -142,7 +146,7 @@ func main() {
klog.V(2).Infof("CSI driver does not support Plugin Controller Service, using trivial handler")
} else {
// Find out if the driver supports attach/detach.
supportsAttach, supportsReadOnly, err := csiConn.SupportsControllerPublish(ctx)
supportsAttach, supportsReadOnly, err := supportsControllerPublish(ctx, csiConn)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
Expand All @@ -153,7 +157,8 @@ func main() {
vaLister := factory.Storage().V1beta1().VolumeAttachments().Lister()
csiFactory := csiinformers.NewSharedInformerFactory(csiClientset, *resync)
nodeInfoLister := csiFactory.Csi().V1alpha1().CSINodeInfos().Lister()
handler = controller.NewCSIHandler(clientset, csiClientset, attacher, csiConn, pvLister, nodeLister, nodeInfoLister, vaLister, timeout, supportsReadOnly)
attacher := attacher.NewAttacher(csiConn)
handler = controller.NewCSIHandler(clientset, csiClientset, csiAttacher, attacher, pvLister, nodeLister, nodeInfoLister, vaLister, timeout, supportsReadOnly)
klog.V(2).Infof("CSI driver supports ControllerPublishUnpublish, using real CSI handler")
} else {
handler = controller.NewTrivialHandler(clientset)
Expand All @@ -164,7 +169,7 @@ func main() {

ctrl := controller.NewCSIAttachController(
clientset,
attacher,
csiAttacher,
handler,
factory.Storage().V1beta1().VolumeAttachments(),
factory.Core().V1().PersistentVolumes(),
Expand Down Expand Up @@ -192,7 +197,7 @@ func main() {
os.Exit(1)
}
// Name of config map with leader election lock
lockName := "external-attacher-leader-" + attacher
lockName := "external-attacher-leader-" + csiAttacher
runAsLeader(clientset, *leaderElectionNamespace, *leaderElectionIdentity, lockName, run)
}
}
Expand All @@ -203,3 +208,23 @@ func buildConfig(kubeconfig string) (*rest.Config, error) {
}
return rest.InClusterConfig()
}

func supportsControllerPublish(ctx context.Context, csiConn *grpc.ClientConn) (supportsControllerPublish bool, supportsPublishReadOnly bool, err error) {
caps, err := rpc.GetControllerCapabilities(ctx, csiConn)
if err != nil {
return false, false, err
}

supportsControllerPublish = caps[csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME]
supportsPublishReadOnly = caps[csi.ControllerServiceCapability_RPC_PUBLISH_READONLY]
return supportsControllerPublish, supportsPublishReadOnly, nil
}

func supportsPluginControllerService(ctx context.Context, csiConn *grpc.ClientConn) (bool, error) {
caps, err := rpc.GetPluginCapabilities(ctx, csiConn)
if err != nil {
return false, err
}

return caps[csi.PluginCapability_Service_CONTROLLER_SERVICE], nil
}
82 changes: 13 additions & 69 deletions pkg/connection/connection.go → pkg/attacher/attacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,36 +14,22 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package connection
package attacher

import (
"context"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/csi-lib-utils/connection"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/klog"
)

// CSIConnection is gRPC connection to a remote CSI driver and abstracts all
// CSI calls.
type CSIConnection interface {
// GetDriverName returns driver name as discovered by GetPluginInfo()
// gRPC call.
GetDriverName(ctx context.Context) (string, error)

// SupportsControllerPublish returns true if the CSI driver reports
// PUBLISH_UNPUBLISH_VOLUME in ControllerGetCapabilities() gRPC call.
SupportsControllerPublish(ctx context.Context) (supportsControllerPublish bool, supportsPublishReadOnly bool, err error)

// SupportsPluginControllerService return true if the CSI driver reports
// CONTROLLER_SERVICE in GetPluginCapabilities() gRPC call.
SupportsPluginControllerService(ctx context.Context) (bool, error)

// Attacher implements attach/detach operations against a remote CSI driver.
type Attacher interface {
// Attach given volume to given node. Returns PublishVolumeInfo. Note that
// "detached" is returned on error and means that the volume is for sure
// detached from the node. "false" means that the volume may be either
Expand All @@ -56,64 +42,26 @@ type CSIConnection interface {
// "false" means that the volume may or may not be detached and caller
// should retry.
Detach(ctx context.Context, volumeID string, nodeID string, secrets map[string]string) (detached bool, err error)

// Probe checks that the CSI driver is ready to process requests
Probe(singleProbeTimeout time.Duration) error

// Close the connection
Close() error
}

type csiConnection struct {
type attacher struct {
conn *grpc.ClientConn
capabilities []csi.ControllerServiceCapability
}

var (
_ CSIConnection = &csiConnection{}
_ Attacher = &attacher{}
)

// New provides a new CSI connection object.
func New(address string) (CSIConnection, error) {
conn, err := connection.Connect(address, connection.OnConnectionLoss(connection.ExitOnConnectionLoss()))
if err != nil {
return nil, err
}
return &csiConnection{
// NewAttacher provides a new Attacher object.
func NewAttacher(conn *grpc.ClientConn) Attacher {
return &attacher{
conn: conn,
}, nil
}

func (c *csiConnection) GetDriverName(ctx context.Context) (string, error) {
return connection.GetDriverName(ctx, c.conn)
}

func (c *csiConnection) Probe(singleProbeTimeout time.Duration) error {
return connection.ProbeForever(c.conn, singleProbeTimeout)
}

func (c *csiConnection) SupportsControllerPublish(ctx context.Context) (supportsControllerPublish bool, supportsPublishReadOnly bool, err error) {
caps, err := connection.GetControllerCapabilities(ctx, c.conn)
if err != nil {
return false, false, err
}

supportsControllerPublish = caps[csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME]
supportsPublishReadOnly = caps[csi.ControllerServiceCapability_RPC_PUBLISH_READONLY]
return supportsControllerPublish, supportsPublishReadOnly, nil
}

func (c *csiConnection) SupportsPluginControllerService(ctx context.Context) (bool, error) {
caps, err := connection.GetPluginCapabilities(ctx, c.conn)
if err != nil {
return false, err
}

return caps[csi.PluginCapability_Service_CONTROLLER_SERVICE], nil
}

func (c *csiConnection) Attach(ctx context.Context, volumeID string, readOnly bool, nodeID string, caps *csi.VolumeCapability, context, secrets map[string]string) (metadata map[string]string, detached bool, err error) {
client := csi.NewControllerClient(c.conn)
func (a *attacher) Attach(ctx context.Context, volumeID string, readOnly bool, nodeID string, caps *csi.VolumeCapability, context, secrets map[string]string) (metadata map[string]string, detached bool, err error) {
client := csi.NewControllerClient(a.conn)

req := csi.ControllerPublishVolumeRequest{
VolumeId: volumeID,
Expand All @@ -131,8 +79,8 @@ func (c *csiConnection) Attach(ctx context.Context, volumeID string, readOnly bo
return rsp.PublishContext, false, nil
}

func (c *csiConnection) Detach(ctx context.Context, volumeID string, nodeID string, secrets map[string]string) (detached bool, err error) {
client := csi.NewControllerClient(c.conn)
func (a *attacher) Detach(ctx context.Context, volumeID string, nodeID string, secrets map[string]string) (detached bool, err error) {
client := csi.NewControllerClient(a.conn)

req := csi.ControllerUnpublishVolumeRequest{
VolumeId: volumeID,
Expand All @@ -147,10 +95,6 @@ func (c *csiConnection) Detach(ctx context.Context, volumeID string, nodeID stri
return true, nil
}

func (c *csiConnection) Close() error {
return c.conn.Close()
}

func logGRPC(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
klog.V(5).Infof("GRPC call: %s", method)
klog.V(5).Infof("GRPC request: %s", protosanitizer.StripSecrets(req))
Expand Down
Loading