Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
MrNavaStar committed Dec 16, 2024
1 parent 871bf8d commit 4d6b6d0
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 96 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ velocity_version=3.4.0-SNAPSHOT
sqlib_version=3.2.4

# Dependencies
protoweaver_version=1.3.16
protoweaver_version=1.3.17
r_version=1.0.8
# https://projectlombok.org/setup/maven
lombok_version=1.18.36
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,6 @@ public class Broker implements ProtoConnectionHandler {
@Getter private static Settings settings = new Settings();
@Getter private static ProtoConnection proxy;

private static void syncSubs() {
subscriptions.forEach((sub, callbacks) -> {
if (proxy == null || !proxy.isOpen()) outgoingMessageQueue.add(sub);
else proxy.send(sub);
});
}

private static void putTopic(Topic topic, String id, DataBundle bundle) {
topic.validate();
bundle.meta().id(id).topic(topic).action(DataBundle.Action.PUT);
Expand Down Expand Up @@ -102,7 +95,9 @@ private static void subTopic(Topic topic, Consumer<DataBundle> handler) {
HashSet<Consumer<DataBundle>> handlers = subscriptions.getOrDefault(topic, new HashSet<>());
handlers.add(handler);
subscriptions.put(topic, handlers);
syncSubs();

if (proxy == null || !proxy.isOpen()) outgoingMessageQueue.add(topic);
else proxy.send(topic);
}

public static void subTopic(String topic, Consumer<DataBundle> handler) {
Expand All @@ -115,7 +110,6 @@ public static void subGlobalTopic(String topic, Consumer<DataBundle> handler) {

public void onReady(ProtoConnection protoConnection) {
proxy = protoConnection;
syncSubs();
while (!outgoingMessageQueue.isEmpty()) proxy.send(outgoingMessageQueue.remove());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void onToggle(WhitelistToggleEvent event) {
public void onPlayerJoin(PlayerJoinEvent event) {
ServerPlayer player = ((CraftPlayer) event.getPlayer()).getHandle();
onJoin(player);
player.valid = !player.valid;
//player.valid = !player.valid;
}

@EventHandler
Expand All @@ -63,7 +63,7 @@ public void onEnable() {
SpigotConfig.disableStatSaving = settings.syncPlayerStats;
SpigotConfig.disableAdvancementSaving = settings.syncPlayerAdvancements;
});
SynchronizedMinecraft.setPlayerCallback(player -> player.valid = !player.valid);
//SynchronizedMinecraft.setPlayerCallback(player -> player.valid = !player.valid);
getServer().getPluginManager().registerEvents(new EventListener(getDataPath()), this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,7 @@ public DataStore getTopicStore(Topic topic) {
}

public static Optional<SyncGroup> getSyncGroup(ProtoServer server) {
System.out.println("Sync Group in: " + server);

Optional<SyncGroup> g =groups.values().stream().filter(group -> group.getServers().contains(server)).findFirst();

System.out.println(g);
return g;
return groups.values().stream().filter(group -> group.getServers().contains(server)).findFirst();
}

public static DataStore getGlobalStore(Topic topic) {
Expand Down
120 changes: 42 additions & 78 deletions proxy/src/main/java/me/mrnavastar/singularity/loader/Velocity.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package me.mrnavastar.singularity.loader;

import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.google.inject.Inject;
Expand All @@ -8,7 +9,6 @@
import com.velocitypowered.api.event.proxy.ProxyInitializeEvent;
import com.velocitypowered.api.plugin.Dependency;
import com.velocitypowered.api.plugin.Plugin;
import com.velocitypowered.api.proxy.ProxyServer;
import com.velocitypowered.api.proxy.server.RegisteredServer;
import me.mrnavastar.protoweaver.api.ProtoConnectionHandler;
import me.mrnavastar.protoweaver.api.ProtoWeaver;
Expand All @@ -28,7 +28,6 @@
import java.lang.reflect.Type;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

@Plugin(
id = Constants.SINGULARITY_ID,
Expand All @@ -48,7 +47,7 @@ public class Velocity implements ProtoConnectionHandler {
private static final Type DATA_BUNDLE_TYPE = new TypeToken<HashMap<String, byte[]>>(){}.getType();
private static final SQLibType<DataBundle> DATA_BUNDLE = new SQLibType<>(GsonTypes.ELEMENT, v -> GSON.toJsonTree(v.data()), v -> new DataBundle().data(GSON.fromJson(v, DATA_BUNDLE_TYPE)));

private static final HashMap<ProtoServer, HashSet<Topic>> subs = new HashMap<>();
private static final ConcurrentHashMap<Topic, Set<ProtoServer>> subscriptions = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<UUID, ProtoServer> playerLocations = new ConcurrentHashMap<>();

@Inject
Expand All @@ -68,49 +67,53 @@ public void onServerPreConnect(ServerPreConnectEvent event) {

// Send player data to subscribed servers and store player network location
s.flatMap(current -> ProtoProxy.getConnectedServer(WORMHOLE, current.getServerInfo().getAddress())).ifPresent(server -> {
System.out.println("Player joining");

if (event.getPreviousServer() == null) {
System.out.println("lets send their data to " + server);

SingularityConfig.getSyncGroup(server)
.flatMap(store -> store.getTopicStore(PLAYER_TOPIC).getContainer("id", player.toString()))
.flatMap(c -> c.get(DATA_BUNDLE, "data"))
.map(data -> data.meta(new DataBundle.Meta().id(player.toString()).topic(PLAYER_TOPIC).action(DataBundle.Action.PUT)))
.ifPresent(data -> {
server.getConnection().send(data);
System.out.println("Data : " + data);
});
.ifPresent(data -> server.getConnection().send(data));
}
else playerLocations.put(player, server);
});
}

private Optional<ProtoServer> getPlayerLocation(DataBundle bundle) {
try {
UUID player = UUID.fromString(bundle.meta().id());
return Optional.ofNullable(playerLocations.get(player));
} catch (IllegalArgumentException ignore) {
return Optional.empty();
}
}

private void storeBundle(ProtoServer server, DataBundle bundle) {
if (bundle.meta().topic().global()) {
SingularityConfig.getGlobalStore(bundle.meta().topic())
.getOrCreateDefaultContainer(JavaTypes.STRING, "id", bundle.meta().id())
.put(DATA_BUNDLE, "data", bundle);
return;
}

SingularityConfig.getSyncGroup(server)
.ifPresent(store -> store.getTopicStore(bundle.meta().topic())
.getOrCreateDefaultContainer(JavaTypes.STRING, "id", bundle.meta().id())
.put(DATA_BUNDLE, "data", bundle));
}

@Override
public void onReady(ProtoConnection connection) {
ProtoProxy.getConnectedServer(connection)
.flatMap(SingularityConfig::getSyncGroup)
.ifPresent(group -> {
connection.send(group.getSettings());
System.out.println("settings send to " + connection.getRemoteAddress());
});
.ifPresent(group -> connection.send(group.getSettings()));
}

@Override
public void handlePacket(ProtoConnection connection, Object packet) {
System.out.println("Got packet from: " + connection + " of type: " + packet);

ProtoProxy.getConnectedServer(connection).ifPresent(server -> {
System.out.println("Server " + server + " is connected, able to handle packet");

switch (packet) {
// Handle a topic subscription
case Topic sub -> {
HashSet<Topic> topics = subs.getOrDefault(server, new HashSet<>());
topics.add(sub);
subs.put(server, topics);
}

case Topic sub -> subscriptions.computeIfAbsent(sub, k -> Sets.newConcurrentHashSet()).add(server);
// Process data bundle actions such as requesting and removing data
case DataBundle.Meta meta -> {
switch (meta.action()) {
Expand All @@ -124,64 +127,25 @@ public void handlePacket(ProtoConnection connection, Object packet) {
.ifPresent(DataContainer::delete);
}
}

// Process data bundle put action and data propagation
case DataBundle bundle -> {
if (!DataBundle.Action.PUT.equals(bundle.meta().action())) return;
Topic topic = bundle.meta().topic();

Stream<ProtoServer> servers;
if (topic.global()) servers = ProtoProxy.getConnectedServers(WORMHOLE).stream();
else {
Optional<SingularityConfig.SyncGroup> group = SingularityConfig.getSyncGroup(server);
if (group.isEmpty()) return;
servers = group.get().getServers().stream();
}

System.out.println("got server list: " + servers);

// Forward data to subscribed servers
servers.filter(s -> !s.equals(server))
.filter(s -> {
DataBundle.Propagation propagation = bundle.meta().propagation();
if (propagation.equals(DataBundle.Propagation.ALL)) return true;
if (propagation.equals(DataBundle.Propagation.NONE)) return false;

// if propagation is set to PLAYER
// TODO: This is maybe a race condition
try {
UUID player = UUID.fromString(bundle.meta().id());
Optional<ProtoServer> location = Optional.ofNullable(playerLocations.get(player));

System.out.println("Player location found? : " + location.isPresent());
if (location.isPresent()) System.out.println("Player location: " + location.get());

return location.isPresent() && location.get().equals(s);
} catch (IllegalArgumentException ignore) {
return false;
}
// Run expensive lookups outside the filters
Set<ProtoServer> servers = SingularityConfig.getSyncGroup(server).map(SingularityConfig.SyncGroup::getServers).orElse(new HashSet<>());
Optional<ProtoServer> location = getPlayerLocation(bundle);

subscriptions.getOrDefault(bundle.meta().topic(), new HashSet<>()).stream() // Grab all servers subbed to this topic
.filter(s -> !Objects.equals(s, server)) // Filter out the server the packet was received from
.filter(s -> bundle.meta().topic().global() || servers.contains(s)) // Filter out any servers that aren't part of the current sync group unless the topic is global
.filter(s -> { // Filter out servers based on propagation rules
if (bundle.meta().propagation().equals(DataBundle.Propagation.ALL)) return true;
if (bundle.meta().propagation().equals(DataBundle.Propagation.NONE)) return false;
return location.isPresent() && Objects.equals(location.get(), s);
})
.filter(s -> subs.getOrDefault(s, new HashSet<>()).contains(topic))
.forEach(s -> {
System.out.println("forwarding data to:" + s);
s.getConnection().send(bundle);
});

if (!bundle.meta().persist()) return;

System.out.println("Storing packet data");

// Store data in database
if (topic.global()) {
SingularityConfig.getGlobalStore(topic)
.getOrCreateDefaultContainer(JavaTypes.STRING, "id", bundle.meta().id())
.put(DATA_BUNDLE, "data", bundle);
return;
}
.forEach(s -> s.getConnection().send(bundle));

SingularityConfig.getSyncGroup(server)
.ifPresent(store -> store.getTopicStore(topic)
.getOrCreateDefaultContainer(JavaTypes.STRING, "id", bundle.meta().id())
.put(DATA_BUNDLE, "data", bundle));
if (bundle.meta().persist()) storeBundle(server, bundle);
}
default -> WORMHOLE.logWarn("Ignoring unknown packet: " + packet);
}
Expand Down

0 comments on commit 4d6b6d0

Please sign in to comment.