-
Notifications
You must be signed in to change notification settings - Fork 182
Implementing watcher & reboot stability for data cache to master branch. #1946
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -58,7 +58,7 @@ require ( | |
| github.com/davecgh/go-spew v1.1.1 // indirect | ||
| github.com/emicklei/go-restful v2.9.5+incompatible // indirect | ||
| github.com/felixge/httpsnoop v1.0.4 // indirect | ||
| github.com/fsnotify/fsnotify v1.5.4 // indirect | ||
| github.com/fsnotify/fsnotify v1.8.0 // indirect | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this updated manually or it is updated via go mod vendor and go mod tidy?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it was from "go get github.com/fsnotify/fsnotify" |
||
| github.com/go-logr/logr v1.4.2 // indirect | ||
| github.com/go-logr/stdr v1.2.2 // indirect | ||
| github.com/go-openapi/jsonpointer v0.20.0 // indirect | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,7 @@ import ( | |
| "strings" | ||
|
|
||
| csi "github.com/container-storage-interface/spec/lib/go/csi" | ||
| fsnotify "github.com/fsnotify/fsnotify" | ||
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
| "k8s.io/client-go/kubernetes" | ||
| "k8s.io/client-go/rest" | ||
|
|
@@ -66,7 +67,7 @@ func setupCaching(devicePath string, req *csi.NodeStageVolumeRequest, nodeId str | |
| // Clean up Volume Group before adding the PD | ||
| reduceVolumeGroup(volumeGroupName, true) | ||
| } else { | ||
| err := createVg(volumeGroupName, devicePath, raidedLocalSsdPath) | ||
| err := createVg(volumeGroupName, raidedLocalSsdPath) | ||
| if err != nil { | ||
| return mainDevicePath, err | ||
| } | ||
|
|
@@ -430,7 +431,7 @@ func getLvName(suffix string, volumeId string) string { | |
| return fmt.Sprintf("%s-%s", suffix, pvcName) | ||
| } | ||
|
|
||
| func createVg(volumeGroupName string, devicePath string, raidedLocalSsds string) error { | ||
| func createVg(volumeGroupName string, raidedLocalSsds string) error { | ||
| args := []string{ | ||
| "--zero", | ||
| "y", | ||
|
|
@@ -547,3 +548,97 @@ func fetchChunkSizeKiB(cacheSize string) (string, error) { | |
| // default chunk size unit KiB | ||
| return strconv.FormatInt(int64(chunkSize), 10) + "KiB", nil | ||
| } | ||
|
|
||
| func InitializeDataCacheNode(nodeId string) error { | ||
| raidedLocalSsdPath, err := fetchRAIDedLocalSsdPath() | ||
| if err != nil { | ||
| return err | ||
| } | ||
| volumeGroupName := getVolumeGroupName(nodeId) | ||
|
|
||
| vgExists := checkVgExists(volumeGroupName) | ||
| // Check if the required volume group already exists | ||
| if vgExists { | ||
| // Clean up Volume Group before adding the PD | ||
| reduceVolumeGroup(volumeGroupName, true) | ||
|
|
||
| // validate that raidedLSSD is part of VG | ||
| err = validateRaidedLSSDinVG(volumeGroupName, raidedLocalSsdPath) | ||
| if err != nil { | ||
| return fmt.Errorf("failed validate local ssd in vg %v: %v", volumeGroupName, err) | ||
| } | ||
| } else { | ||
| err := createVg(volumeGroupName, raidedLocalSsdPath) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| func StartWatcher(nodeName string) { | ||
| dirToWatch := "/dev/" | ||
|
sunnylovestiramisu marked this conversation as resolved.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe add some logs to indicate the start of a watcher.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added a log. |
||
| watcher, err := fsnotify.NewWatcher() | ||
| if err != nil { | ||
| klog.V(2).ErrorS(err, "errored while creating watcher") | ||
| } | ||
| klog.V(2).Infof("Watcher started for directory %v", dirToWatch) | ||
| defer watcher.Close() | ||
|
|
||
| // out of the box fsnotify can watch a single file, or a single directory | ||
| if err := watcher.Add(dirToWatch); err != nil { | ||
| klog.V(2).ErrorS(err, "errored while adding watcher directory") | ||
| } | ||
| errorCh := make(chan error, 1) | ||
| // Handle the error received from the watcher goroutine | ||
| go watchDiskDetaches(watcher, nodeName, errorCh) | ||
|
|
||
| select { | ||
| case err := <-errorCh: | ||
| klog.Errorf("watcher encountered an error: %v", err) | ||
| } | ||
| } | ||
|
|
||
| func watchDiskDetaches(watcher *fsnotify.Watcher, nodeName string, errorCh chan error) error { | ||
| for { | ||
| select { | ||
| // watch for errors | ||
| case err := <-watcher.Errors: | ||
| errorCh <- fmt.Errorf("disk update event errored: %v", err) | ||
| // watch for events | ||
| case event := <-watcher.Events: | ||
| // In case of an event i.e. creation or deletion of any new PV, we update the VG metadata. | ||
| // This might include some non-LVM changes, no harm in updating metadata multiple times. | ||
| reduceVolumeGroup(getVolumeGroupName(nodeName), true) | ||
| klog.V(2).Infof("disk attach/detach event %#v\n", event) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func validateRaidedLSSDinVG(vgName string, lssdPath string) error { | ||
| args := []string{ | ||
| "--noheadings", | ||
| "-o", | ||
| "pv_name", | ||
| "--select", | ||
| "vg_name=" + vgName, | ||
| } | ||
| info, err := common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "pvs", args...) | ||
| if err != nil { | ||
| return fmt.Errorf("errored while checking physical volume details %v: %s", err, info) | ||
| // On error info contains the error message which we cannot use for further steps | ||
| } | ||
|
|
||
| if !strings.Contains(string(info), lssdPath) { | ||
| return addRaidedLSSDToVg(vgName, lssdPath) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| func addRaidedLSSDToVg(vgName, lssdPath string) error { | ||
| info, err := common.RunCommand("" /* pipedCmd */, nil /* pipedCmdArg */, "vgextend", []string{vgName, lssdPath}...) | ||
| if err != nil { | ||
| return fmt.Errorf("errored while extending VGs %v: %s", err, info) | ||
| } | ||
| return nil | ||
| } | ||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Uh oh!
There was an error while loading. Please reload this page.