Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Connection() from util package #234

Merged
merged 2 commits into from
Mar 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

[[constraint]]
name = "github.com/kubernetes-csi/csi-test"
version = "v1.0.0-1"
version = "~v1.0.3"

[[constraint]]
name = "github.com/kubernetes-csi/external-snapshotter"
Expand Down Expand Up @@ -62,7 +62,8 @@

[[constraint]]
name = "github.com/kubernetes-csi/csi-lib-utils"
version = "0.1.0"
version = ">=0.4.0-rc1"

[prune]
non-go = true
go-tests = true
Expand Down
28 changes: 15 additions & 13 deletions cmd/csi-provisioner/csi-provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"k8s.io/klog"

flag "github.com/spf13/pflag"
"google.golang.org/grpc"

ctrl "github.com/kubernetes-csi/external-provisioner/pkg/controller"
snapclientset "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned"
Expand All @@ -49,7 +48,7 @@ var (
master = flag.String("master", "", "Master URL to build a client config from. Either this or kubeconfig needs to be set if the provisioner is being run out of cluster.")
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Either this or master needs to be set if the provisioner is being run out of cluster.")
csiEndpoint = flag.String("csi-address", "/run/csi/socket", "The gRPC endpoint for Target CSI Volume.")
connectionTimeout = flag.Duration("connection-timeout", 10*time.Second, "Timeout for waiting for CSI driver socket.")
connectionTimeout = flag.Duration("connection-timeout", 0, "This option is deprecated.")
volumeNamePrefix = flag.String("volume-name-prefix", "pvc", "Prefix to apply to the name of a created volume.")
volumeNameUUIDLength = flag.Int("volume-name-uuid-length", -1, "Truncates generated UUID of a created volume to this length. Defaults behavior is to NOT truncate.")
showVersion = flag.Bool("version", false, "Show version.")
Expand Down Expand Up @@ -78,6 +77,10 @@ func init() {
flag.Set("logtostderr", "true")
flag.Parse()

if *connectionTimeout != 0 {
klog.Warningf("Warning: option -connection-timeout is deprecated and has no effect")
}

if err := utilfeature.DefaultFeatureGate.SetFromMap(featureGates); err != nil {
klog.Fatal(err)
}
Expand Down Expand Up @@ -127,17 +130,16 @@ func init() {
klog.Fatalf("Error getting server version: %v", err)
}

// Provisioner will stay in Init until driver opens csi socket, once it's done
// controller will exit this loop and proceed normally.
socketDown := true
grpcClient := &grpc.ClientConn{}
for socketDown {
grpcClient, err = ctrl.Connect(*csiEndpoint, *connectionTimeout)
if err == nil {
socketDown = false
continue
}
time.Sleep(10 * time.Second)
grpcClient, err := ctrl.Connect(*csiEndpoint)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
}

err = ctrl.Probe(grpcClient, *operationTimeout)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
}

// Autodetect provisioner name
Expand Down
98 changes: 15 additions & 83 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ import (
"context"
"fmt"
"math"
"net"
"os"
"strings"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/csi-lib-utils/connection"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
"github.com/kubernetes-csi/external-provisioner/pkg/features"
snapapi "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1"
Expand All @@ -46,7 +46,6 @@ import (
"k8s.io/klog"

"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
)

type deprecatedSecretParamsMap struct {
Expand Down Expand Up @@ -185,55 +184,18 @@ func logGRPC(ctx context.Context, method string, req, reply interface{}, cc *grp
return err
}

func Connect(address string, timeout time.Duration) (*grpc.ClientConn, error) {
klog.V(2).Infof("Connecting to %s", address)
dialOptions := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithBackoffMaxDelay(time.Second),
grpc.WithUnaryInterceptor(logGRPC),
}
if strings.HasPrefix(address, "/") {
dialOptions = append(dialOptions, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
}))
}
conn, err := grpc.Dial(address, dialOptions...)
func Connect(address string) (*grpc.ClientConn, error) {
return connection.Connect(address, connection.OnConnectionLoss(connection.ExitOnConnectionLoss()))
}

if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
for {
if !conn.WaitForStateChange(ctx, conn.GetState()) {
klog.V(4).Infof("Connection timed out")
return conn, fmt.Errorf("Connection timed out")
}
if conn.GetState() == connectivity.Ready {
klog.V(3).Infof("Connected")
return conn, nil
}
klog.V(4).Infof("Still trying, connection is %s", conn.GetState())
}
func Probe(conn *grpc.ClientConn, singleCallTimeout time.Duration) error {
return connection.ProbeForever(conn, singleCallTimeout)
}

func GetDriverName(conn *grpc.ClientConn, timeout time.Duration) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

client := csi.NewIdentityClient(conn)

req := csi.GetPluginInfoRequest{}

rsp, err := client.GetPluginInfo(ctx, &req)
if err != nil {
return "", err
}
name := rsp.GetName()
if name == "" {
return "", fmt.Errorf("name is empty")
}
return name, nil
return connection.GetDriverName(ctx, conn)
}

func getDriverCapabilities(conn *grpc.ClientConn, timeout time.Duration) (sets.Int, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and update getcapabilities?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

Expand All @@ -248,30 +210,16 @@ func getDriverCapabilities(conn *grpc.ClientConn, timeout time.Duration) (sets.I
}

capabilities := make(sets.Int)
for _, cap := range pluginCaps {
if cap == nil {
continue
}
service := cap.GetService()
if service == nil {
continue
}
switch service.GetType() {
for cap := range pluginCaps {
switch cap {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is mixing plugin and controller capabilities into the same set correct? Could you have conflicts in values?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PluginCapability_* and ControllerCapability_* are part of the same enum defined in the provisioner, they don't come from CSI:

const (
PluginCapability_CONTROLLER_SERVICE = iota
PluginCapability_ACCESSIBILITY_CONSTRAINTS
ControllerCapability_CREATE_DELETE_VOLUME
ControllerCapability_CREATE_DELETE_SNAPSHOT
)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing that out. Is there a reason why it needs to have one more level of translation from CSI capabilities?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we sort it out in a separate PR? I'd like to cache the capabilities so provisioner does not load them from the driver every time it needs them. Refactoring of the enum would be better there.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure do you want to use csi-lib-utils 0.4.0-rc1 here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to 0.4.0-rc1

case csi.PluginCapability_Service_CONTROLLER_SERVICE:
capabilities.Insert(PluginCapability_CONTROLLER_SERVICE)
case csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS:
capabilities.Insert(PluginCapability_ACCESSIBILITY_CONSTRAINTS)
}
}
for _, cap := range controllerCaps {
if cap == nil {
continue
}
rpc := cap.GetRpc()
if rpc == nil {
continue
}
switch rpc.GetType() {
for cap := range controllerCaps {
switch cap {
case csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME:
capabilities.Insert(ControllerCapability_CREATE_DELETE_VOLUME)
case csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT:
Expand All @@ -281,32 +229,16 @@ func getDriverCapabilities(conn *grpc.ClientConn, timeout time.Duration) (sets.I
return capabilities, nil
}

func getPluginCapabilities(conn *grpc.ClientConn, timeout time.Duration) ([]*csi.PluginCapability, error) {
func getPluginCapabilities(conn *grpc.ClientConn, timeout time.Duration) (connection.PluginCapabilitySet, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

client := csi.NewIdentityClient(conn)
req := csi.GetPluginCapabilitiesRequest{}

rsp, err := client.GetPluginCapabilities(ctx, &req)
if err != nil {
return nil, err
}
return rsp.GetCapabilities(), nil
return connection.GetPluginCapabilities(ctx, conn)
}

func getControllerCapabilities(conn *grpc.ClientConn, timeout time.Duration) ([]*csi.ControllerServiceCapability, error) {
func getControllerCapabilities(conn *grpc.ClientConn, timeout time.Duration) (connection.ControllerCapabilitySet, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

client := csi.NewControllerClient(conn)
req := csi.ControllerGetCapabilitiesRequest{}

rsp, err := client.ControllerGetCapabilities(ctx, &req)
if err != nil {
return nil, err
}
return rsp.GetCapabilities(), nil
return connection.GetControllerCapabilities(ctx, conn)
}

// NewCSIProvisioner creates new CSI provisioner
Expand Down
Loading