Skip to content

Commit

Permalink
Merge pull request #234 from jsafrane/use-lib-connection
Browse files Browse the repository at this point in the history
Use Connection() from util package
  • Loading branch information
k8s-ci-robot authored Mar 1, 2019
2 parents c42566f + 20cb5e5 commit f588078
Show file tree
Hide file tree
Showing 9 changed files with 656 additions and 132 deletions.
22 changes: 12 additions & 10 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

[[constraint]]
name = "github.com/kubernetes-csi/csi-test"
version = "v1.0.0-1"
version = "~v1.0.3"

[[constraint]]
name = "github.com/kubernetes-csi/external-snapshotter"
Expand Down Expand Up @@ -62,7 +62,8 @@

[[constraint]]
name = "github.com/kubernetes-csi/csi-lib-utils"
version = "0.1.0"
version = ">=0.4.0-rc1"

[prune]
non-go = true
go-tests = true
Expand Down
28 changes: 15 additions & 13 deletions cmd/csi-provisioner/csi-provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"k8s.io/klog"

flag "github.com/spf13/pflag"
"google.golang.org/grpc"

ctrl "github.com/kubernetes-csi/external-provisioner/pkg/controller"
snapclientset "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned"
Expand All @@ -49,7 +48,7 @@ var (
master = flag.String("master", "", "Master URL to build a client config from. Either this or kubeconfig needs to be set if the provisioner is being run out of cluster.")
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Either this or master needs to be set if the provisioner is being run out of cluster.")
csiEndpoint = flag.String("csi-address", "/run/csi/socket", "The gRPC endpoint for Target CSI Volume.")
connectionTimeout = flag.Duration("connection-timeout", 10*time.Second, "Timeout for waiting for CSI driver socket.")
connectionTimeout = flag.Duration("connection-timeout", 0, "This option is deprecated.")
volumeNamePrefix = flag.String("volume-name-prefix", "pvc", "Prefix to apply to the name of a created volume.")
volumeNameUUIDLength = flag.Int("volume-name-uuid-length", -1, "Truncates generated UUID of a created volume to this length. Defaults behavior is to NOT truncate.")
showVersion = flag.Bool("version", false, "Show version.")
Expand Down Expand Up @@ -78,6 +77,10 @@ func init() {
flag.Set("logtostderr", "true")
flag.Parse()

if *connectionTimeout != 0 {
klog.Warningf("Warning: option -connection-timeout is deprecated and has no effect")
}

if err := utilfeature.DefaultFeatureGate.SetFromMap(featureGates); err != nil {
klog.Fatal(err)
}
Expand Down Expand Up @@ -127,17 +130,16 @@ func init() {
klog.Fatalf("Error getting server version: %v", err)
}

// Provisioner will stay in Init until driver opens csi socket, once it's done
// controller will exit this loop and proceed normally.
socketDown := true
grpcClient := &grpc.ClientConn{}
for socketDown {
grpcClient, err = ctrl.Connect(*csiEndpoint, *connectionTimeout)
if err == nil {
socketDown = false
continue
}
time.Sleep(10 * time.Second)
grpcClient, err := ctrl.Connect(*csiEndpoint)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
}

err = ctrl.Probe(grpcClient, *operationTimeout)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
}

// Autodetect provisioner name
Expand Down
98 changes: 15 additions & 83 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ import (
"context"
"fmt"
"math"
"net"
"os"
"strings"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/csi-lib-utils/connection"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
"github.com/kubernetes-csi/external-provisioner/pkg/features"
snapapi "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1"
Expand All @@ -46,7 +46,6 @@ import (
"k8s.io/klog"

"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
)

type deprecatedSecretParamsMap struct {
Expand Down Expand Up @@ -185,55 +184,18 @@ func logGRPC(ctx context.Context, method string, req, reply interface{}, cc *grp
return err
}

func Connect(address string, timeout time.Duration) (*grpc.ClientConn, error) {
klog.V(2).Infof("Connecting to %s", address)
dialOptions := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithBackoffMaxDelay(time.Second),
grpc.WithUnaryInterceptor(logGRPC),
}
if strings.HasPrefix(address, "/") {
dialOptions = append(dialOptions, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
}))
}
conn, err := grpc.Dial(address, dialOptions...)
func Connect(address string) (*grpc.ClientConn, error) {
return connection.Connect(address, connection.OnConnectionLoss(connection.ExitOnConnectionLoss()))
}

if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
for {
if !conn.WaitForStateChange(ctx, conn.GetState()) {
klog.V(4).Infof("Connection timed out")
return conn, fmt.Errorf("Connection timed out")
}
if conn.GetState() == connectivity.Ready {
klog.V(3).Infof("Connected")
return conn, nil
}
klog.V(4).Infof("Still trying, connection is %s", conn.GetState())
}
func Probe(conn *grpc.ClientConn, singleCallTimeout time.Duration) error {
return connection.ProbeForever(conn, singleCallTimeout)
}

func GetDriverName(conn *grpc.ClientConn, timeout time.Duration) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

client := csi.NewIdentityClient(conn)

req := csi.GetPluginInfoRequest{}

rsp, err := client.GetPluginInfo(ctx, &req)
if err != nil {
return "", err
}
name := rsp.GetName()
if name == "" {
return "", fmt.Errorf("name is empty")
}
return name, nil
return connection.GetDriverName(ctx, conn)
}

func getDriverCapabilities(conn *grpc.ClientConn, timeout time.Duration) (sets.Int, error) {
Expand All @@ -248,30 +210,16 @@ func getDriverCapabilities(conn *grpc.ClientConn, timeout time.Duration) (sets.I
}

capabilities := make(sets.Int)
for _, cap := range pluginCaps {
if cap == nil {
continue
}
service := cap.GetService()
if service == nil {
continue
}
switch service.GetType() {
for cap := range pluginCaps {
switch cap {
case csi.PluginCapability_Service_CONTROLLER_SERVICE:
capabilities.Insert(PluginCapability_CONTROLLER_SERVICE)
case csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS:
capabilities.Insert(PluginCapability_ACCESSIBILITY_CONSTRAINTS)
}
}
for _, cap := range controllerCaps {
if cap == nil {
continue
}
rpc := cap.GetRpc()
if rpc == nil {
continue
}
switch rpc.GetType() {
for cap := range controllerCaps {
switch cap {
case csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME:
capabilities.Insert(ControllerCapability_CREATE_DELETE_VOLUME)
case csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT:
Expand All @@ -281,32 +229,16 @@ func getDriverCapabilities(conn *grpc.ClientConn, timeout time.Duration) (sets.I
return capabilities, nil
}

func getPluginCapabilities(conn *grpc.ClientConn, timeout time.Duration) ([]*csi.PluginCapability, error) {
func getPluginCapabilities(conn *grpc.ClientConn, timeout time.Duration) (connection.PluginCapabilitySet, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

client := csi.NewIdentityClient(conn)
req := csi.GetPluginCapabilitiesRequest{}

rsp, err := client.GetPluginCapabilities(ctx, &req)
if err != nil {
return nil, err
}
return rsp.GetCapabilities(), nil
return connection.GetPluginCapabilities(ctx, conn)
}

func getControllerCapabilities(conn *grpc.ClientConn, timeout time.Duration) ([]*csi.ControllerServiceCapability, error) {
func getControllerCapabilities(conn *grpc.ClientConn, timeout time.Duration) (connection.ControllerCapabilitySet, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

client := csi.NewControllerClient(conn)
req := csi.ControllerGetCapabilitiesRequest{}

rsp, err := client.ControllerGetCapabilities(ctx, &req)
if err != nil {
return nil, err
}
return rsp.GetCapabilities(), nil
return connection.GetControllerCapabilities(ctx, conn)
}

// NewCSIProvisioner creates new CSI provisioner
Expand Down
Loading

0 comments on commit f588078

Please sign in to comment.