Skip to content

Commit

Permalink
Merge pull request #12 from Cian911/swb/add-queue-polling
Browse files Browse the repository at this point in the history
Add polling
  • Loading branch information
Cian911 authored Jan 10, 2022
2 parents efe3144 + 3773b5e commit 9703ddc
Show file tree
Hide file tree
Showing 9 changed files with 255 additions and 22 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ Switchboard works by monitoring a directory you provide (or list of directories)

See the video below as example. Here, I give switchboard a path to watch, a destination where I want matched files to move to, and the file extension of the type of files I want to move.


[![asciicast](https://asciinema.org/a/OwbnYltbn0jcSAGzfdmujwklJ.svg)](https://asciinema.org/a/OwbnYltbn0jcSAGzfdmujwklJ)

You can also visit https://goswitchboard.io/ for all your documentation needs, news, and updates!

[![asciicast](https://asciinema.org/a/cWSUfcUCU4Wd5rQEs5Detf7gn.svg)](https://asciinema.org/a/cWSUfcUCU4Wd5rQEs5Detf7gn)

### Installation

Expand Down Expand Up @@ -62,6 +64,7 @@ Flags:
-e, --ext string File type you want to watch for.
-h, --help help for watch
-p, --path string Path you want to watch.
--poll int Specify a polling time in seconds. (default 3)
```
To get started quickly, you can run the following command, passing in the path, destination, and file extenstion you want to watch for. See the example below.
Expand Down
10 changes: 7 additions & 3 deletions cli/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Watcher struct {
// Ext is the file extention you want to watch for
Ext string `yaml:"ext"`
// Operation is the event operation you want to watch for
// CREATE, MODIFY, REMOVE, CHMOD etc.
// CREATE, MODIFY, REMOVE, CHMOD, WRITE itc.
Operation string `yaml:"operation"`
}

Expand Down Expand Up @@ -67,11 +67,13 @@ func initCmd(runCmd cobra.Command) {
runCmd.PersistentFlags().StringP("path", "p", "", "Path you want to watch.")
runCmd.PersistentFlags().StringP("destination", "d", "", "Path you want files to be relocated.")
runCmd.PersistentFlags().StringP("ext", "e", "", "File type you want to watch for.")
runCmd.PersistentFlags().IntP("poll", "", 3, "Specify a polling time in seconds.")
runCmd.PersistentFlags().StringVar(&configFile, "config", "", "Pass an optional config file containing multiple paths to watch.")

viper.BindPFlag("path", runCmd.PersistentFlags().Lookup("path"))
viper.BindPFlag("destination", runCmd.PersistentFlags().Lookup("destination"))
viper.BindPFlag("ext", runCmd.PersistentFlags().Lookup("ext"))
viper.BindPFlag("poll", runCmd.PersistentFlags().Lookup("poll"))

var rootCmd = &cobra.Command{}
rootCmd.AddCommand(&runCmd)
Expand Down Expand Up @@ -139,7 +141,7 @@ func registerMultiConsumers() {
}

log.Println("Observing")
pw.Observe()
pw.Observe(viper.GetInt("poll"))
}

func registerSingleConsumer() {
Expand All @@ -154,5 +156,7 @@ func registerSingleConsumer() {
}

pw.Register(&pc)
pw.Observe()

log.Println("Observing")
pw.Observe(viper.GetInt("poll"))
}
6 changes: 5 additions & 1 deletion event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"log"
"os"
"time"

"github.com/cian911/switchboard/utils"
)
Expand All @@ -22,6 +23,8 @@ type Event struct {
Operation string
// IsDir is the new create vent a directory
IsDir bool
// Timestamp in unix time epoch
Timestamp time.Time
}

// New creates and returns a new event struct
Expand All @@ -31,6 +34,7 @@ func New(file, path, dest, ext string) *Event {
Path: path,
Destination: dest,
Ext: ext,
Timestamp: time.Now(),
}
}

Expand All @@ -44,7 +48,7 @@ func (e *Event) Move(path, file string) error {

// IsValidEvent checks if the event operation and file extension is valid
func (e *Event) IsValidEvent(ext string) bool {
if ext == e.Ext && e.Operation == "CREATE" {
if ext == e.Ext && e.Operation == "CREATE" || e.Operation == "WRITE" {
return true
}

Expand Down
27 changes: 27 additions & 0 deletions watcher/poller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package watcher

import (
"log"
"time"
)

// Poll polls the queue for valid events given an interval (in seconds)
func (pw *PathWatcher) Poll(interval int) {
go func() {
ticker := time.NewTicker(time.Duration(interval) * time.Second)
for {
select {
case <-ticker.C:
log.Printf("Polling... - Queue Size: %d\n", pw.Queue.Size())

for hsh, ev := range pw.Queue.Queue {
timeDiff := ev.Timestamp.Sub(time.Now())
if timeDiff < (time.Duration(-interval) * time.Second) {
pw.Notify(ev.Path, ev.Operation)
pw.Queue.Remove(hsh)
}
}
}
}
}()
}
35 changes: 35 additions & 0 deletions watcher/poller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package watcher

import (
"testing"
"time"
)

var (
pollInterval = 1
)

func TestPoller(t *testing.T) {
t.Run("It successfully notifies of a new event", func(t *testing.T) {
pw := setupPathwatcher("/tmp")
pw.Poll(pollInterval)

ev := eventSetup(t)
pw.Queue.Add(*ev)

if pw.Queue.Size() != 1 {
t.Errorf("Queue size did not increase. want=%d, got=%d", 1, pw.Queue.Size())
}
<-time.After(3 * time.Second)

if pw.Queue.Size() != 0 {
t.Errorf("Queue size did not decrease. want=%d, got=%d", 0, pw.Queue.Size())
}
})
}

func setupPathwatcher(path string) *PathWatcher {
return &PathWatcher{
Queue: NewQueue(),
}
}
51 changes: 51 additions & 0 deletions watcher/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package watcher

import (
"crypto/md5"
"fmt"

"github.com/cian911/switchboard/event"
)

// Q holds the Queue
type Q struct {
Queue map[string]event.Event
}

// NewQueue create a new Q object
func NewQueue() *Q {
return &Q{
Queue: make(map[string]event.Event),
}
}

// Add adds to the queue
func (q *Q) Add(ev event.Event) {
q.Queue[Hash(ev)] = ev
}

// Retrieve get an item from the queue given a valid hash
func (q *Q) Retrieve(hash string) event.Event {
return q.Queue[hash]
}

// Remove removes an item from the queue
func (q *Q) Remove(hash string) {
delete(q.Queue, hash)
}

// Size returns the size of the queue
func (q *Q) Size() int {
return len(q.Queue)
}

// Empty returns a bool indicating if the queue is empty or not
func (q *Q) Empty() bool {
return len(q.Queue) == 0
}

// Hash returns a md5 hash composed of an event File, Path, and Ext
func Hash(ev event.Event) string {
data := []byte(fmt.Sprintf("%s%s%s", ev.File, ev.Path, ev.Ext))
return fmt.Sprintf("%x", md5.Sum(data))
}
94 changes: 94 additions & 0 deletions watcher/queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package watcher

import (
"reflect"
"testing"
"time"

"github.com/cian911/switchboard/event"
)

var (
gFile = "sample.txt"
gPath = "/var/sample.txt"
gExt = ".txt"
)

func TestQueue(t *testing.T) {
t.Run("It adds one event to the queue", func(t *testing.T) {
q := setupQueue()
ev := testEvent(gFile, gPath, gExt)

q.Add(*ev)

if q.Size() != 1 {
t.Errorf("Could size did not increase as expected. want=%d, got=%d", 1, q.Size())
}
})

t.Run("It updates the event in the queue", func(t *testing.T) {
q := setupQueue()
ev := testEvent(gFile, gPath, gExt)

q.Add(*ev)
q.Add(*ev)
q.Add(*ev)

if q.Size() != 1 {
// Queue size should not increase
t.Errorf("Could size did not increase as expected. want=%d, got=%d", 1, q.Size())
}
})

t.Run("It gets an item from the queue", func(t *testing.T) {
q := setupQueue()
ev := testEvent(gFile, gPath, gExt)

hash := Hash(*ev)
q.Add(*ev)
e := q.Retrieve(hash)

if !reflect.DeepEqual(ev, &e) {
t.Errorf("Events are not the same. want=%v, got=%v", ev, e)
}
})

t.Run("It removes an item from the queue", func(t *testing.T) {
q := setupQueue()
ev := testEvent(gFile, gPath, gExt)

hash := Hash(*ev)
q.Add(*ev)
q.Remove(hash)

if q.Size() != 0 {
t.Errorf("Could size did not increase as expected. want=%d, got=%d", 0, q.Size())
}
})

t.Run("It returns a unique hash for a given event", func(t *testing.T) {
ev1 := testEvent(gFile, gPath, gExt)
ev2 := testEvent("sample2.txt", "/var/sample2.txt", ".txt")

h1 := Hash(*ev1)
h2 := Hash(*ev2)

if h1 == h2 {
t.Errorf("Hashes are the same when they shouldn't be. want=%s, got=%s", h1, h2)
}
})
}

func setupQueue() *Q {
return NewQueue()
}

func testEvent(file, path, ext string) *event.Event {
return &event.Event{
File: file,
Path: path,
Ext: ext,
Timestamp: time.Now(),
}

}
Loading

0 comments on commit 9703ddc

Please sign in to comment.