Skip to content

Commit

Permalink
Merge pull request #8 from nyaruka/add-cleanup
Browse files Browse the repository at this point in the history
add cleanup option to remove old indexes that are no longer used
  • Loading branch information
nicpottier authored Apr 23, 2018
2 parents e05846c + 9f2fcf7 commit 8672030
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 1 deletion.
11 changes: 11 additions & 0 deletions cmd/rp-indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type config struct {
Index string `help:"the alias for our contact index"`
Poll int `help:"the number of seconds to wait between checking for updated contacts"`
Rebuild bool `help:"whether to rebuild the index, swapping it when complete, then exiting (default false)"`
Cleanup bool `help:"whether to remove old indexes after a rebuild"`
LogLevel string `help:"the log level, one of error, warn, info, debug"`
SentryDSN string `help:"the sentry configuration to log errors to, if any"`
}
Expand All @@ -29,6 +30,7 @@ func main() {
Index: "contacts",
Poll: 5,
Rebuild: false,
Cleanup: false,
LogLevel: "info",
}
loader := ezconf.NewLoader(&config, "indexer", "Indexes RapidPro contacts to ElasticSearch", []string{"indexer.toml"})
Expand Down Expand Up @@ -113,6 +115,15 @@ func main() {
remapAlias = false
}

// cleanup our aliases if appropriate
if config.Cleanup {
err := indexer.CleanupIndexes(config.ElasticURL, config.Index)
if err != nil {
logError(config.Rebuild, err, "error cleaning up aliases")
continue
}
}

if config.Rebuild {
os.Exit(0)
}
Expand Down
38 changes: 38 additions & 0 deletions indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,37 @@ func FindPhysicalIndexes(url string, alias string) []string {
return indexes
}

// CleanupIndexes removes all indexes that are older than the currently active index
func CleanupIndexes(url string, alias string) error {
// find our current indexes
currents := FindPhysicalIndexes(url, alias)

// no current indexes? this a noop
if len(currents) == 0 {
return nil
}

// find all the current indexes
healthResponse := healthResponse{}
_, err := MakeJSONRequest(http.MethodGet, fmt.Sprintf("%s/%s", url, "_cluster/health?level=indices"), "", &healthResponse)
if err != nil {
return err
}

// for each active index, if it starts with our alias but is before our current index, remove it
for key := range healthResponse.Indices {
if strings.HasPrefix(key, alias) && strings.Compare(key, currents[0]) < 0 {
log.WithField("index", key).Info("removing old index")
_, err = MakeJSONRequest(http.MethodDelete, fmt.Sprintf("%s/%s", url, key), "", nil)
if err != nil {
return err
}
}
}

return nil
}

// MakeJSONRequest is a utility function to make a JSON request, optionally decoding the response into the passed in struct
func MakeJSONRequest(method string, url string, body string, jsonStruct interface{}) (*http.Response, error) {
req, _ := http.NewRequest(method, url, bytes.NewReader([]byte(body)))
Expand Down Expand Up @@ -589,5 +620,12 @@ type indexResponse struct {
} `json:"items"`
}

// our response for our index health
type healthResponse struct {
Indices map[string]struct {
Status string `json:"status"`
} `json:"indices"`
}

// our response for figuring out the physical index for an alias
type infoResponse map[string]interface{}
18 changes: 17 additions & 1 deletion indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"testing"
"time"
Expand Down Expand Up @@ -244,9 +245,23 @@ func TestIndexing(t *testing.T) {
// remap again
err = MapIndexAlias(elasticURL, indexName, newIndex)
assert.NoError(t, err)

time.Sleep(5 * time.Second)

// old index still around
resp, err := http.Get(fmt.Sprintf("%s/%s", elasticURL, physicalName))
assert.NoError(t, err)
assert.Equal(t, resp.StatusCode, http.StatusOK)

// cleanup our indexes, will remove our original index
err = CleanupIndexes(elasticURL, indexName)
assert.NoError(t, err)

// old physical index should be gone
resp, err = http.Get(fmt.Sprintf("%s/%s", elasticURL, physicalName))
assert.NoError(t, err)
assert.Equal(t, resp.StatusCode, http.StatusNotFound)

// new index still works
assertQuery(t, client, newIndex, elastic.NewMatchQuery("name", "john"), []int64{5})

// update our database, removing one contact, updating another
Expand All @@ -267,4 +282,5 @@ func TestIndexing(t *testing.T) {

// 3 is no longer in our group
assertQuery(t, client, indexName, elastic.NewMatchQuery("groups", "529bac39-550a-4d6f-817c-1833f3449007"), []int64{1})

}

0 comments on commit 8672030

Please sign in to comment.