Skip to content

Commit

Permalink
Properly handle latency packet as in ProxyBatchBridge.
Browse files Browse the repository at this point in the history
  • Loading branch information
larryTheCoder committed Jul 2, 2023
1 parent 9a26eb7 commit 4fe29a3
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ protected void encode(ChannelHandlerContext ctx, BedrockBatchWrapper msg, List<O

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf compressed, List<Object> out) {
BedrockBatchWrapper msg = BedrockBatchWrapper.newInstance(compressed.readRetainedSlice(compressed.readableBytes()), null);
BedrockBatchWrapper msg = BedrockBatchWrapper.newInstance(compressed.retain(), null);

try {
msg.setAlgorithm(connection.getPlayer().getCompression());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
import dev.waterdog.waterdogpe.network.connection.codec.BedrockBatchWrapper;
import dev.waterdog.waterdogpe.network.connection.codec.compression.CompressionAlgorithm;
import dev.waterdog.waterdogpe.network.connection.codec.packet.BedrockPacketCodec;
import dev.waterdog.waterdogpe.network.protocol.handler.ProxyBatchBridge;
import dev.waterdog.waterdogpe.network.serverinfo.ServerInfo;
import dev.waterdog.waterdogpe.player.ProxiedPlayer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.NonNull;
import lombok.extern.log4j.Log4j2;
import org.cloudburstmc.protocol.bedrock.codec.BedrockCodec;
import org.cloudburstmc.protocol.bedrock.codec.BedrockCodecHelper;
Expand All @@ -21,6 +23,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -51,7 +54,7 @@ public TransportClientConnection(ProxiedPlayer player, ServerInfo serverInfo, Ch
this.channel = channel;
this.channel.closeFuture().addListener(future -> cleanActiveChannels());

scheduledTasks.add(focusedResetTimer.scheduleAtFixedRate(this::sendAckPing, PING_CYCLE_TIME, PING_CYCLE_TIME, TimeUnit.SECONDS));
scheduledTasks.add(focusedResetTimer.scheduleAtFixedRate(this::sendAcknowledge, PING_CYCLE_TIME, PING_CYCLE_TIME, TimeUnit.SECONDS));
scheduledTasks.add(focusedResetTimer.scheduleAtFixedRate(() -> packetSendingLimit.set(0), 1, 1, TimeUnit.SECONDS));
}

Expand All @@ -75,18 +78,29 @@ public void cleanActiveChannels() {

@Override
protected void channelRead0(ChannelHandlerContext ctx, BedrockBatchWrapper batch) {
for (var wrapper : batch.getPackets()) {
var packet = decodePacket(wrapper);
if (getPacketHandler() instanceof ProxyBatchBridge) {
onBedrockBatch(batch);
}

if (packet != null && packet.getTimestamp() == 0) {
batch.getPackets().remove(wrapper);
wrapper.release();
super.channelRead0(ctx, batch);
}

recvAckPing();
private void onBedrockBatch(@NonNull BedrockBatchWrapper batch) {
ListIterator<BedrockPacketWrapper> iterator = batch.getPackets().listIterator();
while (iterator.hasNext()) {
BedrockPacketWrapper wrapper = iterator.next();
if (wrapper.getPacket() == null) {
this.decodePacket(wrapper);
}
}

super.channelRead0(ctx, batch);
if (wrapper.getPacket() instanceof NetworkStackLatencyPacket packet && packet.getTimestamp() == 0) {
iterator.remove(); // remove from batch
wrapper.release(); // release
batch.modify();

receiveAcknowledge();
}
}
}

@Override
Expand Down Expand Up @@ -131,7 +145,7 @@ public long getPing() {
return latency;
}

public void sendAckPing() {
public void sendAcknowledge() {
var connection = getPlayer().getDownstreamConnection();
if (connection instanceof TransportClientConnection && connection.getServerInfo().getServerName().equalsIgnoreCase(getServerInfo().getServerName())) {
NetworkStackLatencyPacket packet = new NetworkStackLatencyPacket();
Expand All @@ -144,7 +158,7 @@ public void sendAckPing() {
}
}

public void recvAckPing() {
private void receiveAcknowledge() {
latency = (System.currentTimeMillis() - lastPingTimestamp) / 2;

TickSyncPacket latencyPacket = new TickSyncPacket();
Expand All @@ -154,24 +168,19 @@ public void recvAckPing() {
sendPacket(latencyPacket);
}

private NetworkStackLatencyPacket decodePacket(BedrockPacketWrapper wrapper) {
private void decodePacket(BedrockPacketWrapper wrapper) {
BedrockCodec codec = channel.pipeline().get(BedrockPacketCodec.class).getCodec();
BedrockCodecHelper codecHelper = channel.pipeline().get(BedrockPacketCodec.class).getHelper();
BedrockCodecHelper helper = channel.pipeline().get(BedrockPacketCodec.class).getHelper();

ByteBuf msg = wrapper.getPacketBuffer().retainedSlice();
try {
msg.skipBytes(wrapper.getHeaderLength()); // skip header

var definition = codec.getPacketDefinition(wrapper.getPacketId());
if (definition != null && definition.getFactory().get() instanceof NetworkStackLatencyPacket) {
return (NetworkStackLatencyPacket) codec.tryDecode(codecHelper, msg, wrapper.getPacketId());
}
wrapper.setPacket(codec.tryDecode(helper, msg, wrapper.getPacketId()));
} catch (Throwable t) {
log.warn("Failed to decode packet", t);
throw t;
} finally {
msg.release();
}

return null;
}
}

0 comments on commit 4fe29a3

Please sign in to comment.