Skip to content

Commit 6eb6eec

Browse files
committed
feat: AWS CloudSearch engine
1 parent c964837 commit 6eb6eec

File tree

1 file changed

+178
-47
lines changed

1 file changed

+178
-47
lines changed

server/modules/search/aws/engine.js

+178-47
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
const _ = require('lodash')
22
const AWS = require('aws-sdk')
3-
const { pipeline } = require('stream')
3+
const { pipeline, Transform } = require('stream')
44

55
module.exports = {
66
async activate() {
@@ -20,6 +20,13 @@ module.exports = {
2020
secretAccessKey: this.config.secretAccessKey,
2121
region: this.config.region
2222
})
23+
this.clientDomain = new AWS.CloudSearchDomain({
24+
apiVersion: '2013-01-01',
25+
endpoint: this.config.endpoint,
26+
accessKeyId: this.config.accessKeyId,
27+
secretAccessKey: this.config.secretAccessKey,
28+
region: this.config.region
29+
})
2330

2431
let rebuildIndex = false
2532

@@ -141,10 +148,30 @@ module.exports = {
141148
*/
142149
async query(q, opts) {
143150
try {
151+
let suggestions = []
152+
const results = await this.clientDomain.search({
153+
query: q,
154+
partial: true,
155+
size: 50
156+
}).promise()
157+
if (results.hits.found < 5) {
158+
const suggestResults = await this.clientDomain.suggest({
159+
query: q,
160+
suggester: 'default_suggester',
161+
size: 5
162+
}).promise()
163+
suggestions = suggestResults.suggest.suggestions.map(s => s.suggestion)
164+
}
144165
return {
145-
results: [],
146-
suggestions: [],
147-
totalHits: 0
166+
results: _.map(results.hits.hit, r => ({
167+
id: r.id,
168+
path: _.head(r.fields.path),
169+
locale: _.head(r.fields.locale),
170+
title: _.head(r.fields.title),
171+
description: _.head(r.fields.description)
172+
})),
173+
suggestions: suggestions,
174+
totalHits: results.hits.found
148175
}
149176
} catch (err) {
150177
WIKI.logger.warn('Search Engine Error:')
@@ -157,80 +184,184 @@ module.exports = {
157184
* @param {Object} page Page to create
158185
*/
159186
async created(page) {
160-
await this.client.indexes.use(this.config.indexName).index([
161-
{
162-
id: page.hash,
163-
locale: page.localeCode,
164-
path: page.path,
165-
title: page.title,
166-
description: page.description,
167-
content: page.content
168-
}
169-
])
187+
await this.clientDomain.uploadDocuments({
188+
contentType: 'application/json',
189+
documents: JSON.stringify([
190+
{
191+
type: 'add',
192+
id: page.hash,
193+
fields: {
194+
locale: page.localeCode,
195+
path: page.path,
196+
title: page.title,
197+
description: page.description,
198+
content: page.content
199+
}
200+
}
201+
])
202+
}).promise()
170203
},
171204
/**
172205
* UPDATE
173206
*
174207
* @param {Object} page Page to update
175208
*/
176209
async updated(page) {
177-
await this.client.indexes.use(this.config.indexName).index([
178-
{
179-
id: page.hash,
180-
locale: page.localeCode,
181-
path: page.path,
182-
title: page.title,
183-
description: page.description,
184-
content: page.content
185-
}
186-
])
210+
await this.clientDomain.uploadDocuments({
211+
contentType: 'application/json',
212+
documents: JSON.stringify([
213+
{
214+
type: 'add',
215+
id: page.hash,
216+
fields: {
217+
locale: page.localeCode,
218+
path: page.path,
219+
title: page.title,
220+
description: page.description,
221+
content: page.content
222+
}
223+
}
224+
])
225+
}).promise()
187226
},
188227
/**
189228
* DELETE
190229
*
191230
* @param {Object} page Page to delete
192231
*/
193232
async deleted(page) {
194-
await this.client.indexes.use(this.config.indexName).index([
195-
{
196-
'@search.action': 'delete',
197-
id: page.hash
198-
}
199-
])
233+
await this.clientDomain.uploadDocuments({
234+
contentType: 'application/json',
235+
documents: JSON.stringify([
236+
{
237+
type: 'delete',
238+
id: page.hash
239+
}
240+
])
241+
}).promise()
200242
},
201243
/**
202244
* RENAME
203245
*
204246
* @param {Object} page Page to rename
205247
*/
206248
async renamed(page) {
207-
await this.client.indexes.use(this.config.indexName).index([
208-
{
209-
'@search.action': 'delete',
210-
id: page.sourceHash
211-
}
212-
])
213-
await this.client.indexes.use(this.config.indexName).index([
214-
{
215-
id: page.destinationHash,
216-
locale: page.localeCode,
217-
path: page.destinationPath,
218-
title: page.title,
219-
description: page.description,
220-
content: page.content
221-
}
222-
])
249+
await this.clientDomain.uploadDocuments({
250+
contentType: 'application/json',
251+
documents: JSON.stringify([
252+
{
253+
type: 'delete',
254+
id: page.sourceHash
255+
}
256+
])
257+
}).promise()
258+
await this.clientDomain.uploadDocuments({
259+
contentType: 'application/json',
260+
documents: JSON.stringify([
261+
{
262+
type: 'add',
263+
id: page.destinationHash,
264+
fields: {
265+
locale: page.localeCode,
266+
path: page.destinationPath,
267+
title: page.title,
268+
description: page.description,
269+
content: page.content
270+
}
271+
}
272+
])
273+
}).promise()
223274
},
224275
/**
225276
* REBUILD INDEX
226277
*/
227278
async rebuild() {
279+
WIKI.logger.info(`(SEARCH/AWS) Rebuilding Index...`)
280+
281+
const MAX_DOCUMENT_BYTES = Math.pow(2, 20)
282+
const MAX_INDEXING_BYTES = 5 * Math.pow(2, 20) - Buffer.from('[').byteLength - Buffer.from(']').byteLength
283+
const MAX_INDEXING_COUNT = 1000
284+
const COMMA_BYTES = Buffer.from(',').byteLength
285+
286+
let chunks = []
287+
let bytes = 0
288+
289+
const processDocument = async (cb, doc) => {
290+
try {
291+
if (doc) {
292+
const docBytes = Buffer.from(JSON.stringify(doc)).byteLength
293+
// -> Document too large
294+
if (docBytes >= MAX_DOCUMENT_BYTES) {
295+
throw new Error('Document exceeds maximum size allowed by AWS CloudSearch.')
296+
}
297+
298+
// -> Current batch exceeds size hard limit, flush
299+
if (docBytes + COMMA_BYTES + bytes >= MAX_INDEXING_BYTES) {
300+
await flushBuffer()
301+
}
302+
303+
if (chunks.length > 0) {
304+
bytes += COMMA_BYTES
305+
}
306+
bytes += docBytes
307+
chunks.push(doc)
308+
309+
// -> Current batch exceeds count soft limit, flush
310+
if (chunks.length >= MAX_INDEXING_COUNT) {
311+
await flushBuffer()
312+
}
313+
} else {
314+
// -> End of stream, flush
315+
await flushBuffer()
316+
}
317+
cb()
318+
} catch (err) {
319+
cb(err)
320+
}
321+
}
322+
323+
const flushBuffer = async () => {
324+
WIKI.logger.info(`(SEARCH/AWS) Sending batch of ${chunks.length}...`)
325+
try {
326+
const resp = await this.clientDomain.uploadDocuments({
327+
contentType: 'application/json',
328+
documents: JSON.stringify(_.map(chunks, doc => ({
329+
type: 'add',
330+
id: doc.id,
331+
fields: {
332+
locale: doc.locale,
333+
path: doc.path,
334+
title: doc.title,
335+
description: doc.description,
336+
content: doc.content
337+
}
338+
})))
339+
}).promise()
340+
} catch (err) {
341+
WIKI.logger.warn('(SEARCH/AWS) Failed to send batch to AWS CloudSearch: ', err)
342+
}
343+
chunks.length = 0
344+
bytes = 0
345+
}
346+
228347
await pipeline(
229348
WIKI.models.knex.column({ id: 'hash' }, 'path', { locale: 'localeCode' }, 'title', 'description', 'content').select().from('pages').where({
230349
isPublished: true,
231350
isPrivate: false
232351
}).stream(),
233-
this.client.indexes.use(this.config.indexName).createIndexingStream()
352+
new Transform({
353+
objectMode: true,
354+
transform: async (chunk, enc, cb) => await processDocument(cb, chunk),
355+
flush: async (cb) => await processDocument(cb)
356+
})
234357
)
358+
359+
WIKI.logger.info(`(SEARCH/AWS) Requesting Index Rebuild...`)
360+
await this.client.indexDocuments({
361+
DomainName: this.config.domain
362+
}).promise()
363+
364+
WIKI.logger.info(`(SEARCH/AWS) Index rebuilt successfully.`)
235365
}
236366
}
367+

0 commit comments

Comments
 (0)