From 89145097d627bc6763fdf7410bb756c844a300ee Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Tue, 11 Oct 2022 17:17:36 -0500 Subject: [PATCH] Rework passing of shards and replicas so they're passed to the indexer constructor rather than the Index method --- cmd/rp-indexer/main.go | 4 ++-- daemon.go | 2 +- indexers/base.go | 18 +++++++++++++++--- indexers/contacts.go | 19 +++++++------------ indexers/contacts_test.go | 16 ++++++++-------- 5 files changed, 33 insertions(+), 26 deletions(-) diff --git a/cmd/rp-indexer/main.go b/cmd/rp-indexer/main.go index fd67ba9..d9893a4 100644 --- a/cmd/rp-indexer/main.go +++ b/cmd/rp-indexer/main.go @@ -55,14 +55,14 @@ func main() { } idxrs := []indexers.Indexer{ - indexers.NewContactIndexer(cfg.ElasticURL, cfg.ContactsIndex, 500), + indexers.NewContactIndexer(cfg.ElasticURL, cfg.ContactsIndex, cfg.ContactsShards, cfg.ContactsReplicas, 500), } if cfg.Rebuild { // if rebuilding, just do a complete index and quit. In future when we support multiple indexers, // the rebuild argument can be become the name of the index to rebuild, e.g. 
--rebuild=contacts idxr := idxrs[0] - if _, err := idxr.Index(db, true, cfg.Cleanup, cfg.ContactsShards, cfg.ContactsReplicas); err != nil { + if _, err := idxr.Index(db, true, cfg.Cleanup); err != nil { log.WithField("indexer", idxr.Name()).WithError(err).Fatal("error during rebuilding") } } else { diff --git a/daemon.go b/daemon.go index 3bdbc07..8831075 100644 --- a/daemon.go +++ b/daemon.go @@ -66,7 +66,7 @@ func (d *Daemon) startIndexer(indexer indexers.Indexer) { case <-d.quit: return case <-time.After(d.poll): - _, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup, d.cfg.ContactsShards, d.cfg.ContactsReplicas) + _, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup) if err != nil { log.WithError(err).Error("error during indexing") } diff --git a/indexers/base.go b/indexers/base.go index c23093a..dad6dbc 100644 --- a/indexers/base.go +++ b/indexers/base.go @@ -29,10 +29,12 @@ type Stats struct { // Indexer is base interface for indexers type Indexer interface { Name() string - Index(db *sql.DB, rebuild, cleanup bool, shards int, replicas int) (string, error) + Index(db *sql.DB, rebuild, cleanup bool) (string, error) Stats() Stats } +// IndexDefinition is what we pass to elastic to create an index, +// see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html type IndexDefinition struct { Settings struct { Index struct { @@ -45,15 +47,25 @@ type IndexDefinition struct { Mappings json.RawMessage `json:"mappings"` } +func newIndexDefinition(base []byte, shards, replicas int) *IndexDefinition { + d := &IndexDefinition{} + jsonx.MustUnmarshal(base, d) + + d.Settings.Index.NumberOfShards = shards + d.Settings.Index.NumberOfReplicas = replicas + return d +} + type baseIndexer struct { elasticURL string name string // e.g. 
contacts, used as the alias + definition *IndexDefinition stats Stats } -func newBaseIndexer(elasticURL, name string) baseIndexer { - return baseIndexer{elasticURL: elasticURL, name: name} +func newBaseIndexer(elasticURL, name string, def *IndexDefinition) baseIndexer { + return baseIndexer{elasticURL: elasticURL, name: name, definition: def} } func (i *baseIndexer) Name() string { diff --git a/indexers/contacts.go b/indexers/contacts.go index 55612eb..0f0df90 100644 --- a/indexers/contacts.go +++ b/indexers/contacts.go @@ -7,13 +7,12 @@ import ( "fmt" "time" - "github.com/nyaruka/gocommon/jsonx" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) //go:embed contacts.index.json -var contactsIndexDefinition []byte +var contactsIndexDef []byte // ContactIndexer is an indexer for contacts type ContactIndexer struct { @@ -23,15 +22,17 @@ type ContactIndexer struct { } // NewContactIndexer creates a new contact indexer -func NewContactIndexer(elasticURL, name string, batchSize int) *ContactIndexer { +func NewContactIndexer(elasticURL, name string, shards, replicas, batchSize int) *ContactIndexer { + def := newIndexDefinition(contactsIndexDef, shards, replicas) + return &ContactIndexer{ - baseIndexer: newBaseIndexer(elasticURL, name), + baseIndexer: newBaseIndexer(elasticURL, name, def), batchSize: batchSize, } } // Index indexes modified contacts and returns the name of the concrete index -func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool, shards, replicas int) (string, error) { +func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error) { var err error // find our physical index @@ -47,13 +48,7 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool, shards, replic // doesn't exist or we are rebuilding, create it if physicalIndex == "" || rebuild { - def := &IndexDefinition{} - jsonx.MustUnmarshal(contactsIndexDefinition, def) - - def.Settings.Index.NumberOfShards = shards - def.Settings.Index.NumberOfReplicas = 
replicas - - physicalIndex, err = i.createNewIndex(def) + physicalIndex, err = i.createNewIndex(i.definition) if err != nil { return "", errors.Wrap(err, "error creating new index") } diff --git a/indexers/contacts_test.go b/indexers/contacts_test.go index 463aac0..d284ce2 100644 --- a/indexers/contacts_test.go +++ b/indexers/contacts_test.go @@ -186,12 +186,12 @@ var contactQueryTests = []struct { func TestContacts(t *testing.T) { db, es := setup(t) - ix1 := indexers.NewContactIndexer(elasticURL, aliasName, 4) + ix1 := indexers.NewContactIndexer(elasticURL, aliasName, 2, 1, 4) assert.Equal(t, "indexer_test", ix1.Name()) expectedIndexName := fmt.Sprintf("indexer_test_%s", time.Now().Format("2006_01_02")) - indexName, err := ix1.Index(db, false, false, 2, 1) + indexName, err := ix1.Index(db, false, false) assert.NoError(t, err) assert.Equal(t, expectedIndexName, indexName) @@ -217,7 +217,7 @@ func TestContacts(t *testing.T) { require.NoError(t, err) // and index again... - indexName, err = ix1.Index(db, false, false, 2, 1) + indexName, err = ix1.Index(db, false, false) assert.NoError(t, err) assert.Equal(t, expectedIndexName, indexName) // same index used assertIndexerStats(t, ix1, 10, 1) @@ -238,9 +238,9 @@ func TestContacts(t *testing.T) { require.NoError(t, err) // and simulate another indexer doing a parallel rebuild - ix2 := indexers.NewContactIndexer(elasticURL, aliasName, 4) + ix2 := indexers.NewContactIndexer(elasticURL, aliasName, 2, 1, 4) - indexName2, err := ix2.Index(db, true, false, 2, 1) + indexName2, err := ix2.Index(db, true, false) assert.NoError(t, err) assert.Equal(t, expectedIndexName+"_1", indexName2) // new index used assertIndexerStats(t, ix2, 8, 0) @@ -254,8 +254,8 @@ func TestContacts(t *testing.T) { assertQuery(t, es, elastic.NewMatchQuery("name", "eric"), []int64{2}) // simulate another indexer doing a parallel rebuild with cleanup - ix3 := indexers.NewContactIndexer(elasticURL, aliasName, 4) - indexName3, err := ix3.Index(db, true, true, 
2, 1) + ix3 := indexers.NewContactIndexer(elasticURL, aliasName, 2, 1, 4) + indexName3, err := ix3.Index(db, true, true) assert.NoError(t, err) assert.Equal(t, expectedIndexName+"_2", indexName3) // new index used assertIndexerStats(t, ix3, 8, 0) @@ -264,7 +264,7 @@ func TestContacts(t *testing.T) { assertIndexesWithPrefix(t, es, aliasName, []string{expectedIndexName + "_2"}) // check that the original indexer now indexes against the new index - indexName, err = ix1.Index(db, false, false, 2, 1) + indexName, err = ix1.Index(db, false, false) assert.NoError(t, err) assert.Equal(t, expectedIndexName+"_2", indexName) }