Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update scalafmt-core to 3.2.2 #595

Merged
merged 2 commits into from
Dec 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version = 3.2.1
version = 3.2.2

style = default

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ abstract class ServerChannel extends Closeable { self =>
case NonFatal(t) =>
logger.error(t)(s"Exception occurred during Channel shutdown.")
} finally
// If we're the last hook to run, we notify any listeners
if (countdown.decrementAndGet() == 0)
closeAndNotify()
// If we're the last hook to run, we notify any listeners
if (countdown.decrementAndGet() == 0)
closeAndNotify()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ private[blaze] final class NIO1ClientChannel(

override def close(): Unit =
try underlying.close()
finally if (closed.compareAndSet(false, true)) {
onClose()
}
finally
if (closed.compareAndSet(false, true)) {
onClose()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -73,41 +73,43 @@ private[nio1] object NIO1HeadStage {
ch: NIO1ClientChannel,
scratch: ByteBuffer,
buffers: Array[ByteBuffer]): WriteResult =
try if (BufferTools.areDirectOrEmpty(buffers)) {
ch.write(buffers)
if (util.BufferTools.checkEmpty(buffers)) Complete
else Incomplete
} else {
// To sidestep the java NIO "memory leak" (see http://www.evanjones.ca/java-bytebuffer-leak.html)
// We copy the data to the scratch buffer (which should be a direct ByteBuffer)
// before the write. We then check to see how much data was written and fast-forward
// the input buffers accordingly.
// This is very similar to the pattern used by the Oracle JDK implementation in its
// IOUtil class: if the provided buffers are not direct buffers, they are copied to
// temporary direct ByteBuffers and written.
@tailrec
def writeLoop(): WriteResult = {
scratch.clear()
BufferTools.copyBuffers(buffers, scratch)
scratch.flip()
try
if (BufferTools.areDirectOrEmpty(buffers)) {
ch.write(buffers)
if (util.BufferTools.checkEmpty(buffers)) Complete
else Incomplete
} else {
// To sidestep the java NIO "memory leak" (see http://www.evanjones.ca/java-bytebuffer-leak.html)
// We copy the data to the scratch buffer (which should be a direct ByteBuffer)
// before the write. We then check to see how much data was written and fast-forward
// the input buffers accordingly.
// This is very similar to the pattern used by the Oracle JDK implementation in its
// IOUtil class: if the provided buffers are not direct buffers, they are copied to
// temporary direct ByteBuffers and written.
@tailrec
def writeLoop(): WriteResult = {
scratch.clear()
BufferTools.copyBuffers(buffers, scratch)
scratch.flip()

val written = ch.write(scratch)
if (written > 0)
assert(BufferTools.fastForwardBuffers(buffers, written))

if (scratch.remaining > 0)
// Couldn't write all the data.
Incomplete
else if (util.BufferTools.checkEmpty(buffers))
// All data was written
Complete
else
// May still be able to write more to the socket buffer.
writeLoop()
}

val written = ch.write(scratch)
if (written > 0)
assert(BufferTools.fastForwardBuffers(buffers, written))

if (scratch.remaining > 0)
// Couldn't write all the data.
Incomplete
else if (util.BufferTools.checkEmpty(buffers))
// All data was written
Complete
else
// May still be able to write more to the socket buffer.
writeLoop()
writeLoop()
}

writeLoop()
} catch {
catch {
case _: ClosedChannelException => WriteError(EOF)
case e: IOException if ChannelHead.brokePipeMessages.contains(e.getMessage) =>
WriteError(EOF)
Expand Down Expand Up @@ -318,21 +320,22 @@ private[nio1] final class NIO1HeadStage(
sendInboundCommand(Disconnected)
}

try selectorLoop.executeTask(new Runnable {
def run(): Unit = {
logger.trace(
s"closeWithError($cause); readPromise: $readPromise, writePromise: $writePromise")
val c = cause match {
case Some(ex) =>
logger.error(cause.get)("Abnormal NIO1HeadStage termination")
ex
case None => EOF
try
selectorLoop.executeTask(new Runnable {
def run(): Unit = {
logger.trace(
s"closeWithError($cause); readPromise: $readPromise, writePromise: $writePromise")
val c = cause match {
case Some(ex) =>
logger.error(cause.get)("Abnormal NIO1HeadStage termination")
ex
case None => EOF
}
if (key.isValid) key.interestOps(0)
key.attach(null)
doClose(c)
}
if (key.isValid) key.interestOps(0)
key.attach(null)
doClose(c)
}
})
})
catch {
case e: RejectedExecutionException =>
logger.error(e)("Event loop closed. Closing in current thread.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,11 @@ private final class NIO1SocketServerGroup private (
}

try
// We use `enqueueTask` deliberately so as to not jump ahead
// of channel initialization.
selectorLoop.enqueueTask(new Runnable {
override def run(): Unit = doClose()
})
// We use `enqueueTask` deliberately so as to not jump ahead
// of channel initialization.
selectorLoop.enqueueTask(new Runnable {
override def run(): Unit = doClose()
})
catch {
case _: RejectedExecutionException =>
this.logger.info("Selector loop closed. Closing in local thread.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,20 +153,22 @@ final class SelectorLoop(
private[this] def runLoop(): Unit = {
var cleanShutdown = true

try while (!isClosed) {
// Block here until some I/O event happens or someone adds
// a task to run and wakes the loop.
val selected = selector.select()
try
while (!isClosed) {
// Block here until some I/O event happens or someone adds
// a task to run and wakes the loop.
val selected = selector.select()

// Run any pending tasks. These may set interest ops,
// just compute something, etc.
taskQueue.executeTasks()
// Run any pending tasks. These may set interest ops,
// just compute something, etc.
taskQueue.executeTasks()

// We have some new I/O operations waiting for us. Process them.
if (selected > 0) {
processKeys(scratch, selector.selectedKeys)
// We have some new I/O operations waiting for us. Process them.
if (selected > 0) {
processKeys(scratch, selector.selectedKeys)
}
}
} catch {
catch {
case e: ClosedSelectorException =>
logger.error(e)("Selector unexpectedly closed")
cleanShutdown = false
Expand All @@ -192,16 +194,18 @@ final class SelectorLoop(
it.remove()

val selectable = getAttachment(k)
try if (k.isValid) {
if (selectable != null) {
selectable.opsReady(scratch)
} else {
k.cancel()
logger.error("Illegal state: selector key had null attachment.")
try
if (k.isValid) {
if (selectable != null) {
selectable.opsReady(scratch)
} else {
k.cancel()
logger.error("Illegal state: selector key had null attachment.")
}
} else if (selectable != null) {
selectable.close(None)
}
} else if (selectable != null) {
selectable.close(None)
} catch {
catch {
case t @ (NonFatal(_) | _: ControlThrowable) =>
logger.error(t)("Error performing channel operations. Closing channel.")
try selectable.close(Some(t))
Expand Down
9 changes: 5 additions & 4 deletions core/src/main/scala/org/http4s/blaze/pipeline/Stages.scala
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,11 @@ sealed trait Tail[I] extends Stage {
}

def channelRead(size: Int = -1, timeout: Duration = Duration.Inf): Future[I] =
try if (_prevStage != null) {
val f = _prevStage.readRequest(size)
checkTimeout(timeout, f)
} else _stageDisconnected()
try
if (_prevStage != null) {
val f = _prevStage.readRequest(size)
checkTimeout(timeout, f)
} else _stageDisconnected()
catch { case NonFatal(t) => Future.failed(t) }

/** Write a single outbound message to the pipeline */
Expand Down
37 changes: 19 additions & 18 deletions http/src/main/scala/org/http4s/blaze/http/http2/FrameDecoder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,24 +61,25 @@ private class FrameDecoder(localSettings: Http2Settings, listener: FrameListener
val endOfFrame = buffer.position() + len
buffer.limit(endOfFrame)

try frameType match {
case FrameTypes.DATA => decodeDataFrame(buffer, streamId, flags)
case FrameTypes.HEADERS => decodeHeaderFrame(buffer, streamId, flags)
case FrameTypes.PRIORITY => decodePriorityFrame(buffer, streamId)
case FrameTypes.RST_STREAM => decodeRstStreamFrame(buffer, streamId)
case FrameTypes.SETTINGS => decodeSettingsFrame(buffer, streamId, flags)
case FrameTypes.PUSH_PROMISE =>
decodePushPromiseFrame(buffer, streamId, flags)
case FrameTypes.PING => decodePingFrame(buffer, streamId, flags)
case FrameTypes.GOAWAY => decodeGoAwayFrame(buffer, streamId)
case FrameTypes.WINDOW_UPDATE =>
decodeWindowUpdateFrame(buffer, streamId)
case FrameTypes.CONTINUATION =>
decodeContinuationFrame(buffer, streamId, flags)

// this concludes the types established by HTTP/2.0, but it could be an extension
case code => onExtensionFrame(code, streamId, flags, buffer.slice())
} catch {
try
frameType match {
case FrameTypes.DATA => decodeDataFrame(buffer, streamId, flags)
case FrameTypes.HEADERS => decodeHeaderFrame(buffer, streamId, flags)
case FrameTypes.PRIORITY => decodePriorityFrame(buffer, streamId)
case FrameTypes.RST_STREAM => decodeRstStreamFrame(buffer, streamId)
case FrameTypes.SETTINGS => decodeSettingsFrame(buffer, streamId, flags)
case FrameTypes.PUSH_PROMISE =>
decodePushPromiseFrame(buffer, streamId, flags)
case FrameTypes.PING => decodePingFrame(buffer, streamId, flags)
case FrameTypes.GOAWAY => decodeGoAwayFrame(buffer, streamId)
case FrameTypes.WINDOW_UPDATE =>
decodeWindowUpdateFrame(buffer, streamId)
case FrameTypes.CONTINUATION =>
decodeContinuationFrame(buffer, streamId, flags)

// this concludes the types established by HTTP/2.0, but it could be an extension
case code => onExtensionFrame(code, streamId, flags, buffer.slice())
} catch {
case _: BufferUnderflowException =>
Error(
FRAME_SIZE_ERROR.goaway(
Expand Down