Skip to content

Commit

Permalink
netty (fix): Fixes #3313 'Implementation is missing' error (#3357)
Browse files Browse the repository at this point in the history
  • Loading branch information
xerial authored Jan 29, 2024
1 parent 7d1ded4 commit 76dd486
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,51 +49,58 @@ class NettyRequestHandler(config: NettyServerConfig, dispatcher: NettyBackend.Fi
}

override def channelRead0(ctx: ChannelHandlerContext, msg: FullHttpRequest): Unit = {
var req: wvlet.airframe.http.HttpMessage.Request = msg.method().name() match {
case HttpMethod.GET => Http.GET(msg.uri())
case HttpMethod.POST => Http.POST(msg.uri())
case HttpMethod.PUT => Http.PUT(msg.uri())
case HttpMethod.DELETE => Http.DELETE(msg.uri())
case HttpMethod.PATCH => Http.PATCH(msg.uri())
case HttpMethod.TRACE => Http.request(wvlet.airframe.http.HttpMethod.TRACE, msg.uri())
case HttpMethod.OPTIONS => Http.request(wvlet.airframe.http.HttpMethod.OPTIONS, msg.uri())
case _ => ???
}

ctx.channel().remoteAddress() match {
case x: InetSocketAddress =>
// TODO This address might be IPv6
req = req.withRemoteAddress(ServerAddress(s"${x.getHostString}:${x.getPort}"))
case _ =>
}
try {
var req: wvlet.airframe.http.HttpMessage.Request = msg.method().name().toUpperCase match {
case HttpMethod.GET => Http.GET(msg.uri())
case HttpMethod.POST => Http.POST(msg.uri())
case HttpMethod.PUT => Http.PUT(msg.uri())
case HttpMethod.DELETE => Http.DELETE(msg.uri())
case HttpMethod.PATCH => Http.PATCH(msg.uri())
case HttpMethod.TRACE => Http.request(wvlet.airframe.http.HttpMethod.TRACE, msg.uri())
case HttpMethod.OPTIONS => Http.request(wvlet.airframe.http.HttpMethod.OPTIONS, msg.uri())
case HttpMethod.HEAD => Http.request(wvlet.airframe.http.HttpMethod.HEAD, msg.uri())
case _ =>
throw RPCStatus.INVALID_REQUEST_U1.newException(s"Unsupported HTTP method: ${msg.method()}")
}

msg.headers().names().asScala.map { x =>
req = req.withHeader(x, msg.headers().get(x))
}
val requestBody = msg.content()
val requestBodySize = requestBody.readableBytes()
if (requestBodySize > 0) {
val buf = new Array[Byte](requestBodySize)
requestBody.getBytes(requestBody.readerIndex(), buf)
req = req.withContent(buf)
}
ctx.channel().remoteAddress() match {
case x: InetSocketAddress =>
// TODO This address might be IPv6
req = req.withRemoteAddress(ServerAddress(s"${x.getHostString}:${x.getPort}"))
case _ =>
}

val rxResponse: Rx[Response] = dispatcher.apply(
req,
NettyBackend.newContext { (request: Request) =>
Rx.single(Http.response(HttpStatus.NotFound_404))
msg.headers().names().asScala.map { x =>
req = req.withHeader(x, msg.headers().get(x))
}
val requestBody = msg.content()
val requestBodySize = requestBody.readableBytes()
if (requestBodySize > 0) {
val buf = new Array[Byte](requestBodySize)
requestBody.getBytes(requestBody.readerIndex(), buf)
req = req.withContent(buf)
}
)

RxRunner.run(rxResponse) {
case OnNext(v) =>
val nettyResponse = toNettyResponse(v.asInstanceOf[Response])
writeResponse(msg, ctx, nettyResponse)
case OnError(ex) =>
val resp = RPCStatus.INTERNAL_ERROR_I0.newException(ex.getMessage, ex).toResponse
val nettyResponse = toNettyResponse(resp)
writeResponse(msg, ctx, nettyResponse)
case OnCompletion =>
val rxResponse: Rx[Response] = dispatcher.apply(
req,
NettyBackend.newContext { (request: Request) =>
Rx.single(Http.response(HttpStatus.NotFound_404))
}
)

RxRunner.run(rxResponse) {
case OnNext(v) =>
val nettyResponse = toNettyResponse(v.asInstanceOf[Response])
writeResponse(msg, ctx, nettyResponse)
case OnError(ex) =>
val resp = RPCStatus.INTERNAL_ERROR_I0.newException(ex.getMessage, ex).toResponse
val nettyResponse = toNettyResponse(resp)
writeResponse(msg, ctx, nettyResponse)
case OnCompletion =>
}
} catch {
case e: RPCException =>
writeResponse(msg, ctx, toNettyResponse(e.toResponse))
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package wvlet.airframe.http.netty

import wvlet.airframe.http.client.SyncClient
import wvlet.airframe.http.{Http, HttpMethod, HttpStatus}
import wvlet.airspec.AirSpec

class NettyServerMethodTest extends AirSpec {

initDesign(
_ + Netty.server.design
.bind[SyncClient].toProvider { (server: NettyServer) =>
Http.client.withRetryContext(_.noRetry).newSyncClient(server.localAddress)
}
)

test("Handle various http methods") { (client: SyncClient) =>
test("valid methods") {
for (
m <- Seq(
HttpMethod.GET,
HttpMethod.POST,
HttpMethod.PUT,
HttpMethod.DELETE,
HttpMethod.PATCH,
HttpMethod.TRACE,
HttpMethod.OPTIONS,
HttpMethod.HEAD
)
) {
test(s"${m}") {
val resp = client.sendSafe(Http.request(m, "/get"))
resp.status shouldBe HttpStatus.NotFound_404
}
test(s"${m.toLowerCase} (lower case)") {
val resp = client.sendSafe(Http.request(m, "/get"))
resp.status shouldBe HttpStatus.NotFound_404
}
}
}

test("reject unsupported methods") {
for (m <- Seq("UNKNOWN_METHOD", HttpMethod.CONNECT)) {
test(m) {
if (m == HttpMethod.CONNECT) {
pending("Not sure how to support CONNECT in Netty")
}
val resp = client.sendSafe(Http.request(m, "/get"))
resp.status shouldBe HttpStatus.BadRequest_400
}
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,25 @@
*/
package wvlet.airframe.http.netty

import wvlet.airframe.http.HttpServer
import wvlet.airframe.control.Control
import wvlet.airframe.control.Control.withResource
import wvlet.airframe.http.client.SyncClient
import wvlet.airframe.http.{Http, HttpMethod, HttpServer, HttpStatus}
import wvlet.airspec.AirSpec

class NettyServerTest extends AirSpec {

override def design = {
Netty.server.designWithSyncClient
}
initDesign(_ + Netty.server.design)

test("NettyServer should be available") { (server: NettyServer) =>
test("double start should be ignored") {
server.start
}
}

test("can't start server after closing it") {
server.close()
intercept[IllegalStateException] {
server.start
}
test("can't start server after closing it") { (server: NettyServer) =>
server.close()
intercept[IllegalStateException] {
server.start
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ object HttpMethod {
final val OPTIONS = "OPTIONS"
final val TRACE = "TRACE"
final val HEAD = "HEAD"
final val CONNECT = "CONNECT"
}

0 comments on commit 76dd486

Please sign in to comment.