From 5526523287783a02c374cb46287edaffee80424e Mon Sep 17 00:00:00 2001
From: Rowan Seymour <rowanseymour@gmail.com>
Date: Wed, 5 Jun 2024 14:10:58 -0500
Subject: [PATCH 1/6] Update CHANGELOG.md for v9.1.8

---
 CHANGELOG.md | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8cd433d..abae1c5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+v9.1.8 (2024-06-05)
+-------------------------
+ * Update github actions versions
+ * Add healthcheck for elastic service in CI tests
+ * Update goreleaser config to v2
+
 v9.1.7 (2024-06-05)
 -------------------------
  * Remove multi-search-db CI testing because it's unreliable

From 33f791b2dcca8d1267ccdd62885e8ba91cbd965a Mon Sep 17 00:00:00 2001
From: Rowan Seymour <rowanseymour@gmail.com>
Date: Wed, 5 Jun 2024 15:38:59 -0500
Subject: [PATCH 2/6] Update README.md

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index d79b474..0baa9d2 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-# Indexer
+# 🗃️ Indexer
 
 [![Build Status](https://github.com/nyaruka/rp-indexer/workflows/CI/badge.svg)](https://github.com/nyaruka/rp-indexer/actions?query=workflow%3ACI) 
 [![codecov](https://codecov.io/gh/nyaruka/rp-indexer/branch/main/graph/badge.svg)](https://codecov.io/gh/nyaruka/rp-indexer) 

From 2db16bc338f8fa6f6162a28117d4607601254460 Mon Sep 17 00:00:00 2001
From: Rowan Seymour <rowanseymour@gmail.com>
Date: Mon, 10 Jun 2024 15:47:03 -0500
Subject: [PATCH 3/6] Add track_total_hits to GetESLastModified

---
 indexers/base.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/indexers/base.go b/indexers/base.go
index 348ea35..59305a9 100644
--- a/indexers/base.go
+++ b/indexers/base.go
@@ -326,7 +326,7 @@ func (i *baseIndexer) GetESLastModified(index string) (time.Time, error) {
 	_, err := utils.MakeJSONRequest(
 		http.MethodPost,
 		fmt.Sprintf("%s/%s/_search", i.elasticURL, index),
-		[]byte(`{ "sort": [{ "modified_on_mu": "desc" }], "_source": {"includes": ["modified_on", "id"]}, "size": 1}`),
+		[]byte(`{ "sort": [{ "modified_on_mu": "desc" }], "_source": {"includes": ["modified_on", "id"]}, "size": 1, "track_total_hits": false}`),
 		queryResponse,
 	)
 	if err != nil {

From 3340f1d30656faa2d8ffde7dcec3486917130318 Mon Sep 17 00:00:00 2001
From: Rowan Seymour <rowanseymour@gmail.com>
Date: Mon, 10 Jun 2024 16:31:55 -0500
Subject: [PATCH 4/6] Split up created and updated contacts in logging

---
 indexers/base.go     | 17 +++++++++++------
 indexers/contacts.go | 27 ++++++++++++++++-----------
 2 files changed, 27 insertions(+), 17 deletions(-)

diff --git a/indexers/base.go b/indexers/base.go
index 59305a9..244b9bb 100644
--- a/indexers/base.go
+++ b/indexers/base.go
@@ -267,20 +267,23 @@ type indexResponse struct {
 }
 
 // indexes the batch of contacts
-func (i *baseIndexer) indexBatch(index string, batch []byte) (int, int, error) {
+func (i *baseIndexer) indexBatch(index string, batch []byte) (int, int, int, error) {
 	response := indexResponse{}
 	indexURL := fmt.Sprintf("%s/%s/_bulk", i.elasticURL, index)
 
 	_, err := utils.MakeJSONRequest(http.MethodPut, indexURL, batch, &response)
 	if err != nil {
-		return 0, 0, err
+		return 0, 0, 0, err
 	}
 
-	createdCount, deletedCount, conflictedCount := 0, 0, 0
+	createdCount, updatedCount, deletedCount, conflictedCount := 0, 0, 0, 0
+
 	for _, item := range response.Items {
 		if item.Index.ID != "" {
 			slog.Debug("index response", "id", item.Index.ID, "status", item.Index.Status)
-			if item.Index.Status == 200 || item.Index.Status == 201 {
+			if item.Index.Status == 200 {
+				updatedCount++
+			} else if item.Index.Status == 201 {
 				createdCount++
 			} else if item.Index.Status == 409 {
 				conflictedCount++
@@ -298,8 +301,10 @@ func (i *baseIndexer) indexBatch(index string, batch []byte) (int, int, error) {
 			slog.Error("unparsed item in response")
 		}
 	}
-	slog.Debug("indexed batch", "created", createdCount, "deleted", deletedCount, "conflicted", conflictedCount)
-	return createdCount, deletedCount, nil
+
+	slog.Debug("indexed batch", "created", createdCount, "updated", updatedCount, "deleted", deletedCount, "conflicted", conflictedCount)
+
+	return createdCount, updatedCount, deletedCount, nil
 }
 
 // our response for finding the last modified document
diff --git a/indexers/contacts.go b/indexers/contacts.go
index 8a9aef7..32486e5 100644
--- a/indexers/contacts.go
+++ b/indexers/contacts.go
@@ -64,12 +64,12 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error
 
 	// now index our docs
 	start := time.Now()
-	indexed, deleted, err := i.indexModified(ctx, db, physicalIndex, lastModified.Add(-5*time.Second), rebuild)
+	created, updated, deleted, err := i.indexModified(ctx, db, physicalIndex, lastModified.Add(-5*time.Second), rebuild)
 	if err != nil {
 		return "", fmt.Errorf("error indexing documents: %w", err)
 	}
 
-	i.recordComplete(indexed, deleted, time.Since(start))
+	i.recordComplete(created+updated, deleted, time.Since(start))
 
 	// if the index didn't previously exist or we are rebuilding, remap to our alias
 	if remapAlias {
@@ -153,8 +153,8 @@ SELECT org_id, id, modified_on, is_active, row_to_json(t) FROM (
 `
 
 // IndexModified queries and indexes all contacts with a lastModified greater than or equal to the passed in time
-func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index string, lastModified time.Time, rebuild bool) (int, int, error) {
-	totalFetched, totalCreated, totalDeleted := 0, 0, 0
+func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index string, lastModified time.Time, rebuild bool) (int, int, int, error) {
+	totalFetched, totalCreated, totalUpdated, totalDeleted := 0, 0, 0, 0
 
 	var modifiedOn time.Time
 	var contactJSON string
@@ -168,18 +168,20 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
 		batchStart := time.Now()        // start time for this batch
 		batchFetched := 0               // contacts fetched in this batch
 		batchCreated := 0               // contacts created in ES
+		batchUpdated := 0               // contacts updated in ES
 		batchDeleted := 0               // contacts deleted in ES
 		batchESTime := time.Duration(0) // time spent indexing for this batch
 
 		indexSubBatch := func(b *bytes.Buffer) error {
 			t := time.Now()
-			created, deleted, err := i.indexBatch(index, b.Bytes())
+			created, updated, deleted, err := i.indexBatch(index, b.Bytes())
 			if err != nil {
 				return err
 			}
 
 			batchESTime += time.Since(t)
 			batchCreated += created
+			batchUpdated += updated
 			batchDeleted += deleted
 			b.Reset()
 			return nil
@@ -191,17 +193,17 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
 
 		// no more rows? return
 		if err == sql.ErrNoRows {
-			return 0, 0, nil
+			return 0, 0, 0, nil
 		}
 		if err != nil {
-			return 0, 0, err
+			return 0, 0, 0, err
 		}
 		defer rows.Close()
 
 		for rows.Next() {
 			err = rows.Scan(&orgID, &id, &modifiedOn, &isActive, &contactJSON)
 			if err != nil {
-				return 0, 0, err
+				return 0, 0, 0, err
 			}
 
 			batchFetched++
@@ -224,14 +226,14 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
 			// write to elastic search in batches
 			if batchFetched%i.batchSize == 0 {
 				if err := indexSubBatch(subBatch); err != nil {
-					return 0, 0, err
+					return 0, 0, 0, err
 				}
 			}
 		}
 
 		if subBatch.Len() > 0 {
 			if err := indexSubBatch(subBatch); err != nil {
-				return 0, 0, err
+				return 0, 0, 0, err
 			}
 		}
 
@@ -239,6 +241,7 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
 
 		totalFetched += batchFetched
 		totalCreated += batchCreated
+		totalUpdated += batchUpdated
 		totalDeleted += batchDeleted
 
 		totalTime := time.Since(start)
@@ -249,10 +252,12 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
 			"rate", batchRate,
 			"batch_fetched", batchFetched,
 			"batch_created", batchCreated,
+			"batch_updated", batchUpdated,
 			"batch_elapsed", batchTime,
 			"batch_elapsed_es", batchESTime,
 			"total_fetched", totalFetched,
 			"total_created", totalCreated,
+			"total_updated", totalUpdated,
 			"total_elapsed", totalTime,
 		)
 
@@ -269,7 +274,7 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
 		}
 	}
 
-	return totalCreated, totalDeleted, nil
+	return totalCreated, totalUpdated, totalDeleted, nil
 }
 
 func (i *ContactIndexer) GetDBLastModified(ctx context.Context, db *sql.DB) (time.Time, error) {

From 491961a29929bb55c39841fe14a8fb13a77eca97 Mon Sep 17 00:00:00 2001
From: Rowan Seymour <rowanseymour@gmail.com>
Date: Mon, 10 Jun 2024 16:52:11 -0500
Subject: [PATCH 5/6] Record stats inside indexing batch loop

---
 indexers/base.go     |  6 +++---
 indexers/contacts.go | 21 ++++++++++-----------
 2 files changed, 13 insertions(+), 14 deletions(-)

diff --git a/indexers/base.go b/indexers/base.go
index 244b9bb..6737329 100644
--- a/indexers/base.go
+++ b/indexers/base.go
@@ -24,7 +24,7 @@ const deleteCommand = `{ "delete" : { "_id": %d, "version": %d, "version_type":
 type Stats struct {
 	Indexed int64         // total number of documents indexed
 	Deleted int64         // total number of documents deleted
-	Elapsed time.Duration // total time spent actually indexing
+	Elapsed time.Duration // total time spent actually indexing (excludes poll delay)
 }
 
 // Indexer is base interface for indexers
@@ -84,8 +84,8 @@ func (i *baseIndexer) log() *slog.Logger {
 	return slog.With("indexer", i.name)
 }
 
-// records a complete index and updates statistics
-func (i *baseIndexer) recordComplete(indexed, deleted int, elapsed time.Duration) {
+// records indexing activity and updates statistics
+func (i *baseIndexer) recordActivity(indexed, deleted int, elapsed time.Duration) {
 	i.stats.Indexed += int64(indexed)
 	i.stats.Deleted += int64(deleted)
 	i.stats.Elapsed += elapsed
diff --git a/indexers/contacts.go b/indexers/contacts.go
index 32486e5..e15520d 100644
--- a/indexers/contacts.go
+++ b/indexers/contacts.go
@@ -63,14 +63,11 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error
 	i.log().Debug("indexing newer than last modified", "index", physicalIndex, "last_modified", lastModified)
 
 	// now index our docs
-	start := time.Now()
-	created, updated, deleted, err := i.indexModified(ctx, db, physicalIndex, lastModified.Add(-5*time.Second), rebuild)
+	err = i.indexModified(ctx, db, physicalIndex, lastModified.Add(-5*time.Second), rebuild)
 	if err != nil {
 		return "", fmt.Errorf("error indexing documents: %w", err)
 	}
 
-	i.recordComplete(created+updated, deleted, time.Since(start))
-
 	// if the index didn't previously exist or we are rebuilding, remap to our alias
 	if remapAlias {
 		err := i.updateAlias(physicalIndex)
@@ -153,7 +150,7 @@ SELECT org_id, id, modified_on, is_active, row_to_json(t) FROM (
 `
 
 // IndexModified queries and indexes all contacts with a lastModified greater than or equal to the passed in time
-func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index string, lastModified time.Time, rebuild bool) (int, int, int, error) {
+func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index string, lastModified time.Time, rebuild bool) error {
 	totalFetched, totalCreated, totalUpdated, totalDeleted := 0, 0, 0, 0
 
 	var modifiedOn time.Time
@@ -193,17 +190,17 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
 
 		// no more rows? return
 		if err == sql.ErrNoRows {
-			return 0, 0, 0, nil
+			return nil
 		}
 		if err != nil {
-			return 0, 0, 0, err
+			return err
 		}
 		defer rows.Close()
 
 		for rows.Next() {
 			err = rows.Scan(&orgID, &id, &modifiedOn, &isActive, &contactJSON)
 			if err != nil {
-				return 0, 0, 0, err
+				return err
 			}
 
 			batchFetched++
@@ -226,14 +223,14 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
 			// write to elastic search in batches
 			if batchFetched%i.batchSize == 0 {
 				if err := indexSubBatch(subBatch); err != nil {
-					return 0, 0, 0, err
+					return err
 				}
 			}
 		}
 
 		if subBatch.Len() > 0 {
 			if err := indexSubBatch(subBatch); err != nil {
-				return 0, 0, 0, err
+				return err
 			}
 		}
 
@@ -268,13 +265,15 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
 			log.Debug("indexed contact batch")
 		}
 
+		i.recordActivity(batchCreated+batchUpdated, batchDeleted, time.Since(batchStart))
+
 		// last modified stayed the same and we didn't add anything, seen it all, break out
 		if lastModified.Equal(queryModified) && batchCreated == 0 {
 			break
 		}
 	}
 
-	return totalCreated, totalUpdated, totalDeleted, nil
+	return nil
 }
 
 func (i *ContactIndexer) GetDBLastModified(ctx context.Context, db *sql.DB) (time.Time, error) {

From 2f150c0c7cb33e233cfb6d1e25515e68562b12f8 Mon Sep 17 00:00:00 2001
From: Rowan Seymour <rowanseymour@gmail.com>
Date: Mon, 10 Jun 2024 16:58:04 -0500
Subject: [PATCH 6/6] Update CHANGELOG.md for v9.1.9

---
 CHANGELOG.md | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index abae1c5..6efa67f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+v9.1.9 (2024-06-10)
+-------------------------
+ * Record stats inside indexing batch loop
+ * Split up created vs updated in progress logging
+ * Add track_total_hits to GetESLastModified
+
 v9.1.8 (2024-06-05)
 -------------------------
  * Update github actions versions