Skip to content

Commit 05c06b0

Browse files
committed
Add stats reporting cron task and optional librato config
1 parent 3a051b3 commit 05c06b0

File tree

4 files changed

+75
-22
lines changed

4 files changed

+75
-22
lines changed

daemon.go

+57-7
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,19 @@ type Daemon struct {
1616
wg *sync.WaitGroup
1717
quit chan bool
1818
indexers []indexers.Indexer
19+
20+
prevStats map[indexers.Indexer]indexers.Stats
1921
}
2022

2123
// NewDaemon creates a new daemon to run the given indexers
2224
func NewDaemon(cfg *Config, db *sql.DB, ixs []indexers.Indexer) *Daemon {
2325
return &Daemon{
24-
cfg: cfg,
25-
db: db,
26-
wg: &sync.WaitGroup{},
27-
quit: make(chan bool),
28-
indexers: ixs,
26+
cfg: cfg,
27+
db: db,
28+
wg: &sync.WaitGroup{},
29+
quit: make(chan bool),
30+
indexers: ixs,
31+
prevStats: make(map[indexers.Indexer]indexers.Stats, len(ixs)),
2932
}
3033
}
3134

@@ -40,14 +43,18 @@ func (d *Daemon) Start() {
4043
for _, i := range d.indexers {
4144
d.startIndexer(i, time.Second*5)
4245
}
46+
47+
d.startStatsReporter(time.Minute)
4348
}
4449

4550
func (d *Daemon) startIndexer(indexer indexers.Indexer, interval time.Duration) {
4651
d.wg.Add(1) // add ourselves to the wait group
4752

53+
log := logrus.WithField("indexer", indexer.Name())
54+
4855
go func() {
4956
defer func() {
50-
logrus.WithField("indexer", indexer.Name()).Info("indexer exiting")
57+
log.Info("indexer exiting")
5158
d.wg.Done()
5259
}()
5360

@@ -58,13 +65,56 @@ func (d *Daemon) startIndexer(indexer indexers.Indexer, interval time.Duration)
5865
case <-time.After(interval):
5966
_, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup)
6067
if err != nil {
61-
logrus.WithField("index", d.cfg.Index).WithError(err).Error("error during indexing")
68+
log.WithError(err).Error("error during indexing")
6269
}
6370
}
6471
}
6572
}()
6673
}
6774

75+
func (d *Daemon) startStatsReporter(interval time.Duration) {
76+
d.wg.Add(1) // add ourselves to the wait group
77+
78+
go func() {
79+
defer func() {
80+
logrus.Info("analytics exiting")
81+
d.wg.Done()
82+
}()
83+
84+
for {
85+
select {
86+
case <-d.quit:
87+
return
88+
case <-time.After(interval):
89+
d.reportStats()
90+
}
91+
}
92+
}()
93+
}
94+
95+
func (d *Daemon) reportStats() {
96+
metrics := make(map[string]float64, len(d.indexers)*2)
97+
98+
for _, ix := range d.indexers {
99+
stats := ix.Stats()
100+
prev := d.prevStats[ix]
101+
102+
metrics[ix.Name()+"_indexed"] = float64(stats.Indexed - prev.Indexed)
103+
metrics[ix.Name()+"_deleted"] = float64(stats.Deleted - prev.Deleted)
104+
105+
d.prevStats[ix] = stats
106+
}
107+
108+
log := logrus.NewEntry(logrus.StandardLogger())
109+
110+
for k, v := range metrics {
111+
librato.Gauge("indexer."+k, v)
112+
log = log.WithField(k, v)
113+
}
114+
115+
log.Info("stats reported")
116+
}
117+
68118
// Stop stops this daemon
69119
func (d *Daemon) Stop() {
70120
logrus.Info("daemon stopping")

indexers/base.go

+13-10
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,24 @@ const indexCommand = `{ "index": { "_id": %d, "_type": "_doc", "version": %d, "v
2020
// deletes a document
2121
const deleteCommand = `{ "delete" : { "_id": %d, "_type": "_doc", "version": %d, "version_type": "external", "routing": %d} }`
2222

23+
type Stats struct {
24+
Indexed int64 // total number of documents indexed
25+
Deleted int64 // total number of documents deleted
26+
Elapsed time.Duration // total time spent actually indexing
27+
}
28+
2329
// Indexer is base interface for indexers
2430
type Indexer interface {
2531
Name() string
2632
Index(db *sql.DB, rebuild, cleanup bool) (string, error)
27-
Stats() (int64, int64, time.Duration)
33+
Stats() Stats
2834
}
2935

3036
type baseIndexer struct {
3137
elasticURL string
3238
name string // e.g. contacts, used as the alias
3339

34-
// statistics
35-
indexedTotal int64
36-
deletedTotal int64
37-
elapsedTotal time.Duration
40+
stats Stats
3841
}
3942

4043
func newBaseIndexer(elasticURL, name string) baseIndexer {
@@ -45,8 +48,8 @@ func (i *baseIndexer) Name() string {
4548
return i.name
4649
}
4750

48-
func (i *baseIndexer) Stats() (int64, int64, time.Duration) {
49-
return i.indexedTotal, i.deletedTotal, i.elapsedTotal
51+
func (i *baseIndexer) Stats() Stats {
52+
return i.stats
5053
}
5154

5255
func (i *baseIndexer) log() *logrus.Entry {
@@ -55,9 +58,9 @@ func (i *baseIndexer) log() *logrus.Entry {
5558

5659
// records a complete index and updates statistics
5760
func (i *baseIndexer) recordComplete(indexed, deleted int, elapsed time.Duration) {
58-
i.indexedTotal += int64(indexed)
59-
i.deletedTotal += int64(deleted)
60-
i.elapsedTotal += elapsed
61+
i.stats.Indexed += int64(indexed)
62+
i.stats.Deleted += int64(deleted)
63+
i.stats.Elapsed += elapsed
6164

6265
i.log().WithField("indexed", indexed).WithField("deleted", deleted).WithField("elapsed", elapsed).Info("completed indexing")
6366
}

indexers/base_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func assertIndexesWithPrefix(t *testing.T, es *elastic.Client, prefix string, ex
7878
}
7979

8080
func assertIndexerStats(t *testing.T, ix indexers.Indexer, expectedIndexed, expectedDeleted int64) {
81-
actualIndexed, actualDeleted, _ := ix.Stats()
82-
assert.Equal(t, expectedIndexed, actualIndexed, "indexed mismatch")
83-
assert.Equal(t, expectedDeleted, actualDeleted, "deleted mismatch")
81+
actual := ix.Stats()
82+
assert.Equal(t, expectedIndexed, actual.Indexed, "indexed mismatch")
83+
assert.Equal(t, expectedDeleted, actual.Deleted, "deleted mismatch")
8484
}

indexers/contacts.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error
6060
return "", errors.Wrap(err, "error finding last modified")
6161
}
6262

63-
i.log().WithField("index", physicalIndex).WithField("last_modified", lastModified).Info("indexing newer than last modified")
63+
i.log().WithField("index", physicalIndex).WithField("last_modified", lastModified).Debug("indexing newer than last modified")
6464

6565
// now index our docs
6666
start := time.Now()
@@ -243,7 +243,7 @@ func (i *ContactIndexer) indexModified(db *sql.DB, index string, lastModified ti
243243
elapsed := time.Since(start)
244244
rate := float32(processedCount) / (float32(elapsed) / float32(time.Second))
245245

246-
i.log().WithField("index", index).WithFields(logrus.Fields{"rate": int(rate), "added": createdCount, "deleted": deletedCount, "elapsed": elapsed}).Info("indexed contact batch")
246+
i.log().WithField("index", index).WithFields(logrus.Fields{"rate": int(rate), "added": createdCount, "deleted": deletedCount, "elapsed": elapsed}).Debug("indexed contact batch")
247247
}
248248

249249
return createdCount, deletedCount, nil

0 commit comments

Comments
 (0)