Skip to content

Commit

Permalink
Merge pull request #683 from http4s/merge-core-1.0
Browse files Browse the repository at this point in the history
Merge http4s/http4s main
  • Loading branch information
rossabaker authored May 26, 2022
2 parents 910a23b + 1106ba1 commit 7bb10bc
Show file tree
Hide file tree
Showing 41 changed files with 163 additions and 384 deletions.
12 changes: 1 addition & 11 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest]
scala: [3.1.2, 2.12.15, 2.13.8]
scala: [3.1.2, 2.13.8]
java: [temurin@8]
runs-on: ${{ matrix.os }}
steps:
Expand Down Expand Up @@ -160,16 +160,6 @@ jobs:
tar xf targets.tar
rm targets.tar
- name: Download target directories (2.12.15)
uses: actions/download-artifact@v2
with:
name: target-${{ matrix.os }}-${{ matrix.java }}-2.12.15

- name: Inflate target directories (2.12.15)
run: |
tar xf targets.tar
rm targets.tar
- name: Download target directories (2.13.8)
uses: actions/download-artifact@v2
with:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ import scala.concurrent.duration._
* be borrowed. Helps deal with connections that are closed while
* idling in the pool for an extended period.
*/
sealed abstract class BlazeClientBuilder[F[_]] private (
final class BlazeClientBuilder[F[_]] private (
val responseHeaderTimeout: Duration,
val idleTimeout: Duration,
val requestTimeout: Duration,
Expand Down Expand Up @@ -100,56 +100,6 @@ sealed abstract class BlazeClientBuilder[F[_]] private (

protected final val logger: Logger = getLogger(this.getClass)

@deprecated("Preserved for binary compatibility", "0.23.8")
private[BlazeClientBuilder] def this(
responseHeaderTimeout: Duration,
idleTimeout: Duration,
requestTimeout: Duration,
connectTimeout: Duration,
userAgent: Option[`User-Agent`],
maxTotalConnections: Int,
maxWaitQueueLimit: Int,
maxConnectionsPerRequestKey: RequestKey => Int,
sslContext: SSLContextOption,
checkEndpointIdentification: Boolean,
maxResponseLineSize: Int,
maxHeaderLength: Int,
maxChunkSize: Int,
chunkBufferMaxSize: Int,
parserMode: ParserMode,
bufferSize: Int,
executionContextConfig: ExecutionContextConfig,
scheduler: Resource[F, TickWheelExecutor],
asynchronousChannelGroup: Option[AsynchronousChannelGroup],
channelOptions: ChannelOptions,
customDnsResolver: Option[RequestKey => Either[Throwable, InetSocketAddress]],
F: Async[F],
) = this(
responseHeaderTimeout = responseHeaderTimeout,
idleTimeout = idleTimeout,
requestTimeout = requestTimeout,
connectTimeout = connectTimeout,
userAgent = userAgent,
maxTotalConnections = maxTotalConnections,
maxWaitQueueLimit = maxWaitQueueLimit,
maxConnectionsPerRequestKey = maxConnectionsPerRequestKey,
sslContext = sslContext,
checkEndpointIdentification = checkEndpointIdentification,
maxResponseLineSize = maxResponseLineSize,
maxHeaderLength = maxHeaderLength,
maxChunkSize = maxChunkSize,
chunkBufferMaxSize = chunkBufferMaxSize,
parserMode = parserMode,
bufferSize = bufferSize,
executionContextConfig = executionContextConfig,
scheduler = scheduler,
asynchronousChannelGroup = asynchronousChannelGroup,
channelOptions = channelOptions,
customDnsResolver = customDnsResolver,
retries = 0,
maxIdleDuration = Duration.Inf,
)(F)

private def copy(
responseHeaderTimeout: Duration = responseHeaderTimeout,
idleTimeout: Duration = idleTimeout,
Expand Down Expand Up @@ -200,7 +150,7 @@ sealed abstract class BlazeClientBuilder[F[_]] private (
customDnsResolver = customDnsResolver,
retries = retries,
maxIdleDuration = maxIdleDuration,
) {}
)

@deprecated(
"Do not use - always returns cats.effect.unsafe.IORuntime.global.compute." +
Expand Down Expand Up @@ -467,7 +417,7 @@ object BlazeClientBuilder {
customDnsResolver = None,
retries = 2,
maxIdleDuration = Duration.Inf,
) {}
)

@deprecated(
"Most users should use the default execution context provided. " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,25 +101,4 @@ private object ConnectionManager {
maxIdleDuration,
)
}

@deprecated("Preserved for binary compatibility", "0.23.8")
def pool[F[_]: Async, A <: Connection[F]](
builder: ConnectionBuilder[F, A],
maxTotal: Int,
maxWaitQueueLimit: Int,
maxConnectionsPerRequestKey: RequestKey => Int,
responseHeaderTimeout: Duration,
requestTimeout: Duration,
executionContext: ExecutionContext,
): F[ConnectionManager.Stateful[F, A]] =
pool(
builder,
maxTotal,
maxWaitQueueLimit,
maxConnectionsPerRequestKey,
responseHeaderTimeout,
requestTimeout,
executionContext,
Duration.Inf,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ private final class Http1Connection[F[_]](
}

val writeRequest: F[Boolean] = getChunkEncoder(req, mustClose, rr)
.write(rr, req.body)
.write(rr, req.entity)
.onError {
case EOF => F.delay(shutdownWithError(EOF))
case t =>
Expand Down Expand Up @@ -337,14 +337,14 @@ private final class Http1Connection[F[_]](
val status: Status = parser.getStatus()
val httpVersion: HttpVersion = parser.getHttpVersion()

val (attributes, body): (Vault, EntityBody[F]) = if (doesntHaveBody) {
val (attributes, entity): (Vault, Entity[F]) = if (doesntHaveBody) {
// responses to HEAD requests do not have a body
cleanUpAfterReceivingResponse(closeOnFinish, headers)
(Vault.empty, EmptyBody)
(Vault.empty, Entity.Empty)
} else {
// We are to the point of parsing the body and then cleaning up
val (rawBody, _): (EntityBody[F], () => Future[ByteBuffer]) =
collectBodyFromParser(buffer, onEofWhileReadingBody _)
val (rawEntity, _): (Entity[F], () => Future[ByteBuffer]) =
collectEntityFromParser(buffer, onEofWhileReadingBody _)

// to collect the trailers we need a cleanup helper and an effect in the attribute map
val (trailerCleanup, attributes): (() => Unit, Vault) =
Expand All @@ -371,9 +371,9 @@ private final class Http1Connection[F[_]](
if (parser.contentComplete()) {
trailerCleanup()
cleanUpAfterReceivingResponse(closeOnFinish, headers)
attributes -> rawBody
attributes -> rawEntity
} else
attributes -> rawBody.onFinalizeCaseWeak {
attributes -> Entity(rawEntity.body.onFinalizeCaseWeak {
case ExitCase.Succeeded =>
F.delay { trailerCleanup(); cleanUpAfterReceivingResponse(closeOnFinish, headers); }
.evalOn(executionContext)
Expand All @@ -382,7 +382,7 @@ private final class Http1Connection[F[_]](
trailerCleanup(); cleanUpAfterReceivingResponse(closeOnFinish, headers);
stageShutdown()
}.evalOn(executionContext)
}
})
}

cb(
Expand All @@ -391,7 +391,12 @@ private final class Http1Connection[F[_]](
status = status,
httpVersion = httpVersion,
headers = headers,
body = body.interruptWhen(idleTimeoutS),
entity = entity match {
case Entity.Default(body, length) =>
Entity[F](body.interruptWhen(idleTimeoutS), length)
case Entity.Strict(_) | Entity.Empty =>
entity
},
attributes = attributes,
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import cats.effect.std.Semaphore
import cats.effect.syntax.all._
import cats.syntax.all._
import org.http4s.client.RequestKey
import org.http4s.internal.CollectionCompat
import org.log4s.getLogger

import java.time.Instant
Expand Down Expand Up @@ -52,29 +51,6 @@ private final class PoolManager[F[_], A <: Connection[F]](
)(implicit F: Async[F])
extends ConnectionManager.Stateful[F, A] { self =>

@deprecated("Preserved for binary compatibility", "0.23.8")
private[PoolManager] def this(
builder: ConnectionBuilder[F, A],
maxTotal: Int,
maxWaitQueueLimit: Int,
maxConnectionsPerRequestKey: RequestKey => Int,
responseHeaderTimeout: Duration,
requestTimeout: Duration,
semaphore: Semaphore[F],
executionContext: ExecutionContext,
F: Async[F],
) = this(
builder,
maxTotal,
maxWaitQueueLimit,
maxConnectionsPerRequestKey,
responseHeaderTimeout,
requestTimeout,
semaphore,
executionContext,
Duration.Inf,
)(F)

private sealed case class PooledConnection(conn: A, borrowDeadline: Option[Deadline])

private sealed case class Waiting(
Expand Down Expand Up @@ -444,7 +420,7 @@ private final class PoolManager[F[_], A <: Connection[F]](
def isClosed: F[Boolean] = F.delay(self.isClosed)
def allocated: F[Map[RequestKey, Int]] = F.delay(self.allocated.toMap)
def idleQueueDepth: F[Map[RequestKey, Int]] =
F.delay(CollectionCompat.mapValues(self.idleQueues.toMap)(_.size))
F.delay(self.idleQueues.toMap.view.mapValues(_.size).toMap)
def waitQueueDepth: F[Int] = F.delay(self.waitQueue.size)
}
}
Expand Down
25 changes: 0 additions & 25 deletions blaze-client/src/main/scala/org/http4s/client/blaze/package.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,15 @@ class BlazeClient213Suite extends BlazeClientBase {
client.expect[String](h)
}

val sucessRequests =
val successfulRequests =
(1 to Runtime.getRuntime.availableProcessors * 5).toList.parTraverse { _ =>
val h = successHosts(Random.nextInt(successHosts.length))
client.expect[String](h).map(_.nonEmpty)
}

val allRequests = for {
_ <- failedRequests.handleErrorWith(_ => IO.unit).replicateA(5)
r <- sucessRequests
r <- successfulRequests
} yield r

allRequests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ package client
import cats.effect._
import cats.effect.kernel.Resource
import cats.effect.std.Dispatcher
import cats.syntax.all._
import cats.implicits.catsSyntaxApplicativeId
import cats.syntax.all._
import fs2.Stream
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.http.HttpMethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ class ClientTimeoutSuite extends CatsEffectSuite with DispatcherIOFixture {
case (tickWheel, dispatcher) =>
// Sending request body hangs so the idle timeout will kick-in after 1s and interrupt the request
val body = Stream.emit[IO, Byte](1.toByte) ++ Stream.never[IO]
val req = Request(method = Method.POST, uri = www_foo_com, body = body)
val req = Request(method = Method.POST, uri = www_foo_com, entity = Entity(body))
val h = new SlowTestHead(Seq(mkBuffer(resp)), 3.seconds, tickWheel)
val c = mkClient(h, tickWheel, dispatcher)(idleTimeout = 1.second)

Expand All @@ -155,7 +155,7 @@ class ClientTimeoutSuite extends CatsEffectSuite with DispatcherIOFixture {
(for {
_ <- IO.unit
body = Stream.emit[IO, Byte](1.toByte) ++ Stream.never[IO]
req = Request(method = Method.POST, uri = www_foo_com, body = body)
req = Request(method = Method.POST, uri = www_foo_com, entity = Entity(body))
q <- Queue.unbounded[IO, Option[ByteBuffer]]
h = new QueueTestHead(q)
(f, b) = resp.splitAt(resp.length - 1)
Expand All @@ -176,7 +176,7 @@ class ClientTimeoutSuite extends CatsEffectSuite with DispatcherIOFixture {
.fixedRate[IO](500.millis)
.take(3)
.mapChunks(_ => Chunk.array(Array.fill(chunkBufferMaxSize + 1)(1.toByte)))
val req = Request(method = Method.POST, uri = www_foo_com, body = body)
val req = Request(method = Method.POST, uri = www_foo_com, entity = Entity(body))
val h = new SlowTestHead(Seq(mkBuffer(resp)), 2000.millis, tickWheel)
val c = mkClient(h, tickWheel, dispatcher)(idleTimeout = 1.second)

Expand Down

This file was deleted.

This file was deleted.

Loading

0 comments on commit 7bb10bc

Please sign in to comment.