Skip to content

Commit cc7dc01

Browse files
committed
Merge branch 'main' into flow_history
2 parents 1fa119f + 18e5991 commit cc7dc01

File tree

9 files changed

+237
-59
lines changed

9 files changed

+237
-59
lines changed

CHANGELOG.md

+6
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
v7.3.0
2+
----------
3+
* Add stats reporting cron task and optional librato config
4+
* Refactor to support different indexer types
5+
* Update golang.org/x/sys
6+
17
v7.2.0
28
----------
39
* Tweak README

cmd/rp-indexer/main.go

+39-44
Original file line numberDiff line numberDiff line change
@@ -3,86 +3,81 @@ package main
33
import (
44
"database/sql"
55
"os"
6-
"time"
6+
"os/signal"
7+
"syscall"
78

89
"github.com/evalphobia/logrus_sentry"
910
_ "github.com/lib/pq"
1011
"github.com/nyaruka/ezconf"
12+
indexer "github.com/nyaruka/rp-indexer"
1113
"github.com/nyaruka/rp-indexer/indexers"
1214
log "github.com/sirupsen/logrus"
1315
)
1416

15-
type config struct {
16-
ElasticURL string `help:"the url for our elastic search instance"`
17-
DB string `help:"the connection string for our database"`
18-
Index string `help:"the alias for our contact index"`
19-
Poll int `help:"the number of seconds to wait between checking for updated contacts"`
20-
Rebuild bool `help:"whether to rebuild the index, swapping it when complete, then exiting (default false)"`
21-
Cleanup bool `help:"whether to remove old indexes after a rebuild"`
22-
LogLevel string `help:"the log level, one of error, warn, info, debug"`
23-
SentryDSN string `help:"the sentry configuration to log errors to, if any"`
24-
}
25-
2617
func main() {
27-
config := config{
28-
ElasticURL: "http://localhost:9200",
29-
DB: "postgres://localhost/temba?sslmode=disable",
30-
Index: "contacts",
31-
Poll: 5,
32-
Rebuild: false,
33-
Cleanup: false,
34-
LogLevel: "info",
35-
}
36-
loader := ezconf.NewLoader(&config, "indexer", "Indexes RapidPro contacts to ElasticSearch", []string{"indexer.toml"})
18+
cfg := indexer.NewDefaultConfig()
19+
loader := ezconf.NewLoader(cfg, "indexer", "Indexes RapidPro contacts to ElasticSearch", []string{"indexer.toml"})
3720
loader.MustLoad()
3821

3922
// configure our logger
4023
log.SetOutput(os.Stdout)
4124
log.SetFormatter(&log.TextFormatter{})
4225

43-
level, err := log.ParseLevel(config.LogLevel)
26+
level, err := log.ParseLevel(cfg.LogLevel)
4427
if err != nil {
4528
log.Fatalf("Invalid log level '%s'", level)
4629
}
4730
log.SetLevel(level)
4831

4932
// if we have a DSN entry, try to initialize it
50-
if config.SentryDSN != "" {
51-
hook, err := logrus_sentry.NewSentryHook(config.SentryDSN, []log.Level{log.PanicLevel, log.FatalLevel, log.ErrorLevel})
33+
if cfg.SentryDSN != "" {
34+
hook, err := logrus_sentry.NewSentryHook(cfg.SentryDSN, []log.Level{log.PanicLevel, log.FatalLevel, log.ErrorLevel})
5235
hook.Timeout = 0
5336
hook.StacktraceConfiguration.Enable = true
5437
hook.StacktraceConfiguration.Skip = 4
5538
hook.StacktraceConfiguration.Context = 5
5639
if err != nil {
57-
log.Fatalf("invalid sentry DSN: '%s': %s", config.SentryDSN, err)
40+
log.Fatalf("invalid sentry DSN: '%s': %s", cfg.SentryDSN, err)
5841
}
5942
log.StandardLogger().Hooks.Add(hook)
6043
}
6144

62-
db, err := sql.Open("postgres", config.DB)
45+
db, err := sql.Open("postgres", cfg.DB)
6346
if err != nil {
64-
log.Fatal(err)
47+
log.Fatalf("unable to connect to database")
6548
}
6649

67-
ci := indexers.NewContactIndexer(config.ElasticURL, config.Index, 500)
68-
69-
for {
70-
_, err := ci.Index(db, config.Rebuild, config.Cleanup)
50+
idxrs := []indexers.Indexer{
51+
indexers.NewContactIndexer(cfg.ElasticURL, cfg.Index, 500),
52+
}
7153

72-
if err != nil {
73-
if config.Rebuild {
74-
log.WithField("index", config.Index).WithError(err).Fatal("error during rebuilding")
75-
} else {
76-
log.WithField("index", config.Index).WithError(err).Error("error during indexing")
77-
}
54+
if cfg.Rebuild {
55+
// if rebuilding, just do a complete index and quit. In future when we support multiple indexers,
56+
// the rebuild argument can be become the name of the index to rebuild, e.g. --rebuild=contacts
57+
idxr := idxrs[0]
58+
if _, err := idxr.Index(db, true, cfg.Cleanup); err != nil {
59+
log.WithField("indexer", idxr.Name()).WithError(err).Fatal("error during rebuilding")
7860
}
61+
} else {
62+
d := indexer.NewDaemon(cfg, db, idxrs)
63+
d.Start()
7964

80-
// if we were rebuilding then we're done
81-
if config.Rebuild {
82-
os.Exit(0)
83-
}
65+
handleSignals(d)
66+
}
67+
}
8468

85-
// sleep a bit before starting again
86-
time.Sleep(time.Second * 5)
69+
// handleSignals takes care of trapping quit, interrupt or terminate signals and doing the right thing
70+
func handleSignals(d *indexer.Daemon) {
71+
sigs := make(chan os.Signal, 1)
72+
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
73+
74+
for {
75+
sig := <-sigs
76+
switch sig {
77+
case syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT:
78+
log.WithField("signal", sig).Info("received exit signal, exiting")
79+
d.Stop()
80+
return
81+
}
8782
}
8883
}

config.go

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package indexer
2+
3+
import "os"
4+
5+
type Config struct {
6+
ElasticURL string `help:"the url for our elastic search instance"`
7+
DB string `help:"the connection string for our database"`
8+
Index string `help:"the alias for our contact index"`
9+
Poll int `help:"the number of seconds to wait between checking for updated contacts"`
10+
Rebuild bool `help:"whether to rebuild the index, swapping it when complete, then exiting (default false)"`
11+
Cleanup bool `help:"whether to remove old indexes after a rebuild"`
12+
LogLevel string `help:"the log level, one of error, warn, info, debug"`
13+
SentryDSN string `help:"the sentry configuration to log errors to, if any"`
14+
15+
LibratoUsername string `help:"the username that will be used to authenticate to Librato"`
16+
LibratoToken string `help:"the token that will be used to authenticate to Librato"`
17+
InstanceName string `help:"the unique name of this instance used for analytics"`
18+
}
19+
20+
func NewDefaultConfig() *Config {
21+
hostname, _ := os.Hostname()
22+
23+
return &Config{
24+
ElasticURL: "http://localhost:9200",
25+
DB: "postgres://localhost/temba?sslmode=disable",
26+
Index: "contacts",
27+
Poll: 5,
28+
Rebuild: false,
29+
Cleanup: false,
30+
LogLevel: "info",
31+
InstanceName: hostname,
32+
}
33+
}

daemon.go

+134
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package indexer
2+
3+
import (
4+
"database/sql"
5+
"sync"
6+
"time"
7+
8+
"github.com/nyaruka/librato"
9+
"github.com/nyaruka/rp-indexer/indexers"
10+
"github.com/sirupsen/logrus"
11+
)
12+
13+
type Daemon struct {
14+
cfg *Config
15+
db *sql.DB
16+
wg *sync.WaitGroup
17+
quit chan bool
18+
indexers []indexers.Indexer
19+
20+
prevStats map[indexers.Indexer]indexers.Stats
21+
}
22+
23+
// NewDaemon creates a new daemon to run the given indexers
24+
func NewDaemon(cfg *Config, db *sql.DB, ixs []indexers.Indexer) *Daemon {
25+
return &Daemon{
26+
cfg: cfg,
27+
db: db,
28+
wg: &sync.WaitGroup{},
29+
quit: make(chan bool),
30+
indexers: ixs,
31+
prevStats: make(map[indexers.Indexer]indexers.Stats, len(ixs)),
32+
}
33+
}
34+
35+
// Start starts this daemon
36+
func (d *Daemon) Start() {
37+
// if we have a librato token, configure it
38+
if d.cfg.LibratoToken != "" {
39+
librato.Configure(d.cfg.LibratoUsername, d.cfg.LibratoToken, d.cfg.InstanceName, time.Second, d.wg)
40+
librato.Start()
41+
}
42+
43+
for _, i := range d.indexers {
44+
d.startIndexer(i, time.Second*5)
45+
}
46+
47+
d.startStatsReporter(time.Minute)
48+
}
49+
50+
func (d *Daemon) startIndexer(indexer indexers.Indexer, interval time.Duration) {
51+
d.wg.Add(1) // add ourselves to the wait group
52+
53+
log := logrus.WithField("indexer", indexer.Name())
54+
55+
go func() {
56+
defer func() {
57+
log.Info("indexer exiting")
58+
d.wg.Done()
59+
}()
60+
61+
for {
62+
select {
63+
case <-d.quit:
64+
return
65+
case <-time.After(interval):
66+
_, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup)
67+
if err != nil {
68+
log.WithError(err).Error("error during indexing")
69+
}
70+
}
71+
}
72+
}()
73+
}
74+
75+
func (d *Daemon) startStatsReporter(interval time.Duration) {
76+
d.wg.Add(1) // add ourselves to the wait group
77+
78+
go func() {
79+
defer func() {
80+
logrus.Info("analytics exiting")
81+
d.wg.Done()
82+
}()
83+
84+
for {
85+
select {
86+
case <-d.quit:
87+
return
88+
case <-time.After(interval):
89+
d.reportStats()
90+
}
91+
}
92+
}()
93+
}
94+
95+
func (d *Daemon) reportStats() {
96+
metrics := make(map[string]float64, len(d.indexers)*2)
97+
98+
for _, ix := range d.indexers {
99+
stats := ix.Stats()
100+
prev := d.prevStats[ix]
101+
102+
indexedInPeriod := stats.Indexed - prev.Indexed
103+
deletedInPeriod := stats.Deleted - prev.Deleted
104+
elapsedInPeriod := stats.Elapsed - prev.Elapsed
105+
rateInPeriod := float64(0)
106+
if indexedInPeriod > 0 && elapsedInPeriod > 0 {
107+
rateInPeriod = float64(indexedInPeriod) / (float64(elapsedInPeriod) / float64(time.Second))
108+
}
109+
110+
metrics[ix.Name()+"_indexed"] = float64(indexedInPeriod)
111+
metrics[ix.Name()+"_deleted"] = float64(deletedInPeriod)
112+
metrics[ix.Name()+"_rate"] = rateInPeriod
113+
114+
d.prevStats[ix] = stats
115+
}
116+
117+
log := logrus.NewEntry(logrus.StandardLogger())
118+
119+
for k, v := range metrics {
120+
librato.Gauge("indexer."+k, v)
121+
log = log.WithField(k, v)
122+
}
123+
124+
log.Info("stats reported")
125+
}
126+
127+
// Stop stops this daemon
128+
func (d *Daemon) Stop() {
129+
logrus.Info("daemon stopping")
130+
librato.Stop()
131+
132+
close(d.quit)
133+
d.wg.Wait()
134+
}

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ require (
55
github.com/lib/pq v1.10.4
66
github.com/nyaruka/ezconf v0.2.1
77
github.com/nyaruka/gocommon v1.17.1
8+
github.com/nyaruka/librato v1.0.0
89
github.com/olivere/elastic/v7 v7.0.22
910
github.com/pkg/errors v0.9.1
1011
github.com/sirupsen/logrus v1.8.1

go.sum

+6
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfC
3636
github.com/jmoiron/sqlx v1.3.4/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ=
3737
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
3838
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
39+
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
3940
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
4041
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
4142
github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
@@ -53,6 +54,8 @@ github.com/nyaruka/ezconf v0.2.1 h1:TDXWoqjqYya1uhou1mAJZg7rgFYL98EB0Tb3+BWtUh0=
5354
github.com/nyaruka/ezconf v0.2.1/go.mod h1:ey182kYkw2MIi4XiWe1FR/mzI33WCmTWuceDYYxgnQw=
5455
github.com/nyaruka/gocommon v1.17.1 h1:4bbNp+0/BIbne4VDiKOxh3kcbdvEu/WsrsZiG/VyRZ8=
5556
github.com/nyaruka/gocommon v1.17.1/go.mod h1:nmYyb7MZDM0iW4DYJKiBzfKuE9nbnx+xSHZasuIBOT0=
57+
github.com/nyaruka/librato v1.0.0 h1:Vznj9WCeC1yZXbBYyYp40KnbmXLbEkjKmHesV/v2SR0=
58+
github.com/nyaruka/librato v1.0.0/go.mod h1:pkRNLFhFurOz0QqBz6/DuTFhHHxAubWxs4Jx+J7yUgg=
5659
github.com/nyaruka/phonenumbers v1.0.71/go.mod h1:sDaTZ/KPX5f8qyV9qN+hIm+4ZBARJrupC6LuhshJq1U=
5760
github.com/olivere/elastic/v7 v7.0.22 h1:esBA6JJwvYgfms0EVlH7Z+9J4oQ/WUADF2y/nCNDw7s=
5861
github.com/olivere/elastic/v7 v7.0.22/go.mod h1:VDexNy9NjmtAkrjNoI7tImv7FR4tf5zUA3ickqu5Pc8=
@@ -63,12 +66,14 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
6366
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
6467
github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ=
6568
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
69+
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
6670
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
6771
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
6872
github.com/smartystreets/assertions v1.1.1/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
6973
github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h1:SnhjPscd9TpLiy1LpzGSKh3bXCfxxXuqd9xmQJy3slM=
7074
github.com/smartystreets/gunit v1.4.2/go.mod h1:ZjM1ozSIMJlAz/ay4SG8PeKF00ckUp+zMHZXV9/bvak=
7175
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
76+
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
7277
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
7378
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
7479
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
@@ -96,6 +101,7 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ
96101
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
97102
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
98103
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
104+
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
99105
golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
100106
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
101107
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

0 commit comments

Comments
 (0)