diff --git a/airframe-http-netty/src/main/scala/wvlet/airframe/http/netty/NettyRequestHandler.scala b/airframe-http-netty/src/main/scala/wvlet/airframe/http/netty/NettyRequestHandler.scala index 0d2d7e323c..176e160518 100644 --- a/airframe-http-netty/src/main/scala/wvlet/airframe/http/netty/NettyRequestHandler.scala +++ b/airframe-http-netty/src/main/scala/wvlet/airframe/http/netty/NettyRequestHandler.scala @@ -49,51 +49,58 @@ class NettyRequestHandler(config: NettyServerConfig, dispatcher: NettyBackend.Fi } override def channelRead0(ctx: ChannelHandlerContext, msg: FullHttpRequest): Unit = { - var req: wvlet.airframe.http.HttpMessage.Request = msg.method().name() match { - case HttpMethod.GET => Http.GET(msg.uri()) - case HttpMethod.POST => Http.POST(msg.uri()) - case HttpMethod.PUT => Http.PUT(msg.uri()) - case HttpMethod.DELETE => Http.DELETE(msg.uri()) - case HttpMethod.PATCH => Http.PATCH(msg.uri()) - case HttpMethod.TRACE => Http.request(wvlet.airframe.http.HttpMethod.TRACE, msg.uri()) - case HttpMethod.OPTIONS => Http.request(wvlet.airframe.http.HttpMethod.OPTIONS, msg.uri()) - case _ => ??? - } - - ctx.channel().remoteAddress() match { - case x: InetSocketAddress => - // TODO This address might be IPv6 - req = req.withRemoteAddress(ServerAddress(s"${x.getHostString}:${x.getPort}")) - case _ => - } + try { + var req: wvlet.airframe.http.HttpMessage.Request = msg.method().name().toUpperCase match { + case HttpMethod.GET => Http.GET(msg.uri()) + case HttpMethod.POST => Http.POST(msg.uri()) + case HttpMethod.PUT => Http.PUT(msg.uri()) + case HttpMethod.DELETE => Http.DELETE(msg.uri()) + case HttpMethod.PATCH => Http.PATCH(msg.uri()) + case HttpMethod.TRACE => Http.request(wvlet.airframe.http.HttpMethod.TRACE, msg.uri()) + case HttpMethod.OPTIONS => Http.request(wvlet.airframe.http.HttpMethod.OPTIONS, msg.uri()) + case HttpMethod.HEAD => Http.request(wvlet.airframe.http.HttpMethod.HEAD, msg.uri()) + case _ => + throw RPCStatus.INVALID_REQUEST_U1.newException(s"Unsupported HTTP method: ${msg.method()}") + } - msg.headers().names().asScala.map { x => - req = req.withHeader(x, msg.headers().get(x)) - } - val requestBody = msg.content() - val requestBodySize = requestBody.readableBytes() - if (requestBodySize > 0) { - val buf = new Array[Byte](requestBodySize) - requestBody.getBytes(requestBody.readerIndex(), buf) - req = req.withContent(buf) - } + ctx.channel().remoteAddress() match { + case x: InetSocketAddress => + // TODO This address might be IPv6 + req = req.withRemoteAddress(ServerAddress(s"${x.getHostString}:${x.getPort}")) + case _ => + } - val rxResponse: Rx[Response] = dispatcher.apply( - req, - NettyBackend.newContext { (request: Request) => - Rx.single(Http.response(HttpStatus.NotFound_404)) + msg.headers().names().asScala.map { x => + req = req.withHeader(x, msg.headers().get(x)) + } + val requestBody = msg.content() + val requestBodySize = requestBody.readableBytes() + if (requestBodySize > 0) { + val buf = new Array[Byte](requestBodySize) + requestBody.getBytes(requestBody.readerIndex(), buf) + req = req.withContent(buf) } - ) - RxRunner.run(rxResponse) { - case OnNext(v) => - val nettyResponse = toNettyResponse(v.asInstanceOf[Response]) - writeResponse(msg, ctx, nettyResponse) - case OnError(ex) => - val resp = RPCStatus.INTERNAL_ERROR_I0.newException(ex.getMessage, ex).toResponse - val nettyResponse = toNettyResponse(resp) - writeResponse(msg, ctx, nettyResponse) - case OnCompletion => + val rxResponse: Rx[Response] = dispatcher.apply( + req, + NettyBackend.newContext { (request: Request) => + Rx.single(Http.response(HttpStatus.NotFound_404)) + } + ) + + RxRunner.run(rxResponse) { + case OnNext(v) => + val nettyResponse = toNettyResponse(v.asInstanceOf[Response]) + writeResponse(msg, ctx, nettyResponse) + case OnError(ex) => + val resp = RPCStatus.INTERNAL_ERROR_I0.newException(ex.getMessage, ex).toResponse + val nettyResponse = toNettyResponse(resp) + writeResponse(msg, ctx, nettyResponse) + case OnCompletion => + } + } catch { + case e: RPCException => + writeResponse(msg, ctx, toNettyResponse(e.toResponse)) } } diff --git a/airframe-http-netty/src/test/scala/wvlet/airframe/http/netty/NettyServerMethodTest.scala b/airframe-http-netty/src/test/scala/wvlet/airframe/http/netty/NettyServerMethodTest.scala new file mode 100644 index 0000000000..6e0ff91fae --- /dev/null +++ b/airframe-http-netty/src/test/scala/wvlet/airframe/http/netty/NettyServerMethodTest.scala @@ -0,0 +1,67 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package wvlet.airframe.http.netty + +import wvlet.airframe.http.client.SyncClient +import wvlet.airframe.http.{Http, HttpMethod, HttpStatus} +import wvlet.airspec.AirSpec + +class NettyServerMethodTest extends AirSpec { + + initDesign( + _ + Netty.server.design + .bind[SyncClient].toProvider { (server: NettyServer) => + Http.client.withRetryContext(_.noRetry).newSyncClient(server.localAddress) + } + ) + + test("Handle various http methods") { (client: SyncClient) => + test("valid methods") { + for ( + m <- Seq( + HttpMethod.GET, + HttpMethod.POST, + HttpMethod.PUT, + HttpMethod.DELETE, + HttpMethod.PATCH, + HttpMethod.TRACE, + HttpMethod.OPTIONS, + HttpMethod.HEAD + ) + ) { + test(s"${m}") { + val resp = client.sendSafe(Http.request(m, "/get")) + resp.status shouldBe HttpStatus.NotFound_404 + } + test(s"${m.toLowerCase} (lower case)") { + val resp = client.sendSafe(Http.request(m, "/get")) + resp.status shouldBe HttpStatus.NotFound_404 + } + } + } + + test("reject unsupported methods") { + for (m <- Seq("UNKNOWN_METHOD", HttpMethod.CONNECT)) { + test(m) { + if (m == HttpMethod.CONNECT) { + pending("Not sure how to support CONNECT in Netty") + } + val resp = client.sendSafe(Http.request(m, "/get")) + resp.status shouldBe HttpStatus.BadRequest_400 + } + } + } + } + +} diff --git a/airframe-http-netty/src/test/scala/wvlet/airframe/http/netty/NettyServerTest.scala b/airframe-http-netty/src/test/scala/wvlet/airframe/http/netty/NettyServerTest.scala index 39a5999dfb..e20b44b876 100644 --- a/airframe-http-netty/src/test/scala/wvlet/airframe/http/netty/NettyServerTest.scala +++ b/airframe-http-netty/src/test/scala/wvlet/airframe/http/netty/NettyServerTest.scala @@ -13,25 +13,25 @@ */ package wvlet.airframe.http.netty -import wvlet.airframe.http.HttpServer +import wvlet.airframe.control.Control +import wvlet.airframe.control.Control.withResource +import wvlet.airframe.http.client.SyncClient +import wvlet.airframe.http.{Http, HttpMethod, HttpServer, HttpStatus} import wvlet.airspec.AirSpec class NettyServerTest extends AirSpec { - - override def design = { - Netty.server.designWithSyncClient - } + initDesign(_ + Netty.server.design) test("NettyServer should be available") { (server: NettyServer) => test("double start should be ignored") { server.start } + } - test("can't start server after closing it") { - server.close() - intercept[IllegalStateException] { - server.start - } + test("can't start server after closing it") { (server: NettyServer) => + server.close() + intercept[IllegalStateException] { + server.start } } diff --git a/airframe-http/src/main/scala/wvlet/airframe/http/HttpMethod.scala b/airframe-http/src/main/scala/wvlet/airframe/http/HttpMethod.scala index db0d81e7ae..eb00dd1a5f 100644 --- a/airframe-http/src/main/scala/wvlet/airframe/http/HttpMethod.scala +++ b/airframe-http/src/main/scala/wvlet/airframe/http/HttpMethod.scala @@ -24,4 +24,5 @@ object HttpMethod { final val OPTIONS = "OPTIONS" final val TRACE = "TRACE" final val HEAD = "HEAD" + final val CONNECT = "CONNECT" }