Skip to content

Commit 8914509

Browse files
committed
Rework passing of shards and replicas so they're passed to the indexer constructor rather than the Index method
1 parent 2970fd3 commit 8914509

File tree

5 files changed

+33
-26
lines changed

5 files changed

+33
-26
lines changed

cmd/rp-indexer/main.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,14 @@ func main() {
5555
}
5656

5757
idxrs := []indexers.Indexer{
58-
indexers.NewContactIndexer(cfg.ElasticURL, cfg.ContactsIndex, 500),
58+
indexers.NewContactIndexer(cfg.ElasticURL, cfg.ContactsIndex, cfg.ContactsShards, cfg.ContactsReplicas, 500),
5959
}
6060

6161
if cfg.Rebuild {
6262
// if rebuilding, just do a complete index and quit. In future when we support multiple indexers,
6363
// the rebuild argument can be become the name of the index to rebuild, e.g. --rebuild=contacts
6464
idxr := idxrs[0]
65-
if _, err := idxr.Index(db, true, cfg.Cleanup, cfg.ContactsShards, cfg.ContactsReplicas); err != nil {
65+
if _, err := idxr.Index(db, true, cfg.Cleanup); err != nil {
6666
log.WithField("indexer", idxr.Name()).WithError(err).Fatal("error during rebuilding")
6767
}
6868
} else {

daemon.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func (d *Daemon) startIndexer(indexer indexers.Indexer) {
6666
case <-d.quit:
6767
return
6868
case <-time.After(d.poll):
69-
_, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup, d.cfg.ContactsShards, d.cfg.ContactsReplicas)
69+
_, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup)
7070
if err != nil {
7171
log.WithError(err).Error("error during indexing")
7272
}

indexers/base.go

+15-3
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,12 @@ type Stats struct {
2929
// Indexer is base interface for indexers
3030
type Indexer interface {
3131
Name() string
32-
Index(db *sql.DB, rebuild, cleanup bool, shards int, replicas int) (string, error)
32+
Index(db *sql.DB, rebuild, cleanup bool) (string, error)
3333
Stats() Stats
3434
}
3535

36+
// IndexDefinition is what we pass to elastic to create an index,
37+
// see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html
3638
type IndexDefinition struct {
3739
Settings struct {
3840
Index struct {
@@ -45,15 +47,25 @@ type IndexDefinition struct {
4547
Mappings json.RawMessage `json:"mappings"`
4648
}
4749

50+
func newIndexDefinition(base []byte, shards, replicas int) *IndexDefinition {
51+
d := &IndexDefinition{}
52+
jsonx.MustUnmarshal(contactsIndexDef, d)
53+
54+
d.Settings.Index.NumberOfShards = shards
55+
d.Settings.Index.NumberOfReplicas = replicas
56+
return d
57+
}
58+
4859
type baseIndexer struct {
4960
elasticURL string
5061
name string // e.g. contacts, used as the alias
62+
definition *IndexDefinition
5163

5264
stats Stats
5365
}
5466

55-
func newBaseIndexer(elasticURL, name string) baseIndexer {
56-
return baseIndexer{elasticURL: elasticURL, name: name}
67+
func newBaseIndexer(elasticURL, name string, def *IndexDefinition) baseIndexer {
68+
return baseIndexer{elasticURL: elasticURL, name: name, definition: def}
5769
}
5870

5971
func (i *baseIndexer) Name() string {

indexers/contacts.go

+7-12
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,12 @@ import (
77
"fmt"
88
"time"
99

10-
"github.com/nyaruka/gocommon/jsonx"
1110
"github.com/pkg/errors"
1211
"github.com/sirupsen/logrus"
1312
)
1413

1514
//go:embed contacts.index.json
16-
var contactsIndexDefinition []byte
15+
var contactsIndexDef []byte
1716

1817
// ContactIndexer is an indexer for contacts
1918
type ContactIndexer struct {
@@ -23,15 +22,17 @@ type ContactIndexer struct {
2322
}
2423

2524
// NewContactIndexer creates a new contact indexer
26-
func NewContactIndexer(elasticURL, name string, batchSize int) *ContactIndexer {
25+
func NewContactIndexer(elasticURL, name string, shards, replicas, batchSize int) *ContactIndexer {
26+
def := newIndexDefinition(contactsIndexDef, shards, replicas)
27+
2728
return &ContactIndexer{
28-
baseIndexer: newBaseIndexer(elasticURL, name),
29+
baseIndexer: newBaseIndexer(elasticURL, name, def),
2930
batchSize: batchSize,
3031
}
3132
}
3233

3334
// Index indexes modified contacts and returns the name of the concrete index
34-
func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool, shards, replicas int) (string, error) {
35+
func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error) {
3536
var err error
3637

3738
// find our physical index
@@ -47,13 +48,7 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool, shards, replic
4748

4849
// doesn't exist or we are rebuilding, create it
4950
if physicalIndex == "" || rebuild {
50-
def := &IndexDefinition{}
51-
jsonx.MustUnmarshal(contactsIndexDefinition, def)
52-
53-
def.Settings.Index.NumberOfShards = shards
54-
def.Settings.Index.NumberOfReplicas = replicas
55-
56-
physicalIndex, err = i.createNewIndex(def)
51+
physicalIndex, err = i.createNewIndex(i.definition)
5752
if err != nil {
5853
return "", errors.Wrap(err, "error creating new index")
5954
}

indexers/contacts_test.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -186,12 +186,12 @@ var contactQueryTests = []struct {
186186
func TestContacts(t *testing.T) {
187187
db, es := setup(t)
188188

189-
ix1 := indexers.NewContactIndexer(elasticURL, aliasName, 4)
189+
ix1 := indexers.NewContactIndexer(elasticURL, aliasName, 2, 1, 4)
190190
assert.Equal(t, "indexer_test", ix1.Name())
191191

192192
expectedIndexName := fmt.Sprintf("indexer_test_%s", time.Now().Format("2006_01_02"))
193193

194-
indexName, err := ix1.Index(db, false, false, 2, 1)
194+
indexName, err := ix1.Index(db, false, false)
195195
assert.NoError(t, err)
196196
assert.Equal(t, expectedIndexName, indexName)
197197

@@ -217,7 +217,7 @@ func TestContacts(t *testing.T) {
217217
require.NoError(t, err)
218218

219219
// and index again...
220-
indexName, err = ix1.Index(db, false, false, 2, 1)
220+
indexName, err = ix1.Index(db, false, false)
221221
assert.NoError(t, err)
222222
assert.Equal(t, expectedIndexName, indexName) // same index used
223223
assertIndexerStats(t, ix1, 10, 1)
@@ -238,9 +238,9 @@ func TestContacts(t *testing.T) {
238238
require.NoError(t, err)
239239

240240
// and simulate another indexer doing a parallel rebuild
241-
ix2 := indexers.NewContactIndexer(elasticURL, aliasName, 4)
241+
ix2 := indexers.NewContactIndexer(elasticURL, aliasName, 2, 1, 4)
242242

243-
indexName2, err := ix2.Index(db, true, false, 2, 1)
243+
indexName2, err := ix2.Index(db, true, false)
244244
assert.NoError(t, err)
245245
assert.Equal(t, expectedIndexName+"_1", indexName2) // new index used
246246
assertIndexerStats(t, ix2, 8, 0)
@@ -254,8 +254,8 @@ func TestContacts(t *testing.T) {
254254
assertQuery(t, es, elastic.NewMatchQuery("name", "eric"), []int64{2})
255255

256256
// simulate another indexer doing a parallel rebuild with cleanup
257-
ix3 := indexers.NewContactIndexer(elasticURL, aliasName, 4)
258-
indexName3, err := ix3.Index(db, true, true, 2, 1)
257+
ix3 := indexers.NewContactIndexer(elasticURL, aliasName, 2, 1, 4)
258+
indexName3, err := ix3.Index(db, true, true)
259259
assert.NoError(t, err)
260260
assert.Equal(t, expectedIndexName+"_2", indexName3) // new index used
261261
assertIndexerStats(t, ix3, 8, 0)
@@ -264,7 +264,7 @@ func TestContacts(t *testing.T) {
264264
assertIndexesWithPrefix(t, es, aliasName, []string{expectedIndexName + "_2"})
265265

266266
// check that the original indexer now indexes against the new index
267-
indexName, err = ix1.Index(db, false, false, 2, 1)
267+
indexName, err = ix1.Index(db, false, false)
268268
assert.NoError(t, err)
269269
assert.Equal(t, expectedIndexName+"_2", indexName)
270270
}

0 commit comments

Comments
 (0)