Skip to content

Commit

Permalink
feat: v2
Browse files Browse the repository at this point in the history
  • Loading branch information
TobiasGrether committed Mar 14, 2023
1 parent f519141 commit 1bd5063
Show file tree
Hide file tree
Showing 18 changed files with 269 additions and 854 deletions.
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>org.nethergames.proxytransport</groupId>
<artifactId>proxy-transport</artifactId>
<version>1.4.2-SNAPSHOT</version>
<version>1.5.0-SNAPSHOT</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down Expand Up @@ -34,12 +34,12 @@
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>1.5.2-1</version>
<version>1.5.4-2</version>
</dependency>
<dependency>
<groupId>dev.waterdog.waterdogpe</groupId>
<artifactId>waterdog</artifactId>
<version>1.2.2-SNAPSHOT</version>
<version>2.0.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
</dependencies>
Expand Down
16 changes: 3 additions & 13 deletions src/main/java/org/nethergames/proxytransport/ProxyTransport.java
Original file line number Diff line number Diff line change
@@ -1,28 +1,18 @@
package org.nethergames.proxytransport;

import dev.waterdog.waterdogpe.plugin.Plugin;
import org.nethergames.proxytransport.impl.event.NOOPEventAdapter;
import org.nethergames.proxytransport.impl.event.TransportEventAdapter;
import org.nethergames.proxytransport.integration.CustomTransportServerInfo;

public class ProxyTransport extends Plugin {
private static TransportEventAdapter eventAdapter = new NOOPEventAdapter();

@Override
public void onStartup() {
getProxy().getServerInfoMap().removeServerInfoType(CustomTransportServerInfo.TYPE);
getProxy().getServerInfoMap().registerServerInfoFactory(CustomTransportServerInfo.TYPE, CustomTransportServerInfo::new);
getLogger().info("ProxyTransport was started.");
getLogger().info("Registered type with name {}", CustomTransportServerInfo.TYPE.getIdentifier());
}

@Override
public void onEnable() {
}

public static TransportEventAdapter getEventAdapter() {
return eventAdapter;
}

public static void setEventAdapter(TransportEventAdapter eventAdapter) {
ProxyTransport.eventAdapter = eventAdapter;
getLogger().info("ProxyTransport was enabled.");
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.nethergames.proxytransport.decoder;

import dev.waterdog.waterdogpe.network.connection.client.ClientConnection;
import dev.waterdog.waterdogpe.network.connection.codec.BedrockBatchWrapper;
import dev.waterdog.waterdogpe.network.connection.codec.compression.SnappyCompressionCodec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToMessageDecoder;
import lombok.RequiredArgsConstructor;
import org.cloudburstmc.protocol.common.util.Zlib;

import java.util.List;

@RequiredArgsConstructor
public class PartialDecompressor extends MessageToMessageDecoder<ByteBuf> {
public static final String NAME = "decompress";
private final ClientConnection connection;
private static final SnappyCompressionCodec snappyCompressionCodec = new SnappyCompressionCodec();


@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf compressed, List<Object> list) throws Exception {
ByteBuf decompressed = null;

compressed.markReaderIndex();
switch (connection.getPlayer().getCompression().getBedrockAlgorithm()){
case ZLIB -> decompressed = Zlib.RAW.inflate(compressed, 4 * 1024 * 1024);
case SNAPPY -> decompressed = snappyCompressionCodec.decode0(channelHandlerContext, compressed);
}

if(decompressed == null){
throw new UnsupportedOperationException("The given compression algorithm is not supported by ProxyTransport");
}

compressed.resetReaderIndex();
list.add(BedrockBatchWrapper.newInstance(compressed.retain(), decompressed));
}
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,52 @@
package org.nethergames.proxytransport.encoder;

import dev.waterdog.waterdogpe.ProxyServer;
import dev.waterdog.waterdogpe.network.connection.client.ClientConnection;
import dev.waterdog.waterdogpe.network.connection.codec.BedrockBatchWrapper;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.nethergames.proxytransport.wrapper.DataPack;
import io.netty.handler.codec.MessageToMessageEncoder;
import lombok.RequiredArgsConstructor;
import org.nethergames.proxytransport.utils.CompressionType;

public class DataPackEncoder extends MessageToByteEncoder<DataPack> {
import java.util.List;

@RequiredArgsConstructor
public class DataPackEncoder extends MessageToMessageEncoder<BedrockBatchWrapper> {
public static final String NAME = "data-pack-encoder";
private final ClientConnection clientConnection;

@Override
protected void encode(ChannelHandlerContext channelHandlerContext, DataPack prefixedDataPack, ByteBuf byteBuf) {
protected void encode(ChannelHandlerContext channelHandlerContext, BedrockBatchWrapper wrapper, List<Object> out) {
ByteBuf buf = ByteBufAllocator.DEFAULT.ioBuffer();
try {
byteBuf.writeByte(prefixedDataPack.getCompressionType().ordinal());
byteBuf.writeBytes(prefixedDataPack.getContainingBuffer());
} finally {
prefixedDataPack.getContainingBuffer().release();
// The batch was modified or the compression types mismatch
if (wrapper.isModified() && wrapper.getUncompressed() != null || (wrapper.getAlgorithm() != clientConnection.getPlayer().getCompression())) {

buf.writeByte(CompressionType.METHOD_ZSTD.ordinal());
ByteBuf source = wrapper.getUncompressed();

if (!source.isDirect() || source instanceof CompositeByteBuf) {
// ZStd-jni needs direct buffers to function properly
// Composite Buffers or indirect buffers will not generate valid NIO ByteBuffers

source = channelHandlerContext.alloc().ioBuffer(source.readableBytes());
source.writeBytes(source);
}

ByteBuf compressed = ZStdEncoder.compress(source);
buf.writeBytes(compressed);
} else if (!wrapper.isModified() && wrapper.getCompressed() != null) { // The batch is already compressed correctly and we can yeet the buffer straight to the server
buf.writeByte(CompressionType.METHOD_ZLIB.ordinal());
buf.writeBytes(wrapper.getCompressed());
}
} catch (Throwable t) {
ProxyServer.getInstance().getLogger().error("Error in DataPack Encoding", t);
}

out.add(buf);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public static ByteBuf compress(ByteBuf buf) {
} else {
ByteBuffer compressedNio = compressed.nioBuffer(0, compressedSize);
ByteBuffer decompressedNio = buf.nioBuffer(buf.readerIndex(), buf.readableBytes());

compressedSize = Zstd.compress(compressedNio, decompressedNio, 3);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package org.nethergames.proxytransport.impl;

import dev.waterdog.waterdogpe.network.NetworkMetrics;
import dev.waterdog.waterdogpe.network.PacketDirection;
import dev.waterdog.waterdogpe.network.connection.client.ClientConnection;
import dev.waterdog.waterdogpe.network.connection.codec.batch.BedrockBatchDecoder;
import dev.waterdog.waterdogpe.network.connection.codec.batch.BedrockBatchEncoder;
import dev.waterdog.waterdogpe.network.connection.codec.client.ClientEventHandler;
import dev.waterdog.waterdogpe.network.connection.codec.compression.CompressionAlgorithm;
import dev.waterdog.waterdogpe.network.connection.codec.packet.BedrockPacketCodec;
import dev.waterdog.waterdogpe.network.serverinfo.ServerInfo;
import dev.waterdog.waterdogpe.player.ProxiedPlayer;
import io.netty.channel.*;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.util.concurrent.Promise;
import lombok.RequiredArgsConstructor;
import org.cloudburstmc.netty.channel.raknet.RakChannel;
import org.cloudburstmc.netty.channel.raknet.config.RakChannelOption;
import org.cloudburstmc.netty.channel.raknet.config.RakMetrics;
import org.nethergames.proxytransport.decoder.PartialDecompressor;
import org.nethergames.proxytransport.encoder.DataPackEncoder;

import static dev.waterdog.waterdogpe.network.connection.codec.initializer.ProxiedSessionInitializer.BATCH_DECODER;
import static dev.waterdog.waterdogpe.network.connection.codec.initializer.ProxiedSessionInitializer.getPacketCodec;

public class TransportChannelInitializer extends ChannelInitializer<Channel> {
private final ProxiedPlayer player;
private final ServerInfo serverInfo;
private final Promise<ClientConnection> promise;
private static final String FRAME_DECODER = "frame-decoder";
private static final String FRAME_ENCODER = "frame-encoder";

public TransportChannelInitializer(ProxiedPlayer player, ServerInfo serverInfo, Promise<ClientConnection> promise) {
this.player = player;
this.serverInfo = serverInfo;
this.promise = promise;
}

@Override
protected void initChannel(Channel channel) {
int rakVersion = this.player.getProtocol().getRaknetVersion();

channel.attr(PacketDirection.ATTRIBUTE).set(PacketDirection.FROM_SERVER);

NetworkMetrics metrics = this.player.getProxy().getNetworkMetrics();
if (metrics != null) {
channel.attr(NetworkMetrics.ATTRIBUTE).set(metrics);
}

if (metrics instanceof RakMetrics rakMetrics && channel instanceof RakChannel) {
channel.config().setOption(RakChannelOption.RAK_METRICS, rakMetrics);
}

channel.pipeline()
.addLast(FRAME_DECODER, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
.addLast(FRAME_ENCODER, new LengthFieldPrepender(4));


ClientConnection connection = this.createConnection(channel);
channel.pipeline()
.addLast(DataPackEncoder.NAME, new DataPackEncoder(connection))
.addLast(PartialDecompressor.NAME, new PartialDecompressor(connection))
.addLast(BedrockBatchDecoder.NAME, BATCH_DECODER)
.addLast(BedrockBatchEncoder.NAME, new BedrockBatchEncoder())
.addLast(BedrockPacketCodec.NAME, getPacketCodec(rakVersion));

if (connection instanceof ChannelHandler handler) { // For reference: This will take care of the packets received being handled.
channel.pipeline().addLast(ClientConnection.NAME, handler);
}

channel.pipeline()
.addLast(ClientEventHandler.NAME, new ClientEventHandler(this.player, connection))
.addLast(new TransportChannelInitializer.ChannelActiveHandler(connection, this.promise));
}

protected ClientConnection createConnection(Channel channel) {
return new TransportClientConnection(player, serverInfo, channel);
}

@RequiredArgsConstructor
private static class ChannelActiveHandler extends ChannelInboundHandlerAdapter {
private final ClientConnection connection;
private final Promise<ClientConnection> promise;

@Override
public void channelActive(ChannelHandlerContext ctx) {
this.promise.trySuccess(this.connection);
ctx.channel().pipeline().remove(this);
}
}
}
Loading

0 comments on commit 1bd5063

Please sign in to comment.