Skip to content

Commit 14e0008

Browse files
committed
Move some elastic functionality into its own file
1 parent 2d37e9b commit 14e0008

File tree

4 files changed

+165
-156
lines changed

4 files changed

+165
-156
lines changed

contacts/settings.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package contacts
22

3-
import _ "embed"
3+
import (
4+
_ "embed"
5+
"encoding/json"
6+
)
47

58
// settings and mappings for our index
69
//go:embed index_settings.json
7-
var IndexSettings string
10+
var IndexSettings json.RawMessage

elastic.go

+150
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package indexer
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
"fmt"
7+
"io/ioutil"
8+
"net/http"
9+
"time"
10+
11+
"github.com/nyaruka/gocommon/httpx"
12+
log "github.com/sirupsen/logrus"
13+
)
14+
15+
var retryConfig *httpx.RetryConfig
16+
17+
func init() {
18+
backoffs := make([]time.Duration, 5)
19+
backoffs[0] = 1 * time.Second
20+
for i := 1; i < len(backoffs); i++ {
21+
backoffs[i] = backoffs[i-1] * 2
22+
}
23+
24+
retryConfig = &httpx.RetryConfig{Backoffs: backoffs, ShouldRetry: shouldRetry}
25+
}
26+
27+
func shouldRetry(request *http.Request, response *http.Response, withDelay time.Duration) bool {
28+
// 429 Too Many Requests is recoverable. Sometimes the server puts
29+
// a Retry-After response header to indicate when the server is
30+
// available to start processing request from client.
31+
if response.StatusCode == http.StatusTooManyRequests {
32+
return true
33+
}
34+
35+
// check for unexpected EOF
36+
bodyBytes, err := ioutil.ReadAll(response.Body)
37+
response.Body.Close()
38+
if err != nil {
39+
log.WithError(err).Error("error reading ES response, retrying")
40+
return true
41+
}
42+
43+
response.Body = ioutil.NopCloser(bytes.NewBuffer(bodyBytes))
44+
return false
45+
}
46+
47+
// MakeJSONRequest is a utility function to make a JSON request, optionally decoding the response into the passed in struct
48+
func MakeJSONRequest(method string, url string, body []byte, jsonStruct interface{}) (*http.Response, error) {
49+
req, _ := http.NewRequest(method, url, bytes.NewReader(body))
50+
req.Header.Add("Content-Type", "application/json")
51+
52+
resp, err := httpx.Do(http.DefaultClient, req, retryConfig, nil)
53+
54+
l := log.WithField("url", url).WithField("method", method).WithField("request", body)
55+
if err != nil {
56+
l.WithError(err).Error("error making ES request")
57+
return resp, err
58+
}
59+
defer resp.Body.Close()
60+
61+
// if we have a body, try to decode it
62+
jsonBody, err := ioutil.ReadAll(resp.Body)
63+
if err != nil {
64+
l.WithError(err).Error("error reading ES response")
65+
return resp, err
66+
}
67+
68+
l = l.WithField("response", string(jsonBody)).WithField("status", resp.StatusCode)
69+
70+
// error if we got a non-200
71+
if resp.StatusCode != http.StatusOK {
72+
l.WithError(err).Error("error reaching ES")
73+
return resp, fmt.Errorf("received non 200 response %d: %s", resp.StatusCode, jsonBody)
74+
}
75+
76+
if jsonStruct == nil {
77+
l.Debug("ES request successful")
78+
return resp, nil
79+
}
80+
81+
err = json.Unmarshal(jsonBody, jsonStruct)
82+
if err != nil {
83+
l.WithError(err).Error("error unmarshalling ES response")
84+
return resp, err
85+
}
86+
87+
l.Debug("ES request successful")
88+
return resp, nil
89+
}
90+
91+
// adds an alias for an index
92+
type addAliasCommand struct {
93+
Add struct {
94+
Index string `json:"index"`
95+
Alias string `json:"alias"`
96+
} `json:"add"`
97+
}
98+
99+
// removes an alias for an index
100+
type removeAliasCommand struct {
101+
Remove struct {
102+
Index string `json:"index"`
103+
Alias string `json:"alias"`
104+
} `json:"remove"`
105+
}
106+
107+
// our top level command for remapping aliases
108+
type aliasCommand struct {
109+
Actions []interface{} `json:"actions"`
110+
}
111+
112+
// our response for finding the most recent contact
113+
type queryResponse struct {
114+
Hits struct {
115+
Total struct {
116+
Value int `json:"value"`
117+
} `json:"total"`
118+
Hits []struct {
119+
Source struct {
120+
ID int64 `json:"id"`
121+
ModifiedOn time.Time `json:"modified_on"`
122+
} `json:"_source"`
123+
} `json:"hits"`
124+
} `json:"hits"`
125+
}
126+
127+
// our response for indexing contacts
128+
type indexResponse struct {
129+
Items []struct {
130+
Index struct {
131+
ID string `json:"_id"`
132+
Status int `json:"status"`
133+
Result string `json:"result"`
134+
} `json:"index"`
135+
Delete struct {
136+
ID string `json:"_id"`
137+
Status int `json:"status"`
138+
} `json:"delete"`
139+
} `json:"items"`
140+
}
141+
142+
// our response for our index health
143+
type healthResponse struct {
144+
Indices map[string]struct {
145+
Status string `json:"status"`
146+
} `json:"indices"`
147+
}
148+
149+
// our response for figuring out the physical index for an alias
150+
type infoResponse map[string]interface{}

0 commit comments

Comments
 (0)