Skip to content

Commit

Permalink
Adding a simple layer ontop of AsyncHttpClient
Browse files Browse the repository at this point in the history
  • Loading branch information
ElPicador committed Jun 6, 2016
1 parent 58ad4de commit c0d48c8
Show file tree
Hide file tree
Showing 12 changed files with 277 additions and 498 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ scala:
jdk:
- oraclejdk8

script: "sbt clean coverage test"
after_success: "sbt coverageReport coveralls"
script: "sbt ++$TRAVIS_SCALA_VERSION -J-Dsun.net.maxDatagramSockets=2048 clean coverage test"
after_success: "sbt ++$TRAVIS_SCALA_VERSION coverageReport coveralls"
8 changes: 6 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ scalaVersion := "2.11.7"
ScoverageSbtPlugin.ScoverageKeys.coverageHighlighting := false //setting to true crashes the coverage
//coverageEnabled := true

val asyncHttpClientVersion = "2.0.3"
val asyncHttpClientVersion = "2.0.4"

val json4sVersion = "3.2.11"

val scalaTestVersion = "2.2.4"
val scalaTestVersion = "2.2.6"
val scalaMockVersion = "3.2"
val scalacheckVersion = "1.12.1"

libraryDependencies += "org.asynchttpclient" % "async-http-client" % asyncHttpClientVersion

Expand All @@ -27,8 +28,11 @@ libraryDependencies += "org.json4s" %% "json4s-native" % json4sVersion

libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.1.3"

libraryDependencies += "com.netaporter" %% "scala-uri" % "0.4.14"

//Testing
libraryDependencies += "org.scalatest" %% "scalatest" % scalaTestVersion % "test"
libraryDependencies += "org.scalacheck" %% "scalacheck" % scalacheckVersion % "test"
libraryDependencies += "org.scalamock" %% "scalamock-scalatest-support" % scalaMockVersion % "test"

scalacOptions ++= Seq("-feature", "-unchecked")
Expand Down
2 changes: 2 additions & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ addSbtPlugin("de.heikoseeberger" % "sbt-header" % "1.5.1")

//Auto version
addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.5.0")

addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.8.2")
31 changes: 13 additions & 18 deletions src/main/scala/algolia/AlgoliaClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,21 @@ class AlgoliaClient(applicationId: String, apiKey: String) {

lazy val indexingHosts: Seq[String] =
s"https://$applicationId.$ALGOLIANET_HOST" +:
random.shuffle(Seq(
s"https://$applicationId-1.$ALGOLIANET_COM_HOST",
s"https://$applicationId-2.$ALGOLIANET_COM_HOST",
s"https://$applicationId-3.$ALGOLIANET_COM_HOST"
))
random.shuffle(Seq(
s"https://$applicationId-1.$ALGOLIANET_COM_HOST",
s"https://$applicationId-2.$ALGOLIANET_COM_HOST",
s"https://$applicationId-3.$ALGOLIANET_COM_HOST"
))

lazy val queryHosts: Seq[String] =
s"https://$applicationId-dsn.$ALGOLIANET_HOST" +:
random.shuffle(Seq(
s"https://$applicationId-1.$ALGOLIANET_COM_HOST",
s"https://$applicationId-2.$ALGOLIANET_COM_HOST",
s"https://$applicationId-3.$ALGOLIANET_COM_HOST"
))
random.shuffle(Seq(
s"https://$applicationId-1.$ALGOLIANET_COM_HOST",
s"https://$applicationId-2.$ALGOLIANET_COM_HOST",
s"https://$applicationId-3.$ALGOLIANET_COM_HOST"
))

lazy val httpClient: DispatchHttpClient = DispatchHttpClient
val httpClient: AlgoliaHttpClient = AlgoliaHttpClient()
val random: AlgoliaRandom = AlgoliaRandom
val userAgent = s"Algolia for Scala ${BuildInfo.scalaVersion} API ${BuildInfo.version}"

Expand Down Expand Up @@ -101,14 +101,9 @@ class AlgoliaClient(applicationId: String, apiKey: String) {
val hosts = if (payload.isSearch) queryHosts else indexingHosts

hosts.foldLeft(Future.failed[T](new TimeoutException())) { (future, host) =>
val start = System.currentTimeMillis()
future.recoverWith {
case e: APIClientException =>
println(s"querying $host took ${System.currentTimeMillis() - start}ms")
Future.failed(e) //No retry if 4XX
case _ =>
println(s"querying $host took ${System.currentTimeMillis() - start}ms")
httpClient request[T](host, headers, payload)
case e: APIClientException => Future.failed(e) //No retry if 4XX
case _ => httpClient.request[T](host, headers, payload)
}
}
}
Expand Down
108 changes: 108 additions & 0 deletions src/main/scala/algolia/AlgoliaHttpClient.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright (c) 2016 Algolia
* http://www.algolia.com/
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

package algolia

import algolia.http._
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.nio.NioDatagramChannel
import io.netty.resolver.dns.DnsNameResolverBuilder
import org.asynchttpclient._
import org.json4s._
import org.json4s.native.JsonMethods._

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try

object default {
val httpReadTimeout = 2000

//httpSocketTimeout in HttpClient
val httpConnectTimeout = 2000

val httpRequestTimeout = 2000

val dnsTimeout = httpConnectTimeout / 10
}

case class AlgoliaHttpClient(httpReadTimeout: Int = default.httpReadTimeout,
httpConnectTimeout: Int = default.httpConnectTimeout,
httpRequestTimeout: Int = default.httpRequestTimeout,
dnsTimeout: Int = default.dnsTimeout) {

val asyncClientConfig =
new DefaultAsyncHttpClientConfig
.Builder()
.setConnectTimeout(httpConnectTimeout)
.setReadTimeout(httpReadTimeout)
.setRequestTimeout(httpRequestTimeout)
.build

val dnsNameResolver =
new DnsNameResolverBuilder(new NioEventLoopGroup(1).next()) //We only need 1 thread for DNS resolution
.channelType(classOf[NioDatagramChannel])
.queryTimeoutMillis(dnsTimeout)
.build

val _httpClient = new DefaultAsyncHttpClient(asyncClientConfig)

implicit val formats: Formats = AlgoliaDsl.formats

def request[T: Manifest](host: String, headers: Map[String, String], payload: HttpPayload)(implicit executor: ExecutionContext): Future[T] = {
val request = payload(host, headers, dnsNameResolver)
makeRequest(request, responseHandler)
}

def responseHandler[T: Manifest] = new AsyncCompletionHandler[T] {
override def onCompleted(response: Response) = response.getStatusCode / 100 match {
case 2 => toJson(response).extract[T]
case 4 => throw APIClientException(response.getStatusCode, (toJson(response) \ "message").extract[String])
case _ => throw UnexpectedResponse(response.getStatusCode)
}
}

def toJson(r: Response) = parse(StringInput(r.getResponseBody), useBigDecimalForDouble = true)

def makeRequest[T](request: Request, handler: AsyncHandler[T])(implicit executor: ExecutionContext): Future[T] = {
val javaFuture = _httpClient.executeRequest(request, handler)
val promise = Promise[T]()
val runnable = new java.lang.Runnable {
def run() {
promise.complete(Try(javaFuture.get()))
}
}
val exec = new java.util.concurrent.Executor {
def execute(runnable: Runnable) {
executor.execute(runnable)
}
}

javaFuture.addListener(runnable, exec)
promise.future
}

}

case class APIClientException(code: Int, message: String) extends Exception("Failure \"%s\", response status: %d".format(message, code))

case class UnexpectedResponse(code: Int) extends Exception("Unexpected response status: %d".format(code))
91 changes: 0 additions & 91 deletions src/main/scala/algolia/DispatchHttpClient.scala

This file was deleted.

43 changes: 37 additions & 6 deletions src/main/scala/algolia/http/HttpPayload.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,50 @@

package algolia.http

private[algolia] sealed trait HttpVerb
import java.net.InetAddress

import io.netty.resolver.NameResolver
import com.netaporter.uri.dsl._
import org.asynchttpclient.{Request, RequestBuilder}

private[algolia] sealed trait HttpVerb

private[algolia] case object GET extends HttpVerb
private[algolia] case object GET extends HttpVerb {
override def toString: String = "GET"
}

private[algolia] case object POST extends HttpVerb
private[algolia] case object POST extends HttpVerb {
override def toString: String = "POST"
}

private[algolia] case object PUT extends HttpVerb
private[algolia] case object PUT extends HttpVerb {
override def toString: String = "PUT"
}

private[algolia] case object DELETE extends HttpVerb
private[algolia] case object DELETE extends HttpVerb {
override def toString: String = "DELETE"
}

private[algolia] case class HttpPayload(verb: HttpVerb,
path: Seq[String],
queryParameters: Option[Map[String, String]] = None,
body: Option[String] = None,
isSearch: Boolean = true)
isSearch: Boolean = true) {

def apply(host: String, headers: Map[String, String], dnsNameResolver: NameResolver[InetAddress]): Request = {
val uri = path.foldLeft(host)((url, p) => url / p)

var builder: RequestBuilder = new RequestBuilder().setMethod(verb.toString).setUrl(uri)

headers.foreach { case (k, v) => builder = builder.addHeader(k, v) }

queryParameters.foreach(
_.foreach { case (k, v) => builder = builder.addQueryParam(k, v) }
)

body.foreach(b => builder = builder.setBody(b))

builder.setNameResolver(dnsNameResolver).build()
}

}
Loading

0 comments on commit c0d48c8

Please sign in to comment.