diff --git a/.scalafmt.conf b/.scalafmt.conf index 147647fe9..64382b818 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -1,4 +1,4 @@ -version = 3.2.1 +version = 3.2.2 style = default diff --git a/core/src/main/scala/org/http4s/blaze/channel/ServerChannel.scala b/core/src/main/scala/org/http4s/blaze/channel/ServerChannel.scala index 80bd87054..85a8ea060 100644 --- a/core/src/main/scala/org/http4s/blaze/channel/ServerChannel.scala +++ b/core/src/main/scala/org/http4s/blaze/channel/ServerChannel.scala @@ -123,9 +123,9 @@ abstract class ServerChannel extends Closeable { self => case NonFatal(t) => logger.error(t)(s"Exception occurred during Channel shutdown.") } finally - // If we're the last hook to run, we notify any listeners - if (countdown.decrementAndGet() == 0) - closeAndNotify() + // If we're the last hook to run, we notify any listeners + if (countdown.decrementAndGet() == 0) + closeAndNotify() } } diff --git a/core/src/main/scala/org/http4s/blaze/channel/nio1/NIO1ClientChannel.scala b/core/src/main/scala/org/http4s/blaze/channel/nio1/NIO1ClientChannel.scala index adaab34d7..df11159cc 100644 --- a/core/src/main/scala/org/http4s/blaze/channel/nio1/NIO1ClientChannel.scala +++ b/core/src/main/scala/org/http4s/blaze/channel/nio1/NIO1ClientChannel.scala @@ -62,8 +62,9 @@ private[blaze] final class NIO1ClientChannel( override def close(): Unit = try underlying.close() - finally if (closed.compareAndSet(false, true)) { - onClose() - } + finally + if (closed.compareAndSet(false, true)) { + onClose() + } } diff --git a/core/src/main/scala/org/http4s/blaze/channel/nio1/NIO1HeadStage.scala b/core/src/main/scala/org/http4s/blaze/channel/nio1/NIO1HeadStage.scala index aeae540a9..cf1deba3a 100644 --- a/core/src/main/scala/org/http4s/blaze/channel/nio1/NIO1HeadStage.scala +++ b/core/src/main/scala/org/http4s/blaze/channel/nio1/NIO1HeadStage.scala @@ -73,41 +73,43 @@ private[nio1] object NIO1HeadStage { ch: NIO1ClientChannel, scratch: ByteBuffer, buffers: Array[ByteBuffer]): WriteResult = - try if (BufferTools.areDirectOrEmpty(buffers)) { - ch.write(buffers) - if (util.BufferTools.checkEmpty(buffers)) Complete - else Incomplete - } else { - // To sidestep the java NIO "memory leak" (see http://www.evanjones.ca/java-bytebuffer-leak.html) - // We copy the data to the scratch buffer (which should be a direct ByteBuffer) - // before the write. We then check to see how much data was written and fast-forward - // the input buffers accordingly. - // This is very similar to the pattern used by the Oracle JDK implementation in its - // IOUtil class: if the provided buffers are not direct buffers, they are copied to - // temporary direct ByteBuffers and written. - @tailrec - def writeLoop(): WriteResult = { - scratch.clear() - BufferTools.copyBuffers(buffers, scratch) - scratch.flip() + try + if (BufferTools.areDirectOrEmpty(buffers)) { + ch.write(buffers) + if (util.BufferTools.checkEmpty(buffers)) Complete + else Incomplete + } else { + // To sidestep the java NIO "memory leak" (see http://www.evanjones.ca/java-bytebuffer-leak.html) + // We copy the data to the scratch buffer (which should be a direct ByteBuffer) + // before the write. We then check to see how much data was written and fast-forward + // the input buffers accordingly. + // This is very similar to the pattern used by the Oracle JDK implementation in its + // IOUtil class: if the provided buffers are not direct buffers, they are copied to + // temporary direct ByteBuffers and written. + @tailrec + def writeLoop(): WriteResult = { + scratch.clear() + BufferTools.copyBuffers(buffers, scratch) + scratch.flip() + + val written = ch.write(scratch) + if (written > 0) + assert(BufferTools.fastForwardBuffers(buffers, written)) + + if (scratch.remaining > 0) + // Couldn't write all the data. + Incomplete + else if (util.BufferTools.checkEmpty(buffers)) + // All data was written + Complete + else + // May still be able to write more to the socket buffer. + writeLoop() + } - val written = ch.write(scratch) - if (written > 0) - assert(BufferTools.fastForwardBuffers(buffers, written)) - - if (scratch.remaining > 0) - // Couldn't write all the data. - Incomplete - else if (util.BufferTools.checkEmpty(buffers)) - // All data was written - Complete - else - // May still be able to write more to the socket buffer. - writeLoop() + writeLoop() } - - writeLoop() - } catch { + catch { case _: ClosedChannelException => WriteError(EOF) case e: IOException if ChannelHead.brokePipeMessages.contains(e.getMessage) => WriteError(EOF) @@ -318,21 +320,22 @@ private[nio1] final class NIO1HeadStage( sendInboundCommand(Disconnected) } - try selectorLoop.executeTask(new Runnable { - def run(): Unit = { - logger.trace( - s"closeWithError($cause); readPromise: $readPromise, writePromise: $writePromise") - val c = cause match { - case Some(ex) => - logger.error(cause.get)("Abnormal NIO1HeadStage termination") - ex - case None => EOF + try + selectorLoop.executeTask(new Runnable { + def run(): Unit = { + logger.trace( + s"closeWithError($cause); readPromise: $readPromise, writePromise: $writePromise") + val c = cause match { + case Some(ex) => + logger.error(cause.get)("Abnormal NIO1HeadStage termination") + ex + case None => EOF + } + if (key.isValid) key.interestOps(0) + key.attach(null) + doClose(c) } - if (key.isValid) key.interestOps(0) - key.attach(null) - doClose(c) - } - }) + }) catch { case e: RejectedExecutionException => logger.error(e)("Event loop closed. Closing in current thread.") diff --git a/core/src/main/scala/org/http4s/blaze/channel/nio1/NIO1SocketServerGroup.scala b/core/src/main/scala/org/http4s/blaze/channel/nio1/NIO1SocketServerGroup.scala index 999f30c01..f18c52610 100644 --- a/core/src/main/scala/org/http4s/blaze/channel/nio1/NIO1SocketServerGroup.scala +++ b/core/src/main/scala/org/http4s/blaze/channel/nio1/NIO1SocketServerGroup.scala @@ -220,11 +220,11 @@ private final class NIO1SocketServerGroup private ( } try - // We use `enqueueTask` deliberately so as to not jump ahead - // of channel initialization. - selectorLoop.enqueueTask(new Runnable { - override def run(): Unit = doClose() - }) + // We use `enqueueTask` deliberately so as to not jump ahead + // of channel initialization. + selectorLoop.enqueueTask(new Runnable { + override def run(): Unit = doClose() + }) catch { case _: RejectedExecutionException => this.logger.info("Selector loop closed. Closing in local thread.") diff --git a/core/src/main/scala/org/http4s/blaze/channel/nio1/SelectorLoop.scala b/core/src/main/scala/org/http4s/blaze/channel/nio1/SelectorLoop.scala index 53427c61f..44a8256d8 100644 --- a/core/src/main/scala/org/http4s/blaze/channel/nio1/SelectorLoop.scala +++ b/core/src/main/scala/org/http4s/blaze/channel/nio1/SelectorLoop.scala @@ -153,20 +153,22 @@ final class SelectorLoop( private[this] def runLoop(): Unit = { var cleanShutdown = true - try while (!isClosed) { - // Block here until some I/O event happens or someone adds - // a task to run and wakes the loop. - val selected = selector.select() + try + while (!isClosed) { + // Block here until some I/O event happens or someone adds + // a task to run and wakes the loop. + val selected = selector.select() - // Run any pending tasks. These may set interest ops, - // just compute something, etc. - taskQueue.executeTasks() + // Run any pending tasks. These may set interest ops, + // just compute something, etc. + taskQueue.executeTasks() - // We have some new I/O operations waiting for us. Process them. - if (selected > 0) { - processKeys(scratch, selector.selectedKeys) + // We have some new I/O operations waiting for us. Process them. + if (selected > 0) { + processKeys(scratch, selector.selectedKeys) + } } - } catch { + catch { case e: ClosedSelectorException => logger.error(e)("Selector unexpectedly closed") cleanShutdown = false @@ -192,16 +194,18 @@ final class SelectorLoop( it.remove() val selectable = getAttachment(k) - try if (k.isValid) { - if (selectable != null) { - selectable.opsReady(scratch) - } else { - k.cancel() - logger.error("Illegal state: selector key had null attachment.") + try + if (k.isValid) { + if (selectable != null) { + selectable.opsReady(scratch) + } else { + k.cancel() + logger.error("Illegal state: selector key had null attachment.") + } + } else if (selectable != null) { + selectable.close(None) } - } else if (selectable != null) { - selectable.close(None) - } catch { + catch { case t @ (NonFatal(_) | _: ControlThrowable) => logger.error(t)("Error performing channel operations. Closing channel.") try selectable.close(Some(t)) diff --git a/core/src/main/scala/org/http4s/blaze/pipeline/Stages.scala b/core/src/main/scala/org/http4s/blaze/pipeline/Stages.scala index c04e2f1aa..85ff8fba9 100644 --- a/core/src/main/scala/org/http4s/blaze/pipeline/Stages.scala +++ b/core/src/main/scala/org/http4s/blaze/pipeline/Stages.scala @@ -98,10 +98,11 @@ sealed trait Tail[I] extends Stage { } def channelRead(size: Int = -1, timeout: Duration = Duration.Inf): Future[I] = - try if (_prevStage != null) { - val f = _prevStage.readRequest(size) - checkTimeout(timeout, f) - } else _stageDisconnected() + try + if (_prevStage != null) { + val f = _prevStage.readRequest(size) + checkTimeout(timeout, f) + } else _stageDisconnected() catch { case NonFatal(t) => Future.failed(t) } /** Write a single outbound message to the pipeline */ diff --git a/http/src/main/scala/org/http4s/blaze/http/http2/FrameDecoder.scala b/http/src/main/scala/org/http4s/blaze/http/http2/FrameDecoder.scala index f6308cad5..e428fdc89 100644 --- a/http/src/main/scala/org/http4s/blaze/http/http2/FrameDecoder.scala +++ b/http/src/main/scala/org/http4s/blaze/http/http2/FrameDecoder.scala @@ -61,24 +61,25 @@ private class FrameDecoder(localSettings: Http2Settings, listener: FrameListener val endOfFrame = buffer.position() + len buffer.limit(endOfFrame) - try frameType match { - case FrameTypes.DATA => decodeDataFrame(buffer, streamId, flags) - case FrameTypes.HEADERS => decodeHeaderFrame(buffer, streamId, flags) - case FrameTypes.PRIORITY => decodePriorityFrame(buffer, streamId) - case FrameTypes.RST_STREAM => decodeRstStreamFrame(buffer, streamId) - case FrameTypes.SETTINGS => decodeSettingsFrame(buffer, streamId, flags) - case FrameTypes.PUSH_PROMISE => - decodePushPromiseFrame(buffer, streamId, flags) - case FrameTypes.PING => decodePingFrame(buffer, streamId, flags) - case FrameTypes.GOAWAY => decodeGoAwayFrame(buffer, streamId) - case FrameTypes.WINDOW_UPDATE => - decodeWindowUpdateFrame(buffer, streamId) - case FrameTypes.CONTINUATION => - decodeContinuationFrame(buffer, streamId, flags) - - // this concludes the types established by HTTP/2.0, but it could be an extension - case code => onExtensionFrame(code, streamId, flags, buffer.slice()) - } catch { + try + frameType match { + case FrameTypes.DATA => decodeDataFrame(buffer, streamId, flags) + case FrameTypes.HEADERS => decodeHeaderFrame(buffer, streamId, flags) + case FrameTypes.PRIORITY => decodePriorityFrame(buffer, streamId) + case FrameTypes.RST_STREAM => decodeRstStreamFrame(buffer, streamId) + case FrameTypes.SETTINGS => decodeSettingsFrame(buffer, streamId, flags) + case FrameTypes.PUSH_PROMISE => + decodePushPromiseFrame(buffer, streamId, flags) + case FrameTypes.PING => decodePingFrame(buffer, streamId, flags) + case FrameTypes.GOAWAY => decodeGoAwayFrame(buffer, streamId) + case FrameTypes.WINDOW_UPDATE => + decodeWindowUpdateFrame(buffer, streamId) + case FrameTypes.CONTINUATION => + decodeContinuationFrame(buffer, streamId, flags) + + // this concludes the types established by HTTP/2.0, but it could be an extension + case code => onExtensionFrame(code, streamId, flags, buffer.slice()) + } catch { case _: BufferUnderflowException => Error( FRAME_SIZE_ERROR.goaway(