Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@
name = "k8s.io/client-go"
version = "kubernetes-1.14.0"

[[constraint]]
name = "k8s.io/csi-api"
version = "kubernetes-1.14.0"

[[constraint]]
name = "github.com/kubernetes-csi/csi-lib-utils"
version = ">=0.4.0-rc1"
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ This information reflects the head of this branch.

| Compatible with CSI Version | Container Image | Min K8s Version |
| ------------------------------------------------------------------------------------------ | ------------------------------------| --------------- |
| [CSI Spec v1.0.0](https://github.com/container-storage-interface/spec/releases/tag/v1.0.0) | quay.io/k8scsi/csi-attacher:v1.0.1 | 1.13 |
| [CSI Spec v1.0.0](https://github.com/container-storage-interface/spec/releases/tag/v1.0.0) | quay.io/k8scsi/csi-attacher:v1.0.1 | 1.14 |

## Usage

Expand Down
12 changes: 2 additions & 10 deletions cmd/csi-attacher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
csiclient "k8s.io/csi-api/pkg/client/clientset/versioned"
csiinformers "k8s.io/csi-api/pkg/client/informers/externalversions"
"k8s.io/klog"

Expand Down Expand Up @@ -97,12 +96,6 @@ func main() {
os.Exit(1)
}

csiClientset, err := csiclient.NewForConfig(config)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
}

factory := informers.NewSharedInformerFactory(clientset, *resync)
var csiFactory csiinformers.SharedInformerFactory
var handler controller.Handler
Expand Down Expand Up @@ -155,10 +148,9 @@ func main() {
pvLister := factory.Core().V1().PersistentVolumes().Lister()
nodeLister := factory.Core().V1().Nodes().Lister()
vaLister := factory.Storage().V1beta1().VolumeAttachments().Lister()
csiFactory := csiinformers.NewSharedInformerFactory(csiClientset, *resync)
nodeInfoLister := csiFactory.Csi().V1alpha1().CSINodeInfos().Lister()
csiNodeLister := factory.Storage().V1beta1().CSINodes().Lister()
attacher := attacher.NewAttacher(csiConn)
handler = controller.NewCSIHandler(clientset, csiClientset, csiAttacher, attacher, pvLister, nodeLister, nodeInfoLister, vaLister, timeout, supportsReadOnly)
handler = controller.NewCSIHandler(clientset, csiAttacher, attacher, pvLister, nodeLister, csiNodeLister, vaLister, timeout, supportsReadOnly)
klog.V(2).Infof("CSI driver supports ControllerPublishUnpublish, using real CSI handler")
} else {
handler = controller.NewTrivialHandler(clientset)
Expand Down
28 changes: 11 additions & 17 deletions pkg/controller/csi_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ import (
corelisters "k8s.io/client-go/listers/core/v1"
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
"k8s.io/client-go/util/workqueue"
csiclient "k8s.io/csi-api/pkg/client/clientset/versioned"
csilisters "k8s.io/csi-api/pkg/client/listers/csi/v1alpha1"
csitranslationlib "k8s.io/csi-translation-lib"
)

Expand All @@ -42,12 +40,11 @@ import (
// before deletion.
type csiHandler struct {
client kubernetes.Interface
csiClientSet csiclient.Interface
attacherName string
attacher attacher.Attacher
pvLister corelisters.PersistentVolumeLister
nodeLister corelisters.NodeLister
nodeInfoLister csilisters.CSINodeInfoLister
csiNodeLister storagelisters.CSINodeLister
vaLister storagelisters.VolumeAttachmentLister
vaQueue, pvQueue workqueue.RateLimitingInterface
timeout time.Duration
Expand All @@ -59,24 +56,22 @@ var _ Handler = &csiHandler{}
// NewCSIHandler creates a new CSIHandler.
func NewCSIHandler(
client kubernetes.Interface,
csiClientSet csiclient.Interface,
attacherName string,
attacher attacher.Attacher,
pvLister corelisters.PersistentVolumeLister,
nodeLister corelisters.NodeLister,
nodeInfoLister csilisters.CSINodeInfoLister,
csiNodeLister storagelisters.CSINodeLister,
vaLister storagelisters.VolumeAttachmentLister,
timeout *time.Duration,
supportsPublishReadOnly bool) Handler {

return &csiHandler{
client: client,
csiClientSet: csiClientSet,
attacherName: attacherName,
attacher: attacher,
pvLister: pvLister,
nodeLister: nodeLister,
nodeInfoLister: nodeInfoLister,
csiNodeLister: csiNodeLister,
vaLister: vaLister,
timeout: *timeout,
supportsPublishReadOnly: supportsPublishReadOnly,
Expand Down Expand Up @@ -507,20 +502,19 @@ func (h *csiHandler) getCredentialsFromPV(csiSource *v1.CSIPersistentVolumeSourc
// getNodeID finds node ID from Node API object. If caller wants, it can find
// node ID stored in VolumeAttachment annotation.
func (h *csiHandler) getNodeID(driver string, nodeName string, va *storage.VolumeAttachment) (string, error) {
// Try to find CSINodeInfo first.
// nodeInfo, err := h.nodeInfoLister.Get(nodeName) // TODO (kubernetes/kubernetes #71052) use the lister once it syncs existing CSINodeInfo objects properly.
nodeInfo, err := h.csiClientSet.CsiV1alpha1().CSINodeInfos().Get(nodeName, metav1.GetOptions{})
// Try to find CSINode first.
csiNode, err := h.csiNodeLister.Get(nodeName)
if err == nil {
if nodeID, found := GetNodeIDFromNodeInfo(driver, nodeInfo); found {
klog.V(4).Infof("Found NodeID %s in CSINodeInfo %s", nodeID, nodeName)
if nodeID, found := GetNodeIDFromCSINode(driver, csiNode); found {
klog.V(4).Infof("Found NodeID %s in CSINode %s", nodeID, nodeName)
return nodeID, nil
}
klog.V(4).Infof("CSINodeInfo %s does not contain driver %s", nodeName, driver)
// CSINodeInfo exists, but does not have the requested driver.
klog.V(4).Infof("CSINode %s does not contain driver %s", nodeName, driver)
// CSINode exists, but does not have the requested driver.
// Fall through to Node annotation.
} else {
// Can't get CSINodeInfo, fall through to Node annotation.
klog.V(4).Infof("Can't get CSINodeInfo %s: %s", nodeName, err)
// Can't get CSINode, fall through to Node annotation.
klog.V(4).Infof("Can't get CSINode %s: %s", nodeName, err)
}

// Check Node annotation.
Expand Down
35 changes: 15 additions & 20 deletions pkg/controller/csi_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
core "k8s.io/client-go/testing"
csiapi "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
csiclient "k8s.io/csi-api/pkg/client/clientset/versioned"
csiinformers "k8s.io/csi-api/pkg/client/informers/externalversions"
)

const (
Expand All @@ -51,30 +48,28 @@ var (

var timeout = 10 * time.Millisecond

func csiHandlerFactory(client kubernetes.Interface, csiClient csiclient.Interface, informerFactory informers.SharedInformerFactory, csiInformerFactory csiinformers.SharedInformerFactory, csi attacher.Attacher) Handler {
func csiHandlerFactory(client kubernetes.Interface, informerFactory informers.SharedInformerFactory, csi attacher.Attacher) Handler {
return NewCSIHandler(
client,
csiClient,
testAttacherName,
csi,
informerFactory.Core().V1().PersistentVolumes().Lister(),
informerFactory.Core().V1().Nodes().Lister(),
csiInformerFactory.Csi().V1alpha1().CSINodeInfos().Lister(),
informerFactory.Storage().V1beta1().CSINodes().Lister(),
informerFactory.Storage().V1beta1().VolumeAttachments().Lister(),
&timeout,
true, /* supports PUBLISH_READONLY */
)
}

func csiHandlerFactoryNoReadOnly(client kubernetes.Interface, csiClient csiclient.Interface, informerFactory informers.SharedInformerFactory, csiInformerFactory csiinformers.SharedInformerFactory, csi attacher.Attacher) Handler {
func csiHandlerFactoryNoReadOnly(client kubernetes.Interface, informerFactory informers.SharedInformerFactory, csi attacher.Attacher) Handler {
return NewCSIHandler(
client,
csiClient,
testAttacherName,
csi,
informerFactory.Core().V1().PersistentVolumes().Lister(),
informerFactory.Core().V1().Nodes().Lister(),
csiInformerFactory.Csi().V1alpha1().CSINodeInfos().Lister(),
informerFactory.Storage().V1beta1().CSINodes().Lister(),
informerFactory.Storage().V1beta1().VolumeAttachments().Lister(),
&timeout,
false, /* does not support PUBLISH_READONLY */
Expand Down Expand Up @@ -182,13 +177,13 @@ func nodeWithoutAnnotations() *v1.Node {
return n
}

func csiNodeInfo() *csiapi.CSINodeInfo {
return &csiapi.CSINodeInfo{
func csiNode() *storage.CSINode {
return &storage.CSINode{
ObjectMeta: metav1.ObjectMeta{
Name: testNodeName,
},
Spec: csiapi.CSINodeInfoSpec{
Drivers: []csiapi.CSIDriverInfoSpec{
Spec: storage.CSINodeSpec{
Drivers: []storage.CSINodeDriver{
{
Name: testAttacherName,
NodeID: testNodeID,
Expand All @@ -198,12 +193,12 @@ func csiNodeInfo() *csiapi.CSINodeInfo {
}
}

func csiNodeInfoEmpty() *csiapi.CSINodeInfo {
return &csiapi.CSINodeInfo{
func csiNodeEmpty() *storage.CSINode {
return &storage.CSINode{
ObjectMeta: metav1.ObjectMeta{
Name: testNodeName,
},
Spec: csiapi.CSINodeInfoSpec{Drivers: []csiapi.CSIDriverInfoSpec{}},
Spec: storage.CSINodeSpec{Drivers: []storage.CSINodeDriver{}},
}
}

Expand Down Expand Up @@ -593,16 +588,16 @@ func TestCSIHandler(t *testing.T) {
},
},
{
name: "CSINodeInfo exists without the driver, Node without annotations -> error",
initialObjects: []runtime.Object{pvWithFinalizer(), nodeWithoutAnnotations(), csiNodeInfoEmpty()},
name: "CSINode exists without the driver, Node without annotations -> error",
initialObjects: []runtime.Object{pvWithFinalizer(), nodeWithoutAnnotations(), csiNodeEmpty()},
addedVA: va(false, fin, ann),
expectedActions: []core.Action{
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, vaWithAttachError(va(false, fin, ann), "node \"node1\" has no NodeID annotation")),
},
},
{
name: "CSINodeInfo exists with the driver, Node without annotations -> success",
initialObjects: []runtime.Object{pvWithFinalizer(), nodeWithoutAnnotations(), csiNodeInfo()},
name: "CSINode exists with the driver, Node without annotations -> success",
initialObjects: []runtime.Object{pvWithFinalizer(), nodeWithoutAnnotations(), csiNode()},
addedVA: va(false /*attached*/, "" /*finalizer*/, nil),
expectedActions: []core.Action{
// Finalizer is saved first
Expand Down
18 changes: 6 additions & 12 deletions pkg/controller/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
csiapi "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
csiclient "k8s.io/csi-api/pkg/client/clientset/versioned"
fakecsi "k8s.io/csi-api/pkg/client/clientset/versioned/fake"
csiinformers "k8s.io/csi-api/pkg/client/informers/externalversions"
)

// This is an unit test framework. It is heavily inspired by serviceaccount
Expand Down Expand Up @@ -102,7 +98,7 @@ type csiCall struct {
delay time.Duration
}

type handlerFactory func(client kubernetes.Interface, csiClient csiclient.Interface, informerFactory informers.SharedInformerFactory, csiInformerFactory csiinformers.SharedInformerFactory, csi attacher.Attacher) Handler
type handlerFactory func(client kubernetes.Interface, informerFactory informers.SharedInformerFactory, csi attacher.Attacher) Handler

func runTests(t *testing.T, handlerFactory handlerFactory, tests []testCase) {
for _, test := range tests {
Expand All @@ -122,7 +118,7 @@ func runTests(t *testing.T, handlerFactory handlerFactory, tests []testCase) {
csiObjs := []runtime.Object{}
for _, obj := range objs {
switch obj.(type) {
case *csiapi.CSINodeInfo:
case *storage.CSINode:
csiObjs = append(csiObjs, obj)
default:
coreObjs = append(coreObjs, obj)
Expand All @@ -131,13 +127,11 @@ func runTests(t *testing.T, handlerFactory handlerFactory, tests []testCase) {

// Create client and informers
client := fake.NewSimpleClientset(coreObjs...)
csiClient := fakecsi.NewSimpleClientset(csiObjs...)
informers := informers.NewSharedInformerFactory(client, time.Hour /* disable resync*/)
vaInformer := informers.Storage().V1beta1().VolumeAttachments()
pvInformer := informers.Core().V1().PersistentVolumes()
nodeInformer := informers.Core().V1().Nodes()
csiInformers := csiinformers.NewSharedInformerFactory(csiClient, time.Hour /* disable resync*/)
nodeInfoInformer := csiInformers.Csi().V1alpha1().CSINodeInfos()
csiNodeInformer := informers.Storage().V1beta1().CSINodes()
// Fill the informers with initial objects so controller can Get() them
for _, obj := range objs {
switch obj.(type) {
Expand All @@ -149,8 +143,8 @@ func runTests(t *testing.T, handlerFactory handlerFactory, tests []testCase) {
vaInformer.Informer().GetStore().Add(obj)
case *v1.Secret:
// Secrets are not cached in any informer
case *csiapi.CSINodeInfo:
nodeInfoInformer.Informer().GetStore().Add(obj)
case *storage.CSINode:
csiNodeInformer.Informer().GetStore().Add(obj)
default:
t.Fatalf("Unknown initalObject type: %+v", obj)
}
Expand Down Expand Up @@ -180,7 +174,7 @@ func runTests(t *testing.T, handlerFactory handlerFactory, tests []testCase) {

// Construct controller
csiConnection := &fakeCSIConnection{t: t, calls: test.expectedCSICalls}
handler := handlerFactory(client, csiClient, informers, csiInformers, csiConnection)
handler := handlerFactory(client, informers, csiConnection)
ctrl := NewCSIAttachController(client, testAttacherName, handler, vaInformer, pvInformer)

// Start the test by enqueueing the right event
Expand Down
4 changes: 1 addition & 3 deletions pkg/controller/trivial_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,9 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
core "k8s.io/client-go/testing"
csiclient "k8s.io/csi-api/pkg/client/clientset/versioned"
csiinformers "k8s.io/csi-api/pkg/client/informers/externalversions"
)

func trivialHandlerFactory(client kubernetes.Interface, csiClient csiclient.Interface, informerFactory informers.SharedInformerFactory, csiInformerFactory csiinformers.SharedInformerFactory, csi attacher.Attacher) Handler {
func trivialHandlerFactory(client kubernetes.Interface, informerFactory informers.SharedInformerFactory, csi attacher.Attacher) Handler {
return NewTrivialHandler(client)
}

Expand Down
7 changes: 3 additions & 4 deletions pkg/controller/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1beta1"
"k8s.io/client-go/kubernetes"
csiapi "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
"k8s.io/klog"
)

Expand Down Expand Up @@ -125,9 +124,9 @@ func GetNodeIDFromNode(driver string, node *v1.Node) (string, error) {
return nodeID, nil
}

// GetNodeIDFromNodeInfo returns nodeID from CSIDriverInfoSpec
func GetNodeIDFromNodeInfo(driver string, nodeInfo *csiapi.CSINodeInfo) (string, bool) {
for _, d := range nodeInfo.Spec.Drivers {
// GetNodeIDFromCSINode returns nodeID from CSIDriverInfoSpec
func GetNodeIDFromCSINode(driver string, csiNode *storage.CSINode) (string, bool) {
for _, d := range csiNode.Spec.Drivers {
if d.Name == driver {
return d.NodeID, true
}
Expand Down
Loading