diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml index 433448421..643156099 100644 --- a/.github/workflows/master.yml +++ b/.github/workflows/master.yml @@ -9,7 +9,7 @@ on: - '*.yml' branches: - master - - release/* + - opensearch jobs: scala-2_12: @@ -35,7 +35,7 @@ jobs: id: import_gpg uses: crazy-max/ghaction-import-gpg@v3 with: - gpg-private-key: ${{ secrets.GPG_PRIVATE_KEY }} + gpg-private-key: ${{ secrets.PGP_SECRET }} passphrase: ${{ secrets.PGP_PASSPHRASE }} - name: GPG user IDs @@ -48,8 +48,8 @@ jobs: - name: publish snapshot run: sbt ++2.12.12 publish env: - OSSRH_USERNAME: ${{ secrets.OSSRH_USERNAME }} - OSSRH_PASSWORD: ${{ secrets.OSSRH_PASSWORD }} + OSSRH_USERNAME: ${{ secrets.SONATYPE_USERNAME }} + OSSRH_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }} scala-2_13: runs-on: ubuntu-latest @@ -74,7 +74,7 @@ jobs: id: import_gpg uses: crazy-max/ghaction-import-gpg@v3 with: - gpg-private-key: ${{ secrets.GPG_PRIVATE_KEY }} + gpg-private-key: ${{ secrets.PGP_SECRET }} passphrase: ${{ secrets.PGP_PASSPHRASE }} - name: GPG user IDs @@ -87,8 +87,8 @@ jobs: - name: publish snapshot run: sbt ++2.13.4 publish env: - OSSRH_USERNAME: ${{ secrets.OSSRH_USERNAME }} - OSSRH_PASSWORD: ${{ secrets.OSSRH_PASSWORD }} + OSSRH_USERNAME: ${{ secrets.SONATYPE_USERNAME }} + OSSRH_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }} scala-3_0: @@ -114,7 +114,7 @@ jobs: id: import_gpg uses: crazy-max/ghaction-import-gpg@v3 with: - gpg-private-key: ${{ secrets.GPG_PRIVATE_KEY }} + gpg-private-key: ${{ secrets.PGP_SECRET }} passphrase: ${{ secrets.PGP_PASSPHRASE }} - name: GPG user IDs @@ -127,5 +127,5 @@ jobs: - name: publish snapshot run: sbt ++3.3.0 elastic4s-scala3/publish env: - OSSRH_USERNAME: ${{ secrets.OSSRH_USERNAME }} - OSSRH_PASSWORD: ${{ secrets.OSSRH_PASSWORD }} + OSSRH_USERNAME: ${{ secrets.SONATYPE_USERNAME }} + OSSRH_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }} diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 80a6e5886..4815593fe 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -1,5 +1,7 @@ name: release +run-name: Release - ${{ inputs.version }} on ${{ inputs.branch }} + on: workflow_dispatch: inputs: @@ -9,7 +11,7 @@ on: branch: description: "The branch to release from" required: true - default: 'master' + default: 'opensearch' jobs: release: @@ -32,7 +34,7 @@ jobs: id: import_gpg uses: crazy-max/ghaction-import-gpg@v3 with: - gpg-private-key: ${{ secrets.GPG_PRIVATE_KEY }} + gpg-private-key: ${{ secrets.PGP_SECRET }} passphrase: ${{ secrets.PGP_PASSPHRASE }} - name: GPG user IDs @@ -46,22 +48,22 @@ jobs: run: sbt ++2.12.12 publishSigned env: RELEASE_VERSION: ${{ github.event.inputs.version }} - OSSRH_USERNAME: ${{ secrets.OSSRH_USERNAME }} - OSSRH_PASSWORD: ${{ secrets.OSSRH_PASSWORD }} + OSSRH_USERNAME: ${{ secrets.SONATYPE_USERNAME }} + OSSRH_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }} - name: publish 2.13 release run: sbt ++2.13.4 publishSigned env: RELEASE_VERSION: ${{ github.event.inputs.version }} - OSSRH_USERNAME: ${{ secrets.OSSRH_USERNAME }} - OSSRH_PASSWORD: ${{ secrets.OSSRH_PASSWORD }} + OSSRH_USERNAME: ${{ secrets.SONATYPE_USERNAME }} + OSSRH_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }} - name: publish 3.0 release run: sbt ++3.3.0 elastic4s-scala3/publishSigned env: RELEASE_VERSION: ${{ github.event.inputs.version }} - OSSRH_USERNAME: ${{ secrets.OSSRH_USERNAME }} - OSSRH_PASSWORD: ${{ secrets.OSSRH_PASSWORD }} + OSSRH_USERNAME: ${{ secrets.SONATYPE_USERNAME }} + OSSRH_PASSWORD: ${{ 
secrets.SONATYPE_PASSWORD }} - name: tag release run: | diff --git a/.scalafmt.conf b/.scalafmt.conf new file mode 100644 index 000000000..834f2d20f --- /dev/null +++ b/.scalafmt.conf @@ -0,0 +1 @@ +version = 2.7.5 \ No newline at end of file diff --git a/build.sbt b/build.sbt index d53d91c73..97a72b7c1 100644 --- a/build.sbt +++ b/build.sbt @@ -1,6 +1,6 @@ import Dependencies._ -ThisBuild / organizationName := "com.sksamuel.elastic4s" +ThisBuild / organizationName := "io.github.t83714" // Required due to dependency conflict in SBT // See https://github.com/sbt/sbt/issues/6997 @@ -15,7 +15,7 @@ def releaseVersion: String = sys.env.getOrElse("RELEASE_VERSION", "") def isRelease = releaseVersion != "" // the version to use to publish - either from release version or a snapshot run number -def publishVersion = if (isRelease) releaseVersion else "8.6.0." + githubRunNumber + "-SNAPSHOT" +def publishVersion = if (isRelease) releaseVersion else "8.11.6." + githubRunNumber + "-SNAPSHOT" // set by github actions and used as the snapshot build number def githubRunNumber = sys.env.getOrElse("GITHUB_RUN_NUMBER", "local") @@ -40,7 +40,7 @@ lazy val warnUnusedImport = Seq( ) lazy val commonSettings = Seq( - organization := "com.sksamuel.elastic4s", + organization := "io.github.t83714", version := publishVersion, resolvers ++= Seq(Resolver.mavenLocal), Test / parallelExecution := false, @@ -76,15 +76,15 @@ lazy val commonJvmSettings = Seq( lazy val pomSettings = Seq( - homepage := Some(url("https://github.com/sksamuel/elastic4s")), + homepage := Some(url("https://github.com/t83714/elastic4s")), licenses := Seq("Apache 2" -> url("http://www.apache.org/licenses/LICENSE-2.0")), - scmInfo := Some(ScmInfo(url("https://github.com/sksamuel/elastic4s"), "scm:git:git@github.com:sksamuel/elastic4s.git")), - apiURL := Some(url("http://github.com/sksamuel/elastic4s/")), + scmInfo := Some(ScmInfo(url("https://github.com/t83714/elastic4s"), "scm:git:git@github.com:t83714/elastic4s.git")), + apiURL := Some(url("http://github.com/t83714/elastic4s/")), pomExtra := - sksamuel - Sam Samuel - https://github.com/sksamuel + t83714 + Jacky Jiang + https://github.com/t83714 ) diff --git a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/ElasticDsl.scala b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/ElasticDsl.scala index ec6fc47e7..1cad784b6 100644 --- a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/ElasticDsl.scala +++ b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/ElasticDsl.scala @@ -26,6 +26,7 @@ import com.sksamuel.elastic4s.handlers.update.UpdateHandlers import com.sksamuel.elastic4s.handlers.validate.ValidateHandlers import com.sksamuel.elastic4s.json.XContentBuilder import com.sksamuel.elastic4s.requests.ingest.IngestHandlers +import com.sksamuel.elastic4s.requests.searchPipeline.SearchPipelineHandlers import com.sksamuel.elastic4s.requests.searches.aggs.AbstractAggregation import com.sksamuel.elastic4s.requests.searches.template.SearchTemplateHandlers import com.sksamuel.elastic4s.requests.searches.{SearchHandlers, SearchScrollHandlers, defaultCustomAggregationHandler} @@ -47,6 +48,7 @@ with IndexAliasHandlers with IndexStatsHandlers with IndexTemplateHandlers with IngestHandlers +with SearchPipelineHandlers with LocksHandlers with MappingHandlers with NodesHandlers diff --git a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searchPipeline/DeleteSearchPipelineRequest.scala 
b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searchPipeline/DeleteSearchPipelineRequest.scala new file mode 100644 index 000000000..38d25d22a --- /dev/null +++ b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searchPipeline/DeleteSearchPipelineRequest.scala @@ -0,0 +1,3 @@ +package com.sksamuel.elastic4s.requests.searchPipeline + +case class DeleteSearchPipelineRequest(id: String) diff --git a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searchPipeline/DeleteSearchPipelineResponse.scala b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searchPipeline/DeleteSearchPipelineResponse.scala new file mode 100644 index 000000000..7f0c05fb5 --- /dev/null +++ b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searchPipeline/DeleteSearchPipelineResponse.scala @@ -0,0 +1,3 @@ +package com.sksamuel.elastic4s.requests.searchPipeline + +case class DeleteSearchPipelineResponse(acknowledged: Boolean) diff --git a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searchPipeline/GetSearchPipelineRequest.scala b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searchPipeline/GetSearchPipelineRequest.scala new file mode 100644 index 000000000..ebb6b75f3 --- /dev/null +++ b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searchPipeline/GetSearchPipelineRequest.scala @@ -0,0 +1,3 @@ +package com.sksamuel.elastic4s.requests.searchPipeline + +case class GetSearchPipelineRequest(id: String) diff --git a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searchPipeline/GetSearchPipelineResponse.scala b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searchPipeline/GetSearchPipelineResponse.scala new file mode 100644 index 000000000..79560fbd6 --- /dev/null +++ b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searchPipeline/GetSearchPipelineResponse.scala @@ -0,0 +1,3 @@ +package com.sksamuel.elastic4s.requests.searchPipeline + +case class GetSearchPipelineResponse(data: SearchPipeline) diff --git a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searchPipeline/NormalizationProcessor.scala b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searchPipeline/NormalizationProcessor.scala new file mode 100644 index 000000000..477ef6f3b --- /dev/null +++ b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searchPipeline/NormalizationProcessor.scala @@ -0,0 +1,140 @@ +package com.sksamuel.elastic4s.requests.searchPipeline + +import com.sksamuel.elastic4s.json.{XContentBuilder, XContentFactory} + +sealed trait NormalizationTechniqueType { + def name: String +} + +object NormalizationTechniqueType { + + val defaultValue: NormalizationTechniqueType = minMax + + val values = Set(minMax, l2) + + def withName(name: String): Option[NormalizationTechniqueType] = + values.map(v => v.name -> v).toMap.get(name) + + case object minMax extends NormalizationTechniqueType { + val name = "min_max" + } + + case object l2 extends NormalizationTechniqueType { + val name = "l2" + } +} + +sealed trait CombinationTechniqueType { + def name: String +} + +object CombinationTechniqueType { + + val defaultValue: CombinationTechniqueType = arithmeticMean + + val values = Set(arithmeticMean, geometricMean, harmonicNean) + + def withName(name: String): Option[CombinationTechniqueType] = + values.map(v => v.name -> v).toMap.get(name) + + case object arithmeticMean extends CombinationTechniqueType { + val name = "arithmetic_mean" + } + + case object geometricMean 
extends CombinationTechniqueType { + val name = "geometric_mean" + } + + case object harmonicNean extends CombinationTechniqueType { + val name = "harmonic_mean" + } +} + +case class CombinationTechnique( + techniqueType: Option[CombinationTechniqueType] = None, + weights: Option[Seq[Double]] = None +) + +case class NormalizationProcessor( + normalizationTechnique: Option[NormalizationTechniqueType] = None, + combinationTechnique: Option[CombinationTechnique] = None, + tag: Option[String] = None, + description: Option[String] = None +) extends SearchPipelineProcessor { + val processorType = SearchPipelineProcessorType.SearchPhaseResultsProcessor + // for NormalizationProcessor, this value is ignored. If the processor fails, the pipeline always fails and returns an error. + val ignoreFailure = None + + def builderFn(): XContentBuilder = { + val xcb = XContentFactory.jsonBuilder() + xcb.startObject("normalization-processor") + tag.foreach(v => xcb.field("tag", v)) + description.foreach(v => xcb.field("description", v)) + normalizationTechnique.foreach(v => { + xcb.startObject("normalization") + xcb.field("technique", v.name) + xcb.endObject() + }) + combinationTechnique.foreach(ct => { + xcb.startObject("combination") + ct.techniqueType.foreach(t => xcb.field("technique", t.name)) + if (ct.weights.nonEmpty && ct.weights.get.length > 0) { + xcb.startObject("parameters") + xcb.array("weights", ct.weights.get.toArray) + xcb.endObject() + } + xcb.endObject() + }) + xcb.endObject() + } +} + +object NormalizationProcessor { + def fromRawResponse(resp: Map[String, Any]): NormalizationProcessor = { + if ( + resp.isEmpty || resp.get("normalization-processor").isEmpty || !resp + .get("normalization-processor") + .get + .isInstanceOf[Map[String, Any]] + ) { + NormalizationProcessor() + } else { + val pData = + resp.get("normalization-processor").get.asInstanceOf[Map[String, Any]] + NormalizationProcessor( + tag = pData.get("tag").map(_.asInstanceOf[String]), + description = pData.get("description").map(_.asInstanceOf[String]), + normalizationTechnique = pData + .get("normalization") + .flatMap { + case v: Map[String, String] => + v.get("technique") + case _ => None.asInstanceOf[Option[String]] + } + .flatMap(NormalizationTechniqueType.withName(_)), + combinationTechnique = pData + .get("combination") + .flatMap { + case v: Map[String, Any] => + Some( + CombinationTechnique( + techniqueType = v.get("technique").flatMap { + case v: String => CombinationTechniqueType.withName(v) + case _ => None + }, + weights = v.get("parameters").flatMap { + case p: Map[String, Any] => + p.get("weights").map { + case ws: Seq[Any] => ws.map(_.asInstanceOf[Double]) + case _ => Seq() + } + case _ => None + } + ) + ) + case _ => None + } + ) + } + } +} diff --git a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searchPipeline/PutSearchPipelineRequest.scala b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searchPipeline/PutSearchPipelineRequest.scala new file mode 100644 index 000000000..df8abbc78 --- /dev/null +++ b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searchPipeline/PutSearchPipelineRequest.scala @@ -0,0 +1,3 @@ +package com.sksamuel.elastic4s.requests.searchPipeline + +case class PutSearchPipelineRequest(data: SearchPipeline) diff --git a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searchPipeline/PutSearchPipelineResponse.scala b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searchPipeline/PutSearchPipelineResponse.scala new file
mode 100644 index 000000000..3860654b9 --- /dev/null +++ b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searchPipeline/PutSearchPipelineResponse.scala @@ -0,0 +1,3 @@ +package com.sksamuel.elastic4s.requests.searchPipeline + +case class PutSearchPipelineResponse(acknowledged: Boolean) diff --git a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searchPipeline/SearchPipeline.scala b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searchPipeline/SearchPipeline.scala new file mode 100644 index 000000000..fe5028437 --- /dev/null +++ b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searchPipeline/SearchPipeline.scala @@ -0,0 +1,67 @@ +package com.sksamuel.elastic4s.requests.searchPipeline + +import com.sksamuel.elastic4s.json.{XContentBuilder, XContentFactory} + +case class SearchPipeline( + id: String, + version: Option[Int] = None, + description: Option[String] = None, + processors: Seq[SearchPipelineProcessor] = Seq() +) { + def builderFn(): XContentBuilder = { + val xcb = XContentFactory.jsonBuilder() + version.foreach(v => xcb.field("version", v)) + description.foreach(v => xcb.field("description", v)) + val phaseResultsProcessors = processors.filter(p => + p.processorType == SearchPipelineProcessorType.SearchPhaseResultsProcessor + ) + val requestProcessors = processors.filter(p => + p.processorType == SearchPipelineProcessorType.SearchRequestProcessor + ) + val responseProcessors = processors.filter(p => + p.processorType == SearchPipelineProcessorType.SearchResponseProcessor + ) + if (phaseResultsProcessors.length > 0) { + xcb.array( + "phase_results_processors", + phaseResultsProcessors.map(_.builderFn()).toArray + ) + } + if (requestProcessors.length > 0) { + xcb.array( + "request_processors", + requestProcessors.map(_.builderFn()).toArray + ) + } + if (responseProcessors.length > 0) { + xcb.array( + "response_processors", + responseProcessors.map(_.builderFn()).toArray + ) + } + xcb + } +} + +object SearchPipeline { + def fromRawResponse(resp: Map[String, Any]): SearchPipeline = { + val (id, pipelineData) = resp.head + val data = pipelineData.asInstanceOf[Map[String, Any]] + val processors = SearchPipelineProcessorType.values.flatMap { pType => + data + .get(pType.name) + .toSeq + .flatMap(_.asInstanceOf[Seq[Any]]) + .map(v => + SearchPipelineProcessor + .fromRawResponse(v.asInstanceOf[Map[String, Any]], pType) + ) + }.toSeq + SearchPipeline( + id = id, + version = data.get("version").map(_.asInstanceOf[Int]), + description = data.get("description").map(_.asInstanceOf[String]), + processors = processors + ) + } +} diff --git a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searchPipeline/SearchPipelineHandlers.scala b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searchPipeline/SearchPipelineHandlers.scala new file mode 100644 index 000000000..ef6e2026c --- /dev/null +++ b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searchPipeline/SearchPipelineHandlers.scala @@ -0,0 +1,59 @@ +package com.sksamuel.elastic4s.requests.searchPipeline + +import com.sksamuel.elastic4s._ +import com.sksamuel.elastic4s.handlers.ElasticErrorParser + +trait SearchPipelineHandlers { + + implicit object GetSearchPipelineRequestHandler + extends Handler[GetSearchPipelineRequest, GetSearchPipelineResponse] { + override def build(request: GetSearchPipelineRequest): ElasticRequest = { + val endpoint = s"/_search/pipeline/${request.id}" + ElasticRequest("GET", endpoint) + } + + override def responseHandler: 
ResponseHandler[GetSearchPipelineResponse] = + new ResponseHandler[GetSearchPipelineResponse] { + override def handle( + response: HttpResponse + ): Either[ElasticError, GetSearchPipelineResponse] = + response.statusCode match { + case 200 => + val raw = ResponseHandler.fromResponse[Map[String, Any]](response) + + Right( + GetSearchPipelineResponse(data = + SearchPipeline.fromRawResponse(raw) + ) + ) + case _ => + Left(ElasticErrorParser.parse(response)) + } + } + } + + implicit object PutSearchPipelineRequestHandler + extends Handler[PutSearchPipelineRequest, PutSearchPipelineResponse] { + + override def build(request: PutSearchPipelineRequest): ElasticRequest = { + val xcb = request.data.builderFn() + ElasticRequest( + "PUT", + s"/_search/pipeline/${request.data.id}", + HttpEntity(xcb.string) + ) + } + } + + implicit object DeleteSearchPipelineRequestHandler + extends Handler[ + DeleteSearchPipelineRequest, + DeleteSearchPipelineResponse + ] { + override def build(request: DeleteSearchPipelineRequest): ElasticRequest = { + val endpoint = s"/_search/pipeline/${request.id}" + ElasticRequest("DELETE", endpoint) + } + } + +} diff --git a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searchPipeline/SearchPipelineProcessor.scala b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searchPipeline/SearchPipelineProcessor.scala new file mode 100644 index 000000000..fe119360a --- /dev/null +++ b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searchPipeline/SearchPipelineProcessor.scala @@ -0,0 +1,81 @@ +package com.sksamuel.elastic4s.requests.searchPipeline + +import com.sksamuel.elastic4s.json.{XContentBuilder, XContentFactory} + +sealed trait SearchPipelineProcessorType { + def name: String +} + +object SearchPipelineProcessorType { + val values = Set( + SearchRequestProcessor, + SearchResponseProcessor, + SearchPhaseResultsProcessor + ) + + def withName(name: String): Option[SearchPipelineProcessorType] = + values.map(v => v.name -> v).toMap.get(name) + + case object SearchRequestProcessor extends SearchPipelineProcessorType { + val name = "request_processors" + } + case object SearchResponseProcessor extends SearchPipelineProcessorType { + val name = "response_processors" + } + case object SearchPhaseResultsProcessor extends SearchPipelineProcessorType { + val name = "phase_results_processors" + } +} + +/** Abstract representation of a [search pipeline processor](https://opensearch.org/docs/latest/search-plugins/search-pipelines/index/) + * there are three types: request_processors, response_processors & phase_results_processors + */ +trait SearchPipelineProcessor { + def processorType: SearchPipelineProcessorType + def tag: Option[String] + def description: Option[String] + def ignoreFailure: Option[Boolean] + def builderFn(): XContentBuilder +} + +object SearchPipelineProcessor { + def fromRawResponse( + resp: Map[String, Any], + processorType: SearchPipelineProcessorType + ): SearchPipelineProcessor = { + resp.head._1 match { + case "normalization-processor" => + NormalizationProcessor.fromRawResponse(resp) + case _ => + CustomSearchPipelineProcessor.fromRawResponse(resp, processorType) + } + } +} + +/** Processor defined by its name, type and raw Json options. 
+ */ +case class CustomSearchPipelineProcessor( + processorType: SearchPipelineProcessorType, + rawJsonOptions: String +) extends SearchPipelineProcessor { + // for CustomSearchPipelineProcessor, tag, description & ignoreFailure will be supplied via `rawJsonOptions` + val tag = None + val description = None + val ignoreFailure = None + def builderFn(): XContentBuilder = XContentFactory.parse(rawJsonOptions) +} + +object CustomSearchPipelineProcessor { + def fromRawResponse( + resp: Map[String, Any], + processorType: SearchPipelineProcessorType + ): CustomSearchPipelineProcessor = { + val (id, pipelineData) = resp.head + val builder = XContentFactory.jsonBuilder() + builder.autofield(id, pipelineData) + CustomSearchPipelineProcessor( + processorType = processorType, + rawJsonOptions = builder.string + ) + } +} diff --git a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searches/SearchHandlers.scala b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searches/SearchHandlers.scala index 1121c8777..bda6930ab 100644 --- a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searches/SearchHandlers.scala +++ b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searches/SearchHandlers.scala @@ -87,6 +87,8 @@ trait SearchHandlers { request.typedKeys.map(_.toString).foreach(params.put("typed_keys", _)) + request.searchPipeline.foreach(params.put("search_pipeline", _)) + val body = request.source.getOrElse(SearchBodyBuilderFn(request, customAggregationHandler).string) ElasticRequest("POST", endpoint, params.toMap, HttpEntity(body, "application/json")) } diff --git a/elastic4s-core/src/test/scala/com/sksamuel/elastic4s/requests/mappings/KnnVectorFieldTest.scala b/elastic4s-core/src/test/scala/com/sksamuel/elastic4s/requests/mappings/KnnVectorFieldTest.scala new file mode 100644 index 000000000..35c044171 --- /dev/null +++ b/elastic4s-core/src/test/scala/com/sksamuel/elastic4s/requests/mappings/KnnVectorFieldTest.scala @@ -0,0 +1,235 @@ +package com.sksamuel.elastic4s.requests.mappings + +import com.sksamuel.elastic4s.ElasticApi +import com.sksamuel.elastic4s.fields.{FaissEncoder, FaissEncoderName, FaissScalarQuantizationType, HnswParameters, IvfParameters, KnnEngine, KnnMethodEncoder, KnnVectorField, SpaceType} +import com.sksamuel.elastic4s.handlers.fields.KnnVectorFieldBuilderFn +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class KnnVectorFieldTest extends AnyFlatSpec with Matchers with ElasticApi { + + "A KnnVectorField" should "support empty default HnswParameters" in { + KnnVectorFieldBuilderFn + .build( + KnnVectorField(name = "myfield123", dimension = 512, HnswParameters()) + ) + .string shouldBe + """{"type":"knn_vector","dimension":512,"method":{"name":"hnsw","parameters":{}}}""" + } + + "A KnnVectorField" should "support full nsmlib HnswParameters" in { + KnnVectorFieldBuilderFn + .build( + KnnVectorField( + name = "myfield123", + dimension = 512, + HnswParameters( + engine = Some(KnnEngine.nmslib), + spaceType = Some(SpaceType.linf), + efConstruction = Some(100), + m = Some(50) + ) + ) + ) + .string shouldBe + """{"type":"knn_vector", + |"dimension":512, + |"method":{"name":"hnsw","engine":"nmslib","space_type":"linf", + |"parameters":{"ef_construction":100,"m":50}}}""".stripMargin.replace( + "\n", + "" + ) + } + + "A KnnVectorField" should "throw new error when supply unsupported parameters to nsmlib HnswParameters " in { + val exception = intercept[RuntimeException]( + KnnVectorField( + name = 
"myfield123", + dimension = 512, + HnswParameters( + engine = Some(KnnEngine.nmslib), + spaceType = Some(SpaceType.innerProduct), + efConstruction = Some(100), + m = Some(50), + efSearch = Some(200) + ) + ) + ) + exception shouldBe a[RuntimeException] + exception.getMessage shouldBe "nmslib or lucene doesn't support ef_search or encoder" + } + + "A KnnVectorField" should "throw new error when supply unsupported parameters (space type) to faiss HnswParameters " in { + val exception = intercept[RuntimeException]( + KnnVectorField( + name = "myfield123", + dimension = 512, + HnswParameters( + engine = Some(KnnEngine.faiss), + spaceType = Some(SpaceType.linf), + efConstruction = Some(100), + m = Some(50), + efSearch = Some(200) + ) + ) + ) + exception shouldBe a[RuntimeException] + exception.getMessage shouldBe "faiss engine doesn't support space_type linf" + } + + "A KnnVectorField" should "support faiss HnswParameters" in { + KnnVectorFieldBuilderFn + .build( + KnnVectorField( + name = "myfield123", + dimension = 512, + HnswParameters( + engine = Some(KnnEngine.faiss), + spaceType = Some(SpaceType.innerProduct), + efConstruction = Some(100), + m = Some(50) + ) + ) + ) + .string shouldBe + """{"type":"knn_vector", + |"dimension":512, + |"method":{"name":"hnsw","engine":"faiss","space_type":"innerproduct", + |"parameters":{"ef_construction":100,"m":50}}}""".stripMargin.replace( + "\n", + "" + ) + } + + "A KnnVectorField" should "support full faiss HnswParameters (PQ)" in { + KnnVectorFieldBuilderFn + .build( + KnnVectorField( + name = "myfield123", + dimension = 512, + HnswParameters( + engine = Some(KnnEngine.faiss), + spaceType = Some(SpaceType.l2), + efConstruction = Some(100), + m = Some(50), + efSearch = Some(50), + encoder = Some( + FaissEncoder( + Some(FaissEncoderName.pq), + codeSize = Some(100), + m = Some(50) + ) + ) + ) + ) + ) + .string shouldBe + """{"type":"knn_vector", + |"dimension":512, + |"method":{"name":"hnsw","engine":"faiss","space_type":"l2", + |"parameters":{"ef_construction":100,"m":50,"ef_search":50, + |"encoder":{"name":"pq","parameters":{"m":50,"code_size":100}}}}}""".stripMargin.replace( + "\n", + "" + ) + } + + "A KnnVectorField" should "support full faiss HnswParameters (SQ)" in { + KnnVectorFieldBuilderFn + .build( + KnnVectorField( + name = "myfield123", + dimension = 512, + HnswParameters( + engine = Some(KnnEngine.faiss), + spaceType = Some(SpaceType.innerProduct), + efConstruction = Some(100), + m = Some(50), + efSearch = Some(50), + encoder = Some( + FaissEncoder( + Some(FaissEncoderName.sq), + sqType = Some(FaissScalarQuantizationType.fp16), + sqClip = Some(true) + ) + ) + ) + ) + ) + .string shouldBe + """{"type":"knn_vector", + |"dimension":512, + |"method":{"name":"hnsw","engine":"faiss","space_type":"innerproduct", + |"parameters":{"ef_construction":100,"m":50,"ef_search":50, + |"encoder":{"name":"sq","parameters":{"clip":true,"type":"fp16"}}}}}""".stripMargin.replace( + "\n", + "" + ) + } + + "A KnnVectorField" should "support empty IvfParameters" in { + KnnVectorFieldBuilderFn + .build( + KnnVectorField(name = "myfield123", dimension = 512, IvfParameters()) + ) + .string shouldBe + """{"type":"knn_vector","dimension":512,"method":{"name":"ivf","engine":"faiss","parameters":{}}}""" + } + + "A KnnVectorField" should "support full faiss IvfParameters (SQ)" in { + KnnVectorFieldBuilderFn + .build( + KnnVectorField( + name = "myfield123", + dimension = 512, + IvfParameters( + engine = Some(KnnEngine.faiss), + spaceType = Some(SpaceType.innerProduct), + 
nlist = Some(4), + nprobes = Some(2), + encoder = Some( + FaissEncoder( + Some(FaissEncoderName.sq), + sqType = Some(FaissScalarQuantizationType.fp16), + sqClip = Some(true) + ) + ) + ) + ) + ) + .string shouldBe + """{"type":"knn_vector", + |"dimension":512, + |"method":{"name":"ivf","engine":"faiss","space_type":"innerproduct", + |"parameters":{"nlist":4,"nprobes":2, + |"encoder":{"name":"sq","parameters":{"clip":true,"type":"fp16"}}}}}""".stripMargin.replace( + "\n", + "" + ) + } + + "A KnnVectorField" should "support full lucene HnswParameters" in { + KnnVectorFieldBuilderFn + .build( + KnnVectorField( + name = "myfield123", + dimension = 512, + HnswParameters( + engine = Some(KnnEngine.lucene), + spaceType = Some(SpaceType.cosine), + efConstruction = Some(100), + m = Some(50) + ) + ) + ) + .string shouldBe + """{"type":"knn_vector", + |"dimension":512, + |"method":{"name":"hnsw","engine":"lucene","space_type":"cosinesimil", + |"parameters":{"ef_construction":100,"m":50}}}""".stripMargin.replace( + "\n", + "" + ) + } + +} diff --git a/elastic4s-core/src/test/scala/com/sksamuel/elastic4s/requests/searchPipeline/DeleteSearchPipelineRequestHandlerTest.scala b/elastic4s-core/src/test/scala/com/sksamuel/elastic4s/requests/searchPipeline/DeleteSearchPipelineRequestHandlerTest.scala new file mode 100644 index 000000000..f2792b149 --- /dev/null +++ b/elastic4s-core/src/test/scala/com/sksamuel/elastic4s/requests/searchPipeline/DeleteSearchPipelineRequestHandlerTest.scala @@ -0,0 +1,29 @@ +package com.sksamuel.elastic4s.requests.searchPipeline + +import com.sksamuel.elastic4s.HttpEntity.StringEntity +import com.sksamuel.elastic4s.{ElasticRequest, HttpResponse} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class DeleteSearchPipelineRequestHandlerTest extends AnyFlatSpec with SearchPipelineHandlers with Matchers { + + import DeleteSearchPipelineRequestHandler._ + + it should "build a delete search pipeline" in { + val req = DeleteSearchPipelineRequest("test-pipeline") + + build(req) shouldBe ElasticRequest("DELETE", "/_search/pipeline/test-pipeline") + } + + it should "parse a delete pipeline response" in { + val responseBody = + """ + |{ + | "acknowledged" : true + |} + |""".stripMargin + val response = HttpResponse(200, Some(StringEntity(responseBody, None)), Map.empty) + + responseHandler.handle(response).right.get shouldBe DeleteSearchPipelineResponse(true) + } +} diff --git a/elastic4s-core/src/test/scala/com/sksamuel/elastic4s/requests/searchPipeline/GetSearchPipelineRequestHandlerTest.scala b/elastic4s-core/src/test/scala/com/sksamuel/elastic4s/requests/searchPipeline/GetSearchPipelineRequestHandlerTest.scala new file mode 100644 index 000000000..211e402c8 --- /dev/null +++ b/elastic4s-core/src/test/scala/com/sksamuel/elastic4s/requests/searchPipeline/GetSearchPipelineRequestHandlerTest.scala @@ -0,0 +1,151 @@ +package com.sksamuel.elastic4s.requests.searchPipeline + +import com.sksamuel.elastic4s.HttpEntity.StringEntity +import com.sksamuel.elastic4s.json.XContentFactory +import com.sksamuel.elastic4s.{ElasticRequest, HttpResponse} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class GetSearchPipelineRequestHandlerTest + extends AnyFlatSpec + with SearchPipelineHandlers + with Matchers { + + import GetSearchPipelineRequestHandler._ + + it should "build a get search pipeline request" in { + val req = GetSearchPipelineRequest("test-pipeline") + + build(req) shouldBe ElasticRequest("GET", 
"/_search/pipeline/test-pipeline") + } + + it should "parse a get search pipeline response" in { + val responseBody = + """ + |{ + | "nlp-search-pipeline": { + | "version": 2332, + | "description": "Post processor for hybrid search", + | "phase_results_processors": [ + | { + | "normalization-processor": { + | "normalization": { + | "technique": "min_max" + | }, + | "combination": { + | "technique": "arithmetic_mean", + | "parameters": { + | "weights": [ + | 0.3, + | 0.7 + | ] + | } + | } + | } + | } + | ], + | "request_processors": [ + | { + | "filter_query" : { + | "tag" : "tag1", + | "description" : "This processor is going to restrict to publicly visible documents", + | "query" : { + | "term": { + | "visibility": "public" + | } + | } + | } + | } + | ], + | "response_processors": [ + | { + | "rename_field": { + | "field": "message", + | "target_field": "notification" + | } + | } + | ] + | } + |} + |""".stripMargin + val response = + HttpResponse(200, Some(StringEntity(responseBody, None)), Map.empty) + + val result = responseHandler.handle(response).right.get.data + val expectResult = GetSearchPipelineResponse( + data = SearchPipeline( + "nlp-search-pipeline", + version = Some(2332), + description = Some("Post processor for hybrid search"), + processors = Seq( + NormalizationProcessor( + normalizationTechnique = Some(NormalizationTechniqueType.minMax), + combinationTechnique = Some( + CombinationTechnique( + Some(CombinationTechniqueType.arithmeticMean), + Some(Seq(0.3, 0.7)) + ) + ) + ), + CustomSearchPipelineProcessor( + SearchPipelineProcessorType.SearchRequestProcessor, { + val b = XContentFactory.jsonBuilder() + b.startObject("filter_query") + b.field("tag", "tag1") + b.field( + "description", + "This processor is going to restrict to publicly visible documents" + ) + b.startObject("query") + b.startObject("term") + b.field("visibility", "public") + b.endObject() + b.endObject() + b.endObject() + b.string + } + ), + CustomSearchPipelineProcessor( + SearchPipelineProcessorType.SearchResponseProcessor, { + val b = XContentFactory.jsonBuilder() + b.startObject("rename_field") + b.field("field", "message") + b.field("target_field", "notification") + b.endObject() + b.string + } + ) + ) + ) + ) + + result.copy(processors = Seq.empty) shouldBe expectResult.data + .copy(processors = Seq.empty) + + val resultProcessors = result.processors.map(_.toString).sorted.toList + val expectedPRocessors = expectResult.data.processors.map(_.toString).sorted.toList + + resultProcessors shouldBe expectedPRocessors + } + + it should "parse a get search pipeline response with minimal values" in { + val responseBody = + """ + |{ + | "test-pipeline" : { + | "description" : "describe pipeline" + | } + |} + |""".stripMargin + val response = + HttpResponse(200, Some(StringEntity(responseBody, None)), Map.empty) + + responseHandler.handle(response).right.get shouldBe + GetSearchPipelineResponse( + data = SearchPipeline( + "test-pipeline", + description = Some("describe pipeline") + ) + ) + } +} diff --git a/elastic4s-core/src/test/scala/com/sksamuel/elastic4s/requests/searchPipeline/PutSearchPipelineRequestHandlerTest.scala b/elastic4s-core/src/test/scala/com/sksamuel/elastic4s/requests/searchPipeline/PutSearchPipelineRequestHandlerTest.scala new file mode 100644 index 000000000..eb938aeb9 --- /dev/null +++ b/elastic4s-core/src/test/scala/com/sksamuel/elastic4s/requests/searchPipeline/PutSearchPipelineRequestHandlerTest.scala @@ -0,0 +1,130 @@ +package com.sksamuel.elastic4s.requests.searchPipeline + +import 
com.sksamuel.elastic4s.json.{XContentFactory} +import com.sksamuel.elastic4s.testutils.StringExtensions._ +import com.sksamuel.elastic4s.{ElasticRequest, HttpEntity} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class PutSearchPipelineRequestHandlerTest + extends AnyFlatSpec + with SearchPipelineHandlers + with Matchers { + + import PutSearchPipelineRequestHandler._ + + it should "build a search pipeline request with a version and no processors" in { + val req = PutSearchPipelineRequest(data = + SearchPipeline( + "empty", + description = Some("Do nothing"), + version = Some(1) + ) + ) + + val correctJson = + """ + |{ + | "version":1, + | "description":"Do nothing" + | }""".stripMargin.toCompactJson + + build(req) shouldBe ElasticRequest( + "PUT", + "/_search/pipeline/empty", + HttpEntity(correctJson) + ) + } + + it should "build a search pipeline request with a normalization-processor processor using the NormalizationProcessor case class" in { + val req = PutSearchPipelineRequest( + data = SearchPipeline( + "nlp-pipeline", + description = Some("Post processor for hybrid search"), + version = Some(2332), + processors = Seq( + NormalizationProcessor( + normalizationTechnique = Some(NormalizationTechniqueType.minMax), + combinationTechnique = Some( + CombinationTechnique( + Some(CombinationTechniqueType.arithmeticMean), + Some(Seq(0.3, 0.7)) + ) + ) + ) + ) + ) + ) + val correctJson = """ + |{ + | "version": 2332, + | "description": "Post processor for hybrid search", + | "phase_results_processors": [ + | { + | "normalization-processor": { + | "normalization": { + | "technique": "min_max" + | }, + | "combination": { + | "technique": "arithmetic_mean", + | "parameters": { + | "weights": [ + | 0.3, + | 0.7 + | ] + | } + | } + | } + | } + | ] + |} + |""".stripMargin.toCompactJson + + build(req) shouldBe ElasticRequest( + "PUT", + "/_search/pipeline/nlp-pipeline", + HttpEntity( + correctJson + ) + ) + } + + it should "build a search pipeline with custom processer" in { + val req = PutSearchPipelineRequest( + SearchPipeline( + "test-custom", + processors = Seq( + CustomSearchPipelineProcessor( + SearchPipelineProcessorType.SearchResponseProcessor, { + val b = XContentFactory.jsonBuilder() + b.startObject("rename_field") + b.field("field", "message") + b.field("target_field", "notification") + b.endObject() + b.string + } + ) + ) + ) + ) + val correctJson = """ + |{ + | "response_processors": [ + | { + | "rename_field": { + | "field": "message", + | "target_field": "notification" + | } + | } + | ] + |} + |""".stripMargin.toCompactJson + + build(req) shouldBe ElasticRequest( + "PUT", + "/_search/pipeline/test-custom", + HttpEntity(correctJson) + ) + } + +} diff --git a/elastic4s-core/src/test/scala/com/sksamuel/elastic4s/testutils/StringExtensions.scala b/elastic4s-core/src/test/scala/com/sksamuel/elastic4s/testutils/StringExtensions.scala index adde51f13..4df423a25 100644 --- a/elastic4s-core/src/test/scala/com/sksamuel/elastic4s/testutils/StringExtensions.scala +++ b/elastic4s-core/src/test/scala/com/sksamuel/elastic4s/testutils/StringExtensions.scala @@ -1,5 +1,12 @@ package com.sksamuel.elastic4s.testutils +import com.sksamuel.elastic4s.JacksonSupport +import com.sksamuel.elastic4s.json.{ + ObjectValue, + XContentBuilder, + XContentFactory +} + object StringExtensions { private val LineEndingRegex = s"""(\r\n|\n)""" @@ -7,11 +14,25 @@ object StringExtensions { private val UnixLE = "\n" implicit class StringOps(val target: String) extends AnyVal { - 
def withWindowsLineEndings: String = target.replaceAll(LineEndingRegex, WindowsLE) + def withWindowsLineEndings: String = + target.replaceAll(LineEndingRegex, WindowsLE) def withUnixLineEndings: String = target.replaceAll(LineEndingRegex, UnixLE) def withoutSpaces: String = target.replaceAll("\\s+", "") + + def toCompactJson: String = new XContentBuilder( + XContentFactory + .jsonBuilder() + .autofield( + "test-data-root-key", + JacksonSupport.mapper + .readValue(target) + ) + .value + .asInstanceOf[ObjectValue] + .map("test-data-root-key") + ).string } } diff --git a/elastic4s-core/src/test/scala/com/sksamuel/elastic4s/testutils/StringExtensionsTest.scala b/elastic4s-core/src/test/scala/com/sksamuel/elastic4s/testutils/StringExtensionsTest.scala index 0c650442c..84da52d9c 100644 --- a/elastic4s-core/src/test/scala/com/sksamuel/elastic4s/testutils/StringExtensionsTest.scala +++ b/elastic4s-core/src/test/scala/com/sksamuel/elastic4s/testutils/StringExtensionsTest.scala @@ -14,4 +14,14 @@ class StringExtensionsTest extends AnyFlatSpec with Matchers { "one\r\ntwo\nthree\r\n".withUnixLineEndings shouldBe "one\ntwo\nthree\n" } + it should "convert JSON string to compact JSON string without whitespace" in { + s""" + | { + | "test" : 123, + | "test2" : "sdsd sds", + | "test array": [1,2, 3 ] + | } + |""".stripMargin.toCompactJson shouldBe """{"test":123,"test2":"sdsd sds","test array":[1,2,3]}""" + } + } diff --git a/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/fields/KnnVectorField.scala b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/fields/KnnVectorField.scala new file mode 100644 index 000000000..b4168d2f3 --- /dev/null +++ b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/fields/KnnVectorField.scala @@ -0,0 +1,336 @@ +package com.sksamuel.elastic4s.fields + +sealed trait SpaceType extends Product with Serializable { + def name: String +} + +object SpaceType { + val defaultValue: SpaceType = l2 + + val values = Set(l1, l2, innerProduct, cosine, linf) + + def withName(name: String): Option[SpaceType] = + values.map(v => v.name -> v).toMap.get(name) + + // L1 distance + case object l1 extends SpaceType { val name = "l1" } + // L2 distance + case object l2 extends SpaceType { val name = "l2" } + // Dot product (or inner product) similarity + case object innerProduct extends SpaceType { + val name = "innerproduct" + } + // Cosine similarity + case object cosine extends SpaceType { val name = "cosinesimil" } + // The Linf (for L infinity) distance + case object linf extends SpaceType { val name = "linf" } +} + +sealed trait KnnEngine { + def name: String +} + +object KnnEngine { + val defaultValue: KnnEngine = nmslib + val values = Set(nmslib, faiss, lucene) + + def withName(name: String): Option[KnnEngine] = + values.map(v => v.name -> v).toMap.get(name) + + case object nmslib extends KnnEngine { val name = "nmslib" } + case object faiss extends KnnEngine { val name = "faiss" } + case object lucene extends KnnEngine { val name = "lucene" } +} + +sealed trait KnnMethod { + def name: String +} + +object KnnMethod { + val defaultValue = hnsw + val values = Set(hnsw, ivf) + + def withName(name: String): Option[KnnMethod] = + values.map(v => v.name -> v).toMap.get(name) + + case object hnsw extends KnnMethod { val name = "hnsw" } + case object ivf extends KnnMethod { val name = "ivf" } +} + +sealed trait KnnMethodEncoderName { + def name: String +} + +sealed trait FaissEncoderName extends KnnMethodEncoderName + +object FaissEncoderName { + val defaultValue = flat + val values 
= Set(flat, pq, sq) + + def withName(name: String): Option[FaissEncoderName] = + values.map(v => v.name -> v).toMap.get(name) + + case object flat extends FaissEncoderName { val name = "flat" } + case object pq extends FaissEncoderName { val name = "pq" } + case object sq extends FaissEncoderName { val name = "sq" } +} + +sealed trait FaissScalarQuantizationType { + def name: String +} + +object FaissScalarQuantizationType { + val defaultValue = fp16 + val values = Set(fp16) + + def withName(name: String): Option[FaissScalarQuantizationType] = + values.map(v => v.name -> v).toMap.get(name) + + case object fp16 extends FaissScalarQuantizationType { val name = "fp16" } +} + +sealed trait KnnMethodEncoder { + def name: Option[KnnMethodEncoderName] +} + +case class FaissEncoder( + name: Option[FaissEncoderName] = None, + m: Option[Int] = None, + codeSize: Option[Int] = None, + sqClip: Option[Boolean] = None, + sqType: Option[FaissScalarQuantizationType] = None +) extends KnnMethodEncoder + +object FaissEncoder { + + def apply( + name: Option[FaissEncoderName] = None, + m: Option[Int] = None, + codeSize: Option[Int] = None, + sqClip: Option[Boolean] = None, + sqType: Option[FaissScalarQuantizationType] = None + ): FaissEncoder = { + name.getOrElse(FaissEncoderName.flat) match { + case FaissEncoderName.pq => + if (sqClip.nonEmpty || sqType.nonEmpty) { + throw new IllegalArgumentException( + "sqClip or sqType parameter is not available for pq encoder type" + ) + } + case FaissEncoderName.sq => + if (m.nonEmpty || codeSize.nonEmpty) { + throw new IllegalArgumentException( + "m or codeSize parameter is not available for sq encoder type" + ) + } + case _ => + if ( + m.nonEmpty || codeSize.nonEmpty || sqClip.nonEmpty || sqType.nonEmpty + ) { + throw new IllegalArgumentException( + "flat encoder type doesn't take any parameters" + ) + } + } + new FaissEncoder( + name = name, + m = m, + codeSize = codeSize, + sqClip = sqClip, + sqType = sqType + ) + } +} + +sealed trait KnnMethodParameters { + def name: String + def engine: Option[KnnEngine] + def spaceType: Option[SpaceType] + def encoder: Option[KnnMethodEncoder] + + private val hmslibSpaceTypes = Set( + SpaceType.l1, + SpaceType.l2, + SpaceType.innerProduct, + SpaceType.cosine, + SpaceType.linf + ) + private val faissSpaceTypesWithPQ: Set[SpaceType] = Set(SpaceType.l2) + private val faissSpaceTypes = Set(SpaceType.l2, SpaceType.innerProduct) + private val luceneSpaceTypes = Set( + SpaceType.l2, + SpaceType.innerProduct, + SpaceType.cosine + ) + + def validateSpaceType() = { + val spaceType = this match { + case p: HnswParameters => p.spaceType + case p: IvfParameters => p.spaceType + } + + val engine = (this match { + case p: HnswParameters => p.engine + case p: IvfParameters => p.engine + }).getOrElse(KnnEngine.defaultValue) + + val encoder = this match { + case p: HnswParameters => p.encoder + case p: IvfParameters => p.encoder + } + + if (spaceType.nonEmpty) { + val spaceTypeVal = spaceType.get + engine match { + case KnnEngine.nmslib => + if (!hmslibSpaceTypes.contains(spaceTypeVal)) { + throw new IllegalArgumentException( + s"""hmslib engine doesn't support space_type ${spaceTypeVal.name}""" + ) + } + case KnnEngine.faiss => + val usePQ = encoder.nonEmpty && (encoder.get match { + case e: FaissEncoder => + e.name.nonEmpty && e.name.get == FaissEncoderName.pq + case _ => false + }) + if (usePQ) { + if (!faissSpaceTypesWithPQ.contains(spaceTypeVal)) { + throw new IllegalArgumentException( + s"""faiss engine with PQ doesn't support space_type 
${spaceTypeVal.name}""" + ) + } + } else { + if (!faissSpaceTypes.contains(spaceTypeVal)) { + throw new IllegalArgumentException( + s"""faiss engine doesn't support space_type ${spaceTypeVal.name}""" + ) + } + } + case KnnEngine.lucene => + if (!luceneSpaceTypes.contains(spaceTypeVal)) { + throw new IllegalArgumentException( + s"""lucene engine doesn't support space_type ${spaceTypeVal.name}""" + ) + } + } + } + } +} + +case class HnswParameters( + engine: Option[KnnEngine] = None, + spaceType: Option[SpaceType] = None, + efConstruction: Option[Int] = None, + m: Option[Int] = None, + efSearch: Option[Int] = None, + encoder: Option[KnnMethodEncoder] = None +) extends KnnMethodParameters { + val name = "hnsw" +} + +object HnswParameters { + + def apply( + engine: Option[KnnEngine] = None, + spaceType: Option[SpaceType] = None, + efConstruction: Option[Int] = None, + m: Option[Int] = None, + efSearch: Option[Int] = None, + encoder: Option[KnnMethodEncoder] = None + ): HnswParameters = { + if (engine.nonEmpty) { + engine.get match { + case KnnEngine.faiss => + // do nothing as all parameters are accepted + case _ => + if (efSearch.nonEmpty || encoder.nonEmpty) { + throw new IllegalArgumentException( + "nmslib or lucene doesn't support ef_search or encoder" + ) + } + } + } + val p = new HnswParameters( + engine = engine, + spaceType = spaceType, + efConstruction = efConstruction, + m = m, + efSearch = efSearch, + encoder = encoder + ) + p.validateSpaceType() + p + } +} + +case class IvfParameters( + engine: Option[KnnEngine] = None, + spaceType: Option[SpaceType] = None, + nlist: Option[Int] = None, + nprobes: Option[Int] = None, + encoder: Option[KnnMethodEncoder] = None +) extends KnnMethodParameters { + val name = "ivf" +} + +object IvfParameters { + def apply( + engine: Option[KnnEngine] = None, + spaceType: Option[SpaceType] = None, + nlist: Option[Int] = None, + nprobes: Option[Int] = None, + encoder: Option[KnnMethodEncoder] = None + ): IvfParameters = { + if (engine.nonEmpty) { + engine.get match { + case KnnEngine.faiss => + // do nothing as all parameters are accepted + if (encoder.nonEmpty && !encoder.get.isInstanceOf[FaissEncoder]) { + throw new IllegalArgumentException( + "encoder must be instance of FaissEncoder for faiss engine" + ) + } + case _ => + throw new IllegalArgumentException( + "only faiss engine support ivf method" + ) + } + } + val p = new IvfParameters( + // IVF only supported by faiss so set the engine to faiss if user doesn't + engine = if (engine.isEmpty) Some(KnnEngine.faiss) else engine, + spaceType = spaceType, + nlist = nlist, + nprobes = nprobes, + encoder = encoder + ) + p.validateSpaceType() + p + } +} + +case class KnnVectorField( + name: String, + dimension: Int, + parameters: KnnMethodParameters +) extends ElasticField { + override def `type`: String = KnnVectorField.`type` +} + +object KnnVectorField { + val `type`: String = "knn_vector" + + def apply( + name: String, + dimension: Int, + parameters: KnnMethodParameters + ): KnnVectorField = { + new KnnVectorField( + name = name, + dimension = dimension, + parameters = parameters + ) + } + +} diff --git a/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/searches/SearchRequest.scala b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/searches/SearchRequest.scala index db4675606..d482255cc 100644 --- a/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/searches/SearchRequest.scala +++ 
b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/searches/SearchRequest.scala @@ -61,7 +61,8 @@ case class SearchRequest(indexes: Indexes, ext: Map[String, Any] = Map.empty, knn: Option[Knn] = None, multipleKnn: Seq[Knn] = Nil, - pit: Option[Pit] = None) { + pit: Option[Pit] = None, + searchPipeline: Option[String] = None) { /** Adds a single string query to this search * @@ -303,4 +304,8 @@ case class SearchRequest(indexes: Indexes, // When a pit is provided, no target must be given copy(pit = Some(pit), indexes = Indexes(Nil)) } + + def searchPipeline(pipelineId: String): SearchRequest = { + copy(searchPipeline = Some(pipelineId)) + } } diff --git a/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/fields/ElasticFieldBuilderFn.scala b/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/fields/ElasticFieldBuilderFn.scala index 1fb8c9b47..abfd59639 100644 --- a/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/fields/ElasticFieldBuilderFn.scala +++ b/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/fields/ElasticFieldBuilderFn.scala @@ -1,6 +1,6 @@ package com.sksamuel.elastic4s.handlers.fields -import com.sksamuel.elastic4s.fields.{AggregateMetricField, AliasField, AnnotatedTextField, BinaryField, BooleanField, CompletionField, ConstantKeywordField, DateField, DateNanosField, DenseVectorField, DynamicField, ElasticField, FlattenedField, GeoPointField, GeoShapeField, HistogramField, IcuCollationKeywordField, IpField, IpRangeField, JoinField, KeywordField, MatchOnlyTextField, Murmur3Field, NestedField, NumberField, ObjectField, PercolatorField, RangeField, RankFeatureField, RankFeaturesField, SearchAsYouTypeField, TextField, TokenCountField, VersionField, WildcardField} +import com.sksamuel.elastic4s.fields.{AggregateMetricField, AliasField, AnnotatedTextField, BinaryField, BooleanField, CompletionField, ConstantKeywordField, DateField, DateNanosField, DenseVectorField, DynamicField, ElasticField, FlattenedField, GeoPointField, GeoShapeField, HistogramField, IcuCollationKeywordField, IpField, IpRangeField, JoinField, KeywordField, KnnVectorField, MatchOnlyTextField, Murmur3Field, NestedField, NumberField, ObjectField, PercolatorField, RangeField, RankFeatureField, RankFeaturesField, SearchAsYouTypeField, TextField, TokenCountField, VersionField, WildcardField} import com.sksamuel.elastic4s.json.XContentBuilder object ElasticFieldBuilderFn { @@ -17,6 +17,7 @@ object ElasticFieldBuilderFn { case f: DateField => DateFieldBuilderFn.build(f) case f: DateNanosField => DateNanosFieldBuilderFn.build(f) case f: DenseVectorField => DenseVectorFieldBuilderFn.build(f) + case f: KnnVectorField => KnnVectorFieldBuilderFn.build(f) case f: DynamicField => DynamicFieldBuilderFn.build(f) case f: FlattenedField => FlattenedFieldBuilderFn.build(f) case f: GeoPointField => GeoPointFieldBuilderFn.build(f) diff --git a/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/fields/KnnVectorFieldBuilderFn.scala b/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/fields/KnnVectorFieldBuilderFn.scala new file mode 100644 index 000000000..f8f22536b --- /dev/null +++ b/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/fields/KnnVectorFieldBuilderFn.scala @@ -0,0 +1,220 @@ +package com.sksamuel.elastic4s.handlers.fields + +import com.sksamuel.elastic4s.fields.{ + FaissEncoder, + FaissEncoderName, + FaissScalarQuantizationType, + HnswParameters, + IvfParameters, + KnnEngine, + KnnVectorField, + 
SpaceType +} +import com.sksamuel.elastic4s.json.{XContentBuilder, XContentFactory} + +object KnnVectorFieldBuilderFn { + def toField(name: String, values: Map[String, Any]): KnnVectorField = { + KnnVectorField( + name, + values.get("dimension").map(_.asInstanceOf[Int]).get, + (values.get("method") match { + case Some(v) => + val methodFields: Map[String, Any] = v.asInstanceOf[Map[String, Any]] + val methodName = methodFields.get("name").map(_.asInstanceOf[String]) + val spaceType = methodFields + .get("space_type") + .flatMap(v => SpaceType.withName(v.asInstanceOf[String])) + val engine = methodFields + .get("engine") + .flatMap(v => KnnEngine.withName(v.asInstanceOf[String])) + + val efSearch = methodFields + .get("parameters") + .flatMap(v => + v.asInstanceOf[Map[String, Any]] + .get("ef_search") + .map(_.asInstanceOf[Int]) + ) + val efConstruction = methodFields + .get("parameters") + .flatMap(v => + v.asInstanceOf[Map[String, Any]] + .get("ef_construction") + .map(_.asInstanceOf[Int]) + ) + val hnswM = methodFields + .get("parameters") + .flatMap(v => + v.asInstanceOf[Map[String, Any]] + .get("m") + .map(_.asInstanceOf[Int]) + ) + val encoder = methodFields + .get("parameters") + .flatMap(v => + v.asInstanceOf[Map[String, Any]] + .get("encoder") + .map(_.asInstanceOf[Map[String, Any]]) + ) map { e => + val name = e + .get("name") + .map(_.asInstanceOf[String]) + .flatMap(v => FaissEncoderName.withName(v)) + val m = e + .get("parameters") + .flatMap(p => + p.asInstanceOf[Map[String, Any]] + .get("m") + .map(_.asInstanceOf[Int]) + ) + val codeSize = e + .get("parameters") + .flatMap(p => + p.asInstanceOf[Map[String, Any]] + .get("code_size") + .map(_.asInstanceOf[Int]) + ) + val sqClip = e + .get("parameters") + .flatMap(p => + p.asInstanceOf[Map[String, Any]] + .get("clip") + .map(_.asInstanceOf[Boolean]) + ) + val sqType = e + .get("parameters") + .flatMap(p => + p.asInstanceOf[Map[String, Any]] + .get("type") + .map(_.asInstanceOf[String]) + .flatMap(v => FaissScalarQuantizationType.withName(v)) + ) + FaissEncoder( + name = name, + m = m, + codeSize = codeSize, + sqClip = sqClip, + sqType = sqType + ) + } + + methodName match { + case Some("hnsw") => + HnswParameters( + engine, + spaceType, + m = hnswM, + efSearch = efSearch, + efConstruction = efConstruction, + encoder = encoder + ) + case Some("ivf") => + IvfParameters( + engine, + spaceType, + encoder = encoder, + nlist = methodFields + .get("parameters") + .flatMap(v => + v.asInstanceOf[Map[String, Any]] + .get("nlist") + .map(_.asInstanceOf[Int]) + ), + nprobes = methodFields + .get("parameters") + .flatMap(v => + v.asInstanceOf[Map[String, Any]] + .get("nprobes") + .map(_.asInstanceOf[Int]) + ) + ) + case None => + HnswParameters( + engine, + spaceType, + m = hnswM, + efSearch = efSearch, + efConstruction = efConstruction, + encoder = encoder + ) + case Some(invalidName) => + throw new IllegalArgumentException( + s"""Invalid knn_vector field method name: ${invalidName}""" + ) + } + case None => HnswParameters() + }) + ) + } + + def build(field: KnnVectorField): XContentBuilder = { + + val builder = XContentFactory.jsonBuilder() + builder.field("type", field.`type`) + builder.field("dimension", field.dimension) + + // start of `method` field + builder.startObject("method") + + val parameters = field.parameters + builder.field("name", parameters.name) + parameters.engine.foreach(v => builder.field("engine", v.name)) + parameters.spaceType.foreach(v => builder.field("space_type", v.name)) + + // start of `parameters` field + 
builder.startObject("parameters") + parameters match { + case p: HnswParameters => + p.efConstruction.foreach(v => builder.field("ef_construction", v)) + p.m.foreach(v => builder.field("m", v)) + p.efSearch.foreach(v => builder.field("ef_search", v)) + p.encoder.foreach { encoder => + // start of `encoder` field + builder.startObject("encoder") + encoder.name.foreach(v => builder.field("name", v.name)) + + // start of encoder `parameters` field + builder.startObject("parameters") + encoder match { + case e: FaissEncoder => + e.m.foreach(v => builder.field("m", v)) + e.codeSize.foreach(v => builder.field("code_size", v)) + e.sqClip.foreach(v => builder.field("clip", v)) + e.sqType.foreach(v => builder.field("type", v.name)) + } + // end of encoder `parameters` field + builder.endObject() + // end of `encoder` field + builder.endObject() + } + case p: IvfParameters => + p.nlist.foreach(v => builder.field("nlist", v)) + p.nprobes.foreach(v => builder.field("nprobes", v)) + p.encoder.foreach { encoder => + // start of `encoder` field + builder.startObject("encoder") + encoder.name.foreach(v => builder.field("name", v.name)) + + // start of encoder `parameters` field + builder.startObject("parameters") + encoder match { + case e: FaissEncoder => + e.m.foreach(v => builder.field("m", v)) + e.codeSize.foreach(v => builder.field("code_size", v)) + e.sqClip.foreach(v => builder.field("clip", v)) + e.sqType.foreach(v => builder.field("type", v.name)) + } + // end of encoder `parameters` field + builder.endObject() + // end of `encoder` field + builder.endObject() + } + } + // end of `parameters` field + builder.endObject() + + // end of `method` field + builder.endObject() + builder.endObject() + } +}