Skip to content

Commit

Permalink
Implement a close function in Cassandra to allow the user to close th…
Browse files Browse the repository at this point in the history
…e fswatcher go routine

Signed-off-by: kennyaz <[email protected]>
  • Loading branch information
kennyaz committed Jun 12, 2023
1 parent 274d6c4 commit 5dfcaa0
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 12 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ require (
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.16.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/goleak v1.2.1 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/text v0.9.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
Expand Down Expand Up @@ -778,6 +779,7 @@ go.uber.org/automaxprocs v1.5.2 h1:2LxUOGiR3O6tw8ui5sZa2LAaHnsviZdVOUZw4fvbnME=
go.uber.org/automaxprocs v1.5.2/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
Expand Down
6 changes: 6 additions & 0 deletions pkg/cassandra/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package config

import (
"fmt"
"sync/atomic"
"time"

"github.com/gocql/gocql"
Expand Down Expand Up @@ -45,6 +46,7 @@ type Configuration struct {
Authenticator Authenticator `yaml:"authenticator" mapstructure:",squash"`
DisableAutoDiscovery bool `yaml:"disable_auto_discovery" mapstructure:"-"`
TLS tlscfg.Options `mapstructure:"tls"`
isClosed atomic.Bool
}

// Authenticator holds the authentication properties needed to connect to a Cassandra cluster
Expand Down Expand Up @@ -163,6 +165,10 @@ func (c *Configuration) NewCluster(logger *zap.Logger) (*gocql.ClusterConfig, er
return cluster, nil
}

func (c *Configuration) Close() error {
return c.TLS.Close()
}

func (c *Configuration) String() string {
return fmt.Sprintf("%+v", *c)
}
13 changes: 1 addition & 12 deletions pkg/fswatcher/fswatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"path"
"path/filepath"
"sync"
"sync/atomic"

"github.com/fsnotify/fsnotify"
"go.uber.org/zap"
Expand All @@ -34,8 +33,6 @@ type FSWatcher struct {
fileHashContentMap map[string]string
onChange func()
mu sync.RWMutex
stop chan struct{}
isClosed atomic.Bool //default value is False
wg sync.WaitGroup
}

Expand Down Expand Up @@ -71,7 +68,6 @@ func New(filepaths []string, onChange func(), logger *zap.Logger) (*FSWatcher, e
logger: logger,
fileHashContentMap: make(map[string]string),
onChange: onChange,
stop: make(chan struct{}),
}

if err = w.setupWatchedPaths(filepaths); err != nil {
Expand Down Expand Up @@ -135,24 +131,17 @@ func (w *FSWatcher) watch() {
w.onChange()
}
case err, ok := <-w.watcher.Errors:
w.wg.Done()
if !ok {
return
}
w.logger.Error("fsnotifier reported an error", zap.Error(err))
case <-w.stop:
w.wg.Done()
return
}
}
}

// Close closes the watcher and stops the background go routine of FSWatcher.
func (w *FSWatcher) Close() error {
if w.isClosed.CompareAndSwap(false, true) {
close(w.stop)
w.wg.Wait()
}

return w.watcher.Close()
}

Expand Down
Binary file added tests
Binary file not shown.

0 comments on commit 5dfcaa0

Please sign in to comment.