Skip to content

Commit 00d5f46

Browse files
committed
fix indexer getting stalled if there are more than 500 contacts with same modified_on
1 parent 6a953ef commit 00d5f46

File tree

2 files changed

+15
-15
lines changed

2 files changed

+15
-15
lines changed

Diff for: cmd/rp-indexer/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func main() {
9696
log.WithField("last_modified", lastModified).WithField("index", physicalIndex).Info("indexing contacts newer than last modified")
9797

9898
// now index our docs
99-
indexed, deleted, err := indexer.IndexContacts(db, config.ElasticURL, physicalIndex, lastModified)
99+
indexed, deleted, err := indexer.IndexContacts(db, config.ElasticURL, physicalIndex, lastModified.Add(-5*time.Second))
100100
if err != nil {
101101
logError(config.Rebuild, err, "error indexing contacts")
102102
continue

Diff for: indexer.go

+14-14
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,6 @@ func IndexBatch(elasticURL string, index string, batch string) (int, int, error)
180180
func IndexContacts(db *sql.DB, elasticURL string, index string, lastModified time.Time) (int, int, error) {
181181
batch := strings.Builder{}
182182
createdCount, deletedCount := 0, 0
183-
processedCount := 0
184183

185184
if index == "" {
186185
return createdCount, deletedCount, fmt.Errorf("empty index passed to IndexContacts")
@@ -194,10 +193,11 @@ func IndexContacts(db *sql.DB, elasticURL string, index string, lastModified tim
194193
start := time.Now()
195194

196195
for {
197-
batchCount := 0
198-
batchModified := lastModified
196+
rows, err := db.Query(contactQuery, lastModified)
199197

200-
rows, err := db.Query(contactQuery, lastModified.Add(time.Second*-5))
198+
queryCreated := 0
199+
queryCount := 0
200+
queryModified := lastModified
201201

202202
// no more rows? return
203203
if err == sql.ErrNoRows {
@@ -214,7 +214,8 @@ func IndexContacts(db *sql.DB, elasticURL string, index string, lastModified tim
214214
return 0, 0, err
215215
}
216216

217-
processedCount++
217+
queryCount++
218+
lastModified = modifiedOn
218219

219220
if isActive {
220221
log.WithField("id", id).WithField("modifiedOn", modifiedOn).WithField("contact", contactJSON).Debug("modified contact")
@@ -228,40 +229,39 @@ func IndexContacts(db *sql.DB, elasticURL string, index string, lastModified tim
228229
batch.WriteString("\n")
229230
}
230231

231-
// write to elastic search in batches of 500
232-
if processedCount%batchSize == 0 {
232+
// write to elastic search in batches
233+
if queryCount%batchSize == 0 {
233234
created, deleted, err := IndexBatch(elasticURL, index, batch.String())
234235
if err != nil {
235236
return 0, 0, err
236237
}
237238
batch.Reset()
238239

240+
queryCreated += created
239241
createdCount += created
240242
deletedCount += deleted
241-
batchCount += created
242243
}
243-
244-
lastModified = modifiedOn
245244
}
246245

247246
if batch.Len() > 0 {
248247
created, deleted, err := IndexBatch(elasticURL, index, batch.String())
249248
if err != nil {
250249
return 0, 0, err
251250
}
251+
252+
queryCreated += created
252253
createdCount += created
253254
deletedCount += deleted
254-
batchCount += created
255255
batch.Reset()
256256
}
257257

258-
// didn't add anything in this batch and our last modified stayed the same, seen it all, break out
259-
if batchCount == 0 && lastModified.Equal(batchModified) {
258+
// last modified stayed the same and we didn't add anything, seen it all, break out
259+
if lastModified.Equal(queryModified) && queryCreated == 0 {
260260
break
261261
}
262262

263263
elapsed := time.Now().Sub(start)
264-
rate := float32(processedCount) / (float32(elapsed) / float32(time.Second))
264+
rate := float32(queryCount) / (float32(elapsed) / float32(time.Second))
265265
log.WithFields(map[string]interface{}{
266266
"rate": int(rate),
267267
"added": createdCount,

0 commit comments

Comments
 (0)