Skip to content

Commit 659a0cb

Browse files
committed
add cleanup option to remove old indexes that are no longer used
1 parent e05846c commit 659a0cb

File tree

3 files changed

+66
-1
lines changed

3 files changed

+66
-1
lines changed

Diff for: cmd/rp-indexer/main.go

+11
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type config struct {
1818
Index string `help:"the alias for our contact index"`
1919
Poll int `help:"the number of seconds to wait between checking for updated contacts"`
2020
Rebuild bool `help:"whether to rebuild the index, swapping it when complete, then exiting (default false)"`
21+
Cleanup bool `help:"whether to remove old indexes after a rebuild"`
2122
LogLevel string `help:"the log level, one of error, warn, info, debug"`
2223
SentryDSN string `help:"the sentry configuration to log errors to, if any"`
2324
}
@@ -29,6 +30,7 @@ func main() {
2930
Index: "contacts",
3031
Poll: 5,
3132
Rebuild: false,
33+
Cleanup: false,
3234
LogLevel: "info",
3335
}
3436
loader := ezconf.NewLoader(&config, "indexer", "Indexes RapidPro contacts to ElasticSearch", []string{"indexer.toml"})
@@ -113,6 +115,15 @@ func main() {
113115
remapAlias = false
114116
}
115117

118+
// cleanup our aliases if appropriate
119+
if config.Cleanup {
120+
err := indexer.CleanupIndexes(config.ElasticURL, config.Index)
121+
if err != nil {
122+
logError(config.Rebuild, err, "error cleaning up aliases")
123+
continue
124+
}
125+
}
126+
116127
if config.Rebuild {
117128
os.Exit(0)
118129
}

Diff for: indexer.go

+38
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,37 @@ func FindPhysicalIndexes(url string, alias string) []string {
9797
return indexes
9898
}
9999

100+
// CleanupIndexes remover all indexes that are older than the currently active index
101+
func CleanupIndexes(url string, alias string) error {
102+
// find our current indexes
103+
currents := FindPhysicalIndexes(url, alias)
104+
105+
// no current indexes? this a noop
106+
if len(currents) == 0 {
107+
return nil
108+
}
109+
110+
// find all the current indexer
111+
healthResponse := healthResponse{}
112+
_, err := MakeJSONRequest(http.MethodGet, fmt.Sprintf("%s/%s", url, "_cluster/health?level=indices"), "", &healthResponse)
113+
if err != nil {
114+
return err
115+
}
116+
117+
// for each active index, if it starts with our alias but is before our current index, remove it
118+
for key := range healthResponse.Indices {
119+
if strings.HasPrefix(key, alias) && strings.Compare(key, currents[0]) < 0 {
120+
log.WithField("index", key).Info("removing old index")
121+
_, err = MakeJSONRequest(http.MethodDelete, fmt.Sprintf("%s/%s", url, key), "", nil)
122+
if err != nil {
123+
return err
124+
}
125+
}
126+
}
127+
128+
return nil
129+
}
130+
100131
// MakeJSONRequest is a utility function to make a JSON request, optionally decoding the response into the passed in struct
101132
func MakeJSONRequest(method string, url string, body string, jsonStruct interface{}) (*http.Response, error) {
102133
req, _ := http.NewRequest(method, url, bytes.NewReader([]byte(body)))
@@ -589,5 +620,12 @@ type indexResponse struct {
589620
} `json:"items"`
590621
}
591622

623+
// our response for our index health
624+
type healthResponse struct {
625+
Indices map[string]struct {
626+
Status string `json:"status"`
627+
} `json:"indices"`
628+
}
629+
592630
// our response for figuring out the physical index for an alias
593631
type infoResponse map[string]interface{}

Diff for: indexer_test.go

+17-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"io/ioutil"
88
"log"
9+
"net/http"
910
"os"
1011
"testing"
1112
"time"
@@ -244,9 +245,23 @@ func TestIndexing(t *testing.T) {
244245
// remap again
245246
err = MapIndexAlias(elasticURL, indexName, newIndex)
246247
assert.NoError(t, err)
247-
248248
time.Sleep(5 * time.Second)
249249

250+
// old index still around
251+
resp, err := http.Get(fmt.Sprintf("%s/%s", elasticURL, physicalName))
252+
assert.NoError(t, err)
253+
assert.Equal(t, resp.StatusCode, http.StatusOK)
254+
255+
// cleanup our indexes, will remove our original index
256+
err = CleanupIndexes(elasticURL, indexName)
257+
assert.NoError(t, err)
258+
259+
// old physical index should be gone
260+
resp, err = http.Get(fmt.Sprintf("%s/%s", elasticURL, physicalName))
261+
assert.NoError(t, err)
262+
assert.Equal(t, resp.StatusCode, http.StatusNotFound)
263+
264+
// new index still works
250265
assertQuery(t, client, newIndex, elastic.NewMatchQuery("name", "john"), []int64{5})
251266

252267
// update our database, removing one contact, updating another
@@ -267,4 +282,5 @@ func TestIndexing(t *testing.T) {
267282

268283
// 3 is no longer in our group
269284
assertQuery(t, client, indexName, elastic.NewMatchQuery("groups", "529bac39-550a-4d6f-817c-1833f3449007"), []int64{1})
285+
270286
}

0 commit comments

Comments
 (0)