Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions common/search/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
version: "3.3"

services:
elasticsearch:
image: "docker.elastic.co/elasticsearch/elasticsearch:7.9.0"
ports:
- "9200:9200"
- "9300:9300"
environment:
- "http.host=0.0.0.0"
- "transport.host=0.0.0.0"
- "cluster.name=wellcome"
- "logger.level=DEBUG"
- "discovery.type=single-node"
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@ package weco.api.search.elasticsearch
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.circe._
import com.sksamuel.elastic4s.requests.get.GetResponse
import com.sksamuel.elastic4s.requests.searches.{SearchRequest, SearchResponse}
import com.sksamuel.elastic4s.requests.searches.{
MultiSearchRequest,
MultiSearchResponse,
SearchRequest,
SearchResponse
}
import com.sksamuel.elastic4s.{ElasticClient, Hit, Index, Response}
import grizzled.slf4j.Logging
import io.circe.Decoder
Expand Down Expand Up @@ -72,6 +77,45 @@ class ElasticsearchService(elasticClient: ElasticClient)(
}
}

def executeMultiSearchRequest(
request: MultiSearchRequest
): Future[Either[ElasticsearchError, MultiSearchResponse]] =
spanFuture(
name = "ElasticSearch#executeMultiSearchRequest",
spanType = "request",
subType = "elastic",
action = "query"
) {
debug(s"Sending ES request: ${request.show}")
val transaction = Tracing.currentTransaction
withActiveTrace(elasticClient.execute(request))
.map(_.toEither)
.map {
case Right(multiResponse) =>
val accumulatedTime =
multiResponse.items.zipWithIndex.foldLeft(0L) {
(acc, itemWithIndex) =>
val (item, index) = itemWithIndex

item.response match {
case Right(itemResponse) =>
transaction.setLabel(
s"elasticTook-${index}",
itemResponse.took
)
acc + itemResponse.took
case Left(_) => acc
}
}

transaction.setLabel("elasticTookTotal", accumulatedTime)
Right(multiResponse)

case Left(err) =>
Left(ElasticsearchError(err))
}
}

private def deserialize[T](hit: Hit)(implicit decoder: Decoder[T]): T =
hit.safeTo[T] match {
case Success(work) => work
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package weco.api.search.elasticsearch

import com.sksamuel.elastic4s.ElasticDsl.{
boolQuery,
bulk,
indexInto,
search,
termQuery
}
import com.sksamuel.elastic4s.analysis.Analysis
import com.sksamuel.elastic4s.fields.{KeywordField, TextField}
import com.sksamuel.elastic4s.requests.mappings.MappingDefinition
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers
import weco.catalogue.internal_model.index.IndexFixtures
import weco.elasticsearch.IndexConfig
import weco.elasticsearch.test.fixtures.ElasticsearchFixtures
import weco.json.JsonUtil.toJson
import io.circe.generic.auto._
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.Index
import com.sksamuel.elastic4s.circe.hitReaderWithCirce
import com.sksamuel.elastic4s.requests.searches.MultiSearchRequest
import org.scalatest.EitherValues
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.{Seconds, Span}
import weco.fixtures.{RandomGenerators, TestWith}

import scala.concurrent.ExecutionContext.Implicits.global

class ElasticsearchServiceTest
extends AnyFunSpec
with Matchers
with IndexFixtures
with EitherValues
with RandomGenerators
with ElasticsearchFixtures {

case class ExampleThing(id: String, name: String)

def randomThing = ExampleThing(
id = randomAlphanumeric(10).toLowerCase,
name = randomAlphanumeric(10).toLowerCase
)

def searchRequestForThingByName(index: Index, name: String) = {
search(index)
.query(
boolQuery.filter(
termQuery(field = "name", value = name)
)
)
.size(1)
}

def withExampleIndex[R](
thingsToIndex: List[ExampleThing]
)(testWith: TestWith[Index, R]): R = {
val id = KeywordField("id")
val name = TextField("name")
val mapping = MappingDefinition(properties = List(id, name))
val analysis = Analysis(analyzers = List.empty)
val indexConfig = IndexConfig(mapping, analysis)

withLocalIndex(indexConfig) { index =>
val result = elasticClient.execute(
bulk(
thingsToIndex.map { thing =>
val jsonDoc = toJson(thing).get
indexInto(index.name)
.id(thing.id)
.doc(jsonDoc)
}
).refreshImmediately
)

whenReady(result, Timeout(Span(30, Seconds))) { _ =>
getSizeOf(index) shouldBe thingsToIndex.size

testWith(index)
}
}
}

describe("executeMultiSearchRequest") {
it("performs a multiSearchRequest") {

val thingsToIndex = 0.to(randomInt(5, 10)).map(_ => randomThing).toList

withExampleIndex(thingsToIndex) { index =>
val elasticsearchService = new ElasticsearchService(elasticClient)

val searchRequests = thingsToIndex.map { thing =>
searchRequestForThingByName(
index = index,
name = thing.name
)
}

val multiSearchRequest = MultiSearchRequest(searchRequests)
val multiSearchResponseFuture =
elasticsearchService.executeMultiSearchRequest(multiSearchRequest)

whenReady(multiSearchResponseFuture) { multiSearchResponseEither =>
val multiSearchResponse = multiSearchResponseEither.right.value
val queryResults =
multiSearchResponse.items.map(_.response.right.value)
val returnedThings = queryResults.flatMap(_.to[ExampleThing])

returnedThings.toSet shouldBe thingsToIndex.toSet
}
}
}
}

describe("executeSearchRequest") {
it("performs a searchRequest") {

val thingsToIndex = 0.to(randomInt(5, 10)).map(_ => randomThing).toList

withExampleIndex(thingsToIndex) { index =>
val elasticsearchService = new ElasticsearchService(elasticClient)
val thingToQueryFor = thingsToIndex.head

val searchRequest = searchRequestForThingByName(
index = index,
name = thingToQueryFor.name
)

val searchResponseFuture =
elasticsearchService.executeSearchRequest(searchRequest)

whenReady(searchResponseFuture) { searchResponseEither =>
val searchResponse = searchResponseEither.right.value
val queryResult = searchResponse.to[ExampleThing].head

queryResult shouldBe thingToQueryFor
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class ItemLookupTest
with IndexFixtures
with ItemsGenerators
with WorkGenerators {

def createLookup(index: Index): ItemLookup =
ElasticItemLookup(elasticClient, index = index)

Expand Down