Skip to content

Commit

Permalink
Ensure all packets are handled in the tick after it was sent
Browse files Browse the repository at this point in the history
  • Loading branch information
Earthcomputer committed Jan 5, 2025
1 parent 2e944fb commit 707004e
Show file tree
Hide file tree
Showing 7 changed files with 315 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Copyright (c) 2016, 2017, 2018, 2019 FabricMC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.fabricmc.fabric.impl.client.gametest;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import com.google.common.collect.ConcurrentHashMultiset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import net.minecraft.util.Unit;
import net.minecraft.util.thread.ThreadExecutor;

public final class NetworkSynchronizer {
private static final Logger LOGGER = LoggerFactory.getLogger("fabric-client-gametest-api-v1");
private static final boolean DISABLED = System.getProperty("fabric.client.gametest.disableNetworkSynchronizer") != null;

private final ThreadLocal<Unit> isNettyThread = new ThreadLocal<>();
private final AtomicInteger inFlightPackets = new AtomicInteger();
private final ConcurrentHashMultiset<RunnableBox> mainThreadPacketHandlers = ConcurrentHashMultiset.create();
private final Lock morePacketsLock = new ReentrantLock();
private final Condition morePacketsCondition = morePacketsLock.newCondition();
private final AtomicBoolean invalid = new AtomicBoolean();
private boolean isRunningNetworkTasks = false;

public void preSendPacket() {
if (DISABLED) {
return;
}

inFlightPackets.incrementAndGet();
}

public void preNettyHandlePacket() {
if (DISABLED) {
return;
}

isNettyThread.set(Unit.INSTANCE);
}

public void postNettyHandlePacket() {
if (DISABLED) {
return;
}

int remainingInFlightPackets = inFlightPackets.decrementAndGet();

if (remainingInFlightPackets < 0) {
markInvalid();
return;
}

isNettyThread.remove();

if (remainingInFlightPackets == 0) {
signalMorePackets();
}
}

public void postTaskAdded(Runnable task) {
if (DISABLED) {
return;
}

if (isNettyThread.get() != null) {
mainThreadPacketHandlers.add(new RunnableBox(task));
signalMorePackets();
}
}

public void postTaskRun(Runnable task) {
if (DISABLED) {
return;
}

checkInvalid();
mainThreadPacketHandlers.remove(new RunnableBox(task));
}

public void waitForPacketHandlers(ThreadExecutor<?> executor) {
if (DISABLED) {
return;
}

while (inFlightPackets.get() > 0 || !mainThreadPacketHandlers.isEmpty()) {
while (inFlightPackets.get() > 0 && mainThreadPacketHandlers.isEmpty()) {
morePacketsLock.lock();

try {
if (!morePacketsCondition.await(10, TimeUnit.SECONDS)) {
markInvalid();
checkInvalid();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
morePacketsLock.unlock();
}
}

isRunningNetworkTasks = true;

try {
executor.runTasks(mainThreadPacketHandlers::isEmpty);
} finally {
isRunningNetworkTasks = false;
}
}
}

public void reset() {
inFlightPackets.set(0);
mainThreadPacketHandlers.clear();
signalMorePackets();
}

public boolean isRunningNetworkTasks() {
return isRunningNetworkTasks;
}

private void signalMorePackets() {
morePacketsLock.lock();
morePacketsCondition.signal();
morePacketsLock.unlock();
}

private void markInvalid() {
if (!invalid.getAndSet(true)) {
LOGGER.error("Detected interfacing with packets at a lower level. Please disable network synchronization by setting the fabric.client.gametest.disableNetworkSynchronizer system property");
signalMorePackets();
}
}

private void checkInvalid() {
if (invalid.get()) {
throw new AssertionError("Network synchronizer in invalid state, see earlier log messages");
}
}

// Wraps a runnable to always use identity hashCode and equals
private record RunnableBox(Runnable runnable) {
@Override
public boolean equals(Object other) {
if (!(other instanceof RunnableBox(Runnable otherRunnable))) {
return false;
}

return otherRunnable == this.runnable;
}

@Override
public int hashCode() {
return System.identityHashCode(this.runnable);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
* inside a test phase task (which is a minor difference from vanilla), and then ensuring the client is still running
* the phase logic and is able to accept tasks while it is waiting for the server.
*/
// TODO: document network synchronizers here too ^
public final class ThreadingImpl {
private ThreadingImpl() {
}
Expand Down Expand Up @@ -100,10 +101,13 @@ private ThreadingImpl() {
@Nullable
public static Throwable testFailureException = null;

public static final NetworkSynchronizer CLIENTBOUND_SYNCHRONIZER = new NetworkSynchronizer();
public static final NetworkSynchronizer SERVERBOUND_SYNCHRONIZER = new NetworkSynchronizer();

@Nullable
public static Runnable taskToRun = null;

private static volatile boolean gameCrashed = false;
public static volatile boolean gameCrashed = false;

public static void enterPhase(int phase) {
while (enablePhases && (PHASER.getPhase() & PHASE_MASK) != phase) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2016, 2017, 2018, 2019 FabricMC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.fabricmc.fabric.mixin.client.gametest;

import com.llamalad7.mixinextras.injector.wrapmethod.WrapMethod;
import com.llamalad7.mixinextras.injector.wrapoperation.Operation;
import io.netty.channel.ChannelHandlerContext;
import org.spongepowered.asm.mixin.Final;
import org.spongepowered.asm.mixin.Mixin;
import org.spongepowered.asm.mixin.Shadow;
import org.spongepowered.asm.mixin.injection.At;
import org.spongepowered.asm.mixin.injection.Inject;
import org.spongepowered.asm.mixin.injection.callback.CallbackInfo;

import net.minecraft.network.ClientConnection;
import net.minecraft.network.NetworkSide;
import net.minecraft.network.packet.Packet;

import net.fabricmc.fabric.impl.client.gametest.NetworkSynchronizer;
import net.fabricmc.fabric.impl.client.gametest.ThreadingImpl;

@Mixin(ClientConnection.class)
public class ClientConnectionMixin {
@Shadow
@Final
private NetworkSide side;

@WrapMethod(method = "channelRead0(Lio/netty/channel/ChannelHandlerContext;Lnet/minecraft/network/packet/Packet;)V")
private void onNettyReceivePacket(ChannelHandlerContext context, Packet<?> packet, Operation<Void> original) {
NetworkSynchronizer synchronizer = side == NetworkSide.CLIENTBOUND ? ThreadingImpl.CLIENTBOUND_SYNCHRONIZER : ThreadingImpl.SERVERBOUND_SYNCHRONIZER;
synchronizer.preNettyHandlePacket();

try {
original.call(context, packet);
} finally {
synchronizer.postNettyHandlePacket();
}
}

@Inject(method = "sendImmediately", at = @At("HEAD"))
private void onSendPacket(CallbackInfo ci) {
NetworkSynchronizer synchronizer = side == NetworkSide.CLIENTBOUND ? ThreadingImpl.SERVERBOUND_SYNCHRONIZER : ThreadingImpl.CLIENTBOUND_SYNCHRONIZER;
synchronizer.preSendPacket();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import net.minecraft.client.gui.screen.Screen;
import net.minecraft.resource.ResourcePackManager;
import net.minecraft.server.SaveLoader;
import net.minecraft.util.thread.ThreadExecutor;
import net.minecraft.world.level.storage.LevelStorage;

import net.fabricmc.fabric.impl.client.gametest.FabricClientGameTestRunner;
Expand Down Expand Up @@ -116,6 +117,7 @@ private void preRunTasksHook(CallbackInfo ci) {
private void postRunTasksHook(CallbackInfo ci, @Share("ticksPerFrame") LocalIntRef ticksPerFrame) {
// end our "merged" runTasks block if there is going to be a tick this frame
if (ticksPerFrame.get() > 0) {
ThreadingImpl.CLIENTBOUND_SYNCHRONIZER.waitForPacketHandlers((ThreadExecutor<?>) (Object) this);
postRunTasks();
inMergedRunTasksLoop = false;
}
Expand Down Expand Up @@ -146,6 +148,11 @@ private void deferDisconnect(Screen disconnectionScreen, boolean transferring, C
}
}

@Inject(method = "disconnect(Lnet/minecraft/client/gui/screen/Screen;Z)V", at = @At(value = "INVOKE", target = "Lnet/minecraft/client/MinecraftClient;cancelTasks()V"))
private void onDisconnectCancelTasks(CallbackInfo ci) {
ThreadingImpl.CLIENTBOUND_SYNCHRONIZER.reset();
}

@Inject(method = "disconnect(Lnet/minecraft/client/gui/screen/Screen;Z)V", at = @At(value = "INVOKE", target = "Lnet/minecraft/client/MinecraftClient;render(Z)V", shift = At.Shift.AFTER))
private void onDisconnectBusyWait(CallbackInfo ci) {
// give the server a chance to tick too
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import org.spongepowered.asm.mixin.injection.At;
import org.spongepowered.asm.mixin.injection.Inject;
import org.spongepowered.asm.mixin.injection.callback.CallbackInfo;
import org.spongepowered.asm.mixin.injection.callback.CallbackInfoReturnable;

import net.minecraft.client.MinecraftClient;
import net.minecraft.server.MinecraftServer;
import net.minecraft.util.thread.ThreadExecutor;

import net.fabricmc.fabric.impl.client.gametest.ThreadingImpl;

Expand Down Expand Up @@ -67,6 +69,8 @@ private void preRunTasks(CallbackInfo ci) {

@Inject(method = "runServer", at = @At(value = "INVOKE", target = "Lnet/minecraft/server/MinecraftServer;runTasksTillTickEnd()V", shift = At.Shift.AFTER))
private void postRunTasks(CallbackInfo ci) {
ThreadingImpl.SERVERBOUND_SYNCHRONIZER.waitForPacketHandlers((ThreadExecutor<?>) (Object) this);

ThreadingImpl.serverCanAcceptTasks = true;
ThreadingImpl.enterPhase(ThreadingImpl.PHASE_TEST);

Expand All @@ -89,10 +93,21 @@ private void postRunTasks(CallbackInfo ci) {
ThreadingImpl.enterPhase(ThreadingImpl.PHASE_TICK);
}

@Inject(method = "canExecute(Lnet/minecraft/server/ServerTask;)Z", at = @At("HEAD"), cancellable = true)
private void alwaysExecuteNetworkTask(CallbackInfoReturnable<Boolean> cir) {
if (ThreadingImpl.SERVERBOUND_SYNCHRONIZER.isRunningNetworkTasks()) {
cir.setReturnValue(true);
}
}

@Unique
private void deregisterServer() {
ThreadingImpl.serverCanAcceptTasks = false;
ThreadingImpl.PHASER.arriveAndDeregister();
ThreadingImpl.isServerRunning = false;

if (!ThreadingImpl.gameCrashed) {
ThreadingImpl.SERVERBOUND_SYNCHRONIZER.reset();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2016, 2017, 2018, 2019 FabricMC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.fabricmc.fabric.mixin.client.gametest;

import org.spongepowered.asm.mixin.Mixin;
import org.spongepowered.asm.mixin.injection.At;
import org.spongepowered.asm.mixin.injection.Inject;
import org.spongepowered.asm.mixin.injection.callback.CallbackInfo;

import net.minecraft.client.MinecraftClient;
import net.minecraft.server.MinecraftServer;
import net.minecraft.util.thread.ThreadExecutor;

import net.fabricmc.fabric.impl.client.gametest.ThreadingImpl;

@Mixin(ThreadExecutor.class)
public class ThreadExecutorMixin {
@Inject(method = "send", at = @At(value = "INVOKE", target = "Ljava/util/Queue;add(Ljava/lang/Object;)Z", remap = false, shift = At.Shift.AFTER))
private void onPacketHandlerSchedule(Runnable task, CallbackInfo ci) {
switch ((Object) this) {
case MinecraftClient $ -> ThreadingImpl.CLIENTBOUND_SYNCHRONIZER.postTaskAdded(task);
case MinecraftServer $ -> ThreadingImpl.SERVERBOUND_SYNCHRONIZER.postTaskAdded(task);
default -> {
}
}
}

@Inject(method = "executeTask", at = @At(value = "INVOKE", target = "Ljava/lang/Runnable;run()V", remap = false, shift = At.Shift.AFTER))
private void onPacketHandlerRun(Runnable task, CallbackInfo ci) {
switch ((Object) this) {
case MinecraftClient $ -> ThreadingImpl.CLIENTBOUND_SYNCHRONIZER.postTaskRun(task);
case MinecraftServer $ -> ThreadingImpl.SERVERBOUND_SYNCHRONIZER.postTaskRun(task);
default -> {
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"package": "net.fabricmc.fabric.mixin.client.gametest",
"compatibilityLevel": "JAVA_21",
"mixins": [
"ClientConnectionMixin",
"CyclingButtonWidgetAccessor",
"GameOptionsAccessor",
"GameOptionsMixin",
Expand All @@ -15,6 +16,7 @@
"MouseAccessor",
"ScreenAccessor",
"ServerMainMixin",
"ThreadExecutorMixin",
"WindowMixin"
],
"plugin": "net.fabricmc.fabric.impl.client.gametest.ClientGameTestMixinConfigPlugin",
Expand Down

0 comments on commit 707004e

Please sign in to comment.