Skip to content

Commit

Permalink
Fixes to file/folder watching (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
mokiat authored Oct 13, 2023
1 parent e027547 commit 1e5ebba
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 131 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.21
toolchain go1.21.1

require (
github.com/fsnotify/fsnotify v1.6.0
github.com/fsnotify/fsnotify v1.6.1-0.20230713180834-9342b6df5779
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/google/uuid v1.3.1
github.com/mokiat/gog v0.11.0
Expand Down
5 changes: 2 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/fsnotify/fsnotify v1.6.1-0.20230713180834-9342b6df5779 h1:QfgYVn8mwrJRJYkoKxYKon2n/XT90QULsLMvvchhth0=
github.com/fsnotify/fsnotify v1.6.1-0.20230713180834-9342b6df5779/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI=
Expand Down Expand Up @@ -44,7 +44,6 @@ golang.org/x/net v0.16.0 h1:7eBu7KsSvFDtSXUIDbh3aqlK4DPsZ1rByC8PFfBThos=
golang.org/x/net v0.16.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ=
golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
Expand Down
1 change: 1 addition & 0 deletions internal/filesystem/filter_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func NewFilterTree() *FilterTree {
// FilterTree is a data structure that can be used to mark specific filesystem
// paths as accepted and others as rejected. This can also be achieved through
// global glob patterns.
//
// The structure then provides a means through which one can test whether
// a given file path is accepted or rejected by the filter.
type FilterTree struct {
Expand Down
300 changes: 173 additions & 127 deletions internal/pipeline/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/fsnotify/fsnotify"

"github.com/mokiat/gocrane/internal/filesystem"
"github.com/mokiat/gog/ds"
)

func Watch(
Expand All @@ -21,9 +22,6 @@ func Watch(
bootstrapEvent *ChangeEvent,

) func() error {
isEventType := func(event fsnotify.Event, eType fsnotify.Op) bool {
return event.Op&eType == eType
}

return func() error {
if bootstrapEvent != nil {
Expand All @@ -38,148 +36,196 @@ func Watch(
}
defer watcher.Close()

watchedPaths := make(map[string]struct{})
proc := &watchProcess{
verbose: verbose,
watcher: watcher,
watchFilter: watchFilter,
trackedPaths: ds.NewSet[string](1024),
}

watchPath := func(root string) map[string]struct{} {
result := make(map[string]struct{})
// Bootstrap watching.
for _, dir := range dirs {
proc.startWatching(dir)
}

filesystem.Traverse(root, func(p string, isDir bool, err error) error {
if err != nil {
log.Printf("Error traversing %q: %v", p, err)
return filesystem.ErrSkip
}
absPath, err := filesystem.ToAbsolutePath(p)
if err != nil {
log.Printf("Error converting path %q to absolute: %v", p, err)
return filesystem.ErrSkip
}
if _, ok := watchedPaths[absPath]; ok {
return filesystem.ErrSkip
}
if !watchFilter.IsAccepted(absPath) {
return filesystem.ErrSkip
}
if isDir {
if err := watcher.Add(absPath); err != nil {
log.Printf("Error adding watch to %q: %v", absPath, err)
return filesystem.ErrSkip
}
}
watchedPaths[absPath] = struct{}{}
result[absPath] = struct{}{}
for {
select {
case <-ctx.Done():
return nil
})
if verbose {
for path := range result {
log.Printf("Watching %q", path)
case event := <-watcher.Events:
changedPaths := proc.handleEvent(event)
if changedPaths != nil && !changedPaths.IsEmpty() {
out.Push(ctx, ChangeEvent{
Paths: changedPaths.Items(),
})
}
case err := <-watcher.Errors:
proc.logFSWatchError(err)
}
return result
}
}
}

unwatchPath := func(root string) map[string]struct{} {
result := make(map[string]struct{})
for p := range watchedPaths {
if strings.HasPrefix(p, root) {
result[p] = struct{}{}
// NOTE: Regardless of what the documentation says, we NEED to
// try and explicitly Remove the watch as otherwise there are some
// race condition bugs.
//
// Example on Linux:
// 1. Create folder ./foo
// 2. Create file ./foo/bar
// 3. Delete folder ./foo
// 4. Create folder ./foo
// 5. Create file ./foo/bar
// The file `bar` is indicated as being located at `/bar`
// by the watcher. This bug does not appear if we remove the
// watch explicitly.
err := watcher.Remove(p)
if err == nil || errors.Is(err, fsnotify.ErrNonExistentWatch) {
delete(watchedPaths, p)
if verbose {
log.Printf("Unwatched %q", p)
}
} else {
log.Printf("Error removing watch from %q: %v", p, err)
}
}
}
return result
}
type watchProcess struct {
verbose bool
watcher *fsnotify.Watcher
watchFilter *filesystem.FilterTree

processFSEvent := func(event fsnotify.Event) {
if verbose {
log.Printf("Filesystem watch event: %s", event)
}
absPath, err := filesystem.ToAbsolutePath(event.Name)
if err != nil {
log.Printf("Error processing path: %v", err)
return
}
if !watchFilter.IsAccepted(absPath) {
if verbose {
log.Printf("Skipping excluded path %q from processing", absPath)
}
return
}
trackedPaths *ds.Set[string]
}

switch {
case isEventType(event, fsnotify.Create):
paths := watchPath(absPath)
event := ChangeEvent{
Paths: make([]string, 0, len(paths)),
}
for path := range paths {
event.Paths = append(event.Paths, path)
}
out.Push(ctx, event)
func (proc *watchProcess) handleEvent(event fsnotify.Event) *ds.Set[string] {
proc.logFSWatchEvent(event)

case isEventType(event, fsnotify.Rename):
// Rename is produced on Linux when a file is deleted.
paths := unwatchPath(absPath)
event := ChangeEvent{
Paths: make([]string, 0, len(paths)),
}
for path := range paths {
event.Paths = append(event.Paths, path)
}
out.Push(ctx, event)
absPath, err := filesystem.ToAbsolutePath(event.Name)
if err != nil {
proc.logPathAbsConvertError(event.Name, err)
return nil
}

case isEventType(event, fsnotify.Remove):
paths := unwatchPath(absPath)
event := ChangeEvent{
Paths: make([]string, 0, len(paths)),
}
for path := range paths {
event.Paths = append(event.Paths, path)
}
out.Push(ctx, event)
if !proc.shouldTrack(absPath) {
proc.logExcludedPathWatchSkip(absPath)
return nil
}

case isEventType(event, fsnotify.Chmod):
// We do nothing on these, since MacOS produces a lot of them.
switch {
case event.Has(fsnotify.Create):
return proc.startWatching(absPath)

default:
out.Push(ctx, ChangeEvent{
Paths: []string{absPath},
})
}
case event.Has(fsnotify.Rename):
// Rename is produced on Linux when a file is deleted.
return proc.stopWatching(absPath)

case event.Has(fsnotify.Remove):
return proc.stopWatching(absPath)

case event.Has(fsnotify.Chmod):
// We do nothing on these, since MacOS produces a lot of them.
return nil

default:
return ds.SetFromSlice([]string{absPath})
}
}

func (proc *watchProcess) startWatching(root string) *ds.Set[string] {
result := ds.NewSet[string](1)

filesystem.Traverse(root, func(p string, isDir bool, err error) error {
if err != nil {
proc.logTraverseError(p, err)
return filesystem.ErrSkip
}

for _, dir := range dirs {
watchPath(dir)
absPath, err := filesystem.ToAbsolutePath(p)
if err != nil {
proc.logPathAbsConvertError(p, err)
return filesystem.ErrSkip
}

for {
select {
case <-ctx.Done():
return nil
case event := <-watcher.Events:
processFSEvent(event)
case err := <-watcher.Errors:
log.Printf("Filesystem watcher error: %v", err)
if proc.isTracked(absPath) {
return filesystem.ErrSkip
}

if !proc.shouldTrack(absPath) {
return filesystem.ErrSkip
}

if isDir {
if err := proc.watcher.Add(absPath); err != nil {
proc.logFSWatchAddError(absPath, err)
return filesystem.ErrSkip
}
}

proc.trackPath(absPath)
result.Add(absPath)
return nil
})

for path := range result.Unbox() {
proc.logStartWatching(path)
}
return result
}

// stopWatching removes the watch from root and from every tracked path that
// is located inside root, and returns the set of affected paths.
//
// Only root itself and paths nested beneath it are matched. A plain string
// prefix test is not sufficient: it would also match siblings that merely
// share a name prefix (removing "/src/foo" must not touch "/src/foobar").
//
// A path is untracked when the watcher removes it successfully or reports
// fsnotify.ErrNonExistentWatch for it. Any other removal error is logged and
// the path stays tracked. In both cases the path is part of the returned set.
func (proc *watchProcess) stopWatching(root string) *ds.Set[string] {
	result := ds.NewSet[string](1)

	// NOTE(review): untrackPath mutates trackedPaths while it is being ranged
	// over. This presumes Unbox exposes the underlying Go map, for which
	// deleting entries during iteration is well defined - confirm.
	for p := range proc.trackedPaths.Unbox() {
		if !isSameOrNestedPath(p, root) {
			continue
		}
		result.Add(p)

		// The watch is removed explicitly instead of relying on the watcher
		// to drop it on its own.
		err := proc.watcher.Remove(p)
		if err == nil || errors.Is(err, fsnotify.ErrNonExistentWatch) {
			proc.untrackPath(p)
		} else {
			proc.logFSWatchRemoveError(p, err)
		}
	}

	for path := range result.Unbox() {
		proc.logStopWatching(path)
	}
	return result
}

// isSameOrNestedPath reports whether path is equal to root or is located
// inside the directory denoted by root. Both '/' and '\\' are accepted as
// path separators. An empty root matches every path.
func isSameOrNestedPath(path, root string) bool {
	if !strings.HasPrefix(path, root) {
		return false
	}
	if root == "" || len(path) == len(root) {
		return true
	}
	isSeparator := func(c byte) bool { return c == '/' || c == '\\' }
	// Either root already ends at a separator (e.g. the filesystem root), or
	// the character following the shared prefix must start a new path element.
	return isSeparator(root[len(root)-1]) || isSeparator(path[len(root)])
}

// shouldTrack reports whether the watch filter accepts the given path.
func (proc *watchProcess) shouldTrack(path string) bool {
	filter := proc.watchFilter
	return filter.IsAccepted(path)
}

// trackPath adds path to the set of tracked paths.
func (proc *watchProcess) trackPath(path string) {
	tracked := proc.trackedPaths
	tracked.Add(path)
}

// untrackPath removes path from the set of tracked paths.
func (proc *watchProcess) untrackPath(path string) {
	tracked := proc.trackedPaths
	tracked.Remove(path)
}

// isTracked reports whether path is currently in the set of tracked paths.
func (proc *watchProcess) isTracked(path string) bool {
	tracked := proc.trackedPaths.Contains(path)
	return tracked
}

// logFSWatchEvent logs the received filesystem event. It is a no-op unless
// verbose logging is enabled.
func (proc *watchProcess) logFSWatchEvent(event fsnotify.Event) {
	if !proc.verbose {
		return
	}
	log.Printf("Filesystem watch event: %s", event)
}

// logFSWatchAddError logs a failure to add a watch to path. It is logged
// regardless of the verbose setting.
func (proc *watchProcess) logFSWatchAddError(path string, err error) {
	const format = "Error adding watch to %q: %v"
	log.Printf(format, path, err)
}

// logFSWatchRemoveError logs a failure to remove the watch from path. It is
// logged regardless of the verbose setting.
func (proc *watchProcess) logFSWatchRemoveError(path string, err error) {
	const format = "Error removing watch from %q: %v"
	log.Printf(format, path, err)
}

// logFSWatchError logs an error passed in from the filesystem watcher. It is
// logged regardless of the verbose setting.
func (proc *watchProcess) logFSWatchError(err error) {
	const format = "Filesystem watch error: %v"
	log.Printf(format, err)
}

// logStartWatching logs that path is now being watched. It is a no-op unless
// verbose logging is enabled.
func (proc *watchProcess) logStartWatching(path string) {
	if !proc.verbose {
		return
	}
	log.Printf("Now watching %q", path)
}

// logStopWatching logs that path is no longer being watched. It is a no-op
// unless verbose logging is enabled.
func (proc *watchProcess) logStopWatching(path string) {
	if !proc.verbose {
		return
	}
	log.Printf("No longer watching %q", path)
}

// logTraverseError logs an error encountered while traversing path. It is
// logged regardless of the verbose setting.
func (proc *watchProcess) logTraverseError(path string, err error) {
	const format = "Error traversing %q: %v"
	log.Printf(format, path, err)
}

// logPathAbsConvertError logs a failure to convert path to an absolute path.
// It is logged regardless of the verbose setting.
func (proc *watchProcess) logPathAbsConvertError(path string, err error) {
	const format = "Error converting path %q to absolute: %v"
	log.Printf(format, path, err)
}

// logExcludedPathWatchSkip logs that path was skipped because it is excluded
// from processing. It is a no-op unless verbose logging is enabled.
func (proc *watchProcess) logExcludedPathWatchSkip(path string) {
	if !proc.verbose {
		return
	}
	log.Printf("Skipping excluded path %q from processing", path)
}

0 comments on commit 1e5ebba

Please sign in to comment.