Skip to content

Commit

Permalink
remove dependence of VolumePluginMgr from DesiredStateOfWorld and Act…
Browse files Browse the repository at this point in the history
…ualStateOfWorld
  • Loading branch information
mlmhl committed Dec 29, 2017
1 parent 0f01d54 commit 96b7de3
Show file tree
Hide file tree
Showing 13 changed files with 251 additions and 260 deletions.
21 changes: 13 additions & 8 deletions cmd/kube-controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
"k8s.io/kubernetes/pkg/controller/volume/cache"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/pkg/util/configz"
"k8s.io/kubernetes/pkg/version"
Expand Down Expand Up @@ -280,6 +281,9 @@ type ControllerContext struct {
// InformersStarted is closed after all of the controllers have been initialized and are running. After this point it is safe,
// for an individual controller to start the shared informers. Before it is closed, they should not.
InformersStarted chan struct{}

// DesiredStateOfWorld is the desired state of volumes used by AttachDetachController and ExpandController.
DesiredStateOfWorld cache.DesiredStateOfWorld
}

func (c ControllerContext) IsControllerEnabled(name string) bool {
Expand Down Expand Up @@ -473,14 +477,15 @@ func CreateControllerContext(s *options.CMServer, rootClientBuilder, clientBuild
}

ctx := ControllerContext{
ClientBuilder: clientBuilder,
InformerFactory: sharedInformers,
Options: *s,
AvailableResources: availableResources,
Cloud: cloud,
LoopMode: loopMode,
Stop: stop,
InformersStarted: make(chan struct{}),
ClientBuilder: clientBuilder,
InformerFactory: sharedInformers,
Options: *s,
AvailableResources: availableResources,
Cloud: cloud,
LoopMode: loopMode,
Stop: stop,
InformersStarted: make(chan struct{}),
DesiredStateOfWorld: cache.NewDesiredStateOfWorld(),
}
return ctx, nil
}
Expand Down
1 change: 1 addition & 0 deletions cmd/kube-controller-manager/app/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ func startAttachDetachController(ctx ControllerContext) (bool, error) {
ctx.Options.DisableAttachDetachReconcilerSync,
ctx.Options.ReconcilerSyncLoopPeriod.Duration,
attachdetach.DefaultTimerConfig,
ctx.DesiredStateOfWorld,
)
if attachDetachControllerErr != nil {
return true, fmt.Errorf("failed to start attach/detach controller: %v", attachDetachControllerErr)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/volume/cache"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/populator"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/reconciler"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
"k8s.io/kubernetes/pkg/controller/volume/cache"
"k8s.io/kubernetes/pkg/controller/volume/util"
"k8s.io/kubernetes/pkg/util/io"
"k8s.io/kubernetes/pkg/util/mount"
Expand Down Expand Up @@ -101,7 +101,8 @@ func NewAttachDetachController(
prober volume.DynamicPluginProber,
disableReconciliationSync bool,
reconcilerSyncDuration time.Duration,
timerConfig TimerConfig) (AttachDetachController, error) {
timerConfig TimerConfig,
desiredStateOfWorld cache.DesiredStateOfWorld) (AttachDetachController, error) {
// TODO: The default resyncPeriod for shared informers is 12 hours, this is
// unacceptable for the attach/detach controller. For example, if a pod is
// skipped because the node it is scheduled to didn't set its annotation in
Expand Down Expand Up @@ -139,8 +140,8 @@ func NewAttachDetachController(
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "attachdetach-controller"})
blkutil := volumeutil.NewBlockVolumePathHandler()

adc.desiredStateOfWorld = cache.NewDesiredStateOfWorld(&adc.volumePluginMgr)
adc.actualStateOfWorld = cache.NewActualStateOfWorld(&adc.volumePluginMgr)
adc.desiredStateOfWorld = desiredStateOfWorld
adc.actualStateOfWorld = cache.NewActualStateOfWorld()
adc.attacherDetacher =
operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
kubeClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/volume/cache"
controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
"k8s.io/kubernetes/pkg/controller/volume/cache"
"k8s.io/kubernetes/pkg/volume"
)

Expand All @@ -49,7 +49,8 @@ func Test_NewAttachDetachController_Positive(t *testing.T) {
nil, /* prober */
false,
5*time.Second,
DefaultTimerConfig)
DefaultTimerConfig,
cache.NewDesiredStateOfWorld())

// Assert
if err != nil {
Expand Down Expand Up @@ -87,8 +88,8 @@ func Test_AttachDetachControllerStateOfWolrdPopulators_Positive(t *testing.T) {
t.Fatalf("Could not initialize volume plugins for Attach/Detach Controller: %+v", err)
}

adc.actualStateOfWorld = cache.NewActualStateOfWorld(&adc.volumePluginMgr)
adc.desiredStateOfWorld = cache.NewDesiredStateOfWorld(&adc.volumePluginMgr)
adc.actualStateOfWorld = cache.NewActualStateOfWorld()
adc.desiredStateOfWorld = cache.NewDesiredStateOfWorld()

err := adc.populateActualStateOfWorld()
if err != nil {
Expand Down Expand Up @@ -223,7 +224,8 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2
prober,
false,
1*time.Second,
DefaultTimerConfig)
DefaultTimerConfig,
cache.NewDesiredStateOfWorld())

if err != nil {
t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestFindAndAddActivePods_FindAndRemoveDeletedPods(t *testing.T) {
fakeInformerFactory := informers.NewSharedInformerFactory(fakeClient, controller.NoResyncPeriodFunc())
fakePodInformer := fakeInformerFactory.Core().V1().Pods()

fakesDSW := cache.NewDesiredStateOfWorld(fakeVolumePluginMgr)
fakesDSW := cache.NewDesiredStateOfWorld()

pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand Down
77 changes: 54 additions & 23 deletions pkg/controller/volume/attachdetach/reconciler/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/volume/cache"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
"k8s.io/kubernetes/pkg/controller/volume/cache"
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
"k8s.io/kubernetes/pkg/volume/util/types"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
)

const (
Expand All @@ -46,8 +47,8 @@ const (
func Test_Run_Positive_DoNothing(t *testing.T) {
// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(volumePluginMgr)
dsw := cache.NewDesiredStateOfWorld()
asw := cache.NewActualStateOfWorld()
fakeKubeClient := controllervolumetesting.CreateTestClient()
fakeRecorder := &record.FakeRecorder{}
fakeHandler := volumetesting.NewBlockVolumePathHandler()
Expand Down Expand Up @@ -82,8 +83,8 @@ func Test_Run_Positive_DoNothing(t *testing.T) {
func Test_Run_Positive_OneDesiredVolumeAttach(t *testing.T) {
// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(volumePluginMgr)
dsw := cache.NewDesiredStateOfWorld()
asw := cache.NewActualStateOfWorld()
fakeKubeClient := controllervolumetesting.CreateTestClient()
fakeRecorder := &record.FakeRecorder{}
fakeHandler := volumetesting.NewBlockVolumePathHandler()
Expand All @@ -109,7 +110,12 @@ func Test_Run_Positive_OneDesiredVolumeAttach(t *testing.T) {
nodeName)
}

_, podErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName)
volumeNameFromSpec, getVolumeNameErr := volumehelper.GetUniqueVolumeNameFromSpec(fakePlugin, volumeSpec)
if getVolumeNameErr != nil {
t.Fatalf("GetUniqueVolumeNameFromSpec. Expected: <no error> Actual: <%v>", getVolumeNameErr)
}

_, podErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName, volumeNameFromSpec)
if podErr != nil {
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podErr)
}
Expand All @@ -134,8 +140,8 @@ func Test_Run_Positive_OneDesiredVolumeAttach(t *testing.T) {
func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithUnmountedVolume(t *testing.T) {
// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(volumePluginMgr)
dsw := cache.NewDesiredStateOfWorld()
asw := cache.NewActualStateOfWorld()
fakeKubeClient := controllervolumetesting.CreateTestClient()
fakeRecorder := &record.FakeRecorder{}
fakeHandler := volumetesting.NewBlockVolumePathHandler()
Expand All @@ -161,7 +167,12 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithUnmountedVolume(t *te
nodeName)
}

generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName)
volumeNameFromSpec, getVolumeNameErr := volumehelper.GetUniqueVolumeNameFromSpec(fakePlugin, volumeSpec)
if getVolumeNameErr != nil {
t.Fatalf("GetUniqueVolumeNameFromSpec. Expected: <no error> Actual: <%v>", getVolumeNameErr)
}

generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName, volumeNameFromSpec)
if podAddErr != nil {
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
}
Expand Down Expand Up @@ -207,8 +218,8 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithUnmountedVolume(t *te
func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMountedVolume(t *testing.T) {
// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(volumePluginMgr)
dsw := cache.NewDesiredStateOfWorld()
asw := cache.NewActualStateOfWorld()
fakeKubeClient := controllervolumetesting.CreateTestClient()
fakeRecorder := &record.FakeRecorder{}
fakeHandler := volumetesting.NewBlockVolumePathHandler()
Expand All @@ -234,7 +245,12 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMountedVolume(t *test
nodeName)
}

generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName)
volumeNameFromSpec, getVolumeNameErr := volumehelper.GetUniqueVolumeNameFromSpec(fakePlugin, volumeSpec)
if getVolumeNameErr != nil {
t.Fatalf("GetUniqueVolumeNameFromSpec. Expected: <no error> Actual: <%v>", getVolumeNameErr)
}

generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName, volumeNameFromSpec)
if podAddErr != nil {
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
}
Expand Down Expand Up @@ -280,8 +296,8 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMountedVolume(t *test
func Test_Run_Negative_OneDesiredVolumeAttachThenDetachWithUnmountedVolumeUpdateStatusFail(t *testing.T) {
// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(volumePluginMgr)
dsw := cache.NewDesiredStateOfWorld()
asw := cache.NewActualStateOfWorld()
fakeKubeClient := controllervolumetesting.CreateTestClient()
fakeRecorder := &record.FakeRecorder{}
fakeHandler := volumetesting.NewBlockVolumePathHandler()
Expand All @@ -307,7 +323,12 @@ func Test_Run_Negative_OneDesiredVolumeAttachThenDetachWithUnmountedVolumeUpdate
nodeName)
}

generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName)
volumeNameFromSpec, getVolumeNameErr := volumehelper.GetUniqueVolumeNameFromSpec(fakePlugin, volumeSpec)
if getVolumeNameErr != nil {
t.Fatalf("GetUniqueVolumeNameFromSpec. Expected: <no error> Actual: <%v>", getVolumeNameErr)
}

generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName, volumeNameFromSpec)
if podAddErr != nil {
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
}
Expand Down Expand Up @@ -356,8 +377,8 @@ func Test_Run_Negative_OneDesiredVolumeAttachThenDetachWithUnmountedVolumeUpdate
func Test_Run_OneVolumeAttachAndDetachMultipleNodesWithReadWriteMany(t *testing.T) {
// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(volumePluginMgr)
dsw := cache.NewDesiredStateOfWorld()
asw := cache.NewActualStateOfWorld()
fakeKubeClient := controllervolumetesting.CreateTestClient()
fakeRecorder := &record.FakeRecorder{}
fakeHandler := volumetesting.NewBlockVolumePathHandler()
Expand All @@ -380,11 +401,16 @@ func Test_Run_OneVolumeAttachAndDetachMultipleNodesWithReadWriteMany(t *testing.
dsw.AddNode(nodeName1, false /*keepTerminatedPodVolumes*/)
dsw.AddNode(nodeName2, false /*keepTerminatedPodVolumes*/)

generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName1), controllervolumetesting.NewPod(podName1, podName1), volumeSpec, nodeName1)
volumeNameFromSpec, getVolumeNameErr := volumehelper.GetUniqueVolumeNameFromSpec(fakePlugin, volumeSpec)
if getVolumeNameErr != nil {
t.Fatalf("GetUniqueVolumeNameFromSpec. Expected: <no error> Actual: <%v>", getVolumeNameErr)
}

generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName1), controllervolumetesting.NewPod(podName1, podName1), volumeSpec, nodeName1, volumeNameFromSpec)
if podAddErr != nil {
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
}
_, podAddErr = dsw.AddPod(types.UniquePodName(podName2), controllervolumetesting.NewPod(podName2, podName2), volumeSpec, nodeName2)
_, podAddErr = dsw.AddPod(types.UniquePodName(podName2), controllervolumetesting.NewPod(podName2, podName2), volumeSpec, nodeName2, volumeNameFromSpec)
if podAddErr != nil {
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
}
Expand Down Expand Up @@ -448,8 +474,8 @@ func Test_Run_OneVolumeAttachAndDetachMultipleNodesWithReadWriteMany(t *testing.
func Test_Run_OneVolumeAttachAndDetachMultipleNodesWithReadWriteOnce(t *testing.T) {
// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(volumePluginMgr)
dsw := cache.NewDesiredStateOfWorld()
asw := cache.NewActualStateOfWorld()
fakeKubeClient := controllervolumetesting.CreateTestClient()
fakeRecorder := &record.FakeRecorder{}
fakeHandler := volumetesting.NewBlockVolumePathHandler()
Expand All @@ -472,12 +498,17 @@ func Test_Run_OneVolumeAttachAndDetachMultipleNodesWithReadWriteOnce(t *testing.
dsw.AddNode(nodeName1, false /*keepTerminatedPodVolumes*/)
dsw.AddNode(nodeName2, false /*keepTerminatedPodVolumes*/)

volumeNameFromSpec, getVolumeNameErr := volumehelper.GetUniqueVolumeNameFromSpec(fakePlugin, volumeSpec)
if getVolumeNameErr != nil {
t.Fatalf("GetUniqueVolumeNameFromSpec. Expected: <no error> Actual: <%v>", getVolumeNameErr)
}

// Add both pods at the same time to provoke a potential race condition in the reconciler
generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName1), controllervolumetesting.NewPod(podName1, podName1), volumeSpec, nodeName1)
generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName1), controllervolumetesting.NewPod(podName1, podName1), volumeSpec, nodeName1, volumeNameFromSpec)
if podAddErr != nil {
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
}
_, podAddErr = dsw.AddPod(types.UniquePodName(podName2), controllervolumetesting.NewPod(podName2, podName2), volumeSpec, nodeName2)
_, podAddErr = dsw.AddPod(types.UniquePodName(podName2), controllervolumetesting.NewPod(podName2, podName2), volumeSpec, nodeName2, volumeNameFromSpec)
if podAddErr != nil {
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
}
Expand Down
36 changes: 2 additions & 34 deletions pkg/controller/volume/cache/actual_state_of_world.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
)

// ActualStateOfWorld defines a set of thread-safe operations supported on
Expand Down Expand Up @@ -145,11 +144,10 @@ type AttachedVolume struct {
}

// NewActualStateOfWorld returns a new instance of ActualStateOfWorld.
func NewActualStateOfWorld(volumePluginMgr *volume.VolumePluginMgr) ActualStateOfWorld {
func NewActualStateOfWorld() ActualStateOfWorld {
return &actualStateOfWorld{
attachedVolumes: make(map[v1.UniqueVolumeName]attachedVolume),
nodesToUpdateStatusFor: make(map[types.NodeName]nodeToUpdateStatusFor),
volumePluginMgr: volumePluginMgr,
}
}

Expand All @@ -166,10 +164,6 @@ type actualStateOfWorld struct {
// the node (including the list of volumes to report attached).
nodesToUpdateStatusFor map[types.NodeName]nodeToUpdateStatusFor

// volumePluginMgr is the volume plugin manager used to create volume
// plugin objects.
volumePluginMgr *volume.VolumePluginMgr

sync.RWMutex
}

Expand Down Expand Up @@ -261,36 +255,10 @@ func (asw *actualStateOfWorld) AddVolumeToReportAsAttached(
}

func (asw *actualStateOfWorld) AddVolumeNode(
uniqueName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string) (v1.UniqueVolumeName, error) {
volumeName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string) (v1.UniqueVolumeName, error) {
asw.Lock()
defer asw.Unlock()

var volumeName v1.UniqueVolumeName
if volumeSpec != nil {
attachableVolumePlugin, err := asw.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
if err != nil || attachableVolumePlugin == nil {
return "", fmt.Errorf(
"failed to get AttachablePlugin from volumeSpec for volume %q err=%v",
volumeSpec.Name(),
err)
}

volumeName, err = volumehelper.GetUniqueVolumeNameFromSpec(
attachableVolumePlugin, volumeSpec)
if err != nil {
return "", fmt.Errorf(
"failed to GetUniqueVolumeNameFromSpec for volumeSpec %q err=%v",
volumeSpec.Name(),
err)
}
} else {
// volumeSpec is nil
// This happens only on controller startup when reading the volumes from node
// status; if the pods using the volume have been removed and are unreachable
// the volumes should be detached immediately and the spec is not needed
volumeName = uniqueName
}

volumeObj, volumeExists := asw.attachedVolumes[volumeName]
if !volumeExists {
volumeObj = attachedVolume{
Expand Down
Loading

0 comments on commit 96b7de3

Please sign in to comment.