diff --git a/.scalafmt.conf b/.scalafmt.conf index c91d0d6af..3f9bc940b 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -1,4 +1,4 @@ -version = 2.7.5 +version = 3.0.2 style = default diff --git a/build.sbt b/build.sbt index aba5bed0e..f9f43f6e6 100644 --- a/build.sbt +++ b/build.sbt @@ -31,15 +31,24 @@ lazy val commonSettings = Seq( }, run / fork := true, developers ++= List( - Developer("bryce-anderson" , "Bryce L. Anderson" , "bryce.anderson22@gamil.com" , url("https://github.com/bryce-anderson")), - Developer("rossabaker" , "Ross A. Baker" , "ross@rossabaker.com" , url("https://github.com/rossabaker")), - Developer("ChristopherDavenport" , "Christopher Davenport" , "chris@christopherdavenport.tech" , url("https://github.com/ChristopherDavenport")) + Developer( + "bryce-anderson", + "Bryce L. Anderson", + "bryce.anderson22@gamil.com", + url("https://github.com/bryce-anderson")), + Developer( + "rossabaker", + "Ross A. Baker", + "ross@rossabaker.com", + url("https://github.com/rossabaker")), + Developer( + "ChristopherDavenport", + "Christopher Davenport", + "chris@christopherdavenport.tech", + url("https://github.com/ChristopherDavenport")) ), - licenses := Seq("Apache-2.0" -> url("https://www.apache.org/licenses/LICENSE-2.0.html")), - homepage := Some(url("https://github.com/http4s/blaze")), - scmInfo := Some( ScmInfo( url("https://github.com/http4s/blaze"), @@ -47,7 +56,7 @@ lazy val commonSettings = Seq( Some("scm:git:git@github.com:http4s/blaze.git") ) ), - startYear := Some(2014), + startYear := Some(2014) ) ThisBuild / githubWorkflowJavaVersions := Seq("adopt@1.8") @@ -65,7 +74,8 @@ ThisBuild / githubWorkflowBuild := Seq( WorkflowStep.Sbt(List("validate-ci")) ) -lazy val blaze = project.in(file(".")) +lazy val blaze = project + .in(file(".")) .enablePlugins(Http4sOrgPlugin) .enablePlugins(NoPublishPlugin) .settings(commonSettings) @@ -93,7 +103,8 @@ lazy val core = Project("blaze-core", file("core")) buildInfoOptions += BuildInfoOption.BuildTime, mimaBinaryIssueFilters ++= Seq( // private constructor for which there are no sensible defaults - ProblemFilters.exclude[DirectMissingMethodProblem]("org.http4s.blaze.channel.nio1.NIO1SocketServerGroup.this") + ProblemFilters.exclude[DirectMissingMethodProblem]( + "org.http4s.blaze.channel.nio1.NIO1SocketServerGroup.this") ) ) .dependsOn(testkit % Test) @@ -107,15 +118,19 @@ lazy val http = Project("blaze-http", file("http")) // Test Dependencies libraryDependencies += asyncHttpClient % Test, mimaBinaryIssueFilters ++= Seq( - ProblemFilters.exclude[MissingClassProblem]("org.http4s.blaze.http.http2.PingManager$PingState"), - ProblemFilters.exclude[MissingClassProblem]("org.http4s.blaze.http.http2.PingManager$PingState$"), - ProblemFilters.exclude[MissingClassProblem]("org.http4s.blaze.http.http2.client.ALPNClientSelector$ClientProvider"), - ProblemFilters.exclude[MissingClassProblem]("org.http4s.blaze.http.http2.server.ALPNServerSelector$ServerProvider") + ProblemFilters.exclude[MissingClassProblem]( + "org.http4s.blaze.http.http2.PingManager$PingState"), + ProblemFilters.exclude[MissingClassProblem]( + "org.http4s.blaze.http.http2.PingManager$PingState$"), + ProblemFilters.exclude[MissingClassProblem]( + "org.http4s.blaze.http.http2.client.ALPNClientSelector$ClientProvider"), + ProblemFilters.exclude[MissingClassProblem]( + "org.http4s.blaze.http.http2.server.ALPNServerSelector$ServerProvider") ) ) .dependsOn(testkit % Test, core % "test->test;compile->compile") -lazy val examples = Project("blaze-examples",file("examples")) +lazy val examples = Project("blaze-examples", file("examples")) .enablePlugins(NoPublishPlugin) .settings(commonSettings) .settings(Revolver.settings) @@ -124,7 +139,11 @@ lazy val examples = Project("blaze-examples",file("examples")) /* Helper Functions */ // use it in the local development process -addCommandAlias("validate", ";scalafmtCheckAll ;javafmtCheckAll ;+test:compile ;test ;unusedCompileDependenciesTest ;mimaReportBinaryIssues") +addCommandAlias( + "validate", + ";scalafmtCheckAll ;scalafmtSbtCheck ;javafmtCheckAll ;+test:compile ;test ;unusedCompileDependenciesTest ;mimaReportBinaryIssues") // use it in the CI pipeline -addCommandAlias("validate-ci", ";scalafmtCheckAll ;javafmtCheckAll ;test ;unusedCompileDependenciesTest ;mimaReportBinaryIssues") +addCommandAlias( + "validate-ci", + ";scalafmtCheckAll ;scalafmtSbtCheck ;javafmtCheckAll ;test ;unusedCompileDependenciesTest ;mimaReportBinaryIssues") diff --git a/core/src/main/scala/org/http4s/blaze/channel/ServerChannel.scala b/core/src/main/scala/org/http4s/blaze/channel/ServerChannel.scala index 61b4839a0..80bd87054 100644 --- a/core/src/main/scala/org/http4s/blaze/channel/ServerChannel.scala +++ b/core/src/main/scala/org/http4s/blaze/channel/ServerChannel.scala @@ -36,8 +36,8 @@ abstract class ServerChannel extends Closeable { self => /** Close out any resources associated with the [[ServerChannel]] * - * @note Regardless of the number of times `close()` is called - * this method will only be called once. + * @note + * Regardless of the number of times `close()` is called this method will only be called once. */ protected def closeChannel(): Unit @@ -46,7 +46,8 @@ abstract class ServerChannel extends Closeable { self => /** Close the [[ServerChannel]] and execute any shutdown hooks * - * @note this method is idempotent + * @note + * this method is idempotent */ final def close(): Unit = { val hooks = shutdownHooks.synchronized { @@ -64,8 +65,8 @@ abstract class ServerChannel extends Closeable { self => scheduleHooks(hooks) } - /** Wait for this server channel to close, including execution of all successfully - * registered shutdown hooks. + /** Wait for this server channel to close, including execution of all successfully registered + * shutdown hooks. */ final def join(): Unit = shutdownHooks.synchronized { @@ -75,11 +76,14 @@ abstract class ServerChannel extends Closeable { self => /** Add code to be executed when the [[ServerChannel]] is closed * - * @note There are no guarantees as to order of shutdown hook execution - * or that they will be executed sequentially. + * @note + * There are no guarantees as to order of shutdown hook execution or that they will be executed + * sequentially. * - * @param f hook to execute on shutdown - * @return true if the hook was successfully registered, false otherwise. + * @param f + * hook to execute on shutdown + * @return + * true if the hook was successfully registered, false otherwise. */ final def addShutdownHook(f: () => Unit)(implicit ec: ExecutionContext = Execution.directec): Boolean = diff --git a/core/src/main/scala/org/http4s/blaze/channel/ServerChannelGroup.scala b/core/src/main/scala/org/http4s/blaze/channel/ServerChannelGroup.scala index f2c33d066..51418c814 100644 --- a/core/src/main/scala/org/http4s/blaze/channel/ServerChannelGroup.scala +++ b/core/src/main/scala/org/http4s/blaze/channel/ServerChannelGroup.scala @@ -22,8 +22,9 @@ import scala.util.Try /** Abstraction for binding a server socket and handling connections. * - * @note Implementations may have resources associated with - * them before binding any sockets and should be closed. + * @note + * Implementations may have resources associated with them before binding any sockets and should + * be closed. */ trait ServerChannelGroup { diff --git a/core/src/main/scala/org/http4s/blaze/channel/nio1/NIO1HeadStage.scala b/core/src/main/scala/org/http4s/blaze/channel/nio1/NIO1HeadStage.scala index b68ae9abe..6dd2e9a2b 100644 --- a/core/src/main/scala/org/http4s/blaze/channel/nio1/NIO1HeadStage.scala +++ b/core/src/main/scala/org/http4s/blaze/channel/nio1/NIO1HeadStage.scala @@ -39,11 +39,11 @@ private[nio1] object NIO1HeadStage { /** Performs the read operation * - * @param scratch a ByteBuffer in which to load read data. The method - * doesn't take and ownership interest in the buffer, eg it's - * reference is not retained. - * @return a `Try` representing successfully loading data into `scratch`, or - * the failure cause. + * @param scratch + * a ByteBuffer in which to load read data. The method doesn't take and ownership interest in + * the buffer, eg it's reference is not retained. + * @return + * a `Try` representing successfully loading data into `scratch`, or the failure cause. */ private def performRead(ch: NIO1ClientChannel, scratch: ByteBuffer, size: Int): Try[Unit] = try { @@ -64,8 +64,10 @@ private[nio1] object NIO1HeadStage { } /** Perform the write operation for this channel - * @param buffers buffers to be written to the channel - * @return a WriteResult that is one of Complete, Incomplete or WriteError(e: Exception) + * @param buffers + * buffers to be written to the channel + * @return + * a WriteResult that is one of Complete, Incomplete or WriteError(e: Exception) */ private def performWrite( ch: NIO1ClientChannel, @@ -338,8 +340,7 @@ private[nio1] final class NIO1HeadStage( } } - /** Unsets a channel interest - * only to be called by the SelectorLoop thread + /** Unsets a channel interest only to be called by the SelectorLoop thread */ private[this] def unsetOp(op: Int): Unit = // assert(Thread.currentThread() == loop, diff --git a/core/src/main/scala/org/http4s/blaze/channel/nio1/NIO1SocketServerGroup.scala b/core/src/main/scala/org/http4s/blaze/channel/nio1/NIO1SocketServerGroup.scala index 5e4847587..999f30c01 100644 --- a/core/src/main/scala/org/http4s/blaze/channel/nio1/NIO1SocketServerGroup.scala +++ b/core/src/main/scala/org/http4s/blaze/channel/nio1/NIO1SocketServerGroup.scala @@ -59,8 +59,9 @@ object NIO1SocketServerGroup { /** Create a new [[NIO1SocketServerGroup]] from the [[SelectorLoopPool]]. * - * @note The worker pool is not owned by the group and therefore not - * shutdown when the group is shutdown. + * @note + * The worker pool is not owned by the group and therefore not shutdown when the group is + * shutdown. */ def create( acceptorPool: SelectorLoopPool, @@ -70,11 +71,9 @@ object NIO1SocketServerGroup { ): ServerChannelGroup = new NIO1SocketServerGroup(acceptorPool, workerPool, channelOptions, maxConnections) - /** Create a new [[NIO1SocketServerGroup]] with a fresh [[FixedSelectorPool] - * ] - * - * The resulting [[ServerChannelGroup]] takes ownership of the created pool, - * shutting it down when the group is shutdown. + /** Create a new [[NIO1SocketServerGroup]] with a fresh + * [[FixedSelectorPool] ] The resulting [[ServerChannelGroup]] takes ownership of the created + * pool, shutting it down when the group is shutdown. */ def fixed( workerThreads: Int = DefaultPoolSize, @@ -114,9 +113,10 @@ object NIO1SocketServerGroup { /** A thread resource group for NIO1 network operations * - * @param workerPool [[SelectorLoopPool]] that will belong to this group. The group - * assumes responsibility for shutting it down. Shutting down the - * pool after giving it to this group will result in undefined behavior. + * @param workerPool + * [[SelectorLoopPool]] that will belong to this group. The group assumes responsibility for + * shutting it down. Shutting down the pool after giving it to this group will result in + * undefined behavior. */ private final class NIO1SocketServerGroup private ( acceptorPool: SelectorLoopPool, @@ -247,8 +247,8 @@ private final class NIO1SocketServerGroup private ( toClose.foreach(_.close()) } - /** Create a [[org.http4s.blaze.channel.ServerChannel]] that will serve the - * services on the requisite sockets + /** Create a [[org.http4s.blaze.channel.ServerChannel]] that will serve the services on the + * requisite sockets */ override def bind( address: InetSocketAddress, diff --git a/core/src/main/scala/org/http4s/blaze/channel/nio1/Selectable.scala b/core/src/main/scala/org/http4s/blaze/channel/nio1/Selectable.scala index 60898cf08..fb985e72b 100644 --- a/core/src/main/scala/org/http4s/blaze/channel/nio1/Selectable.scala +++ b/core/src/main/scala/org/http4s/blaze/channel/nio1/Selectable.scala @@ -20,17 +20,15 @@ import java.nio.ByteBuffer /** Type that can be registered with a [[SelectorLoop]] * - * When registered with a `SelectorLoop` it will be notified - * when it has events ready. + * When registered with a `SelectorLoop` it will be notified when it has events ready. */ private trait Selectable { /** Called by the `SelectorLoop` when events are ready * - * @param scratch a `ByteBuffer` that can be used for scratch area. - * This buffer is strictly borrowed for the life of the - * method call and will be passed to other `Selectable` - * instances. + * @param scratch + * a `ByteBuffer` that can be used for scratch area. This buffer is strictly borrowed for the + * life of the method call and will be passed to other `Selectable` instances. */ def opsReady(scratch: ByteBuffer): Unit diff --git a/core/src/main/scala/org/http4s/blaze/channel/nio1/SelectorLoop.scala b/core/src/main/scala/org/http4s/blaze/channel/nio1/SelectorLoop.scala index 59ff099d5..53427c61f 100644 --- a/core/src/main/scala/org/http4s/blaze/channel/nio1/SelectorLoop.scala +++ b/core/src/main/scala/org/http4s/blaze/channel/nio1/SelectorLoop.scala @@ -34,12 +34,15 @@ import scala.util.control.{ControlThrowable, NonFatal} /** A special thread that listens for events on the provided selector. * - * @param selector `Selector` to listen on. - * @param bufferSize Size of the scratch buffer instantiated for this thread. - * @param threadFactory Factory to make the `Thread` instance to run the loop. + * @param selector + * `Selector` to listen on. + * @param bufferSize + * Size of the scratch buffer instantiated for this thread. + * @param threadFactory + * Factory to make the `Thread` instance to run the loop. * - * @note when the `SelectorLoop` is closed all registered `Selectable`s - * are closed with it. + * @note + * when the `SelectorLoop` is closed all registered `Selectable`s are closed with it. */ final class SelectorLoop( selector: Selector, @@ -73,9 +76,8 @@ final class SelectorLoop( /** Schedule the provided `Runnable` for execution, potentially running it now * - * The provided task may be executed *now* if the calling thread is `this` - * `SelectorLoop`, otherwise it is added to the task queue to be executed by - * the `SelectorLoop` thread later. + * The provided task may be executed *now* if the calling thread is `this` `SelectorLoop`, + * otherwise it is added to the task queue to be executed by the `SelectorLoop` thread later. */ @inline @throws[RejectedExecutionException] @@ -87,12 +89,12 @@ final class SelectorLoop( /** Schedule to provided `Runnable` for execution later * - * The task will be added to the end of the queue of tasks scheduled - * for execution regardless of where this method is called. + * The task will be added to the end of the queue of tasks scheduled for execution regardless of + * where this method is called. * - * @see `executeTask` for a method that will execute the task now if - * the calling thread is `this` `SelectorLoop`, or schedule it for - * later otherwise. + * @see + * `executeTask` for a method that will execute the task now if the calling thread is `this` + * `SelectorLoop`, or schedule it for later otherwise. */ @throws[RejectedExecutionException] def enqueueTask(runnable: Runnable): Unit = @@ -116,11 +118,11 @@ final class SelectorLoop( /** Initialize a new `Selectable` channel * - * The `SelectableChannel` is added to the selector loop the - * `Selectable` will be notified when it has events ready. + * The `SelectableChannel` is added to the selector loop the `Selectable` will be notified when + * it has events ready. * - * @note the underlying `SelectableChannel` _must_ be - * configured in non-blocking mode. + * @note + * the underlying `SelectableChannel` _must_ be configured in non-blocking mode. */ def initChannel( ch: NIO1Channel, @@ -245,16 +247,16 @@ final class SelectorLoop( throw ex } - /** A Runnable that will only execute in this selector loop and provides - * access to the `SelectorLoop`s scratch buffer. + /** A Runnable that will only execute in this selector loop and provides access to the + * `SelectorLoop`s scratch buffer. */ abstract class LoopRunnable extends Runnable { /** Execute the task with the borrowed scratch `ByteBuffer` * - * @param scratch a `ByteBuffer` that is owned by the parent - * `SelectorLoop`, and as such, the executing task - * _must not_ retain a refer to it. + * @param scratch + * a `ByteBuffer` that is owned by the parent `SelectorLoop`, and as such, the executing task + * _must not_ retain a refer to it. */ def run(scratch: ByteBuffer): Unit diff --git a/core/src/main/scala/org/http4s/blaze/channel/nio2/ClientChannelFactory.scala b/core/src/main/scala/org/http4s/blaze/channel/nio2/ClientChannelFactory.scala index e9c2354e0..c040dc2c6 100644 --- a/core/src/main/scala/org/http4s/blaze/channel/nio2/ClientChannelFactory.scala +++ b/core/src/main/scala/org/http4s/blaze/channel/nio2/ClientChannelFactory.scala @@ -33,9 +33,11 @@ import scala.util.control.NonFatal * * Provides a way to easily make TCP connections which can then serve as the Head for a pipeline * - * @param bufferSize default buffer size to perform reads - * @param group The `AsynchronousChannelGroup` which will manage the connection. - * `None` will use the system default + * @param bufferSize + * default buffer size to perform reads + * @param group + * The `AsynchronousChannelGroup` which will manage the connection. `None` will use the system + * default */ final class ClientChannelFactory( bufferSize: Int = DefaultBufferSize, diff --git a/core/src/main/scala/org/http4s/blaze/channel/package.scala b/core/src/main/scala/org/http4s/blaze/channel/package.scala index b86cfc473..0ffc7f3ef 100644 --- a/core/src/main/scala/org/http4s/blaze/channel/package.scala +++ b/core/src/main/scala/org/http4s/blaze/channel/package.scala @@ -31,8 +31,8 @@ package object channel { val DefaultPoolSize: Int = math.max(4, Runtime.getRuntime.availableProcessors() + 1) - /** Default max number of connections that can be active at any time. - * A negative number means that there is no max. + /** Default max number of connections that can be active at any time. A negative number means that + * there is no max. */ val DefaultMaxConnections: Int = 512 } diff --git a/core/src/main/scala/org/http4s/blaze/pipeline/Command.scala b/core/src/main/scala/org/http4s/blaze/pipeline/Command.scala index 906d6268e..414625bcb 100644 --- a/core/src/main/scala/org/http4s/blaze/pipeline/Command.scala +++ b/core/src/main/scala/org/http4s/blaze/pipeline/Command.scala @@ -22,18 +22,19 @@ import scala.util.control.NoStackTrace object Command { trait InboundCommand - /** Signals that the pipeline [[HeadStage]] is connected and ready to accept read and write requests */ + /** Signals that the pipeline [[HeadStage]] is connected and ready to accept read and write + * requests + */ case object Connected extends InboundCommand - /** Signals to the tail of the pipeline that it has been disconnected and - * shutdown. Any following reads or writes will result in an exception, [[EOF]], - * a general Exception signaling the stage is not connected, or otherwise. + /** Signals to the tail of the pipeline that it has been disconnected and shutdown. Any following + * reads or writes will result in an exception, [[EOF]], a general Exception signaling the stage + * is not connected, or otherwise. */ case object Disconnected extends InboundCommand - /** Signals to the entire pipeline that the [[HeadStage]] has been disconnected and - * shutdown. Any following reads or writes will result in an exception, [[EOF]] - * or otherwise + /** Signals to the entire pipeline that the [[HeadStage]] has been disconnected and shutdown. Any + * following reads or writes will result in an exception, [[EOF]] or otherwise */ case object EOF extends Exception("EOF") with InboundCommand with NoStackTrace { override def toString: String = getMessage diff --git a/core/src/main/scala/org/http4s/blaze/pipeline/PipelineBuilder.scala b/core/src/main/scala/org/http4s/blaze/pipeline/PipelineBuilder.scala index 4ebe2ff66..e72251bc3 100644 --- a/core/src/main/scala/org/http4s/blaze/pipeline/PipelineBuilder.scala +++ b/core/src/main/scala/org/http4s/blaze/pipeline/PipelineBuilder.scala @@ -16,11 +16,13 @@ package org.http4s.blaze.pipeline -/** By requiring a LeafBuilder, you are ensuring that the pipeline is capped - * with a TailStage as the only way to get a LeafBuilder if by capping with a - * TailStage or getting a new LeafBuilder from a TailStage - * @param leaf the capped pipeline - * @tparam I type the pipeline will read and write +/** By requiring a LeafBuilder, you are ensuring that the pipeline is capped with a TailStage as the + * only way to get a LeafBuilder if by capping with a TailStage or getting a new LeafBuilder from a + * TailStage + * @param leaf + * the capped pipeline + * @tparam I + * type the pipeline will read and write */ final class LeafBuilder[I] private[pipeline] (leaf: Tail[I]) { def prepend[N](stage: MidStage[N, I]): LeafBuilder[N] = { @@ -49,8 +51,8 @@ object LeafBuilder { def apply[T](leaf: TailStage[T]): LeafBuilder[T] = new LeafBuilder[T](leaf) } -/** Facilitates starting a pipeline from a MidStage. Can be appended and prepended - * to build up the pipeline +/** Facilitates starting a pipeline from a MidStage. Can be appended and prepended to build up the + * pipeline */ final class TrunkBuilder[I1, O] private[pipeline] ( protected val head: MidStage[I1, _], diff --git a/core/src/main/scala/org/http4s/blaze/pipeline/Stages.scala b/core/src/main/scala/org/http4s/blaze/pipeline/Stages.scala index 6e025f6c1..876879f42 100644 --- a/core/src/main/scala/org/http4s/blaze/pipeline/Stages.scala +++ b/core/src/main/scala/org/http4s/blaze/pipeline/Stages.scala @@ -53,32 +53,31 @@ sealed trait Stage { /** Start the stage, allocating resources etc. * - * This method should not effect other stages by sending commands etc unless it creates them. - * It is not impossible that the stage will receive other commands besides `Connected` - * before this method is called. It is not impossible for this method to be called multiple - * times by misbehaving stages. It is therefore recommended that the method be idempotent. + * This method should not effect other stages by sending commands etc unless it creates them. It + * is not impossible that the stage will receive other commands besides `Connected` before this + * method is called. It is not impossible for this method to be called multiple times by + * misbehaving stages. It is therefore recommended that the method be idempotent. */ protected def stageStartup(): Unit = logger.debug(s"Starting up.") /** Shuts down the stage, deallocating resources, etc. * - * It will be called when the stage receives a `Disconnected` - * command unless [[inboundCommand]] is overridden. This method - * should not send or `Disconnected` commands. + * It will be called when the stage receives a `Disconnected` command unless [[inboundCommand]] + * is overridden. This method should not send or `Disconnected` commands. * - * It is possible that this will not be called due to failure of - * other stages to propagate shutdown commands. Conversely, it is - * also possible for this to be called more than once due to the - * reception of multiple shutdown commands. It is therefore - * recommended that the method be idempotent. + * It is possible that this will not be called due to failure of other stages to propagate + * shutdown commands. Conversely, it is also possible for this to be called more than once due to + * the reception of multiple shutdown commands. It is therefore recommended that the method be + * idempotent. */ protected def stageShutdown(): Unit = logger.debug(s"Shutting down.") /** Handle basic startup and shutdown commands. * - * @param cmd a command originating from the channel + * @param cmd + * a command originating from the channel */ def inboundCommand(cmd: InboundCommand): Unit = cmd match { @@ -205,11 +204,15 @@ sealed trait Tail[I] extends Stage { /////////////////////////////////////////////////////////////////// /** Schedules a timeout and sets it to race against the provided future - * @param p Promise[T] to be completed by whichever comes first: - * the timeout or resolution of the Future[T] f - * @param f Future the timeout is racing against - * @param timeout time from now which is considered a timeout - * @tparam T type of result expected + * @param p + * Promise[T] to be completed by whichever comes first: the timeout or resolution of the + * Future[T] f + * @param f + * Future the timeout is racing against + * @param timeout + * time from now which is considered a timeout + * @tparam T + * type of result expected */ private def scheduleTimeout[T](p: Promise[T], f: Future[T], timeout: Duration): Unit = { val r = new Runnable { @@ -237,21 +240,25 @@ sealed trait Head[O] extends Stage { /** Called by the next inbound `Stage` to signal interest in reading data. * - * @param size Hint as to the size of the message intended to be read. May not be meaningful or honored. - * @return `Future` that will resolve with the requested inbound data, or an error. + * @param size + * Hint as to the size of the message intended to be read. May not be meaningful or honored. + * @return + * `Future` that will resolve with the requested inbound data, or an error. */ def readRequest(size: Int): Future[O] /** Data that the next inbound `Stage` wants to send outbound. * - * @return a `Future` that resolves when the data has been handled. + * @return + * a `Future` that resolves when the data has been handled. */ def writeRequest(data: O): Future[Unit] /** Collection of data that the next inbound `Stage` wants to sent outbound. * * It is generally assumed that the order of elements has meaning. - * @return a `Future` that resolves when the data has been handled. + * @return + * a `Future` that resolves when the data has been handled. */ def writeRequest(data: collection.Seq[O]): Future[Unit] = data.foldLeft[Future[Unit]](FutureUnit)((f, d) => f.flatMap(_ => writeRequest(d))(directec)) @@ -274,8 +281,7 @@ sealed trait Head[O] extends Stage { } } - /** Receives inbound commands - * Override to capture commands. + /** Receives inbound commands Override to capture commands. */ override def inboundCommand(cmd: InboundCommand): Unit = { super.inboundCommand(cmd) @@ -324,7 +330,8 @@ trait HeadStage[O] extends Head[O] { /** Close the channel with an error * - * @note EOF is a valid error to close the channel with and signals normal termination. + * @note + * EOF is a valid error to close the channel with and signals normal termination. */ protected def doClosePipeline(cause: Option[Throwable]): Unit @@ -355,7 +362,7 @@ trait MidStage[I, O] extends Tail[I] with Head[O] { stageShutdown() if (_prevStage != null && _nextStage != null) { - logger.debug(s"Removed mid-stage: ${name}") + logger.debug(s"Removed mid-stage: $name") val me: MidStage[I, I] = ev(this) _prevStage._nextStage = me._nextStage me._nextStage._prevStage = me._prevStage @@ -363,6 +370,6 @@ trait MidStage[I, O] extends Tail[I] with Head[O] { _nextStage = null _prevStage = null } else - logger.debug(s"Attempted to remove a disconnected mid-stage: ${name}") + logger.debug(s"Attempted to remove a disconnected mid-stage: $name") } } diff --git a/core/src/main/scala/org/http4s/blaze/pipeline/stages/ByteToObjectStage.scala b/core/src/main/scala/org/http4s/blaze/pipeline/stages/ByteToObjectStage.scala index a3682324b..68565309e 100644 --- a/core/src/main/scala/org/http4s/blaze/pipeline/stages/ByteToObjectStage.scala +++ b/core/src/main/scala/org/http4s/blaze/pipeline/stages/ByteToObjectStage.scala @@ -33,19 +33,23 @@ trait ByteToObjectStage[O] extends MidStage[ByteBuffer, O] { ///////////////////////////////////////////////////////////////////////////// /** Encode objects to buffers - * @param in object to decode - * @return sequence of ByteBuffers to pass to the head + * @param in + * object to decode + * @return + * sequence of ByteBuffers to pass to the head */ def messageToBuffer(in: O): collection.Seq[ByteBuffer] /** Method that decodes ByteBuffers to objects. None reflects not enough data to decode a message * Any unused data in the ByteBuffer will be recycled and available for the next read. * - * WARNING: don't count on the underlying array of the ByteBuffer. This uses the slice method, which - * could preserve access to the buffer, but mess with the various positions. + * WARNING: don't count on the underlying array of the ByteBuffer. This uses the slice method, + * which could preserve access to the buffer, but mess with the various positions. * - * @param in ByteBuffer of immediately available data - * @return optional message if enough data was available + * @param in + * ByteBuffer of immediately available data + * @return + * optional message if enough data was available */ def bufferToMessage(in: ByteBuffer): Option[O] diff --git a/core/src/main/scala/org/http4s/blaze/pipeline/stages/OneMessageStage.scala b/core/src/main/scala/org/http4s/blaze/pipeline/stages/OneMessageStage.scala index 32a223f74..ea1255cd8 100644 --- a/core/src/main/scala/org/http4s/blaze/pipeline/stages/OneMessageStage.scala +++ b/core/src/main/scala/org/http4s/blaze/pipeline/stages/OneMessageStage.scala @@ -19,12 +19,11 @@ package org.http4s.blaze.pipeline.stages import org.http4s.blaze.pipeline.MidStage import scala.concurrent.Future -/** Holds a single element that when read, will eject itself - * from the pipeline. - * @note There is an intrinsic race condition between this stage - * removing itself and write commands. Therefore, pipeline - * reads and writes must be performed in a thread safe manner - * until the first read has completed. +/** Holds a single element that when read, will eject itself from the pipeline. + * @note + * There is an intrinsic race condition between this stage removing itself and write commands. + * Therefore, pipeline reads and writes must be performed in a thread safe manner until the first + * read has completed. */ class OneMessageStage[T](element: T) extends MidStage[T, T] { override def name: String = "OneMessageStage" diff --git a/core/src/main/scala/org/http4s/blaze/pipeline/stages/monitors/BasicConnectionMonitor.scala b/core/src/main/scala/org/http4s/blaze/pipeline/stages/monitors/BasicConnectionMonitor.scala index e143254f6..4f4d8091d 100644 --- a/core/src/main/scala/org/http4s/blaze/pipeline/stages/monitors/BasicConnectionMonitor.scala +++ b/core/src/main/scala/org/http4s/blaze/pipeline/stages/monitors/BasicConnectionMonitor.scala @@ -18,9 +18,8 @@ package org.http4s.blaze.pipeline.stages.monitors import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} -/** Simple connection information monitor - * This monitor provides only the most basic connection information: - * inbound and outbound bytes and live connections +/** Simple connection information monitor This monitor provides only the most basic connection + * information: inbound and outbound bytes and live connections */ class BasicConnectionMonitor extends ConnectionMonitor { private val inboundBytes = new AtomicLong(0) diff --git a/core/src/main/scala/org/http4s/blaze/util/Actor.scala b/core/src/main/scala/org/http4s/blaze/util/Actor.scala index 2e8b1b4c6..50eec94bf 100644 --- a/core/src/main/scala/org/http4s/blaze/util/Actor.scala +++ b/core/src/main/scala/org/http4s/blaze/util/Actor.scala @@ -24,9 +24,8 @@ import scala.util.control.NonFatal import org.http4s.blaze.util.Actor.DefaultMaxIterations -/** Lightweight actor system HEAVILY inspired by the scalaz actors. - * scalaz actors would have been a good fit except a heavyweight dependency - * is very undesirable for this library. +/** Lightweight actor system HEAVILY inspired by the scalaz actors. scalaz actors would have been a + * good fit except a heavyweight dependency is very undesirable for this library. */ private[blaze] abstract class Actor[M]( ec: ExecutionContext, diff --git a/core/src/main/scala/org/http4s/blaze/util/BasicThreadFactory.scala b/core/src/main/scala/org/http4s/blaze/util/BasicThreadFactory.scala index 15205e5c7..b2e3caa8b 100644 --- a/core/src/main/scala/org/http4s/blaze/util/BasicThreadFactory.scala +++ b/core/src/main/scala/org/http4s/blaze/util/BasicThreadFactory.scala @@ -23,8 +23,8 @@ private[blaze] object BasicThreadFactory { /** Construct a basic `ThreadFactory` * - * Resulting threads are named with their prefix plus '-unique_id' - * where unique id is an increasing integer. + * Resulting threads are named with their prefix plus '-unique_id' where unique id is an + * increasing integer. */ def apply(prefix: String, daemonThreads: Boolean): ThreadFactory = new Impl(prefix, daemonThreads) diff --git a/core/src/main/scala/org/http4s/blaze/util/BufferTools.scala b/core/src/main/scala/org/http4s/blaze/util/BufferTools.scala index 2a282cfaa..7a0dc3f9d 100644 --- a/core/src/main/scala/org/http4s/blaze/util/BufferTools.scala +++ b/core/src/main/scala/org/http4s/blaze/util/BufferTools.scala @@ -32,7 +32,8 @@ object BufferTools { /** Allocate a new `ByteBuffer` on the heap * - * @param size size of desired `ByteBuffer` + * @param size + * size of desired `ByteBuffer` */ def allocate(size: Int): ByteBuffer = ByteBuffer.allocate(size) @@ -66,8 +67,8 @@ object BufferTools { new String(arr, charset) } - /** Join the two buffers into a single ByteBuffer. This method is - * guaranteed to return a ByteBuffer, but it may be empty. + /** Join the two buffers into a single ByteBuffer. This method is guaranteed to return a + * ByteBuffer, but it may be empty. */ def concatBuffers(oldbuff: ByteBuffer, newbuff: ByteBuffer): ByteBuffer = if (oldbuff == null) @@ -101,9 +102,13 @@ object BufferTools { /** Take a slice of bytes from the `ByteBuffer`, consuming the bytes. * - * @param buffer `ByteBuffer` to slice - * @param size number of bytes to slice. Must be less than or equal to the number of bytes remaining in `buffer`. - * @return the resulting view + * @param buffer + * `ByteBuffer` to slice + * @param size + * number of bytes to slice. Must be less than or equal to the number of bytes remaining in + * `buffer`. + * @return + * the resulting view */ def takeSlice(buffer: ByteBuffer, size: Int): ByteBuffer = { if (size < 0 || size > buffer.remaining()) @@ -121,8 +126,10 @@ object BufferTools { /** Check the array of buffers to ensure they are all empty * - * @param buffers `ByteBuffer`s to check for data - * @return true if they are empty, false if there is data remaining + * @param buffers + * `ByteBuffer`s to check for data + * @return + * true if they are empty, false if there is data remaining */ def checkEmpty(buffers: Array[ByteBuffer]): Boolean = { @tailrec @@ -133,9 +140,9 @@ object BufferTools { checkEmpty(buffers.length - 1) } - /** Replaces any empty buffers except for the last one with the `emptyBuffer` - * to allow GC of depleted ByteBuffers and returns the index of the first - * non-empty ByteBuffer, or the last index, whichever comes first. + /** Replaces any empty buffers except for the last one with the `emptyBuffer` to allow GC of + * depleted ByteBuffers and returns the index of the first non-empty ByteBuffer, or the last + * index, whichever comes first. */ def dropEmpty(buffers: Array[ByteBuffer]): Int = { val max = buffers.length - 1 @@ -149,8 +156,10 @@ object BufferTools { /** Check the array of buffers to ensure they are all empty * - * @param buffers `ByteBuffer`s to check for data - * @return true if they are empty, false if there is data remaining + * @param buffers + * `ByteBuffer`s to check for data + * @return + * true if they are empty, false if there is data remaining */ def checkEmpty(buffers: Iterable[ByteBuffer]): Boolean = !buffers.iterator.exists(_.hasRemaining) @@ -161,14 +170,16 @@ object BufferTools { charset.decode(b).toString() } - /** Copies as much data from the input buffers as possible without modifying positions - * of the input buffers + /** Copies as much data from the input buffers as possible without modifying positions of the + * input buffers * - * @param buffers collection of buffers to copy. This may be an empty array and the array - * may contain `null` elements. The positions, marks, and marks of the input - * buffers will not be modified. - * @param out `ByteBuffer` that the data will be copied into. This must not be `null` - * @return Number of bytes copied. + * @param buffers + * collection of buffers to copy. This may be an empty array and the array may contain `null` + * elements. The positions, marks, and marks of the input buffers will not be modified. + * @param out + * `ByteBuffer` that the data will be copied into. This must not be `null` + * @return + * Number of bytes copied. */ private[blaze] def copyBuffers(buffers: Array[ByteBuffer], out: ByteBuffer): Int = { val start = out.position() @@ -198,11 +209,14 @@ object BufferTools { /** Forward the positions of the collection of `ByteBuffer`s * - * @param buffers `ByteBuffers` to modify. The positions will be incremented from the - * first in the collection to the last. - * @param size Number of bytes to fast-forward the arrays - * @return whether there was enough bytes in the collection of buffers or if the size - * overran the available data. + * @param buffers + * `ByteBuffers` to modify. The positions will be incremented from the first in the collection + * to the last. + * @param size + * Number of bytes to fast-forward the arrays + * @return + * whether there was enough bytes in the collection of buffers or if the size overran the + * available data. */ private[blaze] def fastForwardBuffers(buffers: Array[ByteBuffer], size: Int): Boolean = { require(size >= 0) @@ -240,8 +254,9 @@ object BufferTools { * * The passed buffer is not mutated, even temporarily. * - * @note this is not intended to be a high performance method and should only - * be used for debugging purposes. + * @note + * this is not intended to be a high performance method and should only be used for debugging + * purposes. */ private[blaze] def hexString(buffer: ByteBuffer, limit: Int = Int.MaxValue): String = { val sb = new StringBuilder(buffer.toString) diff --git a/core/src/main/scala/org/http4s/blaze/util/Execution.scala b/core/src/main/scala/org/http4s/blaze/util/Execution.scala index dffa43fce..a7e89728c 100644 --- a/core/src/main/scala/org/http4s/blaze/util/Execution.scala +++ b/core/src/main/scala/org/http4s/blaze/util/Execution.scala @@ -50,10 +50,9 @@ object Execution { /** A trampolining `ExecutionContext` * - * This `ExecutionContext` is run thread locally to avoid context switches. - * Because this is a thread local executor, if there is a dependence between - * the submitted `Runnable`s and the thread becomes blocked, there will be - * a deadlock. + * This `ExecutionContext` is run thread locally to avoid context switches. Because this is a + * thread local executor, if there is a dependence between the submitted `Runnable`s and the + * thread becomes blocked, there will be a deadlock. */ val trampoline: ExecutionContext = new ExecutionContext { private val local = new ThreadLocal[ThreadLocalTrampoline] @@ -73,8 +72,8 @@ object Execution { /** Execute `Runnable`s directly on the current thread, using a stack frame. * - * This is not safe to use for recursive function calls as you will ultimately - * encounter a stack overflow. For those situations, use `trampoline`. + * This is not safe to use for recursive function calls as you will ultimately encounter a stack + * overflow. For those situations, use `trampoline`. */ val directec: ExecutionContext = new ExecutionContext { def execute(runnable: Runnable): Unit = runnable.run() diff --git a/core/src/main/scala/org/http4s/blaze/util/SerialExecutionContext.scala b/core/src/main/scala/org/http4s/blaze/util/SerialExecutionContext.scala index a754a65c6..ccceea9ca 100644 --- a/core/src/main/scala/org/http4s/blaze/util/SerialExecutionContext.scala +++ b/core/src/main/scala/org/http4s/blaze/util/SerialExecutionContext.scala @@ -20,12 +20,13 @@ import scala.concurrent.ExecutionContext /** Serialize execution of work, ensuring that no passed work is executed in parallel. * - * Tasks are executed sequentially, in the order they are offered. Each task has a - * happens-before relationship with subsequent tasks, meaning mutations performed - * in a task are observed by all sequent tasks. + * Tasks are executed sequentially, in the order they are offered. Each task has a happens-before + * relationship with subsequent tasks, meaning mutations performed in a task are observed by all + * sequent tasks. * - * @param parent `ExecutionContext` with which to perform the work, which may consist - * of many tasks queued in the `SerialExecutionContext`. + * @param parent + * `ExecutionContext` with which to perform the work, which may consist of many tasks queued in + * the `SerialExecutionContext`. */ class SerialExecutionContext( parent: ExecutionContext diff --git a/core/src/main/scala/org/http4s/blaze/util/StageTools.scala b/core/src/main/scala/org/http4s/blaze/util/StageTools.scala index 5698f863d..2085e5bb3 100644 --- a/core/src/main/scala/org/http4s/blaze/util/StageTools.scala +++ b/core/src/main/scala/org/http4s/blaze/util/StageTools.scala @@ -28,10 +28,13 @@ private[http4s] object StageTools { /** Accumulate bytes from a pipeline * - * @param bytes the minimum number of by bytes desired - * @param stage pipeline stage that you want to pull the bytes from - * @return a `Future` which contains a `ByteBuffer` with at least `bytes` bytes or a - * `Throwable` received by the pipeline. + * @param bytes + * the minimum number of by bytes desired + * @param stage + * pipeline stage that you want to pull the bytes from + * @return + * a `Future` which contains a `ByteBuffer` with at least `bytes` bytes or a `Throwable` + * received by the pipeline. */ def accumulateAtLeast(bytes: Int, stage: TailStage[ByteBuffer]): Future[ByteBuffer] = if (bytes < 0) diff --git a/core/src/main/scala/org/http4s/blaze/util/TaskQueue.scala b/core/src/main/scala/org/http4s/blaze/util/TaskQueue.scala index 5f28a6df3..0965ee976 100644 --- a/core/src/main/scala/org/http4s/blaze/util/TaskQueue.scala +++ b/core/src/main/scala/org/http4s/blaze/util/TaskQueue.scala @@ -80,9 +80,9 @@ private[blaze] final class TaskQueue { /** Execute all tasks in the queue * - * All tasks, including those that are scheduled during the invocation of - * this method, either by this a task that is executed or those scheduled - * from another thread, will be executed in the calling thread. + * All tasks, including those that are scheduled during the invocation of this method, either by + * this a task that is executed or those scheduled from another thread, will be executed in the + * calling thread. */ def executeTasks(): Unit = { // spin while we wait for the tail of the node to resolve diff --git a/core/src/main/scala/org/http4s/blaze/util/ThreadLocalScratchBuffer.scala b/core/src/main/scala/org/http4s/blaze/util/ThreadLocalScratchBuffer.scala index e0867200e..feb47f6ec 100644 --- a/core/src/main/scala/org/http4s/blaze/util/ThreadLocalScratchBuffer.scala +++ b/core/src/main/scala/org/http4s/blaze/util/ThreadLocalScratchBuffer.scala @@ -19,13 +19,11 @@ package org.http4s.blaze.util import java.nio.ByteBuffer import org.log4s.getLogger -/** Create a cache of thread-local ByteBuffer instances useful - * for scratch purposes. +/** Create a cache of thread-local ByteBuffer instances useful for scratch purposes. * - * This is an advanced feature and should be used with extreme - * caution. It is _very_ easy to let one of these buffers escape - * the current thread, or even be used improperly by the _same_ - * thread, resulting in data corruption. + * This is an advanced feature and should be used with extreme caution. It is _very_ easy to let + * one of these buffers escape the current thread, or even be used improperly by the _same_ thread, + * resulting in data corruption. */ private[blaze] final class ThreadLocalScratchBuffer(useDirect: Boolean) { private[this] val logger = getLogger @@ -37,10 +35,8 @@ private[blaze] final class ThreadLocalScratchBuffer(useDirect: Boolean) { /** Get a thread-local scratch buffer * - * The resulting buffer is stored in a ThreadLocal and is thus - * shared between invocations by a single thread. As such, the - * resulting buffers must only be used within the current call - * stack. + * The resulting buffer is stored in a ThreadLocal and is thus shared between invocations by a + * single thread. As such, the resulting buffers must only be used within the current call stack. */ final def getScratchBuffer(size: Int): ByteBuffer = { val b = localBuffer.get() @@ -58,8 +54,7 @@ private[blaze] final class ThreadLocalScratchBuffer(useDirect: Boolean) { /** Remove current scratch buffer. * - * Clears the current thread local buffer. This is useful for - * making them available to GC. + * Clears the current thread local buffer. This is useful for making them available to GC. */ final def clearScratchBuffer(): Unit = { logger.trace("Removing thread local ByteBuffer") diff --git a/core/src/main/scala/org/http4s/blaze/util/TickWheelExecutor.scala b/core/src/main/scala/org/http4s/blaze/util/TickWheelExecutor.scala index 94e84fd90..6a87b5325 100644 --- a/core/src/main/scala/org/http4s/blaze/util/TickWheelExecutor.scala +++ b/core/src/main/scala/org/http4s/blaze/util/TickWheelExecutor.scala @@ -26,17 +26,20 @@ import org.log4s.getLogger /** Low resolution execution scheduler * - * @note The ideas for [[org.http4s.blaze.util.TickWheelExecutor]] is based off of loosely came from the - * Akka scheduler, which was based on the Netty HashedWheelTimer which was in term - * based on concepts in George Varghese - * and Tony Lauck's paper 'Hashed - * and Hierarchical Timing Wheels: data structures to efficiently implement a - * timer facility' + * @note + * The ideas for [[org.http4s.blaze.util.TickWheelExecutor]] is based off of loosely came from + * the Akka scheduler, which was based on the Netty HashedWheelTimer which was in term based on + * concepts in George Varghese and Tony + * Lauck's paper 'Hashed and + * Hierarchical Timing Wheels: data structures to efficiently implement a timer facility' * - * @constructor primary constructor which immediately spins up a thread and begins ticking + * @constructor + * primary constructor which immediately spins up a thread and begins ticking * - * @param wheelSize number of spokes on the wheel. Each tick, the wheel will advance a spoke - * @param tick duration between ticks + * @param wheelSize + * number of spokes on the wheel. Each tick, the wheel will advance a spoke + * @param tick + * duration between ticks */ class TickWheelExecutor(wheelSize: Int = DefaultWheelSize, val tick: Duration = 200.milli) { require(wheelSize > 0, "Need finite size number of ticks") @@ -80,15 +83,17 @@ class TickWheelExecutor(wheelSize: Int = DefaultWheelSize, val tick: Duration = /** Schedule the `Runnable` on the [[TickWheelExecutor]] * - * Execution is performed on the [[TickWheelExecutor]]s thread, so only extremely small - * tasks should be submitted with this method. - * timeouts of Inf duration are ignored, timeouts of zero or negative duration are executed - * immediately on the submitting thread. + * Execution is performed on the [[TickWheelExecutor]] s thread, so only extremely small tasks + * should be submitted with this method. timeouts of Inf duration are ignored, timeouts of zero + * or negative duration are executed immediately on the submitting thread. * - * @param r `Runnable` to be executed - * @param timeout `Duration` to wait before execution - * @return a [[Cancellable]]. This is not a `java.util.concurrent.Cancellable`, - * which is a richer interface. + * @param r + * `Runnable` to be executed + * @param timeout + * `Duration` to wait before execution + * @return + * a [[Cancellable]]. This is not a `java.util.concurrent.Cancellable`, which is a richer + * interface. */ def schedule(r: Runnable, timeout: Duration): Cancelable = schedule(r, Execution.directec, timeout) @@ -98,11 +103,15 @@ class TickWheelExecutor(wheelSize: Int = DefaultWheelSize, val tick: Duration = * timeouts of Inf duration are ignored, timeouts of zero or negative duration are executed * immediately on the submitting thread. * - * @param r `Runnable` to be executed - * @param ec `ExecutionContext` to submit the `Runnable` - * @param timeout `Duration` to wait before execution - * @return a [[Cancellable]]. This is not a `java.util.concurrent.Cancellable`, - * which is a richer interface. + * @param r + * `Runnable` to be executed + * @param ec + * `ExecutionContext` to submit the `Runnable` + * @param timeout + * `Duration` to wait before execution + * @return + * a [[Cancellable]]. This is not a `java.util.concurrent.Cancellable`, which is a richer + * interface. */ def schedule(r: Runnable, ec: ExecutionContext, timeout: Duration): Cancelable = if (alive) @@ -193,7 +202,8 @@ class TickWheelExecutor(wheelSize: Int = DefaultWheelSize, val tick: Duration = /** Removes expired and canceled elements from this bucket, executing expired elements * - * @param time current system time (in nanoseconds) + * @param time + * current system time (in nanoseconds) */ def prune(time: Long): Unit = { @tailrec @@ -218,10 +228,14 @@ class TickWheelExecutor(wheelSize: Int = DefaultWheelSize, val tick: Duration = } /** A Link in a single linked list which can also be passed to the user as a Cancelable - * @param r [[java.lang.Runnable]] which will be executed after the expired time - * @param ec [[scala.concurrent.ExecutionContext]] on which to execute the Runnable - * @param expiration time in nanoseconds after which this Node is expired - * @param next next Node in the list or `tailNode` if this is the last element + * @param r + * [[java.lang.Runnable]] which will be executed after the expired time + * @param ec + * [[scala.concurrent.ExecutionContext]] on which to execute the Runnable + * @param expiration + * time in nanoseconds after which this Node is expired + * @param next + * next Node in the list or `tailNode` if this is the last element */ final private class Node( r: Runnable, diff --git a/core/src/main/scala/org/http4s/blaze/util/package.scala b/core/src/main/scala/org/http4s/blaze/util/package.scala index 4cf39da51..9cc085651 100644 --- a/core/src/main/scala/org/http4s/blaze/util/package.scala +++ b/core/src/main/scala/org/http4s/blaze/util/package.scala @@ -21,10 +21,12 @@ import scala.concurrent.Future package object util { - /** Constructs an assertion error with a reference back to our issue tracker. Use only with head hung low. */ + /** Constructs an assertion error with a reference back to our issue tracker. Use only with head + * hung low. + */ private[blaze] def bug(message: String): AssertionError = new AssertionError( - s"This is a bug. Please report to https://github.com/http4s/blaze/issues: ${message}") + s"This is a bug. Please report to https://github.com/http4s/blaze/issues: $message") // Can replace with `Future.unit` when we drop 2.11 support private[blaze] val FutureUnit: Future[Unit] = Future.successful(()) diff --git a/core/src/test/scala/org/http4s/blaze/pipeline/stages/SSLStageSuite.scala b/core/src/test/scala/org/http4s/blaze/pipeline/stages/SSLStageSuite.scala index 20eea7c90..1b120bb13 100644 --- a/core/src/test/scala/org/http4s/blaze/pipeline/stages/SSLStageSuite.scala +++ b/core/src/test/scala/org/http4s/blaze/pipeline/stages/SSLStageSuite.scala @@ -228,9 +228,12 @@ class SSLStageSuite extends BlazeTestSuite { /** This stage assumes the data coming from Tail is secured * * It was a pain in the end to make this thing which is really only useful for testing - * @param data collection of ByteBuffers to hold in the Head - * @param engine SSLEngine to use for encryption - * @param handshakeInterval interval with which to induce another handshake + * @param data + * collection of ByteBuffers to hold in the Head + * @param engine + * SSLEngine to use for encryption + * @param handshakeInterval + * interval with which to induce another handshake */ class SSLSeqHead(data: Seq[ByteBuffer], engine: SSLEngine, handshakeInterval: Int = -1) extends SeqHead[ByteBuffer](data) { diff --git a/http/src/main/scala/org/http4s/blaze/http/BodyReader.scala b/http/src/main/scala/org/http4s/blaze/http/BodyReader.scala index e92cf8a66..0e5190b5a 100644 --- a/http/src/main/scala/org/http4s/blaze/http/BodyReader.scala +++ b/http/src/main/scala/org/http4s/blaze/http/BodyReader.scala @@ -26,9 +26,9 @@ import scala.util.{Failure, Success} /** Representation of a HTTP message body * - * @note The release of resources must be idempotent, meaning that `discard()` may - * be called after complete consumption of the body and it may also be called - * numerous times. + * @note + * The release of resources must be idempotent, meaning that `discard()` may be called after + * complete consumption of the body and it may also be called numerous times. */ trait BodyReader { @@ -45,20 +45,21 @@ trait BodyReader { * * This may be a result of being discarded, failure, or deletion of the data stream. * - * Because [[BodyReader]] is async it is not, in general, possible to definitively determine - * if more data remains in the stream. Therefore, the contract of this method is that a return - * value of `true` guarantees that no more data can be obtained from this [[BodyReader]], but a - * return value of `false` does not guarantee more data. + * Because [[BodyReader]] is async it is not, in general, possible to definitively determine if + * more data remains in the stream. Therefore, the contract of this method is that a return value + * of `true` guarantees that no more data can be obtained from this [[BodyReader]], but a return + * value of `false` does not guarantee more data. */ def isExhausted: Boolean /** Accumulate any remaining data. * - * The remainder of the message body will be accumulated into a single buffer. If no data remains, - * the `ByteBuffer` will be empty as defined by `ByteBuffer.hasRemaining()` + * The remainder of the message body will be accumulated into a single buffer. If no data + * remains, the `ByteBuffer` will be empty as defined by `ByteBuffer.hasRemaining()` * - * @param max maximum bytes to accumulate before resulting in a failed future with the exception - * [[BodyReader.BodyReaderOverflowException]]. + * @param max + * maximum bytes to accumulate before resulting in a failed future with the exception + * [[BodyReader.BodyReaderOverflowException]]. */ def accumulate(max: Int = Int.MaxValue): Future[ByteBuffer] = BodyReader.accumulate(max, this) @@ -79,8 +80,8 @@ object BodyReader { /** The canonical empty [[BodyReader]] * - * This should be the instance you use if you want to signal that the message body is - * in guaranteed to be empty. + * This should be the instance you use if you want to signal that the message body is in + * guaranteed to be empty. */ val EmptyBodyReader: BodyReader = new BodyReader { override def discard(): Unit = () @@ -90,10 +91,11 @@ object BodyReader { /** Construct a [[BodyReader]] with exactly one chunk of data * - * This method takes ownership if the passed `ByteBuffer`: any changes to the underlying - * buffer will be visible to the consumer of this [[BodyReader]] and vice versa. + * This method takes ownership if the passed `ByteBuffer`: any changes to the underlying buffer + * will be visible to the consumer of this [[BodyReader]] and vice versa. * - * @note if the passed buffer is empty, the `EmptyBodyReader` is returned. + * @note + * if the passed buffer is empty, the `EmptyBodyReader` is returned. */ def singleBuffer(buffer: ByteBuffer): BodyReader = if (!buffer.hasRemaining) EmptyBodyReader @@ -121,8 +123,8 @@ object BodyReader { } } - /** The remainder of the message body will be accumulated into a single buffer. If no data remains, - * the `ByteBuffer` will be empty as defined by `ByteBuffer.hasRemaining()` + /** The remainder of the message body will be accumulated into a single buffer. If no data + * remains, the `ByteBuffer` will be empty as defined by `ByteBuffer.hasRemaining()` */ def accumulate(max: Int, body: BodyReader): Future[ByteBuffer] = { require(max >= 0) diff --git a/http/src/main/scala/org/http4s/blaze/http/ClientResponse.scala b/http/src/main/scala/org/http4s/blaze/http/ClientResponse.scala index 3e6d258e5..31f08d009 100644 --- a/http/src/main/scala/org/http4s/blaze/http/ClientResponse.scala +++ b/http/src/main/scala/org/http4s/blaze/http/ClientResponse.scala @@ -27,10 +27,14 @@ import scala.util.control.NonFatal /** HTTP response received by the client * - * @param code Response code - * @param status Response message. This have no meaning for the HTTP connection, its just for human enjoyment. - * @param headers Response headers - * @param body [[BodyReader]] used to consume the response body. + * @param code + * Response code + * @param status + * Response message. This have no meaning for the HTTP connection, its just for human enjoyment. + * @param headers + * Response headers + * @param body + * [[BodyReader]] used to consume the response body. */ case class ClientResponse(code: Int, status: String, headers: Headers, body: BodyReader) diff --git a/http/src/main/scala/org/http4s/blaze/http/HeaderNames.scala b/http/src/main/scala/org/http4s/blaze/http/HeaderNames.scala index 0f40ecce7..9c2906dc5 100644 --- a/http/src/main/scala/org/http4s/blaze/http/HeaderNames.scala +++ b/http/src/main/scala/org/http4s/blaze/http/HeaderNames.scala @@ -19,8 +19,7 @@ package org.http4s.blaze.http import scala.annotation.tailrec import scala.collection.BitSet -/** Incomplete collection of header names, all lower case for - * easy compatibility with HTTP/2. +/** Incomplete collection of header names, all lower case for easy compatibility with HTTP/2. */ object HeaderNames { val Connection = "connection" @@ -30,8 +29,8 @@ object HeaderNames { val TE = "te" val TransferEncoding = "transfer-encoding" - /** Check of the header key is a valid HTTP/2 header key. - * Pseudo headers are considered invalid and should be screened before hand. + /** Check of the header key is a valid HTTP/2 header key. Pseudo headers are considered invalid + * and should be screened before hand. */ def validH2HeaderKey(str: String): Boolean = { val s = str.length() diff --git a/http/src/main/scala/org/http4s/blaze/http/HttpClientSession.scala b/http/src/main/scala/org/http4s/blaze/http/HttpClientSession.scala index a7f919cfb..c119e47e6 100644 --- a/http/src/main/scala/org/http4s/blaze/http/HttpClientSession.scala +++ b/http/src/main/scala/org/http4s/blaze/http/HttpClientSession.scala @@ -23,8 +23,7 @@ import scala.concurrent.duration.Duration /** Representation of a concrete HTTP session * - * The [[HttpClientSession]] represents a true HTTP client session, either - * HTTP/1.x or HTTP/2. + * The [[HttpClientSession]] represents a true HTTP client session, either HTTP/1.x or HTTP/2. */ sealed trait HttpClientSession { @@ -46,10 +45,11 @@ sealed trait HttpClientSession { /** Close the session within the specified duration. * * The provided duration is only an advisory upper bound and the implementation may choose to - * close the session at any time before the expiration of the `within` parameter. The - * returned `Future[Unit]` will resolve when all resources have been freed from the client. + * close the session at any time before the expiration of the `within` parameter. The returned + * `Future[Unit]` will resolve when all resources have been freed from the client. * - * @note This will generally entail closing the socket connection. + * @note + * This will generally entail closing the socket connection. */ def close(within: Duration): Future[Unit] @@ -63,8 +63,9 @@ trait Http2ClientSession extends HttpClientSession { /** An estimate for the current quality of the session * - * @return a number with domain [0, 1] signifying the health or quality of - * the session. The scale is intended to be linear. + * @return + * a number with domain [0, 1] signifying the health or quality of the session. The scale is + * intended to be linear. */ def quality: Double @@ -79,12 +80,13 @@ object HttpClientSession { /** Releases the resources associated with this dispatch * - * This may entail closing the connection or returning it to a connection - * pool, depending on the client implementation and the state of the session - * responsible for dispatching the associated request. + * This may entail closing the connection or returning it to a connection pool, depending on + * the client implementation and the state of the session responsible for dispatching the + * associated request. * - * @note `release()` must be idempotent and never throw an exception, otherwise - * the session will be corrupted. + * @note + * `release()` must be idempotent and never throw an exception, otherwise the session will be + * corrupted. */ def release(): Unit } @@ -94,7 +96,9 @@ object HttpClientSession { /** Able to dispatch now */ case object Ready extends Status - /** The session is busy and cannot dispatch a message at this time, but may be able to in the future */ + /** The session is busy and cannot dispatch a message at this time, but may be able to in the + * future + */ case object Busy extends Status /** The session will no longer be able to dispatch requests */ diff --git a/http/src/main/scala/org/http4s/blaze/http/HttpRequest.scala b/http/src/main/scala/org/http4s/blaze/http/HttpRequest.scala index 0f822817a..834a5aff5 100644 --- a/http/src/main/scala/org/http4s/blaze/http/HttpRequest.scala +++ b/http/src/main/scala/org/http4s/blaze/http/HttpRequest.scala @@ -18,11 +18,15 @@ package org.http4s.blaze.http /** Standard HTTP request * - * @param method HTTP request method - * @param url request url - * @param headers request headers - * @param body function which returns the next chunk of the request body. Termination is - * signaled by an __empty__ `ByteBuffer` as determined by `ByteBuffer.hasRemaining()`. + * @param method + * HTTP request method + * @param url + * request url + * @param headers + * request headers + * @param body + * function which returns the next chunk of the request body. Termination is signaled by an + * __empty__ `ByteBuffer` as determined by `ByteBuffer.hasRemaining()`. */ case class HttpRequest( method: Method, @@ -41,9 +45,9 @@ case class HttpRequest( /** A String representation of this request that includes the headers * - * @note it is generally a security flaw to log headers as they may contain - * sensitive user data. As such, this method should be used sparingly - * and almost never in a production environment. + * @note + * it is generally a security flaw to log headers as they may contain sensitive user data. As + * such, this method should be used sparingly and almost never in a production environment. */ def sensitiveToString: String = formatStr(headers.toString) } diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/ByteBufferInputStream.scala b/http/src/main/scala/org/http4s/blaze/http/http2/ByteBufferInputStream.scala index 62ad840d6..78b3db20c 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/ByteBufferInputStream.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/ByteBufferInputStream.scala @@ -19,9 +19,8 @@ package org.http4s.blaze.http.http2 import java.io.{IOException, InputStream} import java.nio.ByteBuffer -/** Wrap a `ByteBuffer` in an `InputStream` interface. - * This is just an adapter to work with the twitter hpack - * implementation. I would really like to get rid of it. +/** Wrap a `ByteBuffer` in an `InputStream` interface. This is just an adapter to work with the + * twitter hpack implementation. I would really like to get rid of it. */ private final class ByteBufferInputStream(buffer: ByteBuffer) extends InputStream { private[this] var markSize = -1 diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/Connection.scala b/http/src/main/scala/org/http4s/blaze/http/http2/Connection.scala index 7effc071d..f1cce195d 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/Connection.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/Connection.scala @@ -27,24 +27,28 @@ private trait Connection { /** An estimate for the current quality of the connection * - * Quality is intended to provide a metric for health of a session. - * Factors considered may be the number of outstanding streams, available - * outbound streams, and flow window status and behavior. + * Quality is intended to provide a metric for health of a session. Factors considered may be the + * number of outstanding streams, available outbound streams, and flow window status and + * behavior. * - * @see [[Http2ClientSession]] + * @see + * [[Http2ClientSession]] * - * @return a number with domain [0, 1] signifying the health or quality of - * the session. The scale is intended to be linear. + * @return + * a number with domain [0, 1] signifying the health or quality of the session. The scale is + * intended to be linear. */ def quality: Double /** Get the status of session * - * Status is intended to be used for deciding if a session is ready for - * dispatches or needs to be cleaned up. + * Status is intended to be used for deciding if a session is ready for dispatches or needs to be + * cleaned up. * - * @note The status is racy and is only intended to be used as advisory. - * @see `quality` for a metric of health of the session + * @note + * The status is racy and is only intended to be used as advisory. + * @see + * `quality` for a metric of health of the session */ def status: Status @@ -53,9 +57,9 @@ private trait Connection { /** Signal that the session should shutdown within the grace period * - * Only the first invocation is guaranteed to run, and the behavior of further - * invocations result in implementation specific behavior. The resultant - * `Future` will resolve once the session has drained. + * Only the first invocation is guaranteed to run, and the behavior of further invocations result + * in implementation specific behavior. The resultant `Future` will resolve once the session has + * drained. */ def drainSession(gracePeriod: Duration): Future[Unit] @@ -64,8 +68,8 @@ private trait Connection { /** Create a new outbound stream * - * Resources are not necessarily allocated to this stream, therefore it is - * not guaranteed to succeed. + * Resources are not necessarily allocated to this stream, therefore it is not guaranteed to + * succeed. */ // TODO: right now this only benefits the client. We need to get the push-promise support for the server side def newOutboundStream(): HeadStage[StreamFrame] @@ -80,15 +84,14 @@ private[blaze] object Connection { final def running: Boolean = this == Running } - /** The `Running` state represents a session that is active and able to accept - * new streams. + /** The `Running` state represents a session that is active and able to accept new streams. */ case object Running extends State sealed abstract class Closing extends State - /** The `Draining` state represents a session that is no longer accepting new - * streams and is in the process of draining existing connection. + /** The `Draining` state represents a session that is no longer accepting new streams and is in + * the process of draining existing connection. */ case object Draining extends Closing diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/ConnectionImpl.scala b/http/src/main/scala/org/http4s/blaze/http/http2/ConnectionImpl.scala index 6d19ae4f1..51f51ebfd 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/ConnectionImpl.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/ConnectionImpl.scala @@ -31,8 +31,9 @@ import scala.util.{Failure, Success} /** Representation of the HTTP/2 connection. * - * @note the TailStage needs to be ready to go as this session will start - * reading from the channel immediately. + * @note + * the TailStage needs to be ready to go as this session will start reading from the channel + * immediately. */ private final class ConnectionImpl( tailStage: TailStage[ByteBuffer], diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/FlowStrategy.scala b/http/src/main/scala/org/http4s/blaze/http/http2/FlowStrategy.scala index a84bf07ce..0a69411ff 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/FlowStrategy.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/FlowStrategy.scala @@ -26,22 +26,28 @@ trait FlowStrategy { /** Decide if the session window needs to send a WINDOW_UPDATE frame * - * @note This must not mutate the [[SessionFlowControl]] in any way. - * @note This verison should only be used in situations where the stream associated - * with the data does not exist. For example, it may have already closed and - * sent a RST frame. + * @note + * This must not mutate the [[SessionFlowControl]] in any way. + * @note + * This verison should only be used in situations where the stream associated with the data + * does not exist. For example, it may have already closed and sent a RST frame. * - * @param session the session [[SessionFlowControl]] - * @return number of bytes to update the session flow window with. + * @param session + * the session [[SessionFlowControl]] + * @return + * number of bytes to update the session flow window with. */ def checkSession(session: SessionFlowControl): Int /** Decide if the stream and/or the session need a WINDOW_UPDATE frame * - * @note This must not mutate the [[SessionFlowControl]] or the [[StreamFlowWindow]] in any way. + * @note + * This must not mutate the [[SessionFlowControl]] or the [[StreamFlowWindow]] in any way. * - * @param stream the stream [[StreamFlowWindow]] - * @return the number of bytes to update the session and stream flow window with. + * @param stream + * the stream [[StreamFlowWindow]] + * @return + * the number of bytes to update the session and stream flow window with. */ def checkStream(stream: StreamFlowWindow): Increment } diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/FrameDecoder.scala b/http/src/main/scala/org/http4s/blaze/http/http2/FrameDecoder.scala index a513978e5..50186b390 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/FrameDecoder.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/FrameDecoder.scala @@ -96,12 +96,17 @@ private class FrameDecoder(localSettings: Http2Settings, listener: FrameListener * * https://tools.ietf.org/html/rfc7540#section-5.5 * - * @param code the frame type of this extension frame - * @param streamId stream id associated with this extension frame - * @param flags the flags associated with this extension frame - * @param buffer the payload associated with this extension frame - * @return result of handling the message. If this extension frame is of - * unknown type, it MUST be ignored by spec. + * @param code + * the frame type of this extension frame + * @param streamId + * stream id associated with this extension frame + * @param flags + * the flags associated with this extension frame + * @param buffer + * the payload associated with this extension frame + * @return + * result of handling the message. If this extension frame is of unknown type, it MUST be + * ignored by spec. */ def onExtensionFrame(code: Byte, streamId: Int, flags: Byte, buffer: ByteBuffer): Result = { val _ = (code, streamId, flags, buffer) @@ -298,8 +303,8 @@ private object FrameDecoder { /** Get the length field of the frame, consuming the bytes from the buffer. * - * @return -1 if the buffer doesn't have 3 bytes for the length field, - * and the length field otherwise. + * @return + * -1 if the buffer doesn't have 3 bytes for the length field, and the length field otherwise. */ def getLengthField(buffer: ByteBuffer): Int = ((buffer.get() & 0xff) << 16) | @@ -318,8 +323,8 @@ private object FrameDecoder { Priority.Dependent(Flags.DepID(rawInt), ex, priority) } - /** Read the padding length and strip the requisite bytes from the end of - * the buffer using the `ByteBuffer.limit` method. + /** Read the padding length and strip the requisite bytes from the end of the buffer using the + * `ByteBuffer.limit` method. */ private def limitPadding(padding: Int, buffer: ByteBuffer): MaybeError = if (padding == 0) Continue diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/FrameEncoder.scala b/http/src/main/scala/org/http4s/blaze/http/http2/FrameEncoder.scala index 4097a1e5a..5ab4295b3 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/FrameEncoder.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/FrameEncoder.scala @@ -53,8 +53,8 @@ private final class FrameEncoder(remoteSettings: Http2Settings, headerEncoder: H /** Generate stream data frame(s) for the specified data * - * If the data exceeds the peers MAX_FRAME_SIZE setting, it is fragmented - * into a series of frames. + * If the data exceeds the peers MAX_FRAME_SIZE setting, it is fragmented into a series of + * frames. */ def dataFrame(streamId: Int, endStream: Boolean, data: ByteBuffer): collection.Seq[ByteBuffer] = { val limit = maxFrameSize @@ -74,9 +74,8 @@ private final class FrameEncoder(remoteSettings: Http2Settings, headerEncoder: H /** Generate stream header frames from the provided header sequence * - * If the compressed representation of the headers exceeds the MAX_FRAME_SIZE - * setting of the peer, it will be broken into a HEADERS frame and a series of - * CONTINUATION frames. + * If the compressed representation of the headers exceeds the MAX_FRAME_SIZE setting of the + * peer, it will be broken into a HEADERS frame and a series of CONTINUATION frames. */ def headerFrame( streamId: Int, diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/FrameListener.scala b/http/src/main/scala/org/http4s/blaze/http/http2/FrameListener.scala index d21026e93..d8f2a0c6f 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/FrameListener.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/FrameListener.scala @@ -22,22 +22,21 @@ import Http2Settings.Setting /** Handles the HTTP messages defined in https://tools.ietf.org/html/rfc7540 * * It is expected that implementations will handle validation errors based on the incoming data: - * the supplier of the data (commonly a [[FrameDecoder]]) is responsible for ensuring that - * the frames are properly formatted but doesn't concern itself with validating the sanity of a - * frames content. + * the supplier of the data (commonly a [[FrameDecoder]]) is responsible for ensuring that the + * frames are properly formatted but doesn't concern itself with validating the sanity of a frames + * content. * * For example, the [[FrameDecoder]] is responsible for ensuring that the body of a WINDOW_UPDATE - * is 4 bytes long and the [[FrameListener]] would be expected to signal an error if the - * window increment was 0 or the update was for an idle stream. + * is 4 bytes long and the [[FrameListener]] would be expected to signal an error if the window + * increment was 0 or the update was for an idle stream. */ private trait FrameListener { /** Determine whether we are in the midst of a sequence of header and header continuation frames * - * Each header block is processed as a discrete unit. Header blocks - * MUST be transmitted as a contiguous sequence of frames, with no - * interleaved frames of any other type or from any other stream. - * https://tools.ietf.org/html/rfc7540#section-4.3 + * Each header block is processed as a discrete unit. Header blocks MUST be transmitted as a + * contiguous sequence of frames, with no interleaved frames of any other type or from any other + * stream. https://tools.ietf.org/html/rfc7540#section-4.3 */ def inHeaderSequence: Boolean @@ -45,10 +44,14 @@ private trait FrameListener { * * https://tools.ietf.org/html/rfc7540#section-6.1 * - * @param streamId stream id associated with this data frame. The codec will never set this to 0. - * @param endStream is the last inbound frame for this stream - * @param data raw data of this message. Does NOT include padding. - * @param flowSize bytes counted against the flow window. Includes the data and padding + * @param streamId + * stream id associated with this data frame. The codec will never set this to 0. + * @param endStream + * is the last inbound frame for this stream + * @param data + * raw data of this message. Does NOT include padding. + * @param flowSize + * bytes counted against the flow window. Includes the data and padding */ def onDataFrame(streamId: Int, endStream: Boolean, data: ByteBuffer, flowSize: Int): Result @@ -56,12 +59,17 @@ private trait FrameListener { * * https://tools.ietf.org/html/rfc7540#section-6.2 * - * @param streamId stream id associated with this data frame. The codec will never set this to 0. - * @param priority optional priority data associated with this frame. - * @param endHeaders This frame contains the entire headers block and is not followed by CONTINUATION frames. - * @param endStream The headers block is the last inbound block of the stream. This MAY be followed by - * CONTINUATION frames, which are considered part of this headers block. - * @param data compressed binary header data + * @param streamId + * stream id associated with this data frame. The codec will never set this to 0. + * @param priority + * optional priority data associated with this frame. + * @param endHeaders + * This frame contains the entire headers block and is not followed by CONTINUATION frames. + * @param endStream + * The headers block is the last inbound block of the stream. This MAY be followed by + * CONTINUATION frames, which are considered part of this headers block. + * @param data + * compressed binary header data * @return */ def onHeadersFrame( @@ -75,16 +83,21 @@ private trait FrameListener { * * https://tools.ietf.org/html/rfc7540#section-6.10 * - * @param streamId stream id associated with this data frame. The codec will never set this to 0. - * @param endHeaders this is the last CONTINUATION frame of the headers sequence - * @param data compressed binary header data + * @param streamId + * stream id associated with this data frame. The codec will never set this to 0. + * @param endHeaders + * this is the last CONTINUATION frame of the headers sequence + * @param data + * compressed binary header data */ def onContinuationFrame(streamId: Int, endHeaders: Boolean, data: ByteBuffer): Result /** Called on successful receipt of a PRIORITY frame * - * @param streamId stream id associated with this priority frame. The codec will never set this to 0. - * @param priority priority data + * @param streamId + * stream id associated with this priority frame. The codec will never set this to 0. + * @param priority + * priority data */ def onPriorityFrame(streamId: Int, priority: Priority.Dependent): Result @@ -92,8 +105,10 @@ private trait FrameListener { * * https://tools.ietf.org/html/rfc7540#section-6.4 * - * @param streamId stream id associated with this RST_STREAM frame. The codec will never set this to 0. - * @param code error code detailing the reason for the reset. + * @param streamId + * stream id associated with this RST_STREAM frame. The codec will never set this to 0. + * @param code + * error code detailing the reason for the reset. * @return */ def onRstStreamFrame(streamId: Int, code: Long): Result @@ -102,8 +117,8 @@ private trait FrameListener { * * https://tools.ietf.org/html/rfc7540#section-6.5 * - * @param settings Settings contained in this frame. If this is an ack, - * `settings` will be `None`. + * @param settings + * Settings contained in this frame. If this is an ack, `settings` will be `None`. */ def onSettingsFrame(settings: Option[Seq[Setting]]): Result @@ -111,10 +126,14 @@ private trait FrameListener { * * https://tools.ietf.org/html/rfc7540#section-6.6 * - * @param streamId stream id associated with this PUSH_PROMISE frame. The codec will never set this to 0. - * @param promisedId This stream id must be a stream in the idle state. The codec will never set this to 0. - * @param end_headers This is the last frame of this promise block. - * @param data compressed binary header data. + * @param streamId + * stream id associated with this PUSH_PROMISE frame. The codec will never set this to 0. + * @param promisedId + * This stream id must be a stream in the idle state. The codec will never set this to 0. + * @param end_headers + * This is the last frame of this promise block. + * @param data + * compressed binary header data. */ def onPushPromiseFrame( streamId: Int, @@ -126,8 +145,10 @@ private trait FrameListener { * * https://tools.ietf.org/html/rfc7540#section-6.7 * - * @param ack if this is an acknowledgment of an outbound PING frame. - * @param data opaque data associated with this PING frame. + * @param ack + * if this is an acknowledgment of an outbound PING frame. + * @param data + * opaque data associated with this PING frame. */ def onPingFrame(ack: Boolean, data: Array[Byte]): Result @@ -135,9 +156,12 @@ private trait FrameListener { * * https://tools.ietf.org/html/rfc7540#section-6.8 * - * @param lastStream last stream id that may have been processed by the peer. - * @param errorCode error code describing the reason for the GOAWAY frame. - * @param debugData opaque data that may be useful for debugging purposes. + * @param lastStream + * last stream id that may have been processed by the peer. + * @param errorCode + * error code describing the reason for the GOAWAY frame. + * @param debugData + * opaque data that may be useful for debugging purposes. */ def onGoAwayFrame(lastStream: Int, errorCode: Long, debugData: Array[Byte]): Result @@ -145,9 +169,10 @@ private trait FrameListener { * * https://tools.ietf.org/html/rfc7540#section-6.9 * - * @param streamId stream id to update. Stream id of 0 means this is for the session flow window. - * @param sizeIncrement number of bytes to increment the flow window. The codec will never set - * this to less than 1. + * @param streamId + * stream id to update. Stream id of 0 means this is for the session flow window. + * @param sizeIncrement + * number of bytes to increment the flow window. The codec will never set this to less than 1. */ def onWindowUpdateFrame(streamId: Int, sizeIncrement: Int): Result } diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/FrameSerializer.scala b/http/src/main/scala/org/http4s/blaze/http/http2/FrameSerializer.scala index 9db05ebe7..512905311 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/FrameSerializer.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/FrameSerializer.scala @@ -32,13 +32,17 @@ private[http2] object FrameSerializer { /** Create a DATA frame * - * @param streamId stream id of the associated data frame - * @param endStream whether to set the END_STREAM flag - * @param padding number of octets by which to pad the message, with 0 meaning the flag is not set, - * 1 meaning the flag is set and the pad length field is added, and padding = [2-256] - * meaning the flag is set, length field is added, and (padding - 1) bytes (0x00) are - * added to the end of the frame. - * @param data data consisting of the payload + * @param streamId + * stream id of the associated data frame + * @param endStream + * whether to set the END_STREAM flag + * @param padding + * number of octets by which to pad the message, with 0 meaning the flag is not set, 1 meaning + * the flag is set and the pad length field is added, and padding = [2-256] meaning the flag is + * set, length field is added, and (padding - 1) bytes (0x00) are added to the end of the + * frame. + * @param data + * data consisting of the payload */ def mkDataFrame( streamId: Int, diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/HeaderAggregatingFrameListener.scala b/http/src/main/scala/org/http4s/blaze/http/http2/HeaderAggregatingFrameListener.scala index 8b7d6579b..92b70905d 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/HeaderAggregatingFrameListener.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/HeaderAggregatingFrameListener.scala @@ -22,16 +22,16 @@ import org.http4s.blaze.http.Headers import org.http4s.blaze.util.BufferTools import Http2Exception.PROTOCOL_ERROR -/** A [[FrameListener]] that decodes raw HEADERS, PUSH_PROMISE, - * and CONTINUATION frames from ByteBuffer packets to a complete - * collections of headers. +/** A [[FrameListener]] that decodes raw HEADERS, PUSH_PROMISE, and CONTINUATION frames from + * ByteBuffer packets to a complete collections of headers. * - * If the size of the raw header blcok exceeds the MAX_HEADER_LIST_SIZE parameter - * we send a GOAWAY frame. This can legally be handled with a 431 response, but the - * headers must be processed to keep the header decompressor in a valid state. + * If the size of the raw header blcok exceeds the MAX_HEADER_LIST_SIZE parameter we send a GOAWAY + * frame. This can legally be handled with a 431 response, but the headers must be processed to + * keep the header decompressor in a valid state. * https://tools.ietf.org/html/rfc7540#section-10.5.1 * - * @note This class is not 'thread safe' and should be treated accordingly. + * @note + * This class is not 'thread safe' and should be treated accordingly. */ private abstract class HeaderAggregatingFrameListener( localSettings: Http2Settings, @@ -58,10 +58,14 @@ private abstract class HeaderAggregatingFrameListener( /** Called on the successful receipt of a complete HEADERS block * - * @param streamId stream id of the HEADERS block. The codec will never pass 0. - * @param priority optional priority information associated with this HEADERS block. - * @param endStream this is the last inbound frame for this stream. - * @param headers decompressed headers. + * @param streamId + * stream id of the HEADERS block. The codec will never pass 0. + * @param priority + * optional priority information associated with this HEADERS block. + * @param endStream + * this is the last inbound frame for this stream. + * @param headers + * decompressed headers. */ def onCompleteHeadersFrame( streamId: Int, @@ -72,9 +76,12 @@ private abstract class HeaderAggregatingFrameListener( /** Called on the successful receipt of a complete PUSH_PROMISE block * - * @param streamId stream id of the associated stream. The codec will never pass 0. - * @param promisedId promised stream id. This must be a valid, idle stream id. The codec will never pass 0. - * @param headers decompressed headers. + * @param streamId + * stream id of the associated stream. The codec will never pass 0. + * @param promisedId + * promised stream id. This must be a valid, idle stream id. The codec will never pass 0. + * @param headers + * decompressed headers. */ def onCompletePushPromiseFrame(streamId: Int, promisedId: Int, headers: Headers): Result diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/HeaderDecoder.scala b/http/src/main/scala/org/http4s/blaze/http/http2/HeaderDecoder.scala index e0a7fd3cd..787ea3e8e 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/HeaderDecoder.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/HeaderDecoder.scala @@ -27,16 +27,16 @@ import com.twitter.hpack.{Decoder, HeaderListener} import scala.collection.immutable.VectorBuilder import scala.util.control.NonFatal -/** Decoder of HEADERS frame payloads into a collection of key-value - * pairs using a HPACK decoder. +/** Decoder of HEADERS frame payloads into a collection of key-value pairs using a HPACK decoder. * - * @param maxHeaderListSize maximum allowed size of the header block. + * @param maxHeaderListSize + * maximum allowed size of the header block. * - * "The value is based on the uncompressed size of header fields, - * including the length of the name and value in octets plus an - * overhead of 32 octets for each header field." + * "The value is based on the uncompressed size of header fields, including the length of the name + * and value in octets plus an overhead of 32 octets for each header field." * https://tools.ietf.org/html/rfc7540#section-6.5.2 - * @param maxTableSize maximum compression table to maintain + * @param maxTableSize + * maximum compression table to maintain */ private final class HeaderDecoder( maxHeaderListSize: Int, diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/HeaderEncoder.scala b/http/src/main/scala/org/http4s/blaze/http/http2/HeaderEncoder.scala index c83a2fead..087a0c224 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/HeaderEncoder.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/HeaderEncoder.scala @@ -26,8 +26,8 @@ import com.twitter.hpack.Encoder /** HTTP/2 HPACK header encoder * - * @param initialMaxTableSize maximum HPACK table size the peer - * will allow. + * @param initialMaxTableSize + * maximum HPACK table size the peer will allow. */ class HeaderEncoder(initialMaxTableSize: Int) { private[this] val encoder = new Encoder(initialMaxTableSize) diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/Http2Exception.scala b/http/src/main/scala/org/http4s/blaze/http/http2/Http2Exception.scala index 68257061f..95251ea79 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/Http2Exception.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/Http2Exception.scala @@ -33,7 +33,8 @@ sealed abstract class Http2Exception(msg: String) /** Convert this exception to a stream exception * - * @note If this is already a stream exception but with a different stream id, the id will be changed + * @note + * If this is already a stream exception but with a different stream id, the id will be changed */ final def toStreamException(streamId: Int): Http2StreamException = this match { @@ -50,9 +51,8 @@ sealed abstract class Http2Exception(msg: String) /** Was the exception due to refusal by the peer. * - * These exceptions are safe to automatically retry even if the HTTP method - * is not an idempotent method. See https://tools.ietf.org/html/rfc7540#section-8.1.4 - * for more details. + * These exceptions are safe to automatically retry even if the HTTP method is not an idempotent + * method. See https://tools.ietf.org/html/rfc7540#section-8.1.4 for more details. */ final def isRefusedStream: Boolean = code == Http2Exception.REFUSED_STREAM.code diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/Http2Settings.scala b/http/src/main/scala/org/http4s/blaze/http/http2/Http2Settings.scala index 80d30cef4..0a2dbb9c8 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/Http2Settings.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/Http2Settings.scala @@ -24,52 +24,46 @@ import scala.collection.mutable * * These represent the HTTP2 settings for either the client or server. * - * @see https://tools.ietf.org/html/rfc7540#section-6.5.2, where the - * doc strings were obtained. + * @see + * https://tools.ietf.org/html/rfc7540#section-6.5.2, where the doc strings were obtained. */ sealed abstract class Http2Settings { - /** Allows the sender to inform the remote endpoint of the maximum size - * of the header compression table used to decode header blocks, in - * octets. The encoder can select any size equal to or less than this - * value by using signaling specific to the header compression format - * inside a header block. + /** Allows the sender to inform the remote endpoint of the maximum size of the header compression + * table used to decode header blocks, in octets. The encoder can select any size equal to or + * less than this value by using signaling specific to the header compression format inside a + * header block. */ def headerTableSize: Int - /** Indicates the sender's initial window size (in octets) for - * stream-level flow control. + /** Indicates the sender's initial window size (in octets) for stream-level flow control. */ def initialWindowSize: Int /** This setting can be used to disable server push (Section 8.2). */ def pushEnabled: Boolean - /** Indicates the maximum number of concurrent streams that the - * sender will allow. This limit is directional: it applies to the - * number of streams that the sender permits the receiver to create. - * Initially, there is no limit to this value. It is recommended that - * this value be no smaller than 100, so as to not unnecessarily limit - * parallelism. + /** Indicates the maximum number of concurrent streams that the sender will allow. This limit is + * directional: it applies to the number of streams that the sender permits the receiver to + * create. Initially, there is no limit to this value. It is recommended that this value be no + * smaller than 100, so as to not unnecessarily limit parallelism. * - * A value of 0 for SETTINGS_MAX_CONCURRENT_STREAMS SHOULD NOT be - * treated as special by endpoints. A zero value does prevent the - * creation of new streams; however, this can also happen for any - * limit that is exhausted with active streams. Servers SHOULD only - * set a zero value for short durations; if a server does not wish to - * accept requests, closing the connection is more appropriate. + * A value of 0 for SETTINGS_MAX_CONCURRENT_STREAMS SHOULD NOT be treated as special by + * endpoints. A zero value does prevent the creation of new streams; however, this can also + * happen for any limit that is exhausted with active streams. Servers SHOULD only set a zero + * value for short durations; if a server does not wish to accept requests, closing the + * connection is more appropriate. */ def maxConcurrentStreams: Int - /** Indicates the size of the largest frame payload that the sender is - * willing to receive, in octets. + /** Indicates the size of the largest frame payload that the sender is willing to receive, in + * octets. */ def maxFrameSize: Int - /** This advisory setting informs a peer of the maximum size of header - * list that the sender is prepared to accept, in octets. The value is - * based on the uncompressed size of header fields, including the length - * of the name and value in octets plus an overhead of 32 octets for each + /** This advisory setting informs a peer of the maximum size of header list that the sender is + * prepared to accept, in octets. The value is based on the uncompressed size of header fields, + * including the length of the name and value in octets plus an overhead of 32 octets for each * header field. */ def maxHeaderListSize: Int @@ -145,7 +139,8 @@ object Http2Settings { /** Helper for extracting invalid settings * - * @see https://tools.ietf.org/html/rfc7540#section-6.5.2 + * @see + * https://tools.ietf.org/html/rfc7540#section-6.5.2 */ object InvalidSetting { def unapply(setting: Setting): Option[Http2Exception] = @@ -159,7 +154,7 @@ object Http2Settings { case Setting(_, value) if value < 0 => Some( Http2Exception.PROTOCOL_ERROR.goaway( - s"Integer overflow for setting ${setting.name}: ${value}")) + s"Integer overflow for setting ${setting.name}: $value")) case _ => None } diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/InboundStreamState.scala b/http/src/main/scala/org/http4s/blaze/http/http2/InboundStreamState.scala index a9fd85fde..8960369d1 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/InboundStreamState.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/InboundStreamState.scala @@ -18,6 +18,7 @@ package org.http4s.blaze.http.http2 /** Representation of inbound streams * - * @note this is only a marker trait + * @note + * this is only a marker trait */ private trait InboundStreamState extends StreamState diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/OutboundStreamState.scala b/http/src/main/scala/org/http4s/blaze/http/http2/OutboundStreamState.scala index a17f3c575..6bb85a4c7 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/OutboundStreamState.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/OutboundStreamState.scala @@ -18,11 +18,11 @@ package org.http4s.blaze.http.http2 /** Representation of outbound streams * - * For outbound streams we need a concrete StreamState to send messages - * to, but we can't expect that we will have HEADERS to send right when - * it is born, so we need to make the stream ID lazy since they must be - * used in monotonically increasing order. + * For outbound streams we need a concrete StreamState to send messages to, but we can't expect + * that we will have HEADERS to send right when it is born, so we need to make the stream ID lazy + * since they must be used in monotonically increasing order. * - * @note this is a marker trait + * @note + * this is a marker trait */ private trait OutboundStreamState extends StreamState diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/PriorKnowledgeHandshaker.scala b/http/src/main/scala/org/http4s/blaze/http/http2/PriorKnowledgeHandshaker.scala index cd8f172cb..e487818a3 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/PriorKnowledgeHandshaker.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/PriorKnowledgeHandshaker.scala @@ -32,11 +32,10 @@ abstract class PriorKnowledgeHandshaker[T](localSettings: ImmutableHttp2Settings /** Handle the prior knowledge preface * - * The preface is the magic string "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" - * which is intended to cause HTTP/1.x connections to fail gracefully. - * For clients, this involves sending the magic string and for servers - * this consists of receiving the magic string. The return value of this - * function is any unconsumed inbound data. + * The preface is the magic string "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" which is intended to cause + * HTTP/1.x connections to fail gracefully. For clients, this involves sending the magic string + * and for servers this consists of receiving the magic string. The return value of this function + * is any unconsumed inbound data. */ protected def handlePreface(): Future[ByteBuffer] diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/Priority.scala b/http/src/main/scala/org/http4s/blaze/http/http2/Priority.scala index 95aec64fe..e14989140 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/Priority.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/Priority.scala @@ -24,8 +24,8 @@ object Priority { /** object representing the contents of a PRIORITY frame * - * This is also used for the HEADERS frame which is logically - * a series of headers with a possible PRIORITY frame + * This is also used for the HEADERS frame which is logically a series of headers with a possible + * PRIORITY frame */ final case class Dependent(dependentStreamId: Int, exclusive: Boolean, priority: Int) extends Priority { diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/SessionCore.scala b/http/src/main/scala/org/http4s/blaze/http/http2/SessionCore.scala index 99a31f2af..3ce5bc47e 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/SessionCore.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/SessionCore.scala @@ -19,9 +19,9 @@ package org.http4s.blaze.http.http2 import scala.concurrent.ExecutionContext import scala.concurrent.duration.Duration -/** The Http2 session has a lot of interconnected pieces and the SessionCore - * provides a 'bag-o-references' so that each component can reference each - * other. This helps to avoid construction order conflicts. +/** The Http2 session has a lot of interconnected pieces and the SessionCore provides a + * 'bag-o-references' so that each component can reference each other. This helps to avoid + * construction order conflicts. */ private abstract class SessionCore { // Fields @@ -50,26 +50,27 @@ private abstract class SessionCore { /** Shutdown the session due to unhandled exception * * This is an emergency shutdown, and the session is in an undefined state. - * @note this method must be idempotent (even for reentrant calls) as it - * may be recalled by streams during the close process, etc. + * @note + * this method must be idempotent (even for reentrant calls) as it may be recalled by streams + * during the close process, etc. */ def invokeShutdownWithError(ex: Option[Throwable], phase: String): Unit /** Signal to the session to shutdown gracefully based direction from the remote peer * - * This entails draining the [[StreamManager]] and waiting for all write interests - * to drain. + * This entails draining the [[StreamManager]] and waiting for all write interests to drain. * - * @see `invokeDrain` for the locally initiated analog + * @see + * `invokeDrain` for the locally initiated analog */ def invokeGoAway(lastHandledOutboundStream: Int, error: Http2SessionException): Unit /** Signal for the session to begin draining based on the direction of the local peer * - * This entails draining the [[StreamManager]] and waiting for all write interests - * to drain. + * This entails draining the [[StreamManager]] and waiting for all write interests to drain. * - * @see `invokeGoAway` for the remote initiated analog + * @see + * `invokeGoAway` for the remote initiated analog */ def invokeDrain(gracePeriod: Duration): Unit } diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/SessionFlowControl.scala b/http/src/main/scala/org/http4s/blaze/http/http2/SessionFlowControl.scala index 1681cc395..8c2dab75e 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/SessionFlowControl.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/SessionFlowControl.scala @@ -19,10 +19,11 @@ package org.http4s.blaze.http.http2 /** Flow control representation of a Http2 Session */ abstract class SessionFlowControl { - /** Create a new [[StreamFlowWindow]] for a stream which will update and check the - * bounds of the session flow control state. + /** Create a new [[StreamFlowWindow]] for a stream which will update and check the bounds of the + * session flow control state. * - * @note the stream [[StreamFlowWindow]] is not thread safe. + * @note + * the stream [[StreamFlowWindow]] is not thread safe. */ def newStreamFlowWindow(streamId: Int): StreamFlowWindow @@ -31,8 +32,10 @@ abstract class SessionFlowControl { /** Observe inbound bytes that don't belong to an active inbound stream * - * @param count bytes observed - * @return `true` if there was sufficient session flow window remaining, `false` otherwise. + * @param count + * bytes observed + * @return + * `true` if there was sufficient session flow window remaining, `false` otherwise. */ def sessionInboundObserved(count: Int): Boolean @@ -50,9 +53,9 @@ abstract class SessionFlowControl { /** Update the session outbound window * - * @note there is no way to withdraw outbound bytes directly from - * the session as there should always be an associated stream - * when sending flow control counted bytes outbound. + * @note + * there is no way to withdraw outbound bytes directly from the session as there should always + * be an associated stream when sending flow control counted bytes outbound. */ def sessionOutboundAcked(count: Int): Option[Http2Exception] } diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/SessionFlowControlImpl.scala b/http/src/main/scala/org/http4s/blaze/http/http2/SessionFlowControlImpl.scala index ea941e8ac..f47c265f9 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/SessionFlowControlImpl.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/SessionFlowControlImpl.scala @@ -52,8 +52,10 @@ private class SessionFlowControlImpl( /** Called when bytes have been consumed from a live stream * - * @param stream stream associated with the consumed bytes - * @param consumed number of bytes consumed + * @param stream + * stream associated with the consumed bytes + * @param consumed + * number of bytes consumed */ protected def onStreamBytesConsumed(stream: StreamFlowWindow, consumed: Int): Unit = { val _ = consumed @@ -80,10 +82,11 @@ private class SessionFlowControlImpl( // Concrete methods ///////////////////////////////////////////////////////////// - /** Create a new [[StreamFlowWindow]] for a stream which will update and check the - * bounds of the session flow control state. + /** Create a new [[StreamFlowWindow]] for a stream which will update and check the bounds of the + * session flow control state. * - * @note the stream [[StreamFlowWindow]] is not thread safe. + * @note + * the stream [[StreamFlowWindow]] is not thread safe. */ final override def newStreamFlowWindow(streamId: Int): StreamFlowWindow = { require(0 < streamId) @@ -96,8 +99,10 @@ private class SessionFlowControlImpl( /** Observe inbound bytes that don't belong to an active inbound stream * - * @param count bytes observed - * @return `true` if there was sufficient session flow window remaining, `false` otherwise. + * @param count + * bytes observed + * @return + * `true` if there was sufficient session flow window remaining, `false` otherwise. */ final override def sessionInboundObserved(count: Int): Boolean = { logger.trace(s"Observed $count inbound session bytes. $sessionWindowString") @@ -141,9 +146,9 @@ private class SessionFlowControlImpl( /** Update the session outbound window * - * @note there is no way to withdraw outbound bytes directly from - * the session as there should always be an associated stream - * when sending flow control counted bytes outbound. + * @note + * there is no way to withdraw outbound bytes directly from the session as there should always + * be an associated stream when sending flow control counted bytes outbound. */ final override def sessionOutboundAcked(count: Int): Option[Http2Exception] = { logger.trace(s"$count outbound session bytes were ACKed. $sessionWindowString") diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/SessionFrameListener.scala b/http/src/main/scala/org/http4s/blaze/http/http2/SessionFrameListener.scala index 5435c2d73..8d2e6a99e 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/SessionFrameListener.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/SessionFrameListener.scala @@ -25,8 +25,8 @@ import org.http4s.blaze.http.http2.Http2Settings.Setting /** Receives frames from the `Http2FrameDecoder` * - * Concurrency is not controlled by this type; it is expected that thread safety - * will be managed by the [[ConnectionImpl]]. + * Concurrency is not controlled by this type; it is expected that thread safety will be managed by + * the [[ConnectionImpl]]. */ private class SessionFrameListener( session: SessionCore, diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/SettingsDecoder.scala b/http/src/main/scala/org/http4s/blaze/http/http2/SettingsDecoder.scala index 66800f82d..a4f5f2f74 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/SettingsDecoder.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/SettingsDecoder.scala @@ -22,7 +22,8 @@ import org.http4s.blaze.http.http2.Http2Settings.Setting import org.http4s.blaze.http.http2.bits.{Flags, FrameTypes} /** Utility for decoding a settings frame - * @see https://tools.ietf.org/html/rfc7540#section-6.5 + * @see + * https://tools.ietf.org/html/rfc7540#section-6.5 */ private[blaze] object SettingsDecoder { @@ -36,8 +37,10 @@ private[blaze] object SettingsDecoder { /** Decode a settings frame * - * @param buffer `ByteBuffer` consisting of exactly the frame, including the header. - * @return A [[SettingsFrame]] or a [[Http2Exception]] + * @param buffer + * `ByteBuffer` consisting of exactly the frame, including the header. + * @return + * A [[SettingsFrame]] or a [[Http2Exception]] */ def decodeSettingsFrame(buffer: ByteBuffer): Either[Http2Exception, SettingsFrame] = { val len = FrameDecoder.getLengthField(buffer) @@ -56,9 +59,12 @@ private[blaze] object SettingsDecoder { /** Decode the body of a SETTINGS frame * - * @param buffer `ByteBuffer` containing exactly the body of the settings frame - * @param streamId stream id obtained from the frame header. Must be 0x0. - * @param flags flags obtained from the frame header. + * @param buffer + * `ByteBuffer` containing exactly the body of the settings frame + * @param streamId + * stream id obtained from the frame header. Must be 0x0. + * @param flags + * flags obtained from the frame header. */ def decodeSettingsFrame( buffer: ByteBuffer, diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/StreamFlowWindow.scala b/http/src/main/scala/org/http4s/blaze/http/http2/StreamFlowWindow.scala index bcaf864f9..48a8dba20 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/StreamFlowWindow.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/StreamFlowWindow.scala @@ -18,8 +18,8 @@ package org.http4s.blaze.http.http2 /** Representation of the flow control state of a stream belonging to a session * - * The `StreamFlowWindow` provides the tools for tracking the flow window for - * both the individual stream and the session that it belongs to. + * The `StreamFlowWindow` provides the tools for tracking the flow window for both the individual + * stream and the session that it belongs to. */ abstract class StreamFlowWindow { @@ -39,8 +39,8 @@ abstract class StreamFlowWindow { final def outboundWindow: Int = math.min(streamOutboundWindow, sessionFlowControl.sessionOutboundWindow) - /** Determine whether we have available flow window remaining, considering - * both the stream and the session flow windows + /** Determine whether we have available flow window remaining, considering both the stream and the + * session flow windows */ final def outboundWindowAvailable: Boolean = streamOutboundWindow > 0 && sessionFlowControl.sessionOutboundWindow > 0 @@ -49,19 +49,21 @@ abstract class StreamFlowWindow { * * If an error is returned, the internal state _must not_ be modified. * - * @param delta change in intial window size. Maybe be positive or negative, but must not - * cause the window to overflow Int.MaxValue. + * @param delta + * change in intial window size. Maybe be positive or negative, but must not cause the window + * to overflow Int.MaxValue. */ def remoteSettingsInitialWindowChange(delta: Int): Option[Http2Exception] /** Signal that a stream window update was received for `count` bytes */ def streamOutboundAcked(count: Int): Option[Http2Exception] - /** Request to withdraw bytes from the outbound window of the stream - * and the session. + /** Request to withdraw bytes from the outbound window of the stream and the session. * - * @param request maximum bytes to withdraw - * @return actual bytes withdrawn from the window + * @param request + * maximum bytes to withdraw + * @return + * actual bytes withdrawn from the window */ def outboundRequest(request: Int): Int @@ -73,13 +75,15 @@ abstract class StreamFlowWindow { * If there are sufficient bytes in the stream and session flow windows, they are subtracted, * otherwise the window is unmodified. * - * @return `true` if withdraw was successful, `false` otherwise. + * @return + * `true` if withdraw was successful, `false` otherwise. */ def inboundObserved(count: Int): Boolean /** Signal that `count` bytes have been consumed by the stream * - * @note The consumed bytes are also counted for the session flow window. + * @note + * The consumed bytes are also counted for the session flow window. */ def inboundConsumed(count: Int): Unit diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/StreamFrame.scala b/http/src/main/scala/org/http4s/blaze/http/http2/StreamFrame.scala index 4e5fe0a9c..5c11ffa90 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/StreamFrame.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/StreamFrame.scala @@ -28,10 +28,11 @@ sealed trait StreamFrame { /** Data frame for http2 * - * @param endStream if this is the last message of the stream - * @param data actual stream data. The `ByteBuffer` indexes may be modified by the receiver. - * The `ByteBuffer` indexes are considered owned by this DataFrame, but its - * data must not be modified. + * @param endStream + * if this is the last message of the stream + * @param data + * actual stream data. The `ByteBuffer` indexes may be modified by the receiver. The `ByteBuffer` + * indexes are considered owned by this DataFrame, but its data must not be modified. */ case class DataFrame(endStream: Boolean, data: ByteBuffer) extends StreamFrame { def flowBytes: Int = data.remaining() @@ -39,9 +40,12 @@ case class DataFrame(endStream: Boolean, data: ByteBuffer) extends StreamFrame { /** Headers frame for http2 * - * @param priority priority of this stream - * @param endStream signal if this is the last frame of the stream - * @param headers attached headers + * @param priority + * priority of this stream + * @param endStream + * signal if this is the last frame of the stream + * @param headers + * attached headers */ case class HeadersFrame(priority: Priority, endStream: Boolean, headers: Headers) extends StreamFrame { diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/StreamIdManager.scala b/http/src/main/scala/org/http4s/blaze/http/http2/StreamIdManager.scala index 1159c805d..94434528c 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/StreamIdManager.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/StreamIdManager.scala @@ -74,11 +74,13 @@ private final class StreamIdManager private ( /** Mark the stream id non-idle, and any idle inbound streams with lower ids * - * If the stream id is an inbound stream id and is idle then the specified it - * and all inbound id's preceding it are marked as non-idle. + * If the stream id is an inbound stream id and is idle then the specified it and all inbound + * id's preceding it are marked as non-idle. * - * @param id stream id to observe - * @return `true` if observed, `false` otherwise. + * @param id + * stream id to observe + * @return + * `true` if observed, `false` otherwise. */ def observeInboundId(id: Int): Boolean = if (!isIdleInboundId(id)) false @@ -89,7 +91,8 @@ private final class StreamIdManager private ( /** Acquire the next outbound stream id * - * @return the next streamId wrapped in `Some` if it exists, `None` otherwise. + * @return + * the next streamId wrapped in `Some` if it exists, `None` otherwise. */ def takeOutboundId(): Option[Int] = // Returns `None` if the stream id overflows, which is when a signed Int overflows diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/StreamManager.scala b/http/src/main/scala/org/http4s/blaze/http/http2/StreamManager.scala index 309d248f3..e194ea4f8 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/StreamManager.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/StreamManager.scala @@ -21,9 +21,8 @@ import scala.concurrent.Future /** Manager of the active streams for a session * - * The `StreamManager` can be thought of as the collection of - * active streams and some associated helper methods for performing - * operations relevant to the HTTP/2 protocol. + * The `StreamManager` can be thought of as the collection of active streams and some associated + * helper methods for performing operations relevant to the HTTP/2 protocol. */ private trait StreamManager { @@ -35,11 +34,12 @@ private trait StreamManager { /** Update the flow windows of open streams due to a change of the initial flow window * - * A receiver MUST adjust the size of all stream flow-control windows that - * it maintains by the difference between the new value and the old value. + * A receiver MUST adjust the size of all stream flow-control windows that it maintains by the + * difference between the new value and the old value. * https://tools.ietf.org/html/rfc7540#section-6.9.2 * - * @param delta difference between the new initial window and the previous initial window + * @param delta + * difference between the new initial window and the previous initial window */ def initialFlowWindowChange(delta: Int): MaybeError @@ -55,22 +55,24 @@ private trait StreamManager { /** Creates a new OutboundStreamState which hasn't been allocated a stream id * - * Errors are returned lazily since resources aren't acquired until the write of - * the streams prelude. + * Errors are returned lazily since resources aren't acquired until the write of the streams + * prelude. */ def newOutboundStream(): OutboundStreamState - /** Cause the associated stream to be reset, if it exists as if due to - * a RST_STREAM frame. + /** Cause the associated stream to be reset, if it exists as if due to a RST_STREAM frame. * - * @param cause the reason the stream was reset + * @param cause + * the reason the stream was reset */ def rstStream(cause: Http2StreamException): MaybeError /** Called by a `StreamState` to remove itself from the StreamManager * - * @param streamState the stream being closed - * @return true if the stream existed and was closed, false otherwise + * @param streamState + * the stream being closed + * @return + * true if the stream existed and was closed, false otherwise */ def streamClosed(streamState: StreamState): Boolean @@ -79,24 +81,25 @@ private trait StreamManager { /** Update the specified flow window with the specified bytes * - * @note stream ID 0 indicates the session flow window + * @note + * stream ID 0 indicates the session flow window */ def flowWindowUpdate(streamId: Int, sizeIncrement: Int): MaybeError /** Close the `StreamManager` and all the associated streams immediately * - * Close all the streams of the session now, most commonly due to an error - * in the session. For a controlled shutdown, use `goAway`. + * Close all the streams of the session now, most commonly due to an error in the session. For a + * controlled shutdown, use `goAway`. */ def forceClose(cause: Option[Throwable]): Unit /** Drain the `StreamManager` gracefully * - * All outbound streams with ID's above the specified last handled ID will - * be reset with a REFUSED_STREAM stream error to signal that they were - * rejected by the remote peer. + * All outbound streams with ID's above the specified last handled ID will be reset with a + * REFUSED_STREAM stream error to signal that they were rejected by the remote peer. * - * @return a `Future` that will resolve once all streams have been drained + * @return + * a `Future` that will resolve once all streams have been drained */ def drain(lastHandledOutboundStream: Int, error: Http2SessionException): Future[Unit] } diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/StreamState.scala b/http/src/main/scala/org/http4s/blaze/http/http2/StreamState.scala index 31877938f..ee470cc08 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/StreamState.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/StreamState.scala @@ -26,9 +26,9 @@ private trait StreamState extends HeadStage[StreamFrame] with WriteInterest { /** Whether the `StreamState` is part of the H2 session * - * This is used by client streams to signal that they haven't yet become - * part of the H2 session since they are 'lazy' until they have emitted - * the first HEADERS frame, at which point they get assigned a stream id. + * This is used by client streams to signal that they haven't yet become part of the H2 session + * since they are 'lazy' until they have emitted the first HEADERS frame, at which point they get + * assigned a stream id. */ def initialized: Boolean @@ -38,8 +38,8 @@ private trait StreamState extends HeadStage[StreamFrame] with WriteInterest { /** The flow window associated with this stream */ def flowWindow: StreamFlowWindow - /** Called when the outbound flow window of the session or this stream has had some data - * acked and we may now be able to make forward progress. + /** Called when the outbound flow window of the session or this stream has had some data acked and + * we may now be able to make forward progress. */ def outboundFlowWindowChanged(): Unit diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/StreamStateImpl.scala b/http/src/main/scala/org/http4s/blaze/http/http2/StreamStateImpl.scala index 7123dc075..1c2bd5eab 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/StreamStateImpl.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/StreamStateImpl.scala @@ -27,15 +27,14 @@ import scala.concurrent.{Future, Promise} /** Virtual pipeline head for representing HTTP/2 streams * - * It provides the junction for de-multiplexing stream messages - * into an individual stream. It handles commands and errors for the - * stream and manages the lifetime in the parent session accordingly. + * It provides the junction for de-multiplexing stream messages into an individual stream. It + * handles commands and errors for the stream and manages the lifetime in the parent session + * accordingly. * - * @note While `StreamState` does enforce the end-stream semantics - * defined by HTTP/2, it doesn't attempt to enforce the semantics - * of the HTTP dispatch, specifically it doesn't enforce that - * HEADERS come before DATA, etc, and that duty belongs to the - * streams dispatcher. + * @note + * While `StreamState` does enforce the end-stream semantics defined by HTTP/2, it doesn't + * attempt to enforce the semantics of the HTTP dispatch, specifically it doesn't enforce that + * HEADERS come before DATA, etc, and that duty belongs to the streams dispatcher. */ private abstract class StreamStateImpl(session: SessionCore) extends StreamState { // State associated with the streams inbound data flow @@ -142,8 +141,8 @@ private abstract class StreamStateImpl(session: SessionCore) extends StreamState doRegisterWriteInterest() } - /** Called when the outbound flow window of the session or this stream has had some data - * acked and we may now be able to make forward progress. + /** Called when the outbound flow window of the session or this stream has had some data acked and + * we may now be able to make forward progress. */ final override def outboundFlowWindowChanged(): Unit = if (writePromise != null && flowWindow.outboundWindowAvailable) diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/WriteController.scala b/http/src/main/scala/org/http4s/blaze/http/http2/WriteController.scala index 7065fc662..b08250190 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/WriteController.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/WriteController.scala @@ -27,11 +27,13 @@ import scala.concurrent.Future /** Generic interface used by HTTP2 types to write data */ private trait WriteController { - /** Register a [[WriteInterest]] with this listener to be invoked later once it is - * possible to write data to the outbound channel. + /** Register a [[WriteInterest]] with this listener to be invoked later once it is possible to + * write data to the outbound channel. * - * @param interest the `WriteListener` with an interest in performing a write operation. - * @return true if registration successful, false otherwise + * @param interest + * the `WriteListener` with an interest in performing a write operation. + * @return + * true if registration successful, false otherwise */ def registerWriteInterest(interest: WriteInterest): Boolean @@ -40,13 +42,15 @@ private trait WriteController { /** Queue multiple buffers for writing * - * @return true if the data was scheduled for writing, false otherwise. + * @return + * true if the data was scheduled for writing, false otherwise. */ def write(data: Seq[ByteBuffer]): Boolean /** Queue a buffer for writing * - * @return true if the data was scheduled for writing, false otherwise. + * @return + * true if the data was scheduled for writing, false otherwise. */ def write(data: ByteBuffer): Boolean } diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/WriteControllerImpl.scala b/http/src/main/scala/org/http4s/blaze/http/http2/WriteControllerImpl.scala index 67216b072..261bd94b2 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/WriteControllerImpl.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/WriteControllerImpl.scala @@ -30,11 +30,12 @@ import scala.util.control.NonFatal /** Gracefully coordinate writes * - * The `WriteController`'s job is to direct outbound data in a fair, efficient, and - * thread safe manner. All calls to the `WriteController` are expected to come from - * within the session executor. + * The `WriteController`'s job is to direct outbound data in a fair, efficient, and thread safe + * manner. All calls to the `WriteController` are expected to come from within the session + * executor. * - * @param highWaterMark number of bytes that will trigger a flush. + * @param highWaterMark + * number of bytes that will trigger a flush. */ private final class WriteControllerImpl( session: SessionCore, diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/WriteInterest.scala b/http/src/main/scala/org/http4s/blaze/http/http2/WriteInterest.scala index f8a1f6201..078737aca 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/WriteInterest.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/WriteInterest.scala @@ -23,12 +23,12 @@ private trait WriteInterest { /** Invoked by the [[WriteController]] that this `WriteInterest` is registered with. * - * Before being invoked, `this` must be unregistered from the [[WriteController]] and - * it is safe to add `this` back as an interest before returning the corresponding - * data, if desired. + * Before being invoked, `this` must be unregistered from the [[WriteController]] and it is safe + * to add `this` back as an interest before returning the corresponding data, if desired. * - * @note this method will be called by the `WriteController` from within - * the sessions serial executor. + * @note + * this method will be called by the `WriteController` from within the sessions serial + * executor. */ def performStreamWrite(): collection.Seq[ByteBuffer] } diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/server/ALPNServerSelector.scala b/http/src/main/scala/org/http4s/blaze/http/http2/server/ALPNServerSelector.scala index cd69d6454..bab97a666 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/server/ALPNServerSelector.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/server/ALPNServerSelector.scala @@ -21,18 +21,20 @@ import java.nio.ByteBuffer import javax.net.ssl.SSLEngine import java.util import org.http4s.blaze.internal.compat.CollectionConverters._ -import org.http4s.blaze.pipeline.{LeafBuilder, TailStage, Command => Cmd} +import org.http4s.blaze.pipeline.{Command => Cmd, LeafBuilder, TailStage} import org.http4s.blaze.util.Execution.trampoline import scala.util.{Failure, Success} import scala.util.control.NonFatal /** Dynamically inject an appropriate pipeline using ALPN negotiation. * - * @param engine the `SSLEngine` in use for the connection - * @param selector selects the preferred protocol from the sequence of - * supported clients. May receive an empty sequence. - * @param builder builds the appropriate pipeline based on the negotiated - * protocol + * @param engine + * the `SSLEngine` in use for the connection + * @param selector + * selects the preferred protocol from the sequence of supported clients. May receive an empty + * sequence. + * @param builder + * builds the appropriate pipeline based on the negotiated protocol */ final class ALPNServerSelector( engine: SSLEngine, diff --git a/http/src/test/scala/org/http4s/blaze/http/http2/mocks/MockStreamManager.scala b/http/src/test/scala/org/http4s/blaze/http/http2/mocks/MockStreamManager.scala index a8e06ca08..2715bf3ad 100644 --- a/http/src/test/scala/org/http4s/blaze/http/http2/mocks/MockStreamManager.scala +++ b/http/src/test/scala/org/http4s/blaze/http/http2/mocks/MockStreamManager.scala @@ -17,7 +17,7 @@ package org.http4s.blaze.http.http2.mocks import org.http4s.blaze.http.Headers -import org.http4s.blaze.http.http2.{Result, MaybeError, _} +import org.http4s.blaze.http.http2.{MaybeError, Result, _} import scala.collection.mutable import scala.concurrent.Future diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 20ef8fe1e..31c932afc 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -1,10 +1,10 @@ import sbt._ object Dependencies { - lazy val logbackClassic = "ch.qos.logback" % "logback-classic" % "1.2.5" - lazy val twitterHPACK = "com.twitter" % "hpack" % "1.0.2" - lazy val asyncHttpClient = "org.asynchttpclient" % "async-http-client" % "2.12.3" - lazy val log4s = "org.log4s" %% "log4s" % "1.10.0" - lazy val munit = "org.scalameta" %% "munit" % "0.7.27" - lazy val scalacheckMunit = "org.scalameta" %% "munit-scalacheck" % munit.revision + lazy val logbackClassic = "ch.qos.logback" % "logback-classic" % "1.2.5" + lazy val twitterHPACK = "com.twitter" % "hpack" % "1.0.2" + lazy val asyncHttpClient = "org.asynchttpclient" % "async-http-client" % "2.12.3" + lazy val log4s = "org.log4s" %% "log4s" % "1.10.0" + lazy val munit = "org.scalameta" %% "munit" % "0.7.27" + lazy val scalacheckMunit = "org.scalameta" %% "munit-scalacheck" % munit.revision }