Skip to content

Commit

Permalink
Merge pull request kubernetes-csi#123 from jsafrane/use-lib-connection
Browse files Browse the repository at this point in the history
Use Connection() from util package
  • Loading branch information
k8s-ci-robot authored Feb 22, 2019
2 parents a68f14d + ff48934 commit 0e6f5f2
Show file tree
Hide file tree
Showing 10 changed files with 635 additions and 161 deletions.
19 changes: 11 additions & 8 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@
name = "github.com/golang/protobuf"
version = "1.1.0"


[[constraint]]
name = "github.com/kubernetes-csi/csi-test"
version = "v1.0.0-1"
version = "~v1.0.3"

[[constraint]]
name = "google.golang.org/grpc"
Expand Down Expand Up @@ -44,7 +43,7 @@

[[constraint]]
name = "github.com/kubernetes-csi/csi-lib-utils"
version = "0.1.0"
version = "0.3.1"

[prune]
non-go = true
Expand Down
34 changes: 8 additions & 26 deletions cmd/csi-attacher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const (
var (
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.")
resync = flag.Duration("resync", 10*time.Minute, "Resync interval of the controller.")
connectionTimeout = flag.Duration("connection-timeout", 1*time.Minute, "Timeout for waiting for CSI driver socket.")
connectionTimeout = flag.Duration("connection-timeout", 0, "This option is deprecated.")
csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.")
dummy = flag.Bool("dummy", false, "Run in dummy mode, i.e. not connecting to CSI driver and marking everything as attached. Expected CSI driver name is \"csi/dummy\".")
showVersion = flag.Bool("version", false, "Show version.")
Expand All @@ -76,6 +76,10 @@ func main() {
}
klog.Infof("Version: %s", version)

if *connectionTimeout != 0 {
klog.Warningf("Warning: option -connection-timeout is deprecated and has no effect")
}

// Create the client config. Use kubeconfig if given, otherwise assume in-cluster.
config, err := buildConfig(*kubeconfig)
if err != nil {
Expand Down Expand Up @@ -106,14 +110,14 @@ func main() {
attacher = dummyAttacherName
} else {
// Connect to CSI.
csiConn, err := connection.New(*csiAddress, *connectionTimeout)
csiConn, err := connection.New(*csiAddress)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
}

// Check it's ready
if err = waitForDriverReady(csiConn, *connectionTimeout); err != nil {
err = csiConn.Probe(*timeout)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
}
Expand Down Expand Up @@ -199,25 +203,3 @@ func buildConfig(kubeconfig string) (*rest.Config, error) {
}
return rest.InClusterConfig()
}

func waitForDriverReady(csiConn connection.CSIConnection, timeout time.Duration) error {
now := time.Now()
finish := now.Add(timeout)
var err error
for {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
err = csiConn.Probe(ctx)
if err == nil {
klog.V(2).Infof("Probe succeeded")
return nil
}
klog.V(2).Infof("Probe failed with %s", err)

now := time.Now()
if now.After(finish) {
return fmt.Errorf("Failed to probe the controller: %s", err)
}
time.Sleep(time.Second)
}
}
120 changes: 16 additions & 104 deletions pkg/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,13 @@ package connection

import (
"context"
"fmt"
"net"
"strings"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/csi-lib-utils/connection"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/status"
"k8s.io/klog"
)
Expand Down Expand Up @@ -61,23 +58,24 @@ type CSIConnection interface {
Detach(ctx context.Context, volumeID string, nodeID string, secrets map[string]string) (detached bool, err error)

// Probe checks that the CSI driver is ready to process requests
Probe(ctx context.Context) error
Probe(singleProbeTimeout time.Duration) error

// Close the connection
Close() error
}

type csiConnection struct {
conn *grpc.ClientConn
conn *grpc.ClientConn
capabilities []csi.ControllerServiceCapability
}

var (
_ CSIConnection = &csiConnection{}
)

// New provides a new CSI connection object.
func New(address string, timeout time.Duration) (CSIConnection, error) {
conn, err := connect(address, timeout)
func New(address string) (CSIConnection, error) {
conn, err := connection.Connect(address, connection.OnConnectionLoss(connection.ExitOnConnectionLoss()))
if err != nil {
return nil, err
}
Expand All @@ -86,118 +84,32 @@ func New(address string, timeout time.Duration) (CSIConnection, error) {
}, nil
}

func connect(address string, timeout time.Duration) (*grpc.ClientConn, error) {
klog.V(2).Infof("Connecting to %s", address)
dialOptions := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithBackoffMaxDelay(time.Second),
grpc.WithUnaryInterceptor(logGRPC),
}
if strings.HasPrefix(address, "/") {
dialOptions = append(dialOptions, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
}))
}
conn, err := grpc.Dial(address, dialOptions...)

if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
for {
if !conn.WaitForStateChange(ctx, conn.GetState()) {
klog.V(4).Infof("Connection timed out")
return conn, nil // return nil, subsequent GetPluginInfo will show the real connection error
}
if conn.GetState() == connectivity.Ready {
klog.V(3).Infof("Connected")
return conn, nil
}
klog.V(4).Infof("Still trying, connection is %s", conn.GetState())
}
}

func (c *csiConnection) GetDriverName(ctx context.Context) (string, error) {
client := csi.NewIdentityClient(c.conn)

req := csi.GetPluginInfoRequest{}

rsp, err := client.GetPluginInfo(ctx, &req)
if err != nil {
return "", err
}
name := rsp.GetName()
if name == "" {
return "", fmt.Errorf("name is empty")
}
return name, nil
return connection.GetDriverName(ctx, c.conn)
}

func (c *csiConnection) Probe(ctx context.Context) error {
client := csi.NewIdentityClient(c.conn)

req := csi.ProbeRequest{}

_, err := client.Probe(ctx, &req)
if err != nil {
return err
}
return nil
func (c *csiConnection) Probe(singleProbeTimeout time.Duration) error {
return connection.ProbeForever(c.conn, singleProbeTimeout)
}

func (c *csiConnection) SupportsControllerPublish(ctx context.Context) (supportsControllerPublish bool, supportsPublishReadOnly bool, err error) {
supportsControllerPublish = false
supportsPublishReadOnly = false

client := csi.NewControllerClient(c.conn)
req := csi.ControllerGetCapabilitiesRequest{}

rsp, err := client.ControllerGetCapabilities(ctx, &req)
caps, err := connection.GetControllerCapabilities(ctx, c.conn)
if err != nil {
return false, false, err
}
caps := rsp.GetCapabilities()
for _, cap := range caps {
if cap == nil {
continue
}
rpc := cap.GetRpc()
if rpc == nil {
continue
}
if rpc.GetType() == csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME {
supportsControllerPublish = true
}
if rpc.GetType() == csi.ControllerServiceCapability_RPC_PUBLISH_READONLY {
supportsPublishReadOnly = true
}
}

supportsControllerPublish = caps[csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME]
supportsPublishReadOnly = caps[csi.ControllerServiceCapability_RPC_PUBLISH_READONLY]
return supportsControllerPublish, supportsPublishReadOnly, nil
}

func (c *csiConnection) SupportsPluginControllerService(ctx context.Context) (bool, error) {
client := csi.NewIdentityClient(c.conn)
req := csi.GetPluginCapabilitiesRequest{}

rsp, err := client.GetPluginCapabilities(ctx, &req)
caps, err := connection.GetPluginCapabilities(ctx, c.conn)
if err != nil {
return false, err
}
caps := rsp.GetCapabilities()
for _, cap := range caps {
if cap == nil {
continue
}
service := cap.GetService()
if service == nil {
continue
}
if service.GetType() == csi.PluginCapability_Service_CONTROLLER_SERVICE {
return true, nil
}
}
return false, nil

return caps[csi.PluginCapability_Service_CONTROLLER_SERVICE], nil
}

func (c *csiConnection) Attach(ctx context.Context, volumeID string, readOnly bool, nodeID string, caps *csi.VolumeCapability, context, secrets map[string]string) (metadata map[string]string, detached bool, err error) {
Expand Down
Loading

0 comments on commit 0e6f5f2

Please sign in to comment.