diff --git a/cmd/rp-indexer/main.go b/cmd/rp-indexer/main.go index 7129c06..021bea1 100644 --- a/cmd/rp-indexer/main.go +++ b/cmd/rp-indexer/main.go @@ -8,7 +8,7 @@ import ( "github.com/evalphobia/logrus_sentry" _ "github.com/lib/pq" "github.com/nyaruka/ezconf" - indexer "github.com/nyaruka/rp-indexer" + "github.com/nyaruka/rp-indexer/indexers" log "github.com/sirupsen/logrus" ) @@ -54,7 +54,7 @@ func main() { hook.StacktraceConfiguration.Skip = 4 hook.StacktraceConfiguration.Context = 5 if err != nil { - log.Fatalf("Invalid sentry DSN: '%s': %s", config.SentryDSN, err) + log.Fatalf("invalid sentry DSN: '%s': %s", config.SentryDSN, err) } log.StandardLogger().Hooks.Add(hook) } @@ -64,66 +64,20 @@ func main() { log.Fatal(err) } - for { - // find our physical index - physicalIndexes := indexer.FindPhysicalIndexes(config.ElasticURL, config.Index) - log.WithField("physicalIndexes", physicalIndexes).WithField("index", config.Index).Debug("found physical indexes") - - physicalIndex := "" - if len(physicalIndexes) > 0 { - physicalIndex = physicalIndexes[0] - } - - // whether we need to remap our alias after building - remapAlias := false - - // doesn't exist or we are rebuilding, create it - if physicalIndex == "" || config.Rebuild { - physicalIndex, err = indexer.CreateNewIndex(config.ElasticURL, config.Index) - if err != nil { - logError(config.Rebuild, err, "error creating new index") - continue - } - log.WithField("index", config.Index).WithField("physicalIndex", physicalIndex).Info("created new physical index") - remapAlias = true - } - - lastModified, err := indexer.GetLastModified(config.ElasticURL, physicalIndex) - if err != nil { - logError(config.Rebuild, err, "error finding last modified") - continue - } + ci := indexers.NewContactIndexer(config.ElasticURL, config.Index, 500) - start := time.Now() - log.WithField("last_modified", lastModified).WithField("index", physicalIndex).Info("indexing contacts newer than last modified") + for { + _, err := ci.Index(db, config.Rebuild, config.Cleanup) - // now index our docs - indexed, deleted, err := indexer.IndexContacts(db, config.ElasticURL, physicalIndex, lastModified.Add(-5*time.Second)) if err != nil { - logError(config.Rebuild, err, "error indexing contacts") - continue - } - log.WithField("added", indexed).WithField("deleted", deleted).WithField("index", physicalIndex).WithField("elapsed", time.Now().Sub(start)).Info("completed indexing") - - // if the index didn't previously exist or we are rebuilding, remap to our alias - if remapAlias { - err := indexer.MapIndexAlias(config.ElasticURL, config.Index, physicalIndex) - if err != nil { - logError(config.Rebuild, err, "error remapping alias") - continue - } - remapAlias = false - } - - // cleanup our aliases if appropriate - if config.Cleanup { - err := indexer.CleanupIndexes(config.ElasticURL, config.Index) - if err != nil { - logError(config.Rebuild, err, "error cleaning up aliases") - continue + if config.Rebuild { + log.WithField("index", config.Index).WithError(err).Fatal("error during rebuilding") + } else { + log.WithField("index", config.Index).WithError(err).Error("error during indexing") } } + // if we were rebuilding then we're done if config.Rebuild { os.Exit(0) } @@ -132,12 +86,3 @@ func main() { time.Sleep(time.Second * 5) } } - -func logError(fatal bool, err error, msg string) { - if fatal { - log.WithError(err).Fatal(msg) - } else { - log.WithError(err).Error(msg) - time.Sleep(time.Second * 5) - } -} diff --git a/contacts/query.go b/contacts/query.go deleted file mode 100644 index d909a02..0000000 --- a/contacts/query.go +++ /dev/null @@ -1,74 +0,0 @@ -package contacts - -import ( - "database/sql" - "time" -) - -const sqlSelectModified = ` -SELECT org_id, id, modified_on, is_active, row_to_json(t) FROM ( - SELECT - id, - org_id, - uuid, - name, - language, - status, - ticket_count AS tickets, - is_active, - created_on, - modified_on, - last_seen_on, - EXTRACT(EPOCH FROM modified_on) * 1000000 AS modified_on_mu, - ( - SELECT array_to_json(array_agg(row_to_json(u))) - FROM (SELECT scheme, path FROM contacts_contacturn WHERE contact_id = contacts_contact.id) u - ) AS urns, - ( - SELECT jsonb_agg(f.value) - FROM ( - SELECT - CASE - WHEN value ? 'ward' - THEN jsonb_build_object('ward_keyword', trim(substring(value ->> 'ward' from '(?!.* > )([^>]+)'))) - ELSE '{}'::jsonb - END || district_value.value AS value - FROM ( - SELECT - CASE - WHEN value ? 'district' - THEN jsonb_build_object('district_keyword', trim(substring(value ->> 'district' from '(?!.* > )([^>]+)'))) - ELSE '{}'::jsonb - END || state_value.value as value - FROM ( - SELECT - CASE - WHEN value ? 'state' - THEN jsonb_build_object('state_keyword', trim(substring(value ->> 'state' from '(?!.* > )([^>]+)'))) - ELSE '{}' :: jsonb - END || jsonb_build_object('field', key) || value as value - FROM jsonb_each(contacts_contact.fields) - ) state_value - ) AS district_value - ) AS f - ) AS fields, - ( - SELECT array_to_json(array_agg(g.uuid)) FROM ( - SELECT contacts_contactgroup.uuid - FROM contacts_contactgroup_contacts, contacts_contactgroup - WHERE contact_id = contacts_contact.id AND contacts_contactgroup_contacts.contactgroup_id = contacts_contactgroup.id - ) g - ) AS groups, - ( - SELECT f.uuid FROM flows_flow f WHERE f.id = contacts_contact.current_flow_id - ) AS flow - FROM contacts_contact - WHERE modified_on >= $1 - ORDER BY modified_on ASC - LIMIT 500000 -) t; -` - -func FetchModified(db *sql.DB, lastModified time.Time) (*sql.Rows, error) { - return db.Query(sqlSelectModified, lastModified) -} diff --git a/contacts/settings.go b/contacts/settings.go deleted file mode 100644 index 4bb3608..0000000 --- a/contacts/settings.go +++ /dev/null @@ -1,10 +0,0 @@ -package contacts - -import ( - _ "embed" - "encoding/json" -) - -// settings and mappings for our index -//go:embed index_settings.json -var IndexSettings json.RawMessage diff --git a/go.mod b/go.mod index c40cbc7..fcfb589 100644 --- a/go.mod +++ b/go.mod @@ -1,17 +1,32 @@ module github.com/nyaruka/rp-indexer require ( - github.com/certifi/gocertifi v0.0.0-20180118203423-deb3ae2ef261 // indirect github.com/evalphobia/logrus_sentry v0.4.5 - github.com/getsentry/raven-go v0.0.0-20180405121644-d1470f50d3a3 // indirect - github.com/kylelemons/godebug v1.1.0 // indirect - github.com/lib/pq v0.0.0-20180327071824-d34b9ff171c2 + github.com/lib/pq v1.10.4 github.com/nyaruka/ezconf v0.2.1 - github.com/nyaruka/gocommon v1.3.0 + github.com/nyaruka/gocommon v1.17.1 github.com/olivere/elastic/v7 v7.0.22 + github.com/pkg/errors v0.9.1 github.com/sirupsen/logrus v1.8.1 - github.com/stretchr/testify v1.5.1 + github.com/stretchr/testify v1.7.0 +) + +require ( + github.com/certifi/gocertifi v0.0.0-20180118203423-deb3ae2ef261 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/fatih/structs v1.0.0 // indirect + github.com/getsentry/raven-go v0.0.0-20180405121644-d1470f50d3a3 // indirect + github.com/go-chi/chi v4.1.2+incompatible // indirect + github.com/josharian/intern v1.0.0 // indirect + github.com/kylelemons/godebug v1.1.0 // indirect + github.com/mailru/easyjson v0.7.6 // indirect + github.com/naoina/go-stringutil v0.1.0 // indirect + github.com/naoina/toml v0.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/shopspring/decimal v1.2.0 // indirect + golang.org/x/net v0.0.0-20210614182718-04defd469f4e // indirect golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8 // indirect + gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect ) -go 1.16 +go 1.17 diff --git a/go.sum b/go.sum index 18f5213..f78ba21 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,7 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/aws/aws-sdk-go v1.35.20/go.mod h1:tlPOdRjfxPBpNIwqDj61rmsnA85v9jc0Ps9+muhnW+k= +github.com/aws/aws-sdk-go v1.40.56/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q= github.com/certifi/gocertifi v0.0.0-20180118203423-deb3ae2ef261 h1:6/yVvBsKeAw05IUj4AzvrxaCnDjN4nUqKjW9+w5wixg= github.com/certifi/gocertifi v0.0.0-20180118203423-deb3ae2ef261/go.mod h1:GJKEexRPVJrBSOjoqN5VNOIKJ5Q3RViH6eu3puDRwx4= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= @@ -15,34 +16,44 @@ github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8 github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/getsentry/raven-go v0.0.0-20180405121644-d1470f50d3a3 h1:md1zEr2oSVWYNfQj+6TL/nmAFf5gY3Tp44lzskzK9QU= github.com/getsentry/raven-go v0.0.0-20180405121644-d1470f50d3a3/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ= +github.com/go-chi/chi v4.1.2+incompatible h1:fGFk2Gmi/YKXk0OmGfBh0WgmN3XB8lVnEyNz34tQRec= +github.com/go-chi/chi v4.1.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ= +github.com/go-playground/locales v0.14.0/go.mod h1:sawfccIbzZTqEDETgFXqTho0QybSa7l++s0DH+LDiLs= +github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl+lu/H90nyDXpg0fqeB/AQUGNTVA= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/gofrs/uuid v3.3.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/jmoiron/sqlx v1.3.4/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= -github.com/lib/pq v0.0.0-20180327071824-d34b9ff171c2 h1:hRGSmZu7j271trc9sneMrpOW7GN5ngLm8YUZIPzf394= -github.com/lib/pq v0.0.0-20180327071824-d34b9ff171c2/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= +github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.10.4 h1:SO9z7FRPzA03QhHKJrH5BXA6HU1rS4V2nIVrrNC1iYk= +github.com/lib/pq v1.10.4/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA= github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/naoina/go-stringutil v0.1.0 h1:rCUeRUHjBjGTSHl0VC00jUPLz8/F9dDzYI70Hzifhks= github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h1USek5+NqSA0= github.com/naoina/toml v0.1.1 h1:PT/lllxVVN0gzzSqSlHEmP8MJB4MY2U7STGxiouV4X8= github.com/naoina/toml v0.1.1/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E= github.com/nyaruka/ezconf v0.2.1 h1:TDXWoqjqYya1uhou1mAJZg7rgFYL98EB0Tb3+BWtUh0= github.com/nyaruka/ezconf v0.2.1/go.mod h1:ey182kYkw2MIi4XiWe1FR/mzI33WCmTWuceDYYxgnQw= -github.com/nyaruka/gocommon v1.3.0 h1:IqaPT4KQ2oVq/2Ivp/c+RVCs8v71+RzPU2VhMoRrgpU= -github.com/nyaruka/gocommon v1.3.0/go.mod h1:w7lKxIkm/qLAoO9Y3aI1LV7EiYogn6+1C8MTEjxTC9M= -github.com/nyaruka/phonenumbers v1.0.34/go.mod h1:GQ0cTHlrxPrhoLwyQ1blyN1hO794ygt6FTHWrFB5SSc= +github.com/nyaruka/gocommon v1.17.1 h1:4bbNp+0/BIbne4VDiKOxh3kcbdvEu/WsrsZiG/VyRZ8= +github.com/nyaruka/gocommon v1.17.1/go.mod h1:nmYyb7MZDM0iW4DYJKiBzfKuE9nbnx+xSHZasuIBOT0= +github.com/nyaruka/phonenumbers v1.0.71/go.mod h1:sDaTZ/KPX5f8qyV9qN+hIm+4ZBARJrupC6LuhshJq1U= github.com/olivere/elastic/v7 v7.0.22 h1:esBA6JJwvYgfms0EVlH7Z+9J4oQ/WUADF2y/nCNDw7s= github.com/olivere/elastic/v7 v7.0.22/go.mod h1:VDexNy9NjmtAkrjNoI7tImv7FR4tf5zUA3ickqu5Pc8= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= @@ -61,8 +72,10 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -71,12 +84,12 @@ golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvx golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20180921000356-2f5d2388922f/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210614182718-04defd469f4e h1:XpT3nA5TvE525Ne3hInMh6+GETgn27Zfm9dxsThnX2Q= +golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -85,10 +98,15 @@ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8 h1:OH54vjqzRWmbJ62fjuhxy7AxFFgoHN0/DPc/UrL8cAs= golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -102,7 +120,10 @@ google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZi google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE= +gopkg.in/go-playground/validator.v9 v9.31.0/go.mod h1:+c9/zcJMFNgbLvly1L1V+PpxWdVbfP1avr/N00E2vyQ= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/indexer.go b/indexer.go deleted file mode 100644 index ce242f1..0000000 --- a/indexer.go +++ /dev/null @@ -1,309 +0,0 @@ -package indexer - -import ( - "bytes" - "database/sql" - _ "embed" - "encoding/json" - "fmt" - "net/http" - "sort" - "strings" - "time" - - "github.com/nyaruka/rp-indexer/contacts" - log "github.com/sirupsen/logrus" -) - -var batchSize = 500 - -// CreateNewIndex creates a new index for the passed in alias. -// -// Note that we do not create an index with the passed name, instead creating one -// based on the day, for example `contacts_2018_03_05`, then create an alias from -// that index to `contacts`. -// -// If the day-specific name already exists, we append a .1 or .2 to the name. -func CreateNewIndex(url string, alias string) (string, error) { - // create our day-specific name - physicalIndex := fmt.Sprintf("%s_%s", alias, time.Now().Format("2006_01_02")) - idx := 0 - - // check if it exists - for { - resp, err := http.Get(fmt.Sprintf("%s/%s", url, physicalIndex)) - if err != nil { - return "", err - } - // not found, great, move on - if resp.StatusCode == http.StatusNotFound { - break - } - - // was found, increase our index and try again - idx++ - physicalIndex = fmt.Sprintf("%s_%s_%d", alias, time.Now().Format("2006_01_02"), idx) - } - - // initialize our index - createURL := fmt.Sprintf("%s/%s?include_type_name=true", url, physicalIndex) - _, err := MakeJSONRequest(http.MethodPut, createURL, contacts.IndexSettings, nil) - if err != nil { - return "", err - } - - // all went well, return our physical index name - log.WithField("index", physicalIndex).Info("created index") - return physicalIndex, nil -} - -// GetLastModified queries our index and finds the last modified contact, returning it -func GetLastModified(url string, index string) (time.Time, error) { - lastModified := time.Time{} - if index == "" { - return lastModified, fmt.Errorf("empty index passed to GetLastModified") - } - - // get the newest document on our index - queryResponse := queryResponse{} - _, err := MakeJSONRequest(http.MethodPost, fmt.Sprintf("%s/%s/_search", url, index), lastModifiedQuery, &queryResponse) - if err != nil { - return lastModified, err - } - - if len(queryResponse.Hits.Hits) > 0 { - lastModified = queryResponse.Hits.Hits[0].Source.ModifiedOn - } - return lastModified, nil -} - -// FindPhysicalIndexes finds all the physical indexes for the passed in alias -func FindPhysicalIndexes(url string, alias string) []string { - indexResponse := infoResponse{} - _, err := MakeJSONRequest(http.MethodGet, fmt.Sprintf("%s/%s", url, alias), nil, &indexResponse) - indexes := make([]string, 0) - - // error could mean a variety of things, but we'll figure that out later - if err != nil { - return indexes - } - - // our top level key is our physical index name - for key := range indexResponse { - indexes = append(indexes, key) - } - - // reverse sort order should put our newest index first - sort.Sort(sort.Reverse(sort.StringSlice(indexes))) - return indexes -} - -// CleanupIndexes removes all indexes that are older than the currently active index -func CleanupIndexes(url string, alias string) error { - // find our current indexes - currents := FindPhysicalIndexes(url, alias) - - // no current indexes? this a noop - if len(currents) == 0 { - return nil - } - - // find all the current indexes - healthResponse := healthResponse{} - _, err := MakeJSONRequest(http.MethodGet, fmt.Sprintf("%s/%s", url, "_cluster/health?level=indices"), nil, &healthResponse) - if err != nil { - return err - } - - // for each active index, if it starts with our alias but is before our current index, remove it - for key := range healthResponse.Indices { - if strings.HasPrefix(key, alias) && strings.Compare(key, currents[0]) < 0 { - log.WithField("index", key).Info("removing old index") - _, err = MakeJSONRequest(http.MethodDelete, fmt.Sprintf("%s/%s", url, key), nil, nil) - if err != nil { - return err - } - } - } - - return nil -} - -// IndexBatch indexes the batch of contacts -func IndexBatch(elasticURL string, index string, batch []byte) (int, int, error) { - response := indexResponse{} - indexURL := fmt.Sprintf("%s/%s/_bulk", elasticURL, index) - - _, err := MakeJSONRequest(http.MethodPut, indexURL, batch, &response) - if err != nil { - return 0, 0, err - } - - createdCount, deletedCount, conflictedCount := 0, 0, 0 - for _, item := range response.Items { - if item.Index.ID != "" { - log.WithField("id", item.Index.ID).WithField("status", item.Index.Status).Debug("index response") - if item.Index.Status == 200 || item.Index.Status == 201 { - createdCount++ - } else if item.Index.Status == 409 { - conflictedCount++ - } else { - log.WithField("id", item.Index.ID).WithField("batch", batch).WithField("result", item.Index.Result).Error("error indexing contact") - } - } else if item.Delete.ID != "" { - log.WithField("id", item.Index.ID).WithField("status", item.Index.Status).Debug("delete response") - if item.Delete.Status == 200 { - deletedCount++ - } else if item.Delete.Status == 409 { - conflictedCount++ - } - } else { - log.Error("unparsed item in response") - } - } - log.WithField("created", createdCount).WithField("deleted", deletedCount).WithField("conflicted", conflictedCount).Debug("indexed batch") - - return createdCount, deletedCount, nil -} - -// IndexContacts queries and indexes all contacts with a lastModified greater than or equal to the passed in time -func IndexContacts(db *sql.DB, elasticURL string, index string, lastModified time.Time) (int, int, error) { - batch := &bytes.Buffer{} - createdCount, deletedCount, processedCount := 0, 0, 0 - - if index == "" { - return createdCount, deletedCount, fmt.Errorf("empty index passed to IndexContacts") - } - - var modifiedOn time.Time - var contactJSON string - var id, orgID int64 - var isActive bool - - start := time.Now() - - for { - rows, err := contacts.FetchModified(db, lastModified) - - queryCreated := 0 - queryCount := 0 - queryModified := lastModified - - // no more rows? return - if err == sql.ErrNoRows { - return 0, 0, nil - } - if err != nil { - return 0, 0, err - } - defer rows.Close() - - for rows.Next() { - err = rows.Scan(&orgID, &id, &modifiedOn, &isActive, &contactJSON) - if err != nil { - return 0, 0, err - } - - queryCount++ - processedCount++ - lastModified = modifiedOn - - if isActive { - log.WithField("id", id).WithField("modifiedOn", modifiedOn).WithField("contact", contactJSON).Debug("modified contact") - batch.WriteString(fmt.Sprintf(indexCommand, id, modifiedOn.UnixNano(), orgID)) - batch.WriteString("\n") - batch.WriteString(contactJSON) - batch.WriteString("\n") - } else { - log.WithField("id", id).WithField("modifiedOn", modifiedOn).Debug("deleted contact") - batch.WriteString(fmt.Sprintf(deleteCommand, id, modifiedOn.UnixNano(), orgID)) - batch.WriteString("\n") - } - - // write to elastic search in batches - if queryCount%batchSize == 0 { - created, deleted, err := IndexBatch(elasticURL, index, batch.Bytes()) - if err != nil { - return 0, 0, err - } - batch.Reset() - - queryCreated += created - createdCount += created - deletedCount += deleted - } - } - - if batch.Len() > 0 { - created, deleted, err := IndexBatch(elasticURL, index, batch.Bytes()) - if err != nil { - return 0, 0, err - } - - queryCreated += created - createdCount += created - deletedCount += deleted - batch.Reset() - } - - // last modified stayed the same and we didn't add anything, seen it all, break out - if lastModified.Equal(queryModified) && queryCreated == 0 { - break - } - - elapsed := time.Since(start) - rate := float32(processedCount) / (float32(elapsed) / float32(time.Second)) - log.WithFields(map[string]interface{}{ - "rate": int(rate), - "added": createdCount, - "deleted": deletedCount, - "elapsed": elapsed, - "index": index}).Info("updated contact index") - - rows.Close() - } - - return createdCount, deletedCount, nil -} - -// MapIndexAlias maps the passed in alias to the new physical index, optionally removing -// existing aliases if they exit. -func MapIndexAlias(elasticURL string, alias string, newIndex string) error { - commands := make([]interface{}, 0) - - // find existing physical indexes - existing := FindPhysicalIndexes(elasticURL, alias) - for _, idx := range existing { - remove := removeAliasCommand{} - remove.Remove.Alias = alias - remove.Remove.Index = idx - commands = append(commands, remove) - - log.WithField("index", idx).WithField("alias", alias).Info("removing old alias") - } - - // add our new index - add := addAliasCommand{} - add.Add.Alias = alias - add.Add.Index = newIndex - commands = append(commands, add) - - log.WithField("index", newIndex).WithField("alias", alias).Info("adding new alias") - - aliasURL := fmt.Sprintf("%s/_aliases", elasticURL) - aliasJSON, err := json.Marshal(aliasCommand{Actions: commands}) - if err != nil { - return err - } - _, err = MakeJSONRequest(http.MethodPost, aliasURL, aliasJSON, nil) - return err -} - -// gets our last modified contact -var lastModifiedQuery = []byte(`{ "sort": [{ "modified_on_mu": "desc" }]}`) - -// indexes a contact -const indexCommand = `{ "index": { "_id": %d, "_type": "_doc", "version": %d, "version_type": "external", "routing": %d} }` - -// deletes a contact -const deleteCommand = `{ "delete" : { "_id": %d, "_type": "_doc", "version": %d, "version_type": "external", "routing": %d} }` diff --git a/indexer_test.go b/indexer_test.go deleted file mode 100644 index 1a5af69..0000000 --- a/indexer_test.go +++ /dev/null @@ -1,386 +0,0 @@ -package indexer - -import ( - "context" - "database/sql" - "fmt" - "io/ioutil" - "log" - "net/http" - "net/http/httptest" - "os" - "testing" - "time" - - _ "github.com/lib/pq" - "github.com/olivere/elastic/v7" - "github.com/sirupsen/logrus" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -const elasticURL = "http://localhost:9200" -const indexName = "rp_elastic_test" - -func setup(t *testing.T) (*sql.DB, *elastic.Client) { - testDB, err := ioutil.ReadFile("testdb.sql") - require.NoError(t, err) - - db, err := sql.Open("postgres", "postgres://nyaruka:nyaruka@localhost:5432/elastic_test?sslmode=disable") - require.NoError(t, err) - - _, err = db.Exec(string(testDB)) - require.NoError(t, err) - - client, err := elastic.NewClient(elastic.SetURL(elasticURL), elastic.SetTraceLog(log.New(os.Stdout, "", log.LstdFlags)), elastic.SetSniff(false)) - require.NoError(t, err) - - existing := FindPhysicalIndexes(elasticURL, indexName) - for _, idx := range existing { - _, err = client.DeleteIndex(idx).Do(context.Background()) - require.NoError(t, err) - } - - logrus.SetLevel(logrus.DebugLevel) - - return db, client -} - -func assertQuery(t *testing.T, client *elastic.Client, index string, query elastic.Query, hits []int64) { - results, err := client.Search().Index(index).Query(query).Sort("id", true).Pretty(true).Do(context.Background()) - assert.NoError(t, err) - assert.Equal(t, int64(len(hits)), results.Hits.TotalHits.Value) - - if int64(len(hits)) == results.Hits.TotalHits.Value { - for i, hit := range results.Hits.Hits { - assert.Equal(t, fmt.Sprintf("%d", hits[i]), hit.Id) - } - } -} - -func TestIndexing(t *testing.T) { - batchSize = 4 - db, client := setup(t) - - physicalName, err := CreateNewIndex(elasticURL, indexName) - assert.NoError(t, err) - - added, deleted, err := IndexContacts(db, elasticURL, physicalName, time.Time{}) - assert.NoError(t, err) - assert.Equal(t, 9, added) - assert.Equal(t, 0, deleted) - - time.Sleep(2 * time.Second) - - assertQuery(t, client, physicalName, elastic.NewMatchQuery("name", "JOHn"), []int64{4}) - - // prefix on name matches both john and joanne, but no ajodi - assertQuery(t, client, physicalName, elastic.NewMatchQuery("name", "JO"), []int64{4, 6}) - assertQuery(t, client, physicalName, elastic.NewTermQuery("name.keyword", "JOHN DOE"), []int64{4}) - - // can search on both first and last name - boolQuery := elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("name", "john"), - elastic.NewMatchQuery("name", "doe")) - assertQuery(t, client, physicalName, boolQuery, []int64{4}) - - // can search on a long name - assertQuery(t, client, physicalName, elastic.NewMatchQuery("name", "Ajodinabiff"), []int64{5}) - - assertQuery(t, client, physicalName, elastic.NewMatchQuery("language", "eng"), []int64{1}) - - // test contact, not indexed - assertQuery(t, client, physicalName, elastic.NewMatchQuery("language", "fra"), []int64{}) - - assertQuery(t, client, physicalName, elastic.NewMatchQuery("status", "B"), []int64{3}) - assertQuery(t, client, physicalName, elastic.NewMatchQuery("status", "S"), []int64{2}) - - assertQuery(t, client, physicalName, elastic.NewMatchQuery("org_id", "1"), []int64{1, 2, 3, 4}) - - assertQuery(t, client, physicalName, elastic.NewMatchQuery("tickets", 2), []int64{1}) - assertQuery(t, client, physicalName, elastic.NewMatchQuery("tickets", 1), []int64{2, 3}) - assertQuery(t, client, physicalName, elastic.NewRangeQuery("tickets").Gt(0), []int64{1, 2, 3}) - - assertQuery(t, client, physicalName, elastic.NewMatchQuery("flow", "6d3cf1eb-546e-4fb8-a5ca-69187648fbf6"), []int64{2, 3}) - assertQuery(t, client, physicalName, elastic.NewMatchQuery("flow", "4eea8ff1-4fe2-4ce5-92a4-0870a499973a"), []int64{4}) - - // created_on range query - assertQuery(t, client, physicalName, elastic.NewRangeQuery("created_on").Gt("2017-01-01"), []int64{1, 6, 8}) - - // last_seen_on range query - assertQuery(t, client, physicalName, elastic.NewRangeQuery("last_seen_on").Lt("2019-01-01"), []int64{3, 4}) - - // last_seen_on is set / not set queries - assertQuery(t, client, physicalName, elastic.NewExistsQuery("last_seen_on"), []int64{1, 2, 3, 4, 5, 6}) - assertQuery(t, client, physicalName, elastic.NewBoolQuery().MustNot(elastic.NewExistsQuery("last_seen_on")), []int64{7, 8, 9}) - - // urn query - query := elastic.NewNestedQuery("urns", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("urns.scheme", "facebook"), - elastic.NewMatchQuery("urns.path.keyword", "1000001"))) - assertQuery(t, client, physicalName, query, []int64{8}) - - // urn substring query - query = elastic.NewNestedQuery("urns", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("urns.scheme", "tel"), - elastic.NewMatchPhraseQuery("urns.path", "779"))) - assertQuery(t, client, physicalName, query, []int64{1, 2, 3, 6}) - - // urn substring query with more characters (77911) - query = elastic.NewNestedQuery("urns", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("urns.scheme", "tel"), - elastic.NewMatchPhraseQuery("urns.path", "77911"))) - assertQuery(t, client, physicalName, query, []int64{1}) - - // urn substring query with more characters (600055) - query = elastic.NewNestedQuery("urns", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("urns.scheme", "tel"), - elastic.NewMatchPhraseQuery("urns.path", "600055"))) - assertQuery(t, client, physicalName, query, []int64{5}) - - // match a contact with multiple tel urns - query = elastic.NewNestedQuery("urns", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("urns.scheme", "tel"), - elastic.NewMatchPhraseQuery("urns.path", "222"))) - assertQuery(t, client, physicalName, query, []int64{1}) - - // text query - query = elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("fields.field", "17103bb1-1b48-4b70-92f7-1f6b73bd3488"), - elastic.NewMatchQuery("fields.text", "the rock"))) - assertQuery(t, client, physicalName, query, []int64{1}) - - // people with no nickname - notQuery := elastic.NewBoolQuery().MustNot( - elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("fields.field", "17103bb1-1b48-4b70-92f7-1f6b73bd3488"), - elastic.NewExistsQuery("fields.text")))) - assertQuery(t, client, physicalName, notQuery, []int64{2, 3, 4, 5, 6, 7, 8, 9}) - - // no tokenizing of field text - query = elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("fields.field", "17103bb1-1b48-4b70-92f7-1f6b73bd3488"), - elastic.NewMatchQuery("fields.text", "rock"))) - assertQuery(t, client, physicalName, query, []int64{}) - - // number field range query - query = elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("fields.field", "05bca1cd-e322-4837-9595-86d0d85e5adb"), - elastic.NewRangeQuery("fields.number").Gt(10))) - assertQuery(t, client, physicalName, query, []int64{2}) - - // datetime field range query - query = elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("fields.field", "e0eac267-463a-4c00-9732-cab62df07b16"), - elastic.NewRangeQuery("fields.datetime").Lt(time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)))) - assertQuery(t, client, physicalName, query, []int64{3}) - - // state query - query = elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("fields.field", "22d11697-edba-4186-b084-793e3b876379"), - elastic.NewMatchPhraseQuery("fields.state", "washington"))) - assertQuery(t, client, physicalName, query, []int64{5}) - - query = elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("fields.field", "22d11697-edba-4186-b084-793e3b876379"), - elastic.NewMatchQuery("fields.state_keyword", " washington"))) - assertQuery(t, client, physicalName, query, []int64{5}) - - // doesn't include country - query = elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("fields.field", "22d11697-edba-4186-b084-793e3b876379"), - elastic.NewMatchQuery("fields.state_keyword", "usa"))) - assertQuery(t, client, physicalName, query, []int64{}) - - query = elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("fields.field", "22d11697-edba-4186-b084-793e3b876379"), - elastic.NewMatchPhraseQuery("fields.state", "usa"))) - assertQuery(t, client, physicalName, query, []int64{}) - - // district query - query = elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("fields.field", "fcab2439-861c-4832-aa54-0c97f38f24ab"), - elastic.NewMatchPhraseQuery("fields.district", "king"))) - assertQuery(t, client, physicalName, query, []int64{7, 9}) - - // phrase matches all - query = elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("fields.field", "fcab2439-861c-4832-aa54-0c97f38f24ab"), - elastic.NewMatchPhraseQuery("fields.district", "King-Côunty"))) - assertQuery(t, client, physicalName, query, []int64{7}) - - query = elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("fields.field", "fcab2439-861c-4832-aa54-0c97f38f24ab"), - elastic.NewMatchQuery("fields.district_keyword", "King-Côunty"))) - assertQuery(t, client, physicalName, query, []int64{7}) - - // ward query - query = elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("fields.field", "a551ade4-e5a0-4d83-b185-53b515ad2f2a"), - elastic.NewMatchPhraseQuery("fields.ward", "district"))) - assertQuery(t, client, physicalName, query, []int64{8}) - - query = elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("fields.field", "a551ade4-e5a0-4d83-b185-53b515ad2f2a"), - elastic.NewMatchQuery("fields.ward_keyword", "central district"))) - assertQuery(t, client, physicalName, query, []int64{8}) - - // no substring though on keyword - query = elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( - elastic.NewMatchQuery("fields.field", "a551ade4-e5a0-4d83-b185-53b515ad2f2a"), - elastic.NewMatchQuery("fields.ward_keyword", "district"))) - assertQuery(t, client, physicalName, query, []int64{}) - - // group query - assertQuery(t, client, physicalName, elastic.NewMatchQuery("groups", "4ea0f313-2f62-4e57-bdf0-232b5191dd57"), []int64{1}) - assertQuery(t, client, physicalName, elastic.NewMatchQuery("groups", "529bac39-550a-4d6f-817c-1833f3449007"), []int64{1, 2}) - assertQuery(t, client, physicalName, elastic.NewMatchQuery("groups", "4c016340-468d-4675-a974-15cb7a45a5ab"), []int64{}) - - lastModified, err := GetLastModified(elasticURL, physicalName) - assert.NoError(t, err) - assert.Equal(t, time.Date(2017, 11, 10, 21, 11, 59, 890662000, time.UTC), lastModified.In(time.UTC)) - - // map our index over - err = MapIndexAlias(elasticURL, indexName, physicalName) - assert.NoError(t, err) - time.Sleep(5 * time.Second) - - // try a test query to check it worked - assertQuery(t, client, indexName, elastic.NewMatchQuery("name", "john"), []int64{4}) - - // look up our mapping - physical := FindPhysicalIndexes(elasticURL, indexName) - assert.Equal(t, physicalName, physical[0]) - - // rebuild again - newIndex, err := CreateNewIndex(elasticURL, indexName) - assert.NoError(t, err) - - added, deleted, err = IndexContacts(db, elasticURL, newIndex, time.Time{}) - assert.NoError(t, err) - assert.Equal(t, 9, added) - assert.Equal(t, 0, deleted) - - // remap again - err = MapIndexAlias(elasticURL, indexName, newIndex) - assert.NoError(t, err) - time.Sleep(5 * time.Second) - - // old index still around - resp, err := http.Get(fmt.Sprintf("%s/%s", elasticURL, physicalName)) - assert.NoError(t, err) - assert.Equal(t, resp.StatusCode, http.StatusOK) - - // cleanup our indexes, will remove our original index - err = CleanupIndexes(elasticURL, indexName) - assert.NoError(t, err) - - // old physical index should be gone - resp, err = http.Get(fmt.Sprintf("%s/%s", elasticURL, physicalName)) - assert.NoError(t, err) - assert.Equal(t, resp.StatusCode, http.StatusNotFound) - - // new index still works - assertQuery(t, client, newIndex, elastic.NewMatchQuery("name", "john"), []int64{4}) - - // update our database, removing one contact, updating another - dbUpdate, err := ioutil.ReadFile("testdb_update.sql") - assert.NoError(t, err) - _, err = db.Exec(string(dbUpdate)) - assert.NoError(t, err) - - added, deleted, err = IndexContacts(db, elasticURL, indexName, lastModified) - assert.NoError(t, err) - assert.Equal(t, 1, added) - assert.Equal(t, 1, deleted) - - time.Sleep(5 * time.Second) - - // should only match new john, old john is gone - assertQuery(t, client, indexName, elastic.NewMatchQuery("name", "john"), []int64{2}) - - // 3 is no longer in our group - assertQuery(t, client, indexName, elastic.NewMatchQuery("groups", "529bac39-550a-4d6f-817c-1833f3449007"), []int64{1}) - -} -func TestRetryServer(t *testing.T) { - responseCounter := 0 - responses := []func(w http.ResponseWriter, r *http.Request){ - func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Length", "5") - }, - func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Length", "1") - }, - func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Length", "1") - }, - func(w http.ResponseWriter, r *http.Request) { - resp := `{ - "took": 1, - "timed_out": false, - "_shards": { - "total": 2, - "successful": 2, - "skipped": 0, - "failed": 0 - }, - "hits": { - "total": 1, - "max_score": null, - "hits": [ - { - "_index": "rp_elastic_test_2020_08_14_1", - "_type": "_doc", - "_id": "1", - "_score": null, - "_routing": "1", - "_source": { - "id": 1, - "org_id": 1, - "uuid": "c7a2dd87-a80e-420b-8431-ca48d422e924", - "name": null, - "language": "eng", - "is_active": true, - "created_on": "2017-11-10T16:11:59.890662-05:00", - "modified_on": "2017-11-10T16:11:59.890662-05:00", - "last_seen_on": "2020-08-04T21:11:00-04:00", - "modified_on_mu": 1.510348319890662e15, - "urns": [ - { - "scheme": "tel", - "path": "+12067791111" - }, - { - "scheme": "tel", - "path": "+12067792222" - } - ], - "fields": [ - { - "text": "the rock", - "field": "17103bb1-1b48-4b70-92f7-1f6b73bd3488" - } - ], - "groups": [ - "4ea0f313-2f62-4e57-bdf0-232b5191dd57", - "529bac39-550a-4d6f-817c-1833f3449007" - ] - }, - "sort": [1] - } - ] - } - }` - - w.Write([]byte(resp)) - }, - } - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - responses[responseCounter](w, r) - responseCounter++ - })) - defer ts.Close() - FindPhysicalIndexes(ts.URL, "rp_elastic_test") - require.Equal(t, responseCounter, 4) -} diff --git a/indexers/base.go b/indexers/base.go new file mode 100644 index 0000000..808ddcb --- /dev/null +++ b/indexers/base.go @@ -0,0 +1,303 @@ +package indexers + +import ( + "database/sql" + "encoding/json" + "fmt" + "net/http" + "sort" + "strings" + "time" + + "github.com/nyaruka/gocommon/jsonx" + "github.com/nyaruka/rp-indexer/utils" + "github.com/sirupsen/logrus" +) + +// indexes a document +const indexCommand = `{ "index": { "_id": %d, "_type": "_doc", "version": %d, "version_type": "external", "routing": %d} }` + +// deletes a document +const deleteCommand = `{ "delete" : { "_id": %d, "_type": "_doc", "version": %d, "version_type": "external", "routing": %d} }` + +// Indexer is base interface for indexers +type Indexer interface { + Name() string + Index(db *sql.DB, rebuild, cleanup bool) (string, error) + Stats() (int64, int64, time.Duration) +} + +type baseIndexer struct { + elasticURL string + name string // e.g. contacts, used as the alias + + // statistics + indexedTotal int64 + deletedTotal int64 + elapsedTotal time.Duration +} + +func newBaseIndexer(elasticURL, name string) baseIndexer { + return baseIndexer{elasticURL: elasticURL, name: name} +} + +func (i *baseIndexer) Name() string { + return i.name +} + +func (i *baseIndexer) Stats() (int64, int64, time.Duration) { + return i.indexedTotal, i.deletedTotal, i.elapsedTotal +} + +func (i *baseIndexer) log() *logrus.Entry { + return logrus.WithField("indexer", i.name) +} + +// records a complete index and updates statistics +func (i *baseIndexer) recordComplete(indexed, deleted int, elapsed time.Duration) { + i.indexedTotal += int64(indexed) + i.deletedTotal += int64(deleted) + i.elapsedTotal += elapsed + + i.log().WithField("indexed", indexed).WithField("deleted", deleted).WithField("elapsed", elapsed).Info("completed indexing") +} + +// our response for figuring out the physical index for an alias +type infoResponse map[string]interface{} + +// FindIndexes finds all our physical indexes +func (i *baseIndexer) FindIndexes() []string { + response := infoResponse{} + _, err := utils.MakeJSONRequest(http.MethodGet, fmt.Sprintf("%s/%s", i.elasticURL, i.name), nil, &response) + indexes := make([]string, 0) + + // error could mean a variety of things, but we'll figure that out later + if err != nil { + return indexes + } + + // our top level key is our physical index name + for key := range response { + indexes = append(indexes, key) + } + + // reverse sort order should put our newest index first + sort.Sort(sort.Reverse(sort.StringSlice(indexes))) + + i.log().WithField("indexes", indexes).Debug("found physical indexes") + + return indexes +} + +// creates a new index for the passed in alias. +// +// Note that we do not create an index with the passed name, instead creating one +// based on the day, for example `contacts_2018_03_05`, then create an alias from +// that index to `contacts`. +// +// If the day-specific name already exists, we append a .1 or .2 to the name. +func (i *baseIndexer) createNewIndex(settings json.RawMessage) (string, error) { + // create our day-specific name + index := fmt.Sprintf("%s_%s", i.name, time.Now().Format("2006_01_02")) + idx := 0 + + // check if it exists + for { + resp, err := http.Get(fmt.Sprintf("%s/%s", i.elasticURL, index)) + if err != nil { + return "", err + } + // not found, great, move on + if resp.StatusCode == http.StatusNotFound { + break + } + + // was found, increase our index and try again + idx++ + index = fmt.Sprintf("%s_%s_%d", i.name, time.Now().Format("2006_01_02"), idx) + } + + // create the new index + _, err := utils.MakeJSONRequest(http.MethodPut, fmt.Sprintf("%s/%s?include_type_name=true", i.elasticURL, index), settings, nil) + if err != nil { + return "", err + } + + // all went well, return our physical index name + i.log().WithField("index", index).Info("created new index") + + return index, nil +} + +// our top level command for remapping aliases +type aliasCommand struct { + Actions []interface{} `json:"actions"` +} + +// adds an alias for an index +type addAliasCommand struct { + Add struct { + Index string `json:"index"` + Alias string `json:"alias"` + } `json:"add"` +} + +// removes an alias for an index +type removeAliasCommand struct { + Remove struct { + Index string `json:"index"` + Alias string `json:"alias"` + } `json:"remove"` +} + +// maps this indexer's alias to the new physical index, removing existing aliases if they exist +func (i *baseIndexer) updateAlias(newIndex string) error { + commands := make([]interface{}, 0) + + // find existing physical indexes + existing := i.FindIndexes() + for _, idx := range existing { + remove := removeAliasCommand{} + remove.Remove.Alias = i.name + remove.Remove.Index = idx + commands = append(commands, remove) + + logrus.WithField("indexer", i.name).WithField("index", idx).Debug("removing old alias") + } + + // add our new index + add := addAliasCommand{} + add.Add.Alias = i.name + add.Add.Index = newIndex + commands = append(commands, add) + + aliasJSON := jsonx.MustMarshal(aliasCommand{Actions: commands}) + + _, err := utils.MakeJSONRequest(http.MethodPost, fmt.Sprintf("%s/_aliases", i.elasticURL), aliasJSON, nil) + + i.log().WithField("index", newIndex).Info("updated alias") + + return err +} + +// our response for our index health +type healthResponse struct { + Indices map[string]struct { + Status string `json:"status"` + } `json:"indices"` +} + +// removes all indexes that are older than the currently active index +func (i *baseIndexer) cleanupIndexes() error { + // find our current indexes + currents := i.FindIndexes() + + // no current indexes? this a noop + if len(currents) == 0 { + return nil + } + + // find all the current indexes + healthResponse := healthResponse{} + _, err := utils.MakeJSONRequest(http.MethodGet, fmt.Sprintf("%s/%s", i.elasticURL, "_cluster/health?level=indices"), nil, &healthResponse) + if err != nil { + return err + } + + // for each active index, if it starts with our alias but is before our current index, remove it + for key := range healthResponse.Indices { + if strings.HasPrefix(key, i.name) && strings.Compare(key, currents[0]) < 0 { + logrus.WithField("index", key).Info("removing old index") + _, err = utils.MakeJSONRequest(http.MethodDelete, fmt.Sprintf("%s/%s", i.elasticURL, key), nil, nil) + if err != nil { + return err + } + } + } + + return nil +} + +// our response for indexing contacts +type indexResponse struct { + Items []struct { + Index struct { + ID string `json:"_id"` + Status int `json:"status"` + Result string `json:"result"` + } `json:"index"` + Delete struct { + ID string `json:"_id"` + Status int `json:"status"` + } `json:"delete"` + } `json:"items"` +} + +// indexes the batch of contacts +func (i *baseIndexer) indexBatch(index string, batch []byte) (int, int, error) { + response := indexResponse{} + indexURL := fmt.Sprintf("%s/%s/_bulk", i.elasticURL, index) + + _, err := utils.MakeJSONRequest(http.MethodPut, indexURL, batch, &response) + if err != nil { + return 0, 0, err + } + + createdCount, deletedCount, conflictedCount := 0, 0, 0 + for _, item := range response.Items { + if item.Index.ID != "" { + logrus.WithField("id", item.Index.ID).WithField("status", item.Index.Status).Debug("index response") + if item.Index.Status == 200 || item.Index.Status == 201 { + createdCount++ + } else if item.Index.Status == 409 { + conflictedCount++ + } else { + logrus.WithField("id", item.Index.ID).WithField("batch", batch).WithField("result", item.Index.Result).Error("error indexing document") + } + } else if item.Delete.ID != "" { + logrus.WithField("id", item.Index.ID).WithField("status", item.Index.Status).Debug("delete response") + if item.Delete.Status == 200 { + deletedCount++ + } else if item.Delete.Status == 409 { + conflictedCount++ + } + } else { + logrus.Error("unparsed item in response") + } + } + logrus.WithField("created", createdCount).WithField("deleted", deletedCount).WithField("conflicted", conflictedCount).Debug("indexed batch") + + return createdCount, deletedCount, nil +} + +// our response for finding the last modified document +type queryResponse struct { + Hits struct { + Total struct { + Value int `json:"value"` + } `json:"total"` + Hits []struct { + Source struct { + ID int64 `json:"id"` + ModifiedOn time.Time `json:"modified_on"` + } `json:"_source"` + } `json:"hits"` + } `json:"hits"` +} + +// GetLastModified queries a concrete index and finds the last modified document, returning its modified time +func (i *baseIndexer) GetLastModified(index string) (time.Time, error) { + lastModified := time.Time{} + + // get the newest document on our index + queryResponse := queryResponse{} + _, err := utils.MakeJSONRequest(http.MethodPost, fmt.Sprintf("%s/%s/_search", i.elasticURL, index), []byte(`{ "sort": [{ "modified_on_mu": "desc" }]}`), &queryResponse) + if err != nil { + return lastModified, err + } + + if len(queryResponse.Hits.Hits) > 0 { + lastModified = queryResponse.Hits.Hits[0].Source.ModifiedOn + } + return lastModified, nil +} diff --git a/indexers/base_test.go b/indexers/base_test.go new file mode 100644 index 0000000..c4a4d54 --- /dev/null +++ b/indexers/base_test.go @@ -0,0 +1,84 @@ +package indexers_test + +import ( + "context" + "database/sql" + "io/ioutil" + "log" + "os" + "sort" + "strconv" + "strings" + "testing" + + "github.com/nyaruka/rp-indexer/indexers" + "github.com/olivere/elastic/v7" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const elasticURL = "http://localhost:9200" +const aliasName = "indexer_test" + +func setup(t *testing.T) (*sql.DB, *elastic.Client) { + testDB, err := ioutil.ReadFile("../testdb.sql") + require.NoError(t, err) + + db, err := sql.Open("postgres", "postgres://nyaruka:nyaruka@localhost:5432/elastic_test?sslmode=disable") + require.NoError(t, err) + + _, err = db.Exec(string(testDB)) + require.NoError(t, err) + + es, err := elastic.NewClient(elastic.SetURL(elasticURL), elastic.SetTraceLog(log.New(os.Stdout, "", log.LstdFlags)), elastic.SetSniff(false)) + require.NoError(t, err) + + // delete all indexes with our alias prefix + existing, err := es.IndexNames() + require.NoError(t, err) + + for _, name := range existing { + if strings.HasPrefix(name, aliasName) { + _, err = es.DeleteIndex(name).Do(context.Background()) + require.NoError(t, err) + } + } + + logrus.SetLevel(logrus.DebugLevel) + + return db, es +} + +func assertQuery(t *testing.T, client *elastic.Client, query elastic.Query, expected []int64, msgAndArgs ...interface{}) { + results, err := client.Search().Index(aliasName).Query(query).Sort("id", true).Pretty(true).Do(context.Background()) + assert.NoError(t, err) + + actual := make([]int64, len(results.Hits.Hits)) + for h, hit := range results.Hits.Hits { + asInt, _ := strconv.Atoi(hit.Id) + actual[h] = int64(asInt) + } + + assert.Equal(t, expected, actual, msgAndArgs...) +} + +func assertIndexesWithPrefix(t *testing.T, es *elastic.Client, prefix string, expected []string) { + all, err := es.IndexNames() + require.NoError(t, err) + + actual := []string{} + for _, name := range all { + if strings.HasPrefix(name, prefix) { + actual = append(actual, name) + } + } + sort.Strings(actual) + assert.Equal(t, expected, actual) +} + +func assertIndexerStats(t *testing.T, ix indexers.Indexer, expectedIndexed, expectedDeleted int64) { + actualIndexed, actualDeleted, _ := ix.Stats() + assert.Equal(t, expectedIndexed, actualIndexed, "indexed mismatch") + assert.Equal(t, expectedDeleted, actualDeleted, "deleted mismatch") +} diff --git a/indexers/contacts.go b/indexers/contacts.go new file mode 100644 index 0000000..e25d198 --- /dev/null +++ b/indexers/contacts.go @@ -0,0 +1,250 @@ +package indexers + +import ( + "bytes" + "database/sql" + _ "embed" + "encoding/json" + "fmt" + "time" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +//go:embed contacts.settings.json +var contactsSettings json.RawMessage + +// ContactIndexer is an indexer for contacts +type ContactIndexer struct { + baseIndexer + + batchSize int +} + +// NewContactIndexer creates a new contact indexer +func NewContactIndexer(elasticURL, name string, batchSize int) *ContactIndexer { + return &ContactIndexer{ + baseIndexer: newBaseIndexer(elasticURL, name), + batchSize: batchSize, + } +} + +// Index indexes modified contacts and returns the name of the concrete index +func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error) { + var err error + + // find our physical index + physicalIndexes := i.FindIndexes() + + physicalIndex := "" + if len(physicalIndexes) > 0 { + physicalIndex = physicalIndexes[0] + } + + // whether we need to remap our alias after building + remapAlias := false + + // doesn't exist or we are rebuilding, create it + if physicalIndex == "" || rebuild { + physicalIndex, err = i.createNewIndex(contactsSettings) + if err != nil { + return "", errors.Wrap(err, "error creating new index") + } + i.log().WithField("index", physicalIndex).Info("created new physical index") + remapAlias = true + } + + lastModified, err := i.GetLastModified(physicalIndex) + if err != nil { + return "", errors.Wrap(err, "error finding last modified") + } + + i.log().WithField("index", physicalIndex).WithField("last_modified", lastModified).Info("indexing newer than last modified") + + // now index our docs + start := time.Now() + indexed, deleted, err := i.indexModified(db, physicalIndex, lastModified.Add(-5*time.Second)) + if err != nil { + return "", errors.Wrap(err, "error indexing documents") + } + + i.recordComplete(indexed, deleted, time.Since(start)) + + // if the index didn't previously exist or we are rebuilding, remap to our alias + if remapAlias { + err := i.updateAlias(physicalIndex) + if err != nil { + return "", errors.Wrap(err, "error updating alias") + } + remapAlias = false + } + + // cleanup our aliases if appropriate + if cleanup { + err := i.cleanupIndexes() + if err != nil { + return "", errors.Wrap(err, "error cleaning up old indexes") + } + } + + return physicalIndex, nil +} + +const sqlSelectModifiedContacts = ` +SELECT org_id, id, modified_on, is_active, row_to_json(t) FROM ( + SELECT + id, + org_id, + uuid, + name, + language, + status, + ticket_count AS tickets, + is_active, + created_on, + modified_on, + last_seen_on, + EXTRACT(EPOCH FROM modified_on) * 1000000 AS modified_on_mu, + ( + SELECT array_to_json(array_agg(row_to_json(u))) + FROM (SELECT scheme, path FROM contacts_contacturn WHERE contact_id = contacts_contact.id) u + ) AS urns, + ( + SELECT jsonb_agg(f.value) + FROM ( + SELECT + CASE + WHEN value ? 'ward' + THEN jsonb_build_object('ward_keyword', trim(substring(value ->> 'ward' from '(?!.* > )([^>]+)'))) + ELSE '{}'::jsonb + END || district_value.value AS value + FROM ( + SELECT + CASE + WHEN value ? 'district' + THEN jsonb_build_object('district_keyword', trim(substring(value ->> 'district' from '(?!.* > )([^>]+)'))) + ELSE '{}'::jsonb + END || state_value.value as value + FROM ( + SELECT + CASE + WHEN value ? 'state' + THEN jsonb_build_object('state_keyword', trim(substring(value ->> 'state' from '(?!.* > )([^>]+)'))) + ELSE '{}' :: jsonb + END || jsonb_build_object('field', key) || value as value + FROM jsonb_each(contacts_contact.fields) + ) state_value + ) AS district_value + ) AS f + ) AS fields, + ( + SELECT array_to_json(array_agg(g.uuid)) FROM ( + SELECT contacts_contactgroup.uuid + FROM contacts_contactgroup_contacts, contacts_contactgroup + WHERE contact_id = contacts_contact.id AND contacts_contactgroup_contacts.contactgroup_id = contacts_contactgroup.id + ) g + ) AS groups, + ( + SELECT f.uuid FROM flows_flow f WHERE f.id = contacts_contact.current_flow_id + ) AS flow + FROM contacts_contact + WHERE modified_on >= $1 + ORDER BY modified_on ASC + LIMIT 500000 +) t; +` + +// IndexModified queries and indexes all contacts with a lastModified greater than or equal to the passed in time +func (i *ContactIndexer) indexModified(db *sql.DB, index string, lastModified time.Time) (int, int, error) { + batch := &bytes.Buffer{} + createdCount, deletedCount, processedCount := 0, 0, 0 + + var modifiedOn time.Time + var contactJSON string + var id, orgID int64 + var isActive bool + + start := time.Now() + + for { + rows, err := db.Query(sqlSelectModifiedContacts, lastModified) + + queryCreated := 0 + queryCount := 0 + queryModified := lastModified + + // no more rows? return + if err == sql.ErrNoRows { + return 0, 0, nil + } + if err != nil { + return 0, 0, err + } + defer rows.Close() + + for rows.Next() { + err = rows.Scan(&orgID, &id, &modifiedOn, &isActive, &contactJSON) + if err != nil { + return 0, 0, err + } + + queryCount++ + processedCount++ + lastModified = modifiedOn + + if isActive { + logrus.WithField("id", id).WithField("modifiedOn", modifiedOn).WithField("contact", contactJSON).Debug("modified contact") + + batch.WriteString(fmt.Sprintf(indexCommand, id, modifiedOn.UnixNano(), orgID)) + batch.WriteString("\n") + batch.WriteString(contactJSON) + batch.WriteString("\n") + } else { + logrus.WithField("id", id).WithField("modifiedOn", modifiedOn).Debug("deleted contact") + + batch.WriteString(fmt.Sprintf(deleteCommand, id, modifiedOn.UnixNano(), orgID)) + batch.WriteString("\n") + } + + // write to elastic search in batches + if queryCount%i.batchSize == 0 { + created, deleted, err := i.indexBatch(index, batch.Bytes()) + if err != nil { + return 0, 0, err + } + batch.Reset() + + queryCreated += created + createdCount += created + deletedCount += deleted + } + } + + if batch.Len() > 0 { + created, deleted, err := i.indexBatch(index, batch.Bytes()) + if err != nil { + return 0, 0, err + } + + queryCreated += created + createdCount += created + deletedCount += deleted + batch.Reset() + } + + // last modified stayed the same and we didn't add anything, seen it all, break out + if lastModified.Equal(queryModified) && queryCreated == 0 { + break + } + + rows.Close() + + elapsed := time.Since(start) + rate := float32(processedCount) / (float32(elapsed) / float32(time.Second)) + + i.log().WithField("index", index).WithFields(logrus.Fields{"rate": int(rate), "added": createdCount, "deleted": deletedCount, "elapsed": elapsed}).Info("indexed contact batch") + } + + return createdCount, deletedCount, nil +} diff --git a/contacts/index_settings.json b/indexers/contacts.settings.json similarity index 100% rename from contacts/index_settings.json rename to indexers/contacts.settings.json diff --git a/indexers/contacts_test.go b/indexers/contacts_test.go new file mode 100644 index 0000000..8ef9916 --- /dev/null +++ b/indexers/contacts_test.go @@ -0,0 +1,268 @@ +package indexers_test + +import ( + "fmt" + "testing" + "time" + + _ "github.com/lib/pq" + "github.com/nyaruka/gocommon/jsonx" + "github.com/nyaruka/rp-indexer/indexers" + "github.com/olivere/elastic/v7" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var contactQueryTests = []struct { + query elastic.Query + expected []int64 +}{ + {elastic.NewMatchQuery("org_id", "1"), []int64{1, 2, 3, 4}}, + {elastic.NewMatchQuery("name", "JOHn"), []int64{4}}, + {elastic.NewTermQuery("name.keyword", "JOHN DOE"), []int64{4}}, + {elastic.NewBoolQuery().Must(elastic.NewMatchQuery("name", "john"), elastic.NewMatchQuery("name", "doe")), []int64{4}}, // can search on both first and last name + {elastic.NewMatchQuery("name", "Ajodinabiff"), []int64{5}}, // long name + {elastic.NewMatchQuery("language", "eng"), []int64{1}}, + {elastic.NewMatchQuery("status", "B"), []int64{3}}, + {elastic.NewMatchQuery("status", "S"), []int64{2}}, + {elastic.NewMatchQuery("tickets", 2), []int64{1}}, + {elastic.NewMatchQuery("tickets", 1), []int64{2, 3}}, + {elastic.NewRangeQuery("tickets").Gt(0), []int64{1, 2, 3}}, + {elastic.NewMatchQuery("flow", "6d3cf1eb-546e-4fb8-a5ca-69187648fbf6"), []int64{2, 3}}, + {elastic.NewMatchQuery("flow", "4eea8ff1-4fe2-4ce5-92a4-0870a499973a"), []int64{4}}, + {elastic.NewRangeQuery("created_on").Gt("2017-01-01"), []int64{1, 6, 8}}, // created_on range + {elastic.NewRangeQuery("last_seen_on").Lt("2019-01-01"), []int64{3, 4}}, // last_seen_on range + {elastic.NewExistsQuery("last_seen_on"), []int64{1, 2, 3, 4, 5, 6}}, // last_seen_on is set + {elastic.NewBoolQuery().MustNot(elastic.NewExistsQuery("last_seen_on")), []int64{7, 8, 9}}, // last_seen_on is not set + { + elastic.NewNestedQuery("urns", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("urns.scheme", "facebook"), + elastic.NewMatchQuery("urns.path.keyword", "1000001"), + )), + []int64{8}, + }, + { // urn substring + elastic.NewNestedQuery("urns", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("urns.scheme", "tel"), + elastic.NewMatchPhraseQuery("urns.path", "779"), + )), + []int64{1, 2, 3, 6}, + }, + { // urn substring with more characters (77911) + elastic.NewNestedQuery("urns", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("urns.scheme", "tel"), + elastic.NewMatchPhraseQuery("urns.path", "77911"), + )), + []int64{1}, + }, + { // urn substring with more characters (600055) + elastic.NewNestedQuery("urns", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("urns.scheme", "tel"), + elastic.NewMatchPhraseQuery("urns.path", "600055"), + )), + []int64{5}, + }, + { // match a contact with multiple tel urns + elastic.NewNestedQuery("urns", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("urns.scheme", "tel"), + elastic.NewMatchPhraseQuery("urns.path", "222"), + )), + []int64{1}, + }, + { // text field + elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("fields.field", "17103bb1-1b48-4b70-92f7-1f6b73bd3488"), + elastic.NewMatchQuery("fields.text", "the rock")), + ), + []int64{1}, + }, + { // people with no nickname + elastic.NewBoolQuery().MustNot( + elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("fields.field", "17103bb1-1b48-4b70-92f7-1f6b73bd3488"), + elastic.NewExistsQuery("fields.text")), + ), + ), + []int64{2, 3, 4, 5, 6, 7, 8, 9}, + }, + { // no tokenizing of field text + elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("fields.field", "17103bb1-1b48-4b70-92f7-1f6b73bd3488"), + elastic.NewMatchQuery("fields.text", "rock"), + )), + []int64{}, + }, + { // number field range + elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("fields.field", "05bca1cd-e322-4837-9595-86d0d85e5adb"), + elastic.NewRangeQuery("fields.number").Gt(10), + )), + []int64{2}, + }, + { // datetime field range + elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("fields.field", "e0eac267-463a-4c00-9732-cab62df07b16"), + elastic.NewRangeQuery("fields.datetime").Lt(time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)), + )), + []int64{3}, + }, + { // state field + elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("fields.field", "22d11697-edba-4186-b084-793e3b876379"), + elastic.NewMatchPhraseQuery("fields.state", "washington"), + )), + []int64{5}, + }, + { + elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("fields.field", "22d11697-edba-4186-b084-793e3b876379"), + elastic.NewMatchQuery("fields.state_keyword", " washington"), + )), + []int64{5}, + }, + { // doesn't include country + elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("fields.field", "22d11697-edba-4186-b084-793e3b876379"), + elastic.NewMatchQuery("fields.state_keyword", "usa"), + )), + []int64{}, + }, + { + elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("fields.field", "22d11697-edba-4186-b084-793e3b876379"), + elastic.NewMatchPhraseQuery("fields.state", "usa"), + )), + []int64{}, + }, + { // district field + elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("fields.field", "fcab2439-861c-4832-aa54-0c97f38f24ab"), + elastic.NewMatchPhraseQuery("fields.district", "king"), + )), + []int64{7, 9}, + }, + { // phrase matches all + elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("fields.field", "fcab2439-861c-4832-aa54-0c97f38f24ab"), + elastic.NewMatchPhraseQuery("fields.district", "King-Côunty"), + )), + []int64{7}, + }, + { + elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("fields.field", "fcab2439-861c-4832-aa54-0c97f38f24ab"), + elastic.NewMatchQuery("fields.district_keyword", "King-Côunty"), + )), + []int64{7}, + }, + { // ward field + elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("fields.field", "a551ade4-e5a0-4d83-b185-53b515ad2f2a"), + elastic.NewMatchPhraseQuery("fields.ward", "district"), + )), + []int64{8}, + }, + { + elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("fields.field", "a551ade4-e5a0-4d83-b185-53b515ad2f2a"), + elastic.NewMatchQuery("fields.ward_keyword", "central district"), + )), + []int64{8}, + }, + { // no substring though on keyword + elastic.NewNestedQuery("fields", elastic.NewBoolQuery().Must( + elastic.NewMatchQuery("fields.field", "a551ade4-e5a0-4d83-b185-53b515ad2f2a"), + elastic.NewMatchQuery("fields.ward_keyword", "district"), + )), + []int64{}, + }, + {elastic.NewMatchQuery("groups", "4ea0f313-2f62-4e57-bdf0-232b5191dd57"), []int64{1}}, + {elastic.NewMatchQuery("groups", "529bac39-550a-4d6f-817c-1833f3449007"), []int64{1, 2}}, + {elastic.NewMatchQuery("groups", "4c016340-468d-4675-a974-15cb7a45a5ab"), []int64{}}, +} + +func TestContacts(t *testing.T) { + db, es := setup(t) + + ix1 := indexers.NewContactIndexer(elasticURL, aliasName, 4) + assert.Equal(t, "indexer_test", ix1.Name()) + + expectedIndexName := fmt.Sprintf("indexer_test_%s", time.Now().Format("2006_01_02")) + + indexName, err := ix1.Index(db, false, false) + assert.NoError(t, err) + assert.Equal(t, expectedIndexName, indexName) + + time.Sleep(1 * time.Second) + + assertIndexerStats(t, ix1, 9, 0) + assertIndexesWithPrefix(t, es, aliasName, []string{expectedIndexName}) + + for _, tc := range contactQueryTests { + src, _ := tc.query.Source() + assertQuery(t, es, tc.query, tc.expected, "query mismatch for %s", string(jsonx.MustMarshal(src))) + } + + lastModified, err := ix1.GetLastModified(indexName) + assert.NoError(t, err) + assert.Equal(t, time.Date(2017, 11, 10, 21, 11, 59, 890662000, time.UTC), lastModified.In(time.UTC)) + + // now make some contact changes, removing one contact, updating another + _, err = db.Exec(` + DELETE FROM contacts_contactgroup_contacts WHERE id = 3; + UPDATE contacts_contact SET name = 'John Deer', modified_on = '2020-08-20 14:00:00+00' where id = 2; + UPDATE contacts_contact SET is_active = FALSE, modified_on = '2020-08-22 15:00:00+00' where id = 4;`) + require.NoError(t, err) + + // and index again... + indexName, err = ix1.Index(db, false, false) + assert.NoError(t, err) + assert.Equal(t, expectedIndexName, indexName) // same index used + assertIndexerStats(t, ix1, 10, 1) + + time.Sleep(1 * time.Second) + + assertIndexesWithPrefix(t, es, aliasName, []string{expectedIndexName}) + + // should only match new john, old john is gone + assertQuery(t, es, elastic.NewMatchQuery("name", "john"), []int64{2}) + + // 3 is no longer in our group + assertQuery(t, es, elastic.NewMatchQuery("groups", "529bac39-550a-4d6f-817c-1833f3449007"), []int64{1}) + + // change John's name to Eric.. + _, err = db.Exec(` + UPDATE contacts_contact SET name = 'Eric', modified_on = '2020-08-20 14:00:00+00' where id = 2;`) + require.NoError(t, err) + + // and simulate another indexer doing a parallel rebuild + ix2 := indexers.NewContactIndexer(elasticURL, aliasName, 4) + + indexName2, err := ix2.Index(db, true, false) + assert.NoError(t, err) + assert.Equal(t, expectedIndexName+"_1", indexName2) // new index used + assertIndexerStats(t, ix2, 8, 0) + + time.Sleep(1 * time.Second) + + // check we have a new index but the old index is still around + assertIndexesWithPrefix(t, es, aliasName, []string{expectedIndexName, expectedIndexName + "_1"}) + + // and the alias points to the new index + assertQuery(t, es, elastic.NewMatchQuery("name", "eric"), []int64{2}) + + // simulate another indexer doing a parallel rebuild with cleanup + ix3 := indexers.NewContactIndexer(elasticURL, aliasName, 4) + indexName3, err := ix3.Index(db, true, true) + assert.NoError(t, err) + assert.Equal(t, expectedIndexName+"_2", indexName3) // new index used + assertIndexerStats(t, ix3, 8, 0) + + // check we cleaned up indexes besides the new one + assertIndexesWithPrefix(t, es, aliasName, []string{expectedIndexName + "_2"}) + + // check that the original indexer now indexes against the new index + indexName, err = ix1.Index(db, false, false) + assert.NoError(t, err) + assert.Equal(t, expectedIndexName+"_2", indexName) +} diff --git a/testdb_update.sql b/testdb_update.sql deleted file mode 100644 index e551846..0000000 --- a/testdb_update.sql +++ /dev/null @@ -1,8 +0,0 @@ --- update one of our contacts -DELETE FROM contacts_contactgroup_contacts WHERE id = 3; -UPDATE contacts_contact SET name = 'John Deer', modified_on = '2020-08-20 14:00:00+00' where id = 2; - --- delete one of our others -UPDATE contacts_contact SET is_active = FALSE, modified_on = '2020-08-22 15:00:00+00' where id = 4; - - diff --git a/elastic.go b/utils/http.go similarity index 60% rename from elastic.go rename to utils/http.go index 02f8a23..a246b4a 100644 --- a/elastic.go +++ b/utils/http.go @@ -1,4 +1,4 @@ -package indexer +package utils import ( "bytes" @@ -46,9 +46,7 @@ func shouldRetry(request *http.Request, response *http.Response, withDelay time. // MakeJSONRequest is a utility function to make a JSON request, optionally decoding the response into the passed in struct func MakeJSONRequest(method string, url string, body []byte, jsonStruct interface{}) (*http.Response, error) { - req, _ := http.NewRequest(method, url, bytes.NewReader(body)) - req.Header.Add("Content-Type", "application/json") - + req, _ := httpx.NewRequest(method, url, bytes.NewReader(body), map[string]string{"Content-Type": "application/json"}) resp, err := httpx.Do(http.DefaultClient, req, retryConfig, nil) l := log.WithField("url", url).WithField("method", method).WithField("request", body) @@ -87,64 +85,3 @@ func MakeJSONRequest(method string, url string, body []byte, jsonStruct interfac l.Debug("ES request successful") return resp, nil } - -// adds an alias for an index -type addAliasCommand struct { - Add struct { - Index string `json:"index"` - Alias string `json:"alias"` - } `json:"add"` -} - -// removes an alias for an index -type removeAliasCommand struct { - Remove struct { - Index string `json:"index"` - Alias string `json:"alias"` - } `json:"remove"` -} - -// our top level command for remapping aliases -type aliasCommand struct { - Actions []interface{} `json:"actions"` -} - -// our response for finding the most recent contact -type queryResponse struct { - Hits struct { - Total struct { - Value int `json:"value"` - } `json:"total"` - Hits []struct { - Source struct { - ID int64 `json:"id"` - ModifiedOn time.Time `json:"modified_on"` - } `json:"_source"` - } `json:"hits"` - } `json:"hits"` -} - -// our response for indexing contacts -type indexResponse struct { - Items []struct { - Index struct { - ID string `json:"_id"` - Status int `json:"status"` - Result string `json:"result"` - } `json:"index"` - Delete struct { - ID string `json:"_id"` - Status int `json:"status"` - } `json:"delete"` - } `json:"items"` -} - -// our response for our index health -type healthResponse struct { - Indices map[string]struct { - Status string `json:"status"` - } `json:"indices"` -} - -// our response for figuring out the physical index for an alias -type infoResponse map[string]interface{} diff --git a/utils/http_test.go b/utils/http_test.go new file mode 100644 index 0000000..fd8dea7 --- /dev/null +++ b/utils/http_test.go @@ -0,0 +1,40 @@ +package utils_test + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/nyaruka/rp-indexer/utils" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRetryServer(t *testing.T) { + responseCounter := 0 + responses := []func(w http.ResponseWriter, r *http.Request){ + func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Length", "5") + }, + func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Length", "1") + }, + func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Length", "1") + }, + func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(`{"foo": 1}`)) + }, + } + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + responses[responseCounter](w, r) + responseCounter++ + })) + defer ts.Close() + + resp, err := utils.MakeJSONRequest("GET", ts.URL, nil, nil) + assert.NoError(t, err) + assert.Equal(t, 200, resp.StatusCode) + + require.Equal(t, responseCounter, 4) +}