@@ -89,7 +89,7 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error
89
89
}
90
90
91
91
const sqlSelectModifiedContacts = `
92
- SELECT org_id, id, modified_on, is_active, row_to_json(t) FROM (
92
+ SELECT org_id, id, modified_on, row_to_json(t) FROM (
93
93
SELECT
94
94
id,
95
95
org_id,
@@ -98,7 +98,6 @@ SELECT org_id, id, modified_on, is_active, row_to_json(t) FROM (
98
98
language,
99
99
status,
100
100
ticket_count AS tickets,
101
- is_active,
102
101
created_on,
103
102
modified_on,
104
103
last_seen_on,
@@ -143,20 +142,19 @@ SELECT org_id, id, modified_on, is_active, row_to_json(t) FROM (
143
142
SELECT array_to_json(array_agg(DISTINCT fr.flow_id)) FROM flows_flowrun fr WHERE fr.contact_id = contacts_contact.id
144
143
) AS flow_history_ids
145
144
FROM contacts_contact
146
- WHERE modified_on >= $1
145
+ WHERE modified_on >= $1 AND is_active
147
146
ORDER BY modified_on ASC
148
147
LIMIT 100000
149
148
) t;
150
149
`
151
150
152
151
// IndexModified queries and indexes all contacts with a lastModified greater than or equal to the passed in time
153
152
func (i * ContactIndexer ) indexModified (ctx context.Context , db * sql.DB , index string , lastModified time.Time , rebuild bool ) error {
154
- totalFetched , totalCreated , totalUpdated , totalDeleted := 0 , 0 , 0 , 0
153
+ totalFetched , totalCreated , totalUpdated := 0 , 0 , 0
155
154
156
155
var modifiedOn time.Time
157
156
var contactJSON string
158
157
var id , orgID int64
159
- var isActive bool
160
158
161
159
subBatch := & bytes.Buffer {}
162
160
start := time .Now ()
@@ -166,20 +164,18 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
166
164
batchFetched := 0 // contacts fetched in this batch
167
165
batchCreated := 0 // contacts created in ES
168
166
batchUpdated := 0 // contacts updated in ES
169
- batchDeleted := 0 // contacts deleted in ES
170
167
batchESTime := time .Duration (0 ) // time spent indexing for this batch
171
168
172
169
indexSubBatch := func (b * bytes.Buffer ) error {
173
170
t := time .Now ()
174
- created , updated , deleted , err := i .indexBatch (index , b .Bytes ())
171
+ created , updated , err := i .indexBatch (index , b .Bytes ())
175
172
if err != nil {
176
173
return err
177
174
}
178
175
179
176
batchESTime += time .Since (t )
180
177
batchCreated += created
181
178
batchUpdated += updated
182
- batchDeleted += deleted
183
179
b .Reset ()
184
180
return nil
185
181
}
@@ -198,27 +194,20 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
198
194
defer rows .Close ()
199
195
200
196
for rows .Next () {
201
- err = rows .Scan (& orgID , & id , & modifiedOn , & isActive , & contactJSON )
197
+ err = rows .Scan (& orgID , & id , & modifiedOn , & contactJSON )
202
198
if err != nil {
203
199
return err
204
200
}
205
201
206
202
batchFetched ++
207
203
lastModified = modifiedOn
208
204
209
- if isActive {
210
- i .log ().Debug ("modified contact" , "id" , id , "modifiedOn" , modifiedOn , "contact" , contactJSON )
205
+ i .log ().Debug ("modified contact" , "id" , id , "modifiedOn" , modifiedOn , "contact" , contactJSON )
211
206
212
- subBatch .WriteString (fmt .Sprintf (indexCommand , id , modifiedOn .UnixNano (), orgID ))
213
- subBatch .WriteString ("\n " )
214
- subBatch .WriteString (contactJSON )
215
- subBatch .WriteString ("\n " )
216
- } else {
217
- i .log ().Debug ("deleted contact" , "id" , id , "modifiedOn" , modifiedOn )
218
-
219
- subBatch .WriteString (fmt .Sprintf (deleteCommand , id , modifiedOn .UnixNano (), orgID ))
220
- subBatch .WriteString ("\n " )
221
- }
207
+ subBatch .WriteString (fmt .Sprintf (indexCommand , id , modifiedOn .UnixNano (), orgID ))
208
+ subBatch .WriteString ("\n " )
209
+ subBatch .WriteString (contactJSON )
210
+ subBatch .WriteString ("\n " )
222
211
223
212
// write to elastic search in batches
224
213
if batchFetched % i .batchSize == 0 {
@@ -239,7 +228,6 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
239
228
totalFetched += batchFetched
240
229
totalCreated += batchCreated
241
230
totalUpdated += batchUpdated
242
- totalDeleted += batchDeleted
243
231
244
232
totalTime := time .Since (start )
245
233
batchTime := time .Since (batchStart )
@@ -265,7 +253,7 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
265
253
log .Debug ("indexed contact batch" )
266
254
}
267
255
268
- i .recordActivity (batchCreated + batchUpdated , batchDeleted , time .Since (batchStart ))
256
+ i .recordActivity (batchCreated + batchUpdated , time .Since (batchStart ))
269
257
270
258
// last modified stayed the same and we didn't add anything, seen it all, break out
271
259
if lastModified .Equal (queryModified ) && batchCreated == 0 {
0 commit comments