diff --git a/modules/vistas-server/.tool-versions b/modules/vistas-server/.tool-versions
new file mode 100644
index 0000000000..b1e51edda4
--- /dev/null
+++ b/modules/vistas-server/.tool-versions
@@ -0,0 +1 @@
+java adoptopenjdk-8.0.332+9
diff --git a/modules/vistas-server/dependency-reduced-pom.xml b/modules/vistas-server/dependency-reduced-pom.xml
index 3cb2562848..ab3662c8ea 100644
--- a/modules/vistas-server/dependency-reduced-pom.xml
+++ b/modules/vistas-server/dependency-reduced-pom.xml
@@ -115,29 +115,7 @@
${project.basedir}/target/classes
-
-
-
- org.jacoco
- jacoco-maven-plugin
- 0.8.8
-
-
-
- prepare-agent
-
-
-
- report
- test
-
- report
-
-
-
-
- ${basedir}/target/coverage-reports/jacoco-unit.exec
- ${basedir}/target/coverage-reports/jacoco-unit.exec
+ -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/home/mats/Desktop
diff --git a/modules/vistas-server/pom.xml b/modules/vistas-server/pom.xml
index 2560ee6fa2..92ad5b0093 100644
--- a/modules/vistas-server/pom.xml
+++ b/modules/vistas-server/pom.xml
@@ -130,32 +130,9 @@
${project.basedir}/target/classes
+ -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/home/mats/Desktop
-
- org.jacoco
- jacoco-maven-plugin
- 0.8.8
-
- ${basedir}/target/coverage-reports/jacoco-unit.exec
- ${basedir}/target/coverage-reports/jacoco-unit.exec
-
-
-
-
- prepare-agent
-
-
-
-
- report
- test
-
- report
-
-
-
-
org.apache.maven.plugins
maven-shade-plugin
@@ -223,7 +200,7 @@
com.coreoz
wisp
- 2.2.1
+ 2.4.0
compile
@@ -237,4 +214,4 @@
-
\ No newline at end of file
+
diff --git a/modules/vistas-server/src/main/java/com/craftmend/vistas/server/base/VistasScheduler.java b/modules/vistas-server/src/main/java/com/craftmend/vistas/server/base/VistasScheduler.java
index 6a6ca59e0d..aae7fe147f 100644
--- a/modules/vistas-server/src/main/java/com/craftmend/vistas/server/base/VistasScheduler.java
+++ b/modules/vistas-server/src/main/java/com/craftmend/vistas/server/base/VistasScheduler.java
@@ -1,22 +1,33 @@
package com.craftmend.vistas.server.base;
+import com.coreoz.wisp.JobStatus;
import com.coreoz.wisp.Scheduler;
import com.coreoz.wisp.schedule.Schedules;
import com.craftmend.openaudiomc.generic.platform.interfaces.TaskService;
import com.craftmend.openaudiomc.generic.service.Service;
import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.UUID;
public class VistasScheduler extends Service implements TaskService {
- private Scheduler scheduler;
+ private final Scheduler scheduler;
private int taskCount = 0;
- private List runningTasks = new ArrayList<>();
public VistasScheduler() {
scheduler = new Scheduler();
+
+ scheduler.schedule(
+ "Terminated jobs cleaner",
+ () -> scheduler
+ .jobStatus()
+ .stream()
+ .filter(job -> job.status() == JobStatus.DONE)
+ // Clean only jobs that have finished executing since at least 10 seconds
+ .filter(job -> job.lastExecutionEndedTimeInMillis() < (System.currentTimeMillis() - 10000))
+ .forEach(job -> scheduler.remove(job.name())),
+ Schedules.fixedDelaySchedule(Duration.ofSeconds(5))
+ );
}
@Override
@@ -25,29 +36,12 @@ public int scheduleAsyncRepeatingTask(Runnable runnable, int delayUntilFirst, in
int intervalMs = tickInterval * 50;
int currentTask = taskCount++;
- runningTasks.add(currentTask);
- WrappedRunnable handler = new WrappedRunnable();
-
- handler.setTask(() -> {
- if (isCancelled(currentTask)) {
- runningTasks.removeIf(task -> task == currentTask);
- return;
- }
-
- runnable.run();
-
- scheduler.schedule(
- () -> {
- handler.getTask().run();
- },
- Schedules.executeOnce(Schedules.fixedDelaySchedule(Duration.ofMillis(intervalMs)))
- );
-
- });
-
scheduler.schedule(
- handler.getTask(),
- Schedules.executeOnce(Schedules.fixedDelaySchedule(Duration.ofMillis(delayMs)))
+ currentTask + "",
+ () -> {
+ runnable.run();
+ },
+ Schedules.fixedDelaySchedule(Duration.ofMillis(intervalMs))
);
return currentTask;
@@ -62,16 +56,11 @@ public int scheduleSyncRepeatingTask(Runnable runnable, int delayUntilFirst, int
public int schduleSyncDelayedTask(Runnable runnable, int delay) {
int delayMs = delay * 50;
int currentTask = taskCount++;
- runningTasks.add(currentTask);
scheduler.schedule(
+ currentTask + "",
() -> {
- if (isCancelled(currentTask)) {
- runningTasks.removeIf(task -> task == currentTask);
- return;
- }
runnable.run();
- runningTasks.removeIf(task -> task == currentTask);
},
Schedules.executeOnce(Schedules.fixedDelaySchedule(Duration.ofMillis(delayMs)))
);
@@ -79,18 +68,15 @@ public int schduleSyncDelayedTask(Runnable runnable, int delay) {
return currentTask;
}
- public boolean isCancelled(int task) {
- return !runningTasks.contains(task);
- }
-
@Override
public void cancelRepeatingTask(int i) {
- runningTasks.removeIf(task -> task == i);
+ scheduler.cancel(i + "");
}
@Override
public void runAsync(Runnable runnable) {
scheduler.schedule(
+ UUID.randomUUID().toString(),
() -> {
runnable.run();
},
@@ -101,6 +87,7 @@ public void runAsync(Runnable runnable) {
@Override
public void runSync(Runnable runnable) {
scheduler.schedule(
+ UUID.randomUUID().toString(),
runnable,
Schedules.executeOnce(Schedules.fixedDelaySchedule(Duration.ofMillis(1)))
);
diff --git a/modules/vistas-server/src/test/java/vistas/test/TestServer.java b/modules/vistas-server/src/test/java/vistas/test/TestServer.java
index 4b77b05179..fa574babce 100644
--- a/modules/vistas-server/src/test/java/vistas/test/TestServer.java
+++ b/modules/vistas-server/src/test/java/vistas/test/TestServer.java
@@ -1,12 +1,17 @@
package vistas.test;
import com.craftmend.openaudiomc.OpenAudioMc;
+import com.craftmend.openaudiomc.generic.client.helpers.ClientRtcLocationUpdate;
+import com.craftmend.openaudiomc.generic.client.objects.ClientConnection;
+import com.craftmend.openaudiomc.generic.networking.packets.client.voice.PacketClientUpdateVoiceLocations;
+import com.craftmend.openaudiomc.generic.networking.payloads.client.voice.ClientVoiceUpdatePeerLocationsPayload;
+import com.craftmend.openaudiomc.generic.node.packets.ForwardSocketPacket;
import com.craftmend.openaudiomc.generic.oac.OpenaudioAccountService;
import com.craftmend.openaudiomc.generic.logging.OpenAudioLogger;
import com.craftmend.openaudiomc.generic.proxy.interfaces.UserHooks;
-import com.craftmend.openaudiomc.vistas.client.redis.packets.ServerRegisterPacket;
-import com.craftmend.openaudiomc.vistas.client.redis.packets.UserJoinPacket;
-import com.craftmend.openaudiomc.vistas.client.redis.packets.UserLeavePacket;
+import com.craftmend.openaudiomc.vistas.client.Vistas;
+import com.craftmend.openaudiomc.vistas.client.client.VistasRedisClient;
+import com.craftmend.openaudiomc.vistas.client.redis.packets.*;
import com.craftmend.openaudiomc.vistas.client.server.networking.VistasRedisServer;
import com.craftmend.openaudiomc.vistas.client.users.ServerUserHooks;
import com.craftmend.vistas.server.VistasServer;
@@ -20,6 +25,8 @@
import vistas.test.utils.Waiter;
import java.net.Socket;
+import java.util.HashSet;
+import java.util.Set;
import java.util.UUID;
public class TestServer extends TestCase {
@@ -68,6 +75,7 @@ public void testFullStack() {
// fake register a minecraft server
UUID fakeServer1 = UUID.randomUUID();
UUID fakeServer2 = UUID.randomUUID();
+
OpenAudioMc.getService(VistasRedisServer.class).getPacketEvents().handlePacket(null, new ServerRegisterPacket(fakeServer1));
OpenAudioMc.getService(VistasRedisServer.class).getPacketEvents().handlePacket(null, new ServerRegisterPacket(fakeServer2));
@@ -151,6 +159,70 @@ public void testFullStack() {
assertEquals(0, OpenAudioMc.resolveDependency(ServerUserHooks.class).getRemoteUsers().size());
assertEquals(0, OpenAudioMc.resolveDependency(ServerUserHooks.class).getRemoteUsers().size());
+ startLoggingMemory();
+ startStressTesting();
+ }
+
+ private void startStressTesting() {
+ // crate server
+ UUID fakeServer1 = UUID.randomUUID();
+ OpenAudioMc.getService(VistasRedisServer.class).getPacketEvents().handlePacket(null, new ServerRegisterPacket(fakeServer1));
+
+ Set updates = new HashSet<>();
+
+ // loop 50 times
+ for (int i = 0; i < 50; i++) {
+ ClientRtcLocationUpdate update = new ClientRtcLocationUpdate("RandomStreamKey" + i, 1, 2, 3, 4);
+ updates.add(update);
+ }
+
+ TempUser tempUser = new TempUser(UUID.randomUUID().toString(), UUID.randomUUID());
+
+ while (true) {
+ // create user, send 1000 movement packets, then leave
+ //System.out.println("Stress testing user " + tempUser.getName());
+ OpenAudioMc.getService(VistasRedisServer.class).getPacketEvents().handlePacket(null, new UserJoinPacket(
+ tempUser.getName(),
+ tempUser.getUuid(),
+ fakeServer1,
+ "localhost"
+ ));
+
+ for (int i = 0; i < 1000; i++) {
+ // make movement update packet
+ PacketClientUpdateVoiceLocations packet = new PacketClientUpdateVoiceLocations(new ClientVoiceUpdatePeerLocationsPayload(updates));
+ OpenAudioMc.getService(VistasRedisServer.class).getPacketEvents().handlePacket(null, new WrappedProxyPacket(
+ new ForwardSocketPacket(packet),
+ fakeServer1,
+ tempUser.getUuid()
+ ));
+ }
+
+ // user logout
+ OpenAudioMc.getService(VistasRedisServer.class).getPacketEvents().handlePacket(null, new UserLeavePacket(
+ tempUser.getName(),
+ tempUser.getUuid(),
+ fakeServer1
+ ));
+ }
+ }
+
+ private void startLoggingMemory() {
+ new Thread(() -> {
+ int max = 0;
+ while (true) {
+ int used = (int) ((Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024);
+ if (used > max) max = used;
+
+ System.out.println("Memory usage: " + used + "MB (max: " + max + "MB)");
+
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }).start();
}
@Getter