Skip to content

Commit

Permalink
Move all indexers into same package
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Mar 24, 2022
1 parent a20bdf4 commit 83e8231
Show file tree
Hide file tree
Showing 10 changed files with 260 additions and 317 deletions.
95 changes: 0 additions & 95 deletions base_test.go

This file was deleted.

4 changes: 2 additions & 2 deletions cmd/rp-indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/evalphobia/logrus_sentry"
_ "github.com/lib/pq"
"github.com/nyaruka/ezconf"
"github.com/nyaruka/rp-indexer/contacts"
"github.com/nyaruka/rp-indexer/indexers"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -64,7 +64,7 @@ func main() {
log.Fatal(err)
}

ci := contacts.NewIndexer(config.ElasticURL, config.Index, 500)
ci := indexers.NewContactIndexer(config.ElasticURL, config.Index, 500)

for {
_, err := ci.Index(db, config.Rebuild, config.Cleanup)
Expand Down
74 changes: 0 additions & 74 deletions contacts/query.go

This file was deleted.

70 changes: 38 additions & 32 deletions base.go → indexers/base.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package indexer
package indexers

import (
"database/sql"
Expand All @@ -10,17 +10,24 @@ import (
"time"

"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/rp-indexer/utils"
"github.com/sirupsen/logrus"
)

// indexCommand is the JSON command template for indexing a document. Its three
// %d placeholders fill, in order, the document's _id, its version (external
// versioning is requested via "version_type") and its routing value.
const indexCommand = `{ "index": { "_id": %d, "_type": "_doc", "version": %d, "version_type": "external", "routing": %d} }`

// deleteCommand is the JSON command template for deleting a document. It takes
// the same three %d placeholders as indexCommand: _id, version and routing.
const deleteCommand = `{ "delete" : { "_id": %d, "_type": "_doc", "version": %d, "version_type": "external", "routing": %d} }`

// Indexer is the base interface for indexers
type Indexer interface {
// Name returns the name of this indexer (for the base indexer this is the
// name it was created with, e.g. contacts, which is also used as the alias)
Name() string
// Index runs an indexing pass against the given database, with the rebuild
// and cleanup flags passed through from the caller.
// NOTE(review): presumably the returned string is the name of the physical
// index that was indexed into - confirm against the implementations.
Index(db *sql.DB, rebuild, cleanup bool) (string, error)
// Stats returns the running totals kept by the indexer: documents indexed,
// documents deleted and total elapsed time, in that order
Stats() (int64, int64, time.Duration)
}

type BaseIndexer struct {
type baseIndexer struct {
elasticURL string
name string // e.g. contacts, used as the alias

Expand All @@ -30,38 +37,38 @@ type BaseIndexer struct {
elapsedTotal time.Duration
}

func NewBaseIndexer(elasticURL, name string) BaseIndexer {
return BaseIndexer{elasticURL: elasticURL, name: name}
func newBaseIndexer(elasticURL, name string) baseIndexer {
return baseIndexer{elasticURL: elasticURL, name: name}
}

func (i *BaseIndexer) Name() string {
func (i *baseIndexer) Name() string {
return i.name
}

func (i *BaseIndexer) Log() *logrus.Entry {
return logrus.WithField("indexer", i.name)
func (i *baseIndexer) Stats() (int64, int64, time.Duration) {
return i.indexedTotal, i.deletedTotal, i.elapsedTotal
}

func (i *BaseIndexer) Stats() (int64, int64, time.Duration) {
return i.indexedTotal, i.deletedTotal, i.elapsedTotal
func (i *baseIndexer) log() *logrus.Entry {
return logrus.WithField("indexer", i.name)
}

// RecordComplete records a complete index and updates statistics
func (i *BaseIndexer) RecordComplete(indexed, deleted int, elapsed time.Duration) {
// records a complete index and updates statistics
func (i *baseIndexer) recordComplete(indexed, deleted int, elapsed time.Duration) {
i.indexedTotal += int64(indexed)
i.deletedTotal += int64(deleted)
i.elapsedTotal += elapsed

i.Log().WithField("indexed", indexed).WithField("deleted", deleted).WithField("elapsed", elapsed).Info("completed indexing")
i.log().WithField("indexed", indexed).WithField("deleted", deleted).WithField("elapsed", elapsed).Info("completed indexing")
}

// our response for figuring out the physical index for an alias
type infoResponse map[string]interface{}

// FindIndexes finds all our physical indexes
func (i *BaseIndexer) FindIndexes() []string {
func (i *baseIndexer) FindIndexes() []string {
response := infoResponse{}
_, err := makeJSONRequest(http.MethodGet, fmt.Sprintf("%s/%s", i.elasticURL, i.name), nil, &response)
_, err := utils.MakeJSONRequest(http.MethodGet, fmt.Sprintf("%s/%s", i.elasticURL, i.name), nil, &response)
indexes := make([]string, 0)

// error could mean a variety of things, but we'll figure that out later
Expand All @@ -77,19 +84,19 @@ func (i *BaseIndexer) FindIndexes() []string {
// reverse sort order should put our newest index first
sort.Sort(sort.Reverse(sort.StringSlice(indexes)))

i.Log().WithField("indexes", indexes).Debug("found physical indexes")
i.log().WithField("indexes", indexes).Debug("found physical indexes")

return indexes
}

// CreateNewIndex creates a new index for the passed in alias.
// creates a new physical index for this indexer's alias (its name).
//
// Note that we do not create an index with the passed name, instead creating one
// based on the day, for example `contacts_2018_03_05`, then create an alias from
// that index to `contacts`.
//
// If the day-specific name already exists, we append a .1 or .2 to the name.
func (i *BaseIndexer) CreateNewIndex(settings json.RawMessage) (string, error) {
func (i *baseIndexer) createNewIndex(settings json.RawMessage) (string, error) {
// create our day-specific name
index := fmt.Sprintf("%s_%s", i.name, time.Now().Format("2006_01_02"))
idx := 0
Expand All @@ -111,13 +118,13 @@ func (i *BaseIndexer) CreateNewIndex(settings json.RawMessage) (string, error) {
}

// create the new index
_, err := makeJSONRequest(http.MethodPut, fmt.Sprintf("%s/%s?include_type_name=true", i.elasticURL, index), settings, nil)
_, err := utils.MakeJSONRequest(http.MethodPut, fmt.Sprintf("%s/%s?include_type_name=true", i.elasticURL, index), settings, nil)
if err != nil {
return "", err
}

// all went well, return our physical index name
i.Log().WithField("index", index).Info("created new index")
i.log().WithField("index", index).Info("created new index")

return index, nil
}
Expand All @@ -143,9 +150,8 @@ type removeAliasCommand struct {
} `json:"remove"`
}

// UpdateAlias maps the passed in alias to the new physical index, optionally removing
// existing aliases if they exit.
func (i *BaseIndexer) UpdateAlias(newIndex string) error {
// maps this indexer's alias to the new physical index, removing existing aliases if they exist
func (i *baseIndexer) updateAlias(newIndex string) error {
commands := make([]interface{}, 0)

// find existing physical indexes
Expand All @@ -167,9 +173,9 @@ func (i *BaseIndexer) UpdateAlias(newIndex string) error {

aliasJSON := jsonx.MustMarshal(aliasCommand{Actions: commands})

_, err := makeJSONRequest(http.MethodPost, fmt.Sprintf("%s/_aliases", i.elasticURL), aliasJSON, nil)
_, err := utils.MakeJSONRequest(http.MethodPost, fmt.Sprintf("%s/_aliases", i.elasticURL), aliasJSON, nil)

i.Log().WithField("index", newIndex).Info("updated alias")
i.log().WithField("index", newIndex).Info("updated alias")

return err
}
Expand All @@ -181,8 +187,8 @@ type healthResponse struct {
} `json:"indices"`
}

// CleanupIndexes removes all indexes that are older than the currently active index
func (i *BaseIndexer) CleanupIndexes() error {
// removes all indexes that are older than the currently active index
func (i *baseIndexer) cleanupIndexes() error {
// find our current indexes
currents := i.FindIndexes()

Expand All @@ -193,7 +199,7 @@ func (i *BaseIndexer) CleanupIndexes() error {

// find all the current indexes
healthResponse := healthResponse{}
_, err := makeJSONRequest(http.MethodGet, fmt.Sprintf("%s/%s", i.elasticURL, "_cluster/health?level=indices"), nil, &healthResponse)
_, err := utils.MakeJSONRequest(http.MethodGet, fmt.Sprintf("%s/%s", i.elasticURL, "_cluster/health?level=indices"), nil, &healthResponse)
if err != nil {
return err
}
Expand All @@ -202,7 +208,7 @@ func (i *BaseIndexer) CleanupIndexes() error {
for key := range healthResponse.Indices {
if strings.HasPrefix(key, i.name) && strings.Compare(key, currents[0]) < 0 {
logrus.WithField("index", key).Info("removing old index")
_, err = makeJSONRequest(http.MethodDelete, fmt.Sprintf("%s/%s", i.elasticURL, key), nil, nil)
_, err = utils.MakeJSONRequest(http.MethodDelete, fmt.Sprintf("%s/%s", i.elasticURL, key), nil, nil)
if err != nil {
return err
}
Expand All @@ -228,11 +234,11 @@ type indexResponse struct {
}

// indexes the batch of contacts
func (i *BaseIndexer) IndexBatch(index string, batch []byte) (int, int, error) {
func (i *baseIndexer) indexBatch(index string, batch []byte) (int, int, error) {
response := indexResponse{}
indexURL := fmt.Sprintf("%s/%s/_bulk", i.elasticURL, index)

_, err := makeJSONRequest(http.MethodPut, indexURL, batch, &response)
_, err := utils.MakeJSONRequest(http.MethodPut, indexURL, batch, &response)
if err != nil {
return 0, 0, err
}
Expand Down Expand Up @@ -280,12 +286,12 @@ type queryResponse struct {
}

// GetLastModified queries a concrete index and finds the last modified document, returning its modified time
func (i *BaseIndexer) GetLastModified(index string) (time.Time, error) {
func (i *baseIndexer) GetLastModified(index string) (time.Time, error) {
lastModified := time.Time{}

// get the newest document on our index
queryResponse := queryResponse{}
_, err := makeJSONRequest(http.MethodPost, fmt.Sprintf("%s/%s/_search", i.elasticURL, index), []byte(`{ "sort": [{ "modified_on_mu": "desc" }]}`), &queryResponse)
_, err := utils.MakeJSONRequest(http.MethodPost, fmt.Sprintf("%s/%s/_search", i.elasticURL, index), []byte(`{ "sort": [{ "modified_on_mu": "desc" }]}`), &queryResponse)
if err != nil {
return lastModified, err
}
Expand Down
Loading

0 comments on commit 83e8231

Please sign in to comment.