diff --git a/frameworks/Java/netty/.sdkmanrc b/frameworks/Java/netty/.sdkmanrc new file mode 100644 index 00000000000..2478fa5a19d --- /dev/null +++ b/frameworks/Java/netty/.sdkmanrc @@ -0,0 +1,3 @@ +# Enable auto-env through the sdkman_auto_env config +# Add key=value pairs of SDKs to use below +java=24-oracle diff --git a/frameworks/Java/netty/benchmark_config.json b/frameworks/Java/netty/benchmark_config.json index 0202931297c..79c322fef4e 100644 --- a/frameworks/Java/netty/benchmark_config.json +++ b/frameworks/Java/netty/benchmark_config.json @@ -19,6 +19,25 @@ "display_name": "netty", "notes": "", "versus": "netty" + }, + "loom": { + "json_url": "/json", + "plaintext_url": "/plaintext", + "port": 8080, + "approach": "Realistic", + "classification": "Platform", + "database": "None", + "framework": "netty", + "language": "Java", + "flavor": "None", + "orm": "Raw", + "platform": "Netty", + "webserver": "None", + "os": "Linux", + "database_os": "Linux", + "display_name": "netty", + "notes": "", + "versus": "netty" } }] } diff --git a/frameworks/Java/netty/config.toml b/frameworks/Java/netty/config.toml index 3780eace78d..bea6d72db37 100644 --- a/frameworks/Java/netty/config.toml +++ b/frameworks/Java/netty/config.toml @@ -13,3 +13,16 @@ orm = "Raw" platform = "Netty" webserver = "None" versus = "netty" + +[loom] +urls.plaintext = "/plaintext" +urls.json = "/json" +approach = "Realistic" +classification = "Platform" +database = "None" +database_os = "Linux" +os = "Linux" +orm = "Raw" +platform = "Netty" +webserver = "None" +versus = "netty" diff --git a/frameworks/Java/netty/netty-loom.dockerfile b/frameworks/Java/netty/netty-loom.dockerfile new file mode 100644 index 00000000000..1405909b099 --- /dev/null +++ b/frameworks/Java/netty/netty-loom.dockerfile @@ -0,0 +1,15 @@ +FROM maven:3.9.9-eclipse-temurin-24-noble as maven +WORKDIR /netty +COPY pom.xml pom.xml +COPY src src +RUN mvn compile assembly:single -q + +FROM maven:3.9.9-eclipse-temurin-24-noble +WORKDIR /netty +COPY --from=maven /netty/target/app.jar app.jar +COPY run_netty_loom.sh run_netty_loom.sh + +EXPOSE 8080 +# see https://github.com/netty/netty/issues/14942 +# remember to run this with --privileged since https://github.com/TechEmpower/FrameworkBenchmarks/blob/c94f7f95bd751f86a57dea8b63fb8f336bdbbde3/toolset/utils/docker_helper.py#L239 does it +ENTRYPOINT "./run_netty_loom.sh" \ No newline at end of file diff --git a/frameworks/Java/netty/netty.dockerfile b/frameworks/Java/netty/netty.dockerfile index 25636ca4f2f..ecc8cd4269d 100644 --- a/frameworks/Java/netty/netty.dockerfile +++ b/frameworks/Java/netty/netty.dockerfile @@ -1,13 +1,15 @@ -FROM maven:3.6.1-jdk-11-slim as maven +FROM maven:3.9.9-eclipse-temurin-24-noble as maven WORKDIR /netty COPY pom.xml pom.xml COPY src src RUN mvn compile assembly:single -q -FROM openjdk:11.0.3-jdk-slim +FROM maven:3.9.9-eclipse-temurin-24-noble WORKDIR /netty -COPY --from=maven /netty/target/netty-example-0.1-jar-with-dependencies.jar app.jar +COPY --from=maven /netty/target/app.jar app.jar +COPY run_netty.sh run_netty.sh EXPOSE 8080 - -CMD ["java", "-server", "-XX:+UseNUMA", "-XX:+UseParallelGC", "-XX:+AggressiveOpts", "-Dio.netty.buffer.checkBounds=false", "-Dio.netty.buffer.checkAccessible=false", "-Dio.netty.iouring.iosqeAsyncThreshold=32000", "-jar", "app.jar"] +# see https://github.com/netty/netty/issues/14942 +# remember to run this with --privileged since https://github.com/TechEmpower/FrameworkBenchmarks/blob/c94f7f95bd751f86a57dea8b63fb8f336bdbbde3/toolset/utils/docker_helper.py#L239 does it +ENTRYPOINT "./run_netty.sh" \ No newline at end of file diff --git a/frameworks/Java/netty/pom.xml b/frameworks/Java/netty/pom.xml index 3b1f26781de..c9be856b254 100644 --- a/frameworks/Java/netty/pom.xml +++ b/frameworks/Java/netty/pom.xml @@ -9,10 +9,9 @@ 0.1 - 11 - 11 - 4.1.108.Final - 0.0.21.Final + 24 + 24 + 4.2.0.Final jar @@ -21,7 +20,7 @@ io.netty - netty-codec-http + netty-all ${netty.version} @@ -40,9 +39,9 @@ - io.netty.incubator - netty-incubator-transport-native-io_uring - ${io_uring.version} + io.netty + netty-transport-native-io_uring + ${netty.version} linux-x86_64 @@ -74,6 +73,7 @@ maven-assembly-plugin + app hello.HelloWebServer @@ -82,6 +82,7 @@ jar-with-dependencies + false diff --git a/frameworks/Java/netty/run_netty.sh b/frameworks/Java/netty/run_netty.sh new file mode 100755 index 00000000000..413dff9e601 --- /dev/null +++ b/frameworks/Java/netty/run_netty.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +# PROFILING: -XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints +JAVA_OPTIONS="--enable-native-access=ALL-UNNAMED \ + -Dio.netty.noUnsafe=false \ + --sun-misc-unsafe-memory-access=allow \ + --add-opens=java.base/java.lang=ALL-UNNAMED \ + -XX:+UseNUMA \ + -XX:+UseParallelGC \ + -Dio.netty.buffer.checkBounds=false \ + -Dio.netty.buffer.checkAccessible=false \ + $@" + +java $JAVA_OPTIONS -jar app.jar diff --git a/frameworks/Java/netty/run_netty_loom.sh b/frameworks/Java/netty/run_netty_loom.sh new file mode 100755 index 00000000000..6cc9085441f --- /dev/null +++ b/frameworks/Java/netty/run_netty_loom.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +# PROFILING: -XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints +JAVA_OPTIONS="--enable-native-access=ALL-UNNAMED \ + -Dio.netty.noUnsafe=false \ + --sun-misc-unsafe-memory-access=allow \ + --add-opens=java.base/java.lang=ALL-UNNAMED \ + -XX:+UseNUMA \ + -XX:+UseParallelGC \ + -Dio.netty.buffer.checkBounds=false \ + -Dio.netty.buffer.checkAccessible=false \ + -Dhello.eventloop.carrier=true \ + -XX:+UnlockExperimentalVMOptions \ + -XX:-DoJVMTIVirtualThreadTransitions \ + -Djdk.trackAllThreads=false \ + $@" + +java $JAVA_OPTIONS -jar app.jar diff --git a/frameworks/Java/netty/src/main/java/hello/Constants.java b/frameworks/Java/netty/src/main/java/hello/Constants.java new file mode 100644 index 00000000000..63da7d66851 --- /dev/null +++ b/frameworks/Java/netty/src/main/java/hello/Constants.java @@ -0,0 +1,24 @@ +package hello; + +import io.netty.util.AsciiString; +import io.netty.util.CharsetUtil; + +public class Constants { + + public static final byte[] STATIC_PLAINTEXT = "Hello, World!".getBytes(CharsetUtil.UTF_8); + public static final int STATIC_PLAINTEXT_LEN = STATIC_PLAINTEXT.length; + + public static final CharSequence PLAINTEXT_CLHEADER_VALUE = AsciiString.cached(String.valueOf(STATIC_PLAINTEXT_LEN)); + public static final int JSON_LEN = jsonLen(); + public static final CharSequence JSON_CLHEADER_VALUE = AsciiString.cached(String.valueOf(JSON_LEN)); + public static final CharSequence SERVER_NAME = AsciiString.cached("Netty"); + + private static int jsonLen() { + return JsonUtils.serializeMsg(newMsg()).length; + } + + public static Message newMsg() { + return new Message("Hello, World!"); + } + +} diff --git a/frameworks/Java/netty/src/main/java/hello/HelloServerHandler.java b/frameworks/Java/netty/src/main/java/hello/HelloServerHandler.java index c82e008c4d6..ffed975ab09 100644 --- a/frameworks/Java/netty/src/main/java/hello/HelloServerHandler.java +++ b/frameworks/Java/netty/src/main/java/hello/HelloServerHandler.java @@ -1,28 +1,20 @@ package hello; -import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH; -import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE; -import static io.netty.handler.codec.http.HttpHeaderNames.DATE; -import static io.netty.handler.codec.http.HttpHeaderNames.SERVER; -import static io.netty.handler.codec.http.HttpHeaderValues.APPLICATION_JSON; -import static io.netty.handler.codec.http.HttpHeaderValues.TEXT_PLAIN; +import static hello.HttpResponses.makeJsonResponse; +import static hello.HttpResponses.makePlaintextResponse; +import static hello.JsonUtils.acquireJsonStreamFromEventLoop; +import static hello.JsonUtils.releaseJsonStreamFromEventLoop; import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; -import static io.netty.handler.codec.http.HttpResponseStatus.OK; import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; -import java.io.IOException; import java.text.DateFormat; import java.text.SimpleDateFormat; -import java.util.Arrays; import java.util.Date; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import com.jsoniter.output.JsonStream; -import com.jsoniter.output.JsonStreamPool; -import com.jsoniter.spi.JsonException; -import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; @@ -33,51 +25,21 @@ import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.LastHttpContent; import io.netty.util.AsciiString; -import io.netty.util.CharsetUtil; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.FastThreadLocal; public class HelloServerHandler extends ChannelInboundHandlerAdapter { - private static final FastThreadLocal FORMAT = new FastThreadLocal() { - @Override - protected DateFormat initialValue() { - return new SimpleDateFormat("E, dd MMM yyyy HH:mm:ss z"); - } - }; - - private static Message newMsg() { - return new Message("Hello, World!"); - } - - private static byte[] serializeMsg(Message obj) { - JsonStream stream = JsonStreamPool.borrowJsonStream(); - try { - stream.reset(null); - stream.writeVal(Message.class, obj); - return Arrays.copyOfRange(stream.buffer().data(), 0, stream.buffer().tail()); - } catch (IOException e) { - throw new JsonException(e); - } finally { - JsonStreamPool.returnJsonStream(stream); - } - } - - private static int jsonLen() { - return serializeMsg(newMsg()).length; - } + private static final FastThreadLocal FORMAT = new FastThreadLocal<>() { + @Override + protected DateFormat initialValue() { + return new SimpleDateFormat("E, dd MMM yyyy HH:mm:ss z"); + } + }; - private static final byte[] STATIC_PLAINTEXT = "Hello, World!".getBytes(CharsetUtil.UTF_8); - private static final int STATIC_PLAINTEXT_LEN = STATIC_PLAINTEXT.length; + protected volatile AsciiString date = new AsciiString(FORMAT.get().format(new Date())); - private static final CharSequence PLAINTEXT_CLHEADER_VALUE = AsciiString.cached(String.valueOf(STATIC_PLAINTEXT_LEN)); - private static final int JSON_LEN = jsonLen(); - private static final CharSequence JSON_CLHEADER_VALUE = AsciiString.cached(String.valueOf(JSON_LEN)); - private static final CharSequence SERVER_NAME = AsciiString.cached("Netty"); - - private volatile CharSequence date = new AsciiString(FORMAT.get().format(new Date())); - - HelloServerHandler(ScheduledExecutorService service) { + public HelloServerHandler(ScheduledExecutorService service) { service.scheduleWithFixedDelay(new Runnable() { private final DateFormat format = FORMAT.get(); @@ -118,42 +80,39 @@ private void process(ChannelHandlerContext ctx, HttpRequest request) throws Exce String uri = request.uri(); switch (uri) { case "/plaintext": - writePlainResponse(ctx, Unpooled.wrappedBuffer(STATIC_PLAINTEXT)); + writePlainResponse(ctx, date); return; case "/json": - byte[] json = serializeMsg(newMsg()); - writeJsonResponse(ctx, Unpooled.wrappedBuffer(json)); + // even for the virtual thread case we expect virtual threads to be executed inlined! + var stream = acquireJsonStreamFromEventLoop(); + try { + writeJsonResponse(ctx, stream, date); + } finally { + releaseJsonStreamFromEventLoop(stream); + } return; } + // we drain in-flight responses before closing the connection + channelReadComplete(ctx); FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, NOT_FOUND, Unpooled.EMPTY_BUFFER, false); - ctx.write(response).addListener(ChannelFutureListener.CLOSE); - } - - private void writePlainResponse(ChannelHandlerContext ctx, ByteBuf buf) { - ctx.write(makeResponse(buf, TEXT_PLAIN, PLAINTEXT_CLHEADER_VALUE), ctx.voidPromise()); + ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } - private void writeJsonResponse(ChannelHandlerContext ctx, ByteBuf buf) { - ctx.write(makeResponse(buf, APPLICATION_JSON, JSON_CLHEADER_VALUE), ctx.voidPromise()); + protected void writePlainResponse(ChannelHandlerContext ctx, AsciiString date) { + ctx.write(makePlaintextResponse(date), ctx.voidPromise()); } - private FullHttpResponse makeResponse(ByteBuf buf, CharSequence contentType, CharSequence contentLength) { - final FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, buf, false); - response.headers() - .set(CONTENT_TYPE, contentType) - .set(SERVER, SERVER_NAME) - .set(DATE, date) - .set(CONTENT_LENGTH, contentLength); - return response; + protected void writeJsonResponse(ChannelHandlerContext ctx, JsonStream stream, AsciiString date) { + ctx.write(makeJsonResponse(stream, date), ctx.voidPromise()); } @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); } @Override - public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } } diff --git a/frameworks/Java/netty/src/main/java/hello/HelloServerInitializer.java b/frameworks/Java/netty/src/main/java/hello/HelloServerInitializer.java index 060dd49930c..67692802a83 100644 --- a/frameworks/Java/netty/src/main/java/hello/HelloServerInitializer.java +++ b/frameworks/Java/netty/src/main/java/hello/HelloServerInitializer.java @@ -14,7 +14,7 @@ public class HelloServerInitializer extends ChannelInitializer { - private final ScheduledExecutorService service; + protected final ScheduledExecutorService service; public HelloServerInitializer(ScheduledExecutorService service) { this.service = service; @@ -46,6 +46,10 @@ protected boolean isContentAlwaysEmpty(final HttpMessage msg) { return false; } }) - .addLast("handler", new HelloServerHandler(service)); + .addLast("handler", newHelloServerHandler(service)); + } + + protected HelloServerHandler newHelloServerHandler(ScheduledExecutorService service) { + return new HelloServerHandler(service); } } diff --git a/frameworks/Java/netty/src/main/java/hello/HelloWebServer.java b/frameworks/Java/netty/src/main/java/hello/HelloWebServer.java index 84cc86ab273..cca8e555113 100644 --- a/frameworks/Java/netty/src/main/java/hello/HelloWebServer.java +++ b/frameworks/Java/netty/src/main/java/hello/HelloWebServer.java @@ -2,30 +2,35 @@ import java.net.InetSocketAddress; +import hello.loom.HelloLoomServerInitializer; +import hello.loom.LoomSupport; +import hello.loom.MultithreadVirtualEventExecutorGroup; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.ServerChannel; -import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollChannelOption; -import io.netty.channel.epoll.EpollEventLoopGroup; -import io.netty.channel.epoll.EpollServerSocketChannel; -import io.netty.channel.kqueue.KQueue; -import io.netty.channel.kqueue.KQueueServerSocketChannel; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.incubator.channel.uring.IOUring; -import io.netty.incubator.channel.uring.IOUringChannelOption; -import io.netty.incubator.channel.uring.IOUringEventLoopGroup; -import io.netty.incubator.channel.uring.IOUringServerSocketChannel; +import io.netty.channel.uring.IoUringChannelOption; import io.netty.util.ResourceLeakDetector; import io.netty.util.ResourceLeakDetector.Level; public class HelloWebServer { + private static final boolean EVENT_LOOP_CARRIER = Boolean.getBoolean("hello.eventloop.carrier"); + private static final IoMultiplexer PREFERRED_TRANSPORT; + static { ResourceLeakDetector.setLevel(Level.DISABLED); + String transportName = System.getProperty("hello.transport"); + if (transportName != null) { + try { + PREFERRED_TRANSPORT = IoMultiplexer.valueOf(transportName); + } catch (IllegalArgumentException e) { + System.err.println("Invalid transport name: " + transportName); + throw e; + } + } else { + PREFERRED_TRANSPORT = IoMultiplexer.type(); + } } private final int port; @@ -35,47 +40,46 @@ public HelloWebServer(int port) { } public void run() throws Exception { - // Configure the server. - if (IOUring.isAvailable()) { - doRun(new IOUringEventLoopGroup(Runtime.getRuntime().availableProcessors()), IOUringServerSocketChannel.class, IoMultiplexer.IO_URING); - } else - if (Epoll.isAvailable()) { - doRun(new EpollEventLoopGroup(), EpollServerSocketChannel.class, IoMultiplexer.EPOLL); - } else if (KQueue.isAvailable()) { - doRun(new EpollEventLoopGroup(), KQueueServerSocketChannel.class, IoMultiplexer.KQUEUE); - } else { - doRun(new NioEventLoopGroup(), NioServerSocketChannel.class, IoMultiplexer.JDK); + final var preferredTransport = PREFERRED_TRANSPORT; + System.out.printf("Using %s IoMultiplexer%n", preferredTransport); + final int coreCount = Runtime.getRuntime().availableProcessors(); + final var group = EVENT_LOOP_CARRIER? + preferredTransport.newVirtualEventExecutorGroup(coreCount) : + preferredTransport.newEventLoopGroup(coreCount); + if (EVENT_LOOP_CARRIER) { + LoomSupport.checkSupported(); + System.out.println("Using EventLoop optimized for Loom"); } - } - - private void doRun(EventLoopGroup loupGroup, Class serverChannelClass, IoMultiplexer multiplexer) throws InterruptedException { try { - InetSocketAddress inet = new InetSocketAddress(port); - - System.out.printf("Using %s IoMultiplexer%n", multiplexer); + final var serverChannelClass = preferredTransport.serverChannelClass(); + var inet = new InetSocketAddress(port); + var b = new ServerBootstrap(); - ServerBootstrap b = new ServerBootstrap(); - - if (multiplexer == IoMultiplexer.EPOLL) { - b.option(EpollChannelOption.SO_REUSEPORT, true); - } - - if (multiplexer == IoMultiplexer.IO_URING) { - b.option(IOUringChannelOption.SO_REUSEPORT, true); - } - b.option(ChannelOption.SO_BACKLOG, 8192); b.option(ChannelOption.SO_REUSEADDR, true); - b.group(loupGroup).channel(serverChannelClass).childHandler(new HelloServerInitializer(loupGroup.next())); + switch (preferredTransport) { + case EPOLL: + b.option(EpollChannelOption.SO_REUSEPORT, true); + break; + case IO_URING: + b.option(IoUringChannelOption.SO_REUSEPORT, true); + break; + } + var channelB = b.group(group).channel(serverChannelClass); + if (EVENT_LOOP_CARRIER) { + channelB.childHandler(new HelloLoomServerInitializer((MultithreadVirtualEventExecutorGroup) group, group.next())); + } else { + channelB.childHandler(new HelloServerInitializer(group.next())); + } b.childOption(ChannelOption.SO_REUSEADDR, true); Channel ch = b.bind(inet).sync().channel(); - System.out.printf("Httpd started. Listening on: %s%n", inet.toString()); + System.out.printf("Httpd started. Listening on: %s%n", inet); ch.closeFuture().sync(); } finally { - loupGroup.shutdownGracefully().sync(); + group.shutdownGracefully().sync(); } } @@ -87,5 +91,7 @@ public static void main(String[] args) throws Exception { port = 8080; } new HelloWebServer(port).run(); + + } } diff --git a/frameworks/Java/netty/src/main/java/hello/HttpResponses.java b/frameworks/Java/netty/src/main/java/hello/HttpResponses.java new file mode 100644 index 00000000000..64b88ccf086 --- /dev/null +++ b/frameworks/Java/netty/src/main/java/hello/HttpResponses.java @@ -0,0 +1,45 @@ +package hello; + +import static hello.Constants.JSON_CLHEADER_VALUE; +import static hello.Constants.PLAINTEXT_CLHEADER_VALUE; +import static hello.Constants.SERVER_NAME; +import static hello.Constants.STATIC_PLAINTEXT; +import static hello.Constants.newMsg; +import static hello.JsonUtils.serializeMsg; +import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH; +import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpHeaderNames.DATE; +import static io.netty.handler.codec.http.HttpHeaderNames.SERVER; +import static io.netty.handler.codec.http.HttpHeaderValues.APPLICATION_JSON; +import static io.netty.handler.codec.http.HttpHeaderValues.TEXT_PLAIN; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +import com.jsoniter.output.JsonStream; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.util.AsciiString; + +public class HttpResponses { + + public static FullHttpResponse makePlaintextResponse(AsciiString date) { + return makeResponse(Unpooled.wrappedBuffer(STATIC_PLAINTEXT), TEXT_PLAIN, PLAINTEXT_CLHEADER_VALUE, date); + } + + public static FullHttpResponse makeJsonResponse(JsonStream stream, AsciiString date) { + return makeResponse(Unpooled.wrappedBuffer(serializeMsg(newMsg(), stream)), APPLICATION_JSON, JSON_CLHEADER_VALUE, date); + } + + private static FullHttpResponse makeResponse(ByteBuf buf, CharSequence contentType, CharSequence contentLength, AsciiString date) { + final FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, buf, false); + response.headers() + .set(CONTENT_TYPE, contentType) + .set(SERVER, SERVER_NAME) + .set(DATE, date) + .set(CONTENT_LENGTH, contentLength); + return response; + } +} diff --git a/frameworks/Java/netty/src/main/java/hello/IoMultiplexer.java b/frameworks/Java/netty/src/main/java/hello/IoMultiplexer.java index 4a58e4f01b5..d50d5e13464 100644 --- a/frameworks/Java/netty/src/main/java/hello/IoMultiplexer.java +++ b/frameworks/Java/netty/src/main/java/hello/IoMultiplexer.java @@ -1,5 +1,59 @@ package hello; +import hello.loom.MultithreadVirtualEventExecutorGroup; +import io.netty.channel.IoHandlerFactory; +import io.netty.channel.MultiThreadIoEventLoopGroup; +import io.netty.channel.ServerChannel; +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollIoHandler; +import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.kqueue.KQueue; +import io.netty.channel.kqueue.KQueueIoHandler; +import io.netty.channel.kqueue.KQueueServerSocketChannel; +import io.netty.channel.nio.NioIoHandler; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.uring.IoUring; +import io.netty.channel.uring.IoUringIoHandler; +import io.netty.channel.uring.IoUringServerSocketChannel; + public enum IoMultiplexer { - EPOLL, KQUEUE, JDK, IO_URING + EPOLL, KQUEUE, JDK, IO_URING; + + public Class serverChannelClass() { + return switch (this) { + case EPOLL -> EpollServerSocketChannel.class; + case KQUEUE -> KQueueServerSocketChannel.class; + case JDK -> NioServerSocketChannel.class; + case IO_URING -> IoUringServerSocketChannel.class; + }; + } + + public IoHandlerFactory newIoHandlerFactory() { + return switch (this) { + case EPOLL -> EpollIoHandler.newFactory(); + case KQUEUE -> KQueueIoHandler.newFactory(); + case JDK -> NioIoHandler.newFactory(); + case IO_URING -> IoUringIoHandler.newFactory(); + }; + } + + public MultiThreadIoEventLoopGroup newEventLoopGroup(int nThreads) { + return new MultiThreadIoEventLoopGroup(nThreads, newIoHandlerFactory()); + } + + public MultithreadVirtualEventExecutorGroup newVirtualEventExecutorGroup(int nThreads) { + return new MultithreadVirtualEventExecutorGroup(nThreads, newIoHandlerFactory()); + } + + public static IoMultiplexer type() { + if (IoUring.isAvailable()) { + return IO_URING; + } else if (Epoll.isAvailable()) { + return EPOLL; + } else if (KQueue.isAvailable()) { + return KQUEUE; + } else { + return JDK; + } + } } \ No newline at end of file diff --git a/frameworks/Java/netty/src/main/java/hello/JsonUtils.java b/frameworks/Java/netty/src/main/java/hello/JsonUtils.java new file mode 100644 index 00000000000..4b7ca5723ed --- /dev/null +++ b/frameworks/Java/netty/src/main/java/hello/JsonUtils.java @@ -0,0 +1,78 @@ +package hello; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; +import java.util.Arrays; + +import com.jsoniter.output.JsonStream; +import com.jsoniter.output.JsonStreamPool; +import com.jsoniter.spi.Config; +import com.jsoniter.spi.JsonException; + +import io.netty.util.concurrent.FastThreadLocal; + +public class JsonUtils { + + private static final VarHandle INDENTATION; + + static { + try { + var lookup = MethodHandles.privateLookupIn(JsonStream.class, MethodHandles.lookup()); + INDENTATION = lookup.findVarHandle(JsonStream.class, "indention", int.class); + var dummy = new JsonStream(null, 32); + INDENTATION.set(dummy, 4); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static void setIndentation(JsonStream stream, int value) { + INDENTATION.set(stream, value); // Plain store + } + + private static final FastThreadLocal JSON_STREAM = new FastThreadLocal<>(); + + public static JsonStream acquireJsonStreamFromEventLoop() { + var stream = JSON_STREAM.get(); + if (stream == null) { + stream = new JsonStream(null, 512) { + // this is to save virtual threads to use thread locals + @Override + public Config currentConfig() { + return Config.INSTANCE; + } + }; + } else { + stream.reset(null); + JSON_STREAM.set(null); + } + return stream; + } + + public static JsonStream releaseJsonStreamFromEventLoop(JsonStream jsonStream) { + jsonStream.configCache = null; + setIndentation(jsonStream, 0); + JSON_STREAM.set(jsonStream); + return jsonStream; + } + + public static byte[] serializeMsg(Message message) { + var stream = JsonStreamPool.borrowJsonStream(); + try { + return serializeMsg(message, stream); + } finally { + // Reset the stream to avoid memory leaks + JsonStreamPool.returnJsonStream(stream); + } + } + + public static byte[] serializeMsg(Message obj, JsonStream stream) { + try { + stream.writeVal(Message.class, obj); + return Arrays.copyOfRange(stream.buffer().data(), 0, stream.buffer().tail()); + } catch (IOException e) { + throw new JsonException(e); + } + } +} diff --git a/frameworks/Java/netty/src/main/java/hello/loom/HelloLoomServerInitializer.java b/frameworks/Java/netty/src/main/java/hello/loom/HelloLoomServerInitializer.java new file mode 100644 index 00000000000..afa5db2cac5 --- /dev/null +++ b/frameworks/Java/netty/src/main/java/hello/loom/HelloLoomServerInitializer.java @@ -0,0 +1,21 @@ +package hello.loom; + +import java.util.concurrent.ScheduledExecutorService; + +import hello.HelloServerHandler; +import hello.HelloServerInitializer; + +public class HelloLoomServerInitializer extends HelloServerInitializer { + + private final MultithreadVirtualEventExecutorGroup group; + + public HelloLoomServerInitializer(MultithreadVirtualEventExecutorGroup group, ScheduledExecutorService service) { + super(service); + this.group = group; + } + + @Override + protected HelloServerHandler newHelloServerHandler(ScheduledExecutorService service) { + return new VirtualThreadHelloServerHandler(service, group); + } +} diff --git a/frameworks/Java/netty/src/main/java/hello/loom/LoomSupport.java b/frameworks/Java/netty/src/main/java/hello/loom/LoomSupport.java new file mode 100644 index 00000000000..2349072bbd2 --- /dev/null +++ b/frameworks/Java/netty/src/main/java/hello/loom/LoomSupport.java @@ -0,0 +1,71 @@ +package hello.loom; + +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.reflect.Field; +import java.util.concurrent.Executor; + +public final class LoomSupport { + private static final boolean SUPPORTED; + private static Throwable FAILURE; + + private static final MethodHandle SCHEDULER; + + static { + boolean sup; + MethodHandle scheduler; + try { + // this is required to override the default scheduler + MethodHandles.Lookup lookup = MethodHandles.lookup(); + Field schedulerField = Class.forName("java.lang.ThreadBuilders$VirtualThreadBuilder") + .getDeclaredField("scheduler"); + schedulerField.setAccessible(true); + scheduler = lookup.unreflectSetter(schedulerField); + + // this is to make sure we fail earlier! + var builder = Thread.ofVirtual(); + scheduler.invoke(builder, new Executor() { + @Override + public void execute(Runnable command) { + + } + }); + + FAILURE = null; + + sup = true; + } catch (Throwable e) { + scheduler = null; + sup = false; + FAILURE = e; + } + + SCHEDULER = scheduler; + SUPPORTED = sup; + } + + private LoomSupport() { + } + + public static boolean isSupported() { + return SUPPORTED; + } + + public static void checkSupported() { + if (!isSupported()) { + throw new UnsupportedOperationException(FAILURE); + } + } + + + public static Thread.Builder.OfVirtual setVirtualThreadFactoryScheduler(Thread.Builder.OfVirtual builder, + Executor vthreadScheduler) { + checkSupported(); + try { + SCHEDULER.invoke(builder, vthreadScheduler); + return builder; + } catch (Throwable e) { + throw new RuntimeException(e); + } + } +} diff --git a/frameworks/Java/netty/src/main/java/hello/loom/MultithreadVirtualEventExecutorGroup.java b/frameworks/Java/netty/src/main/java/hello/loom/MultithreadVirtualEventExecutorGroup.java new file mode 100644 index 00000000000..bd099534700 --- /dev/null +++ b/frameworks/Java/netty/src/main/java/hello/loom/MultithreadVirtualEventExecutorGroup.java @@ -0,0 +1,56 @@ +package hello.loom; + +import java.util.IdentityHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadFactory; + +import io.netty.channel.IoEventLoop; +import io.netty.channel.IoHandlerFactory; +import io.netty.channel.MultiThreadIoEventLoopGroup; +import io.netty.util.concurrent.FastThreadLocal; +import io.netty.util.concurrent.FastThreadLocalThread; + +public class MultithreadVirtualEventExecutorGroup extends MultiThreadIoEventLoopGroup { + + public static final int RESUMED_CONTINUATIONS_EXPECTED_COUNT = Integer.getInteger("io.netty.loom.resumed.continuations", 1024); + private ThreadFactory threadFactory; + private IdentityHashMap schedulers; + private final FastThreadLocal v_thread_factory = new FastThreadLocal<>() { + @Override + protected ThreadFactory initialValue() { + var scheduler = schedulers.get(Thread.currentThread()); + if (scheduler == null) { + return null; + } + return LoomSupport.setVirtualThreadFactoryScheduler(Thread.ofVirtual(), scheduler).factory(); + } + }; + + public MultithreadVirtualEventExecutorGroup(int nThreads, IoHandlerFactory ioHandlerFactory) { + super(nThreads, (Executor) command -> { + throw new UnsupportedOperationException("this executor is not supposed to be used"); + }, ioHandlerFactory); + } + + public ThreadFactory eventLoopVirtualThreadFactory() { + if (!(Thread.currentThread() instanceof FastThreadLocalThread)) { + throw new IllegalStateException("this method should be called from event loop fast thread local threads"); + } + return v_thread_factory.get(); + } + + @Override + protected IoEventLoop newChild(Executor executor, IoHandlerFactory ioHandlerFactory, + @SuppressWarnings("unused") Object... args) { + if (threadFactory == null) { + threadFactory = newDefaultThreadFactory(); + } + var scheduler = new VirtualThreadNettyScheduler(this, threadFactory, ioHandlerFactory, RESUMED_CONTINUATIONS_EXPECTED_COUNT); + if (schedulers == null) { + schedulers = new IdentityHashMap<>(); + } + schedulers.put(scheduler.getCarrierThread(), scheduler); + return scheduler.ioEventLoop(); + } + +} diff --git a/frameworks/Java/netty/src/main/java/hello/loom/VirtualThreadHelloServerHandler.java b/frameworks/Java/netty/src/main/java/hello/loom/VirtualThreadHelloServerHandler.java new file mode 100644 index 00000000000..9b93088fe0f --- /dev/null +++ b/frameworks/Java/netty/src/main/java/hello/loom/VirtualThreadHelloServerHandler.java @@ -0,0 +1,46 @@ +package hello.loom; + +import java.util.ArrayDeque; +import java.util.concurrent.ScheduledExecutorService; + +import com.jsoniter.output.JsonStream; + +import hello.HelloServerHandler; +import hello.HttpResponses; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.util.AsciiString; + +public class VirtualThreadHelloServerHandler extends HelloServerHandler { + + private final ArrayDeque responses = new ArrayDeque<>(); + private final MultithreadVirtualEventExecutorGroup group; + + public VirtualThreadHelloServerHandler(ScheduledExecutorService service, MultithreadVirtualEventExecutorGroup group) { + super(service); + this.group = group; + } + + @Override + protected void writePlainResponse(ChannelHandlerContext ctx, AsciiString date) { + group.eventLoopVirtualThreadFactory().newThread(() -> { + responses.add(HttpResponses.makePlaintextResponse(date)); + }).start(); + } + + @Override + protected void writeJsonResponse(ChannelHandlerContext ctx, JsonStream stream, AsciiString date) { + group.eventLoopVirtualThreadFactory().newThread(() -> { + responses.add(HttpResponses.makeJsonResponse(stream, date)); + }).start(); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + var responses = this.responses; + for (int i = 0, count = responses.size(); i < count; i++) { + ctx.write(responses.poll()); + } + ctx.flush(); + } +} diff --git a/frameworks/Java/netty/src/main/java/hello/loom/VirtualThreadNettyScheduler.java b/frameworks/Java/netty/src/main/java/hello/loom/VirtualThreadNettyScheduler.java new file mode 100644 index 00000000000..8a6f59ca346 --- /dev/null +++ b/frameworks/Java/netty/src/main/java/hello/loom/VirtualThreadNettyScheduler.java @@ -0,0 +1,95 @@ +package hello.loom; + +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import io.netty.channel.IoEventLoopGroup; +import io.netty.channel.IoHandlerFactory; +import io.netty.channel.ManualIoEventLoop; +import io.netty.util.internal.shaded.org.jctools.queues.MpscUnboundedArrayQueue; + +public class VirtualThreadNettyScheduler implements Executor { + + private static final long MAX_WAIT_TASKS_NS = TimeUnit.SECONDS.toNanos(1); + private static final long MAX_RUN_CONTINUATIONS_NS = TimeUnit.SECONDS.toNanos(1); + + private final MpscUnboundedArrayQueue externalContinuations; + private final ManualIoEventLoop ioEventLoop; + private final Thread carrierThread; + + + public VirtualThreadNettyScheduler(IoEventLoopGroup parent, ThreadFactory threadFactory, IoHandlerFactory ioHandlerFactory, int resumedContinuationsExpectedCount) { + this.externalContinuations = new MpscUnboundedArrayQueue<>(resumedContinuationsExpectedCount); + this.carrierThread = threadFactory.newThread(this::internalRun); + this.ioEventLoop = new ManualIoEventLoop(parent, carrierThread, ioHandlerFactory); + // we can start the carrier only after all the fields are initialized + carrierThread.start(); + } + + public Thread getCarrierThread() { + return carrierThread; + } + + public ManualIoEventLoop ioEventLoop() { + return ioEventLoop; + } + + private void internalRun() { + var ioEventLoop = this.ioEventLoop; + while (!ioEventLoop.isShuttingDown()) { + // runnning I/O and async tasks within Netty without blocking + int workDone = ioEventLoop.runNow(); + workDone += runExternalContinuations(MAX_RUN_CONTINUATIONS_NS); + if (workDone == 0 && externalContinuations.isEmpty()) { + ioEventLoop.run(MAX_WAIT_TASKS_NS); + } + } + while (!ioEventLoop.isTerminated()) { + ioEventLoop.runNow(); + runExternalContinuations(MAX_RUN_CONTINUATIONS_NS); + } + while (!externalContinuations.isEmpty()) { + runExternalContinuations(MAX_RUN_CONTINUATIONS_NS); + } + } + + private int runExternalContinuations(long deadlineNs) { + final long startDrainingNs = System.nanoTime(); + int executed = 0; + for (; ; ) { + var continuation = this.externalContinuations.poll(); + if (continuation == null) { + break; + } + try { + continuation.run(); + } catch (Throwable t) { + // this shouldn't really happen + } + executed++; + long elapsedNs = System.nanoTime() - startDrainingNs; + if (elapsedNs >= deadlineNs) { + return executed; + } + } + return executed; + } + + @Override + public void execute(Runnable command) { + // TODO improve it using a reject handler? It's not too strict!? + if (ioEventLoop.isShuttingDown()) { + throw new RejectedExecutionException("event loop is shutting down"); + } + if (ioEventLoop.inEventLoop(Thread.currentThread())) { + command.run(); + } else { + externalContinuations.offer(command); + // wakeup won't happen if we're shutting down! + ioEventLoop.wakeup(); + } + } + +}