From f819976cd2ae07e39ab642a94712ea8489f1f611 Mon Sep 17 00:00:00 2001 From: Robert Kenny Date: Mon, 16 Aug 2021 15:53:14 +0100 Subject: [PATCH 1/4] Adds executeMultiSearchRequest to ElasticsearchService --- common/search/docker-compose.yml | 14 ++ .../elasticsearch/ElasticsearchService.scala | 32 ++++- .../ElasticsearchServiceTest.scala | 134 ++++++++++++++++++ .../api/stacks/services/ItemLookupTest.scala | 1 + 4 files changed, 180 insertions(+), 1 deletion(-) create mode 100644 common/search/docker-compose.yml create mode 100644 common/search/src/test/scala/weco/api/search/elasticsearch/ElasticsearchServiceTest.scala diff --git a/common/search/docker-compose.yml b/common/search/docker-compose.yml new file mode 100644 index 000000000..a1f15d4c3 --- /dev/null +++ b/common/search/docker-compose.yml @@ -0,0 +1,14 @@ +version: "3.3" + +services: + elasticsearch: + image: "docker.elastic.co/elasticsearch/elasticsearch:7.9.0" + ports: + - "9200:9200" + - "9300:9300" + environment: + - "http.host=0.0.0.0" + - "transport.host=0.0.0.0" + - "cluster.name=wellcome" + - "logger.level=DEBUG" + - "discovery.type=single-node" diff --git a/common/search/src/main/scala/weco/api/search/elasticsearch/ElasticsearchService.scala b/common/search/src/main/scala/weco/api/search/elasticsearch/ElasticsearchService.scala index 7bd52f63f..7667ee7ad 100644 --- a/common/search/src/main/scala/weco/api/search/elasticsearch/ElasticsearchService.scala +++ b/common/search/src/main/scala/weco/api/search/elasticsearch/ElasticsearchService.scala @@ -3,7 +3,7 @@ package weco.api.search.elasticsearch import com.sksamuel.elastic4s.ElasticDsl._ import com.sksamuel.elastic4s.circe._ import com.sksamuel.elastic4s.requests.get.GetResponse -import com.sksamuel.elastic4s.requests.searches.{SearchRequest, SearchResponse} +import com.sksamuel.elastic4s.requests.searches.{MultiSearchRequest, MultiSearchResponse, SearchRequest, SearchResponse} import com.sksamuel.elastic4s.{ElasticClient, Hit, Index, Response} import grizzled.slf4j.Logging import io.circe.Decoder @@ -72,6 +72,36 @@ class ElasticsearchService(elasticClient: ElasticClient)( } } + def executeMultiSearchRequest( + request: MultiSearchRequest + ): Future[Either[ElasticsearchError, MultiSearchResponse]] = + spanFuture( + name = "ElasticSearch#executeMultiSearchRequest", + spanType = "request", + subType = "elastic", + action = "query" + ) { + debug(s"Sending ES request: ${request.show}") + val transaction = Tracing.currentTransaction + withActiveTrace(elasticClient.execute(request)) + .map(_.toEither) + .map { + case Right(multiResponse) => + val accumulatedTime = multiResponse.items.foldLeft(0L) { (acc, item) => + item.response match { + case Right(itemResponse) => acc + itemResponse.took + case Left(err) => acc + } + } + + transaction.setLabel("elasticTook", accumulatedTime) + Right(multiResponse) + + case Left(err) => + Left(ElasticsearchError(err)) + } + } + private def deserialize[T](hit: Hit)(implicit decoder: Decoder[T]): T = hit.safeTo[T] match { case Success(work) => work diff --git a/common/search/src/test/scala/weco/api/search/elasticsearch/ElasticsearchServiceTest.scala b/common/search/src/test/scala/weco/api/search/elasticsearch/ElasticsearchServiceTest.scala new file mode 100644 index 000000000..5511acc4c --- /dev/null +++ b/common/search/src/test/scala/weco/api/search/elasticsearch/ElasticsearchServiceTest.scala @@ -0,0 +1,134 @@ +package weco.api.search.elasticsearch + +import com.sksamuel.elastic4s.ElasticDsl.{boolQuery, bulk, indexInto, search, termQuery} +import com.sksamuel.elastic4s.analysis.Analysis +import com.sksamuel.elastic4s.fields.{KeywordField, TextField} +import com.sksamuel.elastic4s.requests.mappings.MappingDefinition +import org.scalatest.funspec.AnyFunSpec +import org.scalatest.matchers.should.Matchers +import weco.catalogue.internal_model.index.IndexFixtures +import weco.elasticsearch.IndexConfig +import weco.elasticsearch.test.fixtures.ElasticsearchFixtures +import weco.json.JsonUtil.toJson +import io.circe.generic.auto._ +import com.sksamuel.elastic4s.ElasticDsl._ +import com.sksamuel.elastic4s.Index +import com.sksamuel.elastic4s.circe.hitReaderWithCirce +import com.sksamuel.elastic4s.requests.searches.MultiSearchRequest +import org.scalatest.EitherValues +import org.scalatest.concurrent.PatienceConfiguration.Timeout +import org.scalatest.time.{Seconds, Span} +import weco.fixtures.{RandomGenerators, TestWith} + +import scala.concurrent.ExecutionContext.Implicits.global + + +class ElasticsearchServiceTest + extends AnyFunSpec + with Matchers + with IndexFixtures + with EitherValues + with RandomGenerators + with ElasticsearchFixtures { + + case class ExampleThing(id: String, name: String) + + def randomThing = ExampleThing( + id = randomAlphanumeric(10).toLowerCase, + name = randomAlphanumeric(10).toLowerCase + ) + + def searchRequestForThingByName(index: Index, name: String) = { + search(index) + .query( + boolQuery.filter( + termQuery(field = "name", value = name) + ) + ) + .size(1) + } + + def withExampleIndex[R](thingsToIndex: List[ExampleThing])(testWith : TestWith[Index, R]) : R = { + val id = KeywordField("id") + val name = TextField("name") + val mapping = MappingDefinition(properties = List(id, name)) + val analysis = Analysis(analyzers = List.empty) + val indexConfig = IndexConfig(mapping, analysis) + + withLocalIndex(indexConfig) { index => + val result = elasticClient.execute( + bulk( + thingsToIndex.map { thing => + val jsonDoc = toJson(thing).get + indexInto(index.name) + .id(thing.id) + .doc(jsonDoc) + } + ).refreshImmediately + ) + + whenReady(result, Timeout(Span(30, Seconds))) { _ => + getSizeOf(index) shouldBe thingsToIndex.size + + testWith(index) + } + } + } + + describe("executeMultiSearchRequest") { + it("performs a multiSearchRequest") { + + val thingsToIndex = 0.to(randomInt(5,10)).map(_ => randomThing).toList + + withExampleIndex(thingsToIndex) { index => + + val elasticsearchService = new ElasticsearchService(elasticClient) + + val searchRequests = thingsToIndex.map { thing => + searchRequestForThingByName( + index = index, + name = thing.name + ) + } + + val multiSearchRequest = MultiSearchRequest(searchRequests) + val multiSearchResponseFuture = elasticsearchService.executeMultiSearchRequest(multiSearchRequest) + + whenReady(multiSearchResponseFuture) { multiSearchResponseEither => + val multiSearchResponse = multiSearchResponseEither.right.value + val queryResults = multiSearchResponse.items.map(_.response.right.value) + val returnedThings = queryResults.flatMap(_.to[ExampleThing]) + + returnedThings.toSet shouldBe thingsToIndex.toSet + } + } + } + } + + describe("executeSearchRequest") { + it("performs a searchRequest") { + + val thingsToIndex = 0.to(randomInt(5,10)).map(_ => randomThing).toList + + withExampleIndex(thingsToIndex) { index => + + val elasticsearchService = new ElasticsearchService(elasticClient) + val thingToQueryFor = thingsToIndex.head + + val searchRequest = searchRequestForThingByName( + index = index, + name = thingToQueryFor.name + ) + + val searchResponseFuture = elasticsearchService.executeSearchRequest(searchRequest) + + whenReady(searchResponseFuture) { searchResponseEither => + val searchResponse = searchResponseEither.right.value + val queryResult = searchResponse.to[ExampleThing].head + + queryResult shouldBe thingToQueryFor + } + } + } + } +} diff --git a/common/stacks/src/test/scala/weco/api/stacks/services/ItemLookupTest.scala b/common/stacks/src/test/scala/weco/api/stacks/services/ItemLookupTest.scala index 9d6330d2c..f646f7c42 100644 --- a/common/stacks/src/test/scala/weco/api/stacks/services/ItemLookupTest.scala +++ b/common/stacks/src/test/scala/weco/api/stacks/services/ItemLookupTest.scala @@ -26,6 +26,7 @@ class ItemLookupTest with IndexFixtures with ItemsGenerators with WorkGenerators { + def createLookup(index: Index): ItemLookup = ElasticItemLookup(elasticClient, index = index) From 66c6ea9f69e62633b6e47993d8176fc627e99c6d Mon Sep 17 00:00:00 2001 From: Buildkite on behalf of Wellcome Collection Date: Mon, 16 Aug 2021 14:55:01 +0000 Subject: [PATCH 2/4] Apply auto-formatting rules --- .../elasticsearch/ElasticsearchService.scala | 22 +++++++++----- .../ElasticsearchServiceTest.scala | 30 ++++++++++++------- 2 files changed, 33 insertions(+), 19 deletions(-) diff --git a/common/search/src/main/scala/weco/api/search/elasticsearch/ElasticsearchService.scala b/common/search/src/main/scala/weco/api/search/elasticsearch/ElasticsearchService.scala index 7667ee7ad..41a02d85f 100644 --- a/common/search/src/main/scala/weco/api/search/elasticsearch/ElasticsearchService.scala +++ b/common/search/src/main/scala/weco/api/search/elasticsearch/ElasticsearchService.scala @@ -3,7 +3,12 @@ package weco.api.search.elasticsearch import com.sksamuel.elastic4s.ElasticDsl._ import com.sksamuel.elastic4s.circe._ import com.sksamuel.elastic4s.requests.get.GetResponse -import com.sksamuel.elastic4s.requests.searches.{MultiSearchRequest, MultiSearchResponse, SearchRequest, SearchResponse} +import com.sksamuel.elastic4s.requests.searches.{ + MultiSearchRequest, + MultiSearchResponse, + SearchRequest, + SearchResponse +} import com.sksamuel.elastic4s.{ElasticClient, Hit, Index, Response} import grizzled.slf4j.Logging import io.circe.Decoder @@ -73,8 +78,8 @@ class ElasticsearchService(elasticClient: ElasticClient)( } def executeMultiSearchRequest( - request: MultiSearchRequest - ): Future[Either[ElasticsearchError, MultiSearchResponse]] = + request: MultiSearchRequest + ): Future[Either[ElasticsearchError, MultiSearchResponse]] = spanFuture( name = "ElasticSearch#executeMultiSearchRequest", spanType = "request", @@ -87,11 +92,12 @@ class ElasticsearchService(elasticClient: ElasticClient)( .map(_.toEither) .map { case Right(multiResponse) => - val accumulatedTime = multiResponse.items.foldLeft(0L) { (acc, item) => - item.response match { - case Right(itemResponse) => acc + itemResponse.took - case Left(err) => acc - } + val accumulatedTime = multiResponse.items.foldLeft(0L) { + (acc, item) => + item.response match { + case Right(itemResponse) => acc + itemResponse.took + case Left(err) => acc + } } transaction.setLabel("elasticTook", accumulatedTime) diff --git a/common/search/src/test/scala/weco/api/search/elasticsearch/ElasticsearchServiceTest.scala b/common/search/src/test/scala/weco/api/search/elasticsearch/ElasticsearchServiceTest.scala index 5511acc4c..cb41a4858 100644 --- a/common/search/src/test/scala/weco/api/search/elasticsearch/ElasticsearchServiceTest.scala +++ b/common/search/src/test/scala/weco/api/search/elasticsearch/ElasticsearchServiceTest.scala @@ -1,6 +1,12 @@ package weco.api.search.elasticsearch -import com.sksamuel.elastic4s.ElasticDsl.{boolQuery, bulk, indexInto, search, termQuery} +import com.sksamuel.elastic4s.ElasticDsl.{ + boolQuery, + bulk, + indexInto, + search, + termQuery +} import com.sksamuel.elastic4s.analysis.Analysis import com.sksamuel.elastic4s.fields.{KeywordField, TextField} import com.sksamuel.elastic4s.requests.mappings.MappingDefinition @@ -22,9 +28,8 @@ import weco.fixtures.{RandomGenerators, TestWith} import scala.concurrent.ExecutionContext.Implicits.global - class ElasticsearchServiceTest - extends AnyFunSpec + extends AnyFunSpec with Matchers with IndexFixtures with EitherValues @@ -48,7 +53,9 @@ class ElasticsearchServiceTest .size(1) } - def withExampleIndex[R](thingsToIndex: List[ExampleThing])(testWith : TestWith[Index, R]) : R = { + def withExampleIndex[R]( + thingsToIndex: List[ExampleThing] + )(testWith: TestWith[Index, R]): R = { val id = KeywordField("id") val name = TextField("name") val mapping = MappingDefinition(properties = List(id, name)) @@ -78,10 +85,9 @@ class ElasticsearchServiceTest describe("executeMultiSearchRequest") { it("performs a multiSearchRequest") { - val thingsToIndex = 0.to(randomInt(5,10)).map(_ => randomThing).toList + val thingsToIndex = 0.to(randomInt(5, 10)).map(_ => randomThing).toList withExampleIndex(thingsToIndex) { index => - val elasticsearchService = new ElasticsearchService(elasticClient) val searchRequests = thingsToIndex.map { thing => @@ -92,11 +98,13 @@ class ElasticsearchServiceTest } val multiSearchRequest = MultiSearchRequest(searchRequests) - val multiSearchResponseFuture = elasticsearchService.executeMultiSearchRequest(multiSearchRequest) + val multiSearchResponseFuture = + elasticsearchService.executeMultiSearchRequest(multiSearchRequest) whenReady(multiSearchResponseFuture) { multiSearchResponseEither => val multiSearchResponse = multiSearchResponseEither.right.value - val queryResults = multiSearchResponse.items.map(_.response.right.value) + val queryResults = + multiSearchResponse.items.map(_.response.right.value) val returnedThings = queryResults.flatMap(_.to[ExampleThing]) returnedThings.toSet shouldBe thingsToIndex.toSet @@ -108,10 +116,9 @@ class ElasticsearchServiceTest describe("executeSearchRequest") { it("performs a searchRequest") { - val thingsToIndex = 0.to(randomInt(5,10)).map(_ => randomThing).toList + val thingsToIndex = 0.to(randomInt(5, 10)).map(_ => randomThing).toList withExampleIndex(thingsToIndex) { index => - val elasticsearchService = new ElasticsearchService(elasticClient) val thingToQueryFor = thingsToIndex.head @@ -120,7 +127,8 @@ class ElasticsearchServiceTest name = thingToQueryFor.name ) - val searchResponseFuture = elasticsearchService.executeSearchRequest(searchRequest) + val searchResponseFuture = + elasticsearchService.executeSearchRequest(searchRequest) whenReady(searchResponseFuture) { searchResponseEither => val searchResponse = searchResponseEither.right.value From 17f155e24a72454863794522132b2286751eddee Mon Sep 17 00:00:00 2001 From: Robert Kenny Date: Tue, 17 Aug 2021 09:57:15 +0100 Subject: [PATCH 3/4] record individual and accumulated --- .../search/elasticsearch/ElasticsearchService.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/common/search/src/main/scala/weco/api/search/elasticsearch/ElasticsearchService.scala b/common/search/src/main/scala/weco/api/search/elasticsearch/ElasticsearchService.scala index 7667ee7ad..95efbe29e 100644 --- a/common/search/src/main/scala/weco/api/search/elasticsearch/ElasticsearchService.scala +++ b/common/search/src/main/scala/weco/api/search/elasticsearch/ElasticsearchService.scala @@ -87,14 +87,18 @@ class ElasticsearchService(elasticClient: ElasticClient)( .map(_.toEither) .map { case Right(multiResponse) => - val accumulatedTime = multiResponse.items.foldLeft(0L) { (acc, item) => + val accumulatedTime = multiResponse.items.zipWithIndex.foldLeft(0L) { (acc, itemWithIndex) => + val (item, index) = itemWithIndex + item.response match { - case Right(itemResponse) => acc + itemResponse.took - case Left(err) => acc + case Right(itemResponse) => + transaction.setLabel(s"elasticTook-${index}", itemResponse.took) + acc + itemResponse.took + case Left(_) => acc } } - transaction.setLabel("elasticTook", accumulatedTime) + transaction.setLabel("elasticTookTotal", accumulatedTime) Right(multiResponse) case Left(err) => From aa510d04f16d29c014b69ae7e9bcb6c3bb25b8a4 Mon Sep 17 00:00:00 2001 From: Buildkite on behalf of Wellcome Collection Date: Tue, 17 Aug 2021 09:00:55 +0000 Subject: [PATCH 4/4] Apply auto-formatting rules --- .../elasticsearch/ElasticsearchService.scala | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/common/search/src/main/scala/weco/api/search/elasticsearch/ElasticsearchService.scala b/common/search/src/main/scala/weco/api/search/elasticsearch/ElasticsearchService.scala index 4214f5568..0087afb1c 100644 --- a/common/search/src/main/scala/weco/api/search/elasticsearch/ElasticsearchService.scala +++ b/common/search/src/main/scala/weco/api/search/elasticsearch/ElasticsearchService.scala @@ -92,16 +92,21 @@ class ElasticsearchService(elasticClient: ElasticClient)( .map(_.toEither) .map { case Right(multiResponse) => - val accumulatedTime = multiResponse.items.zipWithIndex.foldLeft(0L) { (acc, itemWithIndex) => - val (item, index) = itemWithIndex - - item.response match { - case Right(itemResponse) => - transaction.setLabel(s"elasticTook-${index}", itemResponse.took) - acc + itemResponse.took - case Left(_) => acc + val accumulatedTime = + multiResponse.items.zipWithIndex.foldLeft(0L) { + (acc, itemWithIndex) => + val (item, index) = itemWithIndex + + item.response match { + case Right(itemResponse) => + transaction.setLabel( + s"elasticTook-${index}", + itemResponse.took + ) + acc + itemResponse.took + case Left(_) => acc + } } - } transaction.setLabel("elasticTookTotal", accumulatedTime) Right(multiResponse)