Merge branch 'main' into opensearch
rowanseymour committed Dec 13, 2024
2 parents 836311b + 1f7e63e commit 359470e
Showing 15 changed files with 273 additions and 196 deletions.
7 changes: 2 additions & 5 deletions .github/workflows/ci.yml
@@ -1,15 +1,15 @@
 name: CI
 on: [push, pull_request]
 env:
-  go-version: "1.22.x"
+  go-version: "1.23.x"
 jobs:
   test:
     name: Test
     runs-on: ubuntu-latest
 
     services:
       postgres:
-        image: postgres:14-alpine
+        image: postgres:15-alpine
         env:
           POSTGRES_DB: indexer_test
           POSTGRES_USER: indexer_test
@@ -25,9 +25,6 @@ jobs:
           OPENSEARCH_JAVA_OPTS: -Xms512m -Xmx512m
           DISABLE_INSTALL_DEMO_CONFIG: true
           OPENSEARCH_INITIAL_ADMIN_PASSWORD: temba
-        ports:
-          - 9200:9200
-        options: --health-cmd "curl http://localhost:9200/_cluster/health" --health-interval 10s --health-timeout 5s --health-retries 5
 
     steps:
       - name: Checkout code
14 changes: 14 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,17 @@
+v9.3.0 (2024-12-13)
+-------------------------
+* Send metrics to cloudwatch
+
+v9.2.1 (2024-10-08)
+-------------------------
+* Don't include status groups in contact indexing
+
+v9.2.0 (2024-07-17)
+-------------------------
+* Test against postgresql 15
+* Update dependencies
+* Test against Elasticsearch 8
+
 v9.1.9 (2024-06-10)
 -------------------------
 * Record stats inside indexing batch loop
35 changes: 1 addition & 34 deletions README.md
@@ -46,40 +46,7 @@ For use with RapidPro, you will want to configure these settings:
 
 Recommended settings for error reporting:
 
-* `INDEXER_SENTRY_DSN`: The DSN to use when logging errors to Sentry
-
-### Reference
-
-These are the configuration options that can be provided as parameters or environment variables. If using environment
-varibles, convert to uppercase, replace dashes with underscores and prefix the name with `INDEXER_`, e.g. `-log-level`
-becomes `INDEXER_LOG_LEVEL`.
-
-```
-  -cleanup
-    	whether to remove old indexes after a rebuild
-  -db string
-    	the connection string for our database (default "postgres://localhost/rapidpro?sslmode=disable")
-  -debug-conf
-    	print where config values are coming from
-  -elastic-url string
-    	the url for our elastic search instance (default "http://localhost:9200")
-  -help
-    	print usage information
-  -index string
-    	the alias for our contact index (default "contacts")
-  -librato-username
-    	the Librato username for metrics reporting
-  -librato-token
-    	the Librato token for metrics reporting
-  -log-level string
-    	the log level, one of error, warn, info, debug (default "info")
-  -poll int
-    	the number of seconds to wait between checking for updated contacts (default 5)
-  -rebuild
-    	whether to rebuild the index, swapping it when complete, then exiting (default false)
-  -sentry-dsn string
-    	the sentry configuration to log errors to, if any
-```
+* `INDEXER_SENTRY_DSN`: DSN to use when logging errors to Sentry
 
 ## Development
 
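The removed reference section encoded a simple naming rule for environment variables: uppercase the flag name, swap dashes for underscores, prefix with `INDEXER_`. Purely as an illustration of that rule — this helper is hypothetical and not code from the repo — it could be written as:

```go
package main

import (
	"fmt"
	"strings"
)

// envVarForFlag applies the naming rule from the removed README text: strip
// the leading dash, uppercase, replace dashes with underscores, and add the
// "INDEXER_" prefix. Illustrative only, not part of rp-indexer.
func envVarForFlag(flag string) string {
	name := strings.TrimPrefix(flag, "-")
	return "INDEXER_" + strings.ToUpper(strings.ReplaceAll(name, "-", "_"))
}

func main() {
	fmt.Println(envVarForFlag("-log-level")) // INDEXER_LOG_LEVEL
}
```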
35 changes: 22 additions & 13 deletions cmd/rp-indexer/main.go
@@ -12,8 +12,10 @@ import (
 	"github.com/getsentry/sentry-go"
 	_ "github.com/lib/pq"
 	"github.com/nyaruka/ezconf"
+	"github.com/nyaruka/gocommon/aws/cwatch"
 	indexer "github.com/nyaruka/rp-indexer/v9"
 	"github.com/nyaruka/rp-indexer/v9/indexers"
+	"github.com/nyaruka/rp-indexer/v9/runtime"
 	slogmulti "github.com/samber/slog-multi"
 	slogsentry "github.com/samber/slog-sentry"
 )
@@ -25,7 +27,7 @@
 )
 
 func main() {
-	cfg := indexer.NewDefaultConfig()
+	cfg := runtime.NewDefaultConfig()
 	loader := ezconf.NewLoader(cfg, "indexer", "Indexes RapidPro contacts to ElasticSearch", []string{"indexer.toml"})
 	loader.MustLoad()
 
@@ -36,15 +38,14 @@
 		os.Exit(1)
 	}
 
+	rt := &runtime.Runtime{Config: cfg}
+
 	// configure our logger
 	logHandler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: level})
 	slog.SetDefault(slog.New(logHandler))
 
-	logger := slog.With("comp", "main")
-	logger.Info("starting indexer", "version", version, "released", date)
-
 	// if we have a DSN entry, try to initialize it
-	if cfg.SentryDSN != "" {
+	if rt.Config.SentryDSN != "" {
 		err := sentry.Init(sentry.ClientOptions{
 			Dsn:           cfg.SentryDSN,
 			EnableTracing: false,
@@ -56,7 +57,7 @@
 
 		defer sentry.Flush(2 * time.Second)
 
-		logger = slog.New(
+		logger := slog.New(
 			slogmulti.Fanout(
 				logHandler,
 				slogsentry.Option{Level: slog.LevelError}.NewSentryHandler(),
@@ -66,24 +67,32 @@
 		slog.SetDefault(logger)
 	}
 
-	db, err := sql.Open("postgres", cfg.DB)
+	log := slog.With("comp", "main")
+	log.Info("starting indexer", "version", version, "released", date)
+
+	rt.DB, err = sql.Open("postgres", cfg.DB)
+	if err != nil {
+		log.Error("unable to connect to database", "error", err)
+	}
+
+	rt.CW, err = cwatch.NewService(rt.Config.AWSAccessKeyID, rt.Config.AWSSecretAccessKey, rt.Config.AWSRegion, rt.Config.CloudwatchNamespace, rt.Config.DeploymentID)
 	if err != nil {
-		logger.Error("unable to connect to database")
+		log.Error("unable to create cloudwatch service", "error", err)
 	}
 
 	idxrs := []indexers.Indexer{
-		indexers.NewContactIndexer(cfg.ElasticURL, cfg.ContactsIndex, cfg.ContactsShards, cfg.ContactsReplicas, 500),
+		indexers.NewContactIndexer(rt.Config.ElasticURL, rt.Config.ContactsIndex, rt.Config.ContactsShards, rt.Config.ContactsReplicas, 500),
 	}
 
-	if cfg.Rebuild {
+	if rt.Config.Rebuild {
 		// if rebuilding, just do a complete index and quit. In future when we support multiple indexers,
 		// the rebuild argument can be become the name of the index to rebuild, e.g. --rebuild=contacts
 		idxr := idxrs[0]
-		if _, err := idxr.Index(db, true, cfg.Cleanup); err != nil {
-			logger.Error("error during rebuilding", "error", err, "indexer", idxr.Name())
+		if _, err := idxr.Index(rt, true, rt.Config.Cleanup); err != nil {
+			log.Error("error during rebuilding", "error", err, "indexer", idxr.Name())
 		}
 	} else {
-		d := indexer.NewDaemon(cfg, db, idxrs, time.Duration(cfg.Poll)*time.Second)
+		d := indexer.NewDaemon(rt, idxrs)
 		d.Start()
 
 		handleSignals(d)
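The new `runtime` package referenced above is not included in this commit view. A minimal sketch of what its `Runtime` struct plausibly holds, inferred only from the `rt.Config`, `rt.DB` and `rt.CW` usages in this diff (`Config` is sketched under the config.go entry below):

```go
// Sketch of the new runtime package, inferred from its usage in this diff.
package runtime

import (
	"database/sql"

	"github.com/nyaruka/gocommon/aws/cwatch"
)

// Runtime bundles the shared dependencies that main() wires up and that the
// daemon and indexers consume.
type Runtime struct {
	Config *Config         // loaded from flags/env/indexer.toml via ezconf
	DB     *sql.DB         // postgres connection opened in main()
	CW     *cwatch.Service // cloudwatch metrics client
}
```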
38 changes: 0 additions & 38 deletions config.go

This file was deleted.
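The deleted file isn't shown, but `Config` (and `NewDefaultConfig`) have evidently moved into the new `runtime` package. A sketch of what that package would need, with field names taken from the `cfg`/`rt.Config` references elsewhere in this diff and defaults from the removed README reference — the declarations themselves are reconstructed guesses:

```go
package runtime

// Config holds the indexer settings; each field below is referenced somewhere
// in this diff, but types and layout here are assumptions.
type Config struct {
	DB                  string // postgres connection string
	ElasticURL          string
	ContactsIndex       string
	ContactsShards      int
	ContactsReplicas    int
	Poll                int // seconds between checks for updated contacts
	Rebuild             bool
	Cleanup             bool
	LogLevel            string
	SentryDSN           string
	AWSAccessKeyID      string
	AWSSecretAccessKey  string
	AWSRegion           string
	CloudwatchNamespace string
	DeploymentID        string
}

// NewDefaultConfig returns a Config with the defaults the removed README
// reference documented; other fields default to their zero values.
func NewDefaultConfig() *Config {
	return &Config{
		DB:            "postgres://localhost/rapidpro?sslmode=disable",
		ElasticURL:    "http://localhost:9200",
		ContactsIndex: "contacts",
		Poll:          5,
		LogLevel:      "info",
	}
}
```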

52 changes: 30 additions & 22 deletions daemon.go
@@ -2,19 +2,20 @@ package indexer
 
 import (
 	"context"
-	"database/sql"
 	"fmt"
 	"log/slog"
 	"sync"
 	"time"
 
+	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
 	"github.com/nyaruka/gocommon/analytics"
 	"github.com/nyaruka/rp-indexer/v9/indexers"
+	"github.com/nyaruka/rp-indexer/v9/runtime"
 )
 
 type Daemon struct {
-	cfg      *Config
-	db       *sql.DB
+	rt       *runtime.Runtime
 	wg       *sync.WaitGroup
 	quit     chan bool
 	indexers []indexers.Indexer
@@ -24,27 +25,19 @@
 }
 
 // NewDaemon creates a new daemon to run the given indexers
-func NewDaemon(cfg *Config, db *sql.DB, ixs []indexers.Indexer, poll time.Duration) *Daemon {
+func NewDaemon(rt *runtime.Runtime, ixs []indexers.Indexer) *Daemon {
 	return &Daemon{
-		cfg:       cfg,
-		db:        db,
+		rt:        rt,
 		wg:        &sync.WaitGroup{},
 		quit:      make(chan bool),
 		indexers:  ixs,
-		poll:      poll,
+		poll:      time.Duration(rt.Config.Poll) * time.Second,
 		prevStats: make(map[indexers.Indexer]indexers.Stats, len(ixs)),
 	}
 }
 
 // Start starts this daemon
 func (d *Daemon) Start() {
-	// if we have a librato token, configure it
-	if d.cfg.LibratoToken != "" {
-		analytics.RegisterBackend(analytics.NewLibrato(d.cfg.LibratoUsername, d.cfg.LibratoToken, d.cfg.InstanceName, time.Second, d.wg))
-	}
-
-	analytics.Start()
-
 	for _, i := range d.indexers {
 		d.startIndexer(i)
 	}
@@ -68,7 +61,7 @@
 		case <-d.quit:
 			return
 		case <-time.After(d.poll):
-			_, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup)
+			_, err := indexer.Index(d.rt, d.rt.Config.Rebuild, d.rt.Config.Cleanup)
 			if err != nil {
 				log.Error("error during indexing", "error", err)
 			}
@@ -107,7 +100,8 @@ func (d *Daemon) reportStats(includeLag bool) {
 	defer cancel()
 
 	log := slog.New(slog.Default().Handler())
-	metrics := make(map[string]float64, len(d.indexers)*2)
+	guages := make(map[string]float64, len(d.indexers)*3)
+	metrics := make([]types.MetricDatum, 0, len(d.indexers)*3)
 
 	for _, ix := range d.indexers {
 		stats := ix.Stats()
@@ -121,9 +115,17 @@
 			rateInPeriod = float64(indexedInPeriod) / (float64(elapsedInPeriod) / float64(time.Second))
 		}
 
-		metrics[ix.Name()+"_indexed"] = float64(indexedInPeriod)
-		metrics[ix.Name()+"_deleted"] = float64(deletedInPeriod)
-		metrics[ix.Name()+"_rate"] = rateInPeriod
+		guages[ix.Name()+"_indexed"] = float64(indexedInPeriod)
+		guages[ix.Name()+"_deleted"] = float64(deletedInPeriod)
+		guages[ix.Name()+"_rate"] = rateInPeriod
+
+		dims := []types.Dimension{{Name: aws.String("Index"), Value: aws.String(ix.Name())}}
+
+		metrics = append(metrics,
+			types.MetricDatum{MetricName: aws.String("IndexerIndexed"), Dimensions: dims, Value: aws.Float64(float64(indexedInPeriod)), Unit: types.StandardUnitCount},
+			types.MetricDatum{MetricName: aws.String("IndexerDeleted"), Dimensions: dims, Value: aws.Float64(float64(deletedInPeriod)), Unit: types.StandardUnitCount},
+			types.MetricDatum{MetricName: aws.String("IndexerRate"), Dimensions: dims, Value: aws.Float64(rateInPeriod), Unit: types.StandardUnitCountSecond},
+		)
 
 		d.prevStats[ix] = stats
 
@@ -132,16 +134,22 @@
 			if err != nil {
 				log.Error("error getting db last modified", "index", ix.Name(), "error", err)
 			} else {
-				metrics[ix.Name()+"_lag"] = lag.Seconds()
+				guages[ix.Name()+"_lag"] = lag.Seconds()
+
+				metrics = append(metrics, types.MetricDatum{MetricName: aws.String("IndexerLag"), Dimensions: dims, Value: aws.Float64(lag.Seconds()), Unit: types.StandardUnitSeconds})
 			}
 		}
 	}
 
-	for k, v := range metrics {
+	for k, v := range guages {
 		analytics.Gauge("indexer."+k, v)
 		log = log.With(k, v)
 	}
 
+	if _, err := d.rt.CW.Client.PutMetricData(ctx, d.rt.CW.Prepare(metrics)); err != nil {
+		log.Error("error putting metrics", "error", err)
+	}
+
 	log.Info("stats reported")
 }

@@ -151,7 +159,7 @@ func (d *Daemon) calculateLag(ctx context.Context, ix indexers.Indexer) (time.Duration, error) {
 		return 0, fmt.Errorf("error getting ES last modified: %w", err)
 	}
 
-	dbLastModified, err := ix.GetDBLastModified(ctx, d.db)
+	dbLastModified, err := ix.GetDBLastModified(ctx, d.rt.DB)
 	if err != nil {
 		return 0, fmt.Errorf("error getting DB last modified: %w", err)
 	}
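`cwatch` comes from gocommon and isn't part of this diff; judging from the call site in `reportStats`, its `Prepare` presumably just wraps the datums in a `PutMetricDataInput` under the namespace given to `NewService`. A sketch of that assumption — the real gocommon implementation may differ:

```go
package cwatchsketch // stand-in for github.com/nyaruka/gocommon/aws/cwatch

import (
	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
	"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
)

// Service mirrors the parts of cwatch.Service used above: an SDK client plus
// the namespace passed to NewService. The field layout is an assumption.
type Service struct {
	Client    *cloudwatch.Client
	namespace string
}

// Prepare wraps raw datums in a PutMetricDataInput for the configured
// namespace — inferred from how d.rt.CW.Prepare(metrics) feeds straight
// into PutMetricData in reportStats.
func (s *Service) Prepare(data []types.MetricDatum) *cloudwatch.PutMetricDataInput {
	return &cloudwatch.PutMetricDataInput{
		Namespace:  aws.String(s.namespace),
		MetricData: data,
	}
}
```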