Skip to content

Commit

Permalink
Merge pull request #628 from armanbilge/http4s-blaze
Browse files Browse the repository at this point in the history
Hello, http4s-blaze
  • Loading branch information
rossabaker authored May 24, 2022
2 parents 7f92287 + 875d98b commit 53f0730
Show file tree
Hide file tree
Showing 106 changed files with 12,476 additions and 35 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest]
scala: [3.0.2, 2.12.15, 2.13.8]
scala: [3.1.2, 2.12.15, 2.13.8]
java: [temurin@8]
runs-on: ${{ matrix.os }}
steps:
Expand Down Expand Up @@ -93,11 +93,11 @@ jobs:

- name: Make target directories
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main')
run: mkdir -p target examples/target http/target core/target testkit/target project/target
run: mkdir -p blaze-client/target blaze-server/target target examples/target http/target blaze-core/target core/target testkit/target project/target

- name: Compress target directories
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main')
run: tar cf targets.tar target examples/target http/target core/target testkit/target project/target
run: tar cf targets.tar blaze-client/target blaze-server/target target examples/target http/target blaze-core/target core/target testkit/target project/target

- name: Upload target directories
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main')
Expand Down Expand Up @@ -150,12 +150,12 @@ jobs:
~/Library/Caches/Coursier/v1
key: ${{ runner.os }}-sbt-cache-v2-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}

- name: Download target directories (3.0.2)
- name: Download target directories (3.1.2)
uses: actions/download-artifact@v2
with:
name: target-${{ matrix.os }}-${{ matrix.java }}-3.0.2
name: target-${{ matrix.os }}-${{ matrix.java }}-3.1.2

- name: Inflate target directories (3.0.2)
- name: Inflate target directories (3.1.2)
run: |
tar xf targets.tar
rm targets.tar
Expand Down
20 changes: 20 additions & 0 deletions .scalafmt.blaze.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
version = 3.5.2

style = default

maxColumn = 100

// Vertical alignment is pretty, but leads to bigger diffs
align.preset = none

danglingParentheses.preset = false

rewrite.rules = [
AvoidInfix
RedundantBraces
RedundantParens
AsciiSortImports
PreferCurlyFors
]

runner.dialect = scala213source3
30 changes: 27 additions & 3 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,41 @@ style = default

maxColumn = 100

// Docstring wrapping breaks doctests
docstrings.wrap = false

// Vertical alignment is pretty, but leads to bigger diffs
align.preset = none

danglingParentheses.preset = false
danglingParentheses.preset = true

rewrite.rules = [
AvoidInfix
RedundantBraces
RedundantParens
AsciiSortImports
PreferCurlyFors
SortModifiers
]

rewrite.sortModifiers.order = [
override, implicit, private, protected, final, sealed, abstract, lazy
]

runner.dialect = scala213source3
rewrite.trailingCommas.style = multiple

project.excludeFilters = [
"scalafix/*",
"scalafix-internal/input/*",
"scalafix-internal/output/*"
]

runner.dialect = scala212

fileOverride {
"glob:**/scala-3/**/*.scala" {
runner.dialect = scala3
}
"glob:**/scala-2.13/**/*.scala" {
runner.dialect = scala213
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2014 http4s.org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.http4s
package blaze
package client

import cats.effect._
import cats.syntax.all._
import org.http4s.client.RequestKey

private final class BasicManager[F[_], A <: Connection[F]](builder: ConnectionBuilder[F, A])(
implicit F: Sync[F]
) extends ConnectionManager[F, A] {
def borrow(requestKey: RequestKey): F[NextConnection] =
builder(requestKey).map(NextConnection(_, fresh = true))

override def shutdown: F[Unit] =
F.unit

override def invalidate(connection: A): F[Unit] =
F.delay(connection.shutdown())

override def release(connection: A): F[Unit] =
invalidate(connection)
}
164 changes: 164 additions & 0 deletions blaze-client/src/main/scala/org/http4s/blaze/client/BlazeClient.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright 2014 http4s.org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.http4s
package blaze
package client

import cats.Applicative
import cats.effect.implicits._
import cats.effect.kernel.Async
import cats.effect.kernel.Deferred
import cats.effect.kernel.Resource
import cats.effect.kernel.Resource.ExitCase
import cats.effect.std.Dispatcher
import cats.syntax.all._
import org.http4s.blaze.util.TickWheelExecutor
import org.http4s.blazecore.ResponseHeaderTimeoutStage
import org.http4s.client.Client
import org.http4s.client.DefaultClient
import org.http4s.client.RequestKey
import org.http4s.client.UnexpectedStatus
import org.http4s.client.middleware.Retry
import org.http4s.client.middleware.RetryPolicy

import java.net.SocketException
import java.nio.ByteBuffer
import java.util.concurrent.TimeoutException
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

/** Blaze client implementation */
object BlazeClient {
private[blaze] def makeClient[F[_], A <: BlazeConnection[F]](
manager: ConnectionManager[F, A],
responseHeaderTimeout: Duration,
requestTimeout: Duration,
scheduler: TickWheelExecutor,
ec: ExecutionContext,
retries: Int,
dispatcher: Dispatcher[F],
)(implicit F: Async[F]): Client[F] = {
val base = new BlazeClient[F, A](
manager,
responseHeaderTimeout,
requestTimeout,
scheduler,
ec,
dispatcher,
)
if (retries > 0)
Retry(retryPolicy(retries))(base)
else
base
}

private[this] val retryNow = Duration.Zero.some
private def retryPolicy[F[_]](retries: Int): RetryPolicy[F] = { (req, result, n) =>
result match {
case Left(_: SocketException) if n <= retries && req.isIdempotent => retryNow
case _ => None
}
}
}

private class BlazeClient[F[_], A <: BlazeConnection[F]](
manager: ConnectionManager[F, A],
responseHeaderTimeout: Duration,
requestTimeout: Duration,
scheduler: TickWheelExecutor,
ec: ExecutionContext,
dispatcher: Dispatcher[F],
)(implicit F: Async[F])
extends DefaultClient[F] {

override def run(req: Request[F]): Resource[F, Response[F]] = {
val key = RequestKey.fromRequest(req)
for {
requestTimeoutF <- scheduleRequestTimeout(key)
preparedConnection <- prepareConnection(key)
(conn, responseHeaderTimeoutF) = preparedConnection
timeout = responseHeaderTimeoutF.race(requestTimeoutF).map(_.merge)
responseResource <- Resource.eval(runRequest(conn, req, timeout))
response <- responseResource
} yield response
}

override def defaultOnError(req: Request[F])(resp: Response[F])(implicit
G: Applicative[F]
): F[Throwable] =
resp.body.compile.drain.as(UnexpectedStatus(resp.status, req.method, req.uri))

private def prepareConnection(key: RequestKey): Resource[F, (A, F[TimeoutException])] = for {
conn <- borrowConnection(key)
responseHeaderTimeoutF <- addResponseHeaderTimeout(conn)
} yield (conn, responseHeaderTimeoutF)

private def borrowConnection(key: RequestKey): Resource[F, A] =
Resource.makeCase(manager.borrow(key).map(_.connection)) {
case (conn, ExitCase.Canceled) =>
// Currently we can't just release in case of cancellation, because cancellation clears the Write state of Http1Connection, so it might result in isRecycle=true even if there's a half-written request.
manager.invalidate(conn)
case (conn, _) => manager.release(conn)
}

private def addResponseHeaderTimeout(conn: A): Resource[F, F[TimeoutException]] =
responseHeaderTimeout match {
case d: FiniteDuration =>
Resource.apply(
Deferred[F, Either[Throwable, TimeoutException]].flatMap(timeout =>
F.delay {
val stage = new ResponseHeaderTimeoutStage[ByteBuffer](d, scheduler, ec)
conn.spliceBefore(stage)
stage.init(e => dispatcher.unsafeRunSync(timeout.complete(e).void))
(timeout.get.rethrow, F.delay(stage.removeStage()))
}
)
)
case _ => resourceNeverTimeoutException
}

private def scheduleRequestTimeout(key: RequestKey): Resource[F, F[TimeoutException]] =
requestTimeout match {
case d: FiniteDuration =>
Resource.pure(F.async[TimeoutException] { cb =>
F.delay(
scheduler.schedule(
() =>
cb(
Right(new TimeoutException(s"Request to $key timed out after ${d.toMillis} ms"))
),
ec,
d,
)
).map(c => Some(F.delay(c.cancel())))
})
case _ => resourceNeverTimeoutException
}

private def runRequest(
conn: A,
req: Request[F],
timeout: F[TimeoutException],
): F[Resource[F, Response[F]]] =
conn
.runRequest(req, timeout)
.race(timeout.flatMap(F.raiseError[Resource[F, Response[F]]](_)))
.map(_.merge)

private val resourceNeverTimeoutException = Resource.pure[F, F[TimeoutException]](F.never)

}
Loading

0 comments on commit 53f0730

Please sign in to comment.