Performance comparison with Java, zig-aio #110

Open
mrjbq7 opened this issue Aug 21, 2024 · 5 comments

mrjbq7 commented Aug 21, 2024

While getting started with libxev in Zig, I looked at the src/bench/ping-pongs.zig benchmark and compared its results with an equivalent Java implementation. On Linux, libxev may be missing a performance optimization for the case where the reader and writer are in the same process.

On Linux (12th Gen Intel Core i7-1260P), the Java version is more than 50% faster (roughly 250k vs. 160k roundtrips/s).

$ cat /etc/lsb-release 
DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=24.04
DISTRIB_CODENAME=noble
DISTRIB_DESCRIPTION="Ubuntu 24.04 LTS"

$ uname -a
Linux linuxmini 6.8.0-40-generic #40-Ubuntu SMP PREEMPT_DYNAMIC Fri Jul  5 10:34:03 UTC 2024 x86_64 x86_64 x86_64 GNU/Linux

$ ./zig-out/bench/ping-pongs 
info: 159884.93 roundtrips/s
info: 3.13 seconds total

$ javac PingPong.java

$ java PingPong
Finished 500000 pings
249625.56 roundtrips/s
2.00 seconds total

$ java -version
openjdk version "21.0.4" 2024-07-16
OpenJDK Runtime Environment (build 21.0.4+7-Ubuntu-1ubuntu224.04)
OpenJDK 64-Bit Server VM (build 21.0.4+7-Ubuntu-1ubuntu224.04, mixed mode, sharing)

On macOS (MacBook Pro, M2 Max, 64 GB), the results are roughly the same between Zig and Java.

# macOS Sonoma 14.6.1

$ uname -a
Darwin red.local 23.6.0 Darwin Kernel Version 23.6.0: Mon Jul 29 21:13:04 PDT 2024; root:xnu-10063.141.2~1/RELEASE_ARM64_T6020 arm64

$ ./zig-out/bench/ping-pongs                              
info: 62632.37 roundtrips/s
info: 7.98 seconds total

$ javac PingPong.java

$ java PingPong
Finished 500000 pings
60240.96 roundtrips/s
8.30 seconds total

$ java -version
java version "22.0.2" 2024-07-16
Java(TM) SE Runtime Environment (build 22.0.2+9-70)
Java HotSpot(TM) 64-Bit Server VM (build 22.0.2+9-70, mixed mode, sharing)

Note: I also split the ping-pongs binary into a separate ping and pong, implemented both in Java and in Zig, and found that performance is the same on both Linux and macOS when they run in different processes. Java is not faster in that setup, which makes me think the io_uring backend might be missing an optimization for the case where both the read and the write occur in the same process.

Here's the PingPong.java:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Iterator;
import java.util.Set;

public class PingPong {

    static final int BUFFER_SIZE = 32 * 1024;

    static final int MAX_PINGS = 500_000;

    public interface SelectListener {

        public void onAccept(ServerSocketChannel channel);
        public void onConnect(SocketChannel channel);
        public void onRead(ReadableByteChannel channel);
        public void onWrite(WritableByteChannel channel);
    }

    public static void main(String[] args) throws Exception {

        long start = System.currentTimeMillis();

        final Selector selector = Selector.open();

        ServerSocketChannel server = ServerSocketChannel.open();
        server.configureBlocking(false);
        InetSocketAddress address = new InetSocketAddress("127.0.0.1", 3131);
        server.socket().bind(address);
        
        SelectionKey serverKey = server.keyFor(selector);
        if (serverKey == null) {
            serverKey = server.register(selector, SelectionKey.OP_ACCEPT);
        } else {
            serverKey.interestOps(SelectionKey.OP_ACCEPT);
        }

        SelectListener serverListener = new SelectListener() {

            ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);

            public void onAccept(ServerSocketChannel channel) {

                try {

                    SocketChannel client = channel.accept();
                    client.configureBlocking(false);
                    client.socket().setSendBufferSize(BUFFER_SIZE);
                    client.socket().setReceiveBufferSize(BUFFER_SIZE);
                    client.socket().setTcpNoDelay(true);

                    SelectionKey clientKey = client.keyFor(selector);
                    if (clientKey == null) {
                        clientKey = client.register(selector, SelectionKey.OP_READ);
                    } else {
                        clientKey.interestOps(SelectionKey.OP_READ);
                    }

                    clientKey.attach(this);
                    
                } catch (IOException e) {
                    e.printStackTrace();
                    System.exit(0);
                }
            }

            public void onConnect(SocketChannel channel) {

            }

            public void onRead(ReadableByteChannel channel) {

                try {
                    buffer.clear();
                    int read = channel.read(buffer);
                    if (read <= 0) {
                        channel.close();
                        return;
                    }

                    buffer.flip();
                    int write = ((WritableByteChannel) channel).write(buffer);
                    if (write < read) {
                        channel.close();
                        return;
                    }

                } catch (IOException e) {
                    e.printStackTrace();
                    System.exit(0);
                }
            }

            public void onWrite(WritableByteChannel channel) {

            }
        };
        serverKey.attach(serverListener);

        final SocketChannel client = SocketChannel.open();
        client.configureBlocking(false);
        client.socket().setSendBufferSize(BUFFER_SIZE);
        client.socket().setReceiveBufferSize(BUFFER_SIZE);
        client.socket().setTcpNoDelay(true);

        SelectListener clientListener = new SelectListener() {

            ByteBuffer ping = ByteBuffer.wrap(new byte[] { 'P', 'I', 'N', 'G' });
            ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
            int state = 0;
            int pings = 0;

            public void onAccept(ServerSocketChannel channel) {

            }

            public void onConnect(SocketChannel channel) {

                try {
                    boolean finished = channel.finishConnect();
                    assert finished;

                    SelectionKey key = channel.keyFor(selector);
                    if (key == null) {
                        key = channel.register(selector, SelectionKey.OP_READ);
                    } else {
                        key.interestOps(SelectionKey.OP_READ);
                    }

                    ping.rewind();
                    int write = ((WritableByteChannel) channel).write(ping);
                    if (write < ping.limit()) {
                        channel.close();
                        return;
                    }

                } catch (IOException e) {
                    e.printStackTrace();
                    System.exit(0);
                }
            }

            public void onRead(ReadableByteChannel channel) {

                try {
                    buffer.clear();
                    int n = channel.read(buffer);
                    if (n <= 0) {
                        channel.close();
                        return;
                    }
                    state += n;
                    pings += state / ping.limit();
                    if (pings >= MAX_PINGS) {
                        System.out.printf("Finished %d pings\n", MAX_PINGS);
                        long end = System.currentTimeMillis();
                        double seconds = (double) (end - start) / 1000;
                        double perSecond = MAX_PINGS / seconds;
                        System.out.printf("%.2f roundtrips/s\n", perSecond);
                        System.out.printf("%.2f seconds total", seconds);
                        System.exit(0);
                    }

                    state = state % ping.limit();
                    
                    ping.rewind();
                    int write = ((WritableByteChannel) channel).write(ping);
                    if (write < ping.limit()) {
                        channel.close();
                        return;
                    }

                } catch (IOException e) {
                    e.printStackTrace();
                    System.exit(0);
                }
            }

            public void onWrite(WritableByteChannel channel) {

            }
        };

        SelectionKey clientKey = client.keyFor(selector);
        int ops = client.connect(address) ? SelectionKey.OP_READ : SelectionKey.OP_CONNECT;
        if (clientKey == null) {
            clientKey = client.register(selector, ops);
        } else {
            clientKey.interestOps(ops);
        }
        clientKey.attach(clientListener);

        while (true) {
            selector.select();
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> iter = keys.iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                SelectableChannel channel = key.channel();
                iter.remove();
                if (!key.isValid()) {
                    key.interestOps(0);
                    key.cancel();
                    continue;
                }

                SelectListener listener = (SelectListener) key.attachment();
                if (key.isReadable()) {
                    listener.onRead((ReadableByteChannel) channel);
                } else if (key.isWritable()) {
                    listener.onWrite((WritableByteChannel) channel);
                } else if (key.isAcceptable()) {
                    listener.onAccept((ServerSocketChannel) channel);
                } else if (key.isConnectable()) {
                    listener.onConnect((SocketChannel) channel);
                }
            }
        }
    }
}

mrjbq7 commented Aug 21, 2024

I am not entirely sure how the write callback works in Zig, so I can't tell whether the Java version, which just attempts a channel.write(bytes), is doing less work than Zig does in the io_uring backend. We would need to follow the implementations deeper to see whether this is a JVM vs. io_uring difference, or whether my PingPong.java is actually doing less work on Linux.

I believe, but am not entirely sure, that Java is using epoll() on Linux.
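
One way to check this rather than guess is to ask the JVM which selector implementation it picked. The following is a minimal sketch using only `java.nio` calls; the class name `WhichSelector` and its helper methods are made up for illustration. On Linux builds of OpenJDK I would expect the printed class names to mention epoll, but that is an assumption to confirm by running it on the benchmark machine.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;

// Hypothetical helper (not part of PingPong.java): reports which
// selector implementation the running JVM uses by default.
final class WhichSelector {

    // Class name of the system-wide default SelectorProvider.
    static String providerName() {
        return SelectorProvider.provider().getClass().getName();
    }

    // Class name of the concrete Selector returned by Selector.open().
    static String selectorName() throws IOException {
        try (Selector selector = Selector.open()) {
            return selector.getClass().getName();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("provider: " + providerName());
        System.out.println("selector: " + selectorName());
    }
}
```

Running `java WhichSelector.java` on the same machine as the benchmark shows the backend that PingPong.java actually used.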

If I build with the epoll backend on Linux:

diff --git a/src/main.zig b/src/main.zig
index 74f451a..1cfb61c 100644
--- a/src/main.zig
+++ b/src/main.zig
@@ -36,7 +36,7 @@ pub const Backend = enum {
     /// Returns a recommend default backend from inspecting the system.
     pub fn default() Backend {
         return @as(?Backend, switch (builtin.os.tag) {
-            .linux => .io_uring,
+            .linux => .epoll,
             .ios, .macos => .kqueue,
             .wasi => .wasi_poll,
             .windows => .iocp,

Performance is much slower than with the io_uring backend:

$ ./zig-out/bench/ping-pongs 
info: 97655.99 roundtrips/s
info: 5.12 seconds total

The gap is also not explained by the difference between the byte-by-byte self.state = (self.state + 1) % (PING.len) loop and the arithmetic shortcut that I took in Java, because applying the same shortcut in Zig is only a tiny bit faster:

diff --git a/src/bench/ping-pongs.zig b/src/bench/ping-pongs.zig
index fec07c0..153a92c 100644
--- a/src/bench/ping-pongs.zig
+++ b/src/bench/ping-pongs.zig
@@ -141,33 +141,25 @@ const Client = struct {
         l: *xev.Loop,
         c: *xev.Completion,
         socket: xev.TCP,
-        buf: xev.ReadBuffer,
+        _: xev.ReadBuffer,
         r: xev.TCP.ReadError!usize,
     ) xev.CallbackAction {
         const self = self_.?;
         const n = r catch unreachable;
-        const data = buf.slice[0..n];
-
-        // Count the number of pings in our message
-        var i: usize = 0;
-        while (i < n) : (i += 1) {
-            assert(data[i] == PING[self.state]);
-            self.state = (self.state + 1) % (PING.len);
-            if (self.state == 0) {
-                self.pongs += 1;
-
-                // If we're done then exit
-                if (self.pongs > 500_000) {
-                    socket.shutdown(l, c, Client, self, shutdownCallback);
-                    return .disarm;
-                }
-
-                // Send another ping
-                const c_ping = self.completion_pool.create() catch unreachable;
-                socket.write(l, c_ping, .{ .slice = PING[0..PING.len] }, Client, self, writeCallback);
-            }
+        self.state += n;
+        self.pongs += (self.state / PING.len);
+        self.state = (self.state % PING.len);
+
+        // If we're done then exit
+        if (self.pongs > 500_000) {
+            socket.shutdown(l, c, Client, self, shutdownCallback);
+            return .disarm;
         }
 
+        // Send another ping
+        const c_ping = self.completion_pool.create() catch unreachable;
+        socket.write(l, c_ping, .{ .slice = PING[0..PING.len] }, Client, self, writeCallback);
+
         // Read again
         return .rearm;
     }
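
For reference, the byte-by-byte loop and the arithmetic shortcut should produce the same state and pong count for any starting state and read size. The sketch below checks that in Java; the class and method names are illustrative only. It compares the counting logic alone, and deliberately ignores the per-byte content assert and the fact that the original Zig loop issues one write per completed ping.

```java
// Hypothetical check (not part of either benchmark): compares the
// incremental counter with the arithmetic shortcut.
final class PingCount {

    static final int PING_LEN = 4; // length of "PING"

    // Byte-by-byte counting, as in the original benchmark loop.
    // Returns { newState, completedPings }.
    static int[] countIncremental(int state, int n) {
        int pongs = 0;
        for (int i = 0; i < n; i++) {
            state = (state + 1) % PING_LEN;
            if (state == 0) {
                pongs++;
            }
        }
        return new int[] { state, pongs };
    }

    // Arithmetic shortcut: add the bytes read, then divide and take the remainder.
    static int[] countShortcut(int state, int n) {
        int total = state + n;
        return new int[] { total % PING_LEN, total / PING_LEN };
    }

    public static void main(String[] args) {
        for (int state = 0; state < PING_LEN; state++) {
            for (int n = 0; n <= 64; n++) {
                int[] a = countIncremental(state, n);
                int[] b = countShortcut(state, n);
                if (a[0] != b[0] || a[1] != b[1]) {
                    throw new AssertionError("mismatch at state=" + state + ", n=" + n);
                }
            }
        }
        System.out.println("incremental and shortcut counting agree");
    }
}
```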


mrjbq7 commented Aug 22, 2024

I also confirmed that it makes no difference whether the Java version inspects the read buffer byte by byte:

                    buffer.flip(); // needed so get() reads the n bytes just received (assumes no earlier flip())
                    for (int i = 0; i < n; i++) {
                        byte ch = buffer.get();
                        assert ch == ping.get(state);
                        state = (state + 1) % ping.limit();
                        if (state == 0) {
                            pings += 1;
                            if (pings > MAX_PINGS) {
                                System.out.printf("Finished %d pings\n", MAX_PINGS);
                                long end = System.currentTimeMillis();
                                double seconds = (double) (end - start) / 1000;
                                double perSecond = MAX_PINGS / seconds;
                                System.out.printf("%.2f roundtrips/s\n", perSecond);
                                System.out.printf("%.2f seconds total", seconds);
                                System.exit(0);
                            }
                        }
                    }

mrjbq7 changed the title from "Performance comparison with Java" to "Performance comparison with Java, zig-aio" on Aug 22, 2024

mrjbq7 commented Aug 22, 2024

I modified one of the zig-aio examples into a comparable single-process "ping-pongs" benchmark. On Linux it runs as fast as Java (more than 50% faster than libxev), and it also uses an io_uring backend.

info: 254049.52 roundtrips/s
info: 1.97 seconds total

Here's the code for it:

const std = @import("std");
const aio = @import("aio");
const coro = @import("coro");
const log = std.log.scoped(.coro_aio);

pub const aio_options: aio.Options = .{
    .debug = false, // set to true to enable debug logs
};

pub const coro_options: coro.Options = .{
    .debug = false, // set to true to enable debug logs
};

pub const std_options: std.Options = .{
    .log_level = .debug,
};

fn server(startup: *coro.ResetEvent) !void {
    var socket: std.posix.socket_t = undefined;
    try coro.io.single(aio.Socket{
        .domain = std.posix.AF.INET,
        .flags = std.posix.SOCK.STREAM | std.posix.SOCK.CLOEXEC,
        .protocol = std.posix.IPPROTO.TCP,
        .out_socket = &socket,
    });

    const address = std.net.Address.initIp4(.{ 0, 0, 0, 0 }, 3131);
    try std.posix.setsockopt(socket, std.posix.SOL.SOCKET, std.posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
    if (@hasDecl(std.posix.SO, "REUSEPORT")) {
        try std.posix.setsockopt(socket, std.posix.SOL.SOCKET, std.posix.SO.REUSEPORT, &std.mem.toBytes(@as(c_int, 1)));
    }
    try std.posix.bind(socket, &address.any, address.getOsSockLen());
    try std.posix.listen(socket, 128);

    startup.set();

    var client_sock: std.posix.socket_t = undefined;
    try coro.io.single(aio.Accept{ .socket = socket, .out_socket = &client_sock });

    var buf: [1024]u8 = undefined;
    var len: usize = 0;
    while (true) {
        try coro.io.single(aio.Recv{ .socket = client_sock, .buffer = &buf, .out_read = &len });
        try coro.io.single(aio.Send{ .socket = client_sock, .buffer = buf[0..len], .link = .soft });
    }

    try coro.io.multi(.{
        aio.CloseSocket{ .socket = client_sock, .link = .soft },
        aio.CloseSocket{ .socket = socket },
    });
}

fn client(startup: *coro.ResetEvent) !void {
    var socket: std.posix.socket_t = undefined;
    try coro.io.single(aio.Socket{
        .domain = std.posix.AF.INET,
        .flags = std.posix.SOCK.STREAM | std.posix.SOCK.CLOEXEC,
        .protocol = std.posix.IPPROTO.TCP,
        .out_socket = &socket,
    });

    try startup.wait();

    const address = std.net.Address.initIp4(.{ 127, 0, 0, 1 }, 3131);
    try coro.io.single(aio.Connect{
        .socket = socket,
        .addr = &address.any,
        .addrlen = address.getOsSockLen(),
    });

    const start_time = try std.time.Instant.now();

    var state: usize = 0;
    var pongs: u64 = 0;

    while (true) {
        var buf: [1024]u8 = undefined;
        var len: usize = 0;
        try coro.io.single(aio.Send{ .socket = socket, .buffer = "PING" });
        try coro.io.single(aio.Recv{ .socket = socket, .buffer = &buf, .out_read = &len });

        state += len;
        pongs += (state / 4);
        state = (state % 4);

        // If we're done then exit
        if (pongs > 500_000) {
            break;
        }
    }

    const end_time = try std.time.Instant.now();

    const elapsed = @as(f64, @floatFromInt(end_time.since(start_time)));
    std.log.info("{d:.2} roundtrips/s", .{@as(f64, @floatFromInt(pongs)) / (elapsed / 1e9)});
    std.log.info("{d:.2} seconds total", .{elapsed / 1e9});

    try coro.io.single(aio.CloseSocket{ .socket = socket });
}

pub fn main() !void {
    // var mem: [4096 * 1024]u8 = undefined;
    // var fba = std.heap.FixedBufferAllocator.init(&mem);
    var gpa: std.heap.GeneralPurposeAllocator(.{}) = .{};
    defer _ = gpa.deinit();
    var scheduler = try coro.Scheduler.init(gpa.allocator(), .{});
    defer scheduler.deinit();
    var startup: coro.ResetEvent = .{};
    _ = try scheduler.spawn(client, .{&startup}, .{});
    _ = try scheduler.spawn(server, .{&startup}, .{});
    try scheduler.run(.wait);
}


mrjbq7 commented Aug 22, 2024

It's possible that the io_uring backend is simply not optimal. Rather than a missing single-process optimization, the reduced latency of the single-process case may just expose some underlying issue in the backend.
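
To put the throughput numbers from this thread in per-roundtrip terms, the small sketch below converts roundtrips/s into microseconds per roundtrip. The class name is illustrative; the inputs are the Linux figures reported above. By this arithmetic, libxev's io_uring backend spends roughly 6.3 µs per roundtrip against roughly 4.0 µs for Java and zig-aio, so the gap being chased is on the order of 2 µs per roundtrip.

```java
// Hypothetical helper: converts the reported throughput figures
// into an average cost per roundtrip.
final class RoundtripCost {

    // Average microseconds per roundtrip for a given throughput.
    static double microsPerRoundtrip(double roundtripsPerSecond) {
        return 1_000_000.0 / roundtripsPerSecond;
    }

    public static void main(String[] args) {
        String[] names = { "libxev io_uring", "libxev epoll", "Java NIO", "zig-aio" };
        double[] rates = { 159884.93, 97655.99, 249625.56, 254049.52 };
        for (int i = 0; i < names.length; i++) {
            System.out.printf("%-16s %.2f us/roundtrip%n", names[i], microsPerRoundtrip(rates[i]));
        }
    }
}
```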


mrjbq7 commented Aug 23, 2024

I believe libxev caches the current time on each loop iteration, so I added the same to the Java version, but that only reduced its performance by 1-2%.
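
The exact change made to the Java version is not shown in this thread. As a rough sketch of the idea, assuming the cached time is simply refreshed once per event-loop iteration, it could look like the following; `CachedClock` and its methods are hypothetical names, not libxev or JDK APIs.

```java
import java.util.function.LongSupplier;

// Hypothetical per-iteration clock cache: the time source is read once
// per loop iteration and every callback in that iteration sees the same value.
final class CachedClock {

    private final LongSupplier source;
    private long nowMillis;

    CachedClock(LongSupplier source) {
        this.source = source;
        update();
    }

    // Refresh the cached time; intended to be called once per loop iteration.
    void update() {
        nowMillis = source.getAsLong();
    }

    // Cached time; does not touch the underlying time source.
    long now() {
        return nowMillis;
    }

    public static void main(String[] args) {
        CachedClock clock = new CachedClock(System::currentTimeMillis);
        for (int tick = 0; tick < 3; tick++) {
            // In PingPong.java this call would sit next to selector.select().
            clock.update();
            long first = clock.now();
            long second = clock.now();
            System.out.println("tick " + tick + ": stable within tick = " + (first == second));
        }
    }
}
```

The extra cost per iteration is a single clock read, which is consistent with the small slowdown reported above.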
