Skip to content

Commit

Permalink
Merge branch 'main' into flow_history
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Mar 29, 2022
2 parents cc7dc01 + 22e00ff commit 3b7469d
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 5 deletions.
3 changes: 2 additions & 1 deletion cmd/rp-indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"os/signal"
"syscall"
"time"

"github.com/evalphobia/logrus_sentry"
_ "github.com/lib/pq"
Expand Down Expand Up @@ -59,7 +60,7 @@ func main() {
log.WithField("indexer", idxr.Name()).WithError(err).Fatal("error during rebuilding")
}
} else {
d := indexer.NewDaemon(cfg, db, idxrs)
d := indexer.NewDaemon(cfg, db, idxrs, time.Duration(cfg.Poll)*time.Second)
d.Start()

handleSignals(d)
Expand Down
10 changes: 6 additions & 4 deletions daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,20 @@ type Daemon struct {
wg *sync.WaitGroup
quit chan bool
indexers []indexers.Indexer
poll time.Duration

prevStats map[indexers.Indexer]indexers.Stats
}

// NewDaemon creates a new daemon to run the given indexers
func NewDaemon(cfg *Config, db *sql.DB, ixs []indexers.Indexer) *Daemon {
func NewDaemon(cfg *Config, db *sql.DB, ixs []indexers.Indexer, poll time.Duration) *Daemon {
return &Daemon{
cfg: cfg,
db: db,
wg: &sync.WaitGroup{},
quit: make(chan bool),
indexers: ixs,
poll: poll,
prevStats: make(map[indexers.Indexer]indexers.Stats, len(ixs)),
}
}
Expand All @@ -41,13 +43,13 @@ func (d *Daemon) Start() {
}

for _, i := range d.indexers {
d.startIndexer(i, time.Second*5)
d.startIndexer(i)
}

d.startStatsReporter(time.Minute)
}

func (d *Daemon) startIndexer(indexer indexers.Indexer, interval time.Duration) {
func (d *Daemon) startIndexer(indexer indexers.Indexer) {
d.wg.Add(1) // add ourselves to the wait group

log := logrus.WithField("indexer", indexer.Name())
Expand All @@ -62,7 +64,7 @@ func (d *Daemon) startIndexer(indexer indexers.Indexer, interval time.Duration)
select {
case <-d.quit:
return
case <-time.After(interval):
case <-time.After(d.poll):
_, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup)
if err != nil {
log.WithError(err).Error("error during indexing")
Expand Down

0 comments on commit 3b7469d

Please sign in to comment.