From 56de69a2252341c861768f6d7432a5501ebdd30e Mon Sep 17 00:00:00 2001 From: Yecheng Fu Date: Fri, 15 Mar 2019 17:44:23 +0800 Subject: [PATCH 1/2] Update k8s.io/klog to fix bug introduced in k8s.io/klog/pull/31 --- Gopkg.lock | 4 ++-- vendor/k8s.io/klog/klog.go | 18 +++++++++--------- vendor/k8s.io/klog/klog_file.go | 19 ++++++++++++++++--- 3 files changed, 27 insertions(+), 14 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 540bfbefa..166f31781 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1391,11 +1391,11 @@ [[projects]] branch = "master" - digest = "1:ee7e865e9151970164f5642eb23b61e6515d5534fd80e24f94e25c072a13803a" + digest = "1:894fd68c1b1c5f36f9d6223ee929c355afdc35cdcb3111b09768effdc0ff2736" name = "k8s.io/klog" packages = ["."] pruneopts = "NUT" - revision = "f0c3f94178c11fe3a3503886466b306562049e72" + revision = "291f19f84ceb3e47785d3b4bda653cfcd58622a1" [[projects]] branch = "master" diff --git a/vendor/k8s.io/klog/klog.go b/vendor/k8s.io/klog/klog.go index d1a88132b..398cd1c5d 100644 --- a/vendor/k8s.io/klog/klog.go +++ b/vendor/k8s.io/klog/klog.go @@ -410,7 +410,7 @@ func InitFlags(flagset *flag.FlagSet) { } flagset.StringVar(&logging.logDir, "log_dir", "", "If non-empty, write log files in this directory") flagset.StringVar(&logging.logFile, "log_file", "", "If non-empty, use this log file") - flagset.BoolVar(&logging.toStderr, "logtostderr", false, "log to standard error instead of files") + flagset.BoolVar(&logging.toStderr, "logtostderr", true, "log to standard error instead of files") flagset.BoolVar(&logging.alsoToStderr, "alsologtostderr", false, "log to standard error as well as files") flagset.Var(&logging.verbosity, "v", "number for the log level verbosity") flagset.BoolVar(&logging.skipHeaders, "skip_headers", false, "If true, avoid header prefixes in the log messages") @@ -739,9 +739,7 @@ func (l *loggingT) output(s severity, buf *buffer, file string, line int, alsoTo } data := buf.Bytes() if l.toStderr { - if s >= l.stderrThreshold.get() { - os.Stderr.Write(data) - } + os.Stderr.Write(data) } else { if alsoToStderr || l.alsoToStderr || s >= l.stderrThreshold.get() { os.Stderr.Write(data) @@ -874,7 +872,7 @@ func (sb *syncBuffer) Sync() error { func (sb *syncBuffer) Write(p []byte) (n int, err error) { if sb.nbytes+uint64(len(p)) >= MaxSize { - if err := sb.rotateFile(time.Now()); err != nil { + if err := sb.rotateFile(time.Now(), false); err != nil { sb.logger.exit(err) } } @@ -887,13 +885,15 @@ func (sb *syncBuffer) Write(p []byte) (n int, err error) { } // rotateFile closes the syncBuffer's file and starts a new one. -func (sb *syncBuffer) rotateFile(now time.Time) error { +// The startup argument indicates whether this is the initial startup of klog. +// If startup is true, existing files are opened for apending instead of truncated. +func (sb *syncBuffer) rotateFile(now time.Time, startup bool) error { if sb.file != nil { sb.Flush() sb.file.Close() } var err error - sb.file, _, err = create(severityName[sb.sev], now) + sb.file, _, err = create(severityName[sb.sev], now, startup) sb.nbytes = 0 if err != nil { return err @@ -928,7 +928,7 @@ func (l *loggingT) createFiles(sev severity) error { logger: l, sev: s, } - if err := sb.rotateFile(now); err != nil { + if err := sb.rotateFile(now, true); err != nil { return err } l.file[s] = sb @@ -936,7 +936,7 @@ func (l *loggingT) createFiles(sev severity) error { return nil } -const flushInterval = 30 * time.Second +const flushInterval = 5 * time.Second // flushDaemon periodically flushes the log file buffers. func (l *loggingT) flushDaemon() { diff --git a/vendor/k8s.io/klog/klog_file.go b/vendor/k8s.io/klog/klog_file.go index b76a4e10b..1c4897f4f 100644 --- a/vendor/k8s.io/klog/klog_file.go +++ b/vendor/k8s.io/klog/klog_file.go @@ -97,9 +97,11 @@ var onceLogDirs sync.Once // contains tag ("INFO", "FATAL", etc.) and t. If the file is created // successfully, create also attempts to update the symlink for that tag, ignoring // errors. -func create(tag string, t time.Time) (f *os.File, filename string, err error) { +// The startup argument indicates whether this is the initial startup of klog. +// If startup is true, existing files are opened for apending instead of truncated. +func create(tag string, t time.Time, startup bool) (f *os.File, filename string, err error) { if logging.logFile != "" { - f, err := os.Create(logging.logFile) + f, err := openOrCreate(logging.logFile, startup) if err == nil { return f, logging.logFile, nil } @@ -113,7 +115,7 @@ func create(tag string, t time.Time) (f *os.File, filename string, err error) { var lastErr error for _, dir := range logDirs { fname := filepath.Join(dir, name) - f, err := os.Create(fname) + f, err := openOrCreate(fname, startup) if err == nil { symlink := filepath.Join(dir, link) os.Remove(symlink) // ignore err @@ -124,3 +126,14 @@ func create(tag string, t time.Time) (f *os.File, filename string, err error) { } return nil, "", fmt.Errorf("log: cannot create log: %v", lastErr) } + +// The startup argument indicates whether this is the initial startup of klog. +// If startup is true, existing files are opened for appending instead of truncated. +func openOrCreate(name string, startup bool) (*os.File, error) { + if startup { + f, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) + return f, err + } + f, err := os.Create(name) + return f, err +} From 2abdda31650270c59e2a14fc871d097bc21f5f07 Mon Sep 17 00:00:00 2001 From: Yecheng Fu Date: Fri, 15 Mar 2019 17:45:00 +0800 Subject: [PATCH 2/2] Mount /dev in cleanup job and add tests for it --- provisioner/pkg/deleter/jobcontroller.go | 22 +- test/e2e/e2e_test.go | 353 +++++++++++++++-------- 2 files changed, 246 insertions(+), 129 deletions(-) diff --git a/provisioner/pkg/deleter/jobcontroller.go b/provisioner/pkg/deleter/jobcontroller.go index 3b046aedd..12b640d22 100644 --- a/provisioner/pkg/deleter/jobcontroller.go +++ b/provisioner/pkg/deleter/jobcontroller.go @@ -253,8 +253,7 @@ func (c *jobController) RemoveJob(pvName string) (CleanupState, *time.Time, erro } // NewCleanupJob creates manifest for a cleaning job. -func NewCleanupJob(pv *apiv1.PersistentVolume, volMode apiv1.PersistentVolumeMode, imageName string, nodeName string, namespace string, mountPath string, - config common.MountConfig) (*batch_v1.Job, error) { +func NewCleanupJob(pv *apiv1.PersistentVolume, volMode apiv1.PersistentVolumeMode, imageName string, nodeName string, namespace string, mountPath string, config common.MountConfig) (*batch_v1.Job, error) { priv := true // Container definition jobContainer := apiv1.Container{ @@ -290,7 +289,24 @@ func NewCleanupJob(pv *apiv1.PersistentVolume, volMode apiv1.PersistentVolumeMod Name: mountName, MountPath: config.MountDir}, } - + if volMode == apiv1.PersistentVolumeBlock { + // We need to mount /dev into clean job for block volume. + // Note that in certain docker setup, this may override `/dev/init` + // which is mounted by docker to start container process. + // https://github.com/kubernetes-sigs/sig-storage-local-static-provisioner/issues/50 + volumes = append(volumes, apiv1.Volume{ + Name: "dev", + VolumeSource: apiv1.VolumeSource{ + HostPath: &apiv1.HostPathVolumeSource{ + Path: "/dev", + }, + }, + }) + jobContainer.VolumeMounts = append(jobContainer.VolumeMounts, apiv1.VolumeMount{ + Name: "dev", + MountPath: "/dev", + }) + } // Make job query-able by some useful labels for admins. labels := map[string]string{ common.NodeNameLabel: nodeName, diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 05cc9af3b..6ee20a7f7 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -42,14 +42,11 @@ import ( "k8s.io/klog" "k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/storage/utils" + "sigs.k8s.io/sig-storage-local-static-provisioner/provisioner/pkg/common" ) const ( hostBase = "/tmp" - // Path to the first volume in the test containers - // created via createLocalPod or makeLocalPod - // leveraging pv_util.MakePod - volumeDir = "/mnt/volume1" // testFile created in setupLocalVolume testFile = "test-file" // testFileContent written into testFile @@ -97,6 +94,37 @@ const ( BlockLocalVolumeType localVolumeType = "block" ) +type localVolume struct { + volumePath string + volumeType localVolumeType + loopDev string // optional, loop device path under /dev + loopFile string // optional, loop device backing file +} + +type testConfig struct { + UseJobForCleaning bool + VolumeType localVolumeType +} + +var testConfigs = []*testConfig{ + { + false, + DirectoryLocalVolumeType, + }, + { + true, + DirectoryLocalVolumeType, + }, + { + false, + BlockLocalVolumeType, + }, + { + true, + BlockLocalVolumeType, + }, +} + type localTestConfig struct { ns string nodes []v1.Node @@ -146,78 +174,90 @@ var _ = utils.SIGDescribe("PersistentVolumes-local ", func() { nodes: nodes.Items[:maxLen], nodeExecPods: make(map[string]*v1.Pod, maxLen), node0: node0, - scName: fmt.Sprintf("%v-%v", testSCPrefix, f.Namespace.Name), discoveryDir: filepath.Join(hostBase, f.Namespace.Name), } }) - Context("Local volume provisioner [Serial]", func() { - var volumePath string + // Provisioner positive tests + for _, testConfig := range testConfigs { + ctxString := fmt.Sprintf("Local volume provisioner [Serial][UseJobForCleaning: %v][VolumeType: %v]", testConfig.UseJobForCleaning, testConfig.VolumeType) + Context(ctxString, func() { + BeforeEach(func() { + setupStorageClass(config, &immediateMode) + setupLocalVolumeProvisioner(config, testConfig) + createProvisionerDaemonset(config) + }) + + AfterEach(func() { + cleanupLocalVolumeProvisioner(config) + cleanupStorageClass(config) + deleteProvisionerDaemonset(config) + }) + + It("should create and recreate local persistent volume", func() { + By(fmt.Sprintf("Creating a %s volume in discovery directory", testConfig.VolumeType)) + testVol := setupLocalVolumeProvisionerMountPoint(config, config.node0, testConfig.VolumeType) + volumePath := testVol.volumePath + + By("Waiting for a PersistentVolume to be created") + oldPV, err := waitForLocalPersistentVolume(config.client, volumePath) + Expect(err).NotTo(HaveOccurred()) + + // Create a persistent volume claim for local volume: the above volume will be bound. + By("Creating a persistent volume claim") + claim, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(newLocalClaim(config)) + Expect(err).NotTo(HaveOccurred()) + err = framework.WaitForPersistentVolumeClaimPhase( + v1.ClaimBound, config.client, claim.Namespace, claim.Name, framework.Poll, 1*time.Minute) + Expect(err).NotTo(HaveOccurred()) + + claim, err = config.client.CoreV1().PersistentVolumeClaims(config.ns).Get(claim.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + Expect(claim.Spec.VolumeName).To(Equal(oldPV.Name)) + + // Delete the persistent volume claim: file will be cleaned up and volume be re-created. + By("Deleting the persistent volume claim to clean up persistent volume and re-create one") + writeCmd := createWriteCmd(volumePath, testFile, testFileContent, testConfig.VolumeType) + err = issueNodeCommand(config, writeCmd, config.node0) + Expect(err).NotTo(HaveOccurred()) + err = config.client.CoreV1().PersistentVolumeClaims(claim.Namespace).Delete(claim.Name, &metav1.DeleteOptions{}) + Expect(err).NotTo(HaveOccurred()) + + By("Waiting for a new PersistentVolume to be re-created") + newPV, err := waitForLocalPersistentVolume(config.client, volumePath) + Expect(err).NotTo(HaveOccurred()) + Expect(newPV.UID).NotTo(Equal(oldPV.UID)) + fileDoesntExistCmd := createFileDoesntExistCmd(volumePath, testFile) + err = issueNodeCommand(config, fileDoesntExistCmd, config.node0) + Expect(err).NotTo(HaveOccurred()) + + cleanupLocalVolumeProvisionerMountPoint(config, testVol, config.node0) + }) + }) + } + // Provisioner negative tests + Context("Local volume provisioner [Serial]", func() { BeforeEach(func() { setupStorageClass(config, &immediateMode) - setupLocalVolumeProvisioner(config) - volumePath = path.Join(config.discoveryDir, fmt.Sprintf("vol-%v", string(uuid.NewUUID()))) - setupLocalVolumeProvisionerMountPoint(config, volumePath, config.node0) + setupLocalVolumeProvisioner(config, nil) + createProvisionerDaemonset(config) }) AfterEach(func() { - cleanupLocalVolumeProvisionerMountPoint(config, volumePath, config.node0) cleanupLocalVolumeProvisioner(config) cleanupStorageClass(config) - }) - - It("should create and recreate local persistent volume", func() { - By("Starting a provisioner daemonset") - createProvisionerDaemonset(config) - - By("Waiting for a PersistentVolume to be created") - oldPV, err := waitForLocalPersistentVolume(config.client, volumePath) - Expect(err).NotTo(HaveOccurred()) - - // Create a persistent volume claim for local volume: the above volume will be bound. - By("Creating a persistent volume claim") - claim, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(newLocalClaim(config)) - Expect(err).NotTo(HaveOccurred()) - err = framework.WaitForPersistentVolumeClaimPhase( - v1.ClaimBound, config.client, claim.Namespace, claim.Name, framework.Poll, 1*time.Minute) - Expect(err).NotTo(HaveOccurred()) - - claim, err = config.client.CoreV1().PersistentVolumeClaims(config.ns).Get(claim.Name, metav1.GetOptions{}) - Expect(err).NotTo(HaveOccurred()) - Expect(claim.Spec.VolumeName).To(Equal(oldPV.Name)) - - // Delete the persistent volume claim: file will be cleaned up and volume be re-created. - By("Deleting the persistent volume claim to clean up persistent volume and re-create one") - writeCmd := createWriteCmd(volumePath, testFile, testFileContent, DirectoryLocalVolumeType) - err = issueNodeCommand(config, writeCmd, config.node0) - Expect(err).NotTo(HaveOccurred()) - err = config.client.CoreV1().PersistentVolumeClaims(claim.Namespace).Delete(claim.Name, &metav1.DeleteOptions{}) - Expect(err).NotTo(HaveOccurred()) - - By("Waiting for a new PersistentVolume to be re-created") - newPV, err := waitForLocalPersistentVolume(config.client, volumePath) - Expect(err).NotTo(HaveOccurred()) - Expect(newPV.UID).NotTo(Equal(oldPV.UID)) - fileDoesntExistCmd := createFileDoesntExistCmd(volumePath, testFile) - err = issueNodeCommand(config, fileDoesntExistCmd, config.node0) - Expect(err).NotTo(HaveOccurred()) - - By("Deleting provisioner daemonset") deleteProvisionerDaemonset(config) }) - It("should not create local persistent volume for filesystem volume that was not bind mounted", func() { + It("should not create local persistent volume for filesystem volume that was not bind mounted", func() { directoryPath := filepath.Join(config.discoveryDir, "notbindmount") By("Creating a directory, not bind mounted, in discovery directory") mkdirCmd := fmt.Sprintf("mkdir -p %v -m 777", directoryPath) err := issueNodeCommand(config, mkdirCmd, config.node0) Expect(err).NotTo(HaveOccurred()) - By("Starting a provisioner daemonset") - createProvisionerDaemonset(config) - By("Allowing provisioner to run for 30s and discover potential local PVs") time.Sleep(30 * time.Second) @@ -229,33 +269,12 @@ var _ = utils.SIGDescribe("PersistentVolumes-local ", func() { expectedLogMessage := "Path \"/mnt/local-storage/notbindmount\" is not an actual mountpoint" Expect(strings.Contains(logs, expectedLogMessage)).To(BeTrue()) - - By("Deleting provisioner daemonset") - deleteProvisionerDaemonset(config) }) - It("should discover dynamically created local persistent volume mountpoint in discovery directory", func() { - By("Starting a provisioner daemonset") - createProvisionerDaemonset(config) - - By("Creating a volume in discovery directory") - dynamicVolumePath := path.Join(config.discoveryDir, fmt.Sprintf("vol-%v", string(uuid.NewUUID()))) - setupLocalVolumeProvisionerMountPoint(config, dynamicVolumePath, config.node0) - - By("Waiting for the PersistentVolume to be created") - _, err := waitForLocalPersistentVolume(config.client, dynamicVolumePath) - Expect(err).NotTo(HaveOccurred()) - - By("Deleting provisioner daemonset") - deleteProvisionerDaemonset(config) - - By("Deleting volume in discovery directory") - cleanupLocalVolumeProvisionerMountPoint(config, dynamicVolumePath, config.node0) - }) - }) + // Provisioner stress tests Context("Stress with local volume provisioner [Serial]", func() { - var testVols [][]string + var testVols [][]*localVolume const ( volsPerNode = 10 // Make this non-divisable by volsPerPod to increase changes of partial binding failure @@ -265,18 +284,18 @@ var _ = utils.SIGDescribe("PersistentVolumes-local ", func() { BeforeEach(func() { setupStorageClass(config, &waitMode) - setupLocalVolumeProvisioner(config) + setupLocalVolumeProvisioner(config, nil) - testVols = [][]string{} + testVols = [][]*localVolume{} for i, node := range config.nodes { By(fmt.Sprintf("Setting up local volumes on node %q", node.Name)) - paths := []string{} + vols := []*localVolume{} for j := 0; j < volsPerNode; j++ { - volumePath := path.Join(config.discoveryDir, fmt.Sprintf("vol-%v", string(uuid.NewUUID()))) - setupLocalVolumeProvisionerMountPoint(config, volumePath, &config.nodes[i]) - paths = append(paths, volumePath) + // volumePath := path.Join(config.discoveryDir, fmt.Sprintf("vol-%v", string(uuid.NewUUID()))) + testVol := setupLocalVolumeProvisionerMountPoint(config, &config.nodes[i], DirectoryLocalVolumeType) + vols = append(vols, testVol) } - testVols = append(testVols, paths) + testVols = append(testVols, vols) } By("Starting the local volume provisioner") @@ -287,9 +306,9 @@ var _ = utils.SIGDescribe("PersistentVolumes-local ", func() { By("Deleting provisioner daemonset") deleteProvisionerDaemonset(config) - for i, paths := range testVols { - for _, volumePath := range paths { - cleanupLocalVolumeProvisionerMountPoint(config, volumePath, &config.nodes[i]) + for i, vols := range testVols { + for _, vol := range vols { + cleanupLocalVolumeProvisionerMountPoint(config, vol, &config.nodes[i]) } } cleanupLocalVolumeProvisioner(config) @@ -405,12 +424,12 @@ func cleanupStorageClass(config *localTestConfig) { framework.ExpectNoError(config.client.StorageV1().StorageClasses().Delete(config.scName, nil)) } -func setupLocalVolumeProvisioner(config *localTestConfig) { +func setupLocalVolumeProvisioner(config *localTestConfig, testConfig *testConfig) { By("Bootstrapping local volume provisioner") createServiceAccount(config) createProvisionerClusterRoleBinding(config) utils.PrivilegedTestPSPClusterRoleBinding(config.client, config.ns, false /* teardown */, []string{testServiceAccount}) - createVolumeConfigMap(config) + createVolumeConfigMap(config, testConfig) for _, node := range config.nodes { By(fmt.Sprintf("Initializing local volume discovery base path on node %v", node.Name)) @@ -490,6 +509,37 @@ func createProvisionerClusterRoleBinding(config *localTestConfig) { Expect(err).NotTo(HaveOccurred()) _, err = config.client.RbacV1beta1().ClusterRoleBindings().Create(&nodeBinding) Expect(err).NotTo(HaveOccurred()) + + // job role and rolebinding + jobRole := rbacv1beta1.Role{ + ObjectMeta: metav1.ObjectMeta{ + Name: "local-storage-provisioner-jobs-role", + Namespace: config.ns, + }, + Rules: []rbacv1beta1.PolicyRule{ + { + APIGroups: []string{"batch"}, + Resources: []string{"jobs"}, + Verbs: []string{"*"}, + }, + }, + } + jobRoleBinding := rbacv1beta1.RoleBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "local-storage-provisioner-jobs-rolebinding", + Namespace: config.ns, + }, + Subjects: subjects, + RoleRef: rbacv1beta1.RoleRef{ + APIGroup: "rbac.authorization.k8s.io", + Kind: "Role", + Name: jobRole.Name, + }, + } + _, err = config.client.RbacV1beta1().Roles(config.ns).Create(&jobRole) + Expect(err).NotTo(HaveOccurred()) + _, err = config.client.RbacV1beta1().RoleBindings(config.ns).Create(&jobRoleBinding) + Expect(err).NotTo(HaveOccurred()) } func deleteClusterRoleBinding(config *localTestConfig) { @@ -499,16 +549,60 @@ func deleteClusterRoleBinding(config *localTestConfig) { config.client.RbacV1beta1().ClusterRoleBindings().Delete(pvBindingName, metav1.NewDeleteOptions(0)) } -func setupLocalVolumeProvisionerMountPoint(config *localTestConfig, volumePath string, node *v1.Node) { - By(fmt.Sprintf("Creating local directory at path %q", volumePath)) - mkdirCmd := fmt.Sprintf("mkdir %v -m 777", volumePath) - err := issueNodeCommand(config, mkdirCmd, node) +func createAndSetupLoopDevice(config *localTestConfig, file string, node *v1.Node, size int) { + By(fmt.Sprintf("Creating block device on node %q using file %q", node.Name, file)) + count := size / 4096 + // xfs requires at least 4096 blocks + if count < 4096 { + count = 4096 + } + ddCmd := fmt.Sprintf("dd if=/dev/zero of=%s bs=4096 count=%d", file, count) + losetupCmd := fmt.Sprintf("sudo losetup -f %s", file) + err := issueNodeCommand(config, fmt.Sprintf("%s && %s", ddCmd, losetupCmd), node) Expect(err).NotTo(HaveOccurred()) +} - By(fmt.Sprintf("Mounting local directory at path %q", volumePath)) - mntCmd := fmt.Sprintf("sudo mount --bind %v %v", volumePath, volumePath) - err = issueNodeCommand(config, mntCmd, node) +func findLoopDevice(config *localTestConfig, file string, node *v1.Node) string { + cmd := fmt.Sprintf("E2E_LOOP_DEV=$(sudo losetup | grep %s | awk '{ print $1 }') 2>&1 > /dev/null && echo ${E2E_LOOP_DEV}", file) + loopDevResult, err := issueNodeCommandWithResult(config, cmd, node) Expect(err).NotTo(HaveOccurred()) + return strings.TrimSpace(loopDevResult) +} + +func setupLocalVolumeProvisionerMountPoint(config *localTestConfig, node *v1.Node, volumeType localVolumeType) *localVolume { + volumePath := path.Join(config.discoveryDir, fmt.Sprintf("vol-%v", string(uuid.NewUUID()))) + if volumeType == DirectoryLocalVolumeType { + By(fmt.Sprintf("Creating local directory at path %q", volumePath)) + mkdirCmd := fmt.Sprintf("mkdir %v -m 777", volumePath) + err := issueNodeCommand(config, mkdirCmd, node) + Expect(err).NotTo(HaveOccurred()) + + By(fmt.Sprintf("Mounting local directory at path %q", volumePath)) + mntCmd := fmt.Sprintf("sudo mount --bind %v %v", volumePath, volumePath) + err = issueNodeCommand(config, mntCmd, node) + Expect(err).NotTo(HaveOccurred()) + return &localVolume{ + volumePath: volumePath, + volumeType: volumeType, + } + } else if volumeType == BlockLocalVolumeType { + By("Creating a new loop device") + loopFile := fmt.Sprintf("/tmp/loop-%s", string(uuid.NewUUID())) + createAndSetupLoopDevice(config, loopFile, node, 20*1024*1024) + loopDev := findLoopDevice(config, loopFile, node) + + By(fmt.Sprintf("Linking %s at %s", loopDev, volumePath)) + cmd := fmt.Sprintf("sudo ln -s %s %s", loopDev, volumePath) + err := issueNodeCommand(config, cmd, node) + Expect(err).NotTo(HaveOccurred()) + return &localVolume{ + volumePath: volumePath, + volumeType: volumeType, + loopDev: loopDev, + loopFile: loopFile, + } + } + return nil } // launchNodeExecPodForLocalPV launches a hostexec pod for local PV and waits @@ -578,19 +672,26 @@ func issueNodeCommand(config *localTestConfig, cmd string, node *v1.Node) error return err } -func cleanupLocalVolumeProvisionerMountPoint(config *localTestConfig, volumePath string, node *v1.Node) { - By(fmt.Sprintf("Unmounting the test mount point from %q", volumePath)) - umountCmd := fmt.Sprintf("[ ! -e %v ] || sudo umount %v", volumePath, volumePath) - err := issueNodeCommand(config, umountCmd, node) - Expect(err).NotTo(HaveOccurred()) +func cleanupLocalVolumeProvisionerMountPoint(config *localTestConfig, vol *localVolume, node *v1.Node) { + if vol.volumeType == DirectoryLocalVolumeType { + By(fmt.Sprintf("Unmounting the test mount point from %q", vol.volumePath)) + umountCmd := fmt.Sprintf("[ ! -e %v ] || sudo umount %v", vol.volumePath, vol.volumePath) + err := issueNodeCommand(config, umountCmd, node) + Expect(err).NotTo(HaveOccurred()) - By("Removing the test mount point") - removeCmd := fmt.Sprintf("[ ! -e %v ] || rm -r %v", volumePath, volumePath) - err = issueNodeCommand(config, removeCmd, node) - Expect(err).NotTo(HaveOccurred()) + By("Removing the test mount point") + removeCmd := fmt.Sprintf("[ ! -e %v ] || rm -r %v", vol.volumePath, vol.volumePath) + err = issueNodeCommand(config, removeCmd, node) + Expect(err).NotTo(HaveOccurred()) + } else { + By(fmt.Sprintf("Tear down block device %q on node %q at path %s", vol.loopDev, node.Name, vol.loopFile)) + losetupDeleteCmd := fmt.Sprintf("sudo losetup -d %s && sudo rm %s", vol.loopDev, vol.loopFile) + err := issueNodeCommand(config, losetupDeleteCmd, node) + Expect(err).NotTo(HaveOccurred()) + } - By("Cleaning up persistent volume") - pv, err := findLocalPersistentVolume(config.client, volumePath) + By(fmt.Sprintf("Cleaning up persistent volume at %s", vol.volumePath)) + pv, err := findLocalPersistentVolume(config.client, vol.volumePath) Expect(err).NotTo(HaveOccurred()) if pv != nil { err = config.client.CoreV1().PersistentVolumes().Delete(pv.Name, &metav1.DeleteOptions{}) @@ -598,28 +699,26 @@ func cleanupLocalVolumeProvisionerMountPoint(config *localTestConfig, volumePath } } -func createVolumeConfigMap(config *localTestConfig) { - // MountConfig and ProvisionerConfiguration from - // https://github.com/kubernetes-incubator/external-storage/blob/master/local-volume/provisioner/pkg/common/common.go - type MountConfig struct { - // The hostpath directory - HostDir string `json:"hostDir" yaml:"hostDir"` - MountDir string `json:"mountDir" yaml:"mountDir"` - } - type ProvisionerConfiguration struct { - // StorageClassConfig defines configuration of Provisioner's storage classes - StorageClassConfig map[string]MountConfig `json:"storageClassMap" yaml:"storageClassMap"` - } - var provisionerConfig ProvisionerConfiguration - provisionerConfig.StorageClassConfig = map[string]MountConfig{ +func createVolumeConfigMap(config *localTestConfig, testConfig *testConfig) { + var provisionerConfig common.ProvisionerConfiguration + + provisionerConfig.StorageClassConfig = map[string]common.MountConfig{ config.scName: { - HostDir: config.discoveryDir, - MountDir: provisionerDefaultMountRoot, + HostDir: config.discoveryDir, + MountDir: provisionerDefaultMountRoot, + BlockCleanerCommand: []string{common.DefaultBlockCleanerCommand}, + VolumeMode: "Filesystem", }, } + configMapData := make(map[string]string) data, err := yaml.Marshal(&provisionerConfig.StorageClassConfig) Expect(err).NotTo(HaveOccurred()) + configMapData["storageClassMap"] = string(data) + + if testConfig != nil && testConfig.UseJobForCleaning { + configMapData["useJobForCleaning"] = "yes" + } configMap := v1.ConfigMap{ TypeMeta: metav1.TypeMeta{ @@ -630,9 +729,7 @@ func createVolumeConfigMap(config *localTestConfig) { Name: volumeConfigName, Namespace: config.ns, }, - Data: map[string]string{ - "storageClassMap": string(data), - }, + Data: configMapData, } _, err = config.client.CoreV1().ConfigMaps(config.ns).Create(&configMap) @@ -704,6 +801,10 @@ func createProvisionerDaemonset(config *localTestConfig) { }, }, }, + { + Name: "JOB_CONTAINER_IMAGE", + Value: provisionerImageName, + }, }, VolumeMounts: []v1.VolumeMount{ {